This commit is contained in:
薇薇安 2026-01-14 12:03:26 +08:00
parent 1bfa7c18ef
commit 043276bb81
3 changed files with 108 additions and 16 deletions

View File

@ -42,6 +42,8 @@ class BinanceClient:
self._last_request_time = {} # 记录每个API端点的最后请求时间 self._last_request_time = {} # 记录每个API端点的最后请求时间
self._request_delay = 0.1 # 请求间隔(秒),避免频率限制 self._request_delay = 0.1 # 请求间隔(秒),避免频率限制
self._semaphore = asyncio.Semaphore(10) # 限制并发请求数 self._semaphore = asyncio.Semaphore(10) # 限制并发请求数
self._price_cache: Dict[str, Dict] = {} # WebSocket价格缓存 {symbol: {price, volume, changePercent, timestamp}}
self._price_cache_ttl = 60 # 价格缓存有效期(秒)
async def connect(self, timeout: int = None, retries: int = None): async def connect(self, timeout: int = None, retries: int = None):
""" """
@ -87,6 +89,7 @@ class BinanceClient:
logger.info("✓ Unicorn WebSocket管理器启动成功") logger.info("✓ Unicorn WebSocket管理器启动成功")
# 启动异步数据处理 # 启动异步数据处理
asyncio.create_task(self.unicorn_manager.process_stream_data()) asyncio.create_task(self.unicorn_manager.process_stream_data())
logger.info("WebSocket价格缓存已启用将在订阅交易对后自动更新")
else: else:
logger.warning("Unicorn WebSocket管理器启动失败将使用标准WebSocket") logger.warning("Unicorn WebSocket管理器启动失败将使用标准WebSocket")
self.unicorn_manager = None self.unicorn_manager = None
@ -223,9 +226,45 @@ class BinanceClient:
logger.error(f"获取 {symbol} K线数据失败: {e}") logger.error(f"获取 {symbol} K线数据失败: {e}")
return [] return []
def _on_price_update(self, symbol: str, price: float = None, price_data: Dict = None):
"""
WebSocket价格更新回调自动更新缓存
Args:
symbol: 交易对
price: 最新价格可选
price_data: 完整的ticker数据可选
"""
import time
try:
# 如果提供了price_data优先使用它
if price_data and isinstance(price_data, dict):
# 从price_data中提取信息
current_price = float(price_data.get('c', price_data.get('lastPrice', price or 0)))
volume = float(price_data.get('v', price_data.get('quoteVolume', 0)))
change_percent = float(price_data.get('P', price_data.get('priceChangePercent', 0)))
elif price:
# 如果只有价格,使用价格
current_price = price
volume = 0
change_percent = 0
else:
return
self._price_cache[symbol] = {
'symbol': symbol,
'price': current_price,
'volume': volume,
'changePercent': change_percent,
'timestamp': time.time()
}
except Exception as e:
logger.debug(f"更新价格缓存失败 {symbol}: {e}")
async def get_ticker_24h(self, symbol: str) -> Optional[Dict]: async def get_ticker_24h(self, symbol: str) -> Optional[Dict]:
""" """
获取24小时行情数据合约市场 获取24小时行情数据合约市场
优先从WebSocket缓存读取如果缓存不可用或过期则使用REST API
Args: Args:
symbol: 交易对 symbol: 交易对
@ -233,6 +272,22 @@ class BinanceClient:
Returns: Returns:
24小时行情数据 24小时行情数据
""" """
import time
# 优先从WebSocket缓存读取
if symbol in self._price_cache:
cached = self._price_cache[symbol]
cache_age = time.time() - cached.get('timestamp', 0)
if cache_age < self._price_cache_ttl:
logger.debug(f"从WebSocket缓存获取 {symbol} 价格")
return {
'symbol': symbol,
'price': cached['price'],
'volume': cached.get('volume', 0),
'changePercent': cached.get('changePercent', 0)
}
# 如果缓存不可用或过期使用REST APIfallback
try: try:
ticker = await self._rate_limited_request( ticker = await self._rate_limited_request(
f'ticker_{symbol}', f'ticker_{symbol}',
@ -242,12 +297,20 @@ class BinanceClient:
f'stats_{symbol}', f'stats_{symbol}',
self.client.futures_ticker(symbol=symbol) self.client.futures_ticker(symbol=symbol)
) )
return { result = {
'symbol': symbol, 'symbol': symbol,
'price': float(ticker['price']), 'price': float(ticker['price']),
'volume': float(stats.get('quoteVolume', 0)), 'volume': float(stats.get('quoteVolume', 0)),
'changePercent': float(stats.get('priceChangePercent', 0)) 'changePercent': float(stats.get('priceChangePercent', 0))
} }
# 更新缓存
if self.unicorn_manager:
import time
self._price_cache[symbol] = {
**result,
'timestamp': time.time()
}
return result
except BinanceAPIException as e: except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003: if error_code == -1003:
@ -599,13 +662,14 @@ class BinanceClient:
logger.error(f"设置杠杆失败: {e}") logger.error(f"设置杠杆失败: {e}")
return False return False
def subscribe_realtime_prices(self, symbols: List[str], callback) -> bool: def subscribe_realtime_prices(self, symbols: List[str], callback = None) -> bool:
""" """
订阅实时价格流使用Unicorn 订阅实时价格流使用Unicorn
Args: Args:
symbols: 交易对列表 symbols: 交易对列表
callback: 价格更新回调函数 callback(symbol, price, price_data) callback: 可选的价格更新回调函数 callback(symbol, price, price_data)
如果不提供将使用默认的缓存更新回调
Returns: Returns:
是否成功 是否成功
@ -615,6 +679,10 @@ class BinanceClient:
return False return False
try: try:
# 如果没有提供回调,使用默认的缓存更新回调
if callback is None:
callback = self._on_price_update
self.unicorn_manager.subscribe_ticker(symbols, callback) self.unicorn_manager.subscribe_ticker(symbols, callback)
logger.info(f"订阅 {len(symbols)} 个交易对的实时价格流") logger.info(f"订阅 {len(symbols)} 个交易对的实时价格流")
return True return True

