""" 对称加密工具(用于存储 API Key/Secret 等敏感字段) 说明: - 使用 AES-GCM(需要 cryptography 依赖) - master key 来自环境变量: - ATS_MASTER_KEY(推荐):32字节 key 的 base64(urlsafe) 或 hex - AUTO_TRADE_SYS_MASTER_KEY(兼容) """ from __future__ import annotations import base64 import binascii import os from typing import Optional def _load_master_key_bytes() -> Optional[bytes]: raw = ( os.getenv("ATS_MASTER_KEY") or os.getenv("AUTO_TRADE_SYS_MASTER_KEY") or os.getenv("MASTER_KEY") or "" ).strip() if not raw: return None # 1) hex try: b = bytes.fromhex(raw) if len(b) == 32: return b except Exception: pass # 2) urlsafe base64 try: padded = raw + ("=" * (-len(raw) % 4)) b = base64.urlsafe_b64decode(padded.encode("utf-8")) if len(b) == 32: return b except binascii.Error: pass except Exception: pass return None def _aesgcm(): try: from cryptography.hazmat.primitives.ciphers.aead import AESGCM # type: ignore return AESGCM except Exception as e: # pragma: no cover raise RuntimeError( "缺少加密依赖 cryptography,无法安全存储敏感字段。请安装 cryptography 并设置 ATS_MASTER_KEY。" ) from e def encrypt_str(plaintext: str) -> str: """ 加密字符串,返回带版本前缀的密文: enc:v1:: """ if plaintext is None: plaintext = "" s = str(plaintext) if s == "": return "" key = _load_master_key_bytes() if not key: # 允许降级:不加密直接存(避免线上因缺KEY彻底不可用),但强烈建议尽快配置 master key return s import os as _os AESGCM = _aesgcm() nonce = _os.urandom(12) aes = AESGCM(key) ct = aes.encrypt(nonce, s.encode("utf-8"), None) return "enc:v1:{}:{}".format( base64.urlsafe_b64encode(nonce).decode("utf-8").rstrip("="), base64.urlsafe_b64encode(ct).decode("utf-8").rstrip("="), ) def decrypt_str(ciphertext: str) -> str: """ 解密 encrypt_str 的输出;若不是 enc:v1 前缀,则视为明文原样返回(兼容旧数据)。 """ if ciphertext is None: return "" s = str(ciphertext) if s == "": return "" if not s.startswith("enc:v1:"): return s key = _load_master_key_bytes() if not key: raise RuntimeError("密文存在但未配置 ATS_MASTER_KEY,无法解密敏感字段。") parts = s.split(":") if len(parts) != 4: raise ValueError("密文格式不正确") b64_nonce = parts[2] + ("=" * (-len(parts[2]) % 4)) b64_ct = parts[3] + ("=" * (-len(parts[3]) % 4)) nonce = base64.urlsafe_b64decode(b64_nonce.encode("utf-8")) ct = base64.urlsafe_b64decode(b64_ct.encode("utf-8")) AESGCM = _aesgcm() aes = AESGCM(key) pt = aes.decrypt(nonce, ct, None) return pt.decode("utf-8")