Compare commits

7 Commits
renee ... main

Author SHA1 Message Date
lusixing
070cb2696e updated_readme 2026-02-08 17:20:00 -08:00
Ada
bd5cd5eea9 fix(ai): allow multimodal message content to fix image upload 422
- AIMessage.content: Union[str, List[dict]] for text-only and multimodal (e.g. image_url)
- Image requests with content array now pass validation and are forwarded to AI
2026-02-04 17:15:00 -08:00
lusixing
9cbd3347f6 update_260202-3 2026-02-02 22:22:11 -08:00
lusixing
106c3421e6 added_user_serach 2026-01-31 17:52:00 -08:00
Ada
5f047fd95f Update requirements.txt 2026-01-31 16:52:42 -08:00
Ada
27d6c40178 Update .gitignore 2026-01-31 16:52:20 -08:00
lusixing
e2c05af372 backend_update_260131 2026-01-31 16:29:57 -08:00
18 changed files with 532 additions and 219 deletions

7
.gitignore vendored
View File

@@ -1,3 +1,8 @@
gitignore gitignore
# Python virtual environments # Python virtual environments
venv/ venv/
# Python cache
__pycache__/
app/__pycache__/
*.pyc

103
README.md
View File

@@ -1,86 +1,103 @@
# CypherLegacy Backend # CypherLegacy Backend
CypherLegacy 是一个基于 FastAPI 构建的数字遗产继承系统后端。它允许用户安全地存储加密的数字资产,并指定在用户去世(被管理员确认状态)后可以由特定的继承人领取这些资产。 CypherLegacy is a secure digital legacy inheritance system backend built with FastAPI. It allows users to safely store encrypted digital assets and designate specific heirs who can claim these assets after the user's status is confirmed as "deceased" by an administrator.
## 🌟 核心功能 ## 🌟 Core Features
- **安全注册与认证**: 采用 RSA 密钥对生成机制。用户注册时自动生成公私钥对,公钥用于加密资产,私钥用于解密。 - **Secure Authentication**: Registration automatically generates an RSA key pair (public/private). Public keys are used for encryption, while private keys are used for secure decryption.
- **数字资产加密存储**: 用户可以上传资产,系统使用用户的公钥对资产内容进行加密存储。 - **Encrypted Asset Storage**: Users can upload assets that are encrypted using their unique public key before storage.
- **遗产指定**: 用户可以为每个资产指定一名继承人Heir - **Legacy Designation**: Users can assign specific heirs (via email) to each of their digital assets.
- **吊唁/状态确认**: 系统管理员有权标记用户为“已故”guale - **Status Monitoring**: Administrators can officially declare a user as "deceased" (`guale`), triggering the inheritance process.
- **遗产申领**: - **Inheritance Claiming**:
- 只有被确认标记为“已故”的用户,其指定的继承人才能申领资产。 - Designated heirs can claim assets only after the owner's status is verified.
- 申领过程中,系统利用已故用户的私钥解密内容并安全交付给继承人。 - The system securely decrypts the content using the deceased user's private key for the authorized heir.
- **密钥分片 (Sharding)**: 支持私钥分片存储逻辑,增强安全性。 - **AI Proxy Service**: A built-in proxy for interacting with AI models with role-based configurations and weekly quota/token management.
- **Subscription Tiers**: Multi-tier subscription system (Free, Pro, etc.) controlling limits on heirs, AI usage, and more.
- **Last Active Tracking**: Automatically tracks user activity to help monitor status.
## 🛠 技术栈 ## 🛠 Technology Stack
- **框架**: [FastAPI](https://fastapi.tiangolo.com/) - **Framework**: [FastAPI](https://fastapi.tiangolo.com/)
- **数据库**: [PostgreSQL](https://www.postgresql.org/) (通过 `asyncpg` 异步驱动) - **Database**: [PostgreSQL](https://www.postgresql.org/) (via `asyncpg` async driver)
- **ORM**: [SQLAlchemy 2.0](https://www.sqlalchemy.org/) (AsyncIO) - **ORM**: [SQLAlchemy 2.0](https://www.sqlalchemy.org/) (AsyncIO)
- **加密**: [Cryptography](https://cryptography.io/) & [PyCryptodome](https://pycryptodome.org/) (RSA 加密) - **Encryption**: [Cryptography](https://cryptography.io/) & [PyCryptodome](https://pycryptodome.org/) (RSA Encryption)
- **权限**: [python-jose](https://github.com/mpdavis/python-jose) (JWT Token) - **Authentication**: [python-jose](https://github.com/mpdavis/python-jose) (JWT Tokens) & Passlib (Bcrypt/Argon2)
- **Request Client**: [HTTPX](https://www.python-httpx.org/) (for AI Proxy)
## 🚀 快速开始 ## 🚀 Getting Started
### 1. 使用 Docker Compose 启动 (推荐) ### 1. Using Docker Compose (Recommended)
这是最快的方式,会自动配置数据库和应用环境。 This is the fastest way to get the system running with a pre-configured database.
```bash ```bash
docker-compose up --build docker-compose up --build
``` ```
简单测试:
- **API Documentation**: `http://localhost:8000/docs`
- **Default Admin**: `admin` / `admin123`
### 🧪 Running Tests
Once the service is up, you can run the automated test scenario:
```bash ```bash
python3 test/test_scenario.py python3 test/test_scenario.py
``` ```
- 接口文档: `http://localhost:8000/docs` ### 2. Local Manual Setup
- 管理员默认账号: `admin` / `admin123`
### 2. 本地手动启动 1. **Environment Setup**:
1. **安装环境**:
```bash ```bash
python -m venv venv python -m venv venv
source venv/bin/activate # Linux/macOS source venv/bin/activate # Linux/macOS
# or venv\Scripts\activate on Windows
pip install -r requirements.txt pip install -r requirements.txt
``` ```
2. **配置环境变量**: 2. **Configuration**:
创建一个 `.env` 文件或设置环境变量 `DATABASE_URL`。默认为: Create a `.env` file or set the `DATABASE_URL` environment variable. Default:
`postgresql+asyncpg://user:password@localhost:5432/fastapi_db` `postgresql+asyncpg://user:password@localhost:5432/fastapi_db`
3. **初始化数据库**: 3. **Database Initialization**:
```bash ```bash
python reset_db.py python reset_db.py
``` ```
4. **运行服务**: 4. **Run Service**:
```bash ```bash
uvicorn app.main:app --reload uvicorn app.main:app --reload
``` ```
## 📖 API 概览 ## 📖 API Overview
### 用户接口 ### User & Auth
- `POST /register`: 用户注册,自动生成 RSA 密钥对。 - `POST /register`: Register and generate RSA keys.
- `POST /token`: 登录获取 JWT Access Token - `POST /login`: Login and receive JWT access token.
- `GET /users/search`: Search for users by username or email.
### 资产接口 ### Asset Management
- `POST /assets/`: 创建加密资产(上传者自动加密)。 - `GET /assets/get`: Retrieve assets owned by the current user.
- `POST /assets/assign`: 为资产指定继承人。 - `POST /assets/create`: Create a new encrypted asset.
- `POST /assets/claim`: 继承人申领资产(需原所有人状态为 `guale`)。 - `POST /assets/assign`: Assign or unassign an heir to an asset.
- `POST /assets/delete`: Remove an asset.
- `GET /assets/designated`: List assets where the user is the designated heir.
### 管理员接口 ### Inheritance
- `POST /admin/declare-guale`: 管理员宣告用户“已故”。 - `POST /assets/claim`: Claim an asset (requires owner to be marked as deceased).
## 🔒 安全设计 ### AI & Roles
- `POST /ai/proxy`: Proxy requests to external AI providers with quota tracking.
- `GET /get_ai_roles`: Retrieve available AI personas/roles.
1. **端到端思想**: 资产在存储前使用 RSA 公钥加密。 ### Admin
2. **状态验证**: 申领逻辑严格校验 `heir_id` 以及 `author.guale` 状态。 - `POST /admin/declare-guale`: (Admin Only) Declare a user as deceased.
3. **密钥管理**: 本项目目前为了演示方便在数据库中存储了私钥,在生产环境下建议配合 KMS 或硬件安全模块使用。
## 📜 许可证 ## 🔒 Security Design
1. **End-to-End Principles**: Sensitive assets are encrypted before storage.
2. **State Verification**: Claim logic strictly validates the `heir_id` and the `deceased` status of the owner.
3. **Key Management**: For demonstration, private keys are stored in the database. In a production environment, integration with a KMS (Key Management Service) or HSM (Hardware Security Module) is highly recommended.
## 📜 License
MIT License MIT License

Binary file not shown.

Binary file not shown.

View File

@@ -1,131 +0,0 @@
!pip install -q langgraph-checkpoint-sqlite langchain_google_genai
import sqlite3
from google.colab import userdata
from typing import Literal, TypedDict, Annotated
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage, AnyMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph.message import add_messages
from langchain_core.runnables import RunnableConfig
from typing import Union, List, Dict
# --- 1. 状态定义 ---
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
summary: str # 永久存储在数据库中的摘要内容
# --- 2. 核心逻辑 ---
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature = 0.7, google_api_key='') #在这里倒入API key
def call_model(state: State, config: RunnableConfig):
"""对话节点:融合了动态 Prompt 和 长期摘要"""
# 获取当前 session 特有的 System Prompt如果没传则使用默认
configurable = config.get("configurable", {})
system_base_prompt = configurable.get("system_prompt", "你是一个通用的 AI 助手。")
# 构造当前上下文
summary = state.get("summary", "")
if summary:
system_base_prompt += f"\n\n<context_summary>\n{summary}\n</context_summary>"
messages = [SystemMessage(content=system_base_prompt)] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
def summarize_conversation(state: State):
"""总结节点:负责更新摘要并清理过期消息"""
summary = state.get("summary", "")
messages_to_summarize = state["messages"][:-1]
# If there's nothing to summarize yet, just END
if not messages_to_summarize:
return {"summary": summary}
system_prompt = (
"你是一个记忆管理专家。请更新摘要,合并新旧信息。"
"1. 保持简练,仅保留事实(姓名、偏好、核心议题)。"
"2. 如果新消息包含对旧信息的修正,请更新它。"
"3. 如果对话中包含图片描述,请将图片的关键视觉信息也记录在摘要中"
)
summary_input = f"现有摘要: {summary}\n\n待加入的新信息: {messages_to_summarize}"
# Invoke model to get new condensed summary
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=summary_input)
])
# Important: Create RemoveMessage objects for all messages that were summarized
delete_messages = [RemoveMessage(id=m.id) for m in messages_to_summarize if m.id]
return {
"summary": response.content,
"messages": delete_messages
}
def should_continue(state: State) -> Literal["summarize", END]:
"""如果消息累积超过3条则去总结节点"""
if len(state["messages"]) > 3: #changed
return "summarize"
return END
# --- 3. 构建图 ---
db_path = "multi_session_chat.sqlite"
conn = sqlite3.connect(db_path, check_same_thread=False)
memory = SqliteSaver(conn)
workflow = StateGraph(State)
workflow.add_node("chatbot", call_model)
workflow.add_node("summarize", summarize_conversation)
workflow.add_edge(START, "chatbot")
workflow.add_conditional_edges("chatbot", should_continue)
workflow.add_edge("summarize", END)
app = workflow.compile(checkpointer=memory)
def chat(thread_id: str, system_prompt: str, user_content: Union[str, List[Dict]]):
"""
Processes a single user message and returns the AI response,
persisting memory via the thread_id.
"""
config = {
"configurable": {
"thread_id": thread_id,
"system_prompt": system_prompt
}
}
# Prepare the input for this specific turn
input_data = {"messages": [HumanMessage(content=user_content)]}
ai_response = ""
# Stream the values to get the final AI message
for event in app.stream(input_data, config, stream_mode="values"):
if "messages" in event:
last_msg = event["messages"][-1]
if last_msg.type == "ai":
ai_response = last_msg.content
return ai_response
# 使用范例
# if __name__ == "__main__":
# tid = "py_expert_001"
# sys_p = "你是个善解人意的机器人。"
# # Call 1: Establish context
# resp1 = chat(tid, sys_p, "你好,我叫小明。")
# print(f"Bot: {resp1}")
# # Call 2: Test memory (The model should remember the name '小明')
# resp2 = chat(tid, sys_p, "我今天很开心")
# print(f"Bot: {resp2}")

View File

@@ -24,12 +24,62 @@ async def init_db():
# 自动创建表 # 自动创建表
await conn.run_sync(Base.metadata.create_all) await conn.run_sync(Base.metadata.create_all)
# 创建默认管理员用户
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
from . import auth from . import auth
from sqlalchemy.future import select from sqlalchemy.future import select
# 检查是否已存在 admin 用户 # 1. 检查并创建默认订阅级别 (MUST BE FIRST because of FK constraints)
tiers = [
{
"name": "Free",
"max_heirs": 1,
"weekly_token_limit": 1000,
"max_assets": 5,
"max_storage_mb": 10,
"can_use_ai_proxy": False,
"description": "Standard free tier"
},
{
"name": "Pro",
"max_heirs": 5,
"weekly_token_limit": 10000,
"max_assets": 50,
"max_storage_mb": 100,
"can_use_ai_proxy": True,
"description": "Professional tier for active users"
},
{
"name": "Ultra",
"max_heirs": 100,
"weekly_token_limit": 100000,
"max_assets": 1000,
"max_storage_mb": 1024,
"can_use_ai_proxy": True,
"description": "Ultimate tier for power users"
},
{
"name": "Unlimited",
"max_heirs": 9999,
"weekly_token_limit": 999999,
"max_assets": 9999,
"max_storage_mb": 999999,
"can_use_ai_proxy": True,
"description": "Internal unlimited tier"
}
]
for tier_data in tiers:
result = await session.execute(
select(models.SubscriptionPlans).where(models.SubscriptionPlans.name == tier_data["name"])
)
if not result.scalars().first():
new_tier = models.SubscriptionPlans(**tier_data)
session.add(new_tier)
print(f"✅ Default subscription tier '{tier_data['name']}' created")
await session.commit()
# 2. 检查并创建默认管理员用户
result = await session.execute( result = await session.execute(
select(models.User).where(models.User.username == "admin") select(models.User).where(models.User.username == "admin")
) )
@@ -52,7 +102,7 @@ async def init_db():
await session.commit() await session.commit()
print("✅ Default admin user created (username: admin, password: admin123)") print("✅ Default admin user created (username: admin, password: admin123)")
# 检查是否已存在 Gemini 配置 # 3. 检查是否已存在 Gemini 配置
result = await session.execute( result = await session.execute(
select(models.AIConfig).where(models.AIConfig.provider_name == "gemini") select(models.AIConfig).where(models.AIConfig.provider_name == "gemini")
) )
@@ -68,4 +118,51 @@ async def init_db():
) )
session.add(gemini_config) session.add(gemini_config)
await session.commit() await session.commit()
print("✅ Default Gemini AI configuration created") print("✅ Default Gemini AI configuration created")
# 4. 检查并初始化 AI Roles
ai_roles_data = [
{
"id": 0,
"name": 'Reflective Assistant',
"description": 'Helps you dive deep into your thoughts and feelings through meaningful reflection.',
"systemPrompt": 'You are a helpful journal assistant. Help the user reflect on their thoughts and feelings.',
"icon": 'journal-outline',
"iconFamily": 'Ionicons',
},
{
"id": 1,
"name": 'Creative Spark',
"description": 'A partner for brainstorming, creative writing, and exploring new ideas.',
"systemPrompt": 'You are a creative brainstorming partner. Help the user explore new ideas, write stories, or look at things from a fresh perspective.',
"icon": 'bulb-outline',
"iconFamily": 'Ionicons',
},
{
"id": 2,
"name": 'Action Planner',
"description": 'Focused on turning thoughts into actionable plans and organized goals.',
"systemPrompt": 'You are a productivity coach. Help the user break down their thoughts into actionable steps and clear goals.',
"icon": 'list-outline',
"iconFamily": 'Ionicons',
},
{
"id": 3,
"name": 'Empathetic Guide',
"description": 'Provides a safe, non-judgmental space for emotional support and empathy.',
"systemPrompt": 'You are a supportive and empathetic friend. Listen to the user\'s concerns and provide emotional support without judgment.',
"icon": 'heart-outline',
"iconFamily": 'Ionicons',
},
]
for role_data in ai_roles_data:
result = await session.execute(
select(models.AIRole).where(models.AIRole.id == role_data["id"])
)
if not result.scalars().first():
new_role = models.AIRole(**role_data)
session.add(new_role)
print(f"✅ AI Role '{role_data['name']}' created")
await session.commit()