View File

@ -145,7 +145,25 @@ async def main():
logger.error("请先充值到合约账户") logger.error("请先充值到合约账户")
return return
# 3. 初始化各个模块 # 3. 订阅所有USDT交易对的实时价格流WebSocket优化
if client.unicorn_manager:
logger.info("订阅所有USDT交易对的实时价格流WebSocket...")
try:
all_pairs = await client.get_all_usdt_pairs()
if all_pairs:
# 订阅前100个最活跃的交易对避免订阅过多
# 可以根据需要调整数量
max_subscribe = 100
pairs_to_subscribe = all_pairs[:max_subscribe]
if client.subscribe_realtime_prices(pairs_to_subscribe):
logger.info(f"✓ 已订阅 {len(pairs_to_subscribe)} 个交易对的实时价格流")
logger.info("价格数据将通过WebSocket实时更新减少REST API调用")
else:
logger.warning("订阅实时价格流失败将使用REST API")
except Exception as e:
logger.warning(f"订阅实时价格流时出错: {e}将使用REST API作为fallback")
# 4. 初始化各个模块
logger.info("初始化交易模块...") logger.info("初始化交易模块...")
scanner = MarketScanner(client) scanner = MarketScanner(client)
risk_manager = RiskManager(client) risk_manager = RiskManager(client)

View File

@ -261,16 +261,22 @@ class UnicornWebSocketManager:
# 处理ticker数据 # 处理ticker数据
if 'event_type' in data and data['event_type'] == '24hrTicker': if 'event_type' in data and data['event_type'] == '24hrTicker':
price_data = data.get('data', {}) price_data = data.get('data', {})
if 'c' in price_data: # 最新价格 if price_data:
price = float(price_data['c']) # 提取交易对符号从stream名称或数据中
stream_symbol = symbol
if not stream_symbol and 's' in price_data:
stream_symbol = price_data['s'] # 交易对符号
if stream_symbol:
price = float(price_data.get('c', price_data.get('lastPrice', 0)))
# 调用所有注册的回调 # 调用所有注册的回调
if symbol in self.price_callbacks: if stream_symbol in self.price_callbacks:
for callback in self.price_callbacks[symbol]: for callback in self.price_callbacks[stream_symbol]:
try: try:
if asyncio.iscoroutinefunction(callback): if asyncio.iscoroutinefunction(callback):
await callback(symbol, price, price_data) await callback(stream_symbol, price, price_data)
else: else:
callback(symbol, price, price_data) callback(stream_symbol, price, price_data)
except Exception as e: except Exception as e:
logger.debug(f"回调函数执行失败: {e}") logger.debug(f"回调函数执行失败: {e}")