""" Valkey/Redis TLS 连接测试脚本 基于: redis-cli --tls -h asredis-hadmun.serverless.ape1.cache.amazonaws.com -p 6379 """ import asyncio import redis.asyncio as aioredis import ssl from typing import Optional async def test_valkey_tls_connection(): """测试 Valkey TLS 连接""" # 连接参数 host = "asredis-hadmun.serverless.ape1.cache.amazonaws.com" port = 6379 use_tls = True print("=" * 60) print("🧪 Valkey TLS 连接测试") print("=" * 60) print(f"主机: {host}") print(f"端口: {port}") print(f"TLS: {'启用' if use_tls else '禁用'}") print() client: Optional[aioredis.Redis] = None try: # 创建 SSL 上下文(如果需要证书验证) ssl_context = None if use_tls: ssl_context = ssl.create_default_context() # 对于 AWS ElastiCache,通常需要禁用证书验证 # 或者使用 AWS 提供的 CA 证书 ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE # 创建 Redis 客户端 print("🔄 正在连接...") client = await aioredis.from_url( f"rediss://{host}:{port}", # rediss:// 表示使用 TLS ssl=ssl_context if use_tls else None, decode_responses=True, # 自动解码字符串 socket_connect_timeout=10, socket_timeout=10, retry_on_timeout=True, health_check_interval=30 ) # 测试连接 print("📡 测试 PING...") ping_result = await client.ping() print(f"✅ PING 成功: {ping_result}") # 测试 SET print("\n📝 测试 SET 操作...") set_result = await client.set("test:valkey:tls", "Hello Valkey with TLS!") print(f"✅ SET 结果: {set_result}") # 测试 GET print("\n📖 测试 GET 操作...") get_result = await client.get("test:valkey:tls") print(f"✅ GET 结果: {get_result}") # 测试 EXISTS print("\n🔍 测试 EXISTS 操作...") exists_result = await client.exists("test:valkey:tls") print(f"✅ EXISTS 结果: {exists_result}") # 测试缓存数据结构(交易对信息) print("\n📊 测试交易对信息缓存...") symbol_info = { "quantityPrecision": 3, "pricePrecision": 2, "minQty": 0.001, "stepSize": 0.001, "minNotional": 5.0 } import json await client.set("symbol_info:BTCUSDT", json.dumps(symbol_info), ex=3600) cached_info = await client.get("symbol_info:BTCUSDT") if cached_info: print(f"✅ 交易对信息缓存成功: {json.loads(cached_info)}") # 清理测试数据 print("\n🧹 清理测试数据...") await client.delete("test:valkey:tls", "symbol_info:BTCUSDT") print("✅ 清理完成") print("\n" + "=" * 60) print("🎉 Valkey TLS 连接测试通过!") print("=" * 60) except aioredis.ConnectionError as e: print(f"\n❌ 连接错误: {e}") print(" 可能原因:") print(" - Valkey 服务未启动或不可访问") print(" - 网络连接问题") print(" - 安全组/防火墙配置") print(" - TLS 配置不正确") except aioredis.TimeoutError as e: print(f"\n❌ 连接超时: {e}") print(" 可能原因:") print(" - 网络延迟过高") print(" - 服务响应慢") except Exception as e: print(f"\n❌ 未知错误: {e}") import traceback print("\n详细错误信息:") traceback.print_exc() finally: if client: try: await client.close() print("\n🔒 连接已关闭") except Exception as e: print(f"\n⚠️ 关闭连接时出错: {e}") if __name__ == "__main__": asyncio.run(test_valkey_tls_connection())