"""End-to-end client script for the asset inheritance API.

Drives the HTTP service at ``BASE_URL`` through one full scenario: register
users, encrypt a secret locally, store it as an asset together with one key
share, assign an heir, have an admin declare the owner deceased, let the heir
claim the asset and finally rebuild the key from two shares to decrypt it.
"""

import ast
import json

import requests

from core.sp_trust_sharding import SentinelKeyEngine
from core.sp_vault_aes import SentinelVault

BASE_URL = "http://localhost:8000"

# Seconds to wait for the server before giving up. Without a timeout,
# ``requests`` blocks indefinitely when the server stops responding.
_REQUEST_TIMEOUT_SECONDS = 10


def _auth_headers(token):
    """Return the bearer-token Authorization header for ``token``."""
    return {"Authorization": f"Bearer {token}"}


def _request_json(method, path, failure_prefix, success_message=None,
                  *, headers=None, payload=None):
    """Send one request to the API and return the decoded JSON body.

    Args:
        method: HTTP verb, e.g. ``"POST"`` or ``"GET"``.
        path: Endpoint path appended to ``BASE_URL`` (leading slash included).
        failure_prefix: Text printed before ``": <reason>"`` when the call fails.
        success_message: Printed once the call has succeeded; ``None`` prints
            nothing so the caller can validate the body first.
        headers: Optional HTTP headers.
        payload: Optional object sent as the JSON request body.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``. Transport
        errors, timeouts and undecodable bodies are reported and also yield
        ``None`` instead of raising.
    """
    try:
        response = requests.request(
            method,
            f"{BASE_URL}{path}",
            json=payload,
            headers=headers,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        print(f"{failure_prefix}: {exc}")
        return None

    if response.status_code != 200:
        print(f"{failure_prefix}: {response.text}")
        return None

    try:
        body = response.json()
    except ValueError as exc:
        # Every JSON decode error raised by requests derives from ValueError.
        print(f"{failure_prefix}: invalid JSON in response ({exc})")
        return None

    if success_message is not None:
        print(success_message)
    return body


def register_user(username, email, password):
    """Register a new account; return the response body or ``None`` on failure."""
    return _request_json(
        "POST",
        "/register",
        f"Failed to register {username}",
        f"User {username} registered successfully.",
        payload={
            "username": username,
            "password": password,
            "email": email,
        },
    )


def login_user(username, password):
    """Log in and return the access token, or ``None`` if login failed."""
    failure_prefix = f"Failed to login {username}"
    body = _request_json(
        "POST",
        "/login",
        failure_prefix,
        payload={"username": username, "password": password},
    )
    if body is None:
        return None

    # A 200 response without a token is still a failed login for callers.
    token = body.get("access_token") if isinstance(body, dict) else None
    if token is None:
        print(f"{failure_prefix}: response did not contain an access_token")
        return None

    print(f"User {username} logged in successfully.")
    return token


def create_asset(token, title, private_key_shard, content_inner_encrypted):
    """Create an asset owned by the token's user.

    The shard and the ciphertext are sent as their ``str()`` representation,
    which is what ``main`` later reverses with ``ast.literal_eval``.

    Returns:
        The response body, or ``None`` on failure.
    """
    return _request_json(
        "POST",
        "/assets/create",
        "Failed to create asset",
        f"Asset '{title}' created successfully.",
        headers=_auth_headers(token),
        payload={
            "title": title,
            "private_key_shard": str(private_key_shard),
            "content_inner_encrypted": str(content_inner_encrypted),
        },
    )


def assign_heir(token, asset_id, heir_name):
    """Name ``heir_name`` as heir of ``asset_id``; return the body or ``None``."""
    return _request_json(
        "POST",
        "/assets/assign",
        "Failed to assign heir",
        f"Asset {asset_id} assigned to heir {heir_name} successfully.",
        headers=_auth_headers(token),
        payload={"asset_id": asset_id, "heir_name": heir_name},
    )


def declare_user_guale(token, username):
    """Call the admin endpoint that declares ``username`` deceased ("guale").

    Returns:
        The response body, or ``None`` on failure.
    """
    return _request_json(
        "POST",
        "/admin/declare-guale",
        "Failed to declare guale",
        f"User {username} declared as 'guale' successfully.",
        headers=_auth_headers(token),
        payload={"username": username},
    )


def claim_asset(token, asset_id, private_key_shard):
    """Claim ``asset_id`` as heir, submitting ``private_key_shard`` unchanged.

    Returns:
        The response body, or ``None`` on failure.
    """
    return _request_json(
        "POST",
        "/assets/claim",
        "Failed to claim asset",
        f"Asset {asset_id} claimed successfully.",
        headers=_auth_headers(token),
        payload={"asset_id": asset_id, "private_key_shard": private_key_shard},
    )


def get_my_assets(token):
    """Fetch the assets of the token's user; return the body or ``None``."""
    return _request_json(
        "GET",
        "/assets/get",
        "Failed to retrieve assets",
        "Assets retrieved successfully.",
        headers=_auth_headers(token),
    )


def main():
    """Run the whole scenario and report whether the secret was recovered."""
    # 1. Create three users. Registration failures are only reported, so the
    #    script can be re-run against a database that already has them.
    users = [
        ("user1", "pass123", "user1@example.com"),
        ("user2", "pass123", "user2@example.com"),
        ("user3", "pass123", "user3@example.com"),
    ]
    for username, password, email in users:
        register_user(username, email, password)

    # 1.1 Generate user 1's key material: the mnemonic (BIP-39 per the
    #     original author's note) and its entropy.
    key_engine = SentinelKeyEngine()
    master_words, entropy = key_engine.generate_vault_keys()
    print(f" [生成] 原始助记词: {master_words}")

    # 1.2 Split the entropy into secret shares. Three shares are taken here
    #     and two are enough to recover below (2-of-3). shares[1], the
    #     "cloud" share, is not used in this flow.
    shares = key_engine.split_to_shares(entropy)
    share_a = shares[0]  # Device (phone)
    share_c = shares[2]  # Physical (inheritance card)

    print("\n## 2. 用户内层加密流 (Vault Layer)")
    user_data = "我的瑞士银行账号是:CH123456789,密码是:Sentinel2027"
    print(f" [输入] 用户隐私数据: {user_data}")

    vault = SentinelVault()
    # 2.1 Derive the AES key from the mnemonic.
    aes_key = vault.derive_key(master_words)
    # 2.2 Encrypt the data (inner layer).
    ciphertext_1 = vault.encrypt_data(aes_key, user_data)

    # 2. User 1 logs in.
    token1 = login_user("user1", "pass123")
    if not token1:
        return

    # 3. Create the assets; both carry share_a and the same ciphertext.
    asset1 = create_asset(token1, "My Secret Asset1", share_a, ciphertext_1)
    asset2 = create_asset(token1, "My Secret Asset2", share_a, ciphertext_1)
    if not asset1 or not asset2:
        print(" [失败] 创建资产失败")
        return

    # 3.1 Exercise /assets/get. Only None means failure: an empty list is a
    #     successful response that happens to contain no assets.
    print("\n [测试] 获取用户资产列表")
    my_assets = get_my_assets(token1)
    if my_assets is not None:
        print(f" [输出] 成功获取 {len(my_assets)} 个资产")
    else:
        print(" [失败] 无法获取资产列表")

    # 4. User 1 names user 2 as heir.
    print("用户 1 指定用户 2 为继承人")
    assign_heir(token1, asset1["id"], "user2")

    print("\n## 3. 继承流 (Inheritance Layer)")
    # 5. The admin declares user 1 deceased.
    print("Admin 宣布用户 1 挂了")
    admin_token = login_user("admin", "admin123")
    if not admin_token:
        print("Failed to login as admin. Make sure the database is initialized with an admin user.")
        return
    declare_user_guale(admin_token, "user1")

    # 6. User 2 logs in.
    print("用户 2 登录")
    token2 = login_user("user2", "pass123")
    if not token2:
        return

    # 7. User 2 claims the asset, presenting their own share (share_c).
    print("用户 2 申领资产,并带上自己的分片 (share_c)")
    claim_res = claim_asset(token2, asset1["id"], json.dumps(share_c))
    if not claim_res:
        return

    print(" [输出] Claim Result (私钥分片与加密内容已获取):")
    print(f" - Server Shard Key: {claim_res['server_shard_key']}")
    print(f" - Decrypted Content (Outer Layer): {claim_res['decrypted_content'][:50]}...")

    print("\n## 4. 客户端恢复流 (Client Recovery)")
    # 8. Recover the mnemonic from the heir's share_c plus the share stored
    #    with the asset. That share was uploaded as str(share_a), so
    #    literal_eval turns it back into a Python object.
    server_shard = ast.literal_eval(claim_res["server_shard_key"])
    recovered_mnemonic = key_engine.recover_from_shares(server_shard, share_c)
    print(f" [恢复] 助记词: {recovered_mnemonic}")

    # 9. Derive the AES key again from the recovered mnemonic.
    recovered_aes_key = vault.derive_key(recovered_mnemonic)

    # 10. Decrypt the inner layer. NOTE(review): this assumes the server
    #     returns the str() form of the ciphertext that was uploaded -
    #     confirm against the /assets/claim implementation.
    inner_ciphertext = ast.literal_eval(claim_res["decrypted_content"])
    decrypted_final = vault.decrypt_data(recovered_aes_key, inner_ciphertext)
    print(f" [完成] 解密后的原始数据: {decrypted_final}")

    if decrypted_final == user_data:
        print("\n✅ 测试成功!数据完整恢复。")
    else:
        print("\n❌ 测试失败!解密数据不匹配。")


if __name__ == "__main__":
    main()