""" Redis 缓存管理器 - 支持 TLS 连接 """ import json import logging from typing import Optional, Any, Dict, List import ssl try: import aioredis from aioredis import Redis AIOREDIS_AVAILABLE = True except ImportError: AIOREDIS_AVAILABLE = False Redis = None logger = logging.getLogger(__name__) class RedisCache: """Redis 缓存管理器 - 支持 TLS 连接和降级到内存缓存""" def __init__(self, redis_url: str = None, use_tls: bool = False, ssl_cert_reqs: str = 'required', ssl_ca_certs: str = None, username: str = None, password: str = None): """ 初始化 Redis 缓存管理器 Args: redis_url: Redis 连接 URL(例如: redis://localhost:6379 或 rediss://localhost:6380) 如果 URL 中包含用户名和密码,会优先使用 URL 中的认证信息 use_tls: 是否使用 TLS(如果 redis_url 以 rediss:// 开头,会自动启用) ssl_cert_reqs: SSL 证书验证要求 ('none', 'optional', 'required') ssl_ca_certs: SSL CA 证书路径 username: Redis 用户名(如果 URL 中未包含) password: Redis 密码(如果 URL 中未包含) """ self.redis_url = redis_url or "redis://localhost:6379" self.use_tls = use_tls or self.redis_url.startswith('rediss://') self.ssl_cert_reqs = ssl_cert_reqs self.ssl_ca_certs = ssl_ca_certs self.username = username self.password = password self.redis: Optional[Redis] = None self._memory_cache: Dict[str, Any] = {} # 降级到内存缓存 self._connected = False async def connect(self): """连接 Redis""" if not AIOREDIS_AVAILABLE: logger.warning("aioredis 未安装,将使用内存缓存") self.redis = None self._connected = False return try: # 构建连接参数 connection_kwargs = {} # 如果使用 TLS if self.use_tls or self.redis_url.startswith('rediss://'): # 配置 SSL 上下文 ssl_context = ssl.create_default_context() # 设置证书验证要求 if self.ssl_cert_reqs == 'none': ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE elif self.ssl_cert_reqs == 'optional': ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_OPTIONAL else: # required ssl_context.check_hostname = True ssl_context.verify_mode = ssl.CERT_REQUIRED # 如果提供了 CA 证书路径 if self.ssl_ca_certs: ssl_context.load_verify_locations(self.ssl_ca_certs) connection_kwargs['ssl'] = ssl_context logger.info(f"使用 TLS 连接 Redis: {self.redis_url}") # 如果 URL 中不包含用户名和密码,且提供了独立的用户名和密码参数,则添加到连接参数中 # 注意:如果 URL 中已经包含认证信息(如 redis://user:pass@host:port),则优先使用 URL 中的 if self.username and self.password: # 检查 URL 中是否已包含认证信息(格式:redis://user:pass@host:port) url_parts = self.redis_url.split('://') if len(url_parts) == 2: url_after_scheme = url_parts[1] # 如果 URL 中不包含 @ 符号,说明没有在 URL 中指定认证信息 if '@' not in url_after_scheme: # URL 中不包含认证信息,使用独立的用户名和密码参数 connection_kwargs['username'] = self.username connection_kwargs['password'] = self.password logger.info(f"使用独立的用户名和密码进行认证: {self.username}") else: logger.info("URL 中已包含认证信息,优先使用 URL 中的认证信息") else: # URL 格式异常,尝试使用独立的用户名和密码 connection_kwargs['username'] = self.username connection_kwargs['password'] = self.password logger.info(f"URL 格式异常,使用独立的用户名和密码进行认证: {self.username}") # 创建 Redis 连接 self.redis = await aioredis.from_url( self.redis_url, **connection_kwargs ) # 测试连接 await self.redis.ping() self._connected = True logger.info(f"✓ Redis 连接成功: {self.redis_url}") except Exception as e: logger.warning(f"Redis 连接失败: {e},将使用内存缓存") self.redis = None self._connected = False if self.redis: try: await self.redis.close() except: pass self.redis = None async def get(self, key: str) -> Optional[Any]: """ 获取缓存 Args: key: 缓存键 Returns: 缓存值,如果不存在则返回 None """ # 先尝试从 Redis 获取 if self.redis and self._connected: try: data = await self.redis.get(key) if data: return json.loads(data) except Exception as e: logger.debug(f"Redis 获取失败 {key}: {e}") # Redis 失败时,尝试重新连接 if not self._connected: await self.connect() # 降级到内存缓存 if key in self._memory_cache: return self._memory_cache[key] return None async def set(self, key: str, value: Any, ttl: int = 3600): """ 设置缓存 Args: key: 缓存键 value: 缓存值 ttl: 过期时间(秒) Returns: 是否成功 """ # 先尝试写入 Redis if self.redis and self._connected: try: await self.redis.setex(key, ttl, json.dumps(value)) return True except Exception as e: logger.debug(f"Redis 设置失败 {key}: {e}") # Redis 失败时,尝试重新连接 if not self._connected: await self.connect() if self.redis and self._connected: try: await self.redis.setex(key, ttl, json.dumps(value)) return True except: pass # 降级到内存缓存(不设置 TTL,因为内存缓存不支持) self._memory_cache[key] = value return False async def delete(self, key: str): """删除缓存""" if self.redis and self._connected: try: await self.redis.delete(key) except Exception as e: logger.debug(f"Redis 删除失败 {key}: {e}") # 同时删除内存缓存 if key in self._memory_cache: del self._memory_cache[key] async def exists(self, key: str) -> bool: """检查缓存是否存在""" if self.redis and self._connected: try: return await self.redis.exists(key) > 0 except Exception as e: logger.debug(f"Redis 检查失败 {key}: {e}") return key in self._memory_cache async def close(self): """关闭连接""" if self.redis: try: await self.redis.close() self._connected = False logger.info("Redis 连接已关闭") except Exception as e: logger.debug(f"关闭 Redis 连接时出错: {e}") def is_connected(self) -> bool: """检查是否已连接""" return self._connected and self.redis is not None