From 043276bb81dfc2476e8b14a27336431d8eb25d93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Wed, 14 Jan 2026 12:03:26 +0800 Subject: [PATCH] a --- trading_system/binance_client.py | 74 +++++++++++++++++++++++++++-- trading_system/main.py | 20 +++++++- trading_system/unicorn_websocket.py | 30 +++++++----- 3 files changed, 108 insertions(+), 16 deletions(-) diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 56fdfb7..e6f3734 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -42,6 +42,8 @@ class BinanceClient: self._last_request_time = {} # 记录每个API端点的最后请求时间 self._request_delay = 0.1 # 请求间隔(秒),避免频率限制 self._semaphore = asyncio.Semaphore(10) # 限制并发请求数 + self._price_cache: Dict[str, Dict] = {} # WebSocket价格缓存 {symbol: {price, volume, changePercent, timestamp}} + self._price_cache_ttl = 60 # 价格缓存有效期(秒) async def connect(self, timeout: int = None, retries: int = None): """ @@ -87,6 +89,7 @@ class BinanceClient: logger.info("✓ Unicorn WebSocket管理器启动成功") # 启动异步数据处理 asyncio.create_task(self.unicorn_manager.process_stream_data()) + logger.info("WebSocket价格缓存已启用,将在订阅交易对后自动更新") else: logger.warning("Unicorn WebSocket管理器启动失败,将使用标准WebSocket") self.unicorn_manager = None @@ -223,9 +226,45 @@ class BinanceClient: logger.error(f"获取 {symbol} K线数据失败: {e}") return [] + def _on_price_update(self, symbol: str, price: float = None, price_data: Dict = None): + """ + WebSocket价格更新回调,自动更新缓存 + + Args: + symbol: 交易对 + price: 最新价格(可选) + price_data: 完整的ticker数据(可选) + """ + import time + try: + # 如果提供了price_data,优先使用它 + if price_data and isinstance(price_data, dict): + # 从price_data中提取信息 + current_price = float(price_data.get('c', price_data.get('lastPrice', price or 0))) + volume = float(price_data.get('v', price_data.get('quoteVolume', 0))) + change_percent = float(price_data.get('P', price_data.get('priceChangePercent', 0))) + elif price: + # 如果只有价格,使用价格 + current_price = price + volume = 0 + change_percent = 0 + else: + return + + self._price_cache[symbol] = { + 'symbol': symbol, + 'price': current_price, + 'volume': volume, + 'changePercent': change_percent, + 'timestamp': time.time() + } + except Exception as e: + logger.debug(f"更新价格缓存失败 {symbol}: {e}") + async def get_ticker_24h(self, symbol: str) -> Optional[Dict]: """ 获取24小时行情数据(合约市场) + 优先从WebSocket缓存读取,如果缓存不可用或过期则使用REST API Args: symbol: 交易对 @@ -233,6 +272,22 @@ class BinanceClient: Returns: 24小时行情数据 """ + import time + + # 优先从WebSocket缓存读取 + if symbol in self._price_cache: + cached = self._price_cache[symbol] + cache_age = time.time() - cached.get('timestamp', 0) + if cache_age < self._price_cache_ttl: + logger.debug(f"从WebSocket缓存获取 {symbol} 价格") + return { + 'symbol': symbol, + 'price': cached['price'], + 'volume': cached.get('volume', 0), + 'changePercent': cached.get('changePercent', 0) + } + + # 如果缓存不可用或过期,使用REST API(fallback) try: ticker = await self._rate_limited_request( f'ticker_{symbol}', @@ -242,12 +297,20 @@ class BinanceClient: f'stats_{symbol}', self.client.futures_ticker(symbol=symbol) ) - return { + result = { 'symbol': symbol, 'price': float(ticker['price']), 'volume': float(stats.get('quoteVolume', 0)), 'changePercent': float(stats.get('priceChangePercent', 0)) } + # 更新缓存 + if self.unicorn_manager: + import time + self._price_cache[symbol] = { + **result, + 'timestamp': time.time() + } + return result except BinanceAPIException as e: error_code = e.code if hasattr(e, 'code') else None if error_code == -1003: @@ -599,13 +662,14 @@ class BinanceClient: logger.error(f"设置杠杆失败: {e}") return False - def subscribe_realtime_prices(self, symbols: List[str], callback) -> bool: + def subscribe_realtime_prices(self, symbols: List[str], callback = None) -> bool: """ 订阅实时价格流(使用Unicorn) Args: symbols: 交易对列表 - callback: 价格更新回调函数 callback(symbol, price, price_data) + callback: 可选的价格更新回调函数 callback(symbol, price, price_data) + 如果不提供,将使用默认的缓存更新回调 Returns: 是否成功 @@ -615,6 +679,10 @@ class BinanceClient: return False try: + # 如果没有提供回调,使用默认的缓存更新回调 + if callback is None: + callback = self._on_price_update + self.unicorn_manager.subscribe_ticker(symbols, callback) logger.info(f"订阅 {len(symbols)} 个交易对的实时价格流") return True diff --git a/trading_system/main.py b/trading_system/main.py index ad852f9..36a6c17 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -145,7 +145,25 @@ async def main(): logger.error("请先充值到合约账户") return - # 3. 初始化各个模块 + # 3. 订阅所有USDT交易对的实时价格流(WebSocket优化) + if client.unicorn_manager: + logger.info("订阅所有USDT交易对的实时价格流(WebSocket)...") + try: + all_pairs = await client.get_all_usdt_pairs() + if all_pairs: + # 订阅前100个最活跃的交易对(避免订阅过多) + # 可以根据需要调整数量 + max_subscribe = 100 + pairs_to_subscribe = all_pairs[:max_subscribe] + if client.subscribe_realtime_prices(pairs_to_subscribe): + logger.info(f"✓ 已订阅 {len(pairs_to_subscribe)} 个交易对的实时价格流") + logger.info("价格数据将通过WebSocket实时更新,减少REST API调用") + else: + logger.warning("订阅实时价格流失败,将使用REST API") + except Exception as e: + logger.warning(f"订阅实时价格流时出错: {e},将使用REST API作为fallback") + + # 4. 初始化各个模块 logger.info("初始化交易模块...") scanner = MarketScanner(client) risk_manager = RiskManager(client) diff --git a/trading_system/unicorn_websocket.py b/trading_system/unicorn_websocket.py index c360261..b4b4607 100644 --- a/trading_system/unicorn_websocket.py +++ b/trading_system/unicorn_websocket.py @@ -261,18 +261,24 @@ class UnicornWebSocketManager: # 处理ticker数据 if 'event_type' in data and data['event_type'] == '24hrTicker': price_data = data.get('data', {}) - if 'c' in price_data: # 最新价格 - price = float(price_data['c']) - # 调用所有注册的回调 - if symbol in self.price_callbacks: - for callback in self.price_callbacks[symbol]: - try: - if asyncio.iscoroutinefunction(callback): - await callback(symbol, price, price_data) - else: - callback(symbol, price, price_data) - except Exception as e: - logger.debug(f"回调函数执行失败: {e}") + if price_data: + # 提取交易对符号(从stream名称或数据中) + stream_symbol = symbol + if not stream_symbol and 's' in price_data: + stream_symbol = price_data['s'] # 交易对符号 + + if stream_symbol: + price = float(price_data.get('c', price_data.get('lastPrice', 0))) + # 调用所有注册的回调 + if stream_symbol in self.price_callbacks: + for callback in self.price_callbacks[stream_symbol]: + try: + if asyncio.iscoroutinefunction(callback): + await callback(stream_symbol, price, price_data) + else: + callback(stream_symbol, price, price_data) + except Exception as e: + logger.debug(f"回调函数执行失败: {e}") # 处理K线数据 elif 'event_type' in data and data['event_type'] == 'kline':