auto_trade_sys/trading_system/redis_cache.py
薇薇安 0b11b72eb8 a
2026-01-18 09:36:15 +08:00

437 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Redis 缓存管理器 - 支持 TLS 连接
"""
import json
import logging
from typing import Optional, Any, Dict, List
import ssl
try:
# 使用 redis-py 4.2+ 的异步客户端(替代 aioredis
import redis.asyncio as aioredis
from redis.asyncio import Redis
REDIS_AVAILABLE = True
except ImportError as e:
# 如果 redis.asyncio 不可用,尝试回退到 aioredis向后兼容
try:
import aioredis
from aioredis import Redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
Redis = None
import sys
import os
# 保存导入错误信息,用于诊断
_import_error = str(e)
_python_path = sys.executable
_python_version = sys.version
logger = logging.getLogger(__name__)
class RedisCache:
"""Redis 缓存管理器 - 支持 TLS 连接和降级到内存缓存"""
def __init__(self, redis_url: str = None, use_tls: bool = False,
ssl_cert_reqs: str = 'required', ssl_ca_certs: str = None,
username: str = None, password: str = None):
"""
初始化 Redis 缓存管理器
Args:
redis_url: Redis 连接 URL例如: redis://localhost:6379 或 rediss://localhost:6380
如果 URL 中包含用户名和密码,会优先使用 URL 中的认证信息
use_tls: 是否使用 TLS如果 redis_url 以 rediss:// 开头,会自动启用)
ssl_cert_reqs: SSL 证书验证要求 ('none', 'optional', 'required')
ssl_ca_certs: SSL CA 证书路径
username: Redis 用户名(如果 URL 中未包含)
password: Redis 密码(如果 URL 中未包含)
"""
self.redis_url = redis_url or "redis://localhost:6379"
self.use_tls = use_tls or self.redis_url.startswith('rediss://')
self.ssl_cert_reqs = ssl_cert_reqs
self.ssl_ca_certs = ssl_ca_certs
self.username = username
self.password = password
self.redis: Optional[Redis] = None
self._memory_cache: Dict[str, Any] = {} # 降级到内存缓存
self._connected = False
async def connect(self):
"""连接 Redis"""
if not REDIS_AVAILABLE:
import sys
import os
logger.warning("redis 未安装,将使用内存缓存")
logger.debug(f" Python 路径: {sys.executable}")
logger.debug(f" Python 版本: {sys.version.split()[0]}")
logger.debug(f" 导入错误: {_import_error if '_import_error' in globals() else '未知'}")
logger.debug(f" 提示: 请运行 'pip install redis>=4.2.0' 安装 redis-py")
self.redis = None
self._connected = False
return
try:
# 构建连接参数
connection_kwargs = {}
# 如果使用 TLS
if self.use_tls or self.redis_url.startswith('rediss://'):
# 配置 SSL 上下文
ssl_context = ssl.create_default_context()
# 设置证书验证要求
if self.ssl_cert_reqs == 'none':
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
elif self.ssl_cert_reqs == 'optional':
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_OPTIONAL
else: # required
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
# 如果提供了 CA 证书路径
if self.ssl_ca_certs:
ssl_context.load_verify_locations(self.ssl_ca_certs)
connection_kwargs['ssl'] = ssl_context
logger.info(f"使用 TLS 连接 Redis: {self.redis_url}")
# 如果 URL 中不包含用户名和密码,且提供了独立的用户名和密码参数,则添加到连接参数中
# 注意:如果 URL 中已经包含认证信息(如 redis://user:pass@host:port则优先使用 URL 中的
if self.username and self.password:
# 检查 URL 中是否已包含认证信息格式redis://user:pass@host:port
url_parts = self.redis_url.split('://')
if len(url_parts) == 2:
url_after_scheme = url_parts[1]
# 如果 URL 中不包含 @ 符号,说明没有在 URL 中指定认证信息
if '@' not in url_after_scheme:
# URL 中不包含认证信息,使用独立的用户名和密码参数
connection_kwargs['username'] = self.username
connection_kwargs['password'] = self.password
logger.info(f"使用独立的用户名和密码进行认证: {self.username}")
else:
logger.info("URL 中已包含认证信息,优先使用 URL 中的认证信息")
else:
# URL 格式异常,尝试使用独立的用户名和密码
connection_kwargs['username'] = self.username
connection_kwargs['password'] = self.password
logger.info(f"URL 格式异常,使用独立的用户名和密码进行认证: {self.username}")
# 创建 Redis 连接
# 使用 redis.asyncio.from_urlredis-py 4.2+)或 aioredis.from_url向后兼容
# redis-py 需要 decode_responses=Trueaioredis 2.0 可能不需要
# 检查是否是 redis.asyncio通过检查模块名称
try:
module_name = aioredis.__name__ if hasattr(aioredis, '__name__') else ''
is_redis_py = 'redis.asyncio' in module_name or 'redis' in module_name
except:
is_redis_py = False
if is_redis_py:
# redis-py 4.2+ 的异步客户端,需要 decode_responses=True
connection_kwargs['decode_responses'] = True
# aioredis 2.0 不需要 decode_responses它默认返回字节
self.redis = await aioredis.from_url(
self.redis_url,
**connection_kwargs
)
# 测试连接
await self.redis.ping()
self._connected = True
logger.info(f"✓ Redis 连接成功: {self.redis_url}")
except Exception as e:
logger.warning(f"Redis 连接失败: {e},将使用内存缓存")
self.redis = None
self._connected = False
if self.redis:
try:
await self.redis.close()
except:
pass
self.redis = None
async def get(self, key: str) -> Optional[Any]:
"""
获取缓存
Args:
key: 缓存键
Returns:
缓存值,如果不存在则返回 None
"""
# 先尝试从 Redis 获取
if self.redis and self._connected:
try:
data = await self.redis.get(key)
if data:
return json.loads(data)
except Exception as e:
logger.debug(f"Redis 获取失败 {key}: {e}")
# Redis 失败时,尝试重新连接
if not self._connected:
await self.connect()
# 降级到内存缓存
if key in self._memory_cache:
return self._memory_cache[key]
return None
async def set(self, key: str, value: Any, ttl: int = 3600):
"""
设置缓存
Args:
key: 缓存键
value: 缓存值
ttl: 过期时间(秒)
Returns:
是否成功
"""
# 先尝试写入 Redis
if self.redis and self._connected:
try:
await self.redis.setex(key, ttl, json.dumps(value))
return True
except Exception as e:
logger.debug(f"Redis 设置失败 {key}: {e}")
# Redis 失败时,尝试重新连接
if not self._connected:
await self.connect()
if self.redis and self._connected:
try:
await self.redis.setex(key, ttl, json.dumps(value))
return True
except:
pass
# 降级到内存缓存(不设置 TTL因为内存缓存不支持
self._memory_cache[key] = value
return False
async def delete(self, key: str):
"""删除缓存"""
if self.redis and self._connected:
try:
await self.redis.delete(key)
except Exception as e:
logger.debug(f"Redis 删除失败 {key}: {e}")
# 同时删除内存缓存
if key in self._memory_cache:
del self._memory_cache[key]
async def exists(self, key: str) -> bool:
"""检查缓存是否存在"""
if self.redis and self._connected:
try:
return await self.redis.exists(key) > 0
except Exception as e:
logger.debug(f"Redis 检查失败 {key}: {e}")
return key in self._memory_cache
async def close(self):
"""关闭连接"""
if self.redis:
try:
await self.redis.close()
self._connected = False
logger.info("Redis 连接已关闭")
except Exception as e:
logger.debug(f"关闭 Redis 连接时出错: {e}")
def is_connected(self) -> bool:
"""检查是否已连接"""
return self._connected and self.redis is not None
async def hset(self, name: str, key: str, value: Any, ttl: int = None):
"""
设置Hash字段
Args:
name: Hash名称
key: 字段名
value: 字段值
ttl: 过期时间如果设置会对整个Hash设置TTL
"""
if self.redis and self._connected:
try:
await self.redis.hset(name, key, json.dumps(value))
# 如果设置了TTL对整个Hash设置过期时间
if ttl:
await self.redis.expire(name, ttl)
return True
except Exception as e:
logger.debug(f"Redis Hash设置失败 {name}.{key}: {e}")
# Redis失败时尝试重新连接
if not self._connected:
await self.connect()
if self.redis and self._connected:
try:
await self.redis.hset(name, key, json.dumps(value))
if ttl:
await self.redis.expire(name, ttl)
return True
except:
pass
# 降级到内存缓存
if name not in self._memory_cache:
self._memory_cache[name] = {}
self._memory_cache[name][key] = value
return False
async def hget(self, name: str, key: str) -> Optional[Any]:
"""
获取Hash字段
Args:
name: Hash名称
key: 字段名
Returns:
字段值如果不存在则返回None
"""
if self.redis and self._connected:
try:
data = await self.redis.hget(name, key)
if data:
return json.loads(data)
except Exception as e:
logger.debug(f"Redis Hash获取失败 {name}.{key}: {e}")
# 降级到内存缓存
if name in self._memory_cache and key in self._memory_cache[name]:
return self._memory_cache[name][key]
return None
async def hgetall(self, name: str) -> Dict[str, Any]:
"""
获取Hash所有字段
Args:
name: Hash名称
Returns:
所有字段的字典
"""
if self.redis and self._connected:
try:
data = await self.redis.hgetall(name)
result = {}
for k, v in data.items():
if isinstance(k, bytes):
k = k.decode('utf-8')
if isinstance(v, bytes):
try:
v = json.loads(v.decode('utf-8'))
except:
v = v.decode('utf-8')
else:
try:
v = json.loads(v)
except:
pass
result[k] = v
return result
except Exception as e:
logger.debug(f"Redis Hash获取全部失败 {name}: {e}")
# Redis失败时尝试重新连接
if not self._connected:
await self.connect()
# 降级到内存缓存
if name in self._memory_cache:
return self._memory_cache[name].copy()
return {}
async def hdel(self, name: str, *keys: str):
"""
删除Hash字段
Args:
name: Hash名称
*keys: 要删除的字段名
"""
if self.redis and self._connected:
try:
await self.redis.hdel(name, *keys)
except Exception as e:
logger.debug(f"Redis Hash删除失败 {name}: {e}")
# 同时删除内存缓存
if name in self._memory_cache:
for key in keys:
if key in self._memory_cache[name]:
del self._memory_cache[name][key]
async def zadd(self, name: str, mapping: Dict[str, float], ttl: int = None):
"""
添加Sorted Set成员
Args:
name: Sorted Set名称
mapping: {member: score} 字典
ttl: 过期时间(秒)
"""
if self.redis and self._connected:
try:
await self.redis.zadd(name, mapping)
if ttl:
await self.redis.expire(name, ttl)
return True
except Exception as e:
logger.debug(f"Redis Sorted Set添加失败 {name}: {e}")
return False
async def zrange(self, name: str, start: int = 0, end: int = -1, desc: bool = False, withscores: bool = False) -> List:
"""
获取Sorted Set成员按分数排序
Args:
name: Sorted Set名称
start: 起始索引
end: 结束索引
desc: 是否降序默认False升序
withscores: 是否包含分数
Returns:
成员列表
"""
if self.redis and self._connected:
try:
if desc:
return await self.redis.zrevrange(name, start, end, withscores=withscores)
else:
return await self.redis.zrange(name, start, end, withscores=withscores)
except Exception as e:
logger.debug(f"Redis Sorted Set获取失败 {name}: {e}")
return []
async def zrem(self, name: str, *members: str):
"""
删除Sorted Set成员
Args:
name: Sorted Set名称
*members: 要删除的成员
"""
if self.redis and self._connected:
try:
await self.redis.zrem(name, *members)
except Exception as e:
logger.debug(f"Redis Sorted Set删除失败 {name}: {e}")