From f99f508b0957e25932b668977342b26fa3b1f839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Wed, 14 Jan 2026 13:43:06 +0800 Subject: [PATCH] a --- backend/config_manager.py | 2 - backend/database/init.sql | 2 - backend/init_config.py | 2 - backend/requirements.txt | 1 - frontend/src/components/ConfigPanel.jsx | 1 - trading_system/README.md | 1 - trading_system/WEBSOCKET_LICENSE_FIX.md | 85 ----- trading_system/WEBSOCKET_TROUBLESHOOTING.md | 143 ------- trading_system/binance_client.py | 125 +------ trading_system/config.py | 2 - trading_system/main.py | 33 -- trading_system/market_scanner.py | 15 +- trading_system/requirements.txt | 2 - trading_system/strategy.py | 13 +- trading_system/unicorn_websocket.py | 391 -------------------- 15 files changed, 23 insertions(+), 795 deletions(-) delete mode 100644 trading_system/WEBSOCKET_LICENSE_FIX.md delete mode 100644 trading_system/WEBSOCKET_TROUBLESHOOTING.md delete mode 100644 trading_system/unicorn_websocket.py diff --git a/backend/config_manager.py b/backend/config_manager.py index 768f9a2..a5ba403 100644 --- a/backend/config_manager.py +++ b/backend/config_manager.py @@ -139,8 +139,6 @@ class ConfigManager: 'TRAILING_STOP_ACTIVATION': self.get('TRAILING_STOP_ACTIVATION', 0.01), 'TRAILING_STOP_PROTECT': self.get('TRAILING_STOP_PROTECT', 0.01), - # Unicorn WebSocket配置 - 'USE_UNICORN_WEBSOCKET': self.get('USE_UNICORN_WEBSOCKET', True), } diff --git a/backend/database/init.sql b/backend/database/init.sql index 87d9272..f4270c1 100644 --- a/backend/database/init.sql +++ b/backend/database/init.sql @@ -113,8 +113,6 @@ INSERT INTO `trading_config` (`config_key`, `config_value`, `config_type`, `cate ('TRAILING_STOP_ACTIVATION', '0.01', 'number', 'strategy', '移动止损激活阈值(盈利1%后激活)'), ('TRAILING_STOP_PROTECT', '0.01', 'number', 'strategy', '移动止损保护利润(保护1%利润)'), --- Unicorn WebSocket配置 -('USE_UNICORN_WEBSOCKET', 'true', 'boolean', 'strategy', '是否使用Unicorn WebSocket'), -- API配置 ('BINANCE_API_KEY', '', 'string', 'api', '币安API密钥'), diff --git a/backend/init_config.py b/backend/init_config.py index 9faa136..9d705e1 100644 --- a/backend/init_config.py +++ b/backend/init_config.py @@ -57,8 +57,6 @@ def init_configs(): category = 'risk' elif 'SCAN' in key or 'INTERVAL' in key or 'VOLUME' in key: category = 'scan' - elif 'WEBSOCKET' in key or 'UNICORN' in key: - category = 'websocket' else: category = 'strategy' diff --git a/backend/requirements.txt b/backend/requirements.txt index 9fe0b58..5938355 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -15,4 +15,3 @@ python-dotenv==1.0.0 python-binance==1.0.19 websocket-client==1.6.1 aiohttp==3.9.1 -unicorn-binance-websocket-api==2.4.0 diff --git a/frontend/src/components/ConfigPanel.jsx b/frontend/src/components/ConfigPanel.jsx index a03c570..29a58fa 100644 --- a/frontend/src/components/ConfigPanel.jsx +++ b/frontend/src/components/ConfigPanel.jsx @@ -462,7 +462,6 @@ const getConfigDetail = (key) => { 'USE_TRAILING_STOP': '是否启用移动止损(true/false)。启用后,当盈利达到激活阈值时,止损会自动跟踪价格,保护利润。适合趋势行情,可以捕捉更大的利润空间。建议:平衡和激进策略启用,保守策略可关闭。', 'TRAILING_STOP_ACTIVATION': '移动止损激活阈值(百分比,如0.01表示1%)。当盈利达到此百分比时,移动止损开始跟踪价格,将止损移至成本价(保本)。值越小激活越早,更早保护利润但可能过早退出。值越大激活越晚,给价格更多波动空间。建议:1-2%。', 'TRAILING_STOP_PROTECT': '移动止损保护利润(百分比,如0.01表示1%)。当价格从最高点回撤达到此百分比时,触发止损平仓,锁定利润。值越小保护更严格,能锁定更多利润但可能过早退出。值越大允许更大回撤,可能捕捉更大趋势但利润可能回吐。建议:1-2%。', - 'USE_UNICORN_WEBSOCKET': '是否使用高性能Unicorn WebSocket API(true/false)。启用后使用unicorn-binance-websocket-api获取实时数据,效率更高,减少API请求频率限制。建议:启用(true)。', // API配置 'BINANCE_API_KEY': '币安API密钥。用于访问币安账户的凭证,需要启用"合约交易"权限。请妥善保管,不要泄露。', diff --git a/trading_system/README.md b/trading_system/README.md index 9db2546..b3661d2 100644 --- a/trading_system/README.md +++ b/trading_system/README.md @@ -15,7 +15,6 @@ trading_system/ ├── position_manager.py # 仓位管理 ├── strategy.py # 交易策略 ├── indicators.py # 技术指标 -├── unicorn_websocket.py # Unicorn WebSocket └── requirements.txt # 依赖 ``` diff --git a/trading_system/WEBSOCKET_LICENSE_FIX.md b/trading_system/WEBSOCKET_LICENSE_FIX.md deleted file mode 100644 index 6567431..0000000 --- a/trading_system/WEBSOCKET_LICENSE_FIX.md +++ /dev/null @@ -1,85 +0,0 @@ -# WebSocket 许可证问题解决方案 - -## 问题描述 - -`unicorn-binance-websocket-api` 2.4.0 版本需要 LUCIT 商业许可证,导致 WebSocket 管理器无法启动。 - -## 解决方案 - -### 方案1:使用标准 WebSocket(推荐,已实现) - -系统已自动回退到使用 `python-binance` 自带的 `BinanceSocketManager`,功能不受影响。 - -**优点**: -- ✅ 无需许可证 -- ✅ 完全免费 -- ✅ 功能完整 -- ✅ 已集成在系统中 - -**缺点**: -- ⚠️ 性能略低于 Unicorn(但对大多数场景足够) - -### 方案2:降级到旧版本(可选) - -如果需要使用 Unicorn WebSocket,可以降级到不需要许可证的旧版本: - -```bash -pip uninstall unicorn-binance-websocket-api -pip install unicorn-binance-websocket-api==1.45.0 -``` - -**注意**: 旧版本可能功能较少,但不需要许可证。 - -### 方案3:获取 LUCIT 许可证(不推荐) - -如果需要使用 Unicorn WebSocket 2.4.0+ 的高级功能,可以获取 LUCIT 许可证: -- 参考: https://medium.lucit.tech/87b0088124a8 -- 需要商业许可证费用 - -## 当前实现 - -系统已实现智能回退机制: - -1. **优先尝试 Unicorn WebSocket** - - 如果可用且不需要许可证,使用 Unicorn - - 性能最优 - -2. **自动回退到标准 WebSocket** - - 如果 Unicorn 不可用或需要许可证 - - 使用 `python-binance` 的 `BinanceSocketManager` - - 功能完全兼容 - -3. **价格缓存机制** - - 无论使用哪种 WebSocket,都会维护价格缓存 - - 减少 REST API 调用 - -## 验证标准 WebSocket 是否工作 - -查看日志,应该看到: - -``` -⚠ Unicorn WebSocket管理器启动失败,将使用标准WebSocket -提示: 系统将使用python-binance的标准WebSocket,功能不受影响 -``` - -系统会继续正常运行,使用标准 WebSocket 获取实时价格数据。 - -## 性能对比 - -| 特性 | Unicorn WebSocket | 标准 WebSocket | -|------|------------------|---------------| -| 性能 | 高 | 中 | -| 许可证 | 需要(2.4.0+) | 不需要 | -| 功能 | 完整 | 完整 | -| 稳定性 | 高 | 高 | -| 推荐度 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | - -## 结论 - -**推荐使用标准 WebSocket**(当前实现): -- ✅ 无需许可证 -- ✅ 功能完整 -- ✅ 性能足够 -- ✅ 系统已自动处理 - -系统会自动使用标准 WebSocket,无需额外配置。 diff --git a/trading_system/WEBSOCKET_TROUBLESHOOTING.md b/trading_system/WEBSOCKET_TROUBLESHOOTING.md deleted file mode 100644 index c922794..0000000 --- a/trading_system/WEBSOCKET_TROUBLESHOOTING.md +++ /dev/null @@ -1,143 +0,0 @@ -# WebSocket 故障排除指南 - -## 常见错误及解决方案 - -### 错误1: `__init__() got an unexpected keyword argument 'throw_exception_if_unrepairable'` - -**原因**: -- `unicorn-binance-websocket-api` 2.4.0 版本不支持 `throw_exception_if_unrepairable` 参数 -- 该参数可能在新版本中被移除或改名 - -**解决方案**: -- ✅ 已修复:移除了不支持的参数 -- ✅ 已添加:多种初始化方式的回退机制 -- ✅ 已优化:更好的错误日志输出 - -**修复后的行为**: -1. 首先尝试使用 `high_performance=True` 参数 -2. 如果失败,尝试只使用 `exchange` 参数 -3. 如果还是失败,尝试使用位置参数 -4. 所有方式都失败时,输出详细的错误信息 - -### 错误2: `ModuleNotFoundError: No module named 'unicorn_binance_websocket_api'` - -**原因**: -- 未安装 `unicorn-binance-websocket-api` 依赖 - -**解决方案**: -```bash -cd trading_system -pip install -r requirements.txt -# 或单独安装 -pip install unicorn-binance-websocket-api==2.4.0 -``` - -### 错误3: WebSocket连接失败 - -**可能原因**: -1. 网络连接问题 -2. 防火墙阻止 -3. Binance服务器问题 - -**检查步骤**: -```bash -# 检查网络连接 -ping stream.binance.com - -# 检查DNS解析 -nslookup stream.binance.com -``` - -**解决方案**: -- 检查服务器网络配置 -- 检查防火墙规则 -- 等待Binance服务恢复 - -### 错误4: WebSocket订阅失败 - -**可能原因**: -1. 订阅的交易对数量过多 -2. 交易对格式错误 -3. WebSocket连接不稳定 - -**解决方案**: -- 减少订阅数量(调整 `WEBSOCKET_SUBSCRIBE_COUNT`) -- 检查交易对格式(应该是大写,如 `BTCUSDT`) -- 查看日志中的详细错误信息 - -## 验证WebSocket是否正常工作 - -### 1. 查看启动日志 - -正常启动应该看到: -``` -✓ Unicorn WebSocket管理器启动成功 (测试网: False) - 交易所: binance.com-futures -✓ 已成功订阅 100 个交易对的实时价格流 -✓ 价格数据将通过WebSocket实时更新,减少REST API调用 -``` - -### 2. 查看运行时日志 - -正常运行时应该看到: -``` -✓ [WebSocket] 从缓存获取 BTCUSDT 价格: 43250.50000000 (缓存年龄: 2.3秒) -``` - -### 3. 查看统计信息 - -每次扫描后应该看到: -``` -WebSocket状态: 订阅=100个交易对, 缓存=100个价格, 活跃流=1个 -``` - -## 调试技巧 - -### 启用详细日志 - -在 `config.py` 或环境变量中设置: -```python -LOG_LEVEL = 'DEBUG' -``` - -### 检查WebSocket状态 - -在代码中添加: -```python -if client.unicorn_manager: - stats = client.unicorn_manager.get_stream_statistics() - print(f"WebSocket统计: {stats}") -``` - -### 手动测试WebSocket连接 - -```python -from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager - -manager = BinanceWebSocketApiManager(exchange="binance.com-futures") -stream_id = manager.create_stream(["arr"], ["btcusdt@ticker"]) -print(f"Stream ID: {stream_id}") -``` - -## 版本兼容性 - -### 支持的版本 -- `unicorn-binance-websocket-api >= 2.0.0` -- 推荐版本: `2.4.0` - -### 已知问题 -- 2.4.0版本不支持 `throw_exception_if_unrepairable` 参数(已修复) -- 某些旧版本可能不支持 `high_performance` 参数(已添加回退) - -## 性能优化建议 - -1. **订阅数量**: 根据实际需求调整,不要订阅过多 -2. **缓存TTL**: 默认60秒,可以根据需要调整 -3. **日志级别**: 生产环境使用 `INFO`,调试时使用 `DEBUG` - -## 联系支持 - -如果问题仍然存在: -1. 查看完整的错误日志 -2. 检查 `unicorn-binance-websocket-api` 版本 -3. 查看 [Unicorn Binance WebSocket API 文档](https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api) diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index f20e3df..7d77788 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -6,10 +6,6 @@ import logging from typing import Dict, List, Optional, Any from binance import AsyncClient, BinanceSocketManager from binance.exceptions import BinanceAPIException -try: - from .unicorn_websocket import UnicornWebSocketManager -except ImportError: - from unicorn_websocket import UnicornWebSocketManager try: from . import config @@ -36,8 +32,6 @@ class BinanceClient: self.testnet = testnet or config.USE_TESTNET self.client: Optional[AsyncClient] = None self.socket_manager: Optional[BinanceSocketManager] = None - self.unicorn_manager: Optional[UnicornWebSocketManager] = None - self.use_unicorn = config.TRADING_CONFIG.get('USE_UNICORN_WEBSOCKET', True) self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息 self._last_request_time = {} # 记录每个API端点的最后请求时间 self._request_delay = 0.1 # 请求间隔(秒),避免频率限制 @@ -81,24 +75,6 @@ class BinanceClient: self.socket_manager = BinanceSocketManager(self.client) logger.info(f"✓ 币安客户端连接成功 (测试网: {self.testnet})") - # 启动Unicorn WebSocket(如果启用) - if self.use_unicorn: - try: - self.unicorn_manager = UnicornWebSocketManager(testnet=self.testnet) - if self.unicorn_manager.start(): - logger.info("✓ Unicorn WebSocket管理器启动成功") - # 启动异步数据处理 - asyncio.create_task(self.unicorn_manager.process_stream_data()) - logger.info("WebSocket价格缓存已启用,将在订阅交易对后自动更新") - else: - logger.warning("⚠ Unicorn WebSocket管理器启动失败,将使用标准WebSocket") - logger.info("提示: 系统将使用python-binance的标准WebSocket,功能不受影响") - self.unicorn_manager = None - except Exception as e: - logger.warning(f"⚠ 启动Unicorn WebSocket失败: {e},将使用标准WebSocket") - logger.info("提示: 系统将使用python-binance的标准WebSocket,功能不受影响") - self.unicorn_manager = None - # 验证API密钥权限 await self._verify_api_permissions() @@ -152,10 +128,6 @@ class BinanceClient: async def disconnect(self): """断开连接""" - # 停止Unicorn WebSocket - if self.unicorn_manager: - self.unicorn_manager.stop() - self.unicorn_manager = None if self.client: await self.client.close_connection() @@ -228,44 +200,6 @@ class BinanceClient: logger.error(f"获取 {symbol} K线数据失败: {e}") return [] - def _on_price_update(self, symbol: str, price: float = None, price_data: Dict = None): - """ - WebSocket价格更新回调,自动更新缓存 - - Args: - symbol: 交易对 - price: 最新价格(可选) - price_data: 完整的ticker数据(可选) - """ - import time - try: - # 如果提供了price_data,优先使用它 - if price_data and isinstance(price_data, dict): - # 从price_data中提取信息 - current_price = float(price_data.get('c', price_data.get('lastPrice', price or 0))) - volume = float(price_data.get('v', price_data.get('quoteVolume', 0))) - change_percent = float(price_data.get('P', price_data.get('priceChangePercent', 0))) - elif price: - # 如果只有价格,使用价格 - current_price = price - volume = 0 - change_percent = 0 - else: - return - - self._price_cache[symbol] = { - 'symbol': symbol, - 'price': current_price, - 'volume': volume, - 'changePercent': change_percent, - 'timestamp': time.time() - } - # 每100次更新输出一次日志(避免日志过多) - if len(self._price_cache) % 100 == 0: - logger.info(f"✓ [WebSocket] 价格缓存已更新 {len(self._price_cache)} 个交易对,最新: {symbol} = {current_price:.8f}") - except Exception as e: - logger.debug(f"更新价格缓存失败 {symbol}: {e}") - async def get_ticker_24h(self, symbol: str) -> Optional[Dict]: """ 获取24小时行情数据(合约市场) @@ -284,7 +218,7 @@ class BinanceClient: cached = self._price_cache[symbol] cache_age = time.time() - cached.get('timestamp', 0) if cache_age < self._price_cache_ttl: - logger.debug(f"✓ [WebSocket] 从缓存获取 {symbol} 价格: {cached['price']:.8f} (缓存年龄: {cache_age:.1f}秒)") + logger.debug(f"从缓存获取 {symbol} 价格: {cached['price']:.8f} (缓存年龄: {cache_age:.1f}秒)") return { 'symbol': symbol, 'price': cached['price'], @@ -292,10 +226,10 @@ class BinanceClient: 'changePercent': cached.get('changePercent', 0) } else: - logger.debug(f"⚠ [WebSocket] {symbol} 缓存已过期 ({cache_age:.1f}秒 > {self._price_cache_ttl}秒),使用REST API") + logger.debug(f"{symbol} 缓存已过期 ({cache_age:.1f}秒 > {self._price_cache_ttl}秒),使用REST API") # 如果缓存不可用或过期,使用REST API(fallback) - logger.debug(f"⚠ [REST API] {symbol} 未在WebSocket缓存中,使用REST API获取") + logger.debug(f"{symbol} 未在价格缓存中,使用REST API获取") try: ticker = await self._rate_limited_request( f'ticker_{symbol}', @@ -311,13 +245,12 @@ class BinanceClient: 'volume': float(stats.get('quoteVolume', 0)), 'changePercent': float(stats.get('priceChangePercent', 0)) } - # 更新缓存(如果WebSocket已启用,但该交易对未订阅) - if self.unicorn_manager: - import time - self._price_cache[symbol] = { - **result, - 'timestamp': time.time() - } + # 更新缓存 + import time + self._price_cache[symbol] = { + **result, + 'timestamp': time.time() + } return result except BinanceAPIException as e: error_code = e.code if hasattr(e, 'code') else None @@ -670,44 +603,20 @@ class BinanceClient: logger.error(f"设置杠杆失败: {e}") return False - def subscribe_realtime_prices(self, symbols: List[str], callback = None) -> bool: - """ - 订阅实时价格流(使用Unicorn) - - Args: - symbols: 交易对列表 - callback: 可选的价格更新回调函数 callback(symbol, price, price_data) - 如果不提供,将使用默认的缓存更新回调 - - Returns: - 是否成功 - """ - if not self.unicorn_manager: - logger.warning("Unicorn WebSocket未启用,无法订阅实时价格") - return False - - try: - # 如果没有提供回调,使用默认的缓存更新回调 - if callback is None: - callback = self._on_price_update - - self.unicorn_manager.subscribe_ticker(symbols, callback) - logger.info(f"订阅 {len(symbols)} 个交易对的实时价格流") - return True - except Exception as e: - logger.error(f"订阅实时价格流失败: {e}") - return False - def get_realtime_price(self, symbol: str) -> Optional[float]: """ - 获取实时价格(从Unicorn WebSocket) + 获取实时价格(从缓存) Args: symbol: 交易对 Returns: - 实时价格,如果未订阅则返回None + 实时价格,如果缓存中有则返回,否则返回None """ - if self.unicorn_manager: - return self.unicorn_manager.get_realtime_price(symbol) + import time + if symbol in self._price_cache: + cached = self._price_cache[symbol] + cache_age = time.time() - cached.get('timestamp', 0) + if cache_age < self._price_cache_ttl: + return cached.get('price') return None \ No newline at end of file diff --git a/trading_system/config.py b/trading_system/config.py index 6ac6a8e..1db135a 100644 --- a/trading_system/config.py +++ b/trading_system/config.py @@ -155,8 +155,6 @@ def _get_trading_config(): 'USE_TRAILING_STOP': True, 'TRAILING_STOP_ACTIVATION': 0.01, 'TRAILING_STOP_PROTECT': 0.01, - 'USE_UNICORN_WEBSOCKET': True, - 'WEBSOCKET_SUBSCRIBE_COUNT': 100, # WebSocket订阅的交易对数量(0表示订阅所有) } # 币安API配置(优先从数据库,回退到环境变量和默认值) diff --git a/trading_system/main.py b/trading_system/main.py index 2fbb5b4..5b9e2df 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -145,39 +145,6 @@ async def main(): logger.error("请先充值到合约账户") return - # 3. 订阅所有USDT交易对的实时价格流(WebSocket优化) - if client.unicorn_manager: - logger.info("=" * 60) - logger.info("初始化WebSocket实时价格订阅...") - try: - all_pairs = await client.get_all_usdt_pairs() - if all_pairs: - # 从配置读取订阅数量(0表示订阅所有) - subscribe_count = config.TRADING_CONFIG.get('WEBSOCKET_SUBSCRIBE_COUNT', 100) - if subscribe_count == 0: - pairs_to_subscribe = all_pairs - logger.info(f"配置为订阅所有交易对,共 {len(all_pairs)} 个") - else: - pairs_to_subscribe = all_pairs[:subscribe_count] - logger.info(f"配置订阅数量: {subscribe_count},实际可用: {len(all_pairs)} 个") - - if client.subscribe_realtime_prices(pairs_to_subscribe): - logger.info(f"✓ 已成功订阅 {len(pairs_to_subscribe)} 个交易对的实时价格流") - logger.info("✓ 价格数据将通过WebSocket实时更新,减少REST API调用") - logger.info("✓ WebSocket缓存已启用,价格查询将优先使用缓存") - - # 输出WebSocket统计信息 - stats = client.unicorn_manager.get_stream_statistics() - logger.info(f"WebSocket统计: 总流数={stats.get('total_streams', 0)}, 活跃流数={stats.get('active_streams', 0)}") - else: - logger.warning("⚠ 订阅实时价格流失败,将使用REST API作为fallback") - else: - logger.warning("⚠ 未获取到交易对列表,无法订阅WebSocket") - except Exception as e: - logger.warning(f"⚠ 订阅实时价格流时出错: {e},将使用REST API作为fallback") - logger.info("=" * 60) - else: - logger.warning("⚠ Unicorn WebSocket未启用,所有价格查询将使用REST API") # 4. 初始化各个模块 logger.info("初始化交易模块...") diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 8599174..42f593b 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -264,23 +264,12 @@ class MarketScanner: async def monitor_price(self, symbol: str, callback) -> None: """ 监控单个交易对的价格变化(WebSocket) - 优先使用Unicorn,如果不可用则使用标准WebSocket Args: symbol: 交易对 callback: 价格变化回调函数 """ - # 优先使用Unicorn WebSocket - if self.client.unicorn_manager: - try: - # 订阅实时价格 - self.client.subscribe_realtime_prices([symbol], callback) - logger.info(f"使用Unicorn监控 {symbol} 价格") - return - except Exception as e: - logger.warning(f"Unicorn监控失败,使用标准WebSocket: {e}") - - # 回退到标准WebSocket + # 使用标准WebSocket try: if self.client.socket_manager: async with self.client.socket_manager.futures_socket(symbol.lower()) as stream: @@ -293,7 +282,7 @@ class MarketScanner: def get_realtime_price(self, symbol: str) -> Optional[float]: """ - 获取实时价格(从Unicorn WebSocket) + 获取实时价格(从缓存) Args: symbol: 交易对 diff --git a/trading_system/requirements.txt b/trading_system/requirements.txt index 3c110c5..34a35a6 100644 --- a/trading_system/requirements.txt +++ b/trading_system/requirements.txt @@ -2,8 +2,6 @@ python-binance==1.0.19 websocket-client==1.6.1 aiohttp==3.9.1 -# unicorn-binance-websocket-api 2.4.0需要LUCIT许可证,改用1.x版本或使用标准WebSocket -# unicorn-binance-websocket-api==1.45.0 # 可选:旧版本不需要许可证 # 数据库依赖(用于从数据库读取配置) pymysql==1.1.0 diff --git a/trading_system/strategy.py b/trading_system/strategy.py index 346aec1..438f29f 100644 --- a/trading_system/strategy.py +++ b/trading_system/strategy.py @@ -156,15 +156,10 @@ class TradingStrategy: f"可用余额: {summary['availableBalance']:.2f} USDT" ) - # 输出WebSocket使用统计 - if self.client.unicorn_manager: - stats = self.client.unicorn_manager.get_stream_statistics() - cache_size = len(self.client._price_cache) - logger.info( - f"WebSocket状态: 订阅={stats.get('total_streams', 0)}个交易对, " - f"缓存={cache_size}个价格, " - f"活跃流={stats.get('active_streams', 0)}个" - ) + # 输出价格缓存统计 + cache_size = len(self.client._price_cache) + if cache_size > 0: + logger.info(f"价格缓存: {cache_size}个交易对") # 记录账户快照到数据库 try: diff --git a/trading_system/unicorn_websocket.py b/trading_system/unicorn_websocket.py deleted file mode 100644 index 2d98737..0000000 --- a/trading_system/unicorn_websocket.py +++ /dev/null @@ -1,391 +0,0 @@ -""" -Unicorn WebSocket模块 - 提供高性能实时数据流 -""" -import asyncio -import logging -from typing import Dict, List, Optional, Callable - -# 尝试导入Unicorn WebSocket(可选,如果未安装或需要许可证则使用标准WebSocket) -_unicorn_available = False -BinanceWebSocketApiManager = None - -try: - # 新版本导入路径 - from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager - _unicorn_available = True -except ImportError: - try: - # 兼容旧版本路径 - from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import ( - BinanceWebSocketApiManager, - ) - _unicorn_available = True - except ImportError: - # Unicorn WebSocket不可用,将使用标准WebSocket - _unicorn_available = False - BinanceWebSocketApiManager = None - except Exception: - # 其他错误(如许可证问题),也标记为不可用 - _unicorn_available = False - BinanceWebSocketApiManager = None - -try: - from . import config -except ImportError: - import config - -logger = logging.getLogger(__name__) - - -class UnicornWebSocketManager: - """Unicorn WebSocket管理器""" - - def __init__(self, testnet: bool = False): - """ - 初始化Unicorn WebSocket管理器 - - Args: - testnet: 是否使用测试网 - """ - self.testnet = testnet or config.USE_TESTNET - self.manager: Optional[BinanceWebSocketApiManager] = None - self.stream_ids: Dict[str, str] = {} # symbol -> stream_id - self.price_callbacks: Dict[str, List[Callable]] = {} # symbol -> callbacks - self.running = False - - def start(self): - """启动WebSocket管理器""" - # 检查Unicorn是否可用 - if not _unicorn_available or BinanceWebSocketApiManager is None: - logger.warning("Unicorn WebSocket不可用(未安装或需要许可证),将使用标准WebSocket") - return False - - try: - # 创建管理器 - # 注意:unicorn-binance-websocket-api 2.4.0版本需要LUCIT许可证 - exchange = "binance.com-futures" if not self.testnet else "binance.com-futures-testnet" - - # 尝试不同的初始化方式(兼容不同版本) - try: - # 方式1:使用high_performance参数(如果支持) - self.manager = BinanceWebSocketApiManager( - exchange=exchange, - high_performance=True - ) - logger.debug("使用 high_performance=True 参数初始化成功") - except (TypeError, ValueError) as e1: - # 方式2:只使用exchange参数 - try: - self.manager = BinanceWebSocketApiManager(exchange=exchange) - logger.debug("使用 exchange 参数初始化成功") - except (TypeError, ValueError) as e2: - # 方式3:使用位置参数 - try: - self.manager = BinanceWebSocketApiManager(exchange) - logger.debug("使用位置参数初始化成功") - except Exception as e3: - logger.error(f"所有初始化方式都失败:") - logger.error(f" 方式1错误: {e1}") - logger.error(f" 方式2错误: {e2}") - logger.error(f" 方式3错误: {e3}") - raise e3 - except Exception as e: - # 捕获许可证错误或其他异常 - error_msg = str(e) - if "license" in error_msg.lower() or "lucit" in error_msg.lower(): - logger.warning(f"Unicorn WebSocket需要LUCIT许可证: {e}") - logger.warning("将使用标准WebSocket作为替代方案") - return False - raise - - self.running = True - logger.info(f"✓ Unicorn WebSocket管理器启动成功 (测试网: {self.testnet})") - logger.info(f" 交易所: {exchange}") - return True - except Exception as e: - error_msg = str(e) - error_type = type(e).__name__ - - # 检查是否是许可证相关错误 - if "license" in error_msg.lower() or "lucit" in error_msg.lower() or "NoValidatedLucitLicense" in error_type: - logger.warning(f"Unicorn WebSocket需要LUCIT许可证: {e}") - logger.warning("将使用标准WebSocket作为替代方案") - logger.info("提示: 如果需要使用Unicorn WebSocket,请:") - logger.info(" 1. 获取LUCIT许可证: https://medium.lucit.tech/87b0088124a8") - logger.info(" 2. 或降级到旧版本: pip install unicorn-binance-websocket-api==1.45.0") - return False - - logger.error(f"启动Unicorn WebSocket管理器失败: {e}") - logger.error(f"错误类型: {error_type}") - import traceback - logger.debug(f"详细错误信息:\n{traceback.format_exc()}") - return False - - def stop(self): - """停止WebSocket管理器""" - self.running = False - if self.manager: - # 停止所有流 - for stream_id in self.stream_ids.values(): - try: - self.manager.stop_stream(stream_id) - except: - pass - - # 停止管理器 - try: - self.manager.stop_manager_with_all_streams() - except: - pass - - logger.info("Unicorn WebSocket管理器已停止") - - def subscribe_ticker(self, symbols: List[str], callback: Callable) -> Dict[str, str]: - """ - 订阅交易对的价格流 - - Args: - symbols: 交易对列表 - callback: 价格更新回调函数 callback(symbol, price_data) - - Returns: - 交易对到stream_id的映射 - """ - if not self.manager: - logger.error("WebSocket管理器未启动") - return {} - - stream_ids = {} - - try: - # 构建流名称列表 - streams = [] - for symbol in symbols: - # 转换为小写(币安要求) - symbol_lower = symbol.lower() - # 订阅ticker流 - stream_name = f"{symbol_lower}@ticker" - streams.append(stream_name) - - # 注册回调 - if symbol not in self.price_callbacks: - self.price_callbacks[symbol] = [] - self.price_callbacks[symbol].append(callback) - - # 创建多路复用流 - if streams: - stream_id = self.manager.create_stream( - ["arr"], - streams, - output="UnicornFy" - ) - - # 记录stream_id - for symbol in symbols: - self.stream_ids[symbol] = stream_id - stream_ids[symbol] = stream_id - - logger.info(f"订阅 {len(symbols)} 个交易对的价格流") - - return stream_ids - - except Exception as e: - logger.error(f"订阅价格流失败: {e}") - return {} - - def subscribe_kline( - self, - symbols: List[str], - interval: str = "5m", - callback: Optional[Callable] = None - ) -> Dict[str, str]: - """ - 订阅K线数据流 - - Args: - symbols: 交易对列表 - interval: K线周期(1m, 5m, 15m等) - callback: K线更新回调函数 callback(symbol, kline_data) - - Returns: - 交易对到stream_id的映射 - """ - if not self.manager: - logger.error("WebSocket管理器未启动") - return {} - - stream_ids = {} - - try: - # 构建流名称列表 - streams = [] - for symbol in symbols: - symbol_lower = symbol.lower() - stream_name = f"{symbol_lower}@kline_{interval}" - streams.append(stream_name) - - # 创建多路复用流 - if streams: - stream_id = self.manager.create_stream( - ["arr"], - streams, - output="UnicornFy" - ) - - for symbol in symbols: - stream_ids[symbol] = stream_id - - logger.info(f"订阅 {len(symbols)} 个交易对的K线流 ({interval})") - - return stream_ids - - except Exception as e: - logger.error(f"订阅K线流失败: {e}") - return {} - - def get_realtime_price(self, symbol: str) -> Optional[float]: - """ - 获取实时价格(从WebSocket流缓冲区中) - - Args: - symbol: 交易对 - - Returns: - 实时价格,如果未订阅则返回None - """ - if not self.manager or symbol not in self.stream_ids: - return None - - try: - stream_id = self.stream_ids[symbol] - # 从流缓冲区获取最新数据 - data = self.manager.pop_stream_data_from_stream_buffer(stream_id) - - if data and isinstance(data, dict): - # 解析ticker数据 - if 'event_type' in data and data['event_type'] == '24hrTicker': - price_data = data.get('data', {}) - if 'c' in price_data: # 最新价格 - return float(price_data['c']) - elif 'data' in data and isinstance(data['data'], dict): - if 'c' in data['data']: # 最新价格 - return float(data['data']['c']) - elif 'close' in data['data']: # K线收盘价 - return float(data['data']['close']) - - return None - - except Exception as e: - logger.debug(f"获取 {symbol} 实时价格失败: {e}") - return None - - async def process_stream_data(self): - """ - 处理WebSocket流数据(异步) - """ - if not self.manager: - return - - while self.running: - try: - # 处理所有流的数据 - for symbol, stream_id in list(self.stream_ids.items()): - try: - # 获取流数据(非阻塞) - stream_data = self.manager.pop_stream_data_from_stream_buffer(stream_id) - if stream_data: - # 处理数据 - await self._handle_stream_data(symbol, stream_data) - except Exception as e: - logger.debug(f"处理 {symbol} 流数据失败: {e}") - - # 短暂休眠,避免CPU占用过高 - await asyncio.sleep(0.1) - - except Exception as e: - logger.error(f"处理WebSocket流数据失败: {e}") - await asyncio.sleep(1) - - async def _handle_stream_data(self, symbol: str, data: Dict): - """ - 处理单个流的数据 - - Args: - symbol: 交易对 - data: 流数据 - """ - try: - if not data or not isinstance(data, dict): - return - - # 处理ticker数据 - if 'event_type' in data and data['event_type'] == '24hrTicker': - price_data = data.get('data', {}) - if price_data: - # 提取交易对符号(从stream名称或数据中) - stream_symbol = symbol - if not stream_symbol and 's' in price_data: - stream_symbol = price_data['s'] # 交易对符号 - - if stream_symbol: - price = float(price_data.get('c', price_data.get('lastPrice', 0))) - # 调用所有注册的回调 - if stream_symbol in self.price_callbacks: - for callback in self.price_callbacks[stream_symbol]: - try: - if asyncio.iscoroutinefunction(callback): - await callback(stream_symbol, price, price_data) - else: - callback(stream_symbol, price, price_data) - except Exception as e: - logger.debug(f"回调函数执行失败: {e}") - - # 处理K线数据 - elif 'event_type' in data and data['event_type'] == 'kline': - kline_data = data.get('data', {}) - if 'k' in kline_data: - kline = kline_data['k'] - # 可以在这里处理K线更新 - logger.debug(f"{symbol} K线更新: {kline.get('c', 'N/A')}") - - except Exception as e: - logger.debug(f"处理 {symbol} 流数据失败: {e}") - - def unsubscribe(self, symbol: str): - """ - 取消订阅交易对 - - Args: - symbol: 交易对 - """ - if symbol in self.stream_ids: - stream_id = self.stream_ids[symbol] - try: - self.manager.stop_stream(stream_id) - del self.stream_ids[symbol] - if symbol in self.price_callbacks: - del self.price_callbacks[symbol] - logger.info(f"取消订阅 {symbol}") - except Exception as e: - logger.error(f"取消订阅 {symbol} 失败: {e}") - - def get_stream_statistics(self) -> Dict: - """ - 获取流统计信息 - - Returns: - 统计信息字典 - """ - if not self.manager: - return {} - - try: - stats = { - 'total_streams': len(self.stream_ids), - 'active_streams': len([s for s in self.stream_ids.values() if s]), - 'subscribed_symbols': list(self.stream_ids.keys()) - } - return stats - except Exception as e: - logger.error(f"获取流统计信息失败: {e}") - return {}