259 lines
8.5 KiB
Python
259 lines
8.5 KiB
Python
import requests
|
||
import json
|
||
from core.sp_trust_sharding import SentinelKeyEngine
|
||
from core.sp_vault_aes import SentinelVault
|
||
import ast
|
||
|
||
BASE_URL = "http://localhost:8000"
|
||
|
||
def register_user(username, email, password):
|
||
url = f"{BASE_URL}/register"
|
||
data = {
|
||
"username": username,
|
||
"password": password,
|
||
"email": email,
|
||
}
|
||
response = requests.post(url, json=data)
|
||
if response.status_code == 200:
|
||
print(f"User {username} registered successfully.")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to register {username}: {response.text}")
|
||
return None
|
||
|
||
def login_user(username, password):
|
||
url = f"{BASE_URL}/login"
|
||
data = {
|
||
"username": username,
|
||
"password": password
|
||
}
|
||
response = requests.post(url, json=data)
|
||
if response.status_code == 200:
|
||
print(f"User {username} logged in successfully.")
|
||
return response.json()["access_token"]
|
||
else:
|
||
print(f"Failed to login {username}: {response.text}")
|
||
return None
|
||
|
||
def create_asset(token, title, private_key_shard, content_inner_encrypted):
|
||
url = f"{BASE_URL}/assets/create"
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
data = {
|
||
"title": title,
|
||
"private_key_shard": str(private_key_shard),
|
||
"content_inner_encrypted": str(content_inner_encrypted)
|
||
}
|
||
response = requests.post(url, json=data, headers=headers)
|
||
if response.status_code == 200:
|
||
print(f"Asset '{title}' created successfully.")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to create asset: {response.text}")
|
||
return None
|
||
|
||
def assign_heir(token, asset_id, heir_email):
|
||
url = f"{BASE_URL}/assets/assign"
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
data = {
|
||
"asset_id": asset_id,
|
||
"heir_email": heir_email
|
||
}
|
||
response = requests.post(url, json=data, headers=headers)
|
||
if response.status_code == 200:
|
||
print(f"Asset {asset_id} assigned to heir {heir_email} successfully.")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to assign heir: {response.text}")
|
||
return None
|
||
|
||
def declare_user_guale(token, username):
|
||
url = f"{BASE_URL}/admin/declare-guale"
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
data = {"username": username}
|
||
response = requests.post(url, json=data, headers=headers)
|
||
if response.status_code == 200:
|
||
print(f"User {username} declared as 'guale' successfully.")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to declare guale: {response.text}")
|
||
return None
|
||
|
||
def claim_asset(token, asset_id, private_key_shard):
|
||
url = f"{BASE_URL}/assets/claim"
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
data = {
|
||
"asset_id": asset_id,
|
||
"private_key_shard": private_key_shard
|
||
}
|
||
response = requests.post(url, json=data, headers=headers)
|
||
if response.status_code == 200:
|
||
print(f"Asset {asset_id} claimed successfully.")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to claim asset: {response.text}")
|
||
return None
|
||
|
||
|
||
def get_my_assets(token):
|
||
url = f"{BASE_URL}/assets/get"
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
response = requests.get(url, headers=headers)
|
||
if response.status_code == 200:
|
||
print(f"Assets retrieved successfully.")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to retrieve assets: {response.text}")
|
||
return None
|
||
|
||
def get_designated_assets(token):
|
||
url = f"{BASE_URL}/assets/designated"
|
||
headers = {"Authorization": f"Bearer {token}"}
|
||
response = requests.get(url, headers=headers)
|
||
if response.status_code == 200:
|
||
print(f"Designated assets retrieved successfully.")
|
||
return response.json()
|
||
else:
|
||
print(f"Failed to retrieve designated assets: {response.text}")
|
||
return None
|
||
|
||
def main():
|
||
# 1. 创建三个用户
|
||
users = [
|
||
("user1", "pass123", "user1@example.com"),
|
||
("user2", "pass123", "user2@example.com"),
|
||
("user3", "pass123", "user3@example.com")
|
||
]
|
||
|
||
for username, password, email in users:
|
||
register_user(username, email, password)
|
||
|
||
# 1.1 用户一信息生成
|
||
key_engine = SentinelKeyEngine()
|
||
# 1.1 生成助记词 (BIP-39)
|
||
master_words, entropy = key_engine.generate_vault_keys()
|
||
print(f" [生成] 原始助记词: {master_words}")
|
||
|
||
# 1.2 SSS 分片 (3-of-2)
|
||
shares = key_engine.split_to_shares(entropy)
|
||
share_a = shares[0] # Device (手机)
|
||
share_b = shares[1] # Cloud (云端)
|
||
share_c = shares[2] # Physical (传承卡)
|
||
|
||
print("\n## 2. 用户内层加密流 (Vault Layer)")
|
||
user_data = "我的瑞士银行账号是:CH123456789,密码是:Sentinel2027"
|
||
print(f" [输入] 用户隐私数据: {user_data}")
|
||
|
||
vault = SentinelVault()
|
||
# 2.1 派生 AES 密钥
|
||
aes_key = vault.derive_key(master_words)
|
||
# 2.2 加密数据
|
||
ciphertext_1 = vault.encrypt_data(aes_key, user_data)
|
||
|
||
# 2. 用户一登录
|
||
token1 = login_user("user1", "pass123")
|
||
if not token1:
|
||
return
|
||
|
||
# 3. 创建三个 asset
|
||
asset1 = create_asset(
|
||
token1,
|
||
"My Secret Asset1",
|
||
share_a,
|
||
ciphertext_1
|
||
)
|
||
|
||
asset2 = create_asset(
|
||
token1,
|
||
"My Secret Asset2",
|
||
share_a,
|
||
ciphertext_1
|
||
)
|
||
|
||
asset3 = create_asset(
|
||
token1,
|
||
"My Secret Asset3",
|
||
share_a,
|
||
ciphertext_1
|
||
)
|
||
|
||
if not asset1 or not asset2 or not asset3:
|
||
print(" [失败] 创建资产失败")
|
||
return
|
||
|
||
|
||
# 3.1 测试 /assets/get
|
||
print("\n [测试] 获取用户资产列表")
|
||
user1_assets = get_my_assets(token1)
|
||
if user1_assets:
|
||
print(f" [输出] 用户1共有 {len(user1_assets)} 个资产")
|
||
else:
|
||
print(" [失败] 无法获取资产列表")
|
||
|
||
print("用户 1 为用户 2 分配遗产")
|
||
assign_heir(token1, asset1["id"], "user2@example.com")
|
||
assign_heir(token1, asset2["id"], "user2@example.com")
|
||
|
||
# 4.1 用户2查询自己能继承多少遗产
|
||
print("\n [测试] 用户 2 查询自己被指定的资产")
|
||
token2_temp = login_user("user2", "pass123")
|
||
designated_assets = get_designated_assets(token2_temp)
|
||
if designated_assets:
|
||
print(f" [输出] 用户 2 共有 {len(designated_assets)} 个被指定的资产")
|
||
for asset in designated_assets:
|
||
print(f" - Asset ID: {asset['id']}, Title: {asset['title']}")
|
||
else:
|
||
print(" [失败] 无法获取被指定资产列表")
|
||
|
||
|
||
print("\n## 3. 继承流 (Inheritance Layer)")
|
||
# 5. Admin 宣布用户 1 挂了
|
||
print("Admin 宣布用户 1 挂了")
|
||
admin_token = login_user("admin", "admin123")
|
||
if not admin_token:
|
||
print("Failed to login as admin. Make sure the database is initialized with an admin user.")
|
||
return
|
||
|
||
declare_user_guale(admin_token, "user1")
|
||
|
||
# 6. 用户 2 登录
|
||
print("用户 2 登录")
|
||
token2 = login_user("user2", "pass123")
|
||
if not token2:
|
||
return
|
||
|
||
# 7. 用户 2 申领资产,并带上自己的分片 (share_c)
|
||
print("用户 2 申领资产,并带上自己的分片 (share_c)")
|
||
claim_res = claim_asset(token2, asset1["id"], json.dumps(share_c))
|
||
if not claim_res:
|
||
return
|
||
|
||
print(f" [输出] Claim Result (私钥分片与加密内容已获取):")
|
||
print(f" - Server Shard Key: {claim_res['server_shard_key']}")
|
||
print(f" - Decrypted Content (Outer Layer): {claim_res['decrypted_content'][:50]}...")
|
||
|
||
print("\n## 4. 客户端恢复流 (Client Recovery)")
|
||
# 8. 恢复助记词
|
||
# 继承人有自己的 share_c,从服务器拿到了存储在 asset 里的 share_a (server_shard_key)
|
||
#server_shard = tuple(claim_res['server_shard_key'])
|
||
server_shard = ast.literal_eval(claim_res['server_shard_key'])
|
||
|
||
recovered_mnemonic = key_engine.recover_from_shares(server_shard, share_c)
|
||
print(f" [恢复] 助记词: {recovered_mnemonic}")
|
||
|
||
# 9. 派生密钥
|
||
recovered_aes_key = vault.derive_key(recovered_mnemonic)
|
||
|
||
# 10. 解密内容 (Inner Layer)
|
||
#inner_ciphertext = bytes.fromhex(claim_res["decrypted_content"])
|
||
inner_ciphertext = ast.literal_eval(claim_res["decrypted_content"])
|
||
decrypted_final = vault.decrypt_data(recovered_aes_key, inner_ciphertext)
|
||
print(f" [完成] 解密后的原始数据: {decrypted_final}")
|
||
|
||
if decrypted_final == user_data:
|
||
print("\n✅ 测试成功!数据完整恢复。")
|
||
else:
|
||
print("\n❌ 测试失败!解密数据不匹配。")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|