From 5b9a8fd917ad5d6c73f32e0aceb20c43af60dd2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Wed, 14 Jan 2026 00:00:24 +0800 Subject: [PATCH] a --- trading_system/binance_client.py | 83 ++++++++++++++++++++++++++++++-- trading_system/market_scanner.py | 39 +++++++++++++-- 2 files changed, 112 insertions(+), 10 deletions(-) diff --git a/trading_system/binance_client.py b/trading_system/binance_client.py index 79a32fd..56fdfb7 100644 --- a/trading_system/binance_client.py +++ b/trading_system/binance_client.py @@ -39,6 +39,9 @@ class BinanceClient: self.unicorn_manager: Optional[UnicornWebSocketManager] = None self.use_unicorn = config.TRADING_CONFIG.get('USE_UNICORN_WEBSOCKET', True) self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息 + self._last_request_time = {} # 记录每个API端点的最后请求时间 + self._request_delay = 0.1 # 请求间隔(秒),避免频率限制 + self._semaphore = asyncio.Semaphore(10) # 限制并发请求数 async def connect(self, timeout: int = None, retries: int = None): """ @@ -176,6 +179,24 @@ class BinanceClient: logger.error(f"获取交易对失败: {e}") return [] + async def _rate_limited_request(self, endpoint: str, coro): + """ + 带速率限制的API请求 + + Args: + endpoint: API端点标识(用于记录请求时间) + coro: 异步协程 + """ + async with self._semaphore: + # 检查是否需要等待(避免请求过快) + if endpoint in self._last_request_time: + elapsed = asyncio.get_event_loop().time() - self._last_request_time[endpoint] + if elapsed < self._request_delay: + await asyncio.sleep(self._request_delay - elapsed) + + self._last_request_time[endpoint] = asyncio.get_event_loop().time() + return await coro + async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]: """ 获取K线数据(合约市场) @@ -189,10 +210,17 @@ class BinanceClient: K线数据列表 """ try: - klines = await self.client.futures_klines(symbol=symbol, interval=interval, limit=limit) + klines = await self._rate_limited_request( + f'klines_{symbol}_{interval}', + self.client.futures_klines(symbol=symbol, interval=interval, limit=limit) + ) return klines except BinanceAPIException as e: - logger.error(f"获取 {symbol} K线数据失败: {e}") + error_code = e.code if hasattr(e, 'code') else None + if error_code == -1003: + logger.warning(f"获取 {symbol} K线数据失败: API请求频率过高,建议使用WebSocket或增加扫描间隔") + else: + logger.error(f"获取 {symbol} K线数据失败: {e}") return [] async def get_ticker_24h(self, symbol: str) -> Optional[Dict]: @@ -206,8 +234,14 @@ class BinanceClient: 24小时行情数据 """ try: - ticker = await self.client.futures_symbol_ticker(symbol=symbol) - stats = await self.client.futures_ticker(symbol=symbol) + ticker = await self._rate_limited_request( + f'ticker_{symbol}', + self.client.futures_symbol_ticker(symbol=symbol) + ) + stats = await self._rate_limited_request( + f'stats_{symbol}', + self.client.futures_ticker(symbol=symbol) + ) return { 'symbol': symbol, 'price': float(ticker['price']), @@ -215,9 +249,48 @@ class BinanceClient: 'changePercent': float(stats.get('priceChangePercent', 0)) } except BinanceAPIException as e: - logger.error(f"获取 {symbol} 24小时行情失败: {e}") + error_code = e.code if hasattr(e, 'code') else None + if error_code == -1003: + logger.warning(f"获取 {symbol} 24小时行情失败: API请求频率过高,建议使用WebSocket或增加扫描间隔") + else: + logger.error(f"获取 {symbol} 24小时行情失败: {e}") return None + async def get_all_tickers_24h(self) -> Dict[str, Dict]: + """ + 批量获取所有交易对的24小时行情数据(更高效) + + Returns: + 交易对行情数据字典 {symbol: {price, volume, changePercent}} + """ + try: + # 使用批量API,一次获取所有交易对的数据 + tickers = await self._rate_limited_request( + 'all_tickers', + self.client.futures_ticker() + ) + + result = {} + for ticker in tickers: + symbol = ticker['symbol'] + if symbol.endswith('USDT'): + result[symbol] = { + 'symbol': symbol, + 'price': float(ticker.get('lastPrice', 0)), + 'volume': float(ticker.get('quoteVolume', 0)), + 'changePercent': float(ticker.get('priceChangePercent', 0)) + } + + logger.debug(f"批量获取到 {len(result)} 个交易对的24小时行情数据") + return result + except BinanceAPIException as e: + error_code = e.code if hasattr(e, 'code') else None + if error_code == -1003: + logger.warning(f"批量获取24小时行情失败: API请求频率过高,建议使用WebSocket或增加扫描间隔") + else: + logger.error(f"批量获取24小时行情失败: {e}") + return {} + async def get_account_balance(self) -> Dict[str, float]: """ 获取U本位合约账户余额 diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index c746223..8599174 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -46,8 +46,32 @@ class MarketScanner: logger.warning("未获取到交易对") return [] - # 并发获取所有交易对的涨跌幅数据 - tasks = [self._get_symbol_change(symbol) for symbol in symbols] + # 先批量获取所有交易对的24小时行情数据(减少API请求) + logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...") + all_tickers = await self.client.get_all_tickers_24h() + + # 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量 + pre_filtered_symbols = [] + for symbol in symbols: + ticker = all_tickers.get(symbol) + if ticker: + change_percent = abs(ticker.get('changePercent', 0)) + volume = ticker.get('volume', 0) + if (change_percent >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and + volume >= config.TRADING_CONFIG['MIN_VOLUME_24H']): + pre_filtered_symbols.append(symbol) + + logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个") + + # 只对符合条件的交易对进行详细分析(获取K线和技术指标) + # 限制并发数量,避免请求过快 + semaphore = asyncio.Semaphore(5) # 最多5个并发请求 + + async def get_symbol_change_with_limit(symbol): + async with semaphore: + return await self._get_symbol_change(symbol, all_tickers.get(symbol)) + + tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols] results = await asyncio.gather(*tasks, return_exceptions=True) # 过滤有效结果 @@ -115,19 +139,24 @@ class MarketScanner: return top_n - async def _get_symbol_change(self, symbol: str) -> Optional[Dict]: + async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]: """ 获取单个交易对的涨跌幅和技术指标 Args: symbol: 交易对 + ticker_data: 可选的24小时行情数据(如果已批量获取) Returns: 包含涨跌幅和技术指标信息的字典 """ try: - # 获取24小时行情数据 - ticker = await self.client.get_ticker_24h(symbol) + # 如果已有批量获取的数据,直接使用;否则单独获取 + if ticker_data: + ticker = ticker_data + else: + ticker = await self.client.get_ticker_24h(symbol) + if not ticker: return None