Files
backend/test/test_scenario.py
2026-01-31 16:29:57 -08:00

265 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import json
from core.sp_trust_sharding import SentinelKeyEngine
from core.sp_vault_aes import SentinelVault
import ast
BASE_URL = "http://localhost:8000"
def register_user(username, email, password):
url = f"{BASE_URL}/register"
data = {
"username": username,
"password": password,
"email": email,
}
response = requests.post(url, json=data)
if response.status_code == 200:
print(f"User {username} registered successfully.")
return response.json()
else:
print(f"Failed to register {username}: {response.text}")
return None
def login_user(username, password):
url = f"{BASE_URL}/login"
data = {
"username": username,
"password": password
}
response = requests.post(url, json=data)
if response.status_code == 200:
print(f"User {username} logged in successfully.")
return response.json()["access_token"]
else:
print(f"Failed to login {username}: {response.text}")
return None
def create_asset(token, title, private_key_shard, content_inner_encrypted, asset_type="note"):
url = f"{BASE_URL}/assets/create"
headers = {"Authorization": f"Bearer {token}"}
data = {
"title": title,
"type": asset_type,
"private_key_shard": str(private_key_shard),
"content_inner_encrypted": str(content_inner_encrypted)
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
asset_data = response.json()
print(f"Asset '{title}' (type: {asset_type}) created successfully.")
print(f" [校验] Timestamps: created_at={asset_data.get('created_at')}, updated_at={asset_data.get('updated_at')}")
return asset_data
else:
print(f"Failed to create asset: {response.text}")
return None
def assign_heir(token, asset_id, heir_email):
url = f"{BASE_URL}/assets/assign"
headers = {"Authorization": f"Bearer {token}"}
data = {
"asset_id": asset_id,
"heir_email": heir_email
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
print(f"Asset {asset_id} assigned to heir {heir_email} successfully.")
return response.json()
else:
print(f"Failed to assign heir: {response.text}")
return None
def declare_user_guale(token, username):
url = f"{BASE_URL}/admin/declare-guale"
headers = {"Authorization": f"Bearer {token}"}
data = {"username": username}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
print(f"User {username} declared as 'guale' successfully.")
return response.json()
else:
print(f"Failed to declare guale: {response.text}")
return None
def claim_asset(token, asset_id, private_key_shard):
url = f"{BASE_URL}/assets/claim"
headers = {"Authorization": f"Bearer {token}"}
data = {
"asset_id": asset_id,
"private_key_shard": private_key_shard
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
print(f"Asset {asset_id} claimed successfully.")
return response.json()
else:
print(f"Failed to claim asset: {response.text}")
return None
def get_my_assets(token):
url = f"{BASE_URL}/assets/get"
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
print(f"Assets retrieved successfully.")
return response.json()
else:
print(f"Failed to retrieve assets: {response.text}")
return None
def get_designated_assets(token):
url = f"{BASE_URL}/assets/designated"
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
print(f"Designated assets retrieved successfully.")
return response.json()
else:
print(f"Failed to retrieve designated assets: {response.text}")
return None
def main():
# 1. 创建三个用户
users = [
("user1", "pass123", "user1@example.com"),
("user2", "pass123", "user2@example.com"),
("user3", "pass123", "user3@example.com")
]
for username, password, email in users:
register_user(username, email, password)
# 1.1 用户一信息生成
key_engine = SentinelKeyEngine()
# 1.1 生成助记词 (BIP-39)
master_words, entropy = key_engine.generate_vault_keys()
print(f" [生成] 原始助记词: {master_words}")
# 1.2 SSS 分片 (3-of-2)
shares = key_engine.split_to_shares(entropy)
share_a = shares[0] # Device (手机)
share_b = shares[1] # Cloud (云端)
share_c = shares[2] # Physical (传承卡)
print("\n## 2. 用户内层加密流 (Vault Layer)")
user_data = "我的瑞士银行账号是CH123456789密码是Sentinel2027"
print(f" [输入] 用户隐私数据: {user_data}")
vault = SentinelVault()
# 2.1 派生 AES 密钥
aes_key = vault.derive_key(master_words)
# 2.2 加密数据
ciphertext_1 = vault.encrypt_data(aes_key, user_data)
# 2. 用户一登录
token1 = login_user("user1", "pass123")
if not token1:
return
# 3. 创建三个 asset
asset1 = create_asset(
token1,
"My Secret Asset1",
share_a,
ciphertext_1,
"note"
)
asset2 = create_asset(
token1,
"My Secret Asset2",
share_a,
ciphertext_1,
"note"
)
asset3 = create_asset(
token1,
"My Secret Asset3",
share_a,
ciphertext_1,
"note"
)
if not asset1 or not asset2 or not asset3:
print(" [失败] 创建资产失败")
return
# 3.1 测试 /assets/get
print("\n [测试] 获取用户资产列表")
user1_assets = get_my_assets(token1)
if user1_assets:
print(f" [输出] 用户1共有 {len(user1_assets)} 个资产")
else:
print(" [失败] 无法获取资产列表")
print("用户 1 为用户 2 分配遗产")
assign_heir(token1, asset1["id"], "user2@example.com")
assign_heir(token1, asset2["id"], "user2@example.com")
# 4.1 用户2查询自己能继承多少遗产
print("\n [测试] 用户 2 查询自己被指定的资产")
token2_temp = login_user("user2", "pass123")
designated_assets = get_designated_assets(token2_temp)
if designated_assets:
print(f" [输出] 用户 2 共有 {len(designated_assets)} 个被指定的资产")
for asset in designated_assets:
print(f" - Asset ID: {asset['id']}, Title: {asset['title']}")
else:
print(" [失败] 无法获取被指定资产列表")
print("\n## 3. 继承流 (Inheritance Layer)")
# 5. Admin 宣布用户 1 挂了
print("Admin 宣布用户 1 挂了")
admin_token = login_user("admin", "admin123")
if not admin_token:
print("Failed to login as admin. Make sure the database is initialized with an admin user.")
return
declare_user_guale(admin_token, "user1")
# 6. 用户 2 登录
print("用户 2 登录")
token2 = login_user("user2", "pass123")
if not token2:
return
# 7. 用户 2 申领资产,并带上自己的分片 (share_c)
print("用户 2 申领资产,并带上自己的分片 (share_c)")
claim_res = claim_asset(token2, asset1["id"], json.dumps(share_c))
if not claim_res:
return
print(f" [输出] Claim Result (私钥分片与加密内容已获取):")
print(f" - Server Shard Key: {claim_res['server_shard_key']}")
print(f" - Decrypted Content (Outer Layer): {claim_res['decrypted_content'][:50]}...")
print("\n## 4. 客户端恢复流 (Client Recovery)")
# 8. 恢复助记词
# 继承人有自己的 share_c从服务器拿到了存储在 asset 里的 share_a (server_shard_key)
#server_shard = tuple(claim_res['server_shard_key'])
server_shard = ast.literal_eval(claim_res['server_shard_key'])
recovered_mnemonic = key_engine.recover_from_shares(server_shard, share_c)
print(f" [恢复] 助记词: {recovered_mnemonic}")
# 9. 派生密钥
recovered_aes_key = vault.derive_key(recovered_mnemonic)
# 10. 解密内容 (Inner Layer)
#inner_ciphertext = bytes.fromhex(claim_res["decrypted_content"])
inner_ciphertext = ast.literal_eval(claim_res["decrypted_content"])
decrypted_final = vault.decrypt_data(recovered_aes_key, inner_ciphertext)
print(f" [完成] 解密后的原始数据: {decrypted_final}")
if decrypted_final == user_data:
print("\n✅ 测试成功!数据完整恢复。")
else:
print("\n❌ 测试失败!解密数据不匹配。")
if __name__ == "__main__":
main()