auto_trade_sys/trading_system/market_scanner.py
薇薇安 9490207537 a
2026-01-29 23:34:15 +08:00

592 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场扫描器 - 发现涨跌幅最大的前N个货币对并分析技术指标
"""
import asyncio
import logging
from typing import List, Dict, Optional
try:
from .binance_client import BinanceClient
from .indicators import TechnicalIndicators
from . import config
except ImportError:
from binance_client import BinanceClient
from indicators import TechnicalIndicators
import config
logger = logging.getLogger(__name__)
class MarketScanner:
"""市场扫描器类"""
def __init__(self, client: BinanceClient):
"""
初始化市场扫描器
Args:
client: 币安客户端
"""
self.client = client
self.top_symbols: List[Dict] = []
async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]:
"""
扫描市场找出涨跌幅最大的前N个货币对
优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描
Returns:
前N个货币对列表包含涨跌幅信息
"""
import time
self._scan_start_time = time.time()
# 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰
cfg = dict(config.TRADING_CONFIG or {})
if config_override and isinstance(config_override, dict):
cfg.update(config_override)
ns = (cache_namespace or "trade").strip() or "trade"
# ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据
# 虽然中间数据K线、技术指标已经缓存但最终扫描结果不缓存
# 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描
logger.info("开始扫描市场...")
# 获取所有USDT交易对
all_symbols = await self.client.get_all_usdt_pairs()
if not all_symbols:
logger.warning("未获取到交易对")
return []
# 根据配置限制扫描的交易对数量
max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500)
if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols):
symbols = all_symbols[:max_scan_symbols]
logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols}")
else:
symbols = all_symbols
logger.info(f"扫描所有 {len(symbols)} 个USDT交易对")
# 过滤主流币(山寨币策略应该排除主流币)
exclude_major_coins = cfg.get('EXCLUDE_MAJOR_COINS', True)
if exclude_major_coins:
# 主流币列表市值排名前15的主流币
major_coins = {
'BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT',
'ADAUSDT', 'DOGEUSDT', 'DOTUSDT', 'AVAXUSDT', 'MATICUSDT',
'LINKUSDT', 'UNIUSDT', 'ATOMUSDT', 'ETCUSDT', 'LTCUSDT',
'NEARUSDT', 'APTUSDT', 'ARBUSDT', 'OPUSDT', 'SUIUSDT'
}
symbols_before = len(symbols)
symbols = [s for s in symbols if s not in major_coins]
excluded_count = symbols_before - len(symbols)
if excluded_count > 0:
logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)")
# 先批量获取所有交易对的24小时行情数据减少API请求
logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...")
all_tickers = await self.client.get_all_tickers_24h()
# 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量
pre_filtered_symbols = []
for symbol in symbols:
ticker = all_tickers.get(symbol)
if ticker:
change_percent = abs(ticker.get('changePercent', 0))
volume = ticker.get('volume', 0)
if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and
volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])):
pre_filtered_symbols.append(symbol)
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)}")
# 只对符合条件的交易对进行详细分析获取K线和技术指标
# ⚠️ 并发数说明:
# - 这是单个账户扫描时,同时分析多少个交易对(不是用户进程数)
# - 并发数5单用户时扫描更快15-25秒
# - 并发数3多用户时系统更稳定4个账户最多12个并发请求
# - 如果只有一个账户建议保持5如果后续增加用户可以降低到3
# - 由于中间数据K线、技术指标已经缓存实际API请求会大大减少
semaphore = asyncio.Semaphore(3) # 最多5个并发请求单用户建议5多用户建议3
async def get_symbol_change_with_limit(symbol):
async with semaphore:
try:
# 添加超时控制,确保单个交易对的分析不会无限期阻塞
return await asyncio.wait_for(
self._get_symbol_change(symbol, all_tickers.get(symbol)),
timeout=10.0 # 单个交易对分析超时10秒
)
except asyncio.TimeoutError:
logger.warning(f"{symbol} 分析超时10秒跳过")
return None
except Exception as e:
logger.debug(f"{symbol} 分析出错: {e}")
return None
tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤有效结果
valid_results = [
r for r in results
if isinstance(r, dict) and r.get('changePercent') is not None
]
# ⚠️ 优化2成交量验证 - 24H Volume低于1000万美金直接剔除
min_volume_strict = cfg.get('MIN_VOLUME_24H_STRICT', 10000000) # 默认1000万美金
min_volume_normal = cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])
# 使用更严格的成交量要求
min_volume = max(min_volume_strict, min_volume_normal)
# 过滤最小涨跌幅和成交量
filtered_results = [
r for r in valid_results
if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT'])
and r.get('volume24h', 0) >= min_volume
]
if min_volume_strict > min_volume_normal:
logger.info(f"使用严格成交量过滤: {min_volume_strict/1000000:.1f}M USDT (原标准: {min_volume_normal/1000000:.1f}M USDT)")
# ⚠️ 2026-01-27优化按真实的signal_strength排序而不是简单的signalScore
# 优先考虑信号强度高的交易对8-10分提升胜率
sorted_results = sorted(
filtered_results,
key=lambda x: (
x.get('signal_strength', 0) * 100, # 信号强度权重最高乘以100确保优先级
x.get('signalScore', 0) * 10, # 其次考虑信号得分(兼容性)
abs(x['changePercent']) # 最后考虑涨跌幅
),
reverse=True
)
# 智能补单:返回 TOP_N + 额外候选数,当前 TOP_N 中部分因冷却等被跳过时,策略仍会尝试后续交易对,避免无单可下
top_n_val = cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS'])
extra = cfg.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', config.TRADING_CONFIG.get('SCAN_EXTRA_SYMBOLS_FOR_SUPPLEMENT', 8))
take_count = min(len(sorted_results), top_n_val + extra)
top_n = sorted_results[:take_count]
self.top_symbols = top_n
# ⚠️ 已禁用扫描结果缓存,确保每个账户都使用最新的市场数据
# 虽然中间数据K线、技术指标已经缓存但最终扫描结果不缓存
# 这样可以避免使用过期的交易对,确保每个账户都基于最新市场数据扫描
# 记录扫描性能(用于监控多用户时的系统压力)
scan_duration = time.time() - self._scan_start_time
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对,耗时 {scan_duration:.2f}")
if scan_duration > 60:
logger.warning(f"⚠️ 扫描耗时较长({scan_duration:.2f}秒),可能影响系统性能,建议检查缓存命中率")
# 记录扫描结果到数据库
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import MarketScan
MarketScan.create(
symbols_scanned=len(symbols),
symbols_found=len(top_n),
top_symbols=[s['symbol'] for s in top_n],
scan_duration=scan_duration
)
except Exception as e:
logger.debug(f"记录扫描结果失败: {e}")
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
# 打印结果(包含技术指标)
for i, symbol_info in enumerate(top_n, 1):
rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A"
regime_str = symbol_info.get('marketRegime', 'unknown')
# ⚠️ 2026-01-27优化显示真实的signal_strength而不是signalScore
strength_str = f"信号:{symbol_info.get('signal_strength', symbol_info.get('signalScore', 0))}"
logger.info(
f"{i}. {symbol_info['symbol']}: "
f"{symbol_info['changePercent']:.2f}% | "
f"{rsi_str} | {regime_str} | {strength_str} | "
f"价格: {symbol_info['price']:.4f}"
)
return top_n
async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
"""
获取单个交易对的涨跌幅和技术指标
Args:
symbol: 交易对
ticker_data: 可选的24小时行情数据如果已批量获取
Returns:
包含涨跌幅和技术指标信息的字典
"""
try:
# 如果已有批量获取的数据,直接使用;否则单独获取
if ticker_data:
ticker = ticker_data
else:
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
# 24h ticker 的“最新价”(更接近用户理解的“当前价格”)
# 注意:技术指标仍然基于 K 线收盘价计算;这里额外携带一份展示用价格与时间戳
ticker_price = ticker.get('price')
try:
ticker_price = float(ticker_price) if ticker_price is not None else None
except Exception:
ticker_price = None
ticker_ts = ticker.get('ts')
try:
ticker_ts = int(ticker_ts) if ticker_ts is not None else None
except Exception:
ticker_ts = None
# 获取更多K线数据用于技术指标计算使用配置的主周期
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
klines = await self.client.get_klines(
symbol=symbol,
interval=primary_interval,
limit=50 # 获取更多数据用于计算指标
)
if len(klines) < 2:
return None
# 获取4H周期数据用于多周期共振检查
confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')
klines_4h = await self.client.get_klines(
symbol=symbol,
interval=confirm_interval,
limit=50 # 获取4H周期数据
)
# 提取价格数据(主周期)
close_prices = [float(k[4]) for k in klines] # 收盘价
high_prices = [float(k[2]) for k in klines] # 最高价
low_prices = [float(k[3]) for k in klines] # 最低价
# 提取4H周期价格数据
close_prices_4h = [float(k[4]) for k in klines_4h] if len(klines_4h) >= 2 else close_prices
# 计算涨跌幅(基于主周期)
current_price = close_prices[-1]
prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0]
if prev_price == 0:
return None
change_percent = ((current_price - prev_price) / prev_price) * 100
# ⚠️ 优化检查技术指标计算结果缓存基于K线数据的最后更新时间
# 如果K线数据没有更新可以直接使用缓存的技术指标
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')
cache_key_indicators = f"indicators:{symbol}:{primary_interval}:{confirm_interval}"
last_kline_time = int(klines[-1][0]) if klines else 0 # 最后一根K线的时间戳
# 尝试从缓存获取技术指标计算结果
cached_indicators = None
try:
cached_indicators = await self.client.redis_cache.get(cache_key_indicators)
except Exception:
pass
use_cached_indicators = False
if cached_indicators and cached_indicators.get('last_kline_time') == last_kline_time:
# 缓存命中,使用缓存的技术指标
use_cached_indicators = True
logger.debug(f"{symbol} 使用缓存的技术指标计算结果")
rsi = cached_indicators.get('rsi')
macd = cached_indicators.get('macd')
bollinger = cached_indicators.get('bollinger')
atr = cached_indicators.get('atr')
ema20 = cached_indicators.get('ema20')
ema50 = cached_indicators.get('ema50')
ema20_4h = cached_indicators.get('ema20_4h')
market_regime = cached_indicators.get('marketRegime')
else:
# 缓存未命中,重新计算技术指标
rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14)
macd = TechnicalIndicators.calculate_macd(close_prices)
bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20)
atr_period = int(config.TRADING_CONFIG.get('ATR_PERIOD', 14) or 14)
atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=atr_period)
ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20)
ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50)
# 计算4H周期的EMA20用于多周期共振检查
ema20_4h = TechnicalIndicators.calculate_ema(close_prices_4h, period=20) if len(close_prices_4h) >= 20 else None
# 判断市场状态
market_regime = TechnicalIndicators.detect_market_regime(close_prices)
# 保存技术指标计算结果到缓存TTL: 30秒与K线缓存一致
try:
indicators_cache = {
'last_kline_time': last_kline_time,
'rsi': rsi,
'macd': macd,
'bollinger': bollinger,
'atr': atr,
'ema20': ema20,
'ema50': ema50,
'ema20_4h': ema20_4h,
'marketRegime': market_regime,
}
await self.client.redis_cache.set(cache_key_indicators, indicators_cache, ttl=30)
logger.debug(f"{symbol} 技术指标计算结果已缓存 (TTL: 30秒)")
except Exception as e:
logger.debug(f"{symbol} 缓存技术指标计算结果失败: {e}")
# 计算交易信号得分(用于排序)- 保留用于兼容性
signal_score = 0
# RSI信号均值回归
if rsi is not None:
if rsi < 30: # 超卖,做多信号
signal_score += 3
elif rsi > 70: # 超买,做空信号
signal_score += 3
elif 30 <= rsi <= 70: # 中性区域
signal_score += 1
# MACD信号
if macd and macd['histogram'] is not None:
if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨
signal_score += 2
elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌
signal_score += 2
# 布林带信号(均值回归)
if bollinger:
if current_price <= bollinger['lower']: # 触及下轨,做多
signal_score += 3
elif current_price >= bollinger['upper']: # 触及上轨,做空
signal_score += 3
elif bollinger['lower'] < current_price < bollinger['upper']:
signal_score += 1
# 均线信号
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
signal_score += 1
elif current_price < ema20 < ema50: # 下降趋势
signal_score += 1
# ⚠️ 2026-01-27优化计算真实的signal_strength用于排序和筛选
# 使用与strategy.py相同的逻辑确保排序依据与交易判断一致
signal_strength = 0
direction = None
# 获取4H周期当前价格用于判断4H趋势
price_4h = close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price
# 判断4H周期趋势方向
trend_4h = None
if ema20_4h is not None:
if price_4h > ema20_4h:
trend_4h = 'up'
elif price_4h < ema20_4h:
trend_4h = 'down'
else:
trend_4h = 'neutral'
# 策略权重配置与strategy.py保持一致
TREND_SIGNAL_WEIGHTS = {
'macd_cross': 5, # MACD金叉/死叉
'ema_cross': 4, # EMA20上穿/下穿EMA50
'price_above_ema20': 3, # 价格在EMA20之上/下
'4h_trend_confirmation': 2, # 4H趋势确认
}
# MACD金叉/死叉(权重最高)
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
# MACD金叉做多信号需4H趋势向上或中性
if trend_4h in ('up', 'neutral', None):
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
if direction is None:
direction = 'BUY'
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
# MACD死叉做空信号需4H趋势向下或中性
if trend_4h in ('down', 'neutral', None):
signal_strength += TREND_SIGNAL_WEIGHTS['macd_cross']
if direction is None:
direction = 'SELL'
# EMA均线系统
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
if trend_4h in ('up', 'neutral', None):
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
if direction is None:
direction = 'BUY'
elif current_price < ema20 < ema50: # 下降趋势
if trend_4h in ('down', 'neutral', None):
signal_strength += TREND_SIGNAL_WEIGHTS['ema_cross']
if direction is None:
direction = 'SELL'
# 价格与EMA20关系
if ema20:
if current_price > ema20:
if trend_4h in ('up', 'neutral', None) and direction == 'BUY':
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
elif current_price < ema20:
if trend_4h in ('down', 'neutral', None) and direction == 'SELL':
signal_strength += TREND_SIGNAL_WEIGHTS['price_above_ema20']
# 4H趋势确认加分
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
signal_strength += TREND_SIGNAL_WEIGHTS['4h_trend_confirmation']
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
# 逆势信号,直接拒绝
signal_strength = 0
direction = None
# 强度上限归一到 0-10
signal_strength = max(0, min(int(signal_strength), 10))
# ===== 趋势状态缓存(用于后续“入场时机过滤”)=====
try:
# 只有在方向明确且信号强度达到最小门槛时,才记录趋势状态
min_strength = int(config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) or 7)
use_trend_filter = bool(config.TRADING_CONFIG.get('USE_TREND_ENTRY_FILTER', False))
if use_trend_filter and direction and signal_strength >= min_strength:
trend_state_key = f"trend_state:{symbol}"
trend_state_value = {
'symbol': symbol,
'direction': direction, # BUY / SELL
'signal_strength': int(signal_strength),
'marketRegime': market_regime,
'trend_4h': trend_4h,
# 使用技术分析用的收盘价作为“信号价格”,同时附带 24h ticker 价格
'signal_price': float(current_price),
'ticker_price': float(ticker_price) if ticker_price is not None else None,
'ema20': float(ema20) if ema20 is not None else None,
'ema50': float(ema50) if ema50 is not None else None,
'ema20_4h': float(ema20_4h) if ema20_4h is not None else None,
'price_4h': float(price_4h) if price_4h is not None else None,
'last_kline_time': last_kline_time,
'created_at_ts': int(ticker_ts or 0),
}
ttl_sec = int(config.TRADING_CONFIG.get('TREND_STATE_TTL_SEC', 3600) or 3600)
if self.client and getattr(self.client, "redis_cache", None):
try:
await self.client.redis_cache.set(trend_state_key, trend_state_value, ttl=ttl_sec)
logger.debug(f"{symbol} 趋势状态已缓存: dir={direction}, strength={signal_strength}, price={current_price:.6f}")
except Exception as e:
logger.debug(f"{symbol} 缓存趋势状态失败: {e}")
except Exception as e:
logger.debug(f"{symbol} 处理趋势状态缓存时出错: {e}")
return {
'symbol': symbol,
# 技术分析使用的价格K线收盘价
'price': current_price,
'kline_close_price': current_price,
# 展示用“当前价”24h ticker 最新价,通常更贴近用户的直觉)
'ticker_price': ticker_price,
'ticker_ts': ticker_ts,
'prevPrice': prev_price,
# 1) 主周期涨跌幅(用于内部信号)
'kline_change_percent': change_percent,
# 2) 24h涨跌幅用于前端展示更符合“24h涨跌”的文案
'changePercent': float(ticker.get('changePercent', 0) or 0),
'volume24h': ticker.get('volume', 0),
'direction': 'UP' if change_percent > 0 else 'DOWN',
'rsi': rsi,
'macd': macd,
'bollinger': bollinger,
'atr': atr,
'ema20': ema20,
'ema50': ema50,
'ema20_4h': ema20_4h, # 4H周期EMA20用于多周期共振
'price_4h': close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price, # 4H周期当前价格
'marketRegime': market_regime,
'signalScore': signal_score, # 保留用于兼容性
'signal_strength': signal_strength, # ⚠️ 2026-01-27优化添加真实的信号强度
'trend_4h': trend_4h, # 4H趋势方向
'klines': klines[-10:], # 保留最近10根K线
'klines_4h': klines_4h[-10:] if len(klines_4h) >= 10 else klines_4h # 保留最近10根4H K线
}
except Exception as e:
logger.debug(f"获取 {symbol} 数据失败: {e}")
return None
def get_top_symbols(self) -> List[Dict]:
"""
获取当前扫描到的前N个货币对
Returns:
前N个货币对列表
"""
return self.top_symbols
async def monitor_price(self, symbol: str, callback) -> None:
"""
监控单个交易对的价格变化WebSocket
Args:
symbol: 交易对
callback: 价格变化回调函数
"""
# 使用标准WebSocket
try:
import aiohttp
import json
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
# 根据文档https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
# 端点wss://fstream.binance.com/ws/<symbol>@ticker
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(ws_url) as ws:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
# 解析 JSON 消息
data = json.loads(msg.data)
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
if isinstance(data, dict):
if 'c' in data: # 'c' 是当前价格
price = float(data['c'])
await callback(symbol, price)
elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']:
price = float(data['data']['c'])
await callback(symbol, price)
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
logger.debug(f"解析 {symbol} 价格数据失败: {e}")
continue
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
logger.info(f"监控 {symbol} WebSocket连接关闭")
break
except Exception as e:
logger.error(f"监控 {symbol} 价格失败: {e}")
def get_realtime_price(self, symbol: str) -> Optional[float]:
"""
获取实时价格(从缓存)
Args:
symbol: 交易对
Returns:
实时价格
"""
if self.client:
return self.client.get_realtime_price(symbol)
return None