auto_trade_sys/trading_system/binance_client.py
薇薇安 7f736c9081 a
2026-01-26 14:25:59 +08:00

1789 lines
83 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
币安客户端封装 - 提供异步交易接口
"""
import asyncio
import logging
import time
from typing import Dict, List, Optional, Any
from binance import AsyncClient, BinanceSocketManager
from binance.exceptions import BinanceAPIException
try:
from . import config
from .redis_cache import RedisCache
except ImportError:
import config
from redis_cache import RedisCache
logger = logging.getLogger(__name__)
class BinanceClient:
"""币安客户端封装类"""
def __init__(self, api_key: str = None, api_secret: str = None, testnet: bool = None):
"""
初始化币安客户端
Args:
api_key: API密钥如果为None从config读取
api_secret: API密钥如果为None从config读取
testnet: 是否使用测试网如果为None从config读取
"""
# 记录是否明确传入了 api_key 和 api_secret用于后续判断是否应该从 config 刷新)
# 注意:空字符串 "" 也被视为明确传入(用于推荐服务等只需要公开数据的场景)
self._explicit_api_key = api_key is not None
self._explicit_api_secret = api_secret is not None
# 如果未提供参数None从config读取确保使用最新值
# 注意:如果明确传递了 api_key 和 api_secret包括空字符串应该使用传递的值而不是从 config 读取
if api_key is None:
# 尝试从配置管理器直接获取
if config._config_manager:
try:
api_key = config._config_manager.get('BINANCE_API_KEY')
except:
pass
# 如果还是None使用config模块的值
if not api_key:
api_key = config.BINANCE_API_KEY
# 如果传入的是空字符串,保持为空字符串(不覆盖)
# 这样推荐服务可以使用空字符串来明确表示"只使用公开接口"
if api_secret is None:
# 尝试从配置管理器直接获取
if config._config_manager:
try:
api_secret = config._config_manager.get('BINANCE_API_SECRET')
except:
pass
# 如果还是None使用config模块的值
if not api_secret:
api_secret = config.BINANCE_API_SECRET
# 如果传入的是空字符串,保持为空字符串(不覆盖)
if testnet is None:
testnet = config.USE_TESTNET
self.api_key = api_key
self.api_secret = api_secret
self.testnet = testnet
# 记录使用的 API Key用于调试只显示前后4位
if api_key:
key_display = f"{api_key[:4]}...{api_key[-4:]}" if len(api_key) > 8 else api_key
logger.info(f"BinanceClient.__init__: 使用 API Key {key_display}, testnet={testnet}, explicit_key={self._explicit_api_key}, explicit_secret={self._explicit_api_secret}")
else:
logger.warning("BinanceClient.__init__: API Key 为空!")
# 如果传入的是空字符串,确保不会被覆盖(用于推荐服务等场景)
if self._explicit_api_key and not api_key:
logger.info("BinanceClient.__init__: 明确传入空 API Key推荐服务模式只使用公开接口")
# 初始化 Redis 缓存(必须在 __init__ 中初始化,不能依赖 _refresh_api_credentials
try:
self.redis_cache = RedisCache(
redis_url=config.REDIS_URL,
use_tls=config.REDIS_USE_TLS,
ssl_cert_reqs=config.REDIS_SSL_CERT_REQS,
ssl_ca_certs=config.REDIS_SSL_CA_CERTS,
username=config.REDIS_USERNAME,
password=config.REDIS_PASSWORD
)
except Exception as e:
logger.warning(f"初始化 Redis 缓存失败: {e},某些功能可能不可用")
self.redis_cache = None
self.client: Optional[AsyncClient] = None
self.socket_manager: Optional[BinanceSocketManager] = None
self._symbol_info_cache: Dict[str, Dict] = {} # 缓存交易对信息
self._last_request_time = {} # 记录每个API端点的最后请求时间
self._request_delay = 0.1 # 请求间隔(秒),避免频率限制
self._semaphore = asyncio.Semaphore(10) # 限制并发请求数
self._price_cache: Dict[str, Dict] = {} # WebSocket价格缓存 {symbol: {price, volume, changePercent, timestamp}}
self._price_cache_ttl = 60 # 价格缓存有效期(秒)
# 持仓模式缓存(币安 U本位合约dualSidePosition=True => 对冲模式False => 单向模式
self._dual_side_position: Optional[bool] = None
self._position_mode_checked_at: float = 0.0
# 你可能会在币安端切换“对冲/单向”模式TTL 过长会导致短时间内仍按旧模式下单(携带 positionSide
# 这里缩短到 10s兼顾及时性与接口频率。
self._position_mode_ttl: float = 10.0 # 秒,避免频繁调用接口
# 隐藏敏感信息只显示前4位和后4位
api_key_display = f"{self.api_key[:4]}...{self.api_key[-4:]}" if self.api_key and len(self.api_key) > 8 else self.api_key
api_secret_display = f"{self.api_secret[:4]}...{self.api_secret[-4:]}" if self.api_secret and len(self.api_secret) > 8 else self.api_secret
logger.info(f"初始化币安客户端: {api_key_display}, {api_secret_display}, {self.testnet}")
def _refresh_api_credentials(self):
"""刷新API密钥从配置管理器重新读取确保使用最新值"""
# 如果明确传入了 api_key 和 api_secret不应该从 config 刷新(避免覆盖多账号场景下的正确密钥)
if self._explicit_api_key and self._explicit_api_secret:
logger.debug("BinanceClient: 使用了明确的 API 密钥,跳过从 config 刷新(避免覆盖多账号场景)")
return
# 优先从配置管理器读取会从Redis获取最新值
if config._config_manager:
try:
# 强制从Redis重新加载配置
config._config_manager.reload_from_redis()
new_api_key = config._config_manager.get('BINANCE_API_KEY')
new_api_secret = config._config_manager.get('BINANCE_API_SECRET')
# 如果获取到新值且与当前值不同,更新
# 注意:即使传入了明确的密钥,如果只传入了其中一个,另一个仍可以从 config 刷新
if not self._explicit_api_key and new_api_key and new_api_key != self.api_key:
old_key_display = f"{self.api_key[:4]}...{self.api_key[-4:]}" if self.api_key and len(self.api_key) > 8 else self.api_key
new_key_display = f"{new_api_key[:4]}...{new_api_key[-4:]}" if new_api_key and len(new_api_key) > 8 else new_api_key
logger.info(f"检测到API密钥已更新: {old_key_display} -> {new_key_display}")
self.api_key = new_api_key
# 如果客户端已连接,需要重新连接以使用新密钥
if self.client:
logger.warning("API密钥已更新但客户端已连接需要重新连接才能使用新密钥")
if not self._explicit_api_secret and new_api_secret and new_api_secret != self.api_secret:
logger.info("检测到API密钥Secret已更新")
self.api_secret = new_api_secret
# 如果客户端已连接,需要重新连接以使用新密钥
if self.client:
logger.warning("API密钥Secret已更新但客户端已连接需要重新连接才能使用新密钥")
except Exception as e:
logger.debug(f"从配置管理器刷新API密钥失败: {e},使用现有值")
# 注意redis_cache 已在 __init__ 中初始化,这里不需要再次初始化
async def connect(self, timeout: int = None, retries: int = None):
"""
连接币安API
Args:
timeout: 连接超时时间默认从config读取
retries: 重试次数默认从config读取
"""
# 连接前刷新API密钥确保使用最新值支持热更新
# 但如果 API 密钥为空(只用于获取公开行情),则跳过
if self.api_key and self.api_secret:
self._refresh_api_credentials()
else:
logger.info("BinanceClient: 使用公开 API无需认证只能获取行情数据")
timeout = timeout or config.CONNECTION_TIMEOUT
retries = retries or config.CONNECTION_RETRIES
last_error = None
for attempt in range(retries):
try:
logger.info(
f"尝试连接币安API (第 {attempt + 1}/{retries} 次, "
f"测试网: {self.testnet}, 超时: {timeout}秒)..."
)
# 创建客户端使用最新的API密钥如果为空则只能访问公开接口
self.client = await AsyncClient.create(
api_key=self.api_key or None, # 空字符串转为 None
api_secret=self.api_secret or None,
testnet=self.testnet
)
# 测试连接(带超时)
try:
await asyncio.wait_for(self.client.ping(), timeout=timeout)
except asyncio.TimeoutError:
await self.client.close_connection()
raise asyncio.TimeoutError(f"ping超时 ({timeout}秒)")
self.socket_manager = BinanceSocketManager(self.client)
logger.info(f"✓ 币安客户端连接成功 (测试网: {self.testnet})")
# 连接 Redis 缓存
await self.redis_cache.connect()
# 验证API密钥权限仅当提供了有效的 API key 时)
if self.api_key and self.api_secret:
await self._verify_api_permissions()
else:
logger.info("✓ 使用公开 API跳过权限验证只能获取行情数据")
# 预热读取持仓模式(避免首次下单时才触发 -4061
try:
dual = await self._get_dual_side_position(force=True)
mode = "HEDGE(对冲)" if dual else "ONE-WAY(单向)"
logger.info(f"✓ 当前合约持仓模式: {mode}")
except Exception as e:
logger.debug(f"读取持仓模式失败(可忽略,后续下单会再尝试): {e}")
return
except asyncio.TimeoutError as e:
last_error = f"连接超时: {e}"
logger.warning(f"连接超时,剩余 {retries - attempt - 1} 次重试机会")
if attempt < retries - 1:
await asyncio.sleep(2) # 等待2秒后重试
except Exception as e:
last_error = str(e)
logger.warning(f"连接失败: {e},剩余 {retries - attempt - 1} 次重试机会")
if self.client:
try:
await self.client.close_connection()
except:
pass
if attempt < retries - 1:
await asyncio.sleep(2)
error_msg = f"连接币安API失败 (已重试 {retries} 次): {last_error}"
logger.error("=" * 60)
logger.error(error_msg)
logger.error("=" * 60)
logger.error("故障排查建议:")
logger.error("1. 检查网络连接是否正常")
logger.error("2. 检查API密钥是否正确")
logger.error("3. 如果在中国大陆可能需要使用代理或VPN")
if self.testnet:
logger.error("4. 测试网地址可能无法访问,尝试设置 USE_TESTNET=False")
logger.error("5. 检查防火墙设置")
logger.error("=" * 60)
raise ConnectionError(error_msg)
async def _verify_api_permissions(self):
"""
验证API密钥权限
"""
try:
# 尝试获取账户信息来验证权限
await self.client.futures_account()
logger.info("✓ API密钥权限验证通过")
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -2015:
logger.warning("⚠ API密钥权限验证失败可能无法进行合约交易")
logger.warning("请检查API密钥是否启用了合约交易权限")
else:
logger.warning(f"⚠ API密钥验证时出现错误: {e}")
@staticmethod
def _to_bool(value: Any) -> Optional[bool]:
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
s = str(value).strip().lower()
if s in {"true", "1", "yes", "y"}:
return True
if s in {"false", "0", "no", "n"}:
return False
return None
async def _get_dual_side_position(self, force: bool = False) -> Optional[bool]:
"""
获取币安合约持仓模式:
- True: 对冲模式Hedge Mode需要下单时传 positionSide=LONG/SHORT
- False: 单向模式One-way Mode不应传 positionSide=LONG/SHORT
"""
if not self.client:
return None
now = time.time()
if not force and self._dual_side_position is not None and (now - self._position_mode_checked_at) < self._position_mode_ttl:
return self._dual_side_position
# python-binance: futures_get_position_mode -> {"dualSidePosition": true/false}
res = await self.client.futures_get_position_mode()
dual = self._to_bool(res.get("dualSidePosition") if isinstance(res, dict) else None)
self._dual_side_position = dual
self._position_mode_checked_at = now
return dual
async def _resolve_position_side_for_order(
self,
symbol: str,
side: str,
reduce_only: bool,
provided: Optional[str],
) -> Optional[str]:
"""
在对冲模式下,必须传 positionSide=LONG/SHORT。
- 开仓BUY=>LONG, SELL=>SHORT
- 平仓(reduceOnly):需要知道要减少的是 LONG 还是 SHORT
"""
if provided:
ps = provided.strip().upper()
if ps in {"LONG", "SHORT"}:
return ps
if ps == "BOTH":
# 对冲模式下 BOTH 无效,这里按开仓方向兜底
return "LONG" if side.upper() == "BUY" else "SHORT"
side_u = side.upper()
if not reduce_only:
return "LONG" if side_u == "BUY" else "SHORT"
# reduceOnly 平仓:尝试从当前持仓推断(避免调用方漏传)
try:
positions = await self.client.futures_position_information(symbol=symbol)
nonzero = []
for p in positions or []:
try:
amt = float(p.get("positionAmt", 0))
except Exception:
continue
if abs(amt) > 0:
nonzero.append((amt, p))
if len(nonzero) == 1:
amt, p = nonzero[0]
ps = (p.get("positionSide") or "").upper()
if ps in {"LONG", "SHORT"}:
return ps
return "LONG" if amt > 0 else "SHORT"
# 多持仓按平仓方向推断SELL 通常平 LONGBUY 通常平 SHORT
if side_u == "SELL":
cand = next((p for amt, p in nonzero if amt > 0), None)
return (cand.get("positionSide") or "LONG").upper() if cand else "LONG"
if side_u == "BUY":
cand = next((p for amt, p in nonzero if amt < 0), None)
return (cand.get("positionSide") or "SHORT").upper() if cand else "SHORT"
except Exception as e:
logger.debug(f"{symbol} 推断 positionSide 失败: {e}")
return None
async def disconnect(self):
"""断开连接"""
# 关闭 Redis 连接
await self.redis_cache.close()
if self.client:
await self.client.close_connection()
logger.info("币安客户端已断开连接")
async def get_all_usdt_pairs(self, max_retries: int = 3, timeout: int = 30) -> List[str]:
"""
获取所有USDT交易对
添加超时处理和重试机制,避免推荐系统因网络超时中断
Args:
max_retries: 最大重试次数默认3次
timeout: 单次请求超时时间默认30秒
Returns:
USDT交易对列表失败时返回空列表
"""
for attempt in range(1, max_retries + 1):
try:
# 使用 _rate_limited_request 包装请求,添加速率限制
# 同时使用 asyncio.wait_for 添加超时处理
exchange_info = await asyncio.wait_for(
self._rate_limited_request(
'futures_exchange_info',
self.client.futures_exchange_info()
),
timeout=timeout
)
usdt_pairs = [
symbol['symbol']
for symbol in exchange_info['symbols']
if symbol['symbol'].endswith('USDT')
and symbol['status'] == 'TRADING'
and symbol.get('contractType') == 'PERPETUAL' # U本位永续合约
]
logger.info(f"获取到 {len(usdt_pairs)} 个USDT永续合约交易对")
return usdt_pairs
except asyncio.TimeoutError:
if attempt < max_retries:
wait_time = attempt * 2 # 递增等待时间2秒、4秒、6秒
logger.warning(f"获取交易对超时({timeout}秒),{wait_time}秒后重试 ({attempt}/{max_retries})")
await asyncio.sleep(wait_time)
else:
logger.error(f"获取交易对失败:{max_retries}次重试后仍然超时")
return []
except BinanceAPIException as e:
logger.error(f"获取交易对失败API错误: {e}")
return []
except Exception as e:
if attempt < max_retries:
wait_time = attempt * 2
logger.warning(f"获取交易对出错: {e}{wait_time}秒后重试 ({attempt}/{max_retries})")
await asyncio.sleep(wait_time)
else:
logger.error(f"获取交易对失败(未知错误): {e}")
return []
return [] # 所有重试都失败,返回空列表
async def _rate_limited_request(self, endpoint: str, coro):
"""
带速率限制的API请求
Args:
endpoint: API端点标识用于记录请求时间
coro: 异步协程
"""
async with self._semaphore:
# 检查是否需要等待(避免请求过快)
if endpoint in self._last_request_time:
elapsed = asyncio.get_event_loop().time() - self._last_request_time[endpoint]
if elapsed < self._request_delay:
await asyncio.sleep(self._request_delay - elapsed)
self._last_request_time[endpoint] = asyncio.get_event_loop().time()
return await coro
async def get_klines(self, symbol: str, interval: str = '5m', limit: int = 2) -> List[List]:
"""
获取K线数据合约市场
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
Args:
symbol: 交易对
interval: K线周期
limit: 获取数量
Returns:
K线数据列表
"""
# 先查 Redis 缓存
cache_key = f"klines:{symbol}:{interval}:{limit}"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从缓存获取 {symbol} K线数据: {interval} x{limit}")
return cached
try:
# 缓存未命中,调用 API
klines = await self._rate_limited_request(
f'klines_{symbol}_{interval}',
self.client.futures_klines(symbol=symbol, interval=interval, limit=limit)
)
# 写入 Redis 缓存(根据 interval 动态设置 TTL
if klines:
# TTL 设置1m=10秒, 5m=30秒, 15m=1分钟, 1h=5分钟, 4h=15分钟, 1d=1小时
ttl_map = {
'1m': 10,
'3m': 20,
'5m': 30,
'15m': 60,
'30m': 120,
'1h': 300,
'2h': 600,
'4h': 900,
'6h': 1200,
'8h': 1800,
'12h': 2400,
'1d': 3600
}
ttl = ttl_map.get(interval, 300) # 默认 5 分钟
await self.redis_cache.set(cache_key, klines, ttl=ttl)
logger.debug(f"已缓存 {symbol} K线数据: {interval} x{limit} (TTL: {ttl}秒)")
return klines
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"获取 {symbol} K线数据失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"获取 {symbol} K线数据失败: {e}")
return []
async def get_ticker_24h(self, symbol: str) -> Optional[Dict]:
"""
获取24小时行情数据合约市场
优先从WebSocket缓存读取其次从Redis缓存读取最后使用REST API
Args:
symbol: 交易对
Returns:
24小时行情数据
"""
import time
# 1. 优先从WebSocket缓存读取
if symbol in self._price_cache:
cached = self._price_cache[symbol]
cache_age = time.time() - cached.get('timestamp', 0)
if cache_age < self._price_cache_ttl:
logger.debug(f"从WebSocket缓存获取 {symbol} 价格: {cached['price']:.8f} (缓存年龄: {cache_age:.1f}秒)")
return {
'symbol': symbol,
'price': cached['price'],
'volume': cached.get('volume', 0),
'changePercent': cached.get('changePercent', 0)
}
else:
logger.debug(f"{symbol} WebSocket缓存已过期 ({cache_age:.1f}秒 > {self._price_cache_ttl}秒)")
# 2. 从 Redis 缓存读取
cache_key = f"ticker_24h:{symbol}"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从Redis缓存获取 {symbol} 24小时行情数据")
return cached
# 3. 如果缓存不可用或过期使用REST APIfallback
logger.debug(f"{symbol} 未在缓存中使用REST API获取")
try:
ticker = await self._rate_limited_request(
f'ticker_{symbol}',
self.client.futures_symbol_ticker(symbol=symbol)
)
stats = await self._rate_limited_request(
f'stats_{symbol}',
self.client.futures_ticker(symbol=symbol)
)
result = {
'symbol': symbol,
'price': float(ticker['price']),
'volume': float(stats.get('quoteVolume', 0)),
'changePercent': float(stats.get('priceChangePercent', 0))
}
# 更新 WebSocket 缓存
self._price_cache[symbol] = {
**result,
'timestamp': time.time()
}
# 写入 Redis 缓存TTL: 30秒
await self.redis_cache.set(cache_key, result, ttl=30)
return result
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"获取 {symbol} 24小时行情失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"获取 {symbol} 24小时行情失败: {e}")
return None
async def get_all_tickers_24h(self) -> Dict[str, Dict]:
"""
批量获取所有交易对的24小时行情数据更高效
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
Returns:
交易对行情数据字典 {symbol: {price, volume, changePercent}}
"""
# 先查 Redis 缓存
cache_key = "ticker_24h:all"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从Redis缓存获取所有交易对的24小时行情数据: {len(cached)} 个交易对")
return cached
try:
# 使用批量API一次获取所有交易对的数据
tickers = await self._rate_limited_request(
'all_tickers',
self.client.futures_ticker()
)
result = {}
now_ms = int(__import__("time").time() * 1000)
for ticker in tickers:
symbol = ticker['symbol']
if symbol.endswith('USDT'):
result[symbol] = {
'symbol': symbol,
'price': float(ticker.get('lastPrice', 0)),
'volume': float(ticker.get('quoteVolume', 0)),
'changePercent': float(ticker.get('priceChangePercent', 0)),
# 用于前端展示“当前价格更新时间”(以及后端合并时判断新鲜度)
'ts': now_ms
}
# 写入 Redis 缓存TTL: 60秒多个账户可以共用
await self.redis_cache.set(cache_key, result, ttl=60)
logger.debug(f"批量获取到 {len(result)} 个交易对的24小时行情数据已缓存 (TTL: 60秒)")
return result
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
if error_code == -1003:
logger.warning(f"批量获取24小时行情失败: API请求频率过高建议使用WebSocket或增加扫描间隔")
else:
logger.error(f"批量获取24小时行情失败: {e}")
return {}
async def get_account_balance(self) -> Dict[str, float]:
"""
获取U本位合约账户余额
Returns:
账户余额字典 {'available': 可用余额, 'total': 总余额, 'margin': 保证金余额}
"""
try:
account = await self.client.futures_account()
assets = account.get('assets', [])
usdt_asset = next((a for a in assets if a['asset'] == 'USDT'), None)
if usdt_asset:
# 币安合约账户字段说明(根据官方文档):
# - walletBalance: 钱包余额(不包括未实现盈亏,只反映已实现的盈亏、转账、手续费等)
# - marginBalance: 保证金余额(钱包余额 + 未实现盈亏),这是账户的总权益,用户看到的"总余额"
# - availableBalance: 可用余额(可用于开仓的余额 = 钱包余额 - 初始保证金 + 未实现盈亏)
wallet_balance = float(usdt_asset.get('walletBalance', 0))
available_balance = float(usdt_asset.get('availableBalance', 0))
margin_balance = float(usdt_asset.get('marginBalance', 0))
unrealized_profit = float(usdt_asset.get('unrealizedProfit', 0))
# 记录所有字段以便调试
logger.info(f"币安合约账户余额详情 (USDT):")
logger.info(f" - walletBalance (钱包余额,不包括未实现盈亏): {wallet_balance}")
logger.info(f" - marginBalance (保证金余额,总权益,包括未实现盈亏): {margin_balance}")
logger.info(f" - availableBalance (可用余额): {available_balance}")
logger.info(f" - unrealizedProfit (未实现盈亏): {unrealized_profit}")
logger.info(f" - 验证: marginBalance ({margin_balance}) = walletBalance ({wallet_balance}) + unrealizedProfit ({unrealized_profit}) = {wallet_balance + unrealized_profit}")
return {
'ok': True,
'available': available_balance,
'total': margin_balance, # 使用保证金余额作为总余额(包括未实现盈亏),这是用户看到的"总余额"
'margin': margin_balance,
# 添加原始字段以便调试
'walletBalance': wallet_balance,
'availableBalance': available_balance,
'marginBalance': margin_balance,
'unrealizedProfit': unrealized_profit
}
logger.warning("币安账户中没有找到 USDT 资产")
return {'ok': True, 'available': 0.0, 'total': 0.0, 'margin': 0.0}
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
error_msg = str(e)
# 合并成“单条多行日志”,避免日志/Redis 里刷屏
lines = [
"=" * 60,
f"获取账户余额失败: {error_msg}",
]
if error_code == -2015:
lines += [
"=" * 60,
"API密钥权限错误 (错误代码: -2015)",
"可能的原因:",
"1. API密钥无效或已过期",
"2. API密钥没有合约交易权限",
"3. IP地址未添加到API密钥白名单",
"4. 测试网/生产网环境不匹配",
"=" * 60,
"解决方案:",
"1. 登录币安账户检查API密钥状态",
"2. 确保API密钥已启用'合约交易'权限",
"3. 如果设置了IP白名单请添加当前服务器IP",
"4. 检查 USE_TESTNET 配置是否正确",
f" 当前配置: USE_TESTNET = {self.testnet}",
"=" * 60,
]
elif error_code == -1022:
lines += [
f"错误代码: {error_code}",
"签名错误,请检查 API_KEY / API_SECRET 是否匹配、是否有多余空格/换行",
]
elif error_code == -2010:
lines += [
f"错误代码: {error_code}",
"账户余额不足",
]
else:
lines += [f"错误代码: {error_code}"]
logger.error("\n".join(lines))
return {
'ok': False,
'available': 0.0,
'total': 0.0,
'margin': 0.0,
'error_code': error_code,
'error_msg': error_msg,
}
async def get_open_positions(self) -> List[Dict]:
"""
获取当前持仓
Returns:
持仓列表
"""
try:
positions = await self.client.futures_position_information()
open_positions = [
{
'symbol': pos['symbol'],
'positionAmt': float(pos['positionAmt']),
'entryPrice': float(pos['entryPrice']),
'markPrice': float(pos.get('markPrice', 0)),
'unRealizedProfit': float(pos['unRealizedProfit']),
'leverage': int(pos['leverage'])
}
for pos in positions
if float(pos['positionAmt']) != 0
]
return open_positions
except BinanceAPIException as e:
logger.error(f"获取持仓信息失败: {e}")
return []
async def get_symbol_info(self, symbol: str) -> Optional[Dict]:
"""
获取交易对的精度和限制信息
优先从 Redis 缓存读取,如果缓存不可用或过期则使用 REST API
Args:
symbol: 交易对
Returns:
交易对信息字典,包含 quantityPrecision, minQty, stepSize 等
"""
# 1. 先检查内存缓存
if symbol in self._symbol_info_cache:
cached_mem = self._symbol_info_cache[symbol]
# 兼容旧缓存:早期版本没有 tickSize/pricePrecision容易触发 -4014/-1111
if isinstance(cached_mem, dict) and ("tickSize" not in cached_mem or "pricePrecision" not in cached_mem):
try:
self._symbol_info_cache.pop(symbol, None)
except Exception:
pass
else:
return cached_mem
# 2. 从 Redis 缓存读取
cache_key = f"symbol_info:{symbol}"
cached = await self.redis_cache.get(cache_key)
if cached:
logger.debug(f"从Redis缓存获取 {symbol} 交易对信息")
# 兼容旧缓存:早期版本没有 tickSize/pricePrecision容易触发 -4014/-1111
if isinstance(cached, dict) and ("tickSize" not in cached or "pricePrecision" not in cached):
logger.info(f"{symbol} symbol_info 缓存缺少 tickSize/pricePrecision自动刷新一次")
else:
# 同时更新内存缓存
self._symbol_info_cache[symbol] = cached
return cached
# 3. 缓存未命中,调用 API
try:
exchange_info = await self.client.futures_exchange_info()
for s in exchange_info['symbols']:
if s['symbol'] == symbol:
# 提取数量/价格精度信息
quantity_precision = s.get('quantityPrecision', 8)
price_precision = s.get('pricePrecision', 8)
# 从filters中提取 minQty/stepSize/minNotional/tickSize
min_qty = None
step_size = None
min_notional = None
tick_size = None
for f in s.get('filters', []):
if f['filterType'] == 'LOT_SIZE':
min_qty = float(f.get('minQty', 0))
step_size = float(f.get('stepSize', 0))
elif f.get('filterType') == 'PRICE_FILTER':
tick_size = float(f.get('tickSize', 0) or 0)
elif f['filterType'] == 'MIN_NOTIONAL':
min_notional = float(f.get('notional', 0))
# 如果没有从filters获取到minNotional使用默认值5 USDT
if min_notional is None or min_notional == 0:
min_notional = 5.0
# 获取交易对支持的最大杠杆倍数
# 币安API的exchange_info中可能没有直接的leverageBracket信息
# 我们尝试从leverageBracket获取如果没有则使用默认值
max_leverage_supported = 125 # 币安合约默认最大杠杆
# 尝试从leverageBracket获取如果存在
if s.get('leverageBracket') and len(s.get('leverageBracket', [])) > 0:
max_leverage_supported = s['leverageBracket'][0].get('maxLeverage', 125)
else:
# 如果leverageBracket不存在尝试通过futures_leverage_bracket API获取
# 但为了不增加API调用这里先使用默认值125
# 实际使用时会在设置杠杆时检查,如果失败会自动降低
max_leverage_supported = 125
info = {
'quantityPrecision': quantity_precision,
'pricePrecision': price_precision,
'minQty': min_qty or 0,
'stepSize': step_size or 0,
'tickSize': tick_size or 0,
'minNotional': min_notional,
'maxLeverage': int(max_leverage_supported) # 交易对支持的最大杠杆
}
# 写入 Redis 缓存TTL: 1小时
await self.redis_cache.set(cache_key, info, ttl=3600)
# 同时更新内存缓存
self._symbol_info_cache[symbol] = info
logger.debug(f"获取 {symbol} 精度信息: {info}")
return info
logger.warning(f"未找到交易对 {symbol} 的信息")
return None
except Exception as e:
logger.error(f"获取 {symbol} 交易对信息失败: {e}")
return None
def _adjust_quantity_precision(self, quantity: float, symbol_info: Dict) -> float:
"""
调整数量精度,使其符合币安要求
Args:
quantity: 原始数量
symbol_info: 交易对信息
Returns:
调整后的数量
"""
if not symbol_info:
# 如果没有交易对信息使用默认精度3位小数
return round(quantity, 3)
quantity_precision = symbol_info.get('quantityPrecision', 8)
step_size = symbol_info.get('stepSize', 0)
min_qty = symbol_info.get('minQty', 0)
# 如果有stepSize按照stepSize调整
if step_size > 0:
# 向下取整到stepSize的倍数使用浮点数除法
adjusted = float(int(quantity / step_size)) * step_size
else:
# 否则按照精度调整
adjusted = round(quantity, quantity_precision)
# 确保不小于最小数量
if min_qty > 0 and adjusted < min_qty:
# 如果小于最小数量,尝试向上取整到最小数量
if step_size > 0:
adjusted = min_qty
else:
adjusted = round(min_qty, quantity_precision)
logger.warning(f"数量 {quantity} 小于最小数量 {min_qty},调整为 {adjusted}")
# 最终精度调整
adjusted = round(adjusted, quantity_precision)
if adjusted != quantity:
logger.info(f"数量精度调整: {quantity} -> {adjusted} (精度: {quantity_precision}, stepSize: {step_size}, minQty: {min_qty})")
return adjusted
@staticmethod
def _format_decimal_str(d) -> str:
try:
s = format(d, 'f')
except Exception:
s = str(d)
if '.' in s:
s = s.rstrip('0').rstrip('.')
return s
@staticmethod
def _format_quantity_str(quantity: float, symbol_info: Optional[Dict]) -> str:
"""
把数量格式化为币安可接受的字符串,避免 quantityPrecision=0 时发送 "197.0" 导致 -1111。
"""
try:
from decimal import Decimal, ROUND_DOWN
except Exception:
# fallback尽量去掉浮点尾巴
q = float(quantity or 0)
if not symbol_info:
return str(q)
qp = int(symbol_info.get("quantityPrecision", 8) or 8)
if qp <= 0:
return str(int(q))
return str(round(q, qp))
qp = 8
try:
qp = int(symbol_info.get("quantityPrecision", 8) or 8) if symbol_info else 8
except Exception:
qp = 8
qd = Decimal(str(quantity))
if qp <= 0:
q2 = qd.to_integral_value(rounding=ROUND_DOWN)
return BinanceClient._format_decimal_str(q2)
q2 = qd.quantize(Decimal(f"1e-{qp}"), rounding=ROUND_DOWN)
return BinanceClient._format_decimal_str(q2)
@staticmethod
def _format_limit_price_str(price: float, symbol_info: Optional[Dict], side: str) -> str:
"""
把 LIMIT 价格格式化为币安可接受的字符串tickSize/pricePrecision 对齐),避免:
-4014 Price not increased by tick size
-1111 Precision is over the maximum defined for this asset
"""
try:
from decimal import Decimal, ROUND_DOWN, ROUND_UP
except Exception:
return str(round(float(price), 8))
tick = 0.0
pp = 8
try:
tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
except Exception:
tick = 0.0
try:
pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
except Exception:
pp = 8
p = Decimal(str(price))
rounding = ROUND_DOWN if (side or "").upper() == "BUY" else ROUND_UP
# 1) tickSize 优先(最严格)
try:
t = Decimal(str(tick))
if t > 0:
q = p / t
q2 = q.to_integral_value(rounding=rounding)
p2 = q2 * t
return BinanceClient._format_decimal_str(p2)
except Exception:
pass
# 2) 没有 tickSize 时,用 pricePrecision 兜底
try:
if pp <= 0:
return BinanceClient._format_decimal_str(p.to_integral_value(rounding=rounding))
p2 = p.quantize(Decimal(f"1e-{pp}"), rounding=rounding)
return BinanceClient._format_decimal_str(p2)
except Exception:
return BinanceClient._format_decimal_str(p)
@staticmethod
def _format_price_str_with_rounding(price: float, symbol_info: Optional[Dict], rounding_mode: str) -> str:
"""
通用价格格式化tickSize/pricePrecision 对齐),并允许显式指定 ROUND_UP / ROUND_DOWN。
rounding_mode: "UP""DOWN"
"""
try:
from decimal import Decimal, ROUND_DOWN, ROUND_UP
except Exception:
return str(round(float(price), 8))
tick = 0.0
pp = 8
try:
tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
except Exception:
tick = 0.0
try:
pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
except Exception:
pp = 8
p = Decimal(str(price))
rounding = ROUND_UP if str(rounding_mode).upper() == "UP" else ROUND_DOWN
# tickSize 优先
try:
t = Decimal(str(tick))
if t > 0:
q = p / t
q2 = q.to_integral_value(rounding=rounding)
p2 = q2 * t
return BinanceClient._format_decimal_str(p2)
except Exception:
pass
# pricePrecision 兜底
try:
if pp <= 0:
return BinanceClient._format_decimal_str(p.to_integral_value(rounding=rounding))
p2 = p.quantize(Decimal(f"1e-{pp}"), rounding=rounding)
return BinanceClient._format_decimal_str(p2)
except Exception:
return BinanceClient._format_decimal_str(p)
@staticmethod
def _adjust_price_to_tick(price: float, tick_size: float, side: str) -> float:
"""
把 LIMIT 价格调整为 tickSize 的整数倍,避免:
-4014 Price not increased by tick size
-1111 Precision is over the maximum defined for this asset部分情况下也会由 price 引发)
规则(入场限价):
- BUY向下取整更便宜且不会“买贵了”
- SELL向上取整更贵/更高,且不会“卖便宜了”)
"""
try:
from decimal import Decimal, ROUND_DOWN, ROUND_UP
except Exception:
# fallback不用 Decimal 时,至少 round 到 8 位,尽量减少精度问题
return float(round(float(price), 8))
try:
p = Decimal(str(price))
t = Decimal(str(tick_size))
if t <= 0:
return float(p)
q = p / t
rounding = ROUND_DOWN if (side or "").upper() == "BUY" else ROUND_UP
q2 = q.to_integral_value(rounding=rounding)
p2 = q2 * t
return float(p2)
except Exception:
return float(round(float(price), 8))
def _adjust_quantity_precision_up(self, quantity: float, symbol_info: Dict) -> float:
"""
向上取整调整数量精度,使其符合币安要求
Args:
quantity: 原始数量
symbol_info: 交易对信息
Returns:
调整后的数量(向上取整)
"""
import math
if not symbol_info:
# 如果没有交易对信息向上取整到3位小数
return round(math.ceil(quantity * 1000) / 1000, 3)
quantity_precision = symbol_info.get('quantityPrecision', 8)
step_size = symbol_info.get('stepSize', 0)
min_qty = symbol_info.get('minQty', 0)
# 如果有stepSize按照stepSize向上取整
if step_size > 0:
# 向上取整到stepSize的倍数
adjusted = math.ceil(quantity / step_size) * step_size
else:
# 否则按照精度向上取整
multiplier = 10 ** quantity_precision
adjusted = math.ceil(quantity * multiplier) / multiplier
# 确保不小于最小数量
if min_qty > 0 and adjusted < min_qty:
adjusted = min_qty
# 最终精度调整
adjusted = round(adjusted, quantity_precision)
if adjusted != quantity:
logger.info(f"数量向上取整调整: {quantity} -> {adjusted} (精度: {quantity_precision}, stepSize: {step_size}, minQty: {min_qty})")
return adjusted
async def place_order(
self,
symbol: str,
side: str,
quantity: float,
order_type: str = 'MARKET',
price: Optional[float] = None,
reduce_only: bool = False,
position_side: Optional[str] = None,
) -> Optional[Dict]:
"""
下单
Args:
symbol: 交易对
side: 方向 'BUY''SELL'
quantity: 数量
order_type: 订单类型 'MARKET''LIMIT'
price: 限价单价格
Returns:
订单信息
"""
try:
# 获取交易对精度信息
symbol_info = await self.get_symbol_info(symbol)
# 获取当前价格以计算名义价值
if price is None:
ticker = await self.get_ticker_24h(symbol)
if not ticker:
logger.error(f"无法获取 {symbol} 的价格信息")
return None
current_price = ticker['price']
else:
current_price = price
# 先按原始数量计算名义价值,用于保证金检查
initial_notional_value = quantity * current_price
min_notional = symbol_info.get('minNotional', 5.0) if symbol_info else 5.0
# 调整数量精度(在保证金检查之前)
adjusted_quantity = self._adjust_quantity_precision(quantity, symbol_info)
if adjusted_quantity <= 0:
logger.error(f"调整后的数量无效: {adjusted_quantity} (原始: {quantity})")
return None
# 使用调整后的数量重新计算名义价值
notional_value = adjusted_quantity * current_price
logger.info(f"下单检查: {symbol} {side} {adjusted_quantity} (原始: {quantity}) @ {order_type}")
logger.info(f" 当前价格: {current_price:.4f} USDT")
logger.info(f" 订单名义价值: {notional_value:.2f} USDT")
logger.info(f" 最小名义价值: {min_notional:.2f} USDT")
logger.info(f" 平仓模式: {reduce_only}")
# 检查名义价值是否满足最小要求
# 注意对于平仓操作reduce_only=True完全跳过名义价值检查
# 因为平仓是关闭现有持仓币安允许任意大小的平仓订单只要大于0
if reduce_only:
logger.info(f" 平仓操作:跳过名义价值检查(名义价值: {notional_value:.2f} USDT")
elif notional_value < min_notional:
logger.warning(
f"{symbol} 订单名义价值不足: {notional_value:.2f} USDT < "
f"最小要求: {min_notional:.2f} USDT"
)
logger.warning(f" 需要增加数量或提高仓位大小")
return None
# 检查最小保证金要求(避免手续费侵蚀收益)
# 获取当前杠杆(如果无法获取,使用默认值)
current_leverage = config.TRADING_CONFIG.get('LEVERAGE', 10)
try:
# 尝试从持仓信息获取实际使用的杠杆
positions = await self.client.futures_position_information(symbol=symbol)
if positions and len(positions) > 0:
position = positions[0]
if float(position.get('positionAmt', 0)) != 0:
# 有持仓,使用持仓的杠杆
leverage_bracket = position.get('leverage', current_leverage)
if leverage_bracket:
current_leverage = int(leverage_bracket)
except Exception as e:
logger.debug(f"无法获取 {symbol} 的杠杆信息,使用默认值: {current_leverage}x ({e})")
min_margin_usdt = config.TRADING_CONFIG.get('MIN_MARGIN_USDT', 0.5) # 默认0.5 USDT
required_margin = notional_value / current_leverage
# 对于平仓操作reduce_only=True跳过最小保证金检查
# 因为这是关闭现有持仓,不应该因为保证金不足而拒绝平仓
if reduce_only:
logger.info(f" 平仓操作:跳过最小保证金检查(名义价值: {notional_value:.2f} USDT")
elif required_margin < min_margin_usdt:
# 如果保证金不足,自动调整到最小保证金要求
required_notional_value = min_margin_usdt * current_leverage
logger.warning(
f"{symbol} 订单保证金不足: {required_margin:.4f} USDT < "
f"最小保证金要求: {min_margin_usdt:.2f} USDT"
)
logger.info(
f" 自动调整订单名义价值: {notional_value:.2f} USDT -> {required_notional_value:.2f} USDT "
f"(杠杆: {current_leverage}x, 保证金: {min_margin_usdt:.2f} USDT)"
)
# 调整数量以满足最小保证金要求
if current_price > 0:
new_quantity = required_notional_value / current_price
# 先尝试向下取整调整
adjusted_quantity = self._adjust_quantity_precision(new_quantity, symbol_info)
# 重新计算名义价值和保证金
notional_value = adjusted_quantity * current_price
required_margin = notional_value / current_leverage
# 如果调整后保证金仍然不足,使用向上取整
if required_margin < min_margin_usdt:
logger.warning(
f" ⚠ 向下取整后保证金仍不足: {required_margin:.4f} USDT < {min_margin_usdt:.2f} USDT"
)
adjusted_quantity = self._adjust_quantity_precision_up(new_quantity, symbol_info)
# 重新计算名义价值和保证金
notional_value = adjusted_quantity * current_price
required_margin = notional_value / current_leverage
# 再次检查保证金
if required_margin < min_margin_usdt:
logger.error(
f" ❌ 调整后保证金仍不足: {required_margin:.4f} USDT < {min_margin_usdt:.2f} USDT"
)
logger.error(
f" 💡 建议: 增加账户余额或降低杠杆倍数,才能满足最小保证金要求"
)
return None
logger.info(
f" ✓ 调整数量: {quantity:.4f} -> {adjusted_quantity:.4f}, "
f"名义价值: {notional_value:.2f} USDT, "
f"保证金: {required_margin:.4f} USDT"
)
else:
logger.error(f" ❌ 无法获取 {symbol} 的当前价格,无法调整订单大小")
return None
# 最终检查:确保调整后的保证金满足要求
# 对于平仓操作reduce_only=True跳过最终保证金检查
if reduce_only:
logger.info(f" 平仓操作:跳过最终保证金检查(保证金: {required_margin:.4f} USDT")
elif required_margin < min_margin_usdt:
logger.error(
f"{symbol} 订单保证金不足: {required_margin:.4f} USDT < "
f"最小保证金要求: {min_margin_usdt:.2f} USDT拒绝下单"
)
return None
else:
logger.info(
f" 保证金检查通过: {required_margin:.4f} USDT >= "
f"最小要求: {min_margin_usdt:.2f} USDT (杠杆: {current_leverage}x)"
)
# 最终检查确保名义价值不小于0.2 USDT避免无意义的小单子
# 对于平仓操作reduce_only=True跳过此检查
# MIN_NOTIONAL_VALUE = 0.2 # 最小名义价值0.2 USDT
# if not reduce_only and notional_value < MIN_NOTIONAL_VALUE:
# logger.error(
# f"❌ {symbol} 订单名义价值 {notional_value:.4f} USDT < 最小要求 {MIN_NOTIONAL_VALUE:.2f} USDT拒绝下单"
# )
# logger.error(f" 💡 此类小单子意义不大,拒绝开仓")
# return None
# 构建订单参数
order_params = {
'symbol': symbol,
'side': side,
'type': order_type,
# 关键quantityPrecision=0 时必须是 "197" 而不是 "197.0",否则会触发 -1111
'quantity': self._format_quantity_str(adjusted_quantity, symbol_info)
}
# 处理持仓模式(解决 -4061position side 与账户设置不匹配)
# - 对冲模式:必须传 positionSide=LONG/SHORT
# - 单向模式:不要传 positionSide=LONG/SHORT否则会 -4061
dual = None
try:
dual = await self._get_dual_side_position()
except Exception as e:
logger.debug(f"{symbol} 获取持仓模式失败(稍后用重试兜底): {e}")
if dual is True:
ps = await self._resolve_position_side_for_order(symbol, side, reduce_only, position_side)
if not ps:
logger.error(f"{symbol} 对冲模式下无法确定 positionSide拒绝下单以避免 -4061")
return None
order_params["positionSide"] = ps
elif dual is False:
if position_side:
logger.info(f"{symbol} 单向模式下忽略 positionSide={position_side}(避免 -4061")
# 如果是平仓订单,添加 reduceOnly 参数
# 根据币安API文档reduceOnly 应该是字符串 "true" 或 "false"
if reduce_only:
# 实测:某些账户/模式下(尤其是对冲模式 + positionSide会报
# APIError(code=-1106): Parameter 'reduceonly' sent when not required.
# 因此:当我们已经明确指定 positionSide=LONG/SHORT 时,不再传 reduceOnly
# 其余情况仍保留 reduceOnly 以避免反向开仓。
if order_params.get("positionSide") in {"LONG", "SHORT"}:
logger.info(f"{symbol} 对冲模式平仓:已指定 positionSide跳过 reduceOnly避免 -1106")
else:
# python-binance 可以接受 bool同时后面也做 -1106 自动兜底重试
order_params['reduceOnly'] = True
logger.info(f"{symbol} 使用 reduceOnly=true 平仓订单")
async def _submit(params: Dict[str, Any]) -> Dict[str, Any]:
if order_type == 'MARKET':
return await self.client.futures_create_order(**params)
if price is None:
raise ValueError("限价单必须指定价格")
params = dict(params)
params['timeInForce'] = 'GTC'
# LIMIT 价格按 tickSize/pricePrecision 修正(避免 -4014 / -1111
params['price'] = self._format_limit_price_str(float(price), symbol_info, side)
return await self.client.futures_create_order(**params)
# 提交订单;若遇到:
# -4061: positionSide 与账户模式不匹配 → 在“带/不带 positionSide”之间兜底切换重试
# -1106: reduceOnly not required → 去掉 reduceOnly 重试(避免自动平仓失败)
try:
order = await _submit(order_params)
except BinanceAPIException as e:
code = getattr(e, "code", None)
if code in (-4061, -1106):
retry_params = dict(order_params)
if code == -4061:
logger.error(f"{symbol} 触发 -4061持仓模式不匹配尝试自动兜底重试一次")
if "positionSide" in retry_params:
retry_params.pop("positionSide", None)
else:
ps = await self._resolve_position_side_for_order(symbol, side, reduce_only, position_side)
if ps:
retry_params["positionSide"] = ps
elif code == -1106:
# 常见Parameter 'reduceonly' sent when not required.
msg = str(e).lower()
if "reduceonly" in msg or "reduce only" in msg:
logger.error(f"{symbol} 触发 -1106reduceOnly 不被接受),去掉 reduceOnly 后重试一次")
retry_params.pop("reduceOnly", None)
else:
raise
order = await _submit(retry_params)
else:
raise
logger.info(f"下单成功: {symbol} {side} {adjusted_quantity} @ {order_type} (名义价值: {notional_value:.2f} USDT)")
return order
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
error_msg = str(e)
if error_code == -1111:
logger.error(f"下单失败 {symbol} {side}: 精度错误 - {e}")
logger.error(f" 原始数量: {quantity}")
if order_type == 'LIMIT':
logger.error(f" 原始价格: {price}")
if symbol_info:
logger.error(f" 交易对精度: {symbol_info}")
elif error_code == -4014:
# Price not increased by tick size.
logger.error(f"下单失败 {symbol} {side}: 价格步长错误(-4014) - {e}")
logger.error(f" 原始数量: {quantity}")
logger.error(f" 原始价格: {price}")
if symbol_info:
logger.error(f" tickSize: {symbol_info.get('tickSize')}, pricePrecision: {symbol_info.get('pricePrecision')}")
elif error_code == -4164:
logger.error(f"下单失败 {symbol} {side}: 订单名义价值不足 - {e}")
logger.error(f" 订单名义价值必须至少为 5 USDT (除非选择 reduce only)")
if symbol_info:
logger.error(f" 最小名义价值: {symbol_info.get('minNotional', 5.0)} USDT")
elif error_code == -2022:
# ReduceOnly Order is rejected - 可能是没有持仓或持仓方向不对
# 这类错误在并发/竞态场景很常见:我们以为还有仓位,但实际上已经被其他任务/手动操作平掉了
# 对于 reduce_only=True调用方应当把它当作“幂等平仓”的可接受结果再查一次实时持仓即可
if reduce_only:
logger.warning(f"下单被拒绝 {symbol} {side}: ReduceOnly(-2022)可能仓位已为0/方向腿不匹配),将由上层做幂等处理")
else:
logger.error(f"下单失败 {symbol} {side}: ReduceOnly 订单被拒绝 - {e}")
elif "reduceOnly" in error_msg.lower() or "reduce only" in error_msg.lower():
logger.error(f"下单失败 {symbol} {side}: ReduceOnly 相关错误 - {e}")
logger.error(f" 错误码: {error_code}")
else:
logger.error(f"下单失败 {symbol} {side}: {e}")
logger.error(f" 错误码: {error_code}")
logger.error(
f" 下单参数: symbol={symbol}, side={side}, quantity={adjusted_quantity}, type={order_type}, "
f"reduceOnly={reduce_only}, positionSide={order_params.get('positionSide') if 'order_params' in locals() else None}"
)
return None
except Exception as e:
# 捕获其他异常
logger.error(f"下单失败 {symbol} {side}: 未知错误 - {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 完整错误堆栈:\n{traceback.format_exc()}")
return None
async def cancel_order(self, symbol: str, order_id: int) -> bool:
"""
取消订单
Args:
symbol: 交易对
order_id: 订单ID
Returns:
是否成功
"""
try:
await self.client.futures_cancel_order(symbol=symbol, orderId=order_id)
logger.info(f"取消订单成功: {symbol} {order_id}")
return True
except BinanceAPIException as e:
# -2011 Unknown order sent订单可能已成交/已撤销/已过期,这是典型幂等场景
# 取消操作应视为“已达成目标”(不再继续报错刷屏)
code = getattr(e, "code", None)
msg = str(e)
if code == -2011 or "code=-2011" in msg or "Unknown order sent" in msg:
logger.info(f"取消订单幂等成功(订单可能已不存在): {symbol} {order_id} | {e}")
return True
logger.error(f"取消订单失败: {e}")
return False
# =========================
# Algo Orders条件单/止盈止损/计划委托)
# 说明:币安在 2025-12 后将 USDT-M 合约的 STOP/TP/Trailing 等条件单迁移到 Algo 接口:
# - POST /fapi/v1/algoOrder
# - GET /fapi/v1/openAlgoOrders
# - DELETE /fapi/v1/algoOrder
# 如果仍用 /fapi/v1/order 下 STOP_MARKET/TAKE_PROFIT_MARKET + closePosition 会报 -4120。
# =========================
async def futures_create_algo_order(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
try:
# python-binance 内部会自动补 timestamp / signature
res = await self.client._request_futures_api("post", "algoOrder", True, data=params)
return res if isinstance(res, dict) else None
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
error_msg = str(e)
symbol = params.get('symbol', 'UNKNOWN')
trigger_type = params.get('type', 'UNKNOWN')
# 详细错误日志
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败({trigger_type}): {error_msg}")
logger.error(f" 错误代码: {error_code}")
logger.error(f" 参数: {params}")
# 常见错误码处理
if error_code == -4014:
logger.error(f" 原因: 价格步长错误triggerPrice 需要调整到 tickSize 的倍数")
elif error_code == -4164:
logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT")
elif error_code == -2022:
logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)")
elif error_code == -4120:
logger.error(f" 原因: 不支持的条件单类型(可能需要使用 Algo 接口)")
elif "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower():
logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)")
elif "position" in error_msg.lower():
logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)")
return None
except Exception as e:
symbol = params.get('symbol', 'UNKNOWN')
logger.error(f"{symbol} ❌ 创建 Algo 条件单失败: {type(e).__name__}: {e}")
logger.error(f" 参数: {params}")
import traceback
logger.debug(f" 堆栈跟踪: {traceback.format_exc()}")
return None
async def futures_get_open_algo_orders(self, symbol: Optional[str] = None, algo_type: str = "CONDITIONAL") -> List[Dict[str, Any]]:
try:
data: Dict[str, Any] = {}
if symbol:
data["symbol"] = symbol
if algo_type:
data["algoType"] = algo_type
res = await self.client._request_futures_api("get", "openAlgoOrders", True, data=data)
return res if isinstance(res, list) else []
except Exception as e:
logger.debug(f"{symbol or ''} 获取 openAlgoOrders 失败: {e}")
return []
async def futures_cancel_algo_order(self, algo_id: int) -> bool:
try:
_ = await self.client._request_futures_api("delete", "algoOrder", True, data={"algoId": int(algo_id)})
return True
except Exception as e:
logger.debug(f"取消 Algo 条件单失败 algoId={algo_id}: {e}")
return False
async def get_open_orders(self, symbol: str) -> List[Dict]:
"""
获取某交易对的未成交委托(用于防止重复挂保护单)。
"""
try:
orders = await self.client.futures_get_open_orders(symbol=symbol)
return orders if isinstance(orders, list) else []
except Exception as e:
logger.debug(f"{symbol} 获取未成交委托失败: {e}")
return []
async def cancel_open_orders_by_types(self, symbol: str, types: set[str]) -> int:
"""
取消指定类型的未成交委托(只取消保护单相关类型,避免重复下单)。
返回取消数量。
"""
try:
want = {str(t).upper() for t in (types or set())}
if not want:
return 0
orders = await self.get_open_orders(symbol)
cancelled = 0
for o in orders:
try:
if not isinstance(o, dict):
continue
otype = str(o.get("type") or "").upper()
oid = o.get("orderId")
if otype in want and oid:
ok = await self.cancel_order(symbol, int(oid))
if ok:
cancelled += 1
except Exception:
continue
return cancelled
except Exception:
return 0
async def cancel_open_algo_orders_by_order_types(self, symbol: str, order_types: set[str]) -> int:
"""
取消指定类型的“Algo 条件单”openAlgoOrders
返回取消数量。
"""
try:
want = {str(t).upper() for t in (order_types or set())}
if not want:
return 0
orders = await self.futures_get_open_algo_orders(symbol=symbol, algo_type="CONDITIONAL")
cancelled = 0
for o in orders or []:
try:
if not isinstance(o, dict):
continue
otype = str(o.get("orderType") or o.get("type") or "").upper()
algo_id = o.get("algoId")
if algo_id and otype in want:
ok = await self.futures_cancel_algo_order(int(algo_id))
if ok:
cancelled += 1
except Exception:
continue
return cancelled
except Exception:
return 0
async def place_trigger_close_position_order(
self,
symbol: str,
position_direction: str,
trigger_type: str,
stop_price: float,
current_price: Optional[float] = None,
working_type: str = "MARK_PRICE",
) -> Optional[Dict]:
"""
在币安侧挂“保护单”,用于止损/止盈:
- STOP_MARKET / TAKE_PROFIT_MARKET
- closePosition=True自动平掉该 symbol 的当前仓位)
注意:这类单子不会在本地生成 exit_reason触发后我们靠“持仓同步/订单同步”去回写数据库。
"""
try:
symbol_info = await self.get_symbol_info(symbol)
dual = None
try:
dual = await self._get_dual_side_position()
except Exception:
dual = None
pd = (position_direction or "").upper()
if pd not in {"BUY", "SELL"}:
return None
ttype = str(trigger_type or "").upper()
if ttype not in {"STOP_MARKET", "TAKE_PROFIT_MARKET"}:
return None
close_side = "SELL" if pd == "BUY" else "BUY"
# stopPrice 的“避免立即触发”修正(按 MARK_PRICE
cp = None
try:
cp = float(current_price) if current_price is not None else None
except Exception:
cp = None
tick = 0.0
pp = 8
try:
tick = float(symbol_info.get("tickSize", 0) or 0) if symbol_info else 0.0
except Exception:
tick = 0.0
try:
pp = int(symbol_info.get("pricePrecision", 8) or 8) if symbol_info else 8
except Exception:
pp = 8
min_step = tick if tick and tick > 0 else (10 ** (-pp) if pp and pp > 0 else 1e-8)
sp = float(stop_price or 0)
if sp <= 0:
return None
# 触发方向约束(避免立即触发):
# - long 止损:价格 <= stopPricestopPrice 应 < current至少差一个 min_step
# - short 止损:价格 >= stopPricestopPrice 应 > current至少差一个 min_step
# - long 止盈:价格 >= stopPricestopPrice 应 > current至少差一个 min_step
# - short 止盈:价格 <= stopPricestopPrice 应 < current至少差一个 min_step
if cp and cp > 0:
if ttype == "STOP_MARKET":
if pd == "BUY":
# 做多止损:止损价必须 < 当前价,至少差一个 min_step
if sp >= cp:
# 如果止损价 >= 当前价,调整为当前价 - min_step但这样止损太紧可能不合理
# 更好的做法是:如果止损价太接近当前价,增加一个安全距离(例如 0.5%
safety_margin = max(min_step, cp * 0.005) # 至少 0.5% 的安全距离
sp = max(0.0, cp - safety_margin)
logger.warning(f"{symbol} [止损修正] BUY止损价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
elif pd == "SELL":
# 做空止损:止损价必须 > 当前价,至少差一个 min_step
if sp <= cp:
# 如果止损价 <= 当前价,调整为当前价 + min_step但这样止损太紧可能不合理
# 更好的做法是:如果止损价太接近当前价,增加一个安全距离(例如 0.5%
safety_margin = max(min_step, cp * 0.005) # 至少 0.5% 的安全距离
sp = cp + safety_margin
logger.warning(f"{symbol} [止损修正] SELL止损价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
if ttype == "TAKE_PROFIT_MARKET":
if pd == "BUY":
# 做多止盈:止盈价必须 > 当前价,至少差一个 min_step
if sp <= cp:
safety_margin = max(min_step, cp * 0.005)
sp = cp + safety_margin
logger.warning(f"{symbol} [止盈修正] BUY止盈价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
elif pd == "SELL":
# 做空止盈:止盈价必须 < 当前价,至少差一个 min_step
if sp >= cp:
safety_margin = max(min_step, cp * 0.005)
sp = max(0.0, cp - safety_margin)
logger.warning(f"{symbol} [止盈修正] SELL止盈价({sp:.8f})太接近当前价({cp:.8f}),调整为 {sp:.8f}")
# rounding 规则(提高命中概率,避免“显示等于入场价”的误差带来立即触发/永不触发):
# 止损long 用 UP更靠近当前价short 用 DOWN
# 止盈long 用 DOWN更靠近当前价short 用 UP
rounding_mode = "DOWN"
if ttype == "STOP_MARKET":
rounding_mode = "UP" if pd == "BUY" else "DOWN"
else: # TAKE_PROFIT_MARKET
rounding_mode = "DOWN" if pd == "BUY" else "UP"
stop_price_str = self._format_price_str_with_rounding(sp, symbol_info, rounding_mode)
# Algo 条件单接口使用 triggerPrice不是 stopPrice
params: Dict[str, Any] = {
"algoType": "CONDITIONAL",
"symbol": symbol,
"side": close_side,
"type": ttype,
"triggerPrice": stop_price_str,
"workingType": working_type,
"closePosition": True,
}
# 对冲模式:必须指定 positionSide
if dual is True:
params["positionSide"] = "LONG" if pd == "BUY" else "SHORT"
# 走 Algo Order 接口(避免 -4120
order = await self.futures_create_algo_order(params)
if order:
return order
# 兜底:对冲/单向模式可能导致 positionSide 误判,尝试切换一次
logger.warning(f"{symbol} 首次挂保护单失败,尝试切换 positionSide 重试...")
if dual is True:
retry = dict(params)
retry.pop("positionSide", None)
logger.debug(f"{symbol} 重试1: 移除 positionSide对冲模式 -> 单向模式)")
retry_order = await self.futures_create_algo_order(retry)
if retry_order:
logger.info(f"{symbol} ✓ 重试成功(移除 positionSide")
return retry_order
else:
retry = dict(params)
retry["positionSide"] = "LONG" if pd == "BUY" else "SHORT"
logger.debug(f"{symbol} 重试1: 添加 positionSide={retry['positionSide']}(单向模式 -> 对冲模式)")
retry_order = await self.futures_create_algo_order(retry)
if retry_order:
logger.info(f"{symbol} ✓ 重试成功(添加 positionSide")
return retry_order
# 如果还是失败,记录详细参数用于调试
logger.error(f"{symbol} ❌ 所有重试都失败,保护单挂单失败")
logger.error(f" 参数: {params}")
logger.error(f" 对冲模式: {dual}")
return None
except BinanceAPIException as e:
error_code = e.code if hasattr(e, 'code') else None
error_msg = str(e)
# 详细错误日志
logger.error(f"{symbol} ❌ 挂保护单失败({trigger_type}): {error_msg}")
logger.error(f" 错误代码: {error_code}")
logger.error(f" 触发价格: {stop_price:.8f} (格式化后: {stop_price_str})")
logger.error(f" 当前价格: {cp if cp else 'N/A'}")
logger.error(f" 持仓方向: {pd}")
logger.error(f" 平仓方向: {close_side}")
logger.error(f" 工作类型: {working_type}")
if symbol_info:
logger.error(f" 价格精度: {pp}, 价格步长: {tick}")
# 常见错误码处理
if error_code == -4014:
logger.error(f" 原因: 价格步长错误,需要调整到 tickSize 的倍数")
elif error_code == -4164:
logger.error(f" 原因: 订单名义价值不足(至少需要 5 USDT")
elif error_code == -2022:
logger.error(f" 原因: ReduceOnly 订单被拒绝(可能没有持仓或持仓方向不对)")
elif "immediately trigger" in error_msg.lower() or "would immediately trigger" in error_msg.lower():
logger.error(f" 原因: 触发价格会导致立即触发(止损/止盈价不在正确一侧)")
logger.error(f" 建议: 检查止损/止盈价格计算是否正确")
elif "position" in error_msg.lower():
logger.error(f" 原因: 持仓相关问题(可能没有持仓或持仓方向不匹配)")
return None
except Exception as e:
logger.error(f"{symbol} ❌ 挂保护单失败({trigger_type}): {type(e).__name__}: {e}")
logger.error(f" 触发价格: {stop_price:.8f}")
logger.error(f" 持仓方向: {pd}")
import traceback
logger.debug(f" 堆栈跟踪: {traceback.format_exc()}")
return None
async def set_leverage(self, symbol: str, leverage: int = 10) -> bool:
"""
设置杠杆倍数
如果设置失败(比如超过交易对支持的最大杠杆),会自动降低杠杆重试
Args:
symbol: 交易对
leverage: 杠杆倍数
Returns:
是否成功
"""
try:
await self.client.futures_change_leverage(symbol=symbol, leverage=leverage)
logger.info(f"设置杠杆成功: {symbol} {leverage}x")
return True
except BinanceAPIException as e:
error_msg = str(e).lower()
# 如果错误信息包含杠杆相关的内容,尝试降低杠杆
if 'leverage' in error_msg or 'invalid' in error_msg:
# 尝试降低杠杆每次降低5倍最低到1倍
for reduced_leverage in range(leverage - 5, 0, -5):
if reduced_leverage < 1:
reduced_leverage = 1
try:
await self.client.futures_change_leverage(symbol=symbol, leverage=reduced_leverage)
logger.warning(
f"{symbol} 杠杆 {leverage}x 设置失败,已自动降低为 {reduced_leverage}x "
f"(原因: {e})"
)
return True
except BinanceAPIException:
if reduced_leverage <= 1:
break
continue
logger.error(f"设置杠杆失败: {symbol} {leverage}x, 错误: {e}")
return False
def get_realtime_price(self, symbol: str) -> Optional[float]:
"""
获取实时价格(从缓存)
Args:
symbol: 交易对
Returns:
实时价格如果缓存中有则返回否则返回None
"""
import time
if symbol in self._price_cache:
cached = self._price_cache[symbol]
cache_age = time.time() - cached.get('timestamp', 0)
if cache_age < self._price_cache_ttl:
return cached.get('price')
return None