461 lines
21 KiB
Python
461 lines
21 KiB
Python
"""
|
||
市场扫描器 - 发现涨跌幅最大的前N个货币对,并分析技术指标
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
from typing import List, Dict, Optional
|
||
try:
|
||
from .binance_client import BinanceClient
|
||
from .indicators import TechnicalIndicators
|
||
from . import config
|
||
except ImportError:
|
||
from binance_client import BinanceClient
|
||
from indicators import TechnicalIndicators
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MarketScanner:
|
||
"""市场扫描器类"""
|
||
|
||
def __init__(self, client: BinanceClient):
|
||
"""
|
||
初始化市场扫描器
|
||
|
||
Args:
|
||
client: 币安客户端
|
||
"""
|
||
self.client = client
|
||
self.top_symbols: List[Dict] = []
|
||
|
||
async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]:
|
||
"""
|
||
扫描市场,找出涨跌幅最大的前N个货币对
|
||
优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描
|
||
|
||
Returns:
|
||
前N个货币对列表,包含涨跌幅信息
|
||
"""
|
||
import time
|
||
self._scan_start_time = time.time()
|
||
|
||
# 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰
|
||
cfg = dict(config.TRADING_CONFIG or {})
|
||
if config_override and isinstance(config_override, dict):
|
||
cfg.update(config_override)
|
||
ns = (cache_namespace or "trade").strip() or "trade"
|
||
|
||
# 先查 Redis 缓存(扫描结果缓存,TTL: 30秒)
|
||
cache_key = f"scan_result:top_symbols:{ns}"
|
||
cached = await self.client.redis_cache.get(cache_key)
|
||
if cached:
|
||
logger.info(f"从Redis缓存获取扫描结果: {len(cached)} 个交易对")
|
||
self.top_symbols = cached
|
||
return cached
|
||
|
||
logger.info("开始扫描市场...")
|
||
|
||
# 获取所有USDT交易对
|
||
all_symbols = await self.client.get_all_usdt_pairs()
|
||
if not all_symbols:
|
||
logger.warning("未获取到交易对")
|
||
return []
|
||
|
||
# 根据配置限制扫描的交易对数量
|
||
max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500)
|
||
if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols):
|
||
symbols = all_symbols[:max_scan_symbols]
|
||
logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols})")
|
||
else:
|
||
symbols = all_symbols
|
||
logger.info(f"扫描所有 {len(symbols)} 个USDT交易对")
|
||
|
||
# 过滤主流币(山寨币策略应该排除主流币)
|
||
exclude_major_coins = cfg.get('EXCLUDE_MAJOR_COINS', True)
|
||
if exclude_major_coins:
|
||
# 主流币列表(市值排名前15的主流币)
|
||
major_coins = {
|
||
'BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT',
|
||
'ADAUSDT', 'DOGEUSDT', 'DOTUSDT', 'AVAXUSDT', 'MATICUSDT',
|
||
'LINKUSDT', 'UNIUSDT', 'ATOMUSDT', 'ETCUSDT', 'LTCUSDT',
|
||
'NEARUSDT', 'APTUSDT', 'ARBUSDT', 'OPUSDT', 'SUIUSDT'
|
||
}
|
||
symbols_before = len(symbols)
|
||
symbols = [s for s in symbols if s not in major_coins]
|
||
excluded_count = symbols_before - len(symbols)
|
||
if excluded_count > 0:
|
||
logger.info(f"排除主流币 {excluded_count} 个,剩余 {len(symbols)} 个交易对(专注于山寨币)")
|
||
|
||
# 先批量获取所有交易对的24小时行情数据(减少API请求)
|
||
logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...")
|
||
all_tickers = await self.client.get_all_tickers_24h()
|
||
|
||
# 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量
|
||
pre_filtered_symbols = []
|
||
for symbol in symbols:
|
||
ticker = all_tickers.get(symbol)
|
||
if ticker:
|
||
change_percent = abs(ticker.get('changePercent', 0))
|
||
volume = ticker.get('volume', 0)
|
||
if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and
|
||
volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])):
|
||
pre_filtered_symbols.append(symbol)
|
||
|
||
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个")
|
||
|
||
# 只对符合条件的交易对进行详细分析(获取K线和技术指标)
|
||
# 限制并发数量,避免请求过快
|
||
semaphore = asyncio.Semaphore(5) # 最多5个并发请求
|
||
|
||
async def get_symbol_change_with_limit(symbol):
|
||
async with semaphore:
|
||
return await self._get_symbol_change(symbol, all_tickers.get(symbol))
|
||
|
||
tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols]
|
||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
# 过滤有效结果
|
||
valid_results = [
|
||
r for r in results
|
||
if isinstance(r, dict) and r.get('changePercent') is not None
|
||
]
|
||
|
||
# ⚠️ 优化2:成交量验证 - 24H Volume低于1000万美金,直接剔除
|
||
min_volume_strict = cfg.get('MIN_VOLUME_24H_STRICT', 10000000) # 默认1000万美金
|
||
min_volume_normal = cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])
|
||
# 使用更严格的成交量要求
|
||
min_volume = max(min_volume_strict, min_volume_normal)
|
||
|
||
# 过滤最小涨跌幅和成交量
|
||
filtered_results = [
|
||
r for r in valid_results
|
||
if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT'])
|
||
and r.get('volume24h', 0) >= min_volume
|
||
]
|
||
|
||
if min_volume_strict > min_volume_normal:
|
||
logger.info(f"使用严格成交量过滤: {min_volume_strict/1000000:.1f}M USDT (原标准: {min_volume_normal/1000000:.1f}M USDT)")
|
||
|
||
# 按信号得分和涨跌幅综合排序,取前N个
|
||
# 优先考虑技术指标信号得分高的
|
||
sorted_results = sorted(
|
||
filtered_results,
|
||
key=lambda x: (
|
||
x.get('signalScore', 0) * 10, # 信号得分权重更高
|
||
abs(x['changePercent']) # 其次考虑涨跌幅
|
||
),
|
||
reverse=True
|
||
)
|
||
|
||
top_n = sorted_results[:cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS'])]
|
||
|
||
self.top_symbols = top_n
|
||
|
||
# 写入 Redis 缓存(TTL: 30秒)
|
||
await self.client.redis_cache.set(cache_key, top_n, ttl=30)
|
||
logger.debug(f"扫描结果已缓存: {len(top_n)} 个交易对 (TTL: 30秒)")
|
||
|
||
# 记录扫描结果到数据库
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import MarketScan
|
||
import time
|
||
scan_duration = time.time() - (getattr(self, '_scan_start_time', time.time()))
|
||
MarketScan.create(
|
||
symbols_scanned=len(symbols),
|
||
symbols_found=len(top_n),
|
||
top_symbols=[s['symbol'] for s in top_n],
|
||
scan_duration=scan_duration
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"记录扫描结果失败: {e}")
|
||
|
||
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
|
||
|
||
# 打印结果(包含技术指标)
|
||
for i, symbol_info in enumerate(top_n, 1):
|
||
rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A"
|
||
regime_str = symbol_info.get('marketRegime', 'unknown')
|
||
score_str = f"信号:{symbol_info.get('signalScore', 0)}"
|
||
|
||
logger.info(
|
||
f"{i}. {symbol_info['symbol']}: "
|
||
f"{symbol_info['changePercent']:.2f}% | "
|
||
f"{rsi_str} | {regime_str} | {score_str} | "
|
||
f"价格: {symbol_info['price']:.4f}"
|
||
)
|
||
|
||
return top_n
|
||
|
||
async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
|
||
"""
|
||
获取单个交易对的涨跌幅和技术指标
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
ticker_data: 可选的24小时行情数据(如果已批量获取)
|
||
|
||
Returns:
|
||
包含涨跌幅和技术指标信息的字典
|
||
"""
|
||
try:
|
||
# 如果已有批量获取的数据,直接使用;否则单独获取
|
||
if ticker_data:
|
||
ticker = ticker_data
|
||
else:
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
|
||
if not ticker:
|
||
return None
|
||
|
||
# 24h ticker 的“最新价”(更接近用户理解的“当前价格”)
|
||
# 注意:技术指标仍然基于 K 线收盘价计算;这里额外携带一份展示用价格与时间戳
|
||
ticker_price = ticker.get('price')
|
||
try:
|
||
ticker_price = float(ticker_price) if ticker_price is not None else None
|
||
except Exception:
|
||
ticker_price = None
|
||
ticker_ts = ticker.get('ts')
|
||
try:
|
||
ticker_ts = int(ticker_ts) if ticker_ts is not None else None
|
||
except Exception:
|
||
ticker_ts = None
|
||
|
||
# 获取更多K线数据用于技术指标计算(使用配置的主周期)
|
||
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
|
||
klines = await self.client.get_klines(
|
||
symbol=symbol,
|
||
interval=primary_interval,
|
||
limit=50 # 获取更多数据用于计算指标
|
||
)
|
||
|
||
if len(klines) < 2:
|
||
return None
|
||
|
||
# 获取4H周期数据用于多周期共振检查
|
||
confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')
|
||
klines_4h = await self.client.get_klines(
|
||
symbol=symbol,
|
||
interval=confirm_interval,
|
||
limit=50 # 获取4H周期数据
|
||
)
|
||
|
||
# 提取价格数据(主周期)
|
||
close_prices = [float(k[4]) for k in klines] # 收盘价
|
||
high_prices = [float(k[2]) for k in klines] # 最高价
|
||
low_prices = [float(k[3]) for k in klines] # 最低价
|
||
|
||
# 提取4H周期价格数据
|
||
close_prices_4h = [float(k[4]) for k in klines_4h] if len(klines_4h) >= 2 else close_prices
|
||
|
||
# 计算涨跌幅(基于主周期)
|
||
current_price = close_prices[-1]
|
||
prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0]
|
||
|
||
if prev_price == 0:
|
||
return None
|
||
|
||
change_percent = ((current_price - prev_price) / prev_price) * 100
|
||
|
||
# ⚠️ 优化:检查技术指标计算结果缓存(基于K线数据的最后更新时间)
|
||
# 如果K线数据没有更新,可以直接使用缓存的技术指标
|
||
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
|
||
confirm_interval = config.TRADING_CONFIG.get('CONFIRM_INTERVAL', '4h')
|
||
cache_key_indicators = f"indicators:{symbol}:{primary_interval}:{confirm_interval}"
|
||
last_kline_time = int(klines[-1][0]) if klines else 0 # 最后一根K线的时间戳
|
||
|
||
# 尝试从缓存获取技术指标计算结果
|
||
cached_indicators = None
|
||
try:
|
||
cached_indicators = await self.client.redis_cache.get(cache_key_indicators)
|
||
except Exception:
|
||
pass
|
||
|
||
use_cached_indicators = False
|
||
if cached_indicators and cached_indicators.get('last_kline_time') == last_kline_time:
|
||
# 缓存命中,使用缓存的技术指标
|
||
use_cached_indicators = True
|
||
logger.debug(f"{symbol} 使用缓存的技术指标计算结果")
|
||
rsi = cached_indicators.get('rsi')
|
||
macd = cached_indicators.get('macd')
|
||
bollinger = cached_indicators.get('bollinger')
|
||
atr = cached_indicators.get('atr')
|
||
ema20 = cached_indicators.get('ema20')
|
||
ema50 = cached_indicators.get('ema50')
|
||
ema20_4h = cached_indicators.get('ema20_4h')
|
||
market_regime = cached_indicators.get('marketRegime')
|
||
else:
|
||
# 缓存未命中,重新计算技术指标
|
||
rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14)
|
||
macd = TechnicalIndicators.calculate_macd(close_prices)
|
||
bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20)
|
||
atr_period = int(config.TRADING_CONFIG.get('ATR_PERIOD', 14) or 14)
|
||
atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=atr_period)
|
||
ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20)
|
||
ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50)
|
||
|
||
# 计算4H周期的EMA20用于多周期共振检查
|
||
ema20_4h = TechnicalIndicators.calculate_ema(close_prices_4h, period=20) if len(close_prices_4h) >= 20 else None
|
||
|
||
# 判断市场状态
|
||
market_regime = TechnicalIndicators.detect_market_regime(close_prices)
|
||
|
||
# 保存技术指标计算结果到缓存(TTL: 30秒,与K线缓存一致)
|
||
try:
|
||
indicators_cache = {
|
||
'last_kline_time': last_kline_time,
|
||
'rsi': rsi,
|
||
'macd': macd,
|
||
'bollinger': bollinger,
|
||
'atr': atr,
|
||
'ema20': ema20,
|
||
'ema50': ema50,
|
||
'ema20_4h': ema20_4h,
|
||
'marketRegime': market_regime,
|
||
}
|
||
await self.client.redis_cache.set(cache_key_indicators, indicators_cache, ttl=30)
|
||
logger.debug(f"{symbol} 技术指标计算结果已缓存 (TTL: 30秒)")
|
||
except Exception as e:
|
||
logger.debug(f"{symbol} 缓存技术指标计算结果失败: {e}")
|
||
|
||
# 计算交易信号得分(用于排序)
|
||
signal_score = 0
|
||
|
||
# RSI信号(均值回归)
|
||
if rsi is not None:
|
||
if rsi < 30: # 超卖,做多信号
|
||
signal_score += 3
|
||
elif rsi > 70: # 超买,做空信号
|
||
signal_score += 3
|
||
elif 30 <= rsi <= 70: # 中性区域
|
||
signal_score += 1
|
||
|
||
# MACD信号
|
||
if macd and macd['histogram'] is not None:
|
||
if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨
|
||
signal_score += 2
|
||
elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌
|
||
signal_score += 2
|
||
|
||
# 布林带信号(均值回归)
|
||
if bollinger:
|
||
if current_price <= bollinger['lower']: # 触及下轨,做多
|
||
signal_score += 3
|
||
elif current_price >= bollinger['upper']: # 触及上轨,做空
|
||
signal_score += 3
|
||
elif bollinger['lower'] < current_price < bollinger['upper']:
|
||
signal_score += 1
|
||
|
||
# 均线信号
|
||
if ema20 and ema50:
|
||
if current_price > ema20 > ema50: # 上升趋势
|
||
signal_score += 1
|
||
elif current_price < ema20 < ema50: # 下降趋势
|
||
signal_score += 1
|
||
|
||
return {
|
||
'symbol': symbol,
|
||
# 技术分析使用的价格(K线收盘价)
|
||
'price': current_price,
|
||
'kline_close_price': current_price,
|
||
# 展示用“当前价”(24h ticker 最新价,通常更贴近用户的直觉)
|
||
'ticker_price': ticker_price,
|
||
'ticker_ts': ticker_ts,
|
||
'prevPrice': prev_price,
|
||
# 1) 主周期涨跌幅(用于内部信号)
|
||
'kline_change_percent': change_percent,
|
||
# 2) 24h涨跌幅(用于前端展示更符合“24h涨跌”的文案)
|
||
'changePercent': float(ticker.get('changePercent', 0) or 0),
|
||
'volume24h': ticker.get('volume', 0),
|
||
'direction': 'UP' if change_percent > 0 else 'DOWN',
|
||
'rsi': rsi,
|
||
'macd': macd,
|
||
'bollinger': bollinger,
|
||
'atr': atr,
|
||
'ema20': ema20,
|
||
'ema50': ema50,
|
||
'ema20_4h': ema20_4h, # 4H周期EMA20,用于多周期共振
|
||
'price_4h': close_prices_4h[-1] if len(close_prices_4h) > 0 else current_price, # 4H周期当前价格
|
||
'marketRegime': market_regime,
|
||
'signalScore': signal_score,
|
||
'klines': klines[-10:], # 保留最近10根K线
|
||
'klines_4h': klines_4h[-10:] if len(klines_4h) >= 10 else klines_4h # 保留最近10根4H K线
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"获取 {symbol} 数据失败: {e}")
|
||
return None
|
||
|
||
def get_top_symbols(self) -> List[Dict]:
|
||
"""
|
||
获取当前扫描到的前N个货币对
|
||
|
||
Returns:
|
||
前N个货币对列表
|
||
"""
|
||
return self.top_symbols
|
||
|
||
async def monitor_price(self, symbol: str, callback) -> None:
|
||
"""
|
||
监控单个交易对的价格变化(WebSocket)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
callback: 价格变化回调函数
|
||
"""
|
||
# 使用标准WebSocket
|
||
try:
|
||
import aiohttp
|
||
import json
|
||
|
||
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
|
||
# 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
|
||
# 端点:wss://fstream.binance.com/ws/<symbol>@ticker
|
||
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.ws_connect(ws_url) as ws:
|
||
async for msg in ws:
|
||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||
try:
|
||
# 解析 JSON 消息
|
||
data = json.loads(msg.data)
|
||
|
||
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
|
||
if isinstance(data, dict):
|
||
if 'c' in data: # 'c' 是当前价格
|
||
price = float(data['c'])
|
||
await callback(symbol, price)
|
||
elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']:
|
||
price = float(data['data']['c'])
|
||
await callback(symbol, price)
|
||
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
|
||
logger.debug(f"解析 {symbol} 价格数据失败: {e}")
|
||
continue
|
||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||
logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}")
|
||
break
|
||
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
||
logger.info(f"监控 {symbol} WebSocket连接关闭")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"监控 {symbol} 价格失败: {e}")
|
||
|
||
def get_realtime_price(self, symbol: str) -> Optional[float]:
|
||
"""
|
||
获取实时价格(从缓存)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
|
||
Returns:
|
||
实时价格
|
||
"""
|
||
if self.client:
|
||
return self.client.get_realtime_price(symbol)
|
||
return None |