120 lines
3.1 KiB
Python
120 lines
3.1 KiB
Python
"""
|
||
对称加密工具(用于存储 API Key/Secret 等敏感字段)
|
||
|
||
说明:
|
||
- 使用 AES-GCM(需要 cryptography 依赖)
|
||
- master key 来自环境变量:
|
||
- ATS_MASTER_KEY(推荐):32字节 key 的 base64(urlsafe) 或 hex
|
||
- AUTO_TRADE_SYS_MASTER_KEY(兼容)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import binascii
|
||
import os
|
||
from typing import Optional
|
||
|
||
|
||
def _load_master_key_bytes() -> Optional[bytes]:
    """Load the 32-byte AES master key from the environment.

    The variables ``ATS_MASTER_KEY``, ``AUTO_TRADE_SYS_MASTER_KEY`` and
    ``MASTER_KEY`` are checked in that order and the first one holding a
    non-empty value is used. After stripping surrounding whitespace, the
    value is tried as hex first and then as URL-safe base64 (missing ``=``
    padding is tolerated).

    Returns:
        The decoded key when it is exactly 32 bytes long; ``None`` when no
        variable is set, or when the value does not decode to 32 bytes
        under either encoding.
    """
    raw = (
        os.getenv("ATS_MASTER_KEY")
        or os.getenv("AUTO_TRADE_SYS_MASTER_KEY")
        or os.getenv("MASTER_KEY")
        or ""
    ).strip()
    if not raw:
        return None

    # 1) hex. Only a malformed value is expected here, so catch ValueError
    #    specifically rather than hiding unrelated errors.
    try:
        key = bytes.fromhex(raw)
    except ValueError:
        key = b""
    if len(key) == 32:
        return key

    # 2) URL-safe base64; restore any "=" padding that was stripped.
    padded = raw + "=" * (-len(raw) % 4)
    try:
        key = base64.urlsafe_b64decode(padded.encode("utf-8"))
    except ValueError:
        # binascii.Error and UnicodeEncodeError are both ValueError subclasses.
        return None
    return key if len(key) == 32 else None
|
||
|
||
|
||
def _aesgcm():
    """Return the ``AESGCM`` class from ``cryptography``, imported lazily.

    The import happens on each call rather than at module import time, so
    the module itself can be imported without ``cryptography`` installed.

    Raises:
        RuntimeError: If the import fails for any reason; the underlying
            exception is chained as the cause.
    """
    failure_message = (
        "缺少加密依赖 cryptography,无法安全存储敏感字段。请安装 cryptography 并设置 ATS_MASTER_KEY。"
    )
    try:
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM as cipher_cls  # type: ignore
    except Exception as exc:  # pragma: no cover
        raise RuntimeError(failure_message) from exc
    return cipher_cls
|
||
|
||
|
||
def encrypt_str(plaintext: str) -> str:
    """Encrypt a string with AES-GCM under the configured master key.

    The result has the versioned form
    ``enc:v1:<b64(nonce)>:<b64(ciphertext)>``. Both parts are URL-safe
    base64 with the ``=`` padding stripped, and the nonce is 12 random bytes
    from ``os.urandom``.

    Args:
        plaintext: Value to protect. ``None`` is treated as ``""``; any other
            non-string value is converted with ``str()``.

    Returns:
        ``""`` for empty input; the ``enc:v1:`` token when a master key is
        available; otherwise the input text unchanged (unencrypted fallback).

    Raises:
        RuntimeError: If a master key is available but the ``cryptography``
            package cannot be imported.
    """
    text = "" if plaintext is None else str(plaintext)
    if not text:
        return ""

    key = _load_master_key_bytes()
    if not key:
        # Deliberate degradation: store the value unencrypted rather than
        # making the service unusable when the key is absent. This used to be
        # completely silent, which also hid a key that was set but malformed,
        # so warn the operator. The secret itself is never logged.
        import logging

        logging.getLogger(__name__).warning(
            "ATS_MASTER_KEY is missing or invalid; storing sensitive value WITHOUT encryption"
        )
        return text

    AESGCM = _aesgcm()
    nonce = os.urandom(12)
    sealed = AESGCM(key).encrypt(nonce, text.encode("utf-8"), None)
    nonce_b64 = base64.urlsafe_b64encode(nonce).decode("utf-8").rstrip("=")
    sealed_b64 = base64.urlsafe_b64encode(sealed).decode("utf-8").rstrip("=")
    return f"enc:v1:{nonce_b64}:{sealed_b64}"
|
||
|
||
|
||
def decrypt_str(ciphertext: str) -> str:
    """Decrypt a token produced by ``encrypt_str``.

    Values that do not start with ``enc:v1:`` are treated as legacy
    plaintext and returned unchanged (after ``str()`` conversion).

    Args:
        ciphertext: Stored value; ``None`` yields ``""``.

    Returns:
        The decrypted UTF-8 text, or the input itself when it carries no
        ``enc:v1:`` prefix.

    Raises:
        RuntimeError: If the value is encrypted but no master key is
            configured.
        ValueError: If the token does not consist of exactly four
            ``:``-separated fields.
    """
    if ciphertext is None:
        return ""

    token = str(ciphertext)
    # Covers both the empty string and legacy plaintext values.
    if not token.startswith("enc:v1:"):
        return token

    master_key = _load_master_key_bytes()
    if not master_key:
        raise RuntimeError("密文存在但未配置 ATS_MASTER_KEY,无法解密敏感字段。")

    fields = token.split(":")
    if len(fields) != 4:
        raise ValueError("密文格式不正确")

    def _from_b64(part: str) -> bytes:
        # Restore the "=" padding that encrypt_str strips before decoding.
        restored = part + "=" * (-len(part) % 4)
        return base64.urlsafe_b64decode(restored.encode("utf-8"))

    nonce = _from_b64(fields[2])
    sealed = _from_b64(fields[3])

    cipher_cls = _aesgcm()
    recovered = cipher_cls(master_key).decrypt(nonce, sealed, None)
    return recovered.decode("utf-8")
|
||
|