345 lines
14 KiB
Python
345 lines
14 KiB
Python
"""
|
||
市场扫描器 - 发现涨跌幅最大的前N个货币对,并分析技术指标
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
from typing import List, Dict, Optional
|
||
try:
|
||
from .binance_client import BinanceClient
|
||
from .indicators import TechnicalIndicators
|
||
from . import config
|
||
except ImportError:
|
||
from binance_client import BinanceClient
|
||
from indicators import TechnicalIndicators
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MarketScanner:
|
||
"""市场扫描器类"""
|
||
|
||
def __init__(self, client: BinanceClient):
|
||
"""
|
||
初始化市场扫描器
|
||
|
||
Args:
|
||
client: 币安客户端
|
||
"""
|
||
self.client = client
|
||
self.top_symbols: List[Dict] = []
|
||
|
||
async def scan_market(self) -> List[Dict]:
|
||
"""
|
||
扫描市场,找出涨跌幅最大的前N个货币对
|
||
优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描
|
||
|
||
Returns:
|
||
前N个货币对列表,包含涨跌幅信息
|
||
"""
|
||
import time
|
||
self._scan_start_time = time.time()
|
||
|
||
# 先查 Redis 缓存(扫描结果缓存,TTL: 30秒)
|
||
cache_key = "scan_result:top_symbols"
|
||
cached = await self.client.redis_cache.get(cache_key)
|
||
if cached:
|
||
logger.info(f"从Redis缓存获取扫描结果: {len(cached)} 个交易对")
|
||
self.top_symbols = cached
|
||
return cached
|
||
|
||
logger.info("开始扫描市场...")
|
||
|
||
# 获取所有USDT交易对
|
||
all_symbols = await self.client.get_all_usdt_pairs()
|
||
if not all_symbols:
|
||
logger.warning("未获取到交易对")
|
||
return []
|
||
|
||
# 根据配置限制扫描的交易对数量
|
||
max_scan_symbols = config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500)
|
||
if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols):
|
||
symbols = all_symbols[:max_scan_symbols]
|
||
logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols})")
|
||
else:
|
||
symbols = all_symbols
|
||
logger.info(f"扫描所有 {len(symbols)} 个USDT交易对")
|
||
|
||
# 先批量获取所有交易对的24小时行情数据(减少API请求)
|
||
logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...")
|
||
all_tickers = await self.client.get_all_tickers_24h()
|
||
|
||
# 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量
|
||
pre_filtered_symbols = []
|
||
for symbol in symbols:
|
||
ticker = all_tickers.get(symbol)
|
||
if ticker:
|
||
change_percent = abs(ticker.get('changePercent', 0))
|
||
volume = ticker.get('volume', 0)
|
||
if (change_percent >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and
|
||
volume >= config.TRADING_CONFIG['MIN_VOLUME_24H']):
|
||
pre_filtered_symbols.append(symbol)
|
||
|
||
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个")
|
||
|
||
# 只对符合条件的交易对进行详细分析(获取K线和技术指标)
|
||
# 限制并发数量,避免请求过快
|
||
semaphore = asyncio.Semaphore(5) # 最多5个并发请求
|
||
|
||
async def get_symbol_change_with_limit(symbol):
|
||
async with semaphore:
|
||
return await self._get_symbol_change(symbol, all_tickers.get(symbol))
|
||
|
||
tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols]
|
||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
# 过滤有效结果
|
||
valid_results = [
|
||
r for r in results
|
||
if isinstance(r, dict) and r.get('changePercent') is not None
|
||
]
|
||
|
||
# 过滤最小涨跌幅和成交量
|
||
filtered_results = [
|
||
r for r in valid_results
|
||
if abs(r['changePercent']) >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT']
|
||
and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H']
|
||
]
|
||
|
||
# 按信号得分和涨跌幅综合排序,取前N个
|
||
# 优先考虑技术指标信号得分高的
|
||
sorted_results = sorted(
|
||
filtered_results,
|
||
key=lambda x: (
|
||
x.get('signalScore', 0) * 10, # 信号得分权重更高
|
||
abs(x['changePercent']) # 其次考虑涨跌幅
|
||
),
|
||
reverse=True
|
||
)
|
||
|
||
top_n = sorted_results[:config.TRADING_CONFIG['TOP_N_SYMBOLS']]
|
||
|
||
self.top_symbols = top_n
|
||
|
||
# 写入 Redis 缓存(TTL: 30秒)
|
||
await self.client.redis_cache.set(cache_key, top_n, ttl=30)
|
||
logger.debug(f"扫描结果已缓存: {len(top_n)} 个交易对 (TTL: 30秒)")
|
||
|
||
# 记录扫描结果到数据库
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import MarketScan
|
||
import time
|
||
scan_duration = time.time() - (getattr(self, '_scan_start_time', time.time()))
|
||
MarketScan.create(
|
||
symbols_scanned=len(symbols),
|
||
symbols_found=len(top_n),
|
||
top_symbols=[s['symbol'] for s in top_n],
|
||
scan_duration=scan_duration
|
||
)
|
||
except Exception as e:
|
||
logger.debug(f"记录扫描结果失败: {e}")
|
||
|
||
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
|
||
|
||
# 打印结果(包含技术指标)
|
||
for i, symbol_info in enumerate(top_n, 1):
|
||
rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A"
|
||
regime_str = symbol_info.get('marketRegime', 'unknown')
|
||
score_str = f"信号:{symbol_info.get('signalScore', 0)}"
|
||
|
||
logger.info(
|
||
f"{i}. {symbol_info['symbol']}: "
|
||
f"{symbol_info['changePercent']:.2f}% | "
|
||
f"{rsi_str} | {regime_str} | {score_str} | "
|
||
f"价格: {symbol_info['price']:.4f}"
|
||
)
|
||
|
||
return top_n
|
||
|
||
async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
|
||
"""
|
||
获取单个交易对的涨跌幅和技术指标
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
ticker_data: 可选的24小时行情数据(如果已批量获取)
|
||
|
||
Returns:
|
||
包含涨跌幅和技术指标信息的字典
|
||
"""
|
||
try:
|
||
# 如果已有批量获取的数据,直接使用;否则单独获取
|
||
if ticker_data:
|
||
ticker = ticker_data
|
||
else:
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
|
||
if not ticker:
|
||
return None
|
||
|
||
# 获取更多K线数据用于技术指标计算(使用配置的主周期)
|
||
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
|
||
klines = await self.client.get_klines(
|
||
symbol=symbol,
|
||
interval=primary_interval,
|
||
limit=50 # 获取更多数据用于计算指标
|
||
)
|
||
|
||
if len(klines) < 2:
|
||
return None
|
||
|
||
# 提取价格数据
|
||
close_prices = [float(k[4]) for k in klines] # 收盘价
|
||
high_prices = [float(k[2]) for k in klines] # 最高价
|
||
low_prices = [float(k[3]) for k in klines] # 最低价
|
||
|
||
# 计算涨跌幅(基于主周期)
|
||
current_price = close_prices[-1]
|
||
prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0]
|
||
|
||
if prev_price == 0:
|
||
return None
|
||
|
||
change_percent = ((current_price - prev_price) / prev_price) * 100
|
||
|
||
# 计算技术指标
|
||
rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14)
|
||
macd = TechnicalIndicators.calculate_macd(close_prices)
|
||
bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20)
|
||
atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=14)
|
||
ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20)
|
||
ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50)
|
||
|
||
# 判断市场状态
|
||
market_regime = TechnicalIndicators.detect_market_regime(close_prices)
|
||
|
||
# 计算交易信号得分(用于排序)
|
||
signal_score = 0
|
||
|
||
# RSI信号(均值回归)
|
||
if rsi is not None:
|
||
if rsi < 30: # 超卖,做多信号
|
||
signal_score += 3
|
||
elif rsi > 70: # 超买,做空信号
|
||
signal_score += 3
|
||
elif 30 <= rsi <= 70: # 中性区域
|
||
signal_score += 1
|
||
|
||
# MACD信号
|
||
if macd and macd['histogram'] is not None:
|
||
if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨
|
||
signal_score += 2
|
||
elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌
|
||
signal_score += 2
|
||
|
||
# 布林带信号(均值回归)
|
||
if bollinger:
|
||
if current_price <= bollinger['lower']: # 触及下轨,做多
|
||
signal_score += 3
|
||
elif current_price >= bollinger['upper']: # 触及上轨,做空
|
||
signal_score += 3
|
||
elif bollinger['lower'] < current_price < bollinger['upper']:
|
||
signal_score += 1
|
||
|
||
# 均线信号
|
||
if ema20 and ema50:
|
||
if current_price > ema20 > ema50: # 上升趋势
|
||
signal_score += 1
|
||
elif current_price < ema20 < ema50: # 下降趋势
|
||
signal_score += 1
|
||
|
||
return {
|
||
'symbol': symbol,
|
||
'price': current_price,
|
||
'prevPrice': prev_price,
|
||
'changePercent': change_percent,
|
||
'volume24h': ticker.get('volume', 0),
|
||
'direction': 'UP' if change_percent > 0 else 'DOWN',
|
||
'rsi': rsi,
|
||
'macd': macd,
|
||
'bollinger': bollinger,
|
||
'atr': atr,
|
||
'ema20': ema20,
|
||
'ema50': ema50,
|
||
'marketRegime': market_regime,
|
||
'signalScore': signal_score,
|
||
'klines': klines[-10:] # 保留最近10根K线
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"获取 {symbol} 数据失败: {e}")
|
||
return None
|
||
|
||
def get_top_symbols(self) -> List[Dict]:
|
||
"""
|
||
获取当前扫描到的前N个货币对
|
||
|
||
Returns:
|
||
前N个货币对列表
|
||
"""
|
||
return self.top_symbols
|
||
|
||
async def monitor_price(self, symbol: str, callback) -> None:
|
||
"""
|
||
监控单个交易对的价格变化(WebSocket)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
callback: 价格变化回调函数
|
||
"""
|
||
# 使用标准WebSocket
|
||
try:
|
||
import aiohttp
|
||
import json
|
||
|
||
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
|
||
# 根据文档:https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
|
||
# 端点:wss://fstream.binance.com/ws/<symbol>@ticker
|
||
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.ws_connect(ws_url) as ws:
|
||
async for msg in ws:
|
||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||
try:
|
||
# 解析 JSON 消息
|
||
data = json.loads(msg.data)
|
||
|
||
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
|
||
if isinstance(data, dict):
|
||
if 'c' in data: # 'c' 是当前价格
|
||
price = float(data['c'])
|
||
await callback(symbol, price)
|
||
elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']:
|
||
price = float(data['data']['c'])
|
||
await callback(symbol, price)
|
||
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
|
||
logger.debug(f"解析 {symbol} 价格数据失败: {e}")
|
||
continue
|
||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||
logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}")
|
||
break
|
||
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
||
logger.info(f"监控 {symbol} WebSocket连接关闭")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"监控 {symbol} 价格失败: {e}")
|
||
|
||
def get_realtime_price(self, symbol: str) -> Optional[float]:
|
||
"""
|
||
获取实时价格(从缓存)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
|
||
Returns:
|
||
实时价格
|
||
"""
|
||
if self.client:
|
||
return self.client.get_realtime_price(symbol)
|
||
return None |