Compare commits

4 Commits
main ... renee

Author SHA1 Message Date
renee
d23c27a177 加入图片ai功能 2026-02-02 12:39:01 -08:00
renee
04c44b4775 完成ai 2026-02-02 12:33:57 -08:00
renee
14289c2d8f 联入SQLite 2026-01-30 22:49:12 -08:00
renee
adab4877ad AI的框架 2026-01-30 19:31:38 -08:00
18 changed files with 219 additions and 532 deletions

7
.gitignore vendored
View File

@@ -1,8 +1,3 @@
gitignore gitignore
# Python virtual environments # Python virtual environments
venv/ venv/
# Python cache
__pycache__/
app/__pycache__/
*.pyc

103
README.md
View File

@@ -1,103 +1,86 @@
# CypherLegacy Backend # CypherLegacy Backend
CypherLegacy is a secure digital legacy inheritance system backend built with FastAPI. It allows users to safely store encrypted digital assets and designate specific heirs who can claim these assets after the user's status is confirmed as "deceased" by an administrator. CypherLegacy 是一个基于 FastAPI 构建的数字遗产继承系统后端。它允许用户安全地存储加密的数字资产,并指定在用户去世(被管理员确认状态)后可以由特定的继承人领取这些资产。
## 🌟 Core Features ## 🌟 核心功能
- **Secure Authentication**: Registration automatically generates an RSA key pair (public/private). Public keys are used for encryption, while private keys are used for secure decryption. - **安全注册与认证**: 采用 RSA 密钥对生成机制。用户注册时自动生成公私钥对,公钥用于加密资产,私钥用于解密。
- **Encrypted Asset Storage**: Users can upload assets that are encrypted using their unique public key before storage. - **数字资产加密存储**: 用户可以上传资产,系统使用用户的公钥对资产内容进行加密存储。
- **Legacy Designation**: Users can assign specific heirs (via email) to each of their digital assets. - **遗产指定**: 用户可以为每个资产指定一名继承人Heir
- **Status Monitoring**: Administrators can officially declare a user as "deceased" (`guale`), triggering the inheritance process. - **吊唁/状态确认**: 系统管理员有权标记用户为“已故”guale
- **Inheritance Claiming**: - **遗产申领**:
- Designated heirs can claim assets only after the owner's status is verified. - 只有被确认标记为“已故”的用户,其指定的继承人才能申领资产。
- The system securely decrypts the content using the deceased user's private key for the authorized heir. - 申领过程中,系统利用已故用户的私钥解密内容并安全交付给继承人。
- **AI Proxy Service**: A built-in proxy for interacting with AI models with role-based configurations and weekly quota/token management. - **密钥分片 (Sharding)**: 支持私钥分片存储逻辑,增强安全性。
- **Subscription Tiers**: Multi-tier subscription system (Free, Pro, etc.) controlling limits on heirs, AI usage, and more.
- **Last Active Tracking**: Automatically tracks user activity to help monitor status.
## 🛠 Technology Stack ## 🛠 技术栈
- **Framework**: [FastAPI](https://fastapi.tiangolo.com/) - **框架**: [FastAPI](https://fastapi.tiangolo.com/)
- **Database**: [PostgreSQL](https://www.postgresql.org/) (via `asyncpg` async driver) - **数据库**: [PostgreSQL](https://www.postgresql.org/) (通过 `asyncpg` 异步驱动)
- **ORM**: [SQLAlchemy 2.0](https://www.sqlalchemy.org/) (AsyncIO) - **ORM**: [SQLAlchemy 2.0](https://www.sqlalchemy.org/) (AsyncIO)
- **Encryption**: [Cryptography](https://cryptography.io/) & [PyCryptodome](https://pycryptodome.org/) (RSA Encryption) - **加密**: [Cryptography](https://cryptography.io/) & [PyCryptodome](https://pycryptodome.org/) (RSA 加密)
- **Authentication**: [python-jose](https://github.com/mpdavis/python-jose) (JWT Tokens) & Passlib (Bcrypt/Argon2) - **权限**: [python-jose](https://github.com/mpdavis/python-jose) (JWT Token)
- **Request Client**: [HTTPX](https://www.python-httpx.org/) (for AI Proxy)
## 🚀 Getting Started ## 🚀 快速开始
### 1. Using Docker Compose (Recommended) ### 1. 使用 Docker Compose 启动 (推荐)
This is the fastest way to get the system running with a pre-configured database. 这是最快的方式,会自动配置数据库和应用环境。
```bash ```bash
docker-compose up --build docker-compose up --build
``` ```
简单测试:
- **API Documentation**: `http://localhost:8000/docs`
- **Default Admin**: `admin` / `admin123`
### 🧪 Running Tests
Once the service is up, you can run the automated test scenario:
```bash ```bash
python3 test/test_scenario.py python3 test/test_scenario.py
``` ```
### 2. Local Manual Setup - 接口文档: `http://localhost:8000/docs`
- 管理员默认账号: `admin` / `admin123`
1. **Environment Setup**: ### 2. 本地手动启动
1. **安装环境**:
```bash ```bash
python -m venv venv python -m venv venv
source venv/bin/activate # Linux/macOS source venv/bin/activate # Linux/macOS
# or venv\Scripts\activate on Windows
pip install -r requirements.txt pip install -r requirements.txt
``` ```
2. **Configuration**: 2. **配置环境变量**:
Create a `.env` file or set the `DATABASE_URL` environment variable. Default: 创建一个 `.env` 文件或设置环境变量 `DATABASE_URL`。默认为:
`postgresql+asyncpg://user:password@localhost:5432/fastapi_db` `postgresql+asyncpg://user:password@localhost:5432/fastapi_db`
3. **Database Initialization**: 3. **初始化数据库**:
```bash ```bash
python reset_db.py python reset_db.py
``` ```
4. **Run Service**: 4. **运行服务**:
```bash ```bash
uvicorn app.main:app --reload uvicorn app.main:app --reload
``` ```
## 📖 API Overview ## 📖 API 概览
### User & Auth ### 用户接口
- `POST /register`: Register and generate RSA keys. - `POST /register`: 用户注册,自动生成 RSA 密钥对。
- `POST /login`: Login and receive JWT access token. - `POST /token`: 登录获取 JWT Access Token
- `GET /users/search`: Search for users by username or email.
### Asset Management ### 资产接口
- `GET /assets/get`: Retrieve assets owned by the current user. - `POST /assets/`: 创建加密资产(上传者自动加密)。
- `POST /assets/create`: Create a new encrypted asset. - `POST /assets/assign`: 为资产指定继承人。
- `POST /assets/assign`: Assign or unassign an heir to an asset. - `POST /assets/claim`: 继承人申领资产(需原所有人状态为 `guale`)。
- `POST /assets/delete`: Remove an asset.
- `GET /assets/designated`: List assets where the user is the designated heir.
### Inheritance ### 管理员接口
- `POST /assets/claim`: Claim an asset (requires owner to be marked as deceased). - `POST /admin/declare-guale`: 管理员宣告用户“已故”。
### AI & Roles ## 🔒 安全设计
- `POST /ai/proxy`: Proxy requests to external AI providers with quota tracking.
- `GET /get_ai_roles`: Retrieve available AI personas/roles.
### Admin 1. **端到端思想**: 资产在存储前使用 RSA 公钥加密。
- `POST /admin/declare-guale`: (Admin Only) Declare a user as deceased. 2. **状态验证**: 申领逻辑严格校验 `heir_id` 以及 `author.guale` 状态。
3. **密钥管理**: 本项目目前为了演示方便在数据库中存储了私钥,在生产环境下建议配合 KMS 或硬件安全模块使用。
## 🔒 Security Design ## 📜 许可证
1. **End-to-End Principles**: Sensitive assets are encrypted before storage.
2. **State Verification**: Claim logic strictly validates the `heir_id` and the `deceased` status of the owner.
3. **Key Management**: For demonstration, private keys are stored in the database. In a production environment, integration with a KMS (Key Management Service) or HSM (Hardware Security Module) is highly recommended.
## 📜 License
MIT License MIT License

Binary file not shown.

Binary file not shown.

131
app/ai/ai.py Normal file
View File

@@ -0,0 +1,131 @@
!pip install -q langgraph-checkpoint-sqlite langchain_google_genai
import sqlite3
from google.colab import userdata
from typing import Literal, TypedDict, Annotated
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage, AnyMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph.message import add_messages
from langchain_core.runnables import RunnableConfig
from typing import Union, List, Dict
# --- 1. 状态定义 ---
class State(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
summary: str # 永久存储在数据库中的摘要内容
# --- 2. 核心逻辑 ---
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature = 0.7, google_api_key='') #在这里倒入API key
def call_model(state: State, config: RunnableConfig):
"""对话节点:融合了动态 Prompt 和 长期摘要"""
# 获取当前 session 特有的 System Prompt如果没传则使用默认
configurable = config.get("configurable", {})
system_base_prompt = configurable.get("system_prompt", "你是一个通用的 AI 助手。")
# 构造当前上下文
summary = state.get("summary", "")
if summary:
system_base_prompt += f"\n\n<context_summary>\n{summary}\n</context_summary>"
messages = [SystemMessage(content=system_base_prompt)] + state["messages"]
response = llm.invoke(messages)
return {"messages": [response]}
def summarize_conversation(state: State):
"""总结节点:负责更新摘要并清理过期消息"""
summary = state.get("summary", "")
messages_to_summarize = state["messages"][:-1]
# If there's nothing to summarize yet, just END
if not messages_to_summarize:
return {"summary": summary}
system_prompt = (
"你是一个记忆管理专家。请更新摘要,合并新旧信息。"
"1. 保持简练,仅保留事实(姓名、偏好、核心议题)。"
"2. 如果新消息包含对旧信息的修正,请更新它。"
"3. 如果对话中包含图片描述,请将图片的关键视觉信息也记录在摘要中"
)
summary_input = f"现有摘要: {summary}\n\n待加入的新信息: {messages_to_summarize}"
# Invoke model to get new condensed summary
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=summary_input)
])
# Important: Create RemoveMessage objects for all messages that were summarized
delete_messages = [RemoveMessage(id=m.id) for m in messages_to_summarize if m.id]
return {
"summary": response.content,
"messages": delete_messages
}
def should_continue(state: State) -> Literal["summarize", END]:
"""如果消息累积超过3条则去总结节点"""
if len(state["messages"]) > 3: #changed
return "summarize"
return END
# --- 3. 构建图 ---
db_path = "multi_session_chat.sqlite"
conn = sqlite3.connect(db_path, check_same_thread=False)
memory = SqliteSaver(conn)
workflow = StateGraph(State)
workflow.add_node("chatbot", call_model)
workflow.add_node("summarize", summarize_conversation)
workflow.add_edge(START, "chatbot")
workflow.add_conditional_edges("chatbot", should_continue)
workflow.add_edge("summarize", END)
app = workflow.compile(checkpointer=memory)
def chat(thread_id: str, system_prompt: str, user_content: Union[str, List[Dict]]):
"""
Processes a single user message and returns the AI response,
persisting memory via the thread_id.
"""
config = {
"configurable": {
"thread_id": thread_id,
"system_prompt": system_prompt
}
}
# Prepare the input for this specific turn
input_data = {"messages": [HumanMessage(content=user_content)]}
ai_response = ""
# Stream the values to get the final AI message
for event in app.stream(input_data, config, stream_mode="values"):
if "messages" in event:
last_msg = event["messages"][-1]
if last_msg.type == "ai":
ai_response = last_msg.content
return ai_response
# 使用范例
# if __name__ == "__main__":
# tid = "py_expert_001"
# sys_p = "你是个善解人意的机器人。"
# # Call 1: Establish context
# resp1 = chat(tid, sys_p, "你好,我叫小明。")
# print(f"Bot: {resp1}")
# # Call 2: Test memory (The model should remember the name '小明')
# resp2 = chat(tid, sys_p, "我今天很开心")
# print(f"Bot: {resp2}")

View File

@@ -24,62 +24,12 @@ async def init_db():
# 自动创建表 # 自动创建表
await conn.run_sync(Base.metadata.create_all) await conn.run_sync(Base.metadata.create_all)
# 创建默认管理员用户
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
from . import auth from . import auth
from sqlalchemy.future import select from sqlalchemy.future import select
# 1. 检查并创建默认订阅级别 (MUST BE FIRST because of FK constraints) # 检查是否已存在 admin 用户
tiers = [
{
"name": "Free",
"max_heirs": 1,
"weekly_token_limit": 1000,
"max_assets": 5,
"max_storage_mb": 10,
"can_use_ai_proxy": False,
"description": "Standard free tier"
},
{
"name": "Pro",
"max_heirs": 5,
"weekly_token_limit": 10000,
"max_assets": 50,
"max_storage_mb": 100,
"can_use_ai_proxy": True,
"description": "Professional tier for active users"
},
{
"name": "Ultra",
"max_heirs": 100,
"weekly_token_limit": 100000,
"max_assets": 1000,
"max_storage_mb": 1024,
"can_use_ai_proxy": True,
"description": "Ultimate tier for power users"
},
{
"name": "Unlimited",
"max_heirs": 9999,
"weekly_token_limit": 999999,
"max_assets": 9999,
"max_storage_mb": 999999,
"can_use_ai_proxy": True,
"description": "Internal unlimited tier"
}
]
for tier_data in tiers:
result = await session.execute(
select(models.SubscriptionPlans).where(models.SubscriptionPlans.name == tier_data["name"])
)
if not result.scalars().first():
new_tier = models.SubscriptionPlans(**tier_data)
session.add(new_tier)
print(f"✅ Default subscription tier '{tier_data['name']}' created")
await session.commit()
# 2. 检查并创建默认管理员用户
result = await session.execute( result = await session.execute(
select(models.User).where(models.User.username == "admin") select(models.User).where(models.User.username == "admin")
) )
@@ -102,7 +52,7 @@ async def init_db():
await session.commit() await session.commit()
print("✅ Default admin user created (username: admin, password: admin123)") print("✅ Default admin user created (username: admin, password: admin123)")
# 3. 检查是否已存在 Gemini 配置 # 检查是否已存在 Gemini 配置
result = await session.execute( result = await session.execute(
select(models.AIConfig).where(models.AIConfig.provider_name == "gemini") select(models.AIConfig).where(models.AIConfig.provider_name == "gemini")
) )
@@ -118,51 +68,4 @@ async def init_db():
) )
session.add(gemini_config) session.add(gemini_config)
await session.commit() await session.commit()
print("✅ Default Gemini AI configuration created") print("✅ Default Gemini AI configuration created")
# 4. 检查并初始化 AI Roles
ai_roles_data = [
{
"id": 0,
"name": 'Reflective Assistant',
"description": 'Helps you dive deep into your thoughts and feelings through meaningful reflection.',
"systemPrompt": 'You are a helpful journal assistant. Help the user reflect on their thoughts and feelings.',
"icon": 'journal-outline',
"iconFamily": 'Ionicons',
},
{
"id": 1,
"name": 'Creative Spark',
"description": 'A partner for brainstorming, creative writing, and exploring new ideas.',
"systemPrompt": 'You are a creative brainstorming partner. Help the user explore new ideas, write stories, or look at things from a fresh perspective.',
"icon": 'bulb-outline',
"iconFamily": 'Ionicons',
},
{
"id": 2,
"name": 'Action Planner',
"description": 'Focused on turning thoughts into actionable plans and organized goals.',
"systemPrompt": 'You are a productivity coach. Help the user break down their thoughts into actionable steps and clear goals.',
"icon": 'list-outline',
"iconFamily": 'Ionicons',
},
{
"id": 3,
"name": 'Empathetic Guide',
"description": 'Provides a safe, non-judgmental space for emotional support and empathy.',
"systemPrompt": 'You are a supportive and empathetic friend. Listen to the user\'s concerns and provide emotional support without judgment.',
"icon": 'heart-outline',
"iconFamily": 'Ionicons',
},
]
for role_data in ai_roles_data:
result = await session.execute(
select(models.AIRole).where(models.AIRole.id == role_data["id"])
)
if not result.scalars().first():
new_role = models.AIRole(**role_data)
session.add(new_role)
print(f"✅ AI Role '{role_data['name']}' created")
await session.commit()

View File

@@ -7,7 +7,7 @@ from passlib.context import CryptContext
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import httpx import httpx
from datetime import datetime, timedelta from datetime import datetime
from typing import List from typing import List
@asynccontextmanager @asynccontextmanager
@@ -99,18 +99,9 @@ async def get_my_assets(
db: AsyncSession = Depends(database.get_db) db: AsyncSession = Depends(database.get_db)
): ):
result = await db.execute( result = await db.execute(
select(models.Asset) select(models.Asset).where(models.Asset.author_id == current_user.id)
.options(selectinload(models.Asset.heir))
.where(models.Asset.author_id == current_user.id)
) )
assets = result.scalars().all() return result.scalars().all()
# Populate heir_email for the schema
for asset in assets:
if asset.heir:
asset.heir_email = asset.heir.email
else:
asset.heir_email = None
return assets
@app.post("/assets/create", response_model=schemas.AssetOut) @app.post("/assets/create", response_model=schemas.AssetOut)
@@ -124,12 +115,9 @@ async def create_asset(
new_asset = models.Asset( new_asset = models.Asset(
title=asset.title, title=asset.title,
type=asset.type,
content_outer_encrypted=encrypted_content, content_outer_encrypted=encrypted_content,
private_key_shard=asset.private_key_shard, private_key_shard=asset.private_key_shard,
author_id=current_user.id, author_id=current_user.id
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
) )
db.add(new_asset) db.add(new_asset)
await db.commit() await db.commit()
@@ -211,7 +199,7 @@ async def assign_asset(
heir_result = await db.execute( heir_result = await db.execute(
select(models.User).where( select(models.User).where(
models.User.email == assignment.heir_email models.User.username == assignment.heir_name
) )
) )
heir_user = heir_result.scalars().first() heir_user = heir_result.scalars().first()
@@ -228,67 +216,9 @@ async def assign_asset(
asset.heir = heir_user asset.heir = heir_user
await db.commit() await db.commit()
return {"message": f"Asset assigned to {assignment.heir_email}"} return {"message": f"Asset assigned to {assignment.heir_name}"}
@app.get("/assets/designated", response_model=List[schemas.AssetOut])
async def get_designated_assets(
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Query assets where the current user is the designated heir.
"""
result = await db.execute(
select(models.Asset).where(models.Asset.heir_id == current_user.id)
)
return result.scalars().all()
@app.post("/assets/delete")
async def delete_asset(
asset_del: schemas.AssetDelete,
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Delete an asset owned by the current user.
"""
result = await db.execute(
select(models.Asset).where(models.Asset.id == asset_del.asset_id)
)
asset = result.scalars().first()
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
if asset.author_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized to delete this asset")
await db.delete(asset)
await db.commit()
return {"message": "Asset deleted successfully"}
@app.get("/users/search", response_model=List[schemas.UserOut])
async def search_users(
query: str,
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Search for users by username or email.
"""
if not query:
raise HTTPException(status_code=400, detail="Search query is required")
# Search for username or email containing the query (case-insensitive)
result = await db.execute(
select(models.User).where(
(models.User.username.ilike(f"%{query}%")) |
(models.User.email.ilike(f"%{query}%"))
).limit(20) # Limit results for performance
)
users = result.scalars().all()
return users
@app.post("/admin/declare-guale") @app.post("/admin/declare-guale")
@@ -323,34 +253,14 @@ async def declare_user_guale(
"guale": target_user.guale "guale": target_user.guale
} }
# 用于测试热加载
@app.post("/post1")
async def test1():
a=2
b=3
c = a+b
return {"msg": f"this is a msg {c}"}
async def get_or_create_token_usage(user_id: int, db: AsyncSession):
# Get current week start (Monday)
now = datetime.utcnow()
monday = now - timedelta(days=now.weekday())
week_start = monday.replace(hour=0, minute=0, second=0, microsecond=0)
result = await db.execute(
select(models.UserTokenUsage).where(models.UserTokenUsage.user_id == user_id)
)
usage = result.scalars().first()
if not usage:
usage = models.UserTokenUsage(
user_id=user_id,
tokens_used=0,
last_reset_at=week_start
)
db.add(usage)
await db.commit()
await db.refresh(usage)
#每周重置token使用情况
elif usage.last_reset_at < week_start:
usage.tokens_used = 0
usage.last_reset_at = week_start
await db.commit()
return usage
@app.post("/ai/proxy", response_model=schemas.AIResponse) @app.post("/ai/proxy", response_model=schemas.AIResponse)
async def ai_proxy( async def ai_proxy(
@@ -362,43 +272,6 @@ async def ai_proxy(
Proxy relay for AI requests. Proxy relay for AI requests.
Fetches AI configuration from the database. Fetches AI configuration from the database.
""" """
def get_quota_exceeded_response():
return {
"id": f"chatcmpl-{int(datetime.utcnow().timestamp())}",
"object": "chat.completion",
"created": int(datetime.utcnow().timestamp()),
"model": "quota-manager",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "quota exceeded, please upgrade plan"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
}
# 1. 检查 Tier 是否允许使用 AI
result = await db.execute(
select(models.SubscriptionPlans).where(models.SubscriptionPlans.name == current_user.tier)
)
tier_plan = result.scalars().first()
if not tier_plan or not tier_plan.can_use_ai_proxy:
return get_quota_exceeded_response()
# 2. 检查本周 Token 使用是否超过限制
usage_record = await get_or_create_token_usage(current_user.id, db)
if usage_record.tokens_used >= tier_plan.weekly_token_limit:
return get_quota_exceeded_response()
# Fetch active AI config # Fetch active AI config
result = await db.execute( result = await db.execute(
select(models.AIConfig).where(models.AIConfig.is_active == True) select(models.AIConfig).where(models.AIConfig.is_active == True)
@@ -417,9 +290,6 @@ async def ai_proxy(
payload = ai_request.model_dump() payload = ai_request.model_dump()
payload["model"] = config.default_model payload["model"] = config.default_model
current_user.last_active_at = datetime.utcnow()
await db.commit()
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
try: try:
response = await client.post( response = await client.post(
@@ -429,15 +299,7 @@ async def ai_proxy(
timeout=30.0 timeout=30.0
) )
response.raise_for_status() response.raise_for_status()
ai_data = response.json() return response.json()
# 3. 记录使用的 Token
total_tokens = ai_data.get("usage", {}).get("total_tokens", 0)
if total_tokens > 0:
usage_record.tokens_used += total_tokens
await db.commit()
return ai_data
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
raise HTTPException( raise HTTPException(
status_code=e.response.status_code, status_code=e.response.status_code,
@@ -449,23 +311,3 @@ async def ai_proxy(
detail=f"An error occurred while requesting AI provider: {str(e)}" detail=f"An error occurred while requesting AI provider: {str(e)}"
) )
@app.get("/get_ai_roles", response_model=List[schemas.AIRoleOut])
async def get_ai_roles(
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Get all available AI roles for the logged-in user.
"""
result = await db.execute(select(models.AIRole).order_by(models.AIRole.id))
return result.scalars().all()
# 用于测试热加载
@app.post("/post1")
async def test1():
a=2
b=3
c = a+b
return {"msg": f"this is a msg {c}"}

View File

@@ -2,23 +2,6 @@ from sqlalchemy import Column, Integer, String, ForeignKey, Text, Table, Boolean
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from .database import Base from .database import Base
from datetime import datetime
class SubscriptionPlans(Base):
__tablename__ = "subscription_plans"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True) # "Free", "Pro", "Ultra"
max_heirs = Column(Integer, default=1)
weekly_token_limit = Column(Integer, default=1000)
max_assets = Column(Integer, default=5)
max_storage_mb = Column(Integer, default=10)
can_use_ai_proxy = Column(Boolean, default=False)
description = Column(Text, nullable=True)
users = relationship("User", back_populates="subscription_plans")
class User(Base): class User(Base):
@@ -29,13 +12,10 @@ class User(Base):
email = Column(String, unique=True, index=True) email = Column(String, unique=True, index=True)
hashed_password = Column(String) hashed_password = Column(String)
tier = Column(String)
tier = Column(String, ForeignKey("subscription_plans.name"), default="Free")
tier_expires_at = Column(DateTime) tier_expires_at = Column(DateTime)
last_active_at = Column(DateTime) last_active_at = Column(DateTime)
subscription_plans = relationship("SubscriptionPlans", back_populates="users")
# System keys # System keys
public_key = Column(String) public_key = Column(String)
private_key = Column(String) # Encrypted or raw? Storing raw for now as per req private_key = Column(String) # Encrypted or raw? Storing raw for now as per req
@@ -55,7 +35,6 @@ class Asset(Base):
content_outer_encrypted = Column(Text) content_outer_encrypted = Column(Text)
author_id = Column(Integer, ForeignKey("users.id")) author_id = Column(Integer, ForeignKey("users.id"))
heir_id = Column(Integer, ForeignKey("users.id")) heir_id = Column(Integer, ForeignKey("users.id"))
type = Column(String, index=True, nullable=True)
# Key shard for this asset # Key shard for this asset
private_key_shard = Column(String) private_key_shard = Column(String)
@@ -63,9 +42,6 @@ class Asset(Base):
author = relationship("User", foreign_keys=[author_id], back_populates="assets") author = relationship("User", foreign_keys=[author_id], back_populates="assets")
heir = relationship("User", foreign_keys=[heir_id], back_populates="inherited_assets") heir = relationship("User", foreign_keys=[heir_id], back_populates="inherited_assets")
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class AIConfig(Base): class AIConfig(Base):
__tablename__ = "ai_configs" __tablename__ = "ai_configs"
@@ -74,24 +50,4 @@ class AIConfig(Base):
api_key = Column(String) api_key = Column(String)
api_url = Column(String) api_url = Column(String)
default_model = Column(String) default_model = Column(String)
is_active = Column(Boolean, default=True) is_active = Column(Boolean, default=True)
class UserTokenUsage(Base):
__tablename__ = "user_token_usage"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), unique=True)
tokens_used = Column(Integer, default=0)
last_reset_at = Column(DateTime)
user = relationship("User", backref="token_usage", uselist=False)
class AIRole(Base):
__tablename__ = "ai_roles"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
description = Column(Text)
systemPrompt = Column(Text)
icon = Column(String)
iconFamily = Column(String)

View File

@@ -1,5 +1,5 @@
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
from typing import List, Optional, Union from typing import List, Optional
from datetime import datetime from datetime import datetime
# Heir Schemas # Heir Schemas
@@ -45,7 +45,6 @@ class LoginResponse(BaseModel):
# Asset Schemas (renamed from Article) # Asset Schemas (renamed from Article)
class AssetBase(BaseModel): class AssetBase(BaseModel):
title: str title: str
type: Optional[str] = "note"
class AssetCreate(AssetBase): class AssetCreate(AssetBase):
private_key_shard: str private_key_shard: str
@@ -56,10 +55,6 @@ class AssetOut(AssetBase):
author_id: int author_id: int
private_key_shard: str private_key_shard: str
content_outer_encrypted: str content_outer_encrypted: str
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
heir_id: Optional[int] = None
heir_email: Optional[str] = None
model_config = ConfigDict(from_attributes=True) model_config = ConfigDict(from_attributes=True)
class AssetClaim(BaseModel): class AssetClaim(BaseModel):
@@ -72,18 +67,15 @@ class AssetClaimOut(AssetClaim):
class AssetAssign(BaseModel): class AssetAssign(BaseModel):
asset_id: int asset_id: int
heir_email: str heir_name: str
class AssetDelete(BaseModel):
asset_id: int
class DeclareGuale(BaseModel): class DeclareGuale(BaseModel):
username: str username: str
# AI Proxy Schemas (content: str for text-only, list for multimodal e.g. image_url) # AI Proxy Schemas
class AIMessage(BaseModel): class AIMessage(BaseModel):
role: str role: str
content: Union[str, List[dict]] content: str
class AIRequest(BaseModel): class AIRequest(BaseModel):
messages: List[AIMessage] messages: List[AIMessage]
@@ -95,31 +87,4 @@ class AIResponse(BaseModel):
created: int created: int
model: str model: str
choices: List[dict] choices: List[dict]
usage: dict usage: dict
# Subscription Plans Schemas
class SubscriptionPlansBase(BaseModel):
name: str
max_heirs: int
weekly_token_limit: int
max_assets: int
max_storage_mb: int
can_use_ai_proxy: bool
description: Optional[str] = None
class SubscriptionPlansOut(SubscriptionPlansBase):
id: int
model_config = ConfigDict(from_attributes=True)
# AI Role Schemas
class AIRoleBase(BaseModel):
id: int
name: str
description: str
systemPrompt: str
icon: str
iconFamily: str
class AIRoleOut(AIRoleBase):
model_config = ConfigDict(from_attributes=True)

View File

@@ -10,5 +10,4 @@ watchfiles
argon2_cffi argon2_cffi
pycryptodome pycryptodome
cryptography cryptography
httpx

View File

@@ -14,8 +14,7 @@ async def test_ai_proxy_integration():
print(f"1. Registering user: {username}") print(f"1. Registering user: {username}")
reg_res = await client.post("/register", json={ reg_res = await client.post("/register", json={
"username": username, "username": username,
"password": "testpassword", "password": "testpassword"
"email": f"user_{int(time.time())}@example.com"
}) })
if reg_res.status_code != 200: if reg_res.status_code != 200:
print(f"Registration failed: {reg_res.text}") print(f"Registration failed: {reg_res.text}")
@@ -23,7 +22,7 @@ async def test_ai_proxy_integration():
# 2. Login to get token # 2. Login to get token
print("2. Logging in...") print("2. Logging in...")
login_res = await client.post("/login", json={ login_res = await client.post("/token", json={
"username": username, "username": username,
"password": "testpassword" "password": "testpassword"
}) })

View File

@@ -35,35 +35,32 @@ def login_user(username, password):
print(f"Failed to login {username}: {response.text}") print(f"Failed to login {username}: {response.text}")
return None return None
def create_asset(token, title, private_key_shard, content_inner_encrypted, asset_type="note"): def create_asset(token, title, private_key_shard, content_inner_encrypted):
url = f"{BASE_URL}/assets/create" url = f"{BASE_URL}/assets/create"
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
data = { data = {
"title": title, "title": title,
"type": asset_type,
"private_key_shard": str(private_key_shard), "private_key_shard": str(private_key_shard),
"content_inner_encrypted": str(content_inner_encrypted) "content_inner_encrypted": str(content_inner_encrypted)
} }
response = requests.post(url, json=data, headers=headers) response = requests.post(url, json=data, headers=headers)
if response.status_code == 200: if response.status_code == 200:
asset_data = response.json() print(f"Asset '{title}' created successfully.")
print(f"Asset '{title}' (type: {asset_type}) created successfully.") return response.json()
print(f" [校验] Timestamps: created_at={asset_data.get('created_at')}, updated_at={asset_data.get('updated_at')}")
return asset_data
else: else:
print(f"Failed to create asset: {response.text}") print(f"Failed to create asset: {response.text}")
return None return None
def assign_heir(token, asset_id, heir_email): def assign_heir(token, asset_id, heir_name):
url = f"{BASE_URL}/assets/assign" url = f"{BASE_URL}/assets/assign"
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
data = { data = {
"asset_id": asset_id, "asset_id": asset_id,
"heir_email": heir_email "heir_name": heir_name
} }
response = requests.post(url, json=data, headers=headers) response = requests.post(url, json=data, headers=headers)
if response.status_code == 200: if response.status_code == 200:
print(f"Asset {asset_id} assigned to heir {heir_email} successfully.") print(f"Asset {asset_id} assigned to heir {heir_name} successfully.")
return response.json() return response.json()
else: else:
print(f"Failed to assign heir: {response.text}") print(f"Failed to assign heir: {response.text}")
@@ -108,17 +105,6 @@ def get_my_assets(token):
print(f"Failed to retrieve assets: {response.text}") print(f"Failed to retrieve assets: {response.text}")
return None return None
def get_designated_assets(token):
url = f"{BASE_URL}/assets/designated"
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
print(f"Designated assets retrieved successfully.")
return response.json()
else:
print(f"Failed to retrieve designated assets: {response.text}")
return None
def main(): def main():
# 1. 创建三个用户 # 1. 创建三个用户
users = [ users = [
@@ -157,60 +143,38 @@ def main():
if not token1: if not token1:
return return
# 3. 创建个 asset # 3. 创建个 asset
asset1 = create_asset( asset1 = create_asset(
token1, token1,
"My Secret Asset1", "My Secret Asset1",
share_a, share_a,
ciphertext_1, ciphertext_1
"note"
) )
asset2 = create_asset( asset2 = create_asset(
token1, token1,
"My Secret Asset2", "My Secret Asset2",
share_a, share_a,
ciphertext_1, ciphertext_1
"note"
) )
asset3 = create_asset( if not asset1 or not asset2:
token1,
"My Secret Asset3",
share_a,
ciphertext_1,
"note"
)
if not asset1 or not asset2 or not asset3:
print(" [失败] 创建资产失败") print(" [失败] 创建资产失败")
return return
# 3.1 测试 /assets/get # 3.1 测试 /assets/get
print("\n [测试] 获取用户资产列表") print("\n [测试] 获取用户资产列表")
user1_assets = get_my_assets(token1) my_assets = get_my_assets(token1)
if user1_assets: if my_assets:
print(f" [输出] 用户1共有 {len(user1_assets)} 个资产") print(f" [输出] 成功获取 {len(my_assets)} 个资产")
else: else:
print(" [失败] 无法获取资产列表") print(" [失败] 无法获取资产列表")
print("用户 1 为用户 2 分配遗产") # 4. 指定用户 2 为继承人
assign_heir(token1, asset1["id"], "user2@example.com") print("用户 1 指定用户 2 为继承人")
assign_heir(token1, asset2["id"], "user2@example.com") assign_heir(token1, asset1["id"], "user2")
# 4.1 用户2查询自己能继承多少遗产
print("\n [测试] 用户 2 查询自己被指定的资产")
token2_temp = login_user("user2", "pass123")
designated_assets = get_designated_assets(token2_temp)
if designated_assets:
print(f" [输出] 用户 2 共有 {len(designated_assets)} 个被指定的资产")
for asset in designated_assets:
print(f" - Asset ID: {asset['id']}, Title: {asset['title']}")
else:
print(" [失败] 无法获取被指定资产列表")
print("\n## 3. 继承流 (Inheritance Layer)") print("\n## 3. 继承流 (Inheritance Layer)")
# 5. Admin 宣布用户 1 挂了 # 5. Admin 宣布用户 1 挂了
print("Admin 宣布用户 1 挂了") print("Admin 宣布用户 1 挂了")

View File

@@ -1,50 +0,0 @@
import httpx
import asyncio
BASE_URL = "http://localhost:8000"
async def test_search():
async with httpx.AsyncClient() as client:
# 1. Login to get token
login_data = {
"username": "testuser",
"password": "testpassword"
}
# First, try to register if user doesn't exist (assuming test env)
try:
await client.post(f"{BASE_URL}/register", json={
"username": "testuser",
"email": "test@example.com",
"password": "testpassword"
})
except:
pass
response = await client.post(f"{BASE_URL}/login", json=login_data)
if response.status_code != 200:
print(f"Login failed: {response.text}")
return
token = response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Test search by username
print("Testing search by username 'test'...")
response = await client.get(f"{BASE_URL}/users/search?query=test", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
# 3. Test search by email
print("\nTesting search by email 'example'...")
response = await client.get(f"{BASE_URL}/users/search?query=example", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
# 4. Test search with no results
print("\nTesting search with no results 'nonexistent'...")
response = await client.get(f"{BASE_URL}/users/search?query=nonexistent", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
if __name__ == "__main__":
asyncio.run(test_search())