auto_trade_sys/trading_system/market_scanner.py
薇薇安 c535a7b1ae a
2026-01-14 23:57:52 +08:00

345 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场扫描器 - 发现涨跌幅最大的前N个货币对并分析技术指标
"""
import asyncio
import logging
from typing import List, Dict, Optional
try:
from .binance_client import BinanceClient
from .indicators import TechnicalIndicators
from . import config
except ImportError:
from binance_client import BinanceClient
from indicators import TechnicalIndicators
import config
logger = logging.getLogger(__name__)
class MarketScanner:
"""市场扫描器类"""
def __init__(self, client: BinanceClient):
"""
初始化市场扫描器
Args:
client: 币安客户端
"""
self.client = client
self.top_symbols: List[Dict] = []
async def scan_market(self) -> List[Dict]:
"""
扫描市场找出涨跌幅最大的前N个货币对
优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描
Returns:
前N个货币对列表包含涨跌幅信息
"""
import time
self._scan_start_time = time.time()
# 先查 Redis 缓存扫描结果缓存TTL: 30秒
cache_key = "scan_result:top_symbols"
cached = await self.client.redis_cache.get(cache_key)
if cached:
logger.info(f"从Redis缓存获取扫描结果: {len(cached)} 个交易对")
self.top_symbols = cached
return cached
logger.info("开始扫描市场...")
# 获取所有USDT交易对
all_symbols = await self.client.get_all_usdt_pairs()
if not all_symbols:
logger.warning("未获取到交易对")
return []
# 根据配置限制扫描的交易对数量
max_scan_symbols = config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500)
if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols):
symbols = all_symbols[:max_scan_symbols]
logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols}")
else:
symbols = all_symbols
logger.info(f"扫描所有 {len(symbols)} 个USDT交易对")
# 先批量获取所有交易对的24小时行情数据减少API请求
logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...")
all_tickers = await self.client.get_all_tickers_24h()
# 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量
pre_filtered_symbols = []
for symbol in symbols:
ticker = all_tickers.get(symbol)
if ticker:
change_percent = abs(ticker.get('changePercent', 0))
volume = ticker.get('volume', 0)
if (change_percent >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and
volume >= config.TRADING_CONFIG['MIN_VOLUME_24H']):
pre_filtered_symbols.append(symbol)
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)}")
# 只对符合条件的交易对进行详细分析获取K线和技术指标
# 限制并发数量,避免请求过快
semaphore = asyncio.Semaphore(5) # 最多5个并发请求
async def get_symbol_change_with_limit(symbol):
async with semaphore:
return await self._get_symbol_change(symbol, all_tickers.get(symbol))
tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤有效结果
valid_results = [
r for r in results
if isinstance(r, dict) and r.get('changePercent') is not None
]
# 过滤最小涨跌幅和成交量
filtered_results = [
r for r in valid_results
if abs(r['changePercent']) >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT']
and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H']
]
# 按信号得分和涨跌幅综合排序取前N个
# 优先考虑技术指标信号得分高的
sorted_results = sorted(
filtered_results,
key=lambda x: (
x.get('signalScore', 0) * 10, # 信号得分权重更高
abs(x['changePercent']) # 其次考虑涨跌幅
),
reverse=True
)
top_n = sorted_results[:config.TRADING_CONFIG['TOP_N_SYMBOLS']]
self.top_symbols = top_n
# 写入 Redis 缓存TTL: 30秒
await self.client.redis_cache.set(cache_key, top_n, ttl=30)
logger.debug(f"扫描结果已缓存: {len(top_n)} 个交易对 (TTL: 30秒)")
# 记录扫描结果到数据库
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import MarketScan
import time
scan_duration = time.time() - (getattr(self, '_scan_start_time', time.time()))
MarketScan.create(
symbols_scanned=len(symbols),
symbols_found=len(top_n),
top_symbols=[s['symbol'] for s in top_n],
scan_duration=scan_duration
)
except Exception as e:
logger.debug(f"记录扫描结果失败: {e}")
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
# 打印结果(包含技术指标)
for i, symbol_info in enumerate(top_n, 1):
rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A"
regime_str = symbol_info.get('marketRegime', 'unknown')
score_str = f"信号:{symbol_info.get('signalScore', 0)}"
logger.info(
f"{i}. {symbol_info['symbol']}: "
f"{symbol_info['changePercent']:.2f}% | "
f"{rsi_str} | {regime_str} | {score_str} | "
f"价格: {symbol_info['price']:.4f}"
)
return top_n
async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
"""
获取单个交易对的涨跌幅和技术指标
Args:
symbol: 交易对
ticker_data: 可选的24小时行情数据如果已批量获取
Returns:
包含涨跌幅和技术指标信息的字典
"""
try:
# 如果已有批量获取的数据,直接使用;否则单独获取
if ticker_data:
ticker = ticker_data
else:
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
# 获取更多K线数据用于技术指标计算使用配置的主周期
primary_interval = config.TRADING_CONFIG.get('PRIMARY_INTERVAL', '1h')
klines = await self.client.get_klines(
symbol=symbol,
interval=primary_interval,
limit=50 # 获取更多数据用于计算指标
)
if len(klines) < 2:
return None
# 提取价格数据
close_prices = [float(k[4]) for k in klines] # 收盘价
high_prices = [float(k[2]) for k in klines] # 最高价
low_prices = [float(k[3]) for k in klines] # 最低价
# 计算涨跌幅(基于主周期)
current_price = close_prices[-1]
prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0]
if prev_price == 0:
return None
change_percent = ((current_price - prev_price) / prev_price) * 100
# 计算技术指标
rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14)
macd = TechnicalIndicators.calculate_macd(close_prices)
bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20)
atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=14)
ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20)
ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50)
# 判断市场状态
market_regime = TechnicalIndicators.detect_market_regime(close_prices)
# 计算交易信号得分(用于排序)
signal_score = 0
# RSI信号均值回归
if rsi is not None:
if rsi < 30: # 超卖,做多信号
signal_score += 3
elif rsi > 70: # 超买,做空信号
signal_score += 3
elif 30 <= rsi <= 70: # 中性区域
signal_score += 1
# MACD信号
if macd and macd['histogram'] is not None:
if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨
signal_score += 2
elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌
signal_score += 2
# 布林带信号(均值回归)
if bollinger:
if current_price <= bollinger['lower']: # 触及下轨,做多
signal_score += 3
elif current_price >= bollinger['upper']: # 触及上轨,做空
signal_score += 3
elif bollinger['lower'] < current_price < bollinger['upper']:
signal_score += 1
# 均线信号
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
signal_score += 1
elif current_price < ema20 < ema50: # 下降趋势
signal_score += 1
return {
'symbol': symbol,
'price': current_price,
'prevPrice': prev_price,
'changePercent': change_percent,
'volume24h': ticker.get('volume', 0),
'direction': 'UP' if change_percent > 0 else 'DOWN',
'rsi': rsi,
'macd': macd,
'bollinger': bollinger,
'atr': atr,
'ema20': ema20,
'ema50': ema50,
'marketRegime': market_regime,
'signalScore': signal_score,
'klines': klines[-10:] # 保留最近10根K线
}
except Exception as e:
logger.debug(f"获取 {symbol} 数据失败: {e}")
return None
def get_top_symbols(self) -> List[Dict]:
"""
获取当前扫描到的前N个货币对
Returns:
前N个货币对列表
"""
return self.top_symbols
async def monitor_price(self, symbol: str, callback) -> None:
"""
监控单个交易对的价格变化WebSocket
Args:
symbol: 交易对
callback: 价格变化回调函数
"""
# 使用标准WebSocket
try:
import aiohttp
import json
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
# 根据文档https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
# 端点wss://fstream.binance.com/ws/<symbol>@ticker
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(ws_url) as ws:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
# 解析 JSON 消息
data = json.loads(msg.data)
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
if isinstance(data, dict):
if 'c' in data: # 'c' 是当前价格
price = float(data['c'])
await callback(symbol, price)
elif 'data' in data and isinstance(data['data'], dict) and 'c' in data['data']:
price = float(data['data']['c'])
await callback(symbol, price)
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
logger.debug(f"解析 {symbol} 价格数据失败: {e}")
continue
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning(f"监控 {symbol} WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
logger.info(f"监控 {symbol} WebSocket连接关闭")
break
except Exception as e:
logger.error(f"监控 {symbol} 价格失败: {e}")
def get_realtime_price(self, symbol: str) -> Optional[float]:
"""
获取实时价格(从缓存)
Args:
symbol: 交易对
Returns:
实时价格
"""
if self.client:
return self.client.get_realtime_price(symbol)
return None