""" 市场扫描器 - 发现涨跌幅最大的前N个货币对 """ import asyncio import logging from typing import List, Dict, Optional from binance_client import BinanceClient import config logger = logging.getLogger(__name__) class MarketScanner: """市场扫描器类""" def __init__(self, client: BinanceClient): """ 初始化市场扫描器 Args: client: 币安客户端 """ self.client = client self.top_symbols: List[Dict] = [] async def scan_market(self) -> List[Dict]: """ 扫描市场,找出涨跌幅最大的前N个货币对 Returns: 前N个货币对列表,包含涨跌幅信息 """ logger.info("开始扫描市场...") # 获取所有USDT交易对 symbols = await self.client.get_all_usdt_pairs() if not symbols: logger.warning("未获取到交易对") return [] # 并发获取所有交易对的涨跌幅数据 tasks = [self._get_symbol_change(symbol) for symbol in symbols] results = await asyncio.gather(*tasks, return_exceptions=True) # 过滤有效结果 valid_results = [ r for r in results if isinstance(r, dict) and r.get('changePercent') is not None ] # 过滤最小涨跌幅和成交量 filtered_results = [ r for r in valid_results if abs(r['changePercent']) >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H'] ] # 按涨跌幅绝对值排序,取前N个 sorted_results = sorted( filtered_results, key=lambda x: abs(x['changePercent']), reverse=True ) top_n = sorted_results[:config.TRADING_CONFIG['TOP_N_SYMBOLS']] self.top_symbols = top_n logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对") # 打印结果 for i, symbol_info in enumerate(top_n, 1): logger.info( f"{i}. {symbol_info['symbol']}: " f"{symbol_info['changePercent']:.2f}% " f"(价格: {symbol_info['price']:.4f}, " f"成交量: {symbol_info.get('volume24h', 0):.0f})" ) return top_n async def _get_symbol_change(self, symbol: str) -> Optional[Dict]: """ 获取单个交易对的涨跌幅 Args: symbol: 交易对 Returns: 包含涨跌幅信息的字典 """ try: # 获取24小时行情数据 ticker = await self.client.get_ticker_24h(symbol) if not ticker: return None # 获取5分钟K线数据计算精确涨跌幅 klines = await self.client.get_klines( symbol=symbol, interval=config.TRADING_CONFIG['KLINE_INTERVAL'], limit=2 ) if len(klines) < 2: return None # 计算5分钟涨跌幅 current_price = float(klines[-1][4]) # 最新收盘价 prev_price = float(klines[-2][4]) # 5分钟前收盘价 if prev_price == 0: return None change_percent = ((current_price - prev_price) / prev_price) * 100 return { 'symbol': symbol, 'price': current_price, 'prevPrice': prev_price, 'changePercent': change_percent, 'volume24h': ticker.get('volume', 0), 'direction': 'UP' if change_percent > 0 else 'DOWN' } except Exception as e: logger.debug(f"获取 {symbol} 涨跌幅失败: {e}") return None def get_top_symbols(self) -> List[Dict]: """ 获取当前扫描到的前N个货币对 Returns: 前N个货币对列表 """ return self.top_symbols async def monitor_price(self, symbol: str, callback) -> None: """ 监控单个交易对的价格变化(WebSocket) Args: symbol: 交易对 callback: 价格变化回调函数 """ try: async with self.client.socket_manager.futures_socket(symbol.lower()) as stream: async for msg in stream: if 'data' in msg: price = float(msg['data']['c']) # 最新价格 await callback(symbol, price) except Exception as e: logger.error(f"监控 {symbol} 价格失败: {e}")