auto_trade_sys/trading_system/position_manager.py
薇薇安 8e8a8e633f a
2026-01-14 18:54:35 +08:00

771 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
仓位管理模块 - 管理持仓和订单
"""
import asyncio
import logging
from typing import Dict, List, Optional
try:
from .binance_client import BinanceClient
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from risk_manager import RiskManager
import config
logger = logging.getLogger(__name__)
# 尝试导入数据库模型(如果可用)
DB_AVAILABLE = False
Trade = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
DB_AVAILABLE = True
logger.info("✓ 数据库模型导入成功,交易记录将保存到数据库")
else:
logger.warning("⚠ backend目录不存在无法使用数据库功能")
DB_AVAILABLE = False
except ImportError as e:
logger.warning(f"⚠ 无法导入数据库模型: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
except Exception as e:
logger.warning(f"⚠ 数据库初始化失败: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
class PositionManager:
"""仓位管理类"""
def __init__(self, client: BinanceClient, risk_manager: RiskManager):
"""
初始化仓位管理器
Args:
client: 币安客户端
risk_manager: 风险管理器
"""
self.client = client
self.risk_manager = risk_manager
self.active_positions: Dict[str, Dict] = {}
self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典
self._monitoring_enabled = True # 是否启用实时监控
async def open_position(
self,
symbol: str,
change_percent: float,
leverage: int = 10,
trade_direction: Optional[str] = None,
entry_reason: str = '',
atr: Optional[float] = None
) -> Optional[Dict]:
"""
开仓
Args:
symbol: 交易对
change_percent: 涨跌幅百分比
leverage: 杠杆倍数
Returns:
订单信息失败返回None
"""
try:
# 判断是否应该交易
if not await self.risk_manager.should_trade(symbol, change_percent):
return None
# 设置杠杆
await self.client.set_leverage(symbol, leverage)
# 计算仓位大小
logger.info(f"开始为 {symbol} 计算仓位大小...")
quantity = await self.risk_manager.calculate_position_size(
symbol, change_percent
)
if quantity is None:
logger.warning(f"{symbol} 仓位计算失败,跳过交易")
logger.warning(f" 可能原因:")
logger.warning(f" 1. 账户余额不足")
logger.warning(f" 2. 单笔仓位超过限制")
logger.warning(f" 3. 总仓位超过限制")
logger.warning(f" 4. 无法获取价格数据")
return None
logger.info(f"{symbol} 仓位计算成功: {quantity:.4f}")
# 确定交易方向(优先使用技术指标信号)
if trade_direction:
side = trade_direction
else:
side = 'BUY' if change_percent > 0 else 'SELL'
# 获取当前价格
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
entry_price = ticker['price']
# 计算动态止损止盈使用ATR或固定比例
if atr and atr > 0:
# 使用ATR计算动态止损2倍ATR
atr_stop_loss_pct = (atr * 2) / entry_price
# 限制在合理范围内1%-5%
atr_stop_loss_pct = max(0.01, min(0.05, atr_stop_loss_pct))
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, stop_loss_pct=atr_stop_loss_pct
)
# 止盈为止损的1.5-2倍
take_profit_pct = atr_stop_loss_pct * 1.8
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, take_profit_pct=take_profit_pct
)
else:
# 使用固定止损止盈
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
# 下单
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET'
)
if order:
# 记录到数据库
trade_id = None
if DB_AVAILABLE and Trade:
try:
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=leverage,
entry_reason=entry_reason
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id})")
except Exception as e:
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法保存 {symbol} 交易记录")
# 记录持仓信息(包含动态止损止盈)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': change_percent,
'orderId': order.get('orderId'),
'tradeId': trade_id, # 数据库交易ID
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
'leverage': leverage,
'entryReason': entry_reason,
'atr': atr,
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
'trailingStopActivated': False # 移动止损是否已激活
}
self.active_positions[symbol] = position_info
logger.info(
f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} "
f"(涨跌幅: {change_percent:.2f}%)"
)
# 启动WebSocket实时监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
return position_info
return None
except Exception as e:
logger.error(f"开仓失败 {symbol}: {e}")
return None
async def close_position(self, symbol: str) -> bool:
"""
平仓
Args:
symbol: 交易对
Returns:
是否成功
"""
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
position = next(
(p for p in positions if p['symbol'] == symbol),
None
)
if not position:
logger.warning(f"{symbol} 没有持仓")
return False
# 确定平仓方向(与开仓相反)
position_amt = position['positionAmt']
side = 'SELL' if position_amt > 0 else 'BUY'
quantity = abs(position_amt)
# 平仓
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET'
)
if order:
# 获取平仓价格
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
logger.warning(f"无法获取 {symbol} 价格,使用订单价格")
exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0))
else:
exit_price = ticker['price']
# 更新数据库记录
if DB_AVAILABLE and Trade and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
logger.info(f"正在更新 {symbol} 平仓记录到数据库 (ID: {trade_id})...")
# 计算盈亏
entry_price = position_info['entryPrice']
if position_info['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason='manual',
pnl=pnl,
pnl_percent=pnl_percent
)
logger.info(f"{symbol} 平仓记录已更新到数据库 (盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)")
except Exception as e:
logger.error(f"❌ 更新平仓记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
else:
logger.warning(f"{symbol} 没有关联的数据库交易ID无法更新平仓记录")
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过更新 {symbol} 平仓记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法更新 {symbol} 平仓记录")
# 停止WebSocket监控
await self._stop_position_monitoring(symbol)
# 移除持仓记录
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(f"平仓成功: {symbol} {side} {quantity}")
return True
return False
except Exception as e:
logger.error(f"平仓失败 {symbol}: {e}")
return False
async def check_stop_loss_take_profit(self) -> List[str]:
"""
检查止损止盈
Returns:
需要平仓的交易对列表
"""
closed_positions = []
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
position_dict = {p['symbol']: p for p in positions}
for symbol, position_info in list(self.active_positions.items()):
if symbol not in position_dict:
# 持仓已不存在,移除记录
del self.active_positions[symbol]
continue
current_position = position_dict[symbol]
entry_price = position_info['entryPrice']
quantity = position_info['quantity'] # 修复获取quantity
# 获取当前标记价格
current_price = current_position.get('markPrice', 0)
if current_price == 0:
# 如果标记价格为0尝试从ticker获取
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
current_price = ticker['price']
else:
current_price = entry_price
# 计算当前盈亏
if position_info['side'] == 'BUY':
pnl_percent = ((current_price - entry_price) / entry_price) * 100
else:
pnl_percent = ((entry_price - current_price) / entry_price) * 100
# 更新最大盈利
if pnl_percent > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent
# 移动止损逻辑(盈利后保护利润)
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后,激活移动止损
if pnl_percent > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent:.2f}%)"
)
else:
# 盈利超过2%后,止损移至保护利润位
if pnl_percent > 2.0:
if position_info['side'] == 'BUY':
new_stop_loss = entry_price * (1 + trailing_protect)
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
else:
new_stop_loss = entry_price * (1 - trailing_protect)
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
# 检查止损(使用更新后的止损价)
stop_loss = position_info['stopLoss']
if position_info['side'] == 'BUY' and current_price <= stop_loss:
logger.warning(
f"{symbol} 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
if position_info['side'] == 'SELL' and current_price >= stop_loss:
logger.warning(
f"{symbol} 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
# 检查止盈
take_profit = position_info['takeProfit']
if position_info['side'] == 'BUY' and current_price >= take_profit:
logger.info(
f"{symbol} 触发止盈: {current_price:.4f} >= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason='take_profit',
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
if position_info['side'] == 'SELL' and current_price <= take_profit:
logger.info(
f"{symbol} 触发止盈: {current_price:.4f} <= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason='take_profit',
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
except Exception as e:
logger.error(f"检查止损止盈失败: {e}")
return closed_positions
async def get_position_summary(self) -> Dict:
"""
获取持仓摘要
Returns:
持仓摘要信息
"""
try:
positions = await self.client.get_open_positions()
balance = await self.client.get_account_balance()
total_pnl = sum(p['unRealizedProfit'] for p in positions)
return {
'totalPositions': len(positions),
'totalBalance': balance.get('total', 0),
'availableBalance': balance.get('available', 0),
'totalPnL': total_pnl,
'positions': [
{
'symbol': p['symbol'],
'positionAmt': p['positionAmt'],
'entryPrice': p['entryPrice'],
'pnl': p['unRealizedProfit']
}
for p in positions
]
}
except Exception as e:
logger.error(f"获取持仓摘要失败: {e}")
return {}
async def start_all_position_monitoring(self):
"""
启动所有持仓的WebSocket实时监控
"""
if not self._monitoring_enabled:
logger.info("实时监控已禁用,跳过启动")
return
if not self.client.socket_manager:
logger.warning("WebSocket未初始化无法启动实时监控")
return
# 获取当前所有持仓
positions = await self.client.get_open_positions()
for position in positions:
symbol = position['symbol']
if symbol not in self._monitor_tasks and symbol in self.active_positions:
await self._start_position_monitoring(symbol)
logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控")
async def stop_all_position_monitoring(self):
"""
停止所有持仓的WebSocket监控
"""
symbols = list(self._monitor_tasks.keys())
for symbol in symbols:
await self._stop_position_monitoring(symbol)
logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)")
async def _start_position_monitoring(self, symbol: str):
"""
启动单个持仓的WebSocket价格监控
Args:
symbol: 交易对
"""
if symbol in self._monitor_tasks:
logger.debug(f"{symbol} 监控任务已存在,跳过")
return
if not self.client.socket_manager:
logger.warning(f"{symbol} WebSocket未初始化无法启动监控")
return
try:
task = asyncio.create_task(self._monitor_position_price(symbol))
self._monitor_tasks[symbol] = task
logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控")
except Exception as e:
logger.error(f"启动 {symbol} 监控失败: {e}")
async def _stop_position_monitoring(self, symbol: str):
"""
停止单个持仓的WebSocket监控
Args:
symbol: 交易对
"""
if symbol not in self._monitor_tasks:
return
task = self._monitor_tasks[symbol]
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
del self._monitor_tasks[symbol]
logger.debug(f"已停止 {symbol} 的WebSocket监控")
async def _monitor_position_price(self, symbol: str):
"""
监控单个持仓的价格WebSocket实时推送
Args:
symbol: 交易对
"""
retry_count = 0
max_retries = 5
while retry_count < max_retries:
try:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
# 使用WebSocket订阅价格流
async with self.client.socket_manager.futures_socket(symbol.lower()) as stream:
logger.debug(f"{symbol} WebSocket连接已建立开始接收价格更新")
retry_count = 0 # 连接成功,重置重试计数
async for msg in stream:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
if 'data' in msg:
try:
current_price = float(msg['data']['c']) # 最新价格
# 立即检查止损止盈
await self._check_single_position(symbol, current_price)
except (KeyError, ValueError, TypeError) as e:
logger.debug(f"{symbol} 解析价格数据失败: {e}")
continue
except asyncio.CancelledError:
logger.info(f"{symbol} 监控任务已取消")
break
except Exception as e:
retry_count += 1
logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}")
if retry_count < max_retries:
# 指数退避重试
wait_time = min(2 ** retry_count, 30)
logger.info(f"{symbol} {wait_time}秒后重试连接...")
await asyncio.sleep(wait_time)
else:
logger.error(f"{symbol} WebSocket监控失败已达到最大重试次数")
# 回退到定时检查模式
logger.info(f"{symbol} 将使用定时检查模式(非实时)")
break
# 清理任务
if symbol in self._monitor_tasks:
del self._monitor_tasks[symbol]
async def _check_single_position(self, symbol: str, current_price: float):
"""
检查单个持仓的止损止盈(实时检查)
Args:
symbol: 交易对
current_price: 当前价格
"""
position_info = self.active_positions.get(symbol)
if not position_info:
return
entry_price = position_info['entryPrice']
quantity = position_info['quantity']
# 计算当前盈亏
if position_info['side'] == 'BUY':
pnl_percent = ((current_price - entry_price) / entry_price) * 100
else: # SELL
pnl_percent = ((entry_price - current_price) / entry_price) * 100
# 更新最大盈利
if pnl_percent > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent
# 移动止损逻辑(盈利后保护利润)
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后,激活移动止损
if pnl_percent > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent:.2f}%)"
)
else:
# 盈利超过2%后,止损移至保护利润位
if pnl_percent > 2.0:
if position_info['side'] == 'BUY':
new_stop_loss = entry_price * (1 + trailing_protect)
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
else: # SELL
new_stop_loss = entry_price * (1 - trailing_protect)
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
# 检查止损
stop_loss = position_info['stopLoss']
should_close = False
exit_reason = None
if position_info['side'] == 'BUY' and current_price <= stop_loss:
should_close = True
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
logger.warning(
f"{symbol} [实时监控] 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
elif position_info['side'] == 'SELL' and current_price >= stop_loss:
should_close = True
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
logger.warning(
f"{symbol} [实时监控] 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 检查止盈
if not should_close:
take_profit = position_info['takeProfit']
if position_info['side'] == 'BUY' and current_price >= take_profit:
should_close = True
exit_reason = 'take_profit'
logger.info(
f"{symbol} [实时监控] 触发止盈: {current_price:.4f} >= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
elif position_info['side'] == 'SELL' and current_price <= take_profit:
should_close = True
exit_reason = 'take_profit'
logger.info(
f"{symbol} [实时监控] 触发止盈: {current_price:.4f} <= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 如果触发止损止盈,执行平仓
if should_close:
# 更新数据库
if DB_AVAILABLE and Trade:
trade_id = position_info.get('tradeId')
if trade_id:
try:
if position_info['side'] == 'BUY':
pnl = (current_price - entry_price) * quantity
else: # SELL
pnl = (entry_price - current_price) * quantity
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新{exit_reason}记录失败: {e}")
# 执行平仓(注意:这里会停止监控,所以先更新数据库)
await self.close_position(symbol)