""" 市场扫描器 - 发现涨跌幅最大的前N个货币对,并分析技术指标 """ import asyncio import logging from typing import List, Dict, Optional try: from .binance_client import BinanceClient from .indicators import TechnicalIndicators from . import config except ImportError: from binance_client import BinanceClient from indicators import TechnicalIndicators import config logger = logging.getLogger(__name__) class MarketScanner: """市场扫描器类""" def __init__(self, client: BinanceClient): """ 初始化市场扫描器 Args: client: 币安客户端 """ self.client = client self.top_symbols: List[Dict] = [] async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]: """ 扫描市场,找出涨跌幅最大的前N个货币对 优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描 Returns: 前N个货币对列表,包含涨跌幅信息 """ import time self._scan_start_time = time.time() # 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰 cfg = dict(config.TRADING_CONFIG or {}) if config_override and isinstance(config_override, dict): cfg.update(config_override) ns = (cache_namespace or "trade").strip() or "trade" # ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据 # 虽然中间数据(K线、技术指标)已经缓存,但最终扫描结果不缓存 # 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描 logger.info("开始扫描市场...") # 获取所有USDT交易对 all_symbols = await self.client.get_all_usdt_pairs() if not all_symbols: logger.warning("未获取到交易对") return [] # 根据配置限制扫描的交易对数量 max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500) if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols): symbols = all_symbols[:max_scan_symbols] logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols})") else: symbols = all_symbols logger.info(f"扫描所有 {len(symbols)} 个USDT交易对") # 过滤主流币(山寨币策略应该排除主流币) exclude_major_coins = cfg.get('EXCLUDE_MAJOR_COINS', True) if exclude_major_coins: # 主流币列表(市值排名前15的主流币) major_coins = { 'BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT', 'ADAUSDT', 'DOGEUSDT', 'DOTUSDT', 'AVAXUSDT', 'MATICUSDT', 'LINKUSDT', 'UNIUSDT', 'ATOMUSDT', 'ETCUSDT', 'LTCUSDT', 'NEARUSDT', 'APTUSDT', 'ARBUSDT', 'OPUSDT', 'SUIUSDT' } symbols_before = len(symbols) symbols = [s for s in symbols if s not in major_coins] excluded_count = symbols_before - len(symbols) if excluded_count > 0: logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)") # 先批量获取所有交易对的24小时行情数据(减少API请求) logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...") all_tickers = await self.client.get_all_tickers_24h() # 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量 pre_filtered_symbols = [] for symbol in symbols: ticker = all_tickers.get(symbol) if ticker: change_percent = abs(ticker.get('changePercent', 0)) volume = ticker.get('volume', 0) if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])): pre_filtered_symbols.append(symbol) logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个") # 只对符合条件的交易对进行详细分析(获取K线和技术指标) # ⚠️ 并发数说明: # - 这是单个账户扫描时,同时分析多少个交易对(不是用户进程数) # - 并发数5:单用户时扫描更快(15-25秒) # - 并发数3:多用户时系统更稳定(4个账户最多12个并发请求) # - 如果只有一个账户,建议保持5;如果后续增加用户,可以降低到3 # - 由于中间数据(K线、技术指标)已经缓存,实际API请求会大大减少 semaphore = asyncio.Semaphore(3) # 最多5个并发请求(单用户建议5,多用户建议3) async def get_symbol_change_with_limit(symbol): async with semaphore: try: # 添加超时控制,确保单个交易对的分析不会无限期阻塞 return await asyncio.wait_for( self._get_symbol_change(symbol, all_tickers.get(symbol)), timeout=10.0 # 单个交易对分析超时10秒 ) except asyncio.TimeoutError: logger.warning(f"{symbol} 分析超时(10秒),跳过") return None except Exception as e: logger.debug(f"{symbol} 分析出错: {e}") return None tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols] results = await asyncio.gather(*tasks, return_exceptions=True) # 过滤有效结果 valid_results = [ r for r in results if isinstance(r, dict) and r.get('changePercent') is not None ] # ⚠️ 优化2:成交量验证 - 24H Volume低于1000万美金,直接剔除 min_volume_strict = cfg.get('MIN_VOLUME_24H_STRICT', 10000000) # 默认1000万美金 min_volume_normal = cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H']) # 使用更严格的成交量要求 min_volume = max(min_volume_strict, min_volume_normal) # 过滤最小涨跌幅和成交量 filtered_results = [ r for r in valid_results if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and r.get('volume24h', 0) >= min_volume ] if min_volume_strict > min_volume_normal: logger.info(f"使用严格成交量过滤: {min_volume_strict/1000000:.1f}M USDT (原标准: {min_volume_normal/1000000:.1f}M USDT)") # 按信号得分和涨跌幅综合排序,取前N个 # 优先考虑技术指标信号得分高的 sorted_results = sorted( filtered_results, key=lambda x: ( x.get('signalScore', 0) * 10, # 信号得分权重更高 abs(x['changePercent']) # 其次考虑涨跌幅 ), reverse=True ) top_n = sorted_results[:cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS'])] self.top_symbols = top_n # ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据 # 虽然中间数据(K线、技术指标)已经缓存,但最终扫描结果不缓存 # 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描 # 记录扫描性能(用于监控多用户时的系统压力) scan_duration = time.time() - self._scan_start_time logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对,耗时 {scan_duration:.2f}秒") if scan_duration > 60: logger.warning(f"⚠️ 扫描耗时较长({scan_duration:.2f}秒),可能影响系统性能,建议检查缓存命中率") # 记录扫描结果到数据库 try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import MarketScan MarketScan.create( symbols_scanned=len(symbols), symbols_found=len(top_n), top_symbols=[s['symbol'] for s in top_n], scan_duration=scan_duration ) except Exception as e: logger.debug(f"记录扫描结果失败: {e}") logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对") # 打印结果(包含技术指标) for i, symbol_info in enumerate(top_n, 1): rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A" regime_str = symbol_info.get('marketRegime', 'unknown') score_str = f"信号:{symbol_info.get('signalScore', 0)}" logger.info( f"{i}. {symbol_info['symbol']}: " f"{symbol_info['changePercent']:.2f}% | " f"{rsi_str} | {regime_str} | {score_str} | " f"价格: {symbol_info['price']:.4f}" ) return top_n async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]: """ 获取单个交易对的涨跌幅和技术指标 Args: symbol: 交易对 ticker_data: 可选的24小时行情数据(如果已批量获取) Returns: 包含涨跌幅和技术指标信息的字典 """ try: # 如果已有批量获取的数据,直接使用;否则单独获取 if ticker_data: ticker = ticker_data else: ticker = await self.client.get_ticker_24h(symbol) if not ticker: return None # 24h ticker 的“最新价”(更接近用户理解的“当前价格”) # 注意:技术指标仍然基于 K 线收盘价计算;这里额外携带一份展示用价格与时间戳 ticker_price = ticker.get('price') try: ticker_price = float(ticker_price) if ticker_price is not None else None except Exception: ticker_price = None ticker_ts = ticker.get('ts') try: ticker_ts = int(ticker_ts) if ticker_ts is not None else None except Exception: ticker_ts = None # 获取更多K线数据用于技术指标计算(使用配置的主周期) primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h') klines = await self.client.get_klines( symbol=symbol, interval=primary_interval, limit=50 # 获取更多数据用于计算指标 ) if len(klines) < 2: return None # 获取4H周期数据用于多周期共振检查 confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h') klines_4h = await self.client.get_klines( symbol=symbol, interval=confirm_interval, limit=50 # 获取4H周期数据 ) # 提取价格数据(主周期) close_prices = [float(k[4]) for k in klines] # 收盘价 high_prices = [float(k[2]) for k in klines] # 最高价 low_prices = [float(k[3]) for k in klines] # 最低价 # 提取4H周期价格数据 close_prices_4h = [float(k[4]) for k in klines_4h] if len(klines_4h) >= 2 else close_prices # 计算涨跌幅(基于主周期) current_price = close_prices[-1] prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0] if prev_price == 0: return None change_percent = ((current_price - prev_price) / prev_price) * 100 # ⚠️ 优化:检查技术指标计算结果缓存(基于K线数据的最后更新时间) # 如果K线数据没有更新,可以直接使用缓存的技术指标 primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h') confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h') cache_key_indicators = f"indicators:{symbol}:{primary_interval}:{confirm_interval}" last_kline_time = int(klines[-1][0]) if klines else 0 # 最后一根K线的时间戳 # 尝试从缓存获取技术指标计算结果 cached_indicators = None try: cached_indicators = await self.client.redis_cache.get(cache_key_indicators) except Exception: pass use_cached_indicators = False if cached_indicators and cached_indicators.get('last_kline_time') == last_kline_time: # 缓存命中,使用缓存的技术指标 use_cached_indicators = True logger.debug(f"{symbol} 使用缓存的技术指标计算结果") rsi = cached_indicators.get('rsi') macd = cached_indicators.get('macd') bollinger = cached_indicators.get('bollinger') atr = cached_indicators.get('atr') ema20 = cached_indicators.get('ema20') ema50 = cached_indicators.get('ema50') ema20_4h = cached_indicators.get('ema20_4h') market_regime = cached_indicators.get('marketRegime') else: # 缓存未命中,重新计算技术指标 rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14) macd = TechnicalIndicators.calculate_macd(close_prices) bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20) atr_period = int(config.TRADING_CONFIG.get('ATR_PERIOD', 14) or 14) atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=atr_period) ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20) ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50) # 计算4H周期的EMA20用于多周期共振检查 ema20_4h = TechnicalIndicators.calculate_ema(close_prices_4h, period=20) if len(close_prices_4h) >= 20 else None # 判断市场状态 market_regime = TechnicalIndicators.detect_market_regime(close_prices) # 保存技术指标计算结果到缓存(TTL: 30秒,与K线缓存一致) try: indicators_cache = { 'last_kline_time': last_kline_time, 'rsi': rsi, 'macd': macd, 'bollinger': bollinger, 'atr': atr, 'ema20': ema20, 'ema50': ema50, 'ema20_4h': ema20_4h, 'marketRegime': market_regime, } await self.client.redis_cache.set(cache_key_indicators, indicators_cache, ttl=30) logger.debug(f"{symbol} 技术指标计算结果已缓存 (TTL: 30秒)") except Exception as e: logger.debug(f"{symbol} 缓存技术指标计算结果失败: {e}") # 计算交易信号得分(用于排序) signal_score = 0 # RSI信号(均值回归) if rsi is not None: if rsi < 30: # 超卖,做多信号 signal_score += 3 elif rsi > 70: # 超买,做空信号 signal_score += 3 elif 30 <= rsi <= 70: # 中性区域 signal_score += 1 # MACD信号 if macd and macd['histogram'] is not None: if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨 signal_score += 2 elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌 signal_score += 2 # 布林带信号(均值回归) if bollinger: if current_price <= bollinger['lower']: # 触及下轨,做多 signal_score += 3 elif current_price >= bollinger['upper']: # 触及上轨,做空 signal_score += 3 elif bollinger['lower'] < current_price < bollinger['upper']: signal_score += 1 # 均线信号 if ema20 and ema50: if current_price > ema20 > ema50: # 上升趋势 signal_score += 1 elif current_price < ema20 < ema50: # 下降趋势 signal_score += 1 return { 'symbol': symbol, # 技术分析使用的价格(K线收盘价) 'price': current_price, 'kline_close_price': current_price, # 展示用“当前价”(24h ticker 最新价,通常更贴近用户的直觉) 'ticker_price': ticker_price, 'ticker_ts': ticker_ts, 'prevPrice': prev_price, # 1) 主周期涨跌幅(用于内部信号) 'kline_change_percent': change_percent, # 2) 24h涨跌幅(用于前端展示更符合“24h涨跌”的文案) 'changePercent': float(ticker.get('changePercent', 0) or 0), 'volume24h': ticker.get('volume', 0), 'direction': 'UP' if change_percent > 0 else 'DOWN', 'rsi': rsi, 'macd': macd, 'bollinger': bollinger, 'atr': atr, 'ema20': ema20, 'ema50': ema50, 'ema20_4h': ema20_4h, # 4H周期EMA20,用于多周期共振 'price_4h': close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price, # 4H周期当前价格 'marketRegime': market_regime, 'signalScore': signal_score, 'klines': klines[-10:], # 保留最近10根K线 'klines_4h': klines_4h[-10:] if len(klines_4h) >= 10 else klines_4h # 保留最近10根4H K线 } except Exception as e: logger.debug(f"获取 {symbol} 数据失败: {e}") return None def get_top_symbols(self) -> List[Dict]: """ 获取当前扫描到的前N个货币对 Returns: 前N个货币对列表 """ return self.top_symbols async def monitor_price(self, symbol: str, callback) -> None: """ 监控单个交易对的价格变化(WebSocket) Args: symbol: 交易对 callback: 价格变化回调函数 """ # 使用标准WebSocket try: import aiohttp import json # 直接使用 aiohttp 连接 Binance 期货 WebSocket API # 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams # 端点:wss://fstream.binance.com/ws/@ticker ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker" async with aiohttp.ClientSession() as session: async with session.ws_connect(ws_url) as ws: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: try: # 解析 JSON 消息 data = json.loads(msg.data) # WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...} if isinstance(data, dict): if 'c' in data: # 'c' 是当前价格 price = float(data['c']) await callback(symbol, price) elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']: price = float(data['data']['c']) await callback(symbol, price) except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e: logger.debug(f"解析 {symbol} 价格数据失败: {e}") continue elif msg.type == aiohttp.WSMsgType.ERROR: logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}") break elif msg.type == aiohttp.WSMsgType.CLOSE: logger.info(f"监控 {symbol} WebSocket连接关闭") break except Exception as e: logger.error(f"监控 {symbol} 价格失败: {e}") def get_realtime_price(self, symbol: str) -> Optional[float]: """ 获取实时价格(从缓存) Args: symbol: 交易对 Returns: 实时价格 """ if self.client: return self.client.get_realtime_price(symbol) return None