""" 风险管理模块 - 严格控制仓位和风险 """ import logging from typing import Dict, List, Optional from binance_client import BinanceClient import config logger = logging.getLogger(__name__) class RiskManager: """风险管理类""" def __init__(self, client: BinanceClient): """ 初始化风险管理器 Args: client: 币安客户端 """ self.client = client self.config = config.TRADING_CONFIG async def check_position_size(self, symbol: str, quantity: float) -> bool: """ 检查单笔仓位大小是否符合要求 Args: symbol: 交易对 quantity: 下单数量 Returns: 是否通过检查 """ try: logger.info(f"检查 {symbol} 单笔仓位大小...") # 获取账户余额 balance = await self.client.get_account_balance() available_balance = balance.get('available', 0) if available_balance <= 0: logger.warning(f"❌ {symbol} 账户可用余额不足: {available_balance:.2f} USDT") return False # 计算仓位价值(假设使用当前价格) ticker = await self.client.get_ticker_24h(symbol) if not ticker: logger.warning(f"❌ {symbol} 无法获取价格数据") return False current_price = ticker['price'] position_value = quantity * current_price # 检查单笔仓位是否超过最大限制 max_position_value = available_balance * self.config['MAX_POSITION_PERCENT'] min_position_value = available_balance * self.config['MIN_POSITION_PERCENT'] max_position_pct = self.config['MAX_POSITION_PERCENT'] * 100 min_position_pct = self.config['MIN_POSITION_PERCENT'] * 100 logger.info(f" 数量: {quantity:.4f}") logger.info(f" 价格: {current_price:.4f} USDT") logger.info(f" 仓位价值: {position_value:.2f} USDT") logger.info(f" 单笔最大限制: {max_position_value:.2f} USDT ({max_position_pct:.1f}%)") logger.info(f" 单笔最小限制: {min_position_value:.2f} USDT ({min_position_pct:.1f}%)") if position_value > max_position_value: logger.warning( f"❌ {symbol} 单笔仓位过大: {position_value:.2f} USDT > " f"最大限制: {max_position_value:.2f} USDT " f"(超出: {position_value - max_position_value:.2f} USDT)" ) return False if position_value < min_position_value: logger.warning( f"❌ {symbol} 单笔仓位过小: {position_value:.2f} USDT < " f"最小限制: {min_position_value:.2f} USDT" ) return False logger.info(f"✓ {symbol} 单笔仓位大小检查通过") # 检查总仓位是否超过限制 logger.info(f"检查 {symbol} 总仓位限制...") if not await self.check_total_position(position_value): return False logger.info( f"✓ {symbol} 所有仓位检查通过: {position_value:.2f} USDT " f"(账户可用余额: {available_balance:.2f} USDT)" ) return True except Exception as e: logger.error(f"检查仓位大小失败 {symbol}: {e}", exc_info=True) return False async def check_total_position(self, new_position_value: float) -> bool: """ 检查总仓位是否超过限制 Args: new_position_value: 新仓位价值 Returns: 是否通过检查 """ try: # 获取当前持仓 positions = await self.client.get_open_positions() # 计算当前总仓位价值 current_position_values = [] total_position_value = 0 for pos in positions: position_value = abs(pos['positionAmt'] * pos['entryPrice']) current_position_values.append({ 'symbol': pos['symbol'], 'value': position_value, 'amount': pos['positionAmt'], 'entryPrice': pos['entryPrice'] }) total_position_value += position_value # 加上新仓位 total_with_new = total_position_value + new_position_value # 获取账户余额 balance = await self.client.get_account_balance() total_balance = balance.get('total', 0) available_balance = balance.get('available', 0) if total_balance <= 0: logger.warning("账户总余额为0,无法开仓") return False max_total_position = total_balance * self.config['MAX_TOTAL_POSITION_PERCENT'] max_total_position_pct = self.config['MAX_TOTAL_POSITION_PERCENT'] * 100 # 详细日志 logger.info("=" * 60) logger.info("总仓位检查详情:") logger.info(f" 账户总余额: {total_balance:.2f} USDT") logger.info(f" 账户可用余额: {available_balance:.2f} USDT") logger.info(f" 总仓位上限: {max_total_position:.2f} USDT ({max_total_position_pct:.1f}%)") logger.info(f" 当前持仓数量: {len(positions)} 个") if current_position_values: logger.info(" 当前持仓明细:") for pos_info in current_position_values: logger.info( f" - {pos_info['symbol']}: " f"{pos_info['value']:.2f} USDT " f"(数量: {pos_info['amount']:.4f}, " f"入场价: {pos_info['entryPrice']:.4f})" ) logger.info(f" 当前总仓位: {total_position_value:.2f} USDT") logger.info(f" 新仓位价值: {new_position_value:.2f} USDT") logger.info(f" 开仓后总仓位: {total_with_new:.2f} USDT") logger.info(f" 剩余可用仓位: {max_total_position - total_position_value:.2f} USDT") if total_with_new > max_total_position: logger.warning("=" * 60) logger.warning( f"❌ 总仓位超限: {total_with_new:.2f} USDT > " f"最大限制: {max_total_position:.2f} USDT" ) logger.warning( f" 超出: {total_with_new - max_total_position:.2f} USDT " f"({((total_with_new - max_total_position) / max_total_position * 100):.1f}%)" ) logger.warning(" 建议: 平掉部分持仓或等待现有持仓平仓后再开新仓") logger.warning("=" * 60) return False logger.info( f"✓ 总仓位检查通过: {total_with_new:.2f} USDT / " f"最大限制: {max_total_position:.2f} USDT " f"({(total_with_new / max_total_position * 100):.1f}%)" ) logger.info("=" * 60) return True except Exception as e: logger.error(f"检查总仓位失败: {e}", exc_info=True) return False async def calculate_position_size( self, symbol: str, change_percent: float ) -> Optional[float]: """ 根据涨跌幅和风险参数计算合适的仓位大小 Args: symbol: 交易对 change_percent: 涨跌幅百分比 Returns: 建议的仓位数量,如果不符合条件则返回None """ try: logger.info(f"开始计算 {symbol} 的仓位大小...") # 获取账户余额 balance = await self.client.get_account_balance() available_balance = balance.get('available', 0) total_balance = balance.get('total', 0) logger.info(f" 账户可用余额: {available_balance:.2f} USDT") logger.info(f" 账户总余额: {total_balance:.2f} USDT") if available_balance <= 0: logger.warning(f"❌ {symbol} 账户可用余额不足: {available_balance:.2f} USDT") return None # 获取当前价格 ticker = await self.client.get_ticker_24h(symbol) if not ticker: logger.warning(f"❌ {symbol} 无法获取价格数据") return None current_price = ticker['price'] logger.info(f" 当前价格: {current_price:.4f} USDT") # 根据涨跌幅调整仓位大小(涨跌幅越大,仓位可以适当增加) base_position_percent = self.config['MAX_POSITION_PERCENT'] max_position_percent = self.config['MAX_POSITION_PERCENT'] min_position_percent = self.config['MIN_POSITION_PERCENT'] # 涨跌幅超过5%时,可以适当增加仓位(但不超过1.5倍) if abs(change_percent) > 5: position_percent = min( base_position_percent * 1.5, max_position_percent * 1.5 ) logger.info(f" 涨跌幅 {change_percent:.2f}% > 5%,使用增强仓位比例: {position_percent*100:.1f}%") else: position_percent = base_position_percent logger.info(f" 涨跌幅 {change_percent:.2f}%,使用标准仓位比例: {position_percent*100:.1f}%") # 计算仓位价值 position_value = available_balance * position_percent logger.info(f" 计算仓位价值: {position_value:.2f} USDT ({position_percent*100:.1f}% of {available_balance:.2f})") # 计算数量(考虑合约的最小数量精度) quantity = position_value / current_price logger.info(f" 计算数量: {quantity:.4f} (价值: {position_value:.2f} / 价格: {current_price:.4f})") # 检查是否通过风险控制 logger.info(f" 检查仓位大小是否符合风险控制要求...") if await self.check_position_size(symbol, quantity): logger.info(f"✓ {symbol} 仓位计算成功: {quantity:.4f} (价值: {position_value:.2f} USDT)") return quantity else: logger.warning(f"❌ {symbol} 仓位检查未通过,无法开仓") return None except Exception as e: logger.error(f"计算仓位大小失败 {symbol}: {e}", exc_info=True) return None async def should_trade(self, symbol: str, change_percent: float) -> bool: """ 判断是否应该交易 Args: symbol: 交易对 change_percent: 涨跌幅百分比 Returns: 是否应该交易 """ # 检查最小涨跌幅阈值 if abs(change_percent) < self.config['MIN_CHANGE_PERCENT']: logger.debug(f"{symbol} 涨跌幅 {change_percent:.2f}% 小于阈值") return False # 检查是否已有持仓 positions = await self.client.get_open_positions() existing_position = next( (p for p in positions if p['symbol'] == symbol), None ) if existing_position: logger.info(f"{symbol} 已有持仓,跳过") return False return True def get_stop_loss_price( self, entry_price: float, side: str, stop_loss_pct: Optional[float] = None ) -> float: """ 计算止损价格 Args: entry_price: 入场价格 side: 方向 'BUY' 或 'SELL' stop_loss_pct: 止损百分比,如果为None则使用配置值 Returns: 止损价格 """ stop_loss_percent = stop_loss_pct or self.config['STOP_LOSS_PERCENT'] if side == 'BUY': # 做多,止损价低于入场价 return entry_price * (1 - stop_loss_percent) else: # 做空,止损价高于入场价 return entry_price * (1 + stop_loss_percent) def get_take_profit_price( self, entry_price: float, side: str, take_profit_pct: Optional[float] = None ) -> float: """ 计算止盈价格 Args: entry_price: 入场价格 side: 方向 'BUY' 或 'SELL' take_profit_pct: 止盈百分比,如果为None则使用配置值 Returns: 止盈价格 """ take_profit_percent = take_profit_pct or self.config['TAKE_PROFIT_PERCENT'] if side == 'BUY': # 做多,止盈价高于入场价 return entry_price * (1 + take_profit_percent) else: # 做空,止盈价低于入场价 return entry_price * (1 - take_profit_percent)