View File

@@ -7,7 +7,7 @@ from passlib.context import CryptContext
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import httpx import httpx
from datetime import datetime from datetime import datetime, timedelta
from typing import List from typing import List
@asynccontextmanager @asynccontextmanager
@@ -99,9 +99,18 @@ async def get_my_assets(
db: AsyncSession = Depends(database.get_db) db: AsyncSession = Depends(database.get_db)
): ):
result = await db.execute( result = await db.execute(
select(models.Asset).where(models.Asset.author_id == current_user.id) select(models.Asset)
.options(selectinload(models.Asset.heir))
.where(models.Asset.author_id == current_user.id)
) )
return result.scalars().all() assets = result.scalars().all()
# Populate heir_email for the schema
for asset in assets:
if asset.heir:
asset.heir_email = asset.heir.email
else:
asset.heir_email = None
return assets
@app.post("/assets/create", response_model=schemas.AssetOut) @app.post("/assets/create", response_model=schemas.AssetOut)
@@ -115,9 +124,12 @@ async def create_asset(
new_asset = models.Asset( new_asset = models.Asset(
title=asset.title, title=asset.title,
type=asset.type,
content_outer_encrypted=encrypted_content, content_outer_encrypted=encrypted_content,
private_key_shard=asset.private_key_shard, private_key_shard=asset.private_key_shard,
author_id=current_user.id author_id=current_user.id,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
) )
db.add(new_asset) db.add(new_asset)
await db.commit() await db.commit()
@@ -199,7 +211,7 @@ async def assign_asset(
heir_result = await db.execute( heir_result = await db.execute(
select(models.User).where( select(models.User).where(
models.User.username == assignment.heir_name models.User.email == assignment.heir_email
) )
) )
heir_user = heir_result.scalars().first() heir_user = heir_result.scalars().first()
@@ -216,9 +228,67 @@ async def assign_asset(
asset.heir = heir_user asset.heir = heir_user
await db.commit() await db.commit()
return {"message": f"Asset assigned to {assignment.heir_name}"} return {"message": f"Asset assigned to {assignment.heir_email}"}
@app.get("/assets/designated", response_model=List[schemas.AssetOut])
async def get_designated_assets(
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Query assets where the current user is the designated heir.
"""
result = await db.execute(
select(models.Asset).where(models.Asset.heir_id == current_user.id)
)
return result.scalars().all()
@app.post("/assets/delete")
async def delete_asset(
asset_del: schemas.AssetDelete,
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Delete an asset owned by the current user.
"""
result = await db.execute(
select(models.Asset).where(models.Asset.id == asset_del.asset_id)
)
asset = result.scalars().first()
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
if asset.author_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized to delete this asset")
await db.delete(asset)
await db.commit()
return {"message": "Asset deleted successfully"}
@app.get("/users/search", response_model=List[schemas.UserOut])
async def search_users(
query: str,
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Search for users by username or email.
"""
if not query:
raise HTTPException(status_code=400, detail="Search query is required")
# Search for username or email containing the query (case-insensitive)
result = await db.execute(
select(models.User).where(
(models.User.username.ilike(f"%{query}%")) |
(models.User.email.ilike(f"%{query}%"))
).limit(20) # Limit results for performance
)
users = result.scalars().all()
return users
@app.post("/admin/declare-guale") @app.post("/admin/declare-guale")
@@ -253,14 +323,34 @@ async def declare_user_guale(
"guale": target_user.guale "guale": target_user.guale
} }
# 用于测试热加载
@app.post("/post1")
async def test1():
a=2
b=3
c = a+b
return {"msg": f"this is a msg {c}"}
async def get_or_create_token_usage(user_id: int, db: AsyncSession):
# Get current week start (Monday)
now = datetime.utcnow()
monday = now - timedelta(days=now.weekday())
week_start = monday.replace(hour=0, minute=0, second=0, microsecond=0)
result = await db.execute(
select(models.UserTokenUsage).where(models.UserTokenUsage.user_id == user_id)
)
usage = result.scalars().first()
if not usage:
usage = models.UserTokenUsage(
user_id=user_id,
tokens_used=0,
last_reset_at=week_start
)
db.add(usage)
await db.commit()
await db.refresh(usage)
#每周重置token使用情况
elif usage.last_reset_at < week_start:
usage.tokens_used = 0
usage.last_reset_at = week_start
await db.commit()
return usage
@app.post("/ai/proxy", response_model=schemas.AIResponse) @app.post("/ai/proxy", response_model=schemas.AIResponse)
async def ai_proxy( async def ai_proxy(
@@ -272,6 +362,43 @@ async def ai_proxy(
Proxy relay for AI requests. Proxy relay for AI requests.
Fetches AI configuration from the database. Fetches AI configuration from the database.
""" """
def get_quota_exceeded_response():
return {
"id": f"chatcmpl-{int(datetime.utcnow().timestamp())}",
"object": "chat.completion",
"created": int(datetime.utcnow().timestamp()),
"model": "quota-manager",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "quota exceeded, please upgrade plan"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
}
# 1. 检查 Tier 是否允许使用 AI
result = await db.execute(
select(models.SubscriptionPlans).where(models.SubscriptionPlans.name == current_user.tier)
)
tier_plan = result.scalars().first()
if not tier_plan or not tier_plan.can_use_ai_proxy:
return get_quota_exceeded_response()
# 2. 检查本周 Token 使用是否超过限制
usage_record = await get_or_create_token_usage(current_user.id, db)
if usage_record.tokens_used >= tier_plan.weekly_token_limit:
return get_quota_exceeded_response()
# Fetch active AI config # Fetch active AI config
result = await db.execute( result = await db.execute(
select(models.AIConfig).where(models.AIConfig.is_active == True) select(models.AIConfig).where(models.AIConfig.is_active == True)
@@ -290,6 +417,9 @@ async def ai_proxy(
payload = ai_request.model_dump() payload = ai_request.model_dump()
payload["model"] = config.default_model payload["model"] = config.default_model
current_user.last_active_at = datetime.utcnow()
await db.commit()
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
try: try:
response = await client.post( response = await client.post(
@@ -299,7 +429,15 @@ async def ai_proxy(
timeout=30.0 timeout=30.0
) )
response.raise_for_status() response.raise_for_status()
return response.json() ai_data = response.json()
# 3. 记录使用的 Token
total_tokens = ai_data.get("usage", {}).get("total_tokens", 0)
if total_tokens > 0:
usage_record.tokens_used += total_tokens
await db.commit()
return ai_data
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
raise HTTPException( raise HTTPException(
status_code=e.response.status_code, status_code=e.response.status_code,
@@ -311,3 +449,23 @@ async def ai_proxy(
detail=f"An error occurred while requesting AI provider: {str(e)}" detail=f"An error occurred while requesting AI provider: {str(e)}"
) )
@app.get("/get_ai_roles", response_model=List[schemas.AIRoleOut])
async def get_ai_roles(
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Get all available AI roles for the logged-in user.
"""
result = await db.execute(select(models.AIRole).order_by(models.AIRole.id))
return result.scalars().all()
# 用于测试热加载
@app.post("/post1")
async def test1():
a=2
b=3
c = a+b
return {"msg": f"this is a msg {c}"}

View File

@@ -2,6 +2,23 @@ from sqlalchemy import Column, Integer, String, ForeignKey, Text, Table, Boolean
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from .database import Base from .database import Base
from datetime import datetime
class SubscriptionPlans(Base):
__tablename__ = "subscription_plans"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True) # "Free", "Pro", "Ultra"
max_heirs = Column(Integer, default=1)
weekly_token_limit = Column(Integer, default=1000)
max_assets = Column(Integer, default=5)
max_storage_mb = Column(Integer, default=10)
can_use_ai_proxy = Column(Boolean, default=False)
description = Column(Text, nullable=True)
users = relationship("User", back_populates="subscription_plans")
class User(Base): class User(Base):
@@ -12,10 +29,13 @@ class User(Base):
email = Column(String, unique=True, index=True) email = Column(String, unique=True, index=True)
hashed_password = Column(String) hashed_password = Column(String)
tier = Column(String)
tier = Column(String, ForeignKey("subscription_plans.name"), default="Free")
tier_expires_at = Column(DateTime) tier_expires_at = Column(DateTime)
last_active_at = Column(DateTime) last_active_at = Column(DateTime)
subscription_plans = relationship("SubscriptionPlans", back_populates="users")
# System keys # System keys
public_key = Column(String) public_key = Column(String)
private_key = Column(String) # Encrypted or raw? Storing raw for now as per req private_key = Column(String) # Encrypted or raw? Storing raw for now as per req
@@ -35,6 +55,7 @@ class Asset(Base):
content_outer_encrypted = Column(Text) content_outer_encrypted = Column(Text)
author_id = Column(Integer, ForeignKey("users.id")) author_id = Column(Integer, ForeignKey("users.id"))
heir_id = Column(Integer, ForeignKey("users.id")) heir_id = Column(Integer, ForeignKey("users.id"))
type = Column(String, index=True, nullable=True)
# Key shard for this asset # Key shard for this asset
private_key_shard = Column(String) private_key_shard = Column(String)
@@ -42,6 +63,9 @@ class Asset(Base):
author = relationship("User", foreign_keys=[author_id], back_populates="assets") author = relationship("User", foreign_keys=[author_id], back_populates="assets")
heir = relationship("User", foreign_keys=[heir_id], back_populates="inherited_assets") heir = relationship("User", foreign_keys=[heir_id], back_populates="inherited_assets")
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class AIConfig(Base): class AIConfig(Base):
__tablename__ = "ai_configs" __tablename__ = "ai_configs"
@@ -50,4 +74,24 @@ class AIConfig(Base):
api_key = Column(String) api_key = Column(String)
api_url = Column(String) api_url = Column(String)
default_model = Column(String) default_model = Column(String)
is_active = Column(Boolean, default=True) is_active = Column(Boolean, default=True)
class UserTokenUsage(Base):
__tablename__ = "user_token_usage"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), unique=True)
tokens_used = Column(Integer, default=0)
last_reset_at = Column(DateTime)
user = relationship("User", backref="token_usage", uselist=False)
class AIRole(Base):
__tablename__ = "ai_roles"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
description = Column(Text)
systemPrompt = Column(Text)
icon = Column(String)
iconFamily = Column(String)

View File

@@ -1,5 +1,5 @@
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
from typing import List, Optional from typing import List, Optional, Union
from datetime import datetime from datetime import datetime
# Heir Schemas # Heir Schemas
@@ -45,6 +45,7 @@ class LoginResponse(BaseModel):
# Asset Schemas (renamed from Article) # Asset Schemas (renamed from Article)
class AssetBase(BaseModel): class AssetBase(BaseModel):
title: str title: str
type: Optional[str] = "note"
class AssetCreate(AssetBase): class AssetCreate(AssetBase):
private_key_shard: str private_key_shard: str
@@ -55,6 +56,10 @@ class AssetOut(AssetBase):
author_id: int author_id: int
private_key_shard: str private_key_shard: str
content_outer_encrypted: str content_outer_encrypted: str
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
heir_id: Optional[int] = None
heir_email: Optional[str] = None
model_config = ConfigDict(from_attributes=True) model_config = ConfigDict(from_attributes=True)
class AssetClaim(BaseModel): class AssetClaim(BaseModel):
@@ -67,15 +72,18 @@ class AssetClaimOut(AssetClaim):
class AssetAssign(BaseModel): class AssetAssign(BaseModel):
asset_id: int asset_id: int
heir_name: str heir_email: str
class AssetDelete(BaseModel):
asset_id: int
class DeclareGuale(BaseModel): class DeclareGuale(BaseModel):
username: str username: str
# AI Proxy Schemas # AI Proxy Schemas (content: str for text-only, list for multimodal e.g. image_url)
class AIMessage(BaseModel): class AIMessage(BaseModel):
role: str role: str
content: str content: Union[str, List[dict]]
class AIRequest(BaseModel): class AIRequest(BaseModel):
messages: List[AIMessage] messages: List[AIMessage]
@@ -87,4 +95,31 @@ class AIResponse(BaseModel):
created: int created: int
model: str model: str
choices: List[dict] choices: List[dict]
usage: dict usage: dict
# Subscription Plans Schemas
class SubscriptionPlansBase(BaseModel):
name: str
max_heirs: int
weekly_token_limit: int
max_assets: int
max_storage_mb: int
can_use_ai_proxy: bool
description: Optional[str] = None
class SubscriptionPlansOut(SubscriptionPlansBase):
id: int
model_config = ConfigDict(from_attributes=True)
# AI Role Schemas
class AIRoleBase(BaseModel):
id: int
name: str
description: str
systemPrompt: str
icon: str
iconFamily: str
class AIRoleOut(AIRoleBase):
model_config = ConfigDict(from_attributes=True)

View File

@@ -10,4 +10,5 @@ watchfiles
argon2_cffi argon2_cffi
pycryptodome pycryptodome
cryptography cryptography
httpx

View File

@@ -14,7 +14,8 @@ async def test_ai_proxy_integration():
print(f"1. Registering user: {username}") print(f"1. Registering user: {username}")
reg_res = await client.post("/register", json={ reg_res = await client.post("/register", json={
"username": username, "username": username,
"password": "testpassword" "password": "testpassword",
"email": f"user_{int(time.time())}@example.com"
}) })
if reg_res.status_code != 200: if reg_res.status_code != 200:
print(f"Registration failed: {reg_res.text}") print(f"Registration failed: {reg_res.text}")
@@ -22,7 +23,7 @@ async def test_ai_proxy_integration():
# 2. Login to get token # 2. Login to get token
print("2. Logging in...") print("2. Logging in...")
login_res = await client.post("/token", json={ login_res = await client.post("/login", json={
"username": username, "username": username,
"password": "testpassword" "password": "testpassword"
}) })

View File

@@ -35,32 +35,35 @@ def login_user(username, password):
print(f"Failed to login {username}: {response.text}") print(f"Failed to login {username}: {response.text}")
return None return None
def create_asset(token, title, private_key_shard, content_inner_encrypted): def create_asset(token, title, private_key_shard, content_inner_encrypted, asset_type="note"):
url = f"{BASE_URL}/assets/create" url = f"{BASE_URL}/assets/create"
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
data = { data = {
"title": title, "title": title,
"type": asset_type,
"private_key_shard": str(private_key_shard), "private_key_shard": str(private_key_shard),
"content_inner_encrypted": str(content_inner_encrypted) "content_inner_encrypted": str(content_inner_encrypted)
} }
response = requests.post(url, json=data, headers=headers) response = requests.post(url, json=data, headers=headers)
if response.status_code == 200: if response.status_code == 200:
print(f"Asset '{title}' created successfully.") asset_data = response.json()
return response.json() print(f"Asset '{title}' (type: {asset_type}) created successfully.")
print(f" [校验] Timestamps: created_at={asset_data.get('created_at')}, updated_at={asset_data.get('updated_at')}")
return asset_data
else: else:
print(f"Failed to create asset: {response.text}") print(f"Failed to create asset: {response.text}")
return None return None
def assign_heir(token, asset_id, heir_name): def assign_heir(token, asset_id, heir_email):
url = f"{BASE_URL}/assets/assign" url = f"{BASE_URL}/assets/assign"
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
data = { data = {
"asset_id": asset_id, "asset_id": asset_id,
"heir_name": heir_name "heir_email": heir_email
} }
response = requests.post(url, json=data, headers=headers) response = requests.post(url, json=data, headers=headers)
if response.status_code == 200: if response.status_code == 200:
print(f"Asset {asset_id} assigned to heir {heir_name} successfully.") print(f"Asset {asset_id} assigned to heir {heir_email} successfully.")
return response.json() return response.json()
else: else:
print(f"Failed to assign heir: {response.text}") print(f"Failed to assign heir: {response.text}")
@@ -105,6 +108,17 @@ def get_my_assets(token):
print(f"Failed to retrieve assets: {response.text}") print(f"Failed to retrieve assets: {response.text}")
return None return None
def get_designated_assets(token):
url = f"{BASE_URL}/assets/designated"
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
print(f"Designated assets retrieved successfully.")
return response.json()
else:
print(f"Failed to retrieve designated assets: {response.text}")
return None
def main(): def main():
# 1. 创建三个用户 # 1. 创建三个用户
users = [ users = [
@@ -143,38 +157,60 @@ def main():
if not token1: if not token1:
return return
# 3. 创建个 asset # 3. 创建个 asset
asset1 = create_asset( asset1 = create_asset(
token1, token1,
"My Secret Asset1", "My Secret Asset1",
share_a, share_a,
ciphertext_1 ciphertext_1,
"note"
) )
asset2 = create_asset( asset2 = create_asset(
token1, token1,
"My Secret Asset2", "My Secret Asset2",
share_a, share_a,
ciphertext_1 ciphertext_1,
"note"
) )
if not asset1 or not asset2: asset3 = create_asset(
token1,
"My Secret Asset3",
share_a,
ciphertext_1,
"note"
)
if not asset1 or not asset2 or not asset3:
print(" [失败] 创建资产失败") print(" [失败] 创建资产失败")
return return
# 3.1 测试 /assets/get # 3.1 测试 /assets/get
print("\n [测试] 获取用户资产列表") print("\n [测试] 获取用户资产列表")
my_assets = get_my_assets(token1) user1_assets = get_my_assets(token1)
if my_assets: if user1_assets:
print(f" [输出] 成功获取 {len(my_assets)} 个资产") print(f" [输出] 用户1共有 {len(user1_assets)} 个资产")
else: else:
print(" [失败] 无法获取资产列表") print(" [失败] 无法获取资产列表")
# 4. 指定用户 2 为继承人 print("用户 1 为用户 2 分配遗产")
print("用户 1 指定用户 2 为继承人") assign_heir(token1, asset1["id"], "user2@example.com")
assign_heir(token1, asset1["id"], "user2") assign_heir(token1, asset2["id"], "user2@example.com")
# 4.1 用户2查询自己能继承多少遗产
print("\n [测试] 用户 2 查询自己被指定的资产")
token2_temp = login_user("user2", "pass123")
designated_assets = get_designated_assets(token2_temp)
if designated_assets:
print(f" [输出] 用户 2 共有 {len(designated_assets)} 个被指定的资产")
for asset in designated_assets:
print(f" - Asset ID: {asset['id']}, Title: {asset['title']}")
else:
print(" [失败] 无法获取被指定资产列表")
print("\n## 3. 继承流 (Inheritance Layer)") print("\n## 3. 继承流 (Inheritance Layer)")
# 5. Admin 宣布用户 1 挂了 # 5. Admin 宣布用户 1 挂了
print("Admin 宣布用户 1 挂了") print("Admin 宣布用户 1 挂了")

50
test/test_user_search.py Normal file
View File

@@ -0,0 +1,50 @@
import httpx
import asyncio
BASE_URL = "http://localhost:8000"
async def test_search():
async with httpx.AsyncClient() as client:
# 1. Login to get token
login_data = {
"username": "testuser",
"password": "testpassword"
}
# First, try to register if user doesn't exist (assuming test env)
try:
await client.post(f"{BASE_URL}/register", json={
"username": "testuser",
"email": "test@example.com",
"password": "testpassword"
})
except:
pass
response = await client.post(f"{BASE_URL}/login", json=login_data)
if response.status_code != 200:
print(f"Login failed: {response.text}")
return
token = response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Test search by username
print("Testing search by username 'test'...")
response = await client.get(f"{BASE_URL}/users/search?query=test", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
# 3. Test search by email
print("\nTesting search by email 'example'...")
response = await client.get(f"{BASE_URL}/users/search?query=example", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
# 4. Test search with no results
print("\nTesting search with no results 'nonexistent'...")
response = await client.get(f"{BASE_URL}/users/search?query=nonexistent", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
if __name__ == "__main__":
asyncio.run(test_search())