auto_trade_sys/trading_system/binance_client.py
薇薇安 ba15992aeb a
2026-01-16 14:53:44 +08:00

873 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
币安客户端封装 - 提供异步交易接口
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any
from binance import AsyncClient, BinanceSocketManager
from binance.exceptions import BinanceAPIException
try:
from . import config
from .redis_cache import RedisCache
except ImportError:
import config
from redis_cache import RedisCache
logger = logging.getLogger(__name__)
class BinanceClient:
"""币安客户端封装类"""
def __init__(self, api_key: str = None, api_secret: str = None, testnet: bool = False):
"""
初始化币安客户端
Args:
api_key: API密钥
api_secret: API密钥
testnet: 是否使用测试网
"""
self.api_key = api_key or config.BINANCE_API_KEY
self.api_secret = api_secret or config.BINANCE_API_SECRET
self.testnet = testnet or config.USE_TESTNET
self.client: Optional[AsyncClient] = None
self.socket_manager: Optional[BinanceSocketManager] = None
self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息
self._last_request_time = {} # 记录每个API端点的最后请求时间
self._request_delay = 0.1 # 请求间隔(秒),避免频率限制
self._semaphore = asyncio.Semaphore(10) # 限制并发请求数
self._price_cache: Dict[str, Dict] = {} # WebSocket价格缓存 {symbol: {price, volume, changePercent, timestamp}}
self._price_cache_ttl = 60 # 价格缓存有效期(秒)
# 初始化 Redis 缓存
self.redis_cache = RedisCache(
redis_url=config.REDIS_URL,
use_tls=config.REDIS_USE_TLS,
ssl_cert_reqs=config.REDIS_SSL_CERT_REQS,
ssl_ca_certs=config.REDIS_SSL_CA_CERTS,
username=config.REDIS_USERNAME,
password=config.REDIS_PASSWORD
)
async def connect(self, timeout: int = None, retries: int = None):
"""
连接币安API
Args:
timeout: 连接超时时间默认从config读取
retries: 重试次数默认从config读取
"""
timeout = timeout or config.CONNECTION_TIMEOUT
retries = retries or config.CONNECTION_RETRIES
last_error = None
for attempt in range(retries):
try:
logger.info(
f"尝试连接币安API (第 {attempt + 1}/{retries} 次, "
f"测试网: {self.testnet}, 超时: {timeout}秒)..."
)
# 创建客户端
self.client = await AsyncClient.create(
api_key=self.api_key,
api_secret=self.api_secret,
testnet=self.testnet
)
# 测试连接(带超时)
try:
await asyncio.wait_for(self.client.ping(), timeout=timeout)
except asyncio.TimeoutError:
await self.client.close_connection()
raise asyncio.TimeoutError(f"ping超时 ({timeout}秒)")
self.socket_manager = BinanceSocketManager(self.client)
logger.info(f"✓ 币安客户端连接成功 (测试网: {self.testnet})")
# 连接 Redis 缓存
await self.redis_cache.connect()
# 验证API密钥权限
await self._verify_api_permissions()
return
except asyncio.TimeoutError as e:
last_error = f"连接超时: {e}"
logger.warning(f"连接超时,剩余 {retries - attempt - 1} 次重试机会")
if attempt < retries - 1:
await asyncio.sleep(2) # 等待2秒后重试
except Exception as e:
last_error = str(e)
logger.warning(f"连接失败: {e},剩余 {retries - attempt - 1} 次重试机会")
if self.client:
try:
await self.client.close_connection()
except:
pass
if attempt < retries - 1:
await asyncio.sleep(2)
error_msg = f"连接币安API失败 (已重试 {retries} 次): {last_error}"
logger.error("=" * 60)
logger.error(error_msg)
logger.error("=" * 60)
logger.error("故障排查建议:")
logger.error("1. 检查网络连接是否正常")
logger.error("2. 检查API密钥是否正确")
logger.error("3. 如果在中国大陆可能需要使用代理或VPN")
if self.testnet:
logger.error("4. 测试网地址可能无法访问,尝试设置 USE_TESTNET=False")
logger.error("5. 检查防火墙设置")
logger.error("=" * 60)
raise ConnectionError(error_msg)
async def _verify_api_permissions(self):
"""
验证API密钥权限
"""
try:
# 尝试获取账户信息来验证权限
await self.client.futures_account()
logger.info("✓ API密钥权限验证通过")
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -2015:
logger.warning("⚠ API密钥权限验证失败可能无法进行合约交易")
logger.warning("请检查API密钥是否启用了合约交易权限")
else:
logger.warning(f"⚠ API密钥验证时出现错误: {e}")
async def disconnect(self):
"""断开连接"""
# 关闭 Redis 连接
await self.redis_cache.close()
if self.client:
await self.client.close_connection()
logger.info("币安客户端已断开连接")
async def get_all_usdt_pairs(self) -> List[str]:
"""
获取所有USDT交易对
Returns:
USDT交易对列表
"""
try:
# 获取合约市场信息
exchange_info = await self.client.futures_exchange_info()
usdt_pairs = [
symbol['symbol']
for symbol in exchange_info['symbols']
if symbol['symbol'].endswith('USDT')
and symbol['status'] == 'TRADING'
and symbol.get('contractType') == 'PERPETUAL' # U本位永续合约
]
logger.info(f"获取到 {len(usdt_pairs)} 个USDT永续合约交易对")
return usdt_pairs
except BinanceAPIException as e:
logger.error(f"获取交易对失败: {e}")
return []
async def _rate_limited_request(self, endpoint: str, coro):
"""
带速率限制的API请求
Args:
endpoint: API端点标识用于记录请求时间
coro: 异步协程
"""
async with self._semaphore:
# 检查是否需要等待(避免请求过快)
if endpoint in self._last_request_time:
elapsed = asyncio.get_event_loop().time() - self._last_request_time[endpoint]
if elapsed < self._request_delay:
await asyncio.sleep(self._request_delay - elapsed)
self._last_request_time[endpoint] = asyncio.get_event_loop().time()
return await coro
async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]:
"""
获取K线数据合约市场
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
Args:
symbol: 交易对
interval: K线周期
limit: 获取数量
Returns:
K线数据列表
"""
# 先查 Redis 缓存
cache_key = f"klines:{symbol}:{interval}:{limit}"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从缓存获取 {symbol} K线数据: {interval} x{limit}")
return cached
try:
# 缓存未命中,调用 API
klines = await self._rate_limited_request(
f'klines_{symbol}_{interval}',
self.client.futures_klines(symbol=symbol, interval=interval, limit=limit)
)
# 写入 Redis 缓存(根据 interval 动态设置 TTL
if klines:
# TTL 设置1m=10秒, 5m=30秒, 15m=1分钟, 1h=5分钟, 4h=15分钟, 1d=1小时
ttl_map = {
'1m': 10,
'3m': 20,
'5m': 30,
'15m': 60,
'30m': 120,
'1h': 300,
'2h': 600,
'4h': 900,
'6h': 1200,
'8h': 1800,
'12h': 2400,
'1d': 3600
}
ttl = ttl_map.get(interval, 300) # 默认 5 分钟
await self.redis_cache.set(cache_key, klines, ttl=ttl)
logger.debug(f"已缓存 {symbol} K线数据: {interval} x{limit} (TTL: {ttl}秒)")
return klines
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"获取 {symbol} K线数据失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"获取 {symbol} K线数据失败: {e}")
return []
async def get_ticker_24h(self, symbol: str) -> Optional[Dict]:
"""
获取24小时行情数据合约市场
优先从WebSocket缓存读取其次从Redis缓存读取最后使用REST API
Args:
symbol: 交易对
Returns:
24小时行情数据
"""
import time
# 1. 优先从WebSocket缓存读取
if symbol in self._price_cache:
cached = self._price_cache[symbol]
cache_age = time.time() - cached.get('timestamp', 0)
if cache_age < self._price_cache_ttl:
logger.debug(f"从WebSocket缓存获取 {symbol} 价格: {cached['price']:.8f} (缓存年龄: {cache_age:.1f}秒)")
return {
'symbol': symbol,
'price': cached['price'],
'volume': cached.get('volume', 0),
'changePercent': cached.get('changePercent', 0)
}
else:
logger.debug(f"{symbol} WebSocket缓存已过期 ({cache_age:.1f}秒 > {self._price_cache_ttl}秒)")
# 2. 从 Redis 缓存读取
cache_key = f"ticker_24h:{symbol}"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从Redis缓存获取 {symbol} 24小时行情数据")
return cached
# 3. 如果缓存不可用或过期使用REST APIfallback
logger.debug(f"{symbol} 未在缓存中使用REST API获取")
try:
ticker = await self._rate_limited_request(
f'ticker_{symbol}',
self.client.futures_symbol_ticker(symbol=symbol)
)
stats = await self._rate_limited_request(
f'stats_{symbol}',
self.client.futures_ticker(symbol=symbol)
)
result = {
'symbol': symbol,
'price': float(ticker['price']),
'volume': float(stats.get('quoteVolume', 0)),
'changePercent': float(stats.get('priceChangePercent', 0))
}
# 更新 WebSocket 缓存
self._price_cache[symbol] = {
**result,
'timestamp': time.time()
}
# 写入 Redis 缓存TTL: 30秒
await self.redis_cache.set(cache_key, result, ttl=30)
return result
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"获取 {symbol} 24小时行情失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"获取 {symbol} 24小时行情失败: {e}")
return None
async def get_all_tickers_24h(self) -> Dict[str, Dict]:
"""
批量获取所有交易对的24小时行情数据更高效
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
Returns:
交易对行情数据字典 {symbol: {price, volume, changePercent}}
"""
# 先查 Redis 缓存
cache_key = "ticker_24h:all"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从Redis缓存获取所有交易对的24小时行情数据: {len(cached)} 个交易对")
return cached
try:
# 使用批量API一次获取所有交易对的数据
tickers = await self._rate_limited_request(
'all_tickers',
self.client.futures_ticker()
)
result = {}
for ticker in tickers:
symbol = ticker['symbol']
if symbol.endswith('USDT'):
result[symbol] = {
'symbol': symbol,
'price': float(ticker.get('lastPrice', 0)),
'volume': float(ticker.get('quoteVolume', 0)),
'changePercent': float(ticker.get('priceChangePercent', 0))
}
# 写入 Redis 缓存TTL: 30秒
await self.redis_cache.set(cache_key, result, ttl=30)
logger.debug(f"批量获取到 {len(result)} 个交易对的24小时行情数据已缓存")
return result
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"批量获取24小时行情失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"批量获取24小时行情失败: {e}")
return {}
async def get_account_balance(self) -> Dict[str, float]:
"""
获取U本位合约账户余额
Returns:
账户余额字典 {'available': 可用余额, 'total': 总余额}
"""
try:
account = await self.client.futures_account()
assets = account.get('assets', [])
usdt_asset = next((a for a in assets if a['asset'] == 'USDT'), None)
if usdt_asset:
return {
'available': float(usdt_asset['availableBalance']),
'total': float(usdt_asset['walletBalance']),
'margin': float(usdt_asset['marginBalance'])
}
return {'available': 0.0, 'total': 0.0, 'margin': 0.0}
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
error_msg = str(e)
logger.error("=" * 60)
logger.error(f"获取账户余额失败: {error_msg}")
if error_code == -2015:
logger.error("=" * 60)
logger.error("API密钥权限错误 (错误代码: -2015)")
logger.error("可能的原因:")
logger.error("1. API密钥无效或已过期")
logger.error("2. API密钥没有合约交易权限")
logger.error("3. IP地址未添加到API密钥白名单")
logger.error("4. 测试网/生产网环境不匹配")
logger.error("=" * 60)
logger.error("解决方案:")
logger.error("1. 登录币安账户检查API密钥状态")
logger.error("2. 确保API密钥已启用'合约交易'权限")
logger.error("3. 如果设置了IP白名单请添加当前服务器IP")
logger.error("4. 检查 USE_TESTNET 配置是否正确")
logger.error(f" 当前配置: USE_TESTNET = {self.testnet}")
logger.error("=" * 60)
elif error_code == -1022:
logger.error("签名错误请检查API密钥和密钥是否正确")
elif error_code == -2010:
logger.error("账户余额不足")
else:
logger.error(f"错误代码: {error_code}")
return {'available': 0.0, 'total': 0.0, 'margin': 0.0}
async def get_open_positions(self) -> List[Dict]:
"""
获取当前持仓
Returns:
持仓列表
"""
try:
positions = await self.client.futures_position_information()
open_positions = [
{
'symbol': pos['symbol'],
'positionAmt': float(pos['positionAmt']),
'entryPrice': float(pos['entryPrice']),
'markPrice': float(pos.get('markPrice', 0)),
'unRealizedProfit': float(pos['unRealizedProfit']),
'leverage': int(pos['leverage'])
}
for pos in positions
if float(pos['positionAmt']) != 0
]
return open_positions
except BinanceAPIException as e:
logger.error(f"获取持仓信息失败: {e}")
return []
async def get_symbol_info(self, symbol: str) -> Optional[Dict]:
"""
获取交易对的精度和限制信息
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
Args:
symbol: 交易对
Returns:
交易对信息字典,包含 quantityPrecision, minQty, stepSize 等
"""
# 1. 先检查内存缓存
if symbol in self._symbol_info_cache:
return self._symbol_info_cache[symbol]
# 2. 从 Redis 缓存读取
cache_key = f"symbol_info:{symbol}"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从Redis缓存获取 {symbol} 交易对信息")
# 同时更新内存缓存
self._symbol_info_cache[symbol] = cached
return cached
# 3. 缓存未命中,调用 API
try:
exchange_info = await self.client.futures_exchange_info()
for s in exchange_info['symbols']:
if s['symbol'] == symbol:
# 提取数量精度信息
quantity_precision = s.get('quantityPrecision', 8)
# 从filters中提取minQty、stepSize和minNotional
min_qty = None
step_size = None
min_notional = None
for f in s.get('filters', []):
if f['filterType'] == 'LOT_SIZE':
min_qty = float(f.get('minQty', 0))
step_size = float(f.get('stepSize', 0))
elif f['filterType'] == 'MIN_NOTIONAL':
min_notional = float(f.get('notional', 0))
# 如果没有从filters获取到minNotional使用默认值5 USDT
if min_notional is None or min_notional == 0:
min_notional = 5.0
# 获取交易对支持的最大杠杆倍数
# 币安API的exchange_info中可能没有直接的leverageBracket信息
# 我们尝试从leverageBracket获取如果没有则使用默认值
max_leverage_supported = 125 # 币安合约默认最大杠杆
# 尝试从leverageBracket获取如果存在
if s.get('leverageBracket') and len(s.get('leverageBracket', [])) > 0:
max_leverage_supported = s['leverageBracket'][0].get('maxLeverage', 125)
else:
# 如果leverageBracket不存在尝试通过futures_leverage_bracket API获取
# 但为了不增加API调用这里先使用默认值125
# 实际使用时会在设置杠杆时检查,如果失败会自动降低
max_leverage_supported = 125
info = {
'quantityPrecision': quantity_precision,
'minQty': min_qty or 0,
'stepSize': step_size or 0,
'minNotional': min_notional,
'maxLeverage': int(max_leverage_supported) # 交易对支持的最大杠杆
}
# 写入 Redis 缓存TTL: 1小时
await self.redis_cache.set(cache_key, info, ttl=3600)
# 同时更新内存缓存
self._symbol_info_cache[symbol] = info
logger.debug(f"获取 {symbol} 精度信息: {info}")
return info
logger.warning(f"未找到交易对 {symbol} 的信息")
return None
except Exception as e:
logger.error(f"获取 {symbol} 交易对信息失败: {e}")
return None
def _adjust_quantity_precision(self, quantity: float, symbol_info: Dict) -> float:
"""
调整数量精度,使其符合币安要求
Args:
quantity: 原始数量
symbol_info: 交易对信息
Returns:
调整后的数量
"""
if not symbol_info:
# 如果没有交易对信息使用默认精度3位小数
return round(quantity, 3)
quantity_precision = symbol_info.get('quantityPrecision', 8)
step_size = symbol_info.get('stepSize', 0)
min_qty = symbol_info.get('minQty', 0)
# 如果有stepSize按照stepSize调整
if step_size > 0:
# 向下取整到stepSize的倍数使用浮点数除法
adjusted = float(int(quantity / step_size)) * step_size
else:
# 否则按照精度调整
adjusted = round(quantity, quantity_precision)
# 确保不小于最小数量
if min_qty > 0 and adjusted < min_qty:
# 如果小于最小数量,尝试向上取整到最小数量
if step_size > 0:
adjusted = min_qty
else:
adjusted = round(min_qty, quantity_precision)
logger.warning(f"数量 {quantity} 小于最小数量 {min_qty},调整为 {adjusted}")
# 最终精度调整
adjusted = round(adjusted, quantity_precision)
if adjusted != quantity:
logger.info(f"数量精度调整: {quantity} -> {adjusted} (精度: {quantity_precision}, stepSize: {step_size}, minQty: {min_qty})")
return adjusted
def _adjust_quantity_precision_up(self, quantity: float, symbol_info: Dict) -> float:
"""
向上取整调整数量精度,使其符合币安要求
Args:
quantity: 原始数量
symbol_info: 交易对信息
Returns:
调整后的数量(向上取整)
"""
import math
if not symbol_info:
# 如果没有交易对信息向上取整到3位小数
return round(math.ceil(quantity * 1000) / 1000, 3)
quantity_precision = symbol_info.get('quantityPrecision', 8)
step_size = symbol_info.get('stepSize', 0)
min_qty = symbol_info.get('minQty', 0)
# 如果有stepSize按照stepSize向上取整
if step_size > 0:
# 向上取整到stepSize的倍数
adjusted = math.ceil(quantity / step_size) * step_size
else:
# 否则按照精度向上取整
multiplier = 10 ** quantity_precision
adjusted = math.ceil(quantity * multiplier) / multiplier
# 确保不小于最小数量
if min_qty > 0 and adjusted < min_qty:
adjusted = min_qty
# 最终精度调整
adjusted = round(adjusted, quantity_precision)
if adjusted != quantity:
logger.info(f"数量向上取整调整: {quantity} -> {adjusted} (精度: {quantity_precision}, stepSize: {step_size}, minQty: {min_qty})")
return adjusted
async def place_order(
self,
symbol: str,
side: str,
quantity: float,
order_type: str = 'MARKET',
price: Optional[float] = None,
reduce_only: bool = False
) -> Optional[Dict]:
"""
下单
Args:
symbol: 交易对
side: 方向 'BUY''SELL'
quantity: 数量
order_type: 订单类型 'MARKET''LIMIT'
price: 限价单价格
Returns:
订单信息
"""
try:
# 获取交易对精度信息
symbol_info = await self.get_symbol_info(symbol)
# 获取当前价格以计算名义价值
if price is None:
ticker = await self.get_ticker_24h(symbol)
if not ticker:
logger.error(f"无法获取 {symbol} 的价格信息")
return None
current_price = ticker['price']
else:
current_price = price
# 先按原始数量计算名义价值,用于保证金检查
initial_notional_value = quantity * current_price
min_notional = symbol_info.get('minNotional', 5.0) if symbol_info else 5.0
# 调整数量精度(在保证金检查之前)
adjusted_quantity = self._adjust_quantity_precision(quantity, symbol_info)
if adjusted_quantity <= 0:
logger.error(f"调整后的数量无效: {adjusted_quantity} (原始: {quantity})")
return None
# 使用调整后的数量重新计算名义价值
notional_value = adjusted_quantity * current_price
logger.info(f"下单检查: {symbol} {side} {adjusted_quantity} (原始: {quantity}) @ {order_type}")
logger.info(f" 当前价格: {current_price:.4f} USDT")
logger.info(f" 订单名义价值: {notional_value:.2f} USDT")
logger.info(f" 最小名义价值: {min_notional:.2f} USDT")
# 检查名义价值是否满足最小要求
if notional_value < min_notional:
logger.warning(
f"{symbol} 订单名义价值不足: {notional_value:.2f} USDT < "
f"最小要求: {min_notional:.2f} USDT"
)
logger.warning(f" 需要增加数量或提高仓位大小")
return None
# 检查最小保证金要求(避免手续费侵蚀收益)
# 获取当前杠杆(如果无法获取,使用默认值)
current_leverage = config.TRADING_CONFIG.get('LEVERAGE', 10)
try:
# 尝试从持仓信息获取实际使用的杠杆
positions = await self.client.futures_position_information(symbol=symbol)
if positions and len(positions) > 0:
position = positions[0]
if float(position.get('positionAmt', 0)) != 0:
# 有持仓,使用持仓的杠杆
leverage_bracket = position.get('leverage', current_leverage)
if leverage_bracket:
current_leverage = int(leverage_bracket)
except Exception as e:
logger.debug(f"无法获取 {symbol} 的杠杆信息,使用默认值: {current_leverage}x ({e})")
min_margin_usdt = config.TRADING_CONFIG.get('MIN_MARGIN_USDT', 0.5) # 默认0.5 USDT
required_margin = notional_value / current_leverage
if required_margin < min_margin_usdt:
# 如果保证金不足,自动调整到最小保证金要求
required_notional_value = min_margin_usdt * current_leverage
logger.warning(
f"{symbol} 订单保证金不足: {required_margin:.4f} USDT < "
f"最小保证金要求: {min_margin_usdt:.2f} USDT"
)
logger.info(
f" 自动调整订单名义价值: {notional_value:.2f} USDT -> {required_notional_value:.2f} USDT "
f"(杠杆: {current_leverage}x, 保证金: {min_margin_usdt:.2f} USDT)"
)
# 调整数量以满足最小保证金要求
if current_price > 0:
new_quantity = required_notional_value / current_price
# 先尝试向下取整调整
adjusted_quantity = self._adjust_quantity_precision(new_quantity, symbol_info)
# 重新计算名义价值和保证金
notional_value = adjusted_quantity * current_price
required_margin = notional_value / current_leverage
# 如果调整后保证金仍然不足,使用向上取整
if required_margin < min_margin_usdt:
logger.warning(
f" ⚠ 向下取整后保证金仍不足: {required_margin:.4f} USDT < {min_margin_usdt:.2f} USDT"
)
adjusted_quantity = self._adjust_quantity_precision_up(new_quantity, symbol_info)
# 重新计算名义价值和保证金
notional_value = adjusted_quantity * current_price
required_margin = notional_value / current_leverage
# 再次检查保证金
if required_margin < min_margin_usdt:
logger.error(
f" ❌ 调整后保证金仍不足: {required_margin:.4f} USDT < {min_margin_usdt:.2f} USDT"
)
logger.error(
f" 💡 建议: 增加账户余额或降低杠杆倍数,才能满足最小保证金要求"
)
return None
logger.info(
f" ✓ 调整数量: {quantity:.4f} -> {adjusted_quantity:.4f}, "
f"名义价值: {notional_value:.2f} USDT, "
f"保证金: {required_margin:.4f} USDT"
)
else:
logger.error(f" ❌ 无法获取 {symbol} 的当前价格,无法调整订单大小")
return None
# 最终检查:确保调整后的保证金满足要求
if required_margin < min_margin_usdt:
logger.error(
f"{symbol} 订单保证金不足: {required_margin:.4f} USDT < "
f"最小保证金要求: {min_margin_usdt:.2f} USDT拒绝下单"
)
return None
logger.info(
f" 保证金检查通过: {required_margin:.4f} USDT >= "
f"最小要求: {min_margin_usdt:.2f} USDT (杠杆: {current_leverage}x)"
)
# 构建订单参数
order_params = {
'symbol': symbol,
'side': side,
'type': order_type,
'quantity': adjusted_quantity
}
# 如果是平仓订单,添加 reduceOnly 参数
if reduce_only:
order_params['reduceOnly'] = True
logger.debug(f"{symbol} 使用 reduceOnly=True 平仓订单")
if order_type == 'MARKET':
order = await self.client.futures_create_order(**order_params)
else:
if price is None:
raise ValueError("限价单必须指定价格")
order_params['timeInForce'] = 'GTC'
order_params['price'] = price
order = await self.client.futures_create_order(**order_params)
logger.info(f"下单成功: {symbol} {side} {adjusted_quantity} @ {order_type} (名义价值: {notional_value:.2f} USDT)")
return order
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1111:
logger.error(f"下单失败 {symbol} {side}: 精度错误 - {e}")
logger.error(f" 原始数量: {quantity}")
if symbol_info:
logger.error(f" 交易对精度: {symbol_info}")
elif error_code == -4164:
logger.error(f"下单失败 {symbol} {side}: 订单名义价值不足 - {e}")
logger.error(f" 订单名义价值必须至少为 5 USDT (除非选择 reduce only)")
if symbol_info:
logger.error(f" 最小名义价值: {symbol_info.get('minNotional', 5.0)} USDT")
else:
logger.error(f"下单失败 {symbol} {side}: {e}")
return None
async def cancel_order(self, symbol: str, order_id: int) -> bool:
"""
取消订单
Args:
symbol: 交易对
order_id: 订单ID
Returns:
是否成功
"""
try:
await self.client.futures_cancel_order(symbol=symbol, orderId=order_id)
logger.info(f"取消订单成功: {symbol} {order_id}")
return True
except BinanceAPIException as e:
logger.error(f"取消订单失败: {e}")
return False
async def set_leverage(self, symbol: str, leverage: int = 10) -> bool:
"""
设置杠杆倍数
如果设置失败(比如超过交易对支持的最大杠杆),会自动降低杠杆重试
Args:
symbol: 交易对
leverage: 杠杆倍数
Returns:
是否成功
"""
try:
await self.client.futures_change_leverage(symbol=symbol, leverage=leverage)
logger.info(f"设置杠杆成功: {symbol} {leverage}x")
return True
except BinanceAPIException as e:
error_msg = str(e).lower()
# 如果错误信息包含杠杆相关的内容,尝试降低杠杆
if 'leverage' in error_msg or 'invalid' in error_msg:
# 尝试降低杠杆每次降低5倍最低到1倍
for reduced_leverage in range(leverage - 5, 0, -5):
if reduced_leverage < 1:
reduced_leverage = 1
try:
await self.client.futures_change_leverage(symbol=symbol, leverage=reduced_leverage)
logger.warning(
f"{symbol} 杠杆 {leverage}x 设置失败,已自动降低为 {reduced_leverage}x "
f"(原因: {e})"
)
return True
except BinanceAPIException:
if reduced_leverage <= 1:
break
continue
logger.error(f"设置杠杆失败: {symbol} {leverage}x, 错误: {e}")
return False
def get_realtime_price(self, symbol: str) -> Optional[float]:
"""
获取实时价格(从缓存)
Args:
symbol: 交易对
Returns:
实时价格如果缓存中有则返回否则返回None
"""
import time
if symbol in self._price_cache:
cached = self._price_cache[symbol]
cache_age = time.time() - cached.get('timestamp', 0)
if cache_age < self._price_cache_ttl:
return cached.get('price')
return None