""" 市场扫描器 - 发现涨跌幅最大的前N个货币对,并分析技术指标 """ import asyncio import logging from typing import List, Dict, Optional try: from .binance_client import BinanceClient from .indicators import TechnicalIndicators from . import config except ImportError: from binance_client import BinanceClient from indicators import TechnicalIndicators import config logger = logging.getLogger(__name__) class MarketScanner: """市场扫描器类""" def __init__(self, client: BinanceClient): """ 初始化市场扫描器 Args: client: 币安客户端 """ self.client = client self.top_symbols: List[Dict] = [] async def scan_market(self) -> List[Dict]: """ 扫描市场,找出涨跌幅最大的前N个货币对 Returns: 前N个货币对列表,包含涨跌幅信息 """ import time self._scan_start_time = time.time() logger.info("开始扫描市场...") # 获取所有USDT交易对 all_symbols = await self.client.get_all_usdt_pairs() if not all_symbols: logger.warning("未获取到交易对") return [] # 根据配置限制扫描的交易对数量 max_scan_symbols = config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500) if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols): symbols = all_symbols[:max_scan_symbols] logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols})") else: symbols = all_symbols logger.info(f"扫描所有 {len(symbols)} 个USDT交易对") # 先批量获取所有交易对的24小时行情数据(减少API请求) logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...") all_tickers = await self.client.get_all_tickers_24h() # 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量 pre_filtered_symbols = [] for symbol in symbols: ticker = all_tickers.get(symbol) if ticker: change_percent = abs(ticker.get('changePercent', 0)) volume = ticker.get('volume', 0) if (change_percent >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and volume >= config.TRADING_CONFIG['MIN_VOLUME_24H']): pre_filtered_symbols.append(symbol) logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个") # 只对符合条件的交易对进行详细分析(获取K线和技术指标) # 限制并发数量,避免请求过快 semaphore = asyncio.Semaphore(5) # 最多5个并发请求 async def get_symbol_change_with_limit(symbol): async with semaphore: return await self._get_symbol_change(symbol, all_tickers.get(symbol)) tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols] results = await asyncio.gather(*tasks, return_exceptions=True) # 过滤有效结果 valid_results = [ r for r in results if isinstance(r, dict) and r.get('changePercent') is not None ] # 过滤最小涨跌幅和成交量 filtered_results = [ r for r in valid_results if abs(r['changePercent']) >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H'] ] # 按信号得分和涨跌幅综合排序,取前N个 # 优先考虑技术指标信号得分高的 sorted_results = sorted( filtered_results, key=lambda x: ( x.get('signalScore', 0) * 10, # 信号得分权重更高 abs(x['changePercent']) # 其次考虑涨跌幅 ), reverse=True ) top_n = sorted_results[:config.TRADING_CONFIG['TOP_N_SYMBOLS']] self.top_symbols = top_n # 记录扫描结果到数据库 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import MarketScan import time scan_duration = time.time() - (getattr(self, '_scan_start_time', time.time())) MarketScan.create( symbols_scanned=len(symbols), symbols_found=len(top_n), top_symbols=[s['symbol'] for s in top_n], scan_duration=scan_duration ) except Exception as e: logger.debug(f"记录扫描结果失败: {e}") logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对") # 打印结果(包含技术指标) for i, symbol_info in enumerate(top_n, 1): rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A" regime_str = symbol_info.get('marketRegime', 'unknown') score_str = f"信号:{symbol_info.get('signalScore', 0)}" logger.info( f"{i}. {symbol_info['symbol']}: " f"{symbol_info['changePercent']:.2f}% | " f"{rsi_str} | {regime_str} | {score_str} | " f"价格: {symbol_info['price']:.4f}" ) return top_n async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]: """ 获取单个交易对的涨跌幅和技术指标 Args: symbol: 交易对 ticker_data: 可选的24小时行情数据(如果已批量获取) Returns: 包含涨跌幅和技术指标信息的字典 """ try: # 如果已有批量获取的数据,直接使用;否则单独获取 if ticker_data: ticker = ticker_data else: ticker = await self.client.get_ticker_24h(symbol) if not ticker: return None # 获取更多K线数据用于技术指标计算(使用配置的主周期) primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h') klines = await self.client.get_klines( symbol=symbol, interval=primary_interval, limit=50 # 获取更多数据用于计算指标 ) if len(klines) < 2: return None # 提取价格数据 close_prices = [float(k[4]) for k in klines] # 收盘价 high_prices = [float(k[2]) for k in klines] # 最高价 low_prices = [float(k[3]) for k in klines] # 最低价 # 计算涨跌幅(基于主周期) current_price = close_prices[-1] prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0] if prev_price == 0: return None change_percent = ((current_price - prev_price) / prev_price) * 100 # 计算技术指标 rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14) macd = TechnicalIndicators.calculate_macd(close_prices) bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20) atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=14) ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20) ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50) # 判断市场状态 market_regime = TechnicalIndicators.detect_market_regime(close_prices) # 计算交易信号得分(用于排序) signal_score = 0 # RSI信号(均值回归) if rsi is not None: if rsi < 30: # 超卖,做多信号 signal_score += 3 elif rsi > 70: # 超买,做空信号 signal_score += 3 elif 30 <= rsi <= 70: # 中性区域 signal_score += 1 # MACD信号 if macd and macd['histogram'] is not None: if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨 signal_score += 2 elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌 signal_score += 2 # 布林带信号(均值回归) if bollinger: if current_price <= bollinger['lower']: # 触及下轨,做多 signal_score += 3 elif current_price >= bollinger['upper']: # 触及上轨,做空 signal_score += 3 elif bollinger['lower'] < current_price < bollinger['upper']: signal_score += 1 # 均线信号 if ema20 and ema50: if current_price > ema20 > ema50: # 上升趋势 signal_score += 1 elif current_price < ema20 < ema50: # 下降趋势 signal_score += 1 return { 'symbol': symbol, 'price': current_price, 'prevPrice': prev_price, 'changePercent': change_percent, 'volume24h': ticker.get('volume', 0), 'direction': 'UP' if change_percent > 0 else 'DOWN', 'rsi': rsi, 'macd': macd, 'bollinger': bollinger, 'atr': atr, 'ema20': ema20, 'ema50': ema50, 'marketRegime': market_regime, 'signalScore': signal_score, 'klines': klines[-10:] # 保留最近10根K线 } except Exception as e: logger.debug(f"获取 {symbol} 数据失败: {e}") return None def get_top_symbols(self) -> List[Dict]: """ 获取当前扫描到的前N个货币对 Returns: 前N个货币对列表 """ return self.top_symbols async def monitor_price(self, symbol: str, callback) -> None: """ 监控单个交易对的价格变化(WebSocket) Args: symbol: 交易对 callback: 价格变化回调函数 """ # 使用标准WebSocket try: if self.client and self.client.client: # 对于 AsyncClient,直接使用 client 的 futures_socket 方法 # 单个交易对的 ticker 流路径格式:f"{symbol.lower()}@ticker" ws_path = f"{symbol.lower()}@ticker" async with self.client.client.futures_socket(ws_path) as stream: async for msg in stream: try: # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} if 'c' in msg: # 'c' 是当前价格 price = float(msg['c']) await callback(symbol, price) elif 'data' in msg and 'c' in msg['data']: price = float(msg['data']['c']) await callback(symbol, price) except (KeyError, ValueError, TypeError) as e: logger.debug(f"解析 {symbol} 价格数据失败: {e}") continue except Exception as e: logger.error(f"监控 {symbol} 价格失败: {e}") def get_realtime_price(self, symbol: str) -> Optional[float]: """ 获取实时价格(从缓存) Args: symbol: 交易对 Returns: 实时价格 """ if self.client: return self.client.get_realtime_price(symbol) return None