auto_trade_sys/market_scanner.py
薇薇安 5c841621f7 a
2026-01-13 14:30:57 +08:00

153 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场扫描器 - 发现涨跌幅最大的前N个货币对
"""
import asyncio
import logging
from typing import List, Dict, Optional
from binance_client import BinanceClient
import config
logger = logging.getLogger(__name__)
class MarketScanner:
"""市场扫描器类"""
def __init__(self, client: BinanceClient):
"""
初始化市场扫描器
Args:
client: 币安客户端
"""
self.client = client
self.top_symbols: List[Dict] = []
async def scan_market(self) -> List[Dict]:
"""
扫描市场找出涨跌幅最大的前N个货币对
Returns:
前N个货币对列表包含涨跌幅信息
"""
logger.info("开始扫描市场...")
# 获取所有USDT交易对
symbols = await self.client.get_all_usdt_pairs()
if not symbols:
logger.warning("未获取到交易对")
return []
# 并发获取所有交易对的涨跌幅数据
tasks = [self._get_symbol_change(symbol) for symbol in symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤有效结果
valid_results = [
r for r in results
if isinstance(r, dict) and r.get('changePercent') is not None
]
# 过滤最小涨跌幅和成交量
filtered_results = [
r for r in valid_results
if abs(r['changePercent']) >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT']
and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H']
]
# 按涨跌幅绝对值排序取前N个
sorted_results = sorted(
filtered_results,
key=lambda x: abs(x['changePercent']),
reverse=True
)
top_n = sorted_results[:config.TRADING_CONFIG['TOP_N_SYMBOLS']]
self.top_symbols = top_n
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
# 打印结果
for i, symbol_info in enumerate(top_n, 1):
logger.info(
f"{i}. {symbol_info['symbol']}: "
f"{symbol_info['changePercent']:.2f}% "
f"(价格: {symbol_info['price']:.4f}, "
f"成交量: {symbol_info.get('volume24h', 0):.0f})"
)
return top_n
async def _get_symbol_change(self, symbol: str) -> Optional[Dict]:
"""
获取单个交易对的涨跌幅
Args:
symbol: 交易对
Returns:
包含涨跌幅信息的字典
"""
try:
# 获取24小时行情数据
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
# 获取5分钟K线数据计算精确涨跌幅
klines = await self.client.get_klines(
symbol=symbol,
interval=config.TRADING_CONFIG['KLINE_INTERVAL'],
limit=2
)
if len(klines) < 2:
return None
# 计算5分钟涨跌幅
current_price = float(klines[-1][4]) # 最新收盘价
prev_price = float(klines[-2][4]) # 5分钟前收盘价
if prev_price == 0:
return None
change_percent = ((current_price - prev_price) / prev_price) * 100
return {
'symbol': symbol,
'price': current_price,
'prevPrice': prev_price,
'changePercent': change_percent,
'volume24h': ticker.get('volume', 0),
'direction': 'UP' if change_percent > 0 else 'DOWN'
}
except Exception as e:
logger.debug(f"获取 {symbol} 涨跌幅失败: {e}")
return None
def get_top_symbols(self) -> List[Dict]:
"""
获取当前扫描到的前N个货币对
Returns:
前N个货币对列表
"""
return self.top_symbols
async def monitor_price(self, symbol: str, callback) -> None:
"""
监控单个交易对的价格变化WebSocket
Args:
symbol: 交易对
callback: 价格变化回调函数
"""
try:
async with self.client.socket_manager.futures_socket(symbol.lower()) as stream:
async for msg in stream:
if 'data' in msg:
price = float(msg['data']['c']) # 最新价格
await callback(symbol, price)
except Exception as e:
logger.error(f"监控 {symbol} 价格失败: {e}")