auto_trade_sys/trading_system/position_manager.py
薇薇安 cd74e96c22 a
2026-01-14 21:08:32 +08:00

1208 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
仓位管理模块 - 管理持仓和订单
"""
import asyncio
import logging
import json
import aiohttp
from typing import Dict, List, Optional
try:
from .binance_client import BinanceClient
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from risk_manager import RiskManager
import config
logger = logging.getLogger(__name__)
# 尝试导入数据库模型(如果可用)
DB_AVAILABLE = False
Trade = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
DB_AVAILABLE = True
logger.info("✓ 数据库模型导入成功,交易记录将保存到数据库")
else:
logger.warning("⚠ backend目录不存在无法使用数据库功能")
DB_AVAILABLE = False
except ImportError as e:
logger.warning(f"⚠ 无法导入数据库模型: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
except Exception as e:
logger.warning(f"⚠ 数据库初始化失败: {e}")
logger.warning(" 交易记录将不会保存到数据库")
DB_AVAILABLE = False
class PositionManager:
"""仓位管理类"""
def __init__(self, client: BinanceClient, risk_manager: RiskManager):
"""
初始化仓位管理器
Args:
client: 币安客户端
risk_manager: 风险管理器
"""
self.client = client
self.risk_manager = risk_manager
self.active_positions: Dict[str, Dict] = {}
self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典
self._monitoring_enabled = True # 是否启用实时监控
async def open_position(
self,
symbol: str,
change_percent: float,
leverage: int = 10,
trade_direction: Optional[str] = None,
entry_reason: str = '',
atr: Optional[float] = None
) -> Optional[Dict]:
"""
开仓
Args:
symbol: 交易对
change_percent: 涨跌幅百分比
leverage: 杠杆倍数
Returns:
订单信息失败返回None
"""
try:
# 判断是否应该交易
if not await self.risk_manager.should_trade(symbol, change_percent):
return None
# 设置杠杆
await self.client.set_leverage(symbol, leverage)
# 计算仓位大小
logger.info(f"开始为 {symbol} 计算仓位大小...")
quantity = await self.risk_manager.calculate_position_size(
symbol, change_percent
)
if quantity is None:
logger.warning(f"{symbol} 仓位计算失败,跳过交易")
logger.warning(f" 可能原因:")
logger.warning(f" 1. 账户余额不足")
logger.warning(f" 2. 单笔仓位超过限制")
logger.warning(f" 3. 总仓位超过限制")
logger.warning(f" 4. 无法获取价格数据")
return None
logger.info(f"{symbol} 仓位计算成功: {quantity:.4f}")
# 确定交易方向(优先使用技术指标信号)
if trade_direction:
side = trade_direction
else:
side = 'BUY' if change_percent > 0 else 'SELL'
# 获取当前价格
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
entry_price = ticker['price']
# 计算动态止损止盈使用ATR或固定比例
if atr and atr > 0:
# 使用ATR计算动态止损2倍ATR
atr_stop_loss_pct = (atr * 2) / entry_price
# 限制在合理范围内1%-5%
atr_stop_loss_pct = max(0.01, min(0.05, atr_stop_loss_pct))
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, stop_loss_pct=atr_stop_loss_pct
)
# 止盈为止损的1.5-2倍
take_profit_pct = atr_stop_loss_pct * 1.8
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, take_profit_pct=take_profit_pct
)
else:
# 使用固定止损止盈
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
# 下单
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET'
)
if order:
# 记录到数据库
trade_id = None
if DB_AVAILABLE and Trade:
try:
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=leverage,
entry_reason=entry_reason
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id})")
except Exception as e:
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法保存 {symbol} 交易记录")
# 记录持仓信息(包含动态止损止盈)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': change_percent,
'orderId': order.get('orderId'),
'tradeId': trade_id, # 数据库交易ID
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
'leverage': leverage,
'entryReason': entry_reason,
'atr': atr,
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
'trailingStopActivated': False # 移动止损是否已激活
}
self.active_positions[symbol] = position_info
logger.info(
f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} "
f"(涨跌幅: {change_percent:.2f}%)"
)
# 启动WebSocket实时监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
return position_info
return None
except Exception as e:
logger.error(f"开仓失败 {symbol}: {e}")
return None
async def close_position(self, symbol: str, reason: str = 'manual') -> bool:
"""
平仓
Args:
symbol: 交易对
reason: 平仓原因manual, stop_loss, take_profit, trailing_stop, sync
Returns:
是否成功
"""
try:
logger.info(f"{symbol} [平仓] 开始平仓操作 (原因: {reason})")
# 获取当前持仓
positions = await self.client.get_open_positions()
position = next(
(p for p in positions if p['symbol'] == symbol),
None
)
if not position:
logger.warning(f"{symbol} [平仓] 币安账户中没有持仓,可能已被平仓")
# 即使币安没有持仓,也要更新数据库状态
if DB_AVAILABLE and Trade and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
logger.info(f"{symbol} [平仓] 更新数据库状态为已平仓 (ID: {trade_id})...")
# 获取当前价格作为平仓价格
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(position_info['entryPrice'])
# 确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity = float(position_info['quantity'])
if position_info['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent
)
logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新")
except Exception as e:
logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {e}")
# 清理本地记录
await self._stop_position_monitoring(symbol)
if symbol in self.active_positions:
del self.active_positions[symbol]
return False
# 确定平仓方向(与开仓相反)
position_amt = position['positionAmt']
side = 'SELL' if position_amt > 0 else 'BUY'
quantity = abs(position_amt)
logger.info(
f"{symbol} [平仓] 下单信息: {side} {quantity:.4f} @ MARKET "
f"(持仓数量: {position_amt:.4f})"
)
# 平仓
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET'
)
if order:
logger.info(f"{symbol} [平仓] ✓ 平仓订单已提交 (订单ID: {order.get('orderId', 'N/A')})")
# 获取平仓价格确保是float类型
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
logger.warning(f"无法获取 {symbol} 价格,使用订单价格")
exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0))
else:
exit_price = float(ticker['price'])
# 更新数据库记录
if DB_AVAILABLE and Trade and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
logger.info(f"正在更新 {symbol} 平仓记录到数据库 (ID: {trade_id})...")
# 计算盈亏确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity_float = float(quantity)
exit_price_float = float(exit_price)
if position_info['side'] == 'BUY':
pnl = (exit_price_float - entry_price) * quantity_float
pnl_percent = ((exit_price_float - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price_float) * quantity_float
pnl_percent = ((entry_price - exit_price_float) / entry_price) * 100
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price_float,
exit_reason=reason,
pnl=pnl,
pnl_percent=pnl_percent
)
logger.info(
f"{symbol} [平仓] ✓ 数据库记录已更新 "
f"(盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%, 原因: {reason})"
)
except Exception as e:
logger.error(f"❌ 更新平仓记录到数据库失败: {e}")
logger.error(f" 错误类型: {type(e).__name__}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
else:
logger.warning(f"{symbol} 没有关联的数据库交易ID无法更新平仓记录")
elif not DB_AVAILABLE:
logger.debug(f"数据库不可用,跳过更新 {symbol} 平仓记录")
elif not Trade:
logger.warning(f"Trade模型未导入无法更新 {symbol} 平仓记录")
# 停止WebSocket监控
await self._stop_position_monitoring(symbol)
# 移除持仓记录
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(
f"{symbol} [平仓] ✓ 平仓完成: {side} {quantity:.4f} @ {exit_price:.4f} "
f"(原因: {reason})"
)
return True
return False
except Exception as e:
logger.error(f"{symbol} [平仓] ❌ 平仓失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
return False
async def check_stop_loss_take_profit(self) -> List[str]:
"""
检查止损止盈
Returns:
需要平仓的交易对列表
"""
closed_positions = []
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
position_dict = {p['symbol']: p for p in positions}
for symbol, position_info in list(self.active_positions.items()):
if symbol not in position_dict:
# 持仓已不存在,移除记录
del self.active_positions[symbol]
continue
current_position = position_dict[symbol]
entry_price = position_info['entryPrice']
quantity = position_info['quantity'] # 修复获取quantity
# 获取当前标记价格
current_price = current_position.get('markPrice', 0)
if current_price == 0:
# 如果标记价格为0尝试从ticker获取
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
current_price = ticker['price']
else:
current_price = entry_price
# 计算当前盈亏
if position_info['side'] == 'BUY':
pnl_percent = ((current_price - entry_price) / entry_price) * 100
else:
pnl_percent = ((entry_price - current_price) / entry_price) * 100
# 更新最大盈利
if pnl_percent > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent
# 移动止损逻辑(盈利后保护利润)
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后,激活移动止损
if pnl_percent > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent:.2f}%)"
)
else:
# 盈利超过2%后,止损移至保护利润位
if pnl_percent > 2.0:
if position_info['side'] == 'BUY':
new_stop_loss = entry_price * (1 + trailing_protect)
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
else:
new_stop_loss = entry_price * (1 - trailing_protect)
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
# 检查止损(使用更新后的止损价)
stop_loss = position_info['stopLoss']
if position_info['side'] == 'BUY' and current_price <= stop_loss:
logger.warning(
f"{symbol} 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
if position_info['side'] == 'SELL' and current_price >= stop_loss:
logger.warning(
f"{symbol} 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
# 检查止盈
take_profit = position_info['takeProfit']
if position_info['side'] == 'BUY' and current_price >= take_profit:
logger.info(
f"{symbol} 触发止盈: {current_price:.4f} >= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason='take_profit',
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
if position_info['side'] == 'SELL' and current_price <= take_profit:
logger.info(
f"{symbol} 触发止盈: {current_price:.4f} <= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason='take_profit',
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
except Exception as e:
logger.error(f"检查止损止盈失败: {e}")
return closed_positions
async def get_position_summary(self) -> Dict:
"""
获取持仓摘要
Returns:
持仓摘要信息
"""
try:
positions = await self.client.get_open_positions()
balance = await self.client.get_account_balance()
total_pnl = sum(p['unRealizedProfit'] for p in positions)
return {
'totalPositions': len(positions),
'totalBalance': balance.get('total', 0),
'availableBalance': balance.get('available', 0),
'totalPnL': total_pnl,
'positions': [
{
'symbol': p['symbol'],
'positionAmt': p['positionAmt'],
'entryPrice': p['entryPrice'],
'pnl': p['unRealizedProfit']
}
for p in positions
]
}
except Exception as e:
logger.error(f"获取持仓摘要失败: {e}")
return {}
async def sync_positions_with_binance(self):
"""
同步币安实际持仓状态与数据库状态
检查哪些持仓在数据库中还是open状态但在币安已经不存在了
"""
if not DB_AVAILABLE or not Trade:
logger.debug("数据库不可用,跳过持仓状态同步")
return
try:
logger.info("开始同步币安持仓状态与数据库...")
# 1. 获取币安实际持仓
binance_positions = await self.client.get_open_positions()
binance_symbols = {p['symbol'] for p in binance_positions}
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
# 2. 获取数据库中状态为open的交易记录
db_open_trades = Trade.get_all(status='open')
db_open_symbols = {t['symbol'] for t in db_open_trades}
logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else ''})")
# 3. 找出在数据库中open但在币安已不存在的持仓
missing_in_binance = db_open_symbols - binance_symbols
if missing_in_binance:
logger.warning(
f"发现 {len(missing_in_binance)} 个持仓在数据库中为open状态但在币安已不存在: "
f"{', '.join(missing_in_binance)}"
)
# 4. 更新这些持仓的状态
for symbol in missing_in_binance:
trades = Trade.get_by_symbol(symbol, status='open')
for trade in trades:
trade_id = trade['id']
try:
logger.info(f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})...")
# 获取当前价格作为平仓价格
ticker = await self.client.get_ticker_24h(symbol)
exit_price = float(ticker['price']) if ticker else float(trade['entry_price'])
# 计算盈亏确保所有值都是float类型避免Decimal类型问题
entry_price = float(trade['entry_price'])
quantity = float(trade['quantity'])
if trade['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else: # SELL
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason='sync', # 标记为同步更新
pnl=pnl,
pnl_percent=pnl_percent
)
logger.info(
f"{symbol} [状态同步] ✓ 已更新 (ID: {trade_id}, "
f"盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)"
)
# 清理本地记录
if symbol in self.active_positions:
await self._stop_position_monitoring(symbol)
del self.active_positions[symbol]
logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录")
except Exception as e:
logger.error(f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): {e}")
else:
logger.info("✓ 持仓状态同步完成,数据库与币安状态一致")
# 5. 检查币安有但数据库没有记录的持仓(可能是手动开仓的)
missing_in_db = binance_symbols - db_open_symbols
if missing_in_db:
logger.info(
f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: "
f"{', '.join(missing_in_db)} (可能是手动开仓)"
)
# 为手动开仓的持仓创建数据库记录并启动监控
for symbol in missing_in_db:
try:
# 获取币安持仓详情
binance_position = next(
(p for p in binance_positions if p['symbol'] == symbol),
None
)
if not binance_position:
continue
position_amt = binance_position['positionAmt']
entry_price = binance_position['entryPrice']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
logger.info(
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
f"({side} {quantity:.4f} @ {entry_price:.4f})"
)
# 创建数据库记录
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=binance_position.get('leverage', 10),
entry_reason='manual_entry' # 标记为手动开仓
)
logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})")
# 创建本地持仓记录(用于监控)
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
# 计算止损止盈
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0, # 手动开仓,无法计算涨跌幅
'orderId': None,
'tradeId': trade_id,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price,
'leverage': binance_position.get('leverage', 10),
'entryReason': 'manual_entry',
'atr': None,
'maxProfit': 0.0,
'trailingStopActivated': False
}
self.active_positions[symbol] = position_info
# 启动WebSocket监控
if self._monitoring_enabled:
await self._start_position_monitoring(symbol)
logger.info(f"{symbol} [状态同步] ✓ 已启动实时监控")
logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成")
except Exception as e:
logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
logger.info("持仓状态同步完成")
except Exception as e:
logger.error(f"同步持仓状态失败: {e}")
import traceback
logger.error(f"错误详情:\n{traceback.format_exc()}")
async def start_all_position_monitoring(self):
"""
启动所有持仓的WebSocket实时监控
"""
if not self._monitoring_enabled:
logger.info("实时监控已禁用,跳过启动")
return
# WebSocket 现在直接使用 aiohttp不需要检查 socket_manager
if not self.client:
logger.warning("客户端未初始化,无法启动实时监控")
return
# 获取当前所有持仓
positions = await self.client.get_open_positions()
binance_symbols = {p['symbol'] for p in positions}
active_symbols = set(self.active_positions.keys())
logger.info(f"币安持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else ''})")
logger.info(f"本地持仓记录: {len(active_symbols)} 个 ({', '.join(active_symbols) if active_symbols else ''})")
# 为所有币安持仓启动监控即使不在active_positions中可能是手动开仓的
for position in positions:
symbol = position['symbol']
if symbol not in self._monitor_tasks:
# 如果不在active_positions中先创建记录
if symbol not in self.active_positions:
logger.warning(f"{symbol} 在币安有持仓但不在本地记录中,可能是手动开仓,尝试创建记录...")
# 这里会通过sync_positions_with_binance来处理但先启动监控
try:
entry_price = position.get('entryPrice', 0)
position_amt = position['positionAmt']
quantity = abs(position_amt)
side = 'BUY' if position_amt > 0 else 'SELL'
# 创建临时记录用于监控
ticker = await self.client.get_ticker_24h(symbol)
current_price = ticker['price'] if ticker else entry_price
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': 0,
'orderId': None,
'tradeId': None,
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price,
'leverage': position.get('leverage', 10),
'entryReason': 'manual_entry_temp',
'atr': None,
'maxProfit': 0.0,
'trailingStopActivated': False
}
self.active_positions[symbol] = position_info
logger.info(f"{symbol} 已创建临时持仓记录用于监控")
except Exception as e:
logger.error(f"{symbol} 创建临时持仓记录失败: {e}")
await self._start_position_monitoring(symbol)
logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控")
async def stop_all_position_monitoring(self):
"""
停止所有持仓的WebSocket监控
"""
symbols = list(self._monitor_tasks.keys())
for symbol in symbols:
await self._stop_position_monitoring(symbol)
logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)")
async def _start_position_monitoring(self, symbol: str):
"""
启动单个持仓的WebSocket价格监控
Args:
symbol: 交易对
"""
if symbol in self._monitor_tasks:
logger.debug(f"{symbol} 监控任务已存在,跳过")
return
# WebSocket 现在直接使用 aiohttp不需要检查 socket_manager
if not self.client:
logger.warning(f"{symbol} 客户端未初始化,无法启动监控")
return
try:
task = asyncio.create_task(self._monitor_position_price(symbol))
self._monitor_tasks[symbol] = task
logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控")
except Exception as e:
logger.error(f"启动 {symbol} 监控失败: {e}")
async def _stop_position_monitoring(self, symbol: str):
"""
停止单个持仓的WebSocket监控
Args:
symbol: 交易对
"""
if symbol not in self._monitor_tasks:
return
task = self._monitor_tasks[symbol]
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
del self._monitor_tasks[symbol]
logger.debug(f"已停止 {symbol} 的WebSocket监控")
async def _monitor_position_price(self, symbol: str):
"""
监控单个持仓的价格WebSocket实时推送
Args:
symbol: 交易对
"""
retry_count = 0
max_retries = 5
while retry_count < max_retries:
try:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
# 使用WebSocket订阅价格流
# 直接使用 aiohttp 连接 Binance 期货 WebSocket API
# 根据文档https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
# 端点wss://fstream.binance.com/ws/<symbol>@ticker
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@ticker"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(ws_url) as ws:
logger.debug(f"{symbol} WebSocket连接已建立开始接收价格更新")
retry_count = 0 # 连接成功,重置重试计数
async for msg in ws:
if symbol not in self.active_positions:
logger.info(f"{symbol} 持仓已不存在,停止监控")
break
if msg.type == aiohttp.WSMsgType.TEXT:
try:
# 解析 JSON 消息
data = json.loads(msg.data)
# WebSocket 返回的数据格式:{'e': '24hrTicker', 's': 'BTCUSDT', 'c': '50000.00', ...}
# 根据文档ticker 流包含 'c' 字段(最后价格)
if isinstance(data, dict):
if 'c' in data: # 'c' 是当前价格
current_price = float(data['c'])
# 立即检查止损止盈
await self._check_single_position(symbol, current_price)
elif 'data' in data:
# 兼容组合流格式(如果使用 /stream 端点)
if isinstance(data['data'], dict) and 'c' in data['data']:
current_price = float(data['data']['c'])
await self._check_single_position(symbol, current_price)
except (KeyError, ValueError, TypeError, json.JSONDecodeError) as e:
logger.debug(f"{symbol} 解析价格数据失败: {e}, 消息: {msg.data[:100] if hasattr(msg, 'data') else 'N/A'}")
continue
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.warning(f"{symbol} WebSocket错误: {ws.exception()}")
break
elif msg.type == aiohttp.WSMsgType.CLOSE:
logger.info(f"{symbol} WebSocket连接关闭")
break
except asyncio.CancelledError:
logger.info(f"{symbol} 监控任务已取消")
break
except Exception as e:
retry_count += 1
logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}")
if retry_count < max_retries:
# 指数退避重试
wait_time = min(2 ** retry_count, 30)
logger.info(f"{symbol} {wait_time}秒后重试连接...")
await asyncio.sleep(wait_time)
else:
logger.error(f"{symbol} WebSocket监控失败已达到最大重试次数")
# 回退到定时检查模式
logger.info(f"{symbol} 将使用定时检查模式(非实时)")
break
# 清理任务
if symbol in self._monitor_tasks:
del self._monitor_tasks[symbol]
async def _check_single_position(self, symbol: str, current_price: float):
"""
检查单个持仓的止损止盈(实时检查)
Args:
symbol: 交易对
current_price: 当前价格
"""
position_info = self.active_positions.get(symbol)
if not position_info:
return
# 确保所有值都是float类型
entry_price = float(position_info['entryPrice'])
quantity = float(position_info['quantity'])
current_price_float = float(current_price)
# 计算当前盈亏
if position_info['side'] == 'BUY':
pnl_percent = ((current_price_float - entry_price) / entry_price) * 100
else: # SELL
pnl_percent = ((entry_price - current_price_float) / entry_price) * 100
# 更新最大盈利
if pnl_percent > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent
# 移动止损逻辑(盈利后保护利润)
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后,激活移动止损
if pnl_percent > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent:.2f}%)"
)
else:
# 盈利超过2%后,止损移至保护利润位
if pnl_percent > 2.0:
if position_info['side'] == 'BUY':
new_stop_loss = entry_price * (1 + trailing_protect)
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
else: # SELL
new_stop_loss = entry_price * (1 - trailing_protect)
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
# 检查止损
stop_loss = position_info['stopLoss']
should_close = False
exit_reason = None
if position_info['side'] == 'BUY' and current_price <= stop_loss:
should_close = True
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
logger.warning(
f"{symbol} [实时监控] 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
elif position_info['side'] == 'SELL' and current_price >= stop_loss:
should_close = True
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
logger.warning(
f"{symbol} [实时监控] 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 检查止盈
if not should_close:
take_profit = float(position_info['takeProfit'])
# 计算止盈百分比(用于诊断)
if position_info['side'] == 'BUY':
take_profit_pct = ((take_profit - entry_price) / entry_price) * 100
else: # SELL
take_profit_pct = ((entry_price - take_profit) / entry_price) * 100
# 每5%盈利记录一次诊断日志(帮助排查问题)
# 使用更宽松的条件,避免因为浮点数精度问题导致日志不输出
if pnl_percent >= 5.0:
# 每5%记录一次,但允许一些容差
should_log = (int(pnl_percent) % 5 == 0) or (pnl_percent >= 10.0 and pnl_percent < 10.5)
if should_log:
trigger_condition = current_price_float >= take_profit if position_info['side'] == 'BUY' else current_price_float <= take_profit
logger.warning(
f"{symbol} [实时监控] 诊断: 盈利{pnl_percent:.2f}% | "
f"当前价: {current_price_float:.4f} | "
f"入场价: {entry_price:.4f} | "
f"止盈价: {take_profit:.4f} ({take_profit_pct:.2f}%) | "
f"方向: {position_info['side']} | "
f"是否触发: {trigger_condition} | "
f"价格差: {abs(current_price_float - take_profit):.4f} | "
f"监控状态: {'运行中' if symbol in self._monitor_tasks else '未启动'}"
)
# 如果盈利超过止盈目标但未触发,记录警告
if pnl_percent > take_profit_pct and not trigger_condition:
logger.error(
f"{symbol} [实时监控] ⚠️ 异常: 盈利{pnl_percent:.2f}% > 止盈目标{take_profit_pct:.2f}%,但未触发平仓!"
)
price_diff = current_price_float - take_profit if position_info['side'] == 'BUY' else take_profit - current_price_float
logger.error(f" 当前价: {current_price_float:.4f}, 止盈价: {take_profit:.4f}, 价格差: {price_diff:.4f}")
if position_info['side'] == 'BUY' and current_price_float >= take_profit:
should_close = True
exit_reason = 'take_profit'
logger.info(
f"{symbol} [实时监控] 触发止盈: {current_price_float:.4f} >= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%, 止盈目标: {take_profit_pct:.2f}%)"
)
elif position_info['side'] == 'SELL' and current_price_float <= take_profit:
should_close = True
exit_reason = 'take_profit'
logger.info(
f"{symbol} [实时监控] 触发止盈: {current_price_float:.4f} <= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%, 止盈目标: {take_profit_pct:.2f}%)"
)
# 如果触发止损止盈,执行平仓
if should_close:
logger.info(
f"{symbol} [自动平仓] 开始执行平仓操作 | "
f"原因: {exit_reason} | "
f"入场价: {entry_price:.4f} | "
f"当前价: {current_price:.4f} | "
f"盈亏: {pnl_percent:.2f}% | "
f"数量: {quantity:.4f}"
)
# 更新数据库
if DB_AVAILABLE and Trade:
trade_id = position_info.get('tradeId')
if trade_id:
try:
if position_info['side'] == 'BUY':
pnl = (current_price_float - entry_price) * quantity
else: # SELL
pnl = (entry_price - current_price_float) * quantity
logger.info(f"{symbol} [自动平仓] 更新数据库记录 (ID: {trade_id})...")
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price_float,
exit_reason=exit_reason,
pnl=pnl,
pnl_percent=pnl_percent
)
logger.info(f"{symbol} [自动平仓] ✓ 数据库记录已更新 (盈亏: {pnl:.2f} USDT)")
except Exception as e:
logger.error(f"{symbol} [自动平仓] ❌ 更新数据库记录失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")
# 执行平仓(注意:这里会停止监控,所以先更新数据库)
logger.info(f"{symbol} [自动平仓] 正在执行平仓订单...")
success = await self.close_position(symbol, reason=exit_reason)
if success:
logger.info(f"{symbol} [自动平仓] ✓ 平仓成功完成")
else:
logger.error(f"{symbol} [自动平仓] ❌ 平仓失败")
async def diagnose_position(self, symbol: str):
"""
诊断持仓状态(用于排查为什么没有自动平仓)
Args:
symbol: 交易对
"""
try:
logger.info(f"{symbol} [诊断] 开始诊断持仓状态...")
# 1. 检查是否在active_positions中
if symbol not in self.active_positions:
logger.warning(f"{symbol} [诊断] ❌ 不在本地持仓记录中 (active_positions)")
logger.warning(f" 可能原因: 手动开仓或系统重启后未同步")
logger.warning(f" 解决方案: 等待下次状态同步或手动触发同步")
else:
position_info = self.active_positions[symbol]
logger.info(f"{symbol} [诊断] ✓ 在本地持仓记录中")
logger.info(f" 入场价: {position_info['entryPrice']:.4f}")
logger.info(f" 方向: {position_info['side']}")
logger.info(f" 数量: {position_info['quantity']:.4f}")
logger.info(f" 止损价: {position_info['stopLoss']:.4f}")
logger.info(f" 止盈价: {position_info['takeProfit']:.4f}")
# 2. 检查WebSocket监控状态
if symbol in self._monitor_tasks:
task = self._monitor_tasks[symbol]
if task.done():
logger.warning(f"{symbol} [诊断] ⚠ WebSocket监控任务已结束")
try:
await task # 获取异常信息
except Exception as e:
logger.warning(f" 任务异常: {e}")
else:
logger.info(f"{symbol} [诊断] ✓ WebSocket监控任务运行中")
else:
logger.warning(f"{symbol} [诊断] ❌ 没有WebSocket监控任务")
logger.warning(f" 可能原因: 监控未启动或已停止")
# 3. 获取币安实际持仓
positions = await self.client.get_open_positions()
binance_position = next((p for p in positions if p['symbol'] == symbol), None)
if not binance_position:
logger.warning(f"{symbol} [诊断] ❌ 币安账户中没有持仓")
return
logger.info(f"{symbol} [诊断] ✓ 币安账户中有持仓")
entry_price_binance = binance_position.get('entryPrice', 0)
mark_price = binance_position.get('markPrice', 0)
unrealized_pnl = binance_position.get('unRealizedProfit', 0)
logger.info(f" 币安入场价: {entry_price_binance:.4f}")
logger.info(f" 标记价格: {mark_price:.4f}")
logger.info(f" 未实现盈亏: {unrealized_pnl:.2f} USDT")
# 4. 计算实际盈亏
if symbol in self.active_positions:
position_info = self.active_positions[symbol]
entry_price = float(position_info['entryPrice'])
take_profit = float(position_info['takeProfit'])
if position_info['side'] == 'BUY':
pnl_percent = ((mark_price - entry_price) / entry_price) * 100
take_profit_pct = ((take_profit - entry_price) / entry_price) * 100
should_trigger = mark_price >= take_profit
else: # SELL
pnl_percent = ((entry_price - mark_price) / entry_price) * 100
take_profit_pct = ((entry_price - take_profit) / entry_price) * 100
should_trigger = mark_price <= take_profit
logger.info(f"{symbol} [诊断] 盈亏分析:")
logger.info(f" 当前盈亏: {pnl_percent:.2f}%")
logger.info(f" 止盈目标: {take_profit_pct:.2f}%")
logger.info(f" 当前价: {mark_price:.4f}")
logger.info(f" 止盈价: {take_profit:.4f}")
logger.info(f" 价格差: {abs(mark_price - take_profit):.4f}")
logger.info(f" 应该触发: {should_trigger}")
if pnl_percent > take_profit_pct and not should_trigger:
logger.error(f"{symbol} [诊断] ❌ 异常: 盈亏{pnl_percent:.2f}% > 止盈目标{take_profit_pct:.2f}%,但未触发平仓!")
logger.error(f" 可能原因: 浮点数精度问题或止盈价格计算错误")
logger.info(f"{symbol} [诊断] 诊断完成")
except Exception as e:
logger.error(f"{symbol} [诊断] 诊断失败: {e}")
import traceback
logger.error(f" 错误详情:\n{traceback.format_exc()}")