auto_trade_sys/trading_system/risk_manager.py
薇薇安 81526052e7 a
2026-01-15 13:08:41 +08:00

507 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
风险管理模块 - 严格控制仓位和风险
"""
import logging
from typing import Dict, List, Optional
try:
from .binance_client import BinanceClient
from . import config
except ImportError:
from binance_client import BinanceClient
import config
logger = logging.getLogger(__name__)
class RiskManager:
"""风险管理类"""
def __init__(self, client: BinanceClient):
"""
初始化风险管理器
Args:
client: 币安客户端
"""
self.client = client
self.config = config.TRADING_CONFIG
async def check_position_size(self, symbol: str, quantity: float) -> bool:
"""
检查单笔仓位大小是否符合要求
Args:
symbol: 交易对
quantity: 下单数量
Returns:
是否通过检查
"""
try:
logger.info(f"检查 {symbol} 单笔仓位大小...")
# 获取账户余额
balance = await self.client.get_account_balance()
available_balance = balance.get('available', 0)
if available_balance <= 0:
logger.warning(f"{symbol} 账户可用余额不足: {available_balance:.2f} USDT")
return False
# 计算仓位价值(假设使用当前价格)
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
logger.warning(f"{symbol} 无法获取价格数据")
return False
current_price = ticker['price']
position_value = quantity * current_price
# 检查单笔仓位是否超过最大限制
max_position_value = available_balance * self.config['MAX_POSITION_PERCENT']
min_position_value = available_balance * self.config['MIN_POSITION_PERCENT']
max_position_pct = self.config['MAX_POSITION_PERCENT'] * 100
min_position_pct = self.config['MIN_POSITION_PERCENT'] * 100
logger.info(f" 数量: {quantity:.4f}")
logger.info(f" 价格: {current_price:.4f} USDT")
logger.info(f" 仓位价值: {position_value:.2f} USDT")
logger.info(f" 单笔最大限制: {max_position_value:.2f} USDT ({max_position_pct:.1f}%)")
logger.info(f" 单笔最小限制: {min_position_value:.2f} USDT ({min_position_pct:.1f}%)")
# 使用小的容差来处理浮点数精度问题0.01 USDT
tolerance = 0.01
if position_value > max_position_value + tolerance:
logger.warning(
f"{symbol} 单笔仓位过大: {position_value:.2f} USDT > "
f"最大限制: {max_position_value:.2f} USDT "
f"(超出: {position_value - max_position_value:.2f} USDT)"
)
return False
elif position_value > max_position_value:
# 在容差范围内,允许通过(浮点数精度问题)
logger.info(
f"{symbol} 仓位价值略超限制但 within 容差: "
f"{position_value:.2f} USDT vs {max_position_value:.2f} USDT "
f"(差异: {position_value - max_position_value:.4f} USDT)"
)
if position_value < min_position_value:
logger.warning(
f"{symbol} 单笔仓位过小: {position_value:.2f} USDT < "
f"最小限制: {min_position_value:.2f} USDT"
)
return False
logger.info(f"{symbol} 单笔仓位大小检查通过")
# 检查总仓位是否超过限制
logger.info(f"检查 {symbol} 总仓位限制...")
if not await self.check_total_position(position_value):
return False
logger.info(
f"{symbol} 所有仓位检查通过: {position_value:.2f} USDT "
f"(账户可用余额: {available_balance:.2f} USDT)"
)
return True
except Exception as e:
logger.error(f"检查仓位大小失败 {symbol}: {e}", exc_info=True)
return False
async def check_total_position(self, new_position_value: float) -> bool:
"""
检查总仓位是否超过限制
Args:
new_position_value: 新仓位价值
Returns:
是否通过检查
"""
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
# 计算当前总仓位价值
current_position_values = []
total_position_value = 0
for pos in positions:
position_value = abs(pos['positionAmt'] * pos['entryPrice'])
current_position_values.append({
'symbol': pos['symbol'],
'value': position_value,
'amount': pos['positionAmt'],
'entryPrice': pos['entryPrice']
})
total_position_value += position_value
# 加上新仓位
total_with_new = total_position_value + new_position_value
# 获取账户余额
balance = await self.client.get_account_balance()
total_balance = balance.get('total', 0)
available_balance = balance.get('available', 0)
if total_balance <= 0:
logger.warning("账户总余额为0无法开仓")
return False
max_total_position = total_balance * self.config['MAX_TOTAL_POSITION_PERCENT']
max_total_position_pct = self.config['MAX_TOTAL_POSITION_PERCENT'] * 100
# 详细日志
logger.info("=" * 60)
logger.info("总仓位检查详情:")
logger.info(f" 账户总余额: {total_balance:.2f} USDT")
logger.info(f" 账户可用余额: {available_balance:.2f} USDT")
logger.info(f" 总仓位上限: {max_total_position:.2f} USDT ({max_total_position_pct:.1f}%)")
logger.info(f" 当前持仓数量: {len(positions)}")
if current_position_values:
logger.info(" 当前持仓明细:")
for pos_info in current_position_values:
logger.info(
f" - {pos_info['symbol']}: "
f"{pos_info['value']:.2f} USDT "
f"(数量: {pos_info['amount']:.4f}, "
f"入场价: {pos_info['entryPrice']:.4f})"
)
logger.info(f" 当前总仓位: {total_position_value:.2f} USDT")
logger.info(f" 新仓位价值: {new_position_value:.2f} USDT")
logger.info(f" 开仓后总仓位: {total_with_new:.2f} USDT")
logger.info(f" 剩余可用仓位: {max_total_position - total_position_value:.2f} USDT")
if total_with_new > max_total_position:
logger.warning("=" * 60)
logger.warning(
f"❌ 总仓位超限: {total_with_new:.2f} USDT > "
f"最大限制: {max_total_position:.2f} USDT"
)
logger.warning(
f" 超出: {total_with_new - max_total_position:.2f} USDT "
f"({((total_with_new - max_total_position) / max_total_position * 100):.1f}%)"
)
logger.warning(" 建议: 平掉部分持仓或等待现有持仓平仓后再开新仓")
logger.warning("=" * 60)
return False
logger.info(
f"✓ 总仓位检查通过: {total_with_new:.2f} USDT / "
f"最大限制: {max_total_position:.2f} USDT "
f"({(total_with_new / max_total_position * 100):.1f}%)"
)
logger.info("=" * 60)
return True
except Exception as e:
logger.error(f"检查总仓位失败: {e}", exc_info=True)
return False
async def calculate_position_size(
self,
symbol: str,
change_percent: float
) -> Optional[float]:
"""
根据涨跌幅和风险参数计算合适的仓位大小
Args:
symbol: 交易对
change_percent: 涨跌幅百分比
Returns:
建议的仓位数量如果不符合条件则返回None
"""
try:
logger.info(f"开始计算 {symbol} 的仓位大小...")
# 获取账户余额
balance = await self.client.get_account_balance()
available_balance = balance.get('available', 0)
total_balance = balance.get('total', 0)
logger.info(f" 账户可用余额: {available_balance:.2f} USDT")
logger.info(f" 账户总余额: {total_balance:.2f} USDT")
if available_balance <= 0:
logger.warning(f"{symbol} 账户可用余额不足: {available_balance:.2f} USDT")
return None
# 获取当前价格
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
logger.warning(f"{symbol} 无法获取价格数据")
return None
current_price = ticker['price']
logger.info(f" 当前价格: {current_price:.4f} USDT")
# 根据涨跌幅调整仓位大小(涨跌幅越大,仓位可以适当增加)
base_position_percent = self.config['MAX_POSITION_PERCENT']
max_position_percent = self.config['MAX_POSITION_PERCENT']
min_position_percent = self.config['MIN_POSITION_PERCENT']
# 涨跌幅超过5%时可以适当增加仓位但不超过1.5倍)
if abs(change_percent) > 5:
position_percent = min(
base_position_percent * 1.5,
max_position_percent * 1.5
)
logger.info(f" 涨跌幅 {change_percent:.2f}% > 5%,使用增强仓位比例: {position_percent*100:.1f}%")
else:
position_percent = base_position_percent
logger.info(f" 涨跌幅 {change_percent:.2f}%,使用标准仓位比例: {position_percent*100:.1f}%")
# 计算仓位价值
position_value = available_balance * position_percent
logger.info(f" 计算仓位价值: {position_value:.2f} USDT ({position_percent*100:.1f}% of {available_balance:.2f})")
# 确保仓位价值满足最小名义价值要求币安要求至少5 USDT
min_notional = 5.0 # 币安合约最小名义价值
if position_value < min_notional:
logger.warning(f" ⚠ 仓位价值 {position_value:.2f} USDT < 最小名义价值 {min_notional:.2f} USDT")
# 计算需要的最小仓位比例来满足最小名义价值
required_position_percent = min_notional / available_balance
logger.info(f" 需要的最小仓位比例: {required_position_percent*100:.2f}% (最小名义价值 {min_notional:.2f} USDT / 可用余额 {available_balance:.2f} USDT)")
# 检查是否可以使用更大的仓位比例(但不超过最大仓位限制)
if required_position_percent <= max_position_percent:
# 可以使用更大的仓位比例来满足最小名义价值
position_percent = required_position_percent
position_value = min_notional
logger.info(f" ✓ 调整仓位比例到 {position_percent*100:.2f}% 以满足最小名义价值: {position_value:.2f} USDT")
else:
# 即使使用最大仓位比例也无法满足最小名义价值
max_allowed_value = available_balance * max_position_percent
logger.warning(
f" ❌ 无法满足最小名义价值要求: "
f"需要 {min_notional:.2f} USDT (仓位比例 {required_position_percent*100:.2f}%)"
f"但最大允许 {max_allowed_value:.2f} USDT (仓位比例 {max_position_percent*100:.1f}%)"
)
logger.warning(f" 💡 建议: 增加账户余额到至少 {min_notional / max_position_percent:.2f} USDT 才能满足最小名义价值要求")
return None
# 计算数量(考虑合约的最小数量精度)
quantity = position_value / current_price
logger.info(f" 计算数量: {quantity:.4f} (价值: {position_value:.2f} / 价格: {current_price:.4f})")
# 验证计算出的数量对应的名义价值
calculated_notional = quantity * current_price
if calculated_notional < min_notional:
# 如果计算出的名义价值仍然不足,增加数量
required_quantity = min_notional / current_price
logger.warning(f" ⚠ 计算出的名义价值 {calculated_notional:.2f} USDT < {min_notional:.2f} USDT")
logger.info(f" ✓ 调整数量从 {quantity:.4f}{required_quantity:.4f}")
quantity = required_quantity
position_value = required_quantity * current_price
# 检查最小保证金要求(避免手续费侵蚀收益)
min_margin_usdt = self.config.get('MIN_MARGIN_USDT', 0.5) # 默认0.5 USDT
leverage = self.config.get('LEVERAGE', 10)
# 计算实际需要的保证金 = 仓位价值 / 杠杆
required_margin = position_value / leverage
logger.info(f" 计算保证金: {required_margin:.4f} USDT (仓位价值: {position_value:.2f} USDT / 杠杆: {leverage}x)")
if required_margin < min_margin_usdt:
# 保证金不足,需要增加仓位价值
required_position_value = min_margin_usdt * leverage
logger.warning(
f" ⚠ 保证金 {required_margin:.4f} USDT < 最小保证金要求 {min_margin_usdt:.2f} USDT"
)
logger.info(
f" 需要的最小仓位价值: {required_position_value:.2f} USDT "
f"(最小保证金 {min_margin_usdt:.2f} USDT × 杠杆 {leverage}x)"
)
# 检查是否可以使用更大的仓位价值(但不超过最大仓位限制)
max_position_value = available_balance * max_position_percent
if required_position_value <= max_position_value:
# 可以增加仓位价值以满足最小保证金要求
position_value = required_position_value
quantity = position_value / current_price
logger.info(
f" ✓ 调整仓位价值到 {position_value:.2f} USDT "
f"以满足最小保证金要求 (保证金: {min_margin_usdt:.2f} USDT)"
)
else:
# 即使使用最大仓位也无法满足最小保证金要求
max_margin = max_position_value / leverage
logger.warning(
f" ❌ 无法满足最小保证金要求: "
f"需要 {min_margin_usdt:.2f} USDT 保证金 (仓位价值 {required_position_value:.2f} USDT)"
f"但最大允许 {max_margin:.2f} USDT 保证金 (仓位价值 {max_position_value:.2f} USDT)"
)
logger.warning(
f" 💡 建议: 增加账户余额到至少 "
f"{required_position_value / max_position_percent:.2f} USDT "
f"才能满足最小保证金要求"
)
return None
# 检查是否通过风险控制
logger.info(f" 检查仓位大小是否符合风险控制要求...")
if await self.check_position_size(symbol, quantity):
final_margin = (quantity * current_price) / leverage
logger.info(
f"{symbol} 仓位计算成功: {quantity:.4f} "
f"(仓位价值: {position_value:.2f} USDT, "
f"名义价值: {quantity * current_price:.2f} USDT, "
f"保证金: {final_margin:.4f} USDT)"
)
return quantity
else:
logger.warning(f"{symbol} 仓位检查未通过,无法开仓")
return None
except Exception as e:
logger.error(f"计算仓位大小失败 {symbol}: {e}", exc_info=True)
return None
async def should_trade(self, symbol: str, change_percent: float) -> bool:
"""
判断是否应该交易
Args:
symbol: 交易对
change_percent: 涨跌幅百分比
Returns:
是否应该交易
"""
# 检查最小涨跌幅阈值
if abs(change_percent) < self.config['MIN_CHANGE_PERCENT']:
logger.debug(f"{symbol} 涨跌幅 {change_percent:.2f}% 小于阈值")
return False
# 检查是否已有持仓
positions = await self.client.get_open_positions()
existing_position = next(
(p for p in positions if p['symbol'] == symbol),
None
)
if existing_position:
logger.info(f"{symbol} 已有持仓,跳过")
return False
return True
def get_stop_loss_price(
self,
entry_price: float,
side: str,
stop_loss_pct: Optional[float] = None,
klines: Optional[List] = None,
bollinger: Optional[Dict] = None,
atr: Optional[float] = None
) -> float:
"""
计算止损价格(基于支撑/阻力的动态止损)
Args:
entry_price: 入场价格
side: 方向 'BUY''SELL'
stop_loss_pct: 止损百分比如果为None则使用配置值
klines: K线数据用于计算支撑/阻力位
bollinger: 布林带数据,用于计算动态止损
atr: 平均真实波幅,用于计算动态止损
Returns:
止损价格
"""
# 优先使用基于技术结构的动态止损
if klines and len(klines) >= 10:
# 计算支撑/阻力位
low_prices = [float(k[3]) for k in klines[-20:]] # 最近20根K线的最低价
high_prices = [float(k[2]) for k in klines[-20:]] # 最近20根K线的最高价
if side == 'BUY': # 做多,止损放在支撑位下方
# 找到近期波段低点
recent_low = min(low_prices)
# 止损放在低点下方0.5-1%
buffer = entry_price * 0.005 # 0.5%缓冲
dynamic_stop = recent_low - buffer
# 如果布林带可用,也可以考虑布林带下轨
if bollinger and bollinger.get('lower'):
bollinger_stop = bollinger['lower'] * 0.995 # 布林带下轨下方0.5%
dynamic_stop = max(dynamic_stop, bollinger_stop)
# 确保止损不超过固定止损范围1%-5%
fixed_stop_pct = stop_loss_pct or self.config['STOP_LOSS_PERCENT']
max_stop = entry_price * (1 - 0.01) # 最多1%止损
min_stop = entry_price * (1 - 0.05) # 最少5%止损(极端情况)
dynamic_stop = max(min_stop, min(max_stop, dynamic_stop))
logger.info(
f"动态止损计算 (BUY): 入场价={entry_price:.4f}, "
f"近期低点={recent_low:.4f}, 动态止损={dynamic_stop:.4f}, "
f"止损比例={((entry_price - dynamic_stop) / entry_price * 100):.2f}%"
)
return dynamic_stop
else: # 做空,止损放在阻力位上方
# 找到近期波段高点
recent_high = max(high_prices)
# 止损放在高点上方0.5-1%
buffer = entry_price * 0.005 # 0.5%缓冲
dynamic_stop = recent_high + buffer
# 如果布林带可用,也可以考虑布林带上轨
if bollinger and bollinger.get('upper'):
bollinger_stop = bollinger['upper'] * 1.005 # 布林带上轨上方0.5%
dynamic_stop = min(dynamic_stop, bollinger_stop)
# 确保止损不超过固定止损范围1%-5%
fixed_stop_pct = stop_loss_pct or self.config['STOP_LOSS_PERCENT']
min_stop = entry_price * (1 + 0.01) # 最少1%止损
max_stop = entry_price * (1 + 0.05) # 最多5%止损(极端情况)
dynamic_stop = min(max_stop, max(min_stop, dynamic_stop))
logger.info(
f"动态止损计算 (SELL): 入场价={entry_price:.4f}, "
f"近期高点={recent_high:.4f}, 动态止损={dynamic_stop:.4f}, "
f"止损比例={((dynamic_stop - entry_price) / entry_price * 100):.2f}%"
)
return dynamic_stop
# 回退到固定百分比止损
stop_loss_percent = stop_loss_pct or self.config['STOP_LOSS_PERCENT']
if side == 'BUY': # 做多,止损价低于入场价
return entry_price * (1 - stop_loss_percent)
else: # 做空,止损价高于入场价
return entry_price * (1 + stop_loss_percent)
def get_take_profit_price(
self,
entry_price: float,
side: str,
take_profit_pct: Optional[float] = None
) -> float:
"""
计算止盈价格
Args:
entry_price: 入场价格
side: 方向 'BUY''SELL'
take_profit_pct: 止盈百分比如果为None则使用配置值
Returns:
止盈价格
"""
take_profit_percent = take_profit_pct or self.config['TAKE_PROFIT_PERCENT']
if side == 'BUY': # 做多,止盈价高于入场价
return entry_price * (1 + take_profit_percent)
else: # 做空,止盈价低于入场价
return entry_price * (1 - take_profit_percent)