""" 币安客户端封装 - 提供异步交易接口 """ import asyncio import logging from typing import Dict, List, Optional, Any from binance import AsyncClient, BinanceSocketManager from binance.exceptions import BinanceAPIException try: from .unicorn_websocket import UnicornWebSocketManager except ImportError: from unicorn_websocket import UnicornWebSocketManager try: from . import config except ImportError: import config logger = logging.getLogger(__name__) class BinanceClient: """币安客户端封装类""" def __init__(self, api_key: str = None, api_secret: str = None, testnet: bool = False): """ 初始化币安客户端 Args: api_key: API密钥 api_secret: API密钥 testnet: 是否使用测试网 """ self.api_key = api_key or config.BINANCE_API_KEY self.api_secret = api_secret or config.BINANCE_API_SECRET self.testnet = testnet or config.USE_TESTNET self.client: Optional[AsyncClient] = None self.socket_manager: Optional[BinanceSocketManager] = None self.unicorn_manager: Optional[UnicornWebSocketManager] = None self.use_unicorn = config.TRADING_CONFIG.get('USE_UNICORN_WEBSOCKET', True) self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息 async def connect(self, timeout: int = None, retries: int = None): """ 连接币安API Args: timeout: 连接超时时间(秒),默认从config读取 retries: 重试次数,默认从config读取 """ timeout = timeout or config.CONNECTION_TIMEOUT retries = retries or config.CONNECTION_RETRIES last_error = None for attempt in range(retries): try: logger.info( f"尝试连接币安API (第 {attempt + 1}/{retries} 次, " f"测试网: {self.testnet}, 超时: {timeout}秒)..." ) # 创建客户端 self.client = await AsyncClient.create( api_key=self.api_key, api_secret=self.api_secret, testnet=self.testnet ) # 测试连接(带超时) try: await asyncio.wait_for(self.client.ping(), timeout=timeout) except asyncio.TimeoutError: await self.client.close_connection() raise asyncio.TimeoutError(f"ping超时 ({timeout}秒)") self.socket_manager = BinanceSocketManager(self.client) logger.info(f"✓ 币安客户端连接成功 (测试网: {self.testnet})") # 启动Unicorn WebSocket(如果启用) if self.use_unicorn: try: self.unicorn_manager = UnicornWebSocketManager(testnet=self.testnet) if self.unicorn_manager.start(): logger.info("✓ Unicorn WebSocket管理器启动成功") # 启动异步数据处理 asyncio.create_task(self.unicorn_manager.process_stream_data()) else: logger.warning("Unicorn WebSocket管理器启动失败,将使用标准WebSocket") self.unicorn_manager = None except Exception as e: logger.warning(f"启动Unicorn WebSocket失败: {e},将使用标准WebSocket") self.unicorn_manager = None # 验证API密钥权限 await self._verify_api_permissions() return except asyncio.TimeoutError as e: last_error = f"连接超时: {e}" logger.warning(f"连接超时,剩余 {retries - attempt - 1} 次重试机会") if attempt < retries - 1: await asyncio.sleep(2) # 等待2秒后重试 except Exception as e: last_error = str(e) logger.warning(f"连接失败: {e},剩余 {retries - attempt - 1} 次重试机会") if self.client: try: await self.client.close_connection() except: pass if attempt < retries - 1: await asyncio.sleep(2) error_msg = f"连接币安API失败 (已重试 {retries} 次): {last_error}" logger.error("=" * 60) logger.error(error_msg) logger.error("=" * 60) logger.error("故障排查建议:") logger.error("1. 检查网络连接是否正常") logger.error("2. 检查API密钥是否正确") logger.error("3. 如果在中国大陆,可能需要使用代理或VPN") if self.testnet: logger.error("4. 测试网地址可能无法访问,尝试设置 USE_TESTNET=False") logger.error("5. 检查防火墙设置") logger.error("=" * 60) raise ConnectionError(error_msg) async def _verify_api_permissions(self): """ 验证API密钥权限 """ try: # 尝试获取账户信息来验证权限 await self.client.futures_account() logger.info("✓ API密钥权限验证通过") except BinanceAPIException as e: error_code = e.code if hasattr(e, 'code') else None if error_code == -2015: logger.warning("⚠ API密钥权限验证失败,可能无法进行合约交易") logger.warning("请检查API密钥是否启用了合约交易权限") else: logger.warning(f"⚠ API密钥验证时出现错误: {e}") async def disconnect(self): """断开连接""" # 停止Unicorn WebSocket if self.unicorn_manager: self.unicorn_manager.stop() self.unicorn_manager = None if self.client: await self.client.close_connection() logger.info("币安客户端已断开连接") async def get_all_usdt_pairs(self) -> List[str]: """ 获取所有USDT交易对 Returns: USDT交易对列表 """ try: # 获取合约市场信息 exchange_info = await self.client.futures_exchange_info() usdt_pairs = [ symbol['symbol'] for symbol in exchange_info['symbols'] if symbol['symbol'].endswith('USDT') and symbol['status'] == 'TRADING' and symbol.get('contractType') == 'PERPETUAL' # U本位永续合约 ] logger.info(f"获取到 {len(usdt_pairs)} 个USDT永续合约交易对") return usdt_pairs except BinanceAPIException as e: logger.error(f"获取交易对失败: {e}") return [] async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]: """ 获取K线数据(合约市场) Args: symbol: 交易对 interval: K线周期 limit: 获取数量 Returns: K线数据列表 """ try: klines = await self.client.futures_klines(symbol=symbol, interval=interval, limit=limit) return klines except BinanceAPIException as e: logger.error(f"获取 {symbol} K线数据失败: {e}") return [] async def get_ticker_24h(self, symbol: str) -> Optional[Dict]: """ 获取24小时行情数据(合约市场) Args: symbol: 交易对 Returns: 24小时行情数据 """ try: ticker = await self.client.futures_symbol_ticker(symbol=symbol) stats = await self.client.futures_ticker(symbol=symbol) return { 'symbol': symbol, 'price': float(ticker['price']), 'volume': float(stats.get('quoteVolume', 0)), 'changePercent': float(stats.get('priceChangePercent', 0)) } except BinanceAPIException as e: logger.error(f"获取 {symbol} 24小时行情失败: {e}") return None async def get_account_balance(self) -> Dict[str, float]: """ 获取U本位合约账户余额 Returns: 账户余额字典 {'available': 可用余额, 'total': 总余额} """ try: account = await self.client.futures_account() assets = account.get('assets', []) usdt_asset = next((a for a in assets if a['asset'] == 'USDT'), None) if usdt_asset: return { 'available': float(usdt_asset['availableBalance']), 'total': float(usdt_asset['walletBalance']), 'margin': float(usdt_asset['marginBalance']) } return {'available': 0.0, 'total': 0.0, 'margin': 0.0} except BinanceAPIException as e: error_code = e.code if hasattr(e, 'code') else None error_msg = str(e) logger.error("=" * 60) logger.error(f"获取账户余额失败: {error_msg}") if error_code == -2015: logger.error("=" * 60) logger.error("API密钥权限错误 (错误代码: -2015)") logger.error("可能的原因:") logger.error("1. API密钥无效或已过期") logger.error("2. API密钥没有合约交易权限") logger.error("3. IP地址未添加到API密钥白名单") logger.error("4. 测试网/生产网环境不匹配") logger.error("=" * 60) logger.error("解决方案:") logger.error("1. 登录币安账户,检查API密钥状态") logger.error("2. 确保API密钥已启用'合约交易'权限") logger.error("3. 如果设置了IP白名单,请添加当前服务器IP") logger.error("4. 检查 USE_TESTNET 配置是否正确") logger.error(f" 当前配置: USE_TESTNET = {self.testnet}") logger.error("=" * 60) elif error_code == -1022: logger.error("签名错误,请检查API密钥和密钥是否正确") elif error_code == -2010: logger.error("账户余额不足") else: logger.error(f"错误代码: {error_code}") return {'available': 0.0, 'total': 0.0, 'margin': 0.0} async def get_open_positions(self) -> List[Dict]: """ 获取当前持仓 Returns: 持仓列表 """ try: positions = await self.client.futures_position_information() open_positions = [ { 'symbol': pos['symbol'], 'positionAmt': float(pos['positionAmt']), 'entryPrice': float(pos['entryPrice']), 'markPrice': float(pos.get('markPrice', 0)), 'unRealizedProfit': float(pos['unRealizedProfit']), 'leverage': int(pos['leverage']) } for pos in positions if float(pos['positionAmt']) != 0 ] return open_positions except BinanceAPIException as e: logger.error(f"获取持仓信息失败: {e}") return [] async def get_symbol_info(self, symbol: str) -> Optional[Dict]: """ 获取交易对的精度和限制信息 Args: symbol: 交易对 Returns: 交易对信息字典,包含 quantityPrecision, minQty, stepSize 等 """ # 先检查缓存 if symbol in self._symbol_info_cache: return self._symbol_info_cache[symbol] try: exchange_info = await self.client.futures_exchange_info() for s in exchange_info['symbols']: if s['symbol'] == symbol: # 提取数量精度信息 quantity_precision = s.get('quantityPrecision', 8) # 从filters中提取minQty和stepSize min_qty = None step_size = None for f in s.get('filters', []): if f['filterType'] == 'LOT_SIZE': min_qty = float(f.get('minQty', 0)) step_size = float(f.get('stepSize', 0)) break info = { 'quantityPrecision': quantity_precision, 'minQty': min_qty or 0, 'stepSize': step_size or 0 } # 缓存信息 self._symbol_info_cache[symbol] = info logger.debug(f"获取 {symbol} 精度信息: {info}") return info logger.warning(f"未找到交易对 {symbol} 的信息") return None except Exception as e: logger.error(f"获取 {symbol} 交易对信息失败: {e}") return None def _adjust_quantity_precision(self, quantity: float, symbol_info: Dict) -> float: """ 调整数量精度,使其符合币安要求 Args: quantity: 原始数量 symbol_info: 交易对信息 Returns: 调整后的数量 """ if not symbol_info: # 如果没有交易对信息,使用默认精度(3位小数) return round(quantity, 3) quantity_precision = symbol_info.get('quantityPrecision', 8) step_size = symbol_info.get('stepSize', 0) min_qty = symbol_info.get('minQty', 0) # 如果有stepSize,按照stepSize调整 if step_size > 0: # 向下取整到stepSize的倍数(使用浮点数除法) adjusted = float(int(quantity / step_size)) * step_size else: # 否则按照精度调整 adjusted = round(quantity, quantity_precision) # 确保不小于最小数量 if min_qty > 0 and adjusted < min_qty: # 如果小于最小数量,尝试向上取整到最小数量 if step_size > 0: adjusted = min_qty else: adjusted = round(min_qty, quantity_precision) logger.warning(f"数量 {quantity} 小于最小数量 {min_qty},调整为 {adjusted}") # 最终精度调整 adjusted = round(adjusted, quantity_precision) if adjusted != quantity: logger.info(f"数量精度调整: {quantity} -> {adjusted} (精度: {quantity_precision}, stepSize: {step_size}, minQty: {min_qty})") return adjusted async def place_order( self, symbol: str, side: str, quantity: float, order_type: str = 'MARKET', price: Optional[float] = None ) -> Optional[Dict]: """ 下单 Args: symbol: 交易对 side: 方向 'BUY' 或 'SELL' quantity: 数量 order_type: 订单类型 'MARKET' 或 'LIMIT' price: 限价单价格 Returns: 订单信息 """ try: # 获取交易对精度信息并调整数量 symbol_info = await self.get_symbol_info(symbol) adjusted_quantity = self._adjust_quantity_precision(quantity, symbol_info) if adjusted_quantity <= 0: logger.error(f"调整后的数量无效: {adjusted_quantity} (原始: {quantity})") return None logger.info(f"下单: {symbol} {side} {adjusted_quantity} (原始: {quantity}) @ {order_type}") if order_type == 'MARKET': order = await self.client.futures_create_order( symbol=symbol, side=side, type='MARKET', quantity=adjusted_quantity ) else: if price is None: raise ValueError("限价单必须指定价格") order = await self.client.futures_create_order( symbol=symbol, side=side, type='LIMIT', timeInForce='GTC', quantity=adjusted_quantity, price=price ) logger.info(f"下单成功: {symbol} {side} {adjusted_quantity} @ {order_type}") return order except BinanceAPIException as e: error_code = e.code if hasattr(e, 'code') else None if error_code == -1111: logger.error(f"下单失败 {symbol} {side}: 精度错误 - {e}") logger.error(f" 原始数量: {quantity}") if symbol_info: logger.error(f" 交易对精度: {symbol_info}") else: logger.error(f"下单失败 {symbol} {side}: {e}") return None async def cancel_order(self, symbol: str, order_id: int) -> bool: """ 取消订单 Args: symbol: 交易对 order_id: 订单ID Returns: 是否成功 """ try: await self.client.futures_cancel_order(symbol=symbol, orderId=order_id) logger.info(f"取消订单成功: {symbol} {order_id}") return True except BinanceAPIException as e: logger.error(f"取消订单失败: {e}") return False async def set_leverage(self, symbol: str, leverage: int = 10) -> bool: """ 设置杠杆倍数 Args: symbol: 交易对 leverage: 杠杆倍数 Returns: 是否成功 """ try: await self.client.futures_change_leverage(symbol=symbol, leverage=leverage) logger.info(f"设置杠杆成功: {symbol} {leverage}x") return True except BinanceAPIException as e: logger.error(f"设置杠杆失败: {e}") return False def subscribe_realtime_prices(self, symbols: List[str], callback) -> bool: """ 订阅实时价格流(使用Unicorn) Args: symbols: 交易对列表 callback: 价格更新回调函数 callback(symbol, price, price_data) Returns: 是否成功 """ if not self.unicorn_manager: logger.warning("Unicorn WebSocket未启用,无法订阅实时价格") return False try: self.unicorn_manager.subscribe_ticker(symbols, callback) logger.info(f"订阅 {len(symbols)} 个交易对的实时价格流") return True except Exception as e: logger.error(f"订阅实时价格流失败: {e}") return False def get_realtime_price(self, symbol: str) -> Optional[float]: """ 获取实时价格(从Unicorn WebSocket) Args: symbol: 交易对 Returns: 实时价格,如果未订阅则返回None """ if self.unicorn_manager: return self.unicorn_manager.get_realtime_price(symbol) return None