222 lines
8.4 KiB
Python
222 lines
8.4 KiB
Python
"""
|
||
Redis 缓存管理器 - 支持 TLS 连接
|
||
"""
|
||
import json
|
||
import logging
|
||
from typing import Optional, Any, Dict, List
|
||
import ssl
|
||
|
||
try:
|
||
import aioredis
|
||
from aioredis import Redis
|
||
AIOREDIS_AVAILABLE = True
|
||
except ImportError:
|
||
AIOREDIS_AVAILABLE = False
|
||
Redis = None
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class RedisCache:
|
||
"""Redis 缓存管理器 - 支持 TLS 连接和降级到内存缓存"""
|
||
|
||
def __init__(self, redis_url: str = None, use_tls: bool = False,
|
||
ssl_cert_reqs: str = 'required', ssl_ca_certs: str = None,
|
||
username: str = None, password: str = None):
|
||
"""
|
||
初始化 Redis 缓存管理器
|
||
|
||
Args:
|
||
redis_url: Redis 连接 URL(例如: redis://localhost:6379 或 rediss://localhost:6380)
|
||
如果 URL 中包含用户名和密码,会优先使用 URL 中的认证信息
|
||
use_tls: 是否使用 TLS(如果 redis_url 以 rediss:// 开头,会自动启用)
|
||
ssl_cert_reqs: SSL 证书验证要求 ('none', 'optional', 'required')
|
||
ssl_ca_certs: SSL CA 证书路径
|
||
username: Redis 用户名(如果 URL 中未包含)
|
||
password: Redis 密码(如果 URL 中未包含)
|
||
"""
|
||
self.redis_url = redis_url or "redis://localhost:6379"
|
||
self.use_tls = use_tls or self.redis_url.startswith('rediss://')
|
||
self.ssl_cert_reqs = ssl_cert_reqs
|
||
self.ssl_ca_certs = ssl_ca_certs
|
||
self.username = username
|
||
self.password = password
|
||
self.redis: Optional[Redis] = None
|
||
self._memory_cache: Dict[str, Any] = {} # 降级到内存缓存
|
||
self._connected = False
|
||
|
||
async def connect(self):
|
||
"""连接 Redis"""
|
||
if not AIOREDIS_AVAILABLE:
|
||
logger.warning("aioredis 未安装,将使用内存缓存")
|
||
self.redis = None
|
||
self._connected = False
|
||
return
|
||
|
||
try:
|
||
# 构建连接参数
|
||
connection_kwargs = {}
|
||
|
||
# 如果使用 TLS
|
||
if self.use_tls or self.redis_url.startswith('rediss://'):
|
||
# 配置 SSL 上下文
|
||
ssl_context = ssl.create_default_context()
|
||
|
||
# 设置证书验证要求
|
||
if self.ssl_cert_reqs == 'none':
|
||
ssl_context.check_hostname = False
|
||
ssl_context.verify_mode = ssl.CERT_NONE
|
||
elif self.ssl_cert_reqs == 'optional':
|
||
ssl_context.check_hostname = False
|
||
ssl_context.verify_mode = ssl.CERT_OPTIONAL
|
||
else: # required
|
||
ssl_context.check_hostname = True
|
||
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
||
|
||
# 如果提供了 CA 证书路径
|
||
if self.ssl_ca_certs:
|
||
ssl_context.load_verify_locations(self.ssl_ca_certs)
|
||
|
||
connection_kwargs['ssl'] = ssl_context
|
||
logger.info(f"使用 TLS 连接 Redis: {self.redis_url}")
|
||
|
||
# 如果 URL 中不包含用户名和密码,且提供了独立的用户名和密码参数,则添加到连接参数中
|
||
# 注意:如果 URL 中已经包含认证信息(如 redis://user:pass@host:port),则优先使用 URL 中的
|
||
if self.username and self.password:
|
||
# 检查 URL 中是否已包含认证信息(格式:redis://user:pass@host:port)
|
||
url_parts = self.redis_url.split('://')
|
||
if len(url_parts) == 2:
|
||
url_after_scheme = url_parts[1]
|
||
# 如果 URL 中不包含 @ 符号,说明没有在 URL 中指定认证信息
|
||
if '@' not in url_after_scheme:
|
||
# URL 中不包含认证信息,使用独立的用户名和密码参数
|
||
connection_kwargs['username'] = self.username
|
||
connection_kwargs['password'] = self.password
|
||
logger.info(f"使用独立的用户名和密码进行认证: {self.username}")
|
||
else:
|
||
logger.info("URL 中已包含认证信息,优先使用 URL 中的认证信息")
|
||
else:
|
||
# URL 格式异常,尝试使用独立的用户名和密码
|
||
connection_kwargs['username'] = self.username
|
||
connection_kwargs['password'] = self.password
|
||
logger.info(f"URL 格式异常,使用独立的用户名和密码进行认证: {self.username}")
|
||
|
||
# 创建 Redis 连接
|
||
self.redis = await aioredis.from_url(
|
||
self.redis_url,
|
||
**connection_kwargs
|
||
)
|
||
|
||
# 测试连接
|
||
await self.redis.ping()
|
||
self._connected = True
|
||
logger.info(f"✓ Redis 连接成功: {self.redis_url}")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Redis 连接失败: {e},将使用内存缓存")
|
||
self.redis = None
|
||
self._connected = False
|
||
if self.redis:
|
||
try:
|
||
await self.redis.close()
|
||
except:
|
||
pass
|
||
self.redis = None
|
||
|
||
async def get(self, key: str) -> Optional[Any]:
|
||
"""
|
||
获取缓存
|
||
|
||
Args:
|
||
key: 缓存键
|
||
|
||
Returns:
|
||
缓存值,如果不存在则返回 None
|
||
"""
|
||
# 先尝试从 Redis 获取
|
||
if self.redis and self._connected:
|
||
try:
|
||
data = await self.redis.get(key)
|
||
if data:
|
||
return json.loads(data)
|
||
except Exception as e:
|
||
logger.debug(f"Redis 获取失败 {key}: {e}")
|
||
# Redis 失败时,尝试重新连接
|
||
if not self._connected:
|
||
await self.connect()
|
||
|
||
# 降级到内存缓存
|
||
if key in self._memory_cache:
|
||
return self._memory_cache[key]
|
||
|
||
return None
|
||
|
||
async def set(self, key: str, value: Any, ttl: int = 3600):
|
||
"""
|
||
设置缓存
|
||
|
||
Args:
|
||
key: 缓存键
|
||
value: 缓存值
|
||
ttl: 过期时间(秒)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
# 先尝试写入 Redis
|
||
if self.redis and self._connected:
|
||
try:
|
||
await self.redis.setex(key, ttl, json.dumps(value))
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"Redis 设置失败 {key}: {e}")
|
||
# Redis 失败时,尝试重新连接
|
||
if not self._connected:
|
||
await self.connect()
|
||
if self.redis and self._connected:
|
||
try:
|
||
await self.redis.setex(key, ttl, json.dumps(value))
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
# 降级到内存缓存(不设置 TTL,因为内存缓存不支持)
|
||
self._memory_cache[key] = value
|
||
return False
|
||
|
||
async def delete(self, key: str):
|
||
"""删除缓存"""
|
||
if self.redis and self._connected:
|
||
try:
|
||
await self.redis.delete(key)
|
||
except Exception as e:
|
||
logger.debug(f"Redis 删除失败 {key}: {e}")
|
||
|
||
# 同时删除内存缓存
|
||
if key in self._memory_cache:
|
||
del self._memory_cache[key]
|
||
|
||
async def exists(self, key: str) -> bool:
|
||
"""检查缓存是否存在"""
|
||
if self.redis and self._connected:
|
||
try:
|
||
return await self.redis.exists(key) > 0
|
||
except Exception as e:
|
||
logger.debug(f"Redis 检查失败 {key}: {e}")
|
||
|
||
return key in self._memory_cache
|
||
|
||
async def close(self):
|
||
"""关闭连接"""
|
||
if self.redis:
|
||
try:
|
||
await self.redis.close()
|
||
self._connected = False
|
||
logger.info("Redis 连接已关闭")
|
||
except Exception as e:
|
||
logger.debug(f"关闭 Redis 连接时出错: {e}")
|
||
|
||
def is_connected(self) -> bool:
|
||
"""检查是否已连接"""
|
||
return self._connected and self.redis is not None
|