backend_v0.1

This commit is contained in:
lusixing
2026-01-24 11:02:08 -08:00
commit d22fa8a286
26 changed files with 1088 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,85 @@
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
class SentinelSystemProvider:
"""系统级非对称加密提供者 (独立于用户)"""
@staticmethod
def generate_system_keys():
"""生成全新的系统公私钥对"""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096
)
public_key = private_key.public_key()
# 序列化私钥 (用于保存到安全服务器)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# 序列化公钥 (用于下发或在线加密)
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return private_pem, public_pem
@staticmethod
def encrypt_with_system_public(public_pem, data_bytes):
"""使用系统公钥进行二次加密"""
public_key = serialization.load_pem_public_key(public_pem)
ciphertext = public_key.encrypt(
data_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return ciphertext
@staticmethod
def decrypt_with_system_private(private_pem, ciphertext):
"""使用系统私钥进行二次解密"""
private_key = serialization.load_pem_private_key(private_pem, password=None)
plaintext = private_key.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return plaintext
if __name__ == "__main__":
# --- 演示流程 ---
# 1. 初始化系统密钥 (这一步通常只在系统上线时执行一次)
sys_provider = SentinelSystemProvider()
private_pem, public_pem = sys_provider.generate_system_keys()
print("【系统层】: 独立公私钥已生成。")
print(f" - 公钥 (PEM): {public_pem.decode('utf-8')[:50]}...")
print(f" - 私钥 (PEM): {private_pem.decode('utf-8')[:50]}...")
# 2. 模拟用户已经加密过的数据 (这已经是用户那一层加密后的二进制数据)
user_encrypted_data = b"User_Encrypted_Blob_v1.0_Data"
print(f"【输入数据】: {user_encrypted_data}")
# 3. 系统二次加密 (外层锁)
# 这一步发生在数据上传服务器时,或者存入信托池时
double_locked_data = sys_provider.encrypt_with_system_public(public_pem, user_encrypted_data)
print(f"【使用公钥加密完成 (密文)】: {double_locked_data.hex()[:50]}...")
# 4. 系统二次解密 (判定传承触发后)
# 只有在满足触发条件(如订阅失败)后,系统才调取私钥进行这第一层解密
try:
system_unlocked_data = sys_provider.decrypt_with_system_private(private_pem, double_locked_data)
print(f"【使用私钥解密成功】: {system_unlocked_data}")
print("【后续步骤】: 现在数据已回归用户初级加密态,可交给用户或者继承人进行最后解密。")
except Exception as e:
print(f"解密失败: {e}")

View File

@@ -0,0 +1,113 @@
import hashlib
import secrets
from mnemonic import Mnemonic # 仅用于标准的助记词转换
class SentinelKeyEngine:
# 使用第 13 个梅森素数 (2^521 - 1),远大于 128-bit 熵,确保有限域安全
PRIME = 2**521 - 1
def __init__(self):
self.mnemo = Mnemonic("english")
def generate_vault_keys(self):
"""
1. 生成原始 12 助记词 (Master Key)
"""
words = self.mnemo.generate(strength=128)
entropy = self.mnemo.to_entropy(words)
return words, entropy
def split_to_shares(self, entropy):
"""
2. SSS (3,2) 门限分片逻辑
公式: f(x) = S + a*x (直线方程S为秘密a为随机斜率)
我们将秘密 S 分成 3 份,任选 2 份即可恢复。
注意:必须在有限域 GF(PRIME) 下进行运算以保证完善保密性。
"""
# 将熵转换为大整数
secret_int = int.from_bytes(entropy, 'big')
# 生成一个随机系数 a (安全性需与秘密强度一致)
# a 必须在 [0, PRIME-1] 范围内
a = secrets.randbelow(self.PRIME)
# 定义 3 个点: x=1, x=2, x=3
# Share = (x, f(x))
def f(x): return (secret_int + a * x) % self.PRIME
share1 = (1, f(1)) # 手机分片
share2 = (2, f(2)) # 云端分片
share3 = (3, f(3)) # 传承卡分片
return [share1, share2, share3]
def recover_from_shares(self, share_a, share_b):
"""
3. 恢复逻辑:拉格朗日插值还原
已知 (x1, y1) 和 (x2, y2),求 f(0) 即秘密 S
公式: S = (x2*y1 - x1*y2) / (x2 - x1)
在有限域下,除法变为乘以模逆: S = (x2*y1 - x1*y2) * (x2 - x1)^-1 mod P
"""
x1, y1 = share_a
x2, y2 = share_b
# 计算分子
numerator = (x2 * y1 - x1 * y2) % self.PRIME
# 计算分母的模逆 (x2 - x1)
denominator = (x2 - x1) % self.PRIME
inv_denominator = pow(denominator, -1, self.PRIME)
# 还原常数项 S
secret_int = (numerator * inv_denominator) % self.PRIME
# 转回字节并生成助记词
# 注意secret_int 可能略小于 16 字节高位为0需要补齐
# 但由于 entropy 原始就是 16 字节,这里直接转换即可
try:
recovered_entropy = secret_int.to_bytes(16, 'big')
except OverflowError:
# 理论上不应发生,除非计算出的 secret_int 大于 128 bit (即原始 entropy 大于 128 bit)
# 这里为了健壮性,如果原始 entropy 是 16 字节,这里应该也是。
# 如果 PRIME 很大secret_int 还是原来的值。
recovered_entropy = secret_int.to_bytes((secret_int.bit_length() + 7) // 8, 'big')
return self.mnemo.to_mnemonic(recovered_entropy)
if __name__ == "__main__":
# --- Sentinel 协议业务流程模拟 ---
engine = SentinelKeyEngine()
# [生前]:初始化金库
master_words, entropy = engine.generate_vault_keys()
print(f"【1. 生成原始助记词】: {master_words}")
shares = engine.split_to_shares(entropy)
print(f"【2. SSS 分片完成】:")
print(f" - 分片1 (手机安全区): {shares[0]}")
print(f" - 分片2 (Sentinel云): {shares[1]}")
print(f" - 分片3 (传承卡单词): {shares[2]}")
print("-" * 50)
# [死后/传承]:模拟用户失联,触发被动验证
# 假设继承人拿着卡片 (Share 3),向服务器请求分片 (Share 2)
successor_share = shares[2]
server_share = shares[1]
# 执行恢复
recovered_words = engine.recover_from_shares(shares[0], shares[1])
print(f"【1. 手机+云 : {recovered_words}")
recovered_words = engine.recover_from_shares(shares[0], shares[2])
print(f"【2. 手机+传承卡 : {recovered_words}")
recovered_words = engine.recover_from_shares(shares[1], shares[2])
print(f"【3. 云+传承卡 : {recovered_words}")
# 校验一致性
assert recovered_words == master_words
print("\n结果:恢复出的助记词与原始完全一致。")
with open("words.txt", "w") as f:
f.write("%s\n"%master_words)

77
test/core/sp_vault_aes.py Normal file
View File

@@ -0,0 +1,77 @@
import os
from mnemonic import Mnemonic
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Util.Padding import pad, unpad
class SentinelVault:
def __init__(self, salt=None):
self.mnemo = Mnemonic("english")
# 默认盐值仅用于演示,生产环境建议每个用户随机生成并存储
self.salt = salt if salt else b'Sentinel_Salt_2026'
def derive_key(self, mnemonic_phrase):
"""
使用 PBKDF2 将助记词转换为 AES-256 密钥 (32 bytes)
"""
# 种子生成遵循 BIP-39 逻辑
seed = self.mnemo.to_seed(mnemonic_phrase, passphrase="")
# 派生出一个 32 字节的强密钥
key = PBKDF2(seed, self.salt, dkLen=32, count=100000)
return key
def encrypt_data(self, key, plaintext):
"""
使用 AES-256 GCM 模式进行加密 (具备完整性校验)
"""
cipher = AES.new(key, AES.MODE_GCM)
nonce = cipher.nonce
ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode('utf-8'))
# 返回:随机数 + 校验位 + 密文
return nonce + tag + ciphertext
def decrypt_data(self, key, encrypted_blob):
"""
AES-256 GCM 解密
"""
nonce = encrypted_blob[:16]
tag = encrypted_blob[16:32]
ciphertext = encrypted_blob[32:]
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
try:
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
return plaintext.decode('utf-8')
except ValueError:
return "【解密失败】:密钥错误或数据被篡改"
if __name__ == "__main__":
# --- 模拟 Sentinel 协议完整业务流 ---
# 1. 假设这是通过之前 SSS 算法恢复出来的 12 词
recovered_mnemonic = "apple banana cherry dog elephant fish goat horse ice jacket kangaroo lion"
try:
with open("words.txt", "r") as f:
recovered_mnemonic = f.read().strip()
except FileNotFoundError:
print("words.txt 文件未找到,使用默认助记词进行演示。")
print(f"Demo助记词{recovered_mnemonic}")
vault = SentinelVault()
# 2. 生成加密密钥
aes_key = vault.derive_key(recovered_mnemonic)
aes_key_hex = aes_key.hex()
print(f"【密钥派生完成】len:{len(aes_key_hex)} -> {aes_key_hex[:20]}...")
# 3. 用户生前加密资产(如:银行账户、数字遗产)
my_legacy = "我的瑞士银行账号是CH123456789密码是Sentinel2026"
print(f"【Demo资产信息】{my_legacy}")
encrypted_asset = vault.encrypt_data(aes_key, my_legacy)
encrypted_asset_hex = encrypted_asset.hex()
print(f"【数据已加密】len:{len(encrypted_asset_hex)} -> {encrypted_asset_hex[:40]}...")
# 4. 模拟继承人通过分片拼凑后进行解密
print("-" * 50)
decrypted_content = vault.decrypt_data(aes_key, encrypted_asset)
print(f"【继承人解密成功】:{decrypted_content}")

201
test/test_scenario.py Normal file
View File

@@ -0,0 +1,201 @@
import requests
import json
from core.sp_trust_sharding import SentinelKeyEngine
from core.sp_vault_aes import SentinelVault
import ast
BASE_URL = "http://localhost:8000"
def register_user(username, password):
url = f"{BASE_URL}/register"
data = {
"username": username,
"password": password
}
response = requests.post(url, json=data)
if response.status_code == 200:
print(f"User {username} registered successfully.")
return response.json()
else:
print(f"Failed to register {username}: {response.text}")
return None
def login_user(username, password):
url = f"{BASE_URL}/token"
data = {
"username": username,
"password": password
}
response = requests.post(url, json=data)
if response.status_code == 200:
print(f"User {username} logged in successfully.")
return response.json()["access_token"]
else:
print(f"Failed to login {username}: {response.text}")
return None
def create_asset(token, title, private_key_shard, content_inner_encrypted):
url = f"{BASE_URL}/assets/"
headers = {"Authorization": f"Bearer {token}"}
data = {
"title": title,
"private_key_shard": str(private_key_shard),
"content_inner_encrypted": str(content_inner_encrypted)
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
print(f"Asset '{title}' created successfully.")
return response.json()
else:
print(f"Failed to create asset: {response.text}")
return None
def assign_heir(token, asset_id, heir_name):
url = f"{BASE_URL}/assets/assign"
headers = {"Authorization": f"Bearer {token}"}
data = {
"asset_id": asset_id,
"heir_name": heir_name
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
print(f"Asset {asset_id} assigned to heir {heir_name} successfully.")
return response.json()
else:
print(f"Failed to assign heir: {response.text}")
return None
def declare_user_guale(token, username):
url = f"{BASE_URL}/admin/declare-guale"
headers = {"Authorization": f"Bearer {token}"}
data = {"username": username}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
print(f"User {username} declared as 'guale' successfully.")
return response.json()
else:
print(f"Failed to declare guale: {response.text}")
return None
def claim_asset(token, asset_id, private_key_shard):
url = f"{BASE_URL}/assets/claim"
headers = {"Authorization": f"Bearer {token}"}
data = {
"asset_id": asset_id,
"private_key_shard": private_key_shard
}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
print(f"Asset {asset_id} claimed successfully.")
return response.json()
else:
print(f"Failed to claim asset: {response.text}")
return None
def main():
# 1. 创建三个用户
users = [
("user1", "pass123"),
("user2", "pass123"),
("user3", "pass123")
]
for username, password in users:
register_user(username, password)
# 1.1 用户一信息生成
key_engine = SentinelKeyEngine()
# 1.1 生成助记词 (BIP-39)
master_words, entropy = key_engine.generate_vault_keys()
print(f" [生成] 原始助记词: {master_words}")
# 1.2 SSS 分片 (3-of-2)
shares = key_engine.split_to_shares(entropy)
share_a = shares[0] # Device (手机)
share_b = shares[1] # Cloud (云端)
share_c = shares[2] # Physical (传承卡)
print("\n## 2. 用户内层加密流 (Vault Layer)")
user_data = "我的瑞士银行账号是CH123456789密码是Sentinel2027"
print(f" [输入] 用户隐私数据: {user_data}")
vault = SentinelVault()
# 2.1 派生 AES 密钥
aes_key = vault.derive_key(master_words)
# 2.2 加密数据
ciphertext_1 = vault.encrypt_data(aes_key, user_data)
# 2. 用户一登录
token1 = login_user("user1", "pass123")
if not token1:
return
# 3. 创建一个 asset
asset = create_asset(
token1,
"My Secret Asset",
share_a,
ciphertext_1
)
if not asset:
return
asset_id = asset["id"]
print(f" [输出] Asset ID: {asset_id}")
# 4. 指定用户 2 为继承人
print("用户 1 指定用户 2 为继承人")
assign_heir(token1, asset_id, "user2")
print("\n## 3. 继承流 (Inheritance Layer)")
# 5. Admin 宣布用户 1 挂了
print("Admin 宣布用户 1 挂了")
admin_token = login_user("admin", "admin123")
if not admin_token:
print("Failed to login as admin. Make sure the database is initialized with an admin user.")
return
declare_user_guale(admin_token, "user1")
# 6. 用户 2 登录
print("用户 2 登录")
token2 = login_user("user2", "pass123")
if not token2:
return
# 7. 用户 2 申领资产,并带上自己的分片 (share_c)
print("用户 2 申领资产,并带上自己的分片 (share_c)")
claim_res = claim_asset(token2, asset_id, json.dumps(share_c))
if not claim_res:
return
print(f" [输出] Claim Result (私钥分片与加密内容已获取):")
print(f" - Server Shard Key: {claim_res['server_shard_key']}")
print(f" - Decrypted Content (Outer Layer): {claim_res['decrypted_content'][:50]}...")
print("\n## 4. 客户端恢复流 (Client Recovery)")
# 8. 恢复助记词
# 继承人有自己的 share_c从服务器拿到了存储在 asset 里的 share_a (server_shard_key)
#server_shard = tuple(claim_res['server_shard_key'])
server_shard = ast.literal_eval(claim_res['server_shard_key'])
recovered_mnemonic = key_engine.recover_from_shares(server_shard, share_c)
print(f" [恢复] 助记词: {recovered_mnemonic}")
# 9. 派生密钥
recovered_aes_key = vault.derive_key(recovered_mnemonic)
# 10. 解密内容 (Inner Layer)
#inner_ciphertext = bytes.fromhex(claim_res["decrypted_content"])
inner_ciphertext = ast.literal_eval(claim_res["decrypted_content"])
decrypted_final = vault.decrypt_data(recovered_aes_key, inner_ciphertext)
print(f" [完成] 解密后的原始数据: {decrypted_final}")
if decrypted_final == user_data:
print("\n✅ 测试成功!数据完整恢复。")
else:
print("\n❌ 测试失败!解密数据不匹配。")
if __name__ == "__main__":
main()