""" Redis 缓存管理器 - 支持 TLS 连接 """ import json import logging from typing import Optional, Any, Dict, List try: # 使用 redis-py 4.2+ 的异步客户端(替代 aioredis) import redis.asyncio as aioredis from redis.asyncio import Redis REDIS_AVAILABLE = True except ImportError as e: # 如果 redis.asyncio 不可用,尝试回退到 aioredis(向后兼容) try: import aioredis from aioredis import Redis REDIS_AVAILABLE = True except ImportError: REDIS_AVAILABLE = False Redis = None import sys import os # 保存导入错误信息,用于诊断 _import_error = str(e) _python_path = sys.executable _python_version = sys.version logger = logging.getLogger(__name__) class RedisCache: """Redis 缓存管理器 - 支持 TLS 连接和降级到内存缓存""" def __init__(self, redis_url: str = None, use_tls: bool = False, ssl_cert_reqs: str = 'required', ssl_ca_certs: str = None, username: str = None, password: str = None): """ 初始化 Redis 缓存管理器 Args: redis_url: Redis 连接 URL(例如: redis://localhost:6379 或 rediss://localhost:6380) 如果 URL 中包含用户名和密码,会优先使用 URL 中的认证信息 use_tls: 是否使用 TLS(如果 redis_url 以 rediss:// 开头,会自动启用) ssl_cert_reqs: SSL 证书验证要求 ('none', 'optional', 'required') ssl_ca_certs: SSL CA 证书路径 username: Redis 用户名(如果 URL 中未包含) password: Redis 密码(如果 URL 中未包含) """ # 验证和规范化Redis URL if not redis_url or not isinstance(redis_url, str): logger.warning(f"Redis URL无效: {redis_url},使用默认值") redis_url = "redis://localhost:6379" # 如果使用TLS但URL不是rediss://,自动转换 if use_tls and not redis_url.startswith('rediss://'): if redis_url.startswith('redis://'): redis_url = redis_url.replace('redis://', 'rediss://', 1) logger.info(f"已自动转换Redis URL为TLS格式: {redis_url}") else: # 如果URL格式不正确,构建新的URL logger.warning(f"Redis URL格式不正确且需要TLS: {redis_url},尝试构建rediss://URL") if '://' in redis_url: parts = redis_url.split('://', 1) host_port = parts[1].split('/')[0] if '/' in parts[1] else parts[1] redis_url = f"rediss://{host_port}" else: # 如果完全没有scheme,添加rediss:// redis_url = f"rediss://{redis_url}" logger.info(f"已构建Redis TLS URL: {redis_url}") self.redis_url = redis_url self.use_tls = use_tls or self.redis_url.startswith('rediss://') self.ssl_cert_reqs = ssl_cert_reqs self.ssl_ca_certs = ssl_ca_certs self.username = username self.password = password self.redis: Optional[Redis] = None self._memory_cache: Dict[str, Any] = {} # 降级到内存缓存 self._connected = False async def connect(self): """连接 Redis""" if not REDIS_AVAILABLE: import sys import os logger.warning("redis 未安装,将使用内存缓存") logger.debug(f" Python 路径: {sys.executable}") logger.debug(f" Python 版本: {sys.version.split()[0]}") logger.debug(f" 导入错误: {_import_error if '_import_error' in globals() else '未知'}") logger.debug(f" 提示: 请运行 'pip install redis>=4.2.0' 安装 redis-py") self.redis = None self._connected = False return try: # 构建连接参数 connection_kwargs = {} # 如果使用 TLS(redis-py的异步客户端使用特定的SSL参数,而不是ssl上下文) if self.use_tls or self.redis_url.startswith('rediss://'): # redis-py的异步客户端不支持直接传递ssl上下文 # 而是使用ssl_cert_reqs、ssl_ca_certs等参数 # 当URL是rediss://时,redis-py会自动启用SSL # 设置证书验证要求(字符串格式:'required', 'optional', 'none') connection_kwargs['ssl_cert_reqs'] = self.ssl_cert_reqs # 如果提供了 CA 证书路径 if self.ssl_ca_certs: connection_kwargs['ssl_ca_certs'] = self.ssl_ca_certs # 设置主机名验证(根据ssl_cert_reqs自动设置,但可以显式指定) if self.ssl_cert_reqs == 'none': connection_kwargs['ssl_check_hostname'] = False elif self.ssl_cert_reqs == 'required': connection_kwargs['ssl_check_hostname'] = True else: # optional connection_kwargs['ssl_check_hostname'] = False logger.info(f"使用 TLS 连接 Redis: {self.redis_url} (ssl_cert_reqs={self.ssl_cert_reqs})") # 如果 URL 中不包含用户名和密码,且提供了独立的用户名和密码参数,则添加到连接参数中 # 注意:如果 URL 中已经包含认证信息(如 redis://user:pass@host:port),则优先使用 URL 中的 if self.username and self.password: # 检查 URL 中是否已包含认证信息(格式:redis://user:pass@host:port) url_parts = self.redis_url.split('://') if len(url_parts) == 2: url_after_scheme = url_parts[1] # 如果 URL 中不包含 @ 符号,说明没有在 URL 中指定认证信息 if '@' not in url_after_scheme: # URL 中不包含认证信息,使用独立的用户名和密码参数 connection_kwargs['username'] = self.username connection_kwargs['password'] = self.password logger.info(f"使用独立的用户名和密码进行认证: {self.username}") else: logger.info("URL 中已包含认证信息,优先使用 URL 中的认证信息") else: # URL 格式异常,尝试使用独立的用户名和密码 connection_kwargs['username'] = self.username connection_kwargs['password'] = self.password logger.info(f"URL 格式异常,使用独立的用户名和密码进行认证: {self.username}") # 验证URL格式(redis-py要求URL必须有正确的scheme) if not self.redis_url or not any(self.redis_url.startswith(scheme) for scheme in ['redis://', 'rediss://', 'unix://']): raise ValueError( f"Redis URL必须指定以下scheme之一 (redis://, rediss://, unix://): {self.redis_url}" ) # 创建 Redis 连接 # 使用 redis.asyncio.from_url(redis-py 4.2+)或 aioredis.from_url(向后兼容) # redis-py 需要 decode_responses=True,aioredis 2.0 可能不需要 # 检查是否是 redis.asyncio(通过检查模块名称) try: module_name = aioredis.__name__ if hasattr(aioredis, '__name__') else '' is_redis_py = 'redis.asyncio' in module_name or 'redis' in module_name except: is_redis_py = False if is_redis_py: # redis-py 4.2+ 的异步客户端,需要 decode_responses=True connection_kwargs['decode_responses'] = True # aioredis 2.0 不需要 decode_responses(它默认返回字节) self.redis = await aioredis.from_url( self.redis_url, **connection_kwargs ) # 测试连接 await self.redis.ping() self._connected = True logger.info(f"✓ Redis 连接成功: {self.redis_url}") except Exception as e: logger.warning(f"Redis 连接失败: {e},将使用内存缓存") self.redis = None self._connected = False if self.redis: try: await self.redis.close() except: pass self.redis = None async def get(self, key: str) -> Optional[Any]: """ 获取缓存 Args: key: 缓存键 Returns: 缓存值,如果不存在则返回 None """ # 先尝试从 Redis 获取 if self.redis and self._connected: try: data = await self.redis.get(key) if data: return json.loads(data) except Exception as e: logger.debug(f"Redis 获取失败 {key}: {e}") # Redis 失败时,尝试重新连接 if not self._connected: await self.connect() # 降级到内存缓存 if key in self._memory_cache: return self._memory_cache[key] return None async def set(self, key: str, value: Any, ttl: int = 3600): """ 设置缓存 Args: key: 缓存键 value: 缓存值 ttl: 过期时间(秒) Returns: 是否成功 """ # 先尝试写入 Redis if self.redis and self._connected: try: await self.redis.setex(key, ttl, json.dumps(value)) return True except Exception as e: logger.debug(f"Redis 设置失败 {key}: {e}") # Redis 失败时,尝试重新连接 if not self._connected: await self.connect() if self.redis and self._connected: try: await self.redis.setex(key, ttl, json.dumps(value)) return True except: pass # 降级到内存缓存(不设置 TTL,因为内存缓存不支持) self._memory_cache[key] = value return False async def delete(self, key: str): """删除缓存""" if self.redis and self._connected: try: await self.redis.delete(key) except Exception as e: logger.debug(f"Redis 删除失败 {key}: {e}") # 同时删除内存缓存 if key in self._memory_cache: del self._memory_cache[key] async def exists(self, key: str) -> bool: """检查缓存是否存在""" if self.redis and self._connected: try: return await self.redis.exists(key) > 0 except Exception as e: logger.debug(f"Redis 检查失败 {key}: {e}") return key in self._memory_cache async def close(self): """关闭连接""" if self.redis: try: await self.redis.close() self._connected = False logger.info("Redis 连接已关闭") except Exception as e: logger.debug(f"关闭 Redis 连接时出错: {e}") def is_connected(self) -> bool: """检查是否已连接""" return self._connected and self.redis is not None async def hset(self, name: str, key: str, value: Any, ttl: int = None): """ 设置Hash字段 Args: name: Hash名称 key: 字段名 value: 字段值 ttl: 过期时间(秒),如果设置,会对整个Hash设置TTL """ if self.redis and self._connected: try: await self.redis.hset(name, key, json.dumps(value)) # 如果设置了TTL,对整个Hash设置过期时间 if ttl: await self.redis.expire(name, ttl) return True except Exception as e: logger.debug(f"Redis Hash设置失败 {name}.{key}: {e}") # Redis失败时,尝试重新连接 if not self._connected: await self.connect() if self.redis and self._connected: try: await self.redis.hset(name, key, json.dumps(value)) if ttl: await self.redis.expire(name, ttl) return True except: pass # 降级到内存缓存 if name not in self._memory_cache: self._memory_cache[name] = {} self._memory_cache[name][key] = value return False async def hget(self, name: str, key: str) -> Optional[Any]: """ 获取Hash字段 Args: name: Hash名称 key: 字段名 Returns: 字段值,如果不存在则返回None """ if self.redis and self._connected: try: data = await self.redis.hget(name, key) if data: return json.loads(data) except Exception as e: logger.debug(f"Redis Hash获取失败 {name}.{key}: {e}") # 降级到内存缓存 if name in self._memory_cache and key in self._memory_cache[name]: return self._memory_cache[name][key] return None async def hgetall(self, name: str) -> Dict[str, Any]: """ 获取Hash所有字段 Args: name: Hash名称 Returns: 所有字段的字典 """ if self.redis and self._connected: try: data = await self.redis.hgetall(name) result = {} for k, v in data.items(): if isinstance(k, bytes): k = k.decode('utf-8') if isinstance(v, bytes): try: v = json.loads(v.decode('utf-8')) except: v = v.decode('utf-8') else: try: v = json.loads(v) except: pass result[k] = v return result except Exception as e: logger.debug(f"Redis Hash获取全部失败 {name}: {e}") # Redis失败时,尝试重新连接 if not self._connected: await self.connect() # 降级到内存缓存 if name in self._memory_cache: return self._memory_cache[name].copy() return {} async def hdel(self, name: str, *keys: str): """ 删除Hash字段 Args: name: Hash名称 *keys: 要删除的字段名 """ if self.redis and self._connected: try: await self.redis.hdel(name, *keys) except Exception as e: logger.debug(f"Redis Hash删除失败 {name}: {e}") # 同时删除内存缓存 if name in self._memory_cache: for key in keys: if key in self._memory_cache[name]: del self._memory_cache[name][key] async def zadd(self, name: str, mapping: Dict[str, float], ttl: int = None): """ 添加Sorted Set成员 Args: name: Sorted Set名称 mapping: {member: score} 字典 ttl: 过期时间(秒) """ if self.redis and self._connected: try: await self.redis.zadd(name, mapping) if ttl: await self.redis.expire(name, ttl) return True except Exception as e: logger.debug(f"Redis Sorted Set添加失败 {name}: {e}") return False async def zrange(self, name: str, start: int = 0, end: int = -1, desc: bool = False, withscores: bool = False) -> List: """ 获取Sorted Set成员(按分数排序) Args: name: Sorted Set名称 start: 起始索引 end: 结束索引 desc: 是否降序(默认False,升序) withscores: 是否包含分数 Returns: 成员列表 """ if self.redis and self._connected: try: if desc: return await self.redis.zrevrange(name, start, end, withscores=withscores) else: return await self.redis.zrange(name, start, end, withscores=withscores) except Exception as e: logger.debug(f"Redis Sorted Set获取失败 {name}: {e}") return [] async def zrem(self, name: str, *members: str): """ 删除Sorted Set成员 Args: name: Sorted Set名称 *members: 要删除的成员 """ if self.redis and self._connected: try: await self.redis.zrem(name, *members) except Exception as e: logger.debug(f"Redis Sorted Set删除失败 {name}: {e}")