This commit is contained in:
薇薇安 2026-01-14 00:00:24 +08:00
parent e43ee63de3
commit 5b9a8fd917
2 changed files with 112 additions and 10 deletions

View File

@ -39,6 +39,9 @@ class BinanceClient:
self.unicorn_manager: Optional[UnicornWebSocketManager] = None self.unicorn_manager: Optional[UnicornWebSocketManager] = None
self.use_unicorn = config.TRADING_CONFIG.get('USE_UNICORN_WEBSOCKET', True) self.use_unicorn = config.TRADING_CONFIG.get('USE_UNICORN_WEBSOCKET', True)
self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息 self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息
self._last_request_time = {} # 记录每个API端点的最后请求时间
self._request_delay = 0.1 # 请求间隔(秒),避免频率限制
self._semaphore = asyncio.Semaphore(10) # 限制并发请求数
async def connect(self, timeout: int = None, retries: int = None): async def connect(self, timeout: int = None, retries: int = None):
""" """
@ -176,6 +179,24 @@ class BinanceClient:
logger.error(f"获取交易对失败: {e}") logger.error(f"获取交易对失败: {e}")
return [] return []
async def _rate_limited_request(self, endpoint: str, coro):
"""
带速率限制的API请求
Args:
endpoint: API端点标识用于记录请求时间
coro: 异步协程
"""
async with self._semaphore:
# 检查是否需要等待(避免请求过快)
if endpoint in self._last_request_time:
elapsed = asyncio.get_event_loop().time() - self._last_request_time[endpoint]
if elapsed < self._request_delay:
await asyncio.sleep(self._request_delay - elapsed)
self._last_request_time[endpoint] = asyncio.get_event_loop().time()
return await coro
async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]: async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]:
""" """
获取K线数据合约市场 获取K线数据合约市场
@ -189,9 +210,16 @@ class BinanceClient:
K线数据列表 K线数据列表
""" """
try: try:
klines = await self.client.futures_klines(symbol=symbol, interval=interval, limit=limit) klines = await self._rate_limited_request(
f'klines_{symbol}_{interval}',
self.client.futures_klines(symbol=symbol, interval=interval, limit=limit)
)
return klines return klines
except BinanceAPIException as e: except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"获取 {symbol} K线数据失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"获取 {symbol} K线数据失败: {e}") logger.error(f"获取 {symbol} K线数据失败: {e}")
return [] return []
@ -206,8 +234,14 @@ class BinanceClient:
24小时行情数据 24小时行情数据
""" """
try: try:
ticker = await self.client.futures_symbol_ticker(symbol=symbol) ticker = await self._rate_limited_request(
stats = await self.client.futures_ticker(symbol=symbol) f'ticker_{symbol}',
self.client.futures_symbol_ticker(symbol=symbol)
)
stats = await self._rate_limited_request(
f'stats_{symbol}',
self.client.futures_ticker(symbol=symbol)
)
return { return {
'symbol': symbol, 'symbol': symbol,
'price': float(ticker['price']), 'price': float(ticker['price']),
@ -215,9 +249,48 @@ class BinanceClient:
'changePercent': float(stats.get('priceChangePercent', 0)) 'changePercent': float(stats.get('priceChangePercent', 0))
} }
except BinanceAPIException as e: except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"获取 {symbol} 24小时行情失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"获取 {symbol} 24小时行情失败: {e}") logger.error(f"获取 {symbol} 24小时行情失败: {e}")
return None return None
async def get_all_tickers_24h(self) -> Dict[str, Dict]:
"""
批量获取所有交易对的24小时行情数据更高效
Returns:
交易对行情数据字典 {symbol: {price, volume, changePercent}}
"""
try:
# 使用批量API一次获取所有交易对的数据
tickers = await self._rate_limited_request(
'all_tickers',
self.client.futures_ticker()
)
result = {}
for ticker in tickers:
symbol = ticker['symbol']
if symbol.endswith('USDT'):
result[symbol] = {
'symbol': symbol,
'price': float(ticker.get('lastPrice', 0)),
'volume': float(ticker.get('quoteVolume', 0)),
'changePercent': float(ticker.get('priceChangePercent', 0))
}
logger.debug(f"批量获取到 {len(result)} 个交易对的24小时行情数据")
return result
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"批量获取24小时行情失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"批量获取24小时行情失败: {e}")
return {}
async def get_account_balance(self) -> Dict[str, float]: async def get_account_balance(self) -> Dict[str, float]:
""" """
获取U本位合约账户余额 获取U本位合约账户余额

View File

@ -46,8 +46,32 @@ class MarketScanner:
logger.warning("未获取到交易对") logger.warning("未获取到交易对")
return [] return []
# 并发获取所有交易对的涨跌幅数据 # 先批量获取所有交易对的24小时行情数据减少API请求
tasks = [self._get_symbol_change(symbol) for symbol in symbols] logger.info(f"批量获取 {len(symbols)} 个交易对的24小时行情数据...")
all_tickers = await self.client.get_all_tickers_24h()
# 过滤最小涨跌幅和成交量,减少需要详细分析的交易对数量
pre_filtered_symbols = []
for symbol in symbols:
ticker = all_tickers.get(symbol)
if ticker:
change_percent = abs(ticker.get('changePercent', 0))
volume = ticker.get('volume', 0)
if (change_percent >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and
volume >= config.TRADING_CONFIG['MIN_VOLUME_24H']):
pre_filtered_symbols.append(symbol)
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)}")
# 只对符合条件的交易对进行详细分析获取K线和技术指标
# 限制并发数量,避免请求过快
semaphore = asyncio.Semaphore(5) # 最多5个并发请求
async def get_symbol_change_with_limit(symbol):
async with semaphore:
return await self._get_symbol_change(symbol, all_tickers.get(symbol))
tasks = [get_symbol_change_with_limit(symbol) for symbol in pre_filtered_symbols]
results = await asyncio.gather(*tasks, return_exceptions=True) results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤有效结果 # 过滤有效结果
@ -115,19 +139,24 @@ class MarketScanner:
return top_n return top_n
async def _get_symbol_change(self, symbol: str) -> Optional[Dict]: async def _get_symbol_change(self, symbol: str, ticker_data: Optional[Dict] = None) -> Optional[Dict]:
""" """
获取单个交易对的涨跌幅和技术指标 获取单个交易对的涨跌幅和技术指标
Args: Args:
symbol: 交易对 symbol: 交易对
ticker_data: 可选的24小时行情数据如果已批量获取
Returns: Returns:
包含涨跌幅和技术指标信息的字典 包含涨跌幅和技术指标信息的字典
""" """
try: try:
# 获取24小时行情数据 # 如果已有批量获取的数据,直接使用;否则单独获取
if ticker_data:
ticker = ticker_data
else:
ticker = await self.client.get_ticker_24h(symbol) ticker = await self.client.get_ticker_24h(symbol)
if not ticker: if not ticker:
return None return None