basic work
This commit is contained in:
85
core/sp_gateway_rsa.py
Normal file
85
core/sp_gateway_rsa.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
|
||||
class SentinelSystemProvider:
|
||||
"""系统级非对称加密提供者 (独立于用户)"""
|
||||
|
||||
@staticmethod
|
||||
def generate_system_keys():
|
||||
"""生成全新的系统公私钥对"""
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048
|
||||
)
|
||||
public_key = private_key.public_key()
|
||||
|
||||
# 序列化私钥 (用于保存到安全服务器)
|
||||
private_pem = private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
|
||||
# 序列化公钥 (用于下发或在线加密)
|
||||
public_pem = public_key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
|
||||
return private_pem, public_pem
|
||||
|
||||
@staticmethod
|
||||
def encrypt_with_system_public(public_pem, data_bytes):
|
||||
"""使用系统公钥进行二次加密"""
|
||||
public_key = serialization.load_pem_public_key(public_pem)
|
||||
ciphertext = public_key.encrypt(
|
||||
data_bytes,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
return ciphertext
|
||||
|
||||
@staticmethod
|
||||
def decrypt_with_system_private(private_pem, ciphertext):
|
||||
"""使用系统私钥进行二次解密"""
|
||||
private_key = serialization.load_pem_private_key(private_pem, password=None)
|
||||
plaintext = private_key.decrypt(
|
||||
ciphertext,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
return plaintext
|
||||
if __name__ == "__main__":
|
||||
# --- 演示流程 ---
|
||||
|
||||
# 1. 初始化系统密钥 (这一步通常只在系统上线时执行一次)
|
||||
sys_provider = SentinelSystemProvider()
|
||||
private_pem, public_pem = sys_provider.generate_system_keys()
|
||||
|
||||
print("【系统层】: 独立公私钥已生成。")
|
||||
print(f" - 公钥 (PEM): {public_pem.decode('utf-8')[:50]}...")
|
||||
print(f" - 私钥 (PEM): {private_pem.decode('utf-8')[:50]}...")
|
||||
|
||||
# 2. 模拟用户已经加密过的数据 (这已经是用户那一层加密后的二进制数据)
|
||||
user_encrypted_data = b"User_Encrypted_Blob_v1.0_Data"
|
||||
print(f"【输入数据】: {user_encrypted_data}")
|
||||
|
||||
# 3. 系统二次加密 (外层锁)
|
||||
# 这一步发生在数据上传服务器时,或者存入信托池时
|
||||
double_locked_data = sys_provider.encrypt_with_system_public(public_pem, user_encrypted_data)
|
||||
print(f"【使用公钥加密完成 (密文)】: {double_locked_data.hex()[:50]}...")
|
||||
|
||||
# 4. 系统二次解密 (判定传承触发后)
|
||||
# 只有在满足触发条件(如订阅失败)后,系统才调取私钥进行这第一层解密
|
||||
try:
|
||||
system_unlocked_data = sys_provider.decrypt_with_system_private(private_pem, double_locked_data)
|
||||
print(f"【使用私钥解密成功】: {system_unlocked_data}")
|
||||
print("【后续步骤】: 现在数据已回归用户初级加密态,可交给用户或者继承人进行最后解密。")
|
||||
except Exception as e:
|
||||
print(f"解密失败: {e}")
|
||||
93
core/sp_trust_sharding.py
Normal file
93
core/sp_trust_sharding.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import hashlib
|
||||
import secrets
|
||||
from mnemonic import Mnemonic # 仅用于标准的助记词转换
|
||||
|
||||
class SentinelKeyEngine:
|
||||
def __init__(self):
|
||||
self.mnemo = Mnemonic("english")
|
||||
|
||||
def generate_vault_keys(self):
|
||||
"""
|
||||
1. 生成原始 12 助记词 (Master Key)
|
||||
"""
|
||||
words = self.mnemo.generate(strength=128)
|
||||
entropy = self.mnemo.to_entropy(words)
|
||||
return words, entropy
|
||||
|
||||
def split_to_shares(self, entropy):
|
||||
"""
|
||||
2. SSS (3,2) 门限分片逻辑
|
||||
公式: f(x) = S + a*x (直线方程,S为秘密,a为随机斜率)
|
||||
我们将秘密 S 分成 3 份,任选 2 份即可恢复
|
||||
"""
|
||||
# 将熵转换为大整数
|
||||
secret_int = int.from_bytes(entropy, 'big')
|
||||
|
||||
# 生成一个随机系数 a (安全性需与秘密强度一致)
|
||||
# 这里使用 secrets 保证加密级随机
|
||||
a = int.from_bytes(secrets.token_bytes(16), 'big')
|
||||
|
||||
# 定义 3 个点: x=1, x=2, x=3
|
||||
# Share = (x, f(x))
|
||||
def f(x): return secret_int + a * x
|
||||
|
||||
share1 = (1, f(1)) # 手机分片
|
||||
share2 = (2, f(2)) # 云端分片
|
||||
share3 = (3, f(3)) # 传承卡分片
|
||||
|
||||
return [share1, share2, share3]
|
||||
|
||||
def recover_from_shares(self, share_a, share_b):
|
||||
"""
|
||||
3. 恢复逻辑:拉格朗日插值还原
|
||||
已知 (x1, y1) 和 (x2, y2),求 f(0) 即秘密 S
|
||||
公式: S = (x2*y1 - x1*y2) / (x2 - x1)
|
||||
"""
|
||||
x1, y1 = share_a
|
||||
x2, y2 = share_b
|
||||
|
||||
# 还原常数项 S
|
||||
secret_int = (x2 * y1 - x1 * y2) // (x2 - x1)
|
||||
|
||||
# 转回字节并生成助记词
|
||||
recovered_entropy = secret_int.to_bytes(16, 'big')
|
||||
return self.mnemo.to_mnemonic(recovered_entropy)
|
||||
if __name__ == "__main__":
|
||||
# --- Sentinel 协议业务流程模拟 ---
|
||||
|
||||
engine = SentinelKeyEngine()
|
||||
|
||||
# [生前]:初始化金库
|
||||
master_words, entropy = engine.generate_vault_keys()
|
||||
print(f"【1. 生成原始助记词】: {master_words}")
|
||||
|
||||
shares = engine.split_to_shares(entropy)
|
||||
print(f"【2. SSS 分片完成】:")
|
||||
print(f" - 分片1 (手机安全区): {shares[0]}")
|
||||
print(f" - 分片2 (Sentinel云): {shares[1]}")
|
||||
print(f" - 分片3 (传承卡单词): {shares[2]}")
|
||||
|
||||
print("-" * 50)
|
||||
|
||||
# [死后/传承]:模拟用户失联,触发被动验证
|
||||
# 假设继承人拿着卡片 (Share 3),向服务器请求分片 (Share 2)
|
||||
successor_share = shares[2]
|
||||
server_share = shares[1]
|
||||
|
||||
# 执行恢复
|
||||
recovered_words = engine.recover_from_shares(shares[0], shares[1])
|
||||
print(f"【1. 手机+云 : {recovered_words}")
|
||||
|
||||
recovered_words = engine.recover_from_shares(shares[0], shares[2])
|
||||
print(f"【2. 手机+传承卡 : {recovered_words}")
|
||||
|
||||
recovered_words = engine.recover_from_shares(shares[1], shares[2])
|
||||
print(f"【3. 云+传承卡 : {recovered_words}")
|
||||
|
||||
# 校验一致性
|
||||
assert recovered_words == master_words
|
||||
print("\n结果:恢复出的助记词与原始完全一致。")
|
||||
|
||||
|
||||
with open("words.txt", "w") as f:
|
||||
f.write("%s\n"%master_words)
|
||||
76
core/sp_vault_aes.py
Normal file
76
core/sp_vault_aes.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import os
|
||||
from mnemonic import Mnemonic
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Protocol.KDF import PBKDF2
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
|
||||
class SentinelVault:
|
||||
def __init__(self, salt=b'Sentinel_Salt_2026'): # 固定的盐值,用于增加派生强度
|
||||
self.mnemo = Mnemonic("english")
|
||||
self.salt = salt
|
||||
|
||||
def derive_key(self, mnemonic_phrase):
|
||||
"""
|
||||
使用 PBKDF2 将助记词转换为 AES-256 密钥 (32 bytes)
|
||||
"""
|
||||
# 种子生成遵循 BIP-39 逻辑
|
||||
seed = self.mnemo.to_seed(mnemonic_phrase, passphrase="")
|
||||
# 派生出一个 32 字节的强密钥
|
||||
key = PBKDF2(seed, self.salt, dkLen=32, count=100000)
|
||||
return key
|
||||
|
||||
def encrypt_data(self, key, plaintext):
|
||||
"""
|
||||
使用 AES-256 GCM 模式进行加密 (具备完整性校验)
|
||||
"""
|
||||
cipher = AES.new(key, AES.MODE_GCM)
|
||||
nonce = cipher.nonce
|
||||
ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode('utf-8'))
|
||||
# 返回:随机数 + 校验位 + 密文
|
||||
return nonce + tag + ciphertext
|
||||
|
||||
def decrypt_data(self, key, encrypted_blob):
|
||||
"""
|
||||
AES-256 GCM 解密
|
||||
"""
|
||||
nonce = encrypted_blob[:16]
|
||||
tag = encrypted_blob[16:32]
|
||||
ciphertext = encrypted_blob[32:]
|
||||
|
||||
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
|
||||
try:
|
||||
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
|
||||
return plaintext.decode('utf-8')
|
||||
except ValueError:
|
||||
return "【解密失败】:密钥错误或数据被篡改"
|
||||
|
||||
if __name__ == "__main__":
|
||||
# --- 模拟 Sentinel 协议完整业务流 ---
|
||||
|
||||
# 1. 假设这是通过之前 SSS 算法恢复出来的 12 词
|
||||
recovered_mnemonic = "apple banana cherry dog elephant fish goat horse ice jacket kangaroo lion"
|
||||
try:
|
||||
with open("words.txt", "r") as f:
|
||||
recovered_mnemonic = f.read().strip()
|
||||
except FileNotFoundError:
|
||||
print("words.txt 文件未找到,使用默认助记词进行演示。")
|
||||
|
||||
print(f"Demo助记词:{recovered_mnemonic}")
|
||||
vault = SentinelVault()
|
||||
|
||||
# 2. 生成加密密钥
|
||||
aes_key = vault.derive_key(recovered_mnemonic)
|
||||
aes_key_hex = aes_key.hex()
|
||||
print(f"【密钥派生完成】:len:{len(aes_key_hex)} -> {aes_key_hex[:20]}...")
|
||||
|
||||
# 3. 用户生前加密资产(如:银行账户、数字遗产)
|
||||
my_legacy = "我的瑞士银行账号是:CH123456789,密码是:Sentinel2026"
|
||||
print(f"【Demo资产信息】:{my_legacy}")
|
||||
encrypted_asset = vault.encrypt_data(aes_key, my_legacy)
|
||||
encrypted_asset_hex = encrypted_asset.hex()
|
||||
print(f"【数据已加密】:len:{len(encrypted_asset_hex)} -> {encrypted_asset_hex[:40]}...")
|
||||
|
||||
# 4. 模拟继承人通过分片拼凑后进行解密
|
||||
print("-" * 50)
|
||||
decrypted_content = vault.decrypt_data(aes_key, encrypted_asset)
|
||||
print(f"【继承人解密成功】:{decrypted_content}")
|
||||
Reference in New Issue
Block a user