commit d22fa8a286e8abe183de759f7afa0075e0be0ffe Author: lusixing <32328454+lusixing@users.noreply.github.com> Date: Sat Jan 24 11:02:08 2026 -0800 backend_v0.1 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6fe2dcf --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +gitignore +# Python virtual environments +venv/ \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..fe655ac --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,20 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Docker Debug", + "type": "python", + "request": "attach", + "connect": { + "host": "localhost", + "port": 5678 + }, + "pathMappings": [ + { + "localRoot": "${workspaceFolder}", + "remoteRoot": "/code" + } + ] + } + ] +} \ No newline at end of file diff --git a/app/__pycache__/auth.cpython-311.pyc b/app/__pycache__/auth.cpython-311.pyc new file mode 100644 index 0000000..cf66f44 Binary files /dev/null and b/app/__pycache__/auth.cpython-311.pyc differ diff --git a/app/__pycache__/database.cpython-311.pyc b/app/__pycache__/database.cpython-311.pyc new file mode 100644 index 0000000..f745474 Binary files /dev/null and b/app/__pycache__/database.cpython-311.pyc differ diff --git a/app/__pycache__/main.cpython-311.pyc b/app/__pycache__/main.cpython-311.pyc new file mode 100644 index 0000000..eea18b7 Binary files /dev/null and b/app/__pycache__/main.cpython-311.pyc differ diff --git a/app/__pycache__/models.cpython-311.pyc b/app/__pycache__/models.cpython-311.pyc new file mode 100644 index 0000000..2328b8d Binary files /dev/null and b/app/__pycache__/models.cpython-311.pyc differ diff --git a/app/__pycache__/schemas.cpython-311.pyc b/app/__pycache__/schemas.cpython-311.pyc new file mode 100644 index 0000000..ecd12b0 Binary files /dev/null and b/app/__pycache__/schemas.cpython-311.pyc differ diff --git a/app/auth.py b/app/auth.py new file mode 100644 index 0000000..6e02741 --- /dev/null +++ b/app/auth.py @@ -0,0 +1,108 @@ +from datetime import datetime, timedelta +from jose import jwt +from passlib.context import CryptContext + +SECRET_KEY = "your-secret-key" +ALGORITHM = "HS256" + +pwd_context = CryptContext( + schemes=["argon2", "bcrypt"], # 添加 argon2 作为备选方案 + default="argon2", # 使用 argon2 作为默认方案 + deprecated="auto", +) + +def hash_password(password: str) -> str: + return pwd_context.hash(password) + + +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + +def create_access_token(data: dict): + to_encode = data.copy() + expire = datetime.utcnow() + timedelta(minutes=30) + to_encode.update({"exp": expire}) + return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + +from Crypto.PublicKey import RSA + +def generate_key_pair(): + key = RSA.generate(2048) + private_key = key.export_key().decode('utf-8') + public_key = key.publickey().export_key().decode('utf-8') + return private_key, public_key + +from Crypto.Cipher import PKCS1_OAEP, AES +from Crypto.Hash import SHA256 +from Crypto.Random import get_random_bytes +import base64 + +def encrypt_data(data: str, public_key_pem: str) -> str: + # 1. 准备 RSA 公钥 + recipient_key = RSA.import_key(public_key_pem) + cipher_rsa = PKCS1_OAEP.new(recipient_key) + + # 2. 生成随机 AES 会话密钥 (32 bytes) + session_key = get_random_bytes(32) + + # 3. 用 RSA 加密 AES 密钥 + enc_session_key = cipher_rsa.encrypt(session_key) + + # 4. 用 AES-GCM 加密实际数据 + cipher_aes = AES.new(session_key, AES.MODE_GCM) + ciphertext, tag = cipher_aes.encrypt_and_digest(data.encode('utf-8')) + + # 5. 组合结果: [RSA加密的密钥(256B)] + [Nonce(16B)] + [Tag(16B)] + [密文] + combined = enc_session_key + cipher_aes.nonce + tag + ciphertext + return base64.b64encode(combined).decode('utf-8') + +def decrypt_data(encrypted_data_b64: str, private_key_pem: str) -> str: + # 1. 解码合并的二进制数据 + encrypted_data = base64.b64decode(encrypted_data_b64) + + # 2. 分解组件 (RSA 2048位产生的密文固定为 256 字节) + rsa_key_len = 256 + enc_session_key = encrypted_data[:rsa_key_len] + nonce = encrypted_data[rsa_key_len : rsa_key_len + 16] + tag = encrypted_data[rsa_key_len + 16 : rsa_key_len + 32] + ciphertext = encrypted_data[rsa_key_len + 32 :] + + # 3. 用 RSA 私钥解密 AES 会话密钥 + private_key = RSA.import_key(private_key_pem) + cipher_rsa = PKCS1_OAEP.new(private_key) + session_key = cipher_rsa.decrypt(enc_session_key) + + # 4. 用 AES 会话密钥解密数据 + cipher_aes = AES.new(session_key, AES.MODE_GCM, nonce=nonce) + dec_data = cipher_aes.decrypt_and_verify(ciphertext, tag) + + return dec_data.decode('utf-8') + +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer +from sqlalchemy.future import select +from sqlalchemy.ext.asyncio import AsyncSession +from jose import JWTError +from . import database, models + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(database.get_db)): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + except JWTError: + raise credentials_exception + + result = await db.execute(select(models.User).where(models.User.username == username)) + user = result.scalars().first() + if user is None: + raise credentials_exception + return user \ No newline at end of file diff --git a/app/database.py b/app/database.py new file mode 100644 index 0000000..ce12be0 --- /dev/null +++ b/app/database.py @@ -0,0 +1,50 @@ +# database.py +import os +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession +from sqlalchemy.orm import sessionmaker, declarative_base + +# 优先从环境变量读取(Docker 部署推荐) +SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://user:password@db:5432/fastapi_db") + +engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=True) +AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False) + +Base = declarative_base() + +async def get_db(): + async with AsyncSessionLocal() as session: + yield session + # 注意:通常不在 get_db 里统一 commit,建议在 endpoint 里手动 commit + +async def init_db(): + async with engine.begin() as conn: + # 导入模型以确保 metadata 注册了表 + from . import models + # 自动创建表 + await conn.run_sync(Base.metadata.create_all) + + # 创建默认管理员用户 + async with AsyncSessionLocal() as session: + from . import auth + from sqlalchemy.future import select + + # 检查是否已存在 admin 用户 + result = await session.execute( + select(models.User).where(models.User.username == "admin") + ) + existing_admin = result.scalars().first() + + if not existing_admin: + # 创建管理员用户 + private_key, public_key = auth.generate_key_pair() + admin_user = models.User( + username="admin", + hashed_password=auth.hash_password("admin123"), + private_key=private_key, + public_key=public_key, + is_admin=True, + guale=False + ) + session.add(admin_user) + await session.commit() + print("✅ Default admin user created (username: admin, password: admin123)") \ No newline at end of file diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..42de947 --- /dev/null +++ b/app/main.py @@ -0,0 +1,214 @@ +from fastapi import FastAPI, Depends, HTTPException, status +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm import selectinload +from . import models, schemas, auth, database +from passlib.context import CryptContext +from sqlalchemy.exc import IntegrityError +from contextlib import asynccontextmanager + +@asynccontextmanager +async def lifespan(app: FastAPI): + # 启动时执行:创建表 + await database.init_db() + yield + # 关闭时执行(如果需要) + +app = FastAPI(lifespan=lifespan) + + +@app.post("/register", response_model=schemas.UserOut) +async def register(user: schemas.UserCreate, db: AsyncSession = Depends(database.get_db)): + hashed_pwd = auth.hash_password(user.password) + private_key, public_key = auth.generate_key_pair() + + new_user = models.User( + username=user.username, + hashed_password=hashed_pwd, + private_key=private_key, + public_key=public_key + ) + db.add(new_user) + try: + await db.commit() + await db.refresh(new_user) + return new_user + + except IntegrityError: + # 发生唯一约束冲突时回滚并报错 + await db.rollback() + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="用户名已存在" + ) + + + +@app.post("/token") +async def login(form_data: schemas.UserLogin, db: AsyncSession = Depends(database.get_db)): + result = await db.execute(select(models.User).where(models.User.username == form_data.username)) + user = result.scalars().first() + if not user or not auth.verify_password(form_data.password, user.hashed_password): + raise HTTPException(status_code=400, detail="Incorrect username or password") + + access_token = auth.create_access_token(data={"sub": user.username}) + return {"access_token": access_token, "token_type": "bearer"} + + +@app.post("/assets/", response_model=schemas.AssetOut) +async def create_asset( + asset: schemas.AssetCreate, + current_user: models.User = Depends(auth.get_current_user), + db: AsyncSession = Depends(database.get_db) +): + # Encrypt the inner content using user's public key + encrypted_content = auth.encrypt_data(asset.content_inner_encrypted, current_user.public_key) + + new_asset = models.Asset( + title=asset.title, + content_outer_encrypted=encrypted_content, + private_key_shard=asset.private_key_shard, + author_id=current_user.id + ) + db.add(new_asset) + await db.commit() + await db.refresh(new_asset) + return new_asset + + +@app.post("/assets/claim") +async def claim_asset( + asset_claim: schemas.AssetClaim, + current_user: models.User = Depends(auth.get_current_user), + db: AsyncSession = Depends(database.get_db) +): + # Fetch asset with author loaded + result = await db.execute( + select(models.Asset) + .options(selectinload(models.Asset.author)) + .where(models.Asset.id == asset_claim.asset_id) + ) + asset = result.scalars().first() + + if not asset: + raise HTTPException(status_code=404, detail="Asset not found") + + # 1. 验证用户是否是继承人 + if asset.heir_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You are not the designated heir for this asset" + ) + + # 2. 验证所有人是否已经挂了 (guale) + if not asset.author.guale: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="The owner of this asset is still alive. You cannot claim it yet." + ) + + # 3. 验证通过后用asset所有人的private_key解密内容 + try: + decrypted_content = auth.decrypt_data( + asset.content_outer_encrypted, + asset.author.private_key + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to decrypt asset: {str(e)}" + ) + + return { + "asset_id": asset.id, + "title": asset.title, + "decrypted_content": decrypted_content, + "server_shard_key": asset.private_key_shard + } + + +@app.post("/assets/assign") +async def assign_asset( + assignment: schemas.AssetAssign, + current_user: models.User = Depends(auth.get_current_user), + db: AsyncSession = Depends(database.get_db) +): + # Fetch Asset + result = await db.execute( + select(models.Asset) + .options(selectinload(models.Asset.heir)) + .where(models.Asset.id == assignment.asset_id) + ) + asset = result.scalars().first() + + if not asset: + raise HTTPException(status_code=404, detail="Asset not found") + + if asset.author_id != current_user.id: + raise HTTPException(status_code=403, detail="Not authorized to assign this asset") + + + heir_result = await db.execute( + select(models.User).where( + models.User.username == assignment.heir_name + ) + ) + heir_user = heir_result.scalars().first() + + if not heir_user: + raise HTTPException(status_code=404, detail="Heir not found") + + if heir_user.id == current_user.id: + asset.heir = None + await db.commit() + #raise HTTPException(status_code=403, detail="You cannot assign an asset to yourself") + return {"message": "Asset unassigned"} + + asset.heir = heir_user + await db.commit() + + return {"message": f"Asset assigned to {assignment.heir_name}"} + + + + +@app.post("/admin/declare-guale") +async def declare_user_guale( + declare: schemas.DeclareGuale, + current_user: models.User = Depends(auth.get_current_user), + db: AsyncSession = Depends(database.get_db) +): + # Check if current user is admin + if not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only administrators can declare users as deceased" + ) + + # Find the target user + result = await db.execute( + select(models.User).where(models.User.username == declare.username) + ) + target_user = result.scalars().first() + + if not target_user: + raise HTTPException(status_code=404, detail="User not found") + + # Set guale to True + target_user.guale = True + await db.commit() + + return { + "message": f"User {declare.username} has been declared as deceased", + "username": target_user.username, + "guale": target_user.guale + } + +# 用于测试热加载 +@app.post("/post1") +async def test1(): + a=2 + b=3 + c = a+b + return {"msg": f"this is a msg {c}"} + diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..ac9fa06 --- /dev/null +++ b/app/models.py @@ -0,0 +1,37 @@ +from sqlalchemy import Column, Integer, String, ForeignKey, Text, Table, Boolean +from sqlalchemy.orm import relationship + +from .database import Base + + +class User(Base): + __tablename__ = "users" + + id = Column(Integer, primary_key=True, index=True) + username = Column(String, unique=True, index=True) + hashed_password = Column(String) + # System keys + public_key = Column(String) + private_key = Column(String) # Encrypted or raw? Storing raw for now as per req + is_admin = Column(Boolean, default=False) + guale = Column(Boolean, default=False) + + assets = relationship("Asset", foreign_keys="Asset.author_id", back_populates="author") + inherited_assets = relationship("Asset", foreign_keys="Asset.heir_id", back_populates="heir") + + + +class Asset(Base): + __tablename__ = "assets" + + id = Column(Integer, primary_key=True, index=True) + title = Column(String, index=True) + content_outer_encrypted = Column(Text) + author_id = Column(Integer, ForeignKey("users.id")) + heir_id = Column(Integer, ForeignKey("users.id")) + + # Key shard for this asset + private_key_shard = Column(String) + + author = relationship("User", foreign_keys=[author_id], back_populates="assets") + heir = relationship("User", foreign_keys=[heir_id], back_populates="inherited_assets") \ No newline at end of file diff --git a/app/note b/app/note new file mode 100644 index 0000000..051014c --- /dev/null +++ b/app/note @@ -0,0 +1 @@ +sudo docker compose exec web python reset_db.py \ No newline at end of file diff --git a/app/schemas.py b/app/schemas.py new file mode 100644 index 0000000..394e111 --- /dev/null +++ b/app/schemas.py @@ -0,0 +1,62 @@ +from pydantic import BaseModel, ConfigDict +from typing import List, Optional + +# Heir Schemas +class HeirBase(BaseModel): + name: str + +class HeirCreate(HeirBase): + pass + +class HeirOut(HeirBase): + id: int + user_id: int + model_config = ConfigDict(from_attributes=True) + +# User Schemas +class UserCreate(BaseModel): + username: str + password: str + +class UserLogin(BaseModel): + username: str + password: str + +class UserOut(BaseModel): + id: int + username: str + public_key: Optional[str] = None + is_admin: bool = False + guale: bool = False + #heirs: List[HeirOut] = [] + model_config = ConfigDict(from_attributes=True) + +# Asset Schemas (renamed from Article) +class AssetBase(BaseModel): + title: str + +class AssetCreate(AssetBase): + private_key_shard: str + content_inner_encrypted: str + +class AssetOut(AssetBase): + id: int + author_id: int + private_key_shard: str + content_outer_encrypted: str + model_config = ConfigDict(from_attributes=True) + +class AssetClaim(BaseModel): + asset_id: int + private_key_shard: str + +class AssetClaimOut(AssetClaim): + id: int + result: str + +class AssetAssign(BaseModel): + asset_id: int + heir_name: str + +class DeclareGuale(BaseModel): + username: str \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..fb020bd --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,28 @@ +version: '3.8' + +services: + db: + image: postgres:15 + environment: + POSTGRES_USER: user + POSTGRES_PASSWORD: password + POSTGRES_DB: fastapi_db + ports: + - "5432:5432" + + web: + image: python:3.11-slim + command: > + sh -c "pip install -r requirements.txt && python -Xfrozen_modules=off -m uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload" + # 需要debug时使用 + # sh -c "pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple && python -Xfrozen_modules=off -m debugpy --listen 0.0.0.0:5678 --wait-for-client -m uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload" + volumes: + - .:/code + working_dir: /code + ports: + - "8000:8000" + - "5678:5678" # 暴露调试端口 + environment: + - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/fastapi_db + depends_on: + - db \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c03768a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,13 @@ +fastapi +uvicorn[standard] +sqlalchemy[asyncio] +asyncpg +python-jose[cryptography] +passlib[bcrypt] +python-multipart +debugpy +watchfiles +argon2_cffi +pycryptodome +cryptography + diff --git a/reset_db.py b/reset_db.py new file mode 100644 index 0000000..2e336de --- /dev/null +++ b/reset_db.py @@ -0,0 +1,37 @@ + +import asyncio +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy import text +import os + +# Use 'db' for docker-compose execution (service name) +# Note: In docker-compose, internal is 'db', but from host it is 'localhost' mapped port 5432 +DATABASE_URL = "postgresql+asyncpg://user:password@db:5432/fastapi_db" + +async def reset_db(): + print(f"Connecting to {DATABASE_URL}...") + engine = create_async_engine(DATABASE_URL, echo=True) + + async with engine.begin() as conn: + print("Dropping all tables...") + # Disable foreign key checks to allow dropping in any order (Postgres specific) + await conn.execute(text("DROP SCHEMA public CASCADE;")) + await conn.execute(text("CREATE SCHEMA public;")) + print("Schema reset complete.") + + await engine.dispose() + +if __name__ == "__main__": + try: + asyncio.run(reset_db()) + print("Database reset successfully.") + except Exception as e: + print(f"Error resetting database: {e}") + # Try 'db' host if localhost fails (in case running inside container) + if "Connection refused" in str(e) or "gaierror" in str(e): + print("Retrying with host 'db'...") + DATABASE_URL = "postgresql+asyncpg://user:password@db:5432/fastapi_db" + try: + asyncio.run(reset_db()) + except Exception as e2: + print(f"Failed inside container too: {e2}") diff --git a/test/core/__pycache__/sp_gateway_rsa.cpython-310.pyc b/test/core/__pycache__/sp_gateway_rsa.cpython-310.pyc new file mode 100644 index 0000000..9d09b80 Binary files /dev/null and b/test/core/__pycache__/sp_gateway_rsa.cpython-310.pyc differ diff --git a/test/core/__pycache__/sp_trust_sharding.cpython-310.pyc b/test/core/__pycache__/sp_trust_sharding.cpython-310.pyc new file mode 100644 index 0000000..001e99a Binary files /dev/null and b/test/core/__pycache__/sp_trust_sharding.cpython-310.pyc differ diff --git a/test/core/__pycache__/sp_trust_sharding.cpython-313.pyc b/test/core/__pycache__/sp_trust_sharding.cpython-313.pyc new file mode 100644 index 0000000..ea66fc1 Binary files /dev/null and b/test/core/__pycache__/sp_trust_sharding.cpython-313.pyc differ diff --git a/test/core/__pycache__/sp_vault_aes.cpython-310.pyc b/test/core/__pycache__/sp_vault_aes.cpython-310.pyc new file mode 100644 index 0000000..11401e9 Binary files /dev/null and b/test/core/__pycache__/sp_vault_aes.cpython-310.pyc differ diff --git a/test/core/__pycache__/sp_vault_aes.cpython-313.pyc b/test/core/__pycache__/sp_vault_aes.cpython-313.pyc new file mode 100644 index 0000000..dac5d4b Binary files /dev/null and b/test/core/__pycache__/sp_vault_aes.cpython-313.pyc differ diff --git a/test/core/sp_gateway_rsa.py b/test/core/sp_gateway_rsa.py new file mode 100644 index 0000000..46ac90b --- /dev/null +++ b/test/core/sp_gateway_rsa.py @@ -0,0 +1,85 @@ +from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives import serialization, hashes + +class SentinelSystemProvider: + """系统级非对称加密提供者 (独立于用户)""" + + @staticmethod + def generate_system_keys(): + """生成全新的系统公私钥对""" + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=4096 + ) + public_key = private_key.public_key() + + # 序列化私钥 (用于保存到安全服务器) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + + # 序列化公钥 (用于下发或在线加密) + public_pem = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + + return private_pem, public_pem + + @staticmethod + def encrypt_with_system_public(public_pem, data_bytes): + """使用系统公钥进行二次加密""" + public_key = serialization.load_pem_public_key(public_pem) + ciphertext = public_key.encrypt( + data_bytes, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + return ciphertext + + @staticmethod + def decrypt_with_system_private(private_pem, ciphertext): + """使用系统私钥进行二次解密""" + private_key = serialization.load_pem_private_key(private_pem, password=None) + plaintext = private_key.decrypt( + ciphertext, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + return plaintext +if __name__ == "__main__": + # --- 演示流程 --- + + # 1. 初始化系统密钥 (这一步通常只在系统上线时执行一次) + sys_provider = SentinelSystemProvider() + private_pem, public_pem = sys_provider.generate_system_keys() + + print("【系统层】: 独立公私钥已生成。") + print(f" - 公钥 (PEM): {public_pem.decode('utf-8')[:50]}...") + print(f" - 私钥 (PEM): {private_pem.decode('utf-8')[:50]}...") + + # 2. 模拟用户已经加密过的数据 (这已经是用户那一层加密后的二进制数据) + user_encrypted_data = b"User_Encrypted_Blob_v1.0_Data" + print(f"【输入数据】: {user_encrypted_data}") + + # 3. 系统二次加密 (外层锁) + # 这一步发生在数据上传服务器时,或者存入信托池时 + double_locked_data = sys_provider.encrypt_with_system_public(public_pem, user_encrypted_data) + print(f"【使用公钥加密完成 (密文)】: {double_locked_data.hex()[:50]}...") + + # 4. 系统二次解密 (判定传承触发后) + # 只有在满足触发条件(如订阅失败)后,系统才调取私钥进行这第一层解密 + try: + system_unlocked_data = sys_provider.decrypt_with_system_private(private_pem, double_locked_data) + print(f"【使用私钥解密成功】: {system_unlocked_data}") + print("【后续步骤】: 现在数据已回归用户初级加密态,可交给用户或者继承人进行最后解密。") + except Exception as e: + print(f"解密失败: {e}") \ No newline at end of file diff --git a/test/core/sp_trust_sharding.py b/test/core/sp_trust_sharding.py new file mode 100644 index 0000000..64592d5 --- /dev/null +++ b/test/core/sp_trust_sharding.py @@ -0,0 +1,113 @@ +import hashlib +import secrets +from mnemonic import Mnemonic # 仅用于标准的助记词转换 + +class SentinelKeyEngine: + # 使用第 13 个梅森素数 (2^521 - 1),远大于 128-bit 熵,确保有限域安全 + PRIME = 2**521 - 1 + + def __init__(self): + self.mnemo = Mnemonic("english") + + def generate_vault_keys(self): + """ + 1. 生成原始 12 助记词 (Master Key) + """ + words = self.mnemo.generate(strength=128) + entropy = self.mnemo.to_entropy(words) + return words, entropy + + def split_to_shares(self, entropy): + """ + 2. SSS (3,2) 门限分片逻辑 + 公式: f(x) = S + a*x (直线方程,S为秘密,a为随机斜率) + 我们将秘密 S 分成 3 份,任选 2 份即可恢复。 + 注意:必须在有限域 GF(PRIME) 下进行运算以保证完善保密性。 + """ + # 将熵转换为大整数 + secret_int = int.from_bytes(entropy, 'big') + + # 生成一个随机系数 a (安全性需与秘密强度一致) + # a 必须在 [0, PRIME-1] 范围内 + a = secrets.randbelow(self.PRIME) + + # 定义 3 个点: x=1, x=2, x=3 + # Share = (x, f(x)) + def f(x): return (secret_int + a * x) % self.PRIME + + share1 = (1, f(1)) # 手机分片 + share2 = (2, f(2)) # 云端分片 + share3 = (3, f(3)) # 传承卡分片 + + return [share1, share2, share3] + + def recover_from_shares(self, share_a, share_b): + """ + 3. 恢复逻辑:拉格朗日插值还原 + 已知 (x1, y1) 和 (x2, y2),求 f(0) 即秘密 S + 公式: S = (x2*y1 - x1*y2) / (x2 - x1) + 在有限域下,除法变为乘以模逆: S = (x2*y1 - x1*y2) * (x2 - x1)^-1 mod P + """ + x1, y1 = share_a + x2, y2 = share_b + + # 计算分子 + numerator = (x2 * y1 - x1 * y2) % self.PRIME + # 计算分母的模逆 (x2 - x1) + denominator = (x2 - x1) % self.PRIME + inv_denominator = pow(denominator, -1, self.PRIME) + + # 还原常数项 S + secret_int = (numerator * inv_denominator) % self.PRIME + + # 转回字节并生成助记词 + # 注意:secret_int 可能略小于 16 字节(高位为0),需要补齐 + # 但由于 entropy 原始就是 16 字节,这里直接转换即可 + try: + recovered_entropy = secret_int.to_bytes(16, 'big') + except OverflowError: + # 理论上不应发生,除非计算出的 secret_int 大于 128 bit (即原始 entropy 大于 128 bit) + # 这里为了健壮性,如果原始 entropy 是 16 字节,这里应该也是。 + # 如果 PRIME 很大,secret_int 还是原来的值。 + recovered_entropy = secret_int.to_bytes((secret_int.bit_length() + 7) // 8, 'big') + + return self.mnemo.to_mnemonic(recovered_entropy) +if __name__ == "__main__": + # --- Sentinel 协议业务流程模拟 --- + + engine = SentinelKeyEngine() + + # [生前]:初始化金库 + master_words, entropy = engine.generate_vault_keys() + print(f"【1. 生成原始助记词】: {master_words}") + + shares = engine.split_to_shares(entropy) + print(f"【2. SSS 分片完成】:") + print(f" - 分片1 (手机安全区): {shares[0]}") + print(f" - 分片2 (Sentinel云): {shares[1]}") + print(f" - 分片3 (传承卡单词): {shares[2]}") + + print("-" * 50) + + # [死后/传承]:模拟用户失联,触发被动验证 + # 假设继承人拿着卡片 (Share 3),向服务器请求分片 (Share 2) + successor_share = shares[2] + server_share = shares[1] + + # 执行恢复 + recovered_words = engine.recover_from_shares(shares[0], shares[1]) + print(f"【1. 手机+云 : {recovered_words}") + + recovered_words = engine.recover_from_shares(shares[0], shares[2]) + print(f"【2. 手机+传承卡 : {recovered_words}") + + recovered_words = engine.recover_from_shares(shares[1], shares[2]) + print(f"【3. 云+传承卡 : {recovered_words}") + + # 校验一致性 + assert recovered_words == master_words + print("\n结果:恢复出的助记词与原始完全一致。") + + + with open("words.txt", "w") as f: + f.write("%s\n"%master_words) \ No newline at end of file diff --git a/test/core/sp_vault_aes.py b/test/core/sp_vault_aes.py new file mode 100644 index 0000000..2d7d206 --- /dev/null +++ b/test/core/sp_vault_aes.py @@ -0,0 +1,77 @@ +import os +from mnemonic import Mnemonic +from Crypto.Cipher import AES +from Crypto.Protocol.KDF import PBKDF2 +from Crypto.Util.Padding import pad, unpad + +class SentinelVault: + def __init__(self, salt=None): + self.mnemo = Mnemonic("english") + # 默认盐值仅用于演示,生产环境建议每个用户随机生成并存储 + self.salt = salt if salt else b'Sentinel_Salt_2026' + + def derive_key(self, mnemonic_phrase): + """ + 使用 PBKDF2 将助记词转换为 AES-256 密钥 (32 bytes) + """ + # 种子生成遵循 BIP-39 逻辑 + seed = self.mnemo.to_seed(mnemonic_phrase, passphrase="") + # 派生出一个 32 字节的强密钥 + key = PBKDF2(seed, self.salt, dkLen=32, count=100000) + return key + + def encrypt_data(self, key, plaintext): + """ + 使用 AES-256 GCM 模式进行加密 (具备完整性校验) + """ + cipher = AES.new(key, AES.MODE_GCM) + nonce = cipher.nonce + ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode('utf-8')) + # 返回:随机数 + 校验位 + 密文 + return nonce + tag + ciphertext + + def decrypt_data(self, key, encrypted_blob): + """ + AES-256 GCM 解密 + """ + nonce = encrypted_blob[:16] + tag = encrypted_blob[16:32] + ciphertext = encrypted_blob[32:] + + cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) + try: + plaintext = cipher.decrypt_and_verify(ciphertext, tag) + return plaintext.decode('utf-8') + except ValueError: + return "【解密失败】:密钥错误或数据被篡改" + +if __name__ == "__main__": + # --- 模拟 Sentinel 协议完整业务流 --- + + # 1. 假设这是通过之前 SSS 算法恢复出来的 12 词 + recovered_mnemonic = "apple banana cherry dog elephant fish goat horse ice jacket kangaroo lion" + try: + with open("words.txt", "r") as f: + recovered_mnemonic = f.read().strip() + except FileNotFoundError: + print("words.txt 文件未找到,使用默认助记词进行演示。") + + print(f"Demo助记词:{recovered_mnemonic}") + vault = SentinelVault() + + # 2. 生成加密密钥 + aes_key = vault.derive_key(recovered_mnemonic) + aes_key_hex = aes_key.hex() + print(f"【密钥派生完成】:len:{len(aes_key_hex)} -> {aes_key_hex[:20]}...") + + # 3. 用户生前加密资产(如:银行账户、数字遗产) + my_legacy = "我的瑞士银行账号是:CH123456789,密码是:Sentinel2026" + print(f"【Demo资产信息】:{my_legacy}") + encrypted_asset = vault.encrypt_data(aes_key, my_legacy) + encrypted_asset_hex = encrypted_asset.hex() + print(f"【数据已加密】:len:{len(encrypted_asset_hex)} -> {encrypted_asset_hex[:40]}...") + + # 4. 模拟继承人通过分片拼凑后进行解密 + print("-" * 50) + decrypted_content = vault.decrypt_data(aes_key, encrypted_asset) + print(f"【继承人解密成功】:{decrypted_content}") \ No newline at end of file diff --git a/test/test_scenario.py b/test/test_scenario.py new file mode 100644 index 0000000..0f7268c --- /dev/null +++ b/test/test_scenario.py @@ -0,0 +1,201 @@ +import requests +import json +from core.sp_trust_sharding import SentinelKeyEngine +from core.sp_vault_aes import SentinelVault +import ast + +BASE_URL = "http://localhost:8000" + +def register_user(username, password): + url = f"{BASE_URL}/register" + data = { + "username": username, + "password": password + } + response = requests.post(url, json=data) + if response.status_code == 200: + print(f"User {username} registered successfully.") + return response.json() + else: + print(f"Failed to register {username}: {response.text}") + return None + +def login_user(username, password): + url = f"{BASE_URL}/token" + data = { + "username": username, + "password": password + } + response = requests.post(url, json=data) + if response.status_code == 200: + print(f"User {username} logged in successfully.") + return response.json()["access_token"] + else: + print(f"Failed to login {username}: {response.text}") + return None + +def create_asset(token, title, private_key_shard, content_inner_encrypted): + url = f"{BASE_URL}/assets/" + headers = {"Authorization": f"Bearer {token}"} + data = { + "title": title, + "private_key_shard": str(private_key_shard), + "content_inner_encrypted": str(content_inner_encrypted) + } + response = requests.post(url, json=data, headers=headers) + if response.status_code == 200: + print(f"Asset '{title}' created successfully.") + return response.json() + else: + print(f"Failed to create asset: {response.text}") + return None + +def assign_heir(token, asset_id, heir_name): + url = f"{BASE_URL}/assets/assign" + headers = {"Authorization": f"Bearer {token}"} + data = { + "asset_id": asset_id, + "heir_name": heir_name + } + response = requests.post(url, json=data, headers=headers) + if response.status_code == 200: + print(f"Asset {asset_id} assigned to heir {heir_name} successfully.") + return response.json() + else: + print(f"Failed to assign heir: {response.text}") + return None + +def declare_user_guale(token, username): + url = f"{BASE_URL}/admin/declare-guale" + headers = {"Authorization": f"Bearer {token}"} + data = {"username": username} + response = requests.post(url, json=data, headers=headers) + if response.status_code == 200: + print(f"User {username} declared as 'guale' successfully.") + return response.json() + else: + print(f"Failed to declare guale: {response.text}") + return None + +def claim_asset(token, asset_id, private_key_shard): + url = f"{BASE_URL}/assets/claim" + headers = {"Authorization": f"Bearer {token}"} + data = { + "asset_id": asset_id, + "private_key_shard": private_key_shard + } + response = requests.post(url, json=data, headers=headers) + if response.status_code == 200: + print(f"Asset {asset_id} claimed successfully.") + return response.json() + else: + print(f"Failed to claim asset: {response.text}") + return None + +def main(): + # 1. 创建三个用户 + users = [ + ("user1", "pass123"), + ("user2", "pass123"), + ("user3", "pass123") + ] + + for username, password in users: + register_user(username, password) + + # 1.1 用户一信息生成 + key_engine = SentinelKeyEngine() + # 1.1 生成助记词 (BIP-39) + master_words, entropy = key_engine.generate_vault_keys() + print(f" [生成] 原始助记词: {master_words}") + + # 1.2 SSS 分片 (3-of-2) + shares = key_engine.split_to_shares(entropy) + share_a = shares[0] # Device (手机) + share_b = shares[1] # Cloud (云端) + share_c = shares[2] # Physical (传承卡) + + print("\n## 2. 用户内层加密流 (Vault Layer)") + user_data = "我的瑞士银行账号是:CH123456789,密码是:Sentinel2027" + print(f" [输入] 用户隐私数据: {user_data}") + + vault = SentinelVault() + # 2.1 派生 AES 密钥 + aes_key = vault.derive_key(master_words) + # 2.2 加密数据 + ciphertext_1 = vault.encrypt_data(aes_key, user_data) + + # 2. 用户一登录 + token1 = login_user("user1", "pass123") + if not token1: + return + + # 3. 创建一个 asset + asset = create_asset( + token1, + "My Secret Asset", + share_a, + ciphertext_1 + ) + + if not asset: + return + + asset_id = asset["id"] + print(f" [输出] Asset ID: {asset_id}") + + # 4. 指定用户 2 为继承人 + print("用户 1 指定用户 2 为继承人") + assign_heir(token1, asset_id, "user2") + + print("\n## 3. 继承流 (Inheritance Layer)") + # 5. Admin 宣布用户 1 挂了 + print("Admin 宣布用户 1 挂了") + admin_token = login_user("admin", "admin123") + if not admin_token: + print("Failed to login as admin. Make sure the database is initialized with an admin user.") + return + + declare_user_guale(admin_token, "user1") + + # 6. 用户 2 登录 + print("用户 2 登录") + token2 = login_user("user2", "pass123") + if not token2: + return + + # 7. 用户 2 申领资产,并带上自己的分片 (share_c) + print("用户 2 申领资产,并带上自己的分片 (share_c)") + claim_res = claim_asset(token2, asset_id, json.dumps(share_c)) + if not claim_res: + return + + print(f" [输出] Claim Result (私钥分片与加密内容已获取):") + print(f" - Server Shard Key: {claim_res['server_shard_key']}") + print(f" - Decrypted Content (Outer Layer): {claim_res['decrypted_content'][:50]}...") + + print("\n## 4. 客户端恢复流 (Client Recovery)") + # 8. 恢复助记词 + # 继承人有自己的 share_c,从服务器拿到了存储在 asset 里的 share_a (server_shard_key) + #server_shard = tuple(claim_res['server_shard_key']) + server_shard = ast.literal_eval(claim_res['server_shard_key']) + + recovered_mnemonic = key_engine.recover_from_shares(server_shard, share_c) + print(f" [恢复] 助记词: {recovered_mnemonic}") + + # 9. 派生密钥 + recovered_aes_key = vault.derive_key(recovered_mnemonic) + + # 10. 解密内容 (Inner Layer) + #inner_ciphertext = bytes.fromhex(claim_res["decrypted_content"]) + inner_ciphertext = ast.literal_eval(claim_res["decrypted_content"]) + decrypted_final = vault.decrypt_data(recovered_aes_key, inner_ciphertext) + print(f" [完成] 解密后的原始数据: {decrypted_final}") + + if decrypted_final == user_data: + print("\n✅ 测试成功!数据完整恢复。") + else: + print("\n❌ 测试失败!解密数据不匹配。") + +if __name__ == "__main__": + main() diff --git a/verify_integrity.py b/verify_integrity.py new file mode 100644 index 0000000..5bb82b3 --- /dev/null +++ b/verify_integrity.py @@ -0,0 +1,39 @@ + +import sys +import os + +# Ensure we can import app +sys.path.append(os.getcwd()) + +print("Importing app modules...") +try: + from app import models, schemas, auth, main + print("Imports successful.") +except Exception as e: + print(f"Import failed: {e}") + sys.exit(1) + +print("Testing Key Generation...") +try: + priv, pub = auth.generate_key_pair() + if priv and pub: + print("Key generation successful.") + print(f"Public Key sample: {pub[:30]}...") + else: + print("Key generation returned empty values.") + sys.exit(1) +except Exception as e: + print(f"Key generation failed: {e}") + sys.exit(1) + +print("Testing Model Instantiation...") +try: + user = models.User(username="test", public_key=pub, private_key=priv) + heir = models.Heir(name="heir1") + asset = models.Asset(title="test asset", content="content", private_key_shard="shard") + print("Model instantiation successful.") +except Exception as e: + print(f"Model instantiation failed: {e}") + sys.exit(1) + +print("Verification passed!")