diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 13e9aa3..ee483b0 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -280,21 +280,39 @@ class MarketScanner: """ # 使用标准WebSocket try: - if self.client and self.client.socket_manager: - # 使用 BinanceSocketManager 的 futures_symbol_ticker_socket 方法 - async with self.client.socket_manager.futures_symbol_ticker_socket(symbol.lower()) as stream: - async for msg in stream: - try: - # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} - if 'c' in msg: # 'c' 是当前价格 - price = float(msg['c']) - await callback(symbol, price) - elif 'data' in msg and 'c' in msg['data']: - price = float(msg['data']['c']) - await callback(symbol, price) - except (KeyError, ValueError, TypeError) as e: - logger.debug(f"解析 {symbol} 价格数据失败: {e}") - continue + import aiohttp + import json + + # 直接使用 aiohttp 连接 Binance 期货 WebSocket API + # 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams + # 端点:wss://fstream.binance.com/ws/@ticker + ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker" + + async with aiohttp.ClientSession() as session: + async with session.ws_connect(ws_url) as ws: + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + try: + # 解析 JSON 消息 + data = json.loads(msg.data) + + # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} + if isinstance(data, dict): + if 'c' in data: # 'c' 是当前价格 + price = float(data['c']) + await callback(symbol, price) + elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']: + price = float(data['data']['c']) + await callback(symbol, price) + except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e: + logger.debug(f"解析 {symbol} 价格数据失败: {e}") + continue + elif msg.type == aiohttp.WSMsgType.ERROR: + logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}") + break + elif msg.type == aiohttp.WSMsgType.CLOSE: + logger.info(f"监控 {symbol} WebSocket连接关闭") + break except Exception as e: logger.error(f"监控 {symbol} 价格失败: {e}") diff --git a/trading_system/position_manager.py b/trading_system/position_manager.py index 48525a1..d93effe 100644 --- a/trading_system/position_manager.py +++ b/trading_system/position_manager.py @@ -3,6 +3,8 @@ """ import asyncio import logging +import json +import aiohttp from typing import Dict, List, Optional try: from .binance_client import BinanceClient @@ -745,8 +747,9 @@ class PositionManager: logger.info("实时监控已禁用,跳过启动") return - if not self.client or not self.client.client: - logger.warning("WebSocket未初始化,无法启动实时监控") + # WebSocket 现在直接使用 aiohttp,不需要检查 socket_manager + if not self.client: + logger.warning("客户端未初始化,无法启动实时监控") return # 获取当前所有持仓 @@ -824,8 +827,9 @@ class PositionManager: logger.debug(f"{symbol} 监控任务已存在,跳过") return - if not self.client or not self.client.client: - logger.warning(f"{symbol} WebSocket未初始化,无法启动监控") + # WebSocket 现在直接使用 aiohttp,不需要检查 socket_manager + if not self.client: + logger.warning(f"{symbol} 客户端未初始化,无法启动监控") return try: @@ -873,34 +877,47 @@ class PositionManager: break # 使用WebSocket订阅价格流 - # 使用 BinanceSocketManager 的 futures_symbol_ticker_socket 方法 - # 注意:BinanceSocketManager 需要 AsyncClient 实例 - if not self.client.socket_manager: - raise ValueError("WebSocket管理器未初始化") + # 直接使用 aiohttp 连接 Binance 期货 WebSocket API + # 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams + # 端点:wss://fstream.binance.com/ws/@ticker + ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker" - async with self.client.socket_manager.futures_symbol_ticker_socket(symbol.lower()) as stream: - logger.debug(f"{symbol} WebSocket连接已建立,开始接收价格更新") - retry_count = 0 # 连接成功,重置重试计数 - - async for msg in stream: - if symbol not in self.active_positions: - logger.info(f"{symbol} 持仓已不存在,停止监控") - break + async with aiohttp.ClientSession() as session: + async with session.ws_connect(ws_url) as ws: + logger.debug(f"{symbol} WebSocket连接已建立,开始接收价格更新") + retry_count = 0 # 连接成功,重置重试计数 - try: - # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} - if 'c' in msg: # 'c' 是当前价格 - current_price = float(msg['c']) - # 立即检查止损止盈 - await self._check_single_position(symbol, current_price) - elif 'data' in msg: - # 兼容嵌套的数据格式 - if 'c' in msg['data']: - current_price = float(msg['data']['c']) - await self._check_single_position(symbol, current_price) - except (KeyError, ValueError, TypeError) as e: - logger.debug(f"{symbol} 解析价格数据失败: {e}, 消息: {msg}") - continue + async for msg in ws: + if symbol not in self.active_positions: + logger.info(f"{symbol} 持仓已不存在,停止监控") + break + + if msg.type == aiohttp.WSMsgType.TEXT: + try: + # 解析 JSON 消息 + data = json.loads(msg.data) + + # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} + # 根据文档,ticker 流包含 'c' 字段(最后价格) + if isinstance(data, dict): + if 'c' in data: # 'c' 是当前价格 + current_price = float(data['c']) + # 立即检查止损止盈 + await self._check_single_position(symbol, current_price) + elif 'data' in data: + # 兼容组合流格式(如果使用 /stream 端点) + if isinstance(data['data'], dict) and 'c' in data['data']: + current_price = float(data['data']['c']) + await self._check_single_position(symbol, current_price) + except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e: + logger.debug(f"{symbol} 解析价格数据失败: {e}, 消息: {msg.data[:100] if hasattr(msg, 'data') else 'N/A'}") + continue + elif msg.type == aiohttp.WSMsgType.ERROR: + logger.warning(f"{symbol} WebSocket错误: {ws.exception()}") + break + elif msg.type == aiohttp.WSMsgType.CLOSE: + logger.info(f"{symbol} WebSocket连接关闭") + break except asyncio.CancelledError: logger.info(f"{symbol} 监控任务已取消")