1012 lines
46 KiB
Python
1012 lines
46 KiB
Python
"""
|
||
仓位管理模块 - 管理持仓和订单
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
from typing import Dict, List, Optional
|
||
try:
|
||
from .binance_client import BinanceClient
|
||
from .risk_manager import RiskManager
|
||
from . import config
|
||
except ImportError:
|
||
from binance_client import BinanceClient
|
||
from risk_manager import RiskManager
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 尝试导入数据库模型(如果可用)
|
||
DB_AVAILABLE = False
|
||
Trade = None
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import Trade
|
||
DB_AVAILABLE = True
|
||
logger.info("✓ 数据库模型导入成功,交易记录将保存到数据库")
|
||
else:
|
||
logger.warning("⚠ backend目录不存在,无法使用数据库功能")
|
||
DB_AVAILABLE = False
|
||
except ImportError as e:
|
||
logger.warning(f"⚠ 无法导入数据库模型: {e}")
|
||
logger.warning(" 交易记录将不会保存到数据库")
|
||
DB_AVAILABLE = False
|
||
except Exception as e:
|
||
logger.warning(f"⚠ 数据库初始化失败: {e}")
|
||
logger.warning(" 交易记录将不会保存到数据库")
|
||
DB_AVAILABLE = False
|
||
|
||
|
||
class PositionManager:
|
||
"""仓位管理类"""
|
||
|
||
def __init__(self, client: BinanceClient, risk_manager: RiskManager):
|
||
"""
|
||
初始化仓位管理器
|
||
|
||
Args:
|
||
client: 币安客户端
|
||
risk_manager: 风险管理器
|
||
"""
|
||
self.client = client
|
||
self.risk_manager = risk_manager
|
||
self.active_positions: Dict[str, Dict] = {}
|
||
self._monitor_tasks: Dict[str, asyncio.Task] = {} # WebSocket监控任务字典
|
||
self._monitoring_enabled = True # 是否启用实时监控
|
||
|
||
async def open_position(
|
||
self,
|
||
symbol: str,
|
||
change_percent: float,
|
||
leverage: int = 10,
|
||
trade_direction: Optional[str] = None,
|
||
entry_reason: str = '',
|
||
atr: Optional[float] = None
|
||
) -> Optional[Dict]:
|
||
"""
|
||
开仓
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
change_percent: 涨跌幅百分比
|
||
leverage: 杠杆倍数
|
||
|
||
Returns:
|
||
订单信息,失败返回None
|
||
"""
|
||
try:
|
||
# 判断是否应该交易
|
||
if not await self.risk_manager.should_trade(symbol, change_percent):
|
||
return None
|
||
|
||
# 设置杠杆
|
||
await self.client.set_leverage(symbol, leverage)
|
||
|
||
# 计算仓位大小
|
||
logger.info(f"开始为 {symbol} 计算仓位大小...")
|
||
quantity = await self.risk_manager.calculate_position_size(
|
||
symbol, change_percent
|
||
)
|
||
|
||
if quantity is None:
|
||
logger.warning(f"❌ {symbol} 仓位计算失败,跳过交易")
|
||
logger.warning(f" 可能原因:")
|
||
logger.warning(f" 1. 账户余额不足")
|
||
logger.warning(f" 2. 单笔仓位超过限制")
|
||
logger.warning(f" 3. 总仓位超过限制")
|
||
logger.warning(f" 4. 无法获取价格数据")
|
||
return None
|
||
|
||
logger.info(f"✓ {symbol} 仓位计算成功: {quantity:.4f}")
|
||
|
||
# 确定交易方向(优先使用技术指标信号)
|
||
if trade_direction:
|
||
side = trade_direction
|
||
else:
|
||
side = 'BUY' if change_percent > 0 else 'SELL'
|
||
|
||
# 获取当前价格
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if not ticker:
|
||
return None
|
||
|
||
entry_price = ticker['price']
|
||
|
||
# 计算动态止损止盈(使用ATR或固定比例)
|
||
if atr and atr > 0:
|
||
# 使用ATR计算动态止损(2倍ATR)
|
||
atr_stop_loss_pct = (atr * 2) / entry_price
|
||
# 限制在合理范围内(1%-5%)
|
||
atr_stop_loss_pct = max(0.01, min(0.05, atr_stop_loss_pct))
|
||
stop_loss_price = self.risk_manager.get_stop_loss_price(
|
||
entry_price, side, stop_loss_pct=atr_stop_loss_pct
|
||
)
|
||
# 止盈为止损的1.5-2倍
|
||
take_profit_pct = atr_stop_loss_pct * 1.8
|
||
take_profit_price = self.risk_manager.get_take_profit_price(
|
||
entry_price, side, take_profit_pct=take_profit_pct
|
||
)
|
||
else:
|
||
# 使用固定止损止盈
|
||
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
|
||
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
|
||
|
||
# 下单
|
||
order = await self.client.place_order(
|
||
symbol=symbol,
|
||
side=side,
|
||
quantity=quantity,
|
||
order_type='MARKET'
|
||
)
|
||
|
||
if order:
|
||
# 记录到数据库
|
||
trade_id = None
|
||
if DB_AVAILABLE and Trade:
|
||
try:
|
||
logger.info(f"正在保存 {symbol} 交易记录到数据库...")
|
||
trade_id = Trade.create(
|
||
symbol=symbol,
|
||
side=side,
|
||
quantity=quantity,
|
||
entry_price=entry_price,
|
||
leverage=leverage,
|
||
entry_reason=entry_reason
|
||
)
|
||
logger.info(f"✓ {symbol} 交易记录已保存到数据库 (ID: {trade_id})")
|
||
except Exception as e:
|
||
logger.error(f"❌ 保存交易记录到数据库失败: {e}")
|
||
logger.error(f" 错误类型: {type(e).__name__}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
elif not DB_AVAILABLE:
|
||
logger.debug(f"数据库不可用,跳过保存 {symbol} 交易记录")
|
||
elif not Trade:
|
||
logger.warning(f"Trade模型未导入,无法保存 {symbol} 交易记录")
|
||
|
||
# 记录持仓信息(包含动态止损止盈)
|
||
position_info = {
|
||
'symbol': symbol,
|
||
'side': side,
|
||
'quantity': quantity,
|
||
'entryPrice': entry_price,
|
||
'changePercent': change_percent,
|
||
'orderId': order.get('orderId'),
|
||
'tradeId': trade_id, # 数据库交易ID
|
||
'stopLoss': stop_loss_price,
|
||
'takeProfit': take_profit_price,
|
||
'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
|
||
'leverage': leverage,
|
||
'entryReason': entry_reason,
|
||
'atr': atr,
|
||
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
|
||
'trailingStopActivated': False # 移动止损是否已激活
|
||
}
|
||
|
||
self.active_positions[symbol] = position_info
|
||
|
||
logger.info(
|
||
f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} "
|
||
f"(涨跌幅: {change_percent:.2f}%)"
|
||
)
|
||
|
||
# 启动WebSocket实时监控
|
||
if self._monitoring_enabled:
|
||
await self._start_position_monitoring(symbol)
|
||
|
||
return position_info
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"开仓失败 {symbol}: {e}")
|
||
return None
|
||
|
||
async def close_position(self, symbol: str, reason: str = 'manual') -> bool:
|
||
"""
|
||
平仓
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
reason: 平仓原因(manual, stop_loss, take_profit, trailing_stop, sync)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
try:
|
||
logger.info(f"{symbol} [平仓] 开始平仓操作 (原因: {reason})")
|
||
|
||
# 获取当前持仓
|
||
positions = await self.client.get_open_positions()
|
||
position = next(
|
||
(p for p in positions if p['symbol'] == symbol),
|
||
None
|
||
)
|
||
|
||
if not position:
|
||
logger.warning(f"{symbol} [平仓] 币安账户中没有持仓,可能已被平仓")
|
||
# 即使币安没有持仓,也要更新数据库状态
|
||
if DB_AVAILABLE and Trade and symbol in self.active_positions:
|
||
position_info = self.active_positions[symbol]
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
logger.info(f"{symbol} [平仓] 更新数据库状态为已平仓 (ID: {trade_id})...")
|
||
# 获取当前价格作为平仓价格
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
exit_price = float(ticker['price']) if ticker else float(position_info['entryPrice'])
|
||
|
||
# 确保所有值都是float类型
|
||
entry_price = float(position_info['entryPrice'])
|
||
quantity = float(position_info['quantity'])
|
||
if position_info['side'] == 'BUY':
|
||
pnl = (exit_price - entry_price) * quantity
|
||
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
|
||
else:
|
||
pnl = (entry_price - exit_price) * quantity
|
||
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=exit_price,
|
||
exit_reason=reason,
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
logger.info(f"{symbol} [平仓] ✓ 数据库状态已更新")
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [平仓] ❌ 更新数据库状态失败: {e}")
|
||
|
||
# 清理本地记录
|
||
await self._stop_position_monitoring(symbol)
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
|
||
return False
|
||
|
||
# 确定平仓方向(与开仓相反)
|
||
position_amt = position['positionAmt']
|
||
side = 'SELL' if position_amt > 0 else 'BUY'
|
||
quantity = abs(position_amt)
|
||
|
||
logger.info(
|
||
f"{symbol} [平仓] 下单信息: {side} {quantity:.4f} @ MARKET "
|
||
f"(持仓数量: {position_amt:.4f})"
|
||
)
|
||
|
||
# 平仓
|
||
order = await self.client.place_order(
|
||
symbol=symbol,
|
||
side=side,
|
||
quantity=quantity,
|
||
order_type='MARKET'
|
||
)
|
||
|
||
if order:
|
||
logger.info(f"{symbol} [平仓] ✓ 平仓订单已提交 (订单ID: {order.get('orderId', 'N/A')})")
|
||
# 获取平仓价格(确保是float类型)
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if not ticker:
|
||
logger.warning(f"无法获取 {symbol} 价格,使用订单价格")
|
||
exit_price = float(order.get('avgPrice', 0)) or float(order.get('price', 0))
|
||
else:
|
||
exit_price = float(ticker['price'])
|
||
|
||
# 更新数据库记录
|
||
if DB_AVAILABLE and Trade and symbol in self.active_positions:
|
||
position_info = self.active_positions[symbol]
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
logger.info(f"正在更新 {symbol} 平仓记录到数据库 (ID: {trade_id})...")
|
||
# 计算盈亏(确保所有值都是float类型)
|
||
entry_price = float(position_info['entryPrice'])
|
||
quantity_float = float(quantity)
|
||
exit_price_float = float(exit_price)
|
||
if position_info['side'] == 'BUY':
|
||
pnl = (exit_price_float - entry_price) * quantity_float
|
||
pnl_percent = ((exit_price_float - entry_price) / entry_price) * 100
|
||
else: # SELL
|
||
pnl = (entry_price - exit_price_float) * quantity_float
|
||
pnl_percent = ((entry_price - exit_price_float) / entry_price) * 100
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=exit_price_float,
|
||
exit_reason=reason,
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
logger.info(
|
||
f"{symbol} [平仓] ✓ 数据库记录已更新 "
|
||
f"(盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%, 原因: {reason})"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"❌ 更新平仓记录到数据库失败: {e}")
|
||
logger.error(f" 错误类型: {type(e).__name__}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
else:
|
||
logger.warning(f"{symbol} 没有关联的数据库交易ID,无法更新平仓记录")
|
||
elif not DB_AVAILABLE:
|
||
logger.debug(f"数据库不可用,跳过更新 {symbol} 平仓记录")
|
||
elif not Trade:
|
||
logger.warning(f"Trade模型未导入,无法更新 {symbol} 平仓记录")
|
||
|
||
# 停止WebSocket监控
|
||
await self._stop_position_monitoring(symbol)
|
||
|
||
# 移除持仓记录
|
||
if symbol in self.active_positions:
|
||
del self.active_positions[symbol]
|
||
|
||
logger.info(
|
||
f"{symbol} [平仓] ✓ 平仓完成: {side} {quantity:.4f} @ {exit_price:.4f} "
|
||
f"(原因: {reason})"
|
||
)
|
||
return True
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [平仓] ❌ 平仓失败: {e}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
return False
|
||
|
||
async def check_stop_loss_take_profit(self) -> List[str]:
|
||
"""
|
||
检查止损止盈
|
||
|
||
Returns:
|
||
需要平仓的交易对列表
|
||
"""
|
||
closed_positions = []
|
||
|
||
try:
|
||
# 获取当前持仓
|
||
positions = await self.client.get_open_positions()
|
||
position_dict = {p['symbol']: p for p in positions}
|
||
|
||
for symbol, position_info in list(self.active_positions.items()):
|
||
if symbol not in position_dict:
|
||
# 持仓已不存在,移除记录
|
||
del self.active_positions[symbol]
|
||
continue
|
||
|
||
current_position = position_dict[symbol]
|
||
entry_price = position_info['entryPrice']
|
||
quantity = position_info['quantity'] # 修复:获取quantity
|
||
# 获取当前标记价格
|
||
current_price = current_position.get('markPrice', 0)
|
||
if current_price == 0:
|
||
# 如果标记价格为0,尝试从ticker获取
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
if ticker:
|
||
current_price = ticker['price']
|
||
else:
|
||
current_price = entry_price
|
||
|
||
# 计算当前盈亏
|
||
if position_info['side'] == 'BUY':
|
||
pnl_percent = ((current_price - entry_price) / entry_price) * 100
|
||
else:
|
||
pnl_percent = ((entry_price - current_price) / entry_price) * 100
|
||
|
||
# 更新最大盈利
|
||
if pnl_percent > position_info.get('maxProfit', 0):
|
||
position_info['maxProfit'] = pnl_percent
|
||
|
||
# 移动止损逻辑(盈利后保护利润)
|
||
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
|
||
if use_trailing:
|
||
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
|
||
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
|
||
|
||
if not position_info.get('trailingStopActivated', False):
|
||
# 盈利超过阈值后,激活移动止损
|
||
if pnl_percent > trailing_activation * 100:
|
||
position_info['trailingStopActivated'] = True
|
||
# 将止损移至成本价(保本)
|
||
position_info['stopLoss'] = entry_price
|
||
logger.info(
|
||
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
|
||
f"(盈利: {pnl_percent:.2f}%)"
|
||
)
|
||
else:
|
||
# 盈利超过2%后,止损移至保护利润位
|
||
if pnl_percent > 2.0:
|
||
if position_info['side'] == 'BUY':
|
||
new_stop_loss = entry_price * (1 + trailing_protect)
|
||
if new_stop_loss > position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}%利润)"
|
||
)
|
||
else:
|
||
new_stop_loss = entry_price * (1 - trailing_protect)
|
||
if new_stop_loss < position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}%利润)"
|
||
)
|
||
|
||
# 检查止损(使用更新后的止损价)
|
||
stop_loss = position_info['stopLoss']
|
||
if position_info['side'] == 'BUY' and current_price <= stop_loss:
|
||
logger.warning(
|
||
f"{symbol} 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
# 更新数据库
|
||
if DB_AVAILABLE:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price,
|
||
exit_reason=exit_reason,
|
||
pnl=pnl_percent * entry_price * quantity / 100,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新止损记录失败: {e}")
|
||
if await self.close_position(symbol):
|
||
closed_positions.append(symbol)
|
||
continue
|
||
|
||
if position_info['side'] == 'SELL' and current_price >= stop_loss:
|
||
logger.warning(
|
||
f"{symbol} 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
# 更新数据库
|
||
if DB_AVAILABLE:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price,
|
||
exit_reason=exit_reason,
|
||
pnl=pnl_percent * entry_price * quantity / 100,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新止损记录失败: {e}")
|
||
if await self.close_position(symbol):
|
||
closed_positions.append(symbol)
|
||
continue
|
||
|
||
# 检查止盈
|
||
take_profit = position_info['takeProfit']
|
||
if position_info['side'] == 'BUY' and current_price >= take_profit:
|
||
logger.info(
|
||
f"{symbol} 触发止盈: {current_price:.4f} >= {take_profit:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
# 更新数据库
|
||
if DB_AVAILABLE:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price,
|
||
exit_reason='take_profit',
|
||
pnl=pnl_percent * entry_price * quantity / 100,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新止盈记录失败: {e}")
|
||
if await self.close_position(symbol):
|
||
closed_positions.append(symbol)
|
||
continue
|
||
|
||
if position_info['side'] == 'SELL' and current_price <= take_profit:
|
||
logger.info(
|
||
f"{symbol} 触发止盈: {current_price:.4f} <= {take_profit:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
# 更新数据库
|
||
if DB_AVAILABLE:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price,
|
||
exit_reason='take_profit',
|
||
pnl=pnl_percent * entry_price * quantity / 100,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"更新止盈记录失败: {e}")
|
||
if await self.close_position(symbol):
|
||
closed_positions.append(symbol)
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查止损止盈失败: {e}")
|
||
|
||
return closed_positions
|
||
|
||
async def get_position_summary(self) -> Dict:
|
||
"""
|
||
获取持仓摘要
|
||
|
||
Returns:
|
||
持仓摘要信息
|
||
"""
|
||
try:
|
||
positions = await self.client.get_open_positions()
|
||
balance = await self.client.get_account_balance()
|
||
|
||
total_pnl = sum(p['unRealizedProfit'] for p in positions)
|
||
|
||
return {
|
||
'totalPositions': len(positions),
|
||
'totalBalance': balance.get('total', 0),
|
||
'availableBalance': balance.get('available', 0),
|
||
'totalPnL': total_pnl,
|
||
'positions': [
|
||
{
|
||
'symbol': p['symbol'],
|
||
'positionAmt': p['positionAmt'],
|
||
'entryPrice': p['entryPrice'],
|
||
'pnl': p['unRealizedProfit']
|
||
}
|
||
for p in positions
|
||
]
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取持仓摘要失败: {e}")
|
||
return {}
|
||
|
||
async def sync_positions_with_binance(self):
|
||
"""
|
||
同步币安实际持仓状态与数据库状态
|
||
检查哪些持仓在数据库中还是open状态,但在币安已经不存在了
|
||
"""
|
||
if not DB_AVAILABLE or not Trade:
|
||
logger.debug("数据库不可用,跳过持仓状态同步")
|
||
return
|
||
|
||
try:
|
||
logger.info("开始同步币安持仓状态与数据库...")
|
||
|
||
# 1. 获取币安实际持仓
|
||
binance_positions = await self.client.get_open_positions()
|
||
binance_symbols = {p['symbol'] for p in binance_positions}
|
||
logger.debug(f"币安实际持仓: {len(binance_symbols)} 个 ({', '.join(binance_symbols) if binance_symbols else '无'})")
|
||
|
||
# 2. 获取数据库中状态为open的交易记录
|
||
db_open_trades = Trade.get_all(status='open')
|
||
db_open_symbols = {t['symbol'] for t in db_open_trades}
|
||
logger.debug(f"数据库open状态: {len(db_open_symbols)} 个 ({', '.join(db_open_symbols) if db_open_symbols else '无'})")
|
||
|
||
# 3. 找出在数据库中open但在币安已不存在的持仓
|
||
missing_in_binance = db_open_symbols - binance_symbols
|
||
|
||
if missing_in_binance:
|
||
logger.warning(
|
||
f"发现 {len(missing_in_binance)} 个持仓在数据库中为open状态,但在币安已不存在: "
|
||
f"{', '.join(missing_in_binance)}"
|
||
)
|
||
|
||
# 4. 更新这些持仓的状态
|
||
for symbol in missing_in_binance:
|
||
trades = Trade.get_by_symbol(symbol, status='open')
|
||
for trade in trades:
|
||
trade_id = trade['id']
|
||
try:
|
||
logger.info(f"{symbol} [状态同步] 更新交易记录状态 (ID: {trade_id})...")
|
||
|
||
# 获取当前价格作为平仓价格
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
exit_price = float(ticker['price']) if ticker else float(trade['entry_price'])
|
||
|
||
# 计算盈亏(确保所有值都是float类型,避免Decimal类型问题)
|
||
entry_price = float(trade['entry_price'])
|
||
quantity = float(trade['quantity'])
|
||
if trade['side'] == 'BUY':
|
||
pnl = (exit_price - entry_price) * quantity
|
||
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
|
||
else: # SELL
|
||
pnl = (entry_price - exit_price) * quantity
|
||
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
|
||
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=exit_price,
|
||
exit_reason='sync', # 标记为同步更新
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
|
||
logger.info(
|
||
f"{symbol} [状态同步] ✓ 已更新 (ID: {trade_id}, "
|
||
f"盈亏: {pnl:.2f} USDT, {pnl_percent:.2f}%)"
|
||
)
|
||
|
||
# 清理本地记录
|
||
if symbol in self.active_positions:
|
||
await self._stop_position_monitoring(symbol)
|
||
del self.active_positions[symbol]
|
||
logger.debug(f"{symbol} [状态同步] 已清理本地持仓记录")
|
||
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [状态同步] ❌ 更新失败 (ID: {trade_id}): {e}")
|
||
else:
|
||
logger.info("✓ 持仓状态同步完成,数据库与币安状态一致")
|
||
|
||
# 5. 检查币安有但数据库没有记录的持仓(可能是手动开仓的)
|
||
missing_in_db = binance_symbols - db_open_symbols
|
||
if missing_in_db:
|
||
logger.info(
|
||
f"发现 {len(missing_in_db)} 个持仓在币安存在但数据库中没有记录: "
|
||
f"{', '.join(missing_in_db)} (可能是手动开仓)"
|
||
)
|
||
|
||
# 为手动开仓的持仓创建数据库记录并启动监控
|
||
for symbol in missing_in_db:
|
||
try:
|
||
# 获取币安持仓详情
|
||
binance_position = next(
|
||
(p for p in binance_positions if p['symbol'] == symbol),
|
||
None
|
||
)
|
||
if not binance_position:
|
||
continue
|
||
|
||
position_amt = binance_position['positionAmt']
|
||
entry_price = binance_position['entryPrice']
|
||
quantity = abs(position_amt)
|
||
side = 'BUY' if position_amt > 0 else 'SELL'
|
||
|
||
logger.info(
|
||
f"{symbol} [状态同步] 检测到手动开仓,创建数据库记录... "
|
||
f"({side} {quantity:.4f} @ {entry_price:.4f})"
|
||
)
|
||
|
||
# 创建数据库记录
|
||
trade_id = Trade.create(
|
||
symbol=symbol,
|
||
side=side,
|
||
quantity=quantity,
|
||
entry_price=entry_price,
|
||
leverage=binance_position.get('leverage', 10),
|
||
entry_reason='manual_entry' # 标记为手动开仓
|
||
)
|
||
|
||
logger.info(f"{symbol} [状态同步] ✓ 数据库记录已创建 (ID: {trade_id})")
|
||
|
||
# 创建本地持仓记录(用于监控)
|
||
ticker = await self.client.get_ticker_24h(symbol)
|
||
current_price = ticker['price'] if ticker else entry_price
|
||
|
||
# 计算止损止盈
|
||
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
|
||
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
|
||
|
||
position_info = {
|
||
'symbol': symbol,
|
||
'side': side,
|
||
'quantity': quantity,
|
||
'entryPrice': entry_price,
|
||
'changePercent': 0, # 手动开仓,无法计算涨跌幅
|
||
'orderId': None,
|
||
'tradeId': trade_id,
|
||
'stopLoss': stop_loss_price,
|
||
'takeProfit': take_profit_price,
|
||
'initialStopLoss': stop_loss_price,
|
||
'leverage': binance_position.get('leverage', 10),
|
||
'entryReason': 'manual_entry',
|
||
'atr': None,
|
||
'maxProfit': 0.0,
|
||
'trailingStopActivated': False
|
||
}
|
||
|
||
self.active_positions[symbol] = position_info
|
||
|
||
# 启动WebSocket监控
|
||
if self._monitoring_enabled:
|
||
await self._start_position_monitoring(symbol)
|
||
logger.info(f"{symbol} [状态同步] ✓ 已启动实时监控")
|
||
|
||
logger.info(f"{symbol} [状态同步] ✓ 手动开仓同步完成")
|
||
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [状态同步] ❌ 处理手动开仓失败: {e}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
|
||
logger.info("持仓状态同步完成")
|
||
|
||
except Exception as e:
|
||
logger.error(f"同步持仓状态失败: {e}")
|
||
import traceback
|
||
logger.error(f"错误详情:\n{traceback.format_exc()}")
|
||
|
||
async def start_all_position_monitoring(self):
|
||
"""
|
||
启动所有持仓的WebSocket实时监控
|
||
"""
|
||
if not self._monitoring_enabled:
|
||
logger.info("实时监控已禁用,跳过启动")
|
||
return
|
||
|
||
if not self.client.socket_manager:
|
||
logger.warning("WebSocket未初始化,无法启动实时监控")
|
||
return
|
||
|
||
# 获取当前所有持仓
|
||
positions = await self.client.get_open_positions()
|
||
for position in positions:
|
||
symbol = position['symbol']
|
||
if symbol not in self._monitor_tasks and symbol in self.active_positions:
|
||
await self._start_position_monitoring(symbol)
|
||
|
||
logger.info(f"已启动 {len(self._monitor_tasks)} 个持仓的实时监控")
|
||
|
||
async def stop_all_position_monitoring(self):
|
||
"""
|
||
停止所有持仓的WebSocket监控
|
||
"""
|
||
symbols = list(self._monitor_tasks.keys())
|
||
for symbol in symbols:
|
||
await self._stop_position_monitoring(symbol)
|
||
logger.info(f"已停止所有持仓监控 ({len(symbols)} 个)")
|
||
|
||
async def _start_position_monitoring(self, symbol: str):
|
||
"""
|
||
启动单个持仓的WebSocket价格监控
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
"""
|
||
if symbol in self._monitor_tasks:
|
||
logger.debug(f"{symbol} 监控任务已存在,跳过")
|
||
return
|
||
|
||
if not self.client.socket_manager:
|
||
logger.warning(f"{symbol} WebSocket未初始化,无法启动监控")
|
||
return
|
||
|
||
try:
|
||
task = asyncio.create_task(self._monitor_position_price(symbol))
|
||
self._monitor_tasks[symbol] = task
|
||
logger.info(f"✓ 启动 {symbol} WebSocket实时价格监控")
|
||
except Exception as e:
|
||
logger.error(f"启动 {symbol} 监控失败: {e}")
|
||
|
||
async def _stop_position_monitoring(self, symbol: str):
|
||
"""
|
||
停止单个持仓的WebSocket监控
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
"""
|
||
if symbol not in self._monitor_tasks:
|
||
return
|
||
|
||
task = self._monitor_tasks[symbol]
|
||
if not task.done():
|
||
task.cancel()
|
||
try:
|
||
await task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
del self._monitor_tasks[symbol]
|
||
logger.debug(f"已停止 {symbol} 的WebSocket监控")
|
||
|
||
async def _monitor_position_price(self, symbol: str):
|
||
"""
|
||
监控单个持仓的价格(WebSocket实时推送)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
"""
|
||
retry_count = 0
|
||
max_retries = 5
|
||
|
||
while retry_count < max_retries:
|
||
try:
|
||
if symbol not in self.active_positions:
|
||
logger.info(f"{symbol} 持仓已不存在,停止监控")
|
||
break
|
||
|
||
# 使用WebSocket订阅价格流
|
||
async with self.client.socket_manager.futures_socket(symbol.lower()) as stream:
|
||
logger.debug(f"{symbol} WebSocket连接已建立,开始接收价格更新")
|
||
retry_count = 0 # 连接成功,重置重试计数
|
||
|
||
async for msg in stream:
|
||
if symbol not in self.active_positions:
|
||
logger.info(f"{symbol} 持仓已不存在,停止监控")
|
||
break
|
||
|
||
if 'data' in msg:
|
||
try:
|
||
current_price = float(msg['data']['c']) # 最新价格
|
||
# 立即检查止损止盈
|
||
await self._check_single_position(symbol, current_price)
|
||
except (KeyError, ValueError, TypeError) as e:
|
||
logger.debug(f"{symbol} 解析价格数据失败: {e}")
|
||
continue
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info(f"{symbol} 监控任务已取消")
|
||
break
|
||
except Exception as e:
|
||
retry_count += 1
|
||
logger.warning(f"{symbol} WebSocket监控出错 (重试 {retry_count}/{max_retries}): {e}")
|
||
|
||
if retry_count < max_retries:
|
||
# 指数退避重试
|
||
wait_time = min(2 ** retry_count, 30)
|
||
logger.info(f"{symbol} {wait_time}秒后重试连接...")
|
||
await asyncio.sleep(wait_time)
|
||
else:
|
||
logger.error(f"{symbol} WebSocket监控失败,已达到最大重试次数")
|
||
# 回退到定时检查模式
|
||
logger.info(f"{symbol} 将使用定时检查模式(非实时)")
|
||
break
|
||
|
||
# 清理任务
|
||
if symbol in self._monitor_tasks:
|
||
del self._monitor_tasks[symbol]
|
||
|
||
async def _check_single_position(self, symbol: str, current_price: float):
|
||
"""
|
||
检查单个持仓的止损止盈(实时检查)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
current_price: 当前价格
|
||
"""
|
||
position_info = self.active_positions.get(symbol)
|
||
if not position_info:
|
||
return
|
||
|
||
# 确保所有值都是float类型
|
||
entry_price = float(position_info['entryPrice'])
|
||
quantity = float(position_info['quantity'])
|
||
current_price_float = float(current_price)
|
||
|
||
# 计算当前盈亏
|
||
if position_info['side'] == 'BUY':
|
||
pnl_percent = ((current_price_float - entry_price) / entry_price) * 100
|
||
else: # SELL
|
||
pnl_percent = ((entry_price - current_price_float) / entry_price) * 100
|
||
|
||
# 更新最大盈利
|
||
if pnl_percent > position_info.get('maxProfit', 0):
|
||
position_info['maxProfit'] = pnl_percent
|
||
|
||
# 移动止损逻辑(盈利后保护利润)
|
||
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
|
||
if use_trailing:
|
||
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
|
||
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
|
||
|
||
if not position_info.get('trailingStopActivated', False):
|
||
# 盈利超过阈值后,激活移动止损
|
||
if pnl_percent > trailing_activation * 100:
|
||
position_info['trailingStopActivated'] = True
|
||
# 将止损移至成本价(保本)
|
||
position_info['stopLoss'] = entry_price
|
||
logger.info(
|
||
f"{symbol} [实时监控] 移动止损激活: 止损移至成本价 {entry_price:.4f} "
|
||
f"(盈利: {pnl_percent:.2f}%)"
|
||
)
|
||
else:
|
||
# 盈利超过2%后,止损移至保护利润位
|
||
if pnl_percent > 2.0:
|
||
if position_info['side'] == 'BUY':
|
||
new_stop_loss = entry_price * (1 + trailing_protect)
|
||
if new_stop_loss > position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}%利润)"
|
||
)
|
||
else: # SELL
|
||
new_stop_loss = entry_price * (1 - trailing_protect)
|
||
if new_stop_loss < position_info['stopLoss']:
|
||
position_info['stopLoss'] = new_stop_loss
|
||
logger.info(
|
||
f"{symbol} [实时监控] 移动止损更新: {new_stop_loss:.4f} "
|
||
f"(保护{trailing_protect*100:.1f}%利润)"
|
||
)
|
||
|
||
# 检查止损
|
||
stop_loss = position_info['stopLoss']
|
||
should_close = False
|
||
exit_reason = None
|
||
|
||
if position_info['side'] == 'BUY' and current_price <= stop_loss:
|
||
should_close = True
|
||
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
|
||
logger.warning(
|
||
f"{symbol} [实时监控] 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
elif position_info['side'] == 'SELL' and current_price >= stop_loss:
|
||
should_close = True
|
||
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
|
||
logger.warning(
|
||
f"{symbol} [实时监控] 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
|
||
# 检查止盈
|
||
if not should_close:
|
||
take_profit = position_info['takeProfit']
|
||
if position_info['side'] == 'BUY' and current_price >= take_profit:
|
||
should_close = True
|
||
exit_reason = 'take_profit'
|
||
logger.info(
|
||
f"{symbol} [实时监控] 触发止盈: {current_price:.4f} >= {take_profit:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
elif position_info['side'] == 'SELL' and current_price <= take_profit:
|
||
should_close = True
|
||
exit_reason = 'take_profit'
|
||
logger.info(
|
||
f"{symbol} [实时监控] 触发止盈: {current_price:.4f} <= {take_profit:.4f} "
|
||
f"(盈亏: {pnl_percent:.2f}%)"
|
||
)
|
||
|
||
# 如果触发止损止盈,执行平仓
|
||
if should_close:
|
||
logger.info(
|
||
f"{symbol} [自动平仓] 开始执行平仓操作 | "
|
||
f"原因: {exit_reason} | "
|
||
f"入场价: {entry_price:.4f} | "
|
||
f"当前价: {current_price:.4f} | "
|
||
f"盈亏: {pnl_percent:.2f}% | "
|
||
f"数量: {quantity:.4f}"
|
||
)
|
||
|
||
# 更新数据库
|
||
if DB_AVAILABLE and Trade:
|
||
trade_id = position_info.get('tradeId')
|
||
if trade_id:
|
||
try:
|
||
if position_info['side'] == 'BUY':
|
||
pnl = (current_price_float - entry_price) * quantity
|
||
else: # SELL
|
||
pnl = (entry_price - current_price_float) * quantity
|
||
|
||
logger.info(f"{symbol} [自动平仓] 更新数据库记录 (ID: {trade_id})...")
|
||
Trade.update_exit(
|
||
trade_id=trade_id,
|
||
exit_price=current_price_float,
|
||
exit_reason=exit_reason,
|
||
pnl=pnl,
|
||
pnl_percent=pnl_percent
|
||
)
|
||
logger.info(f"{symbol} [自动平仓] ✓ 数据库记录已更新 (盈亏: {pnl:.2f} USDT)")
|
||
except Exception as e:
|
||
logger.error(f"{symbol} [自动平仓] ❌ 更新数据库记录失败: {e}")
|
||
import traceback
|
||
logger.error(f" 错误详情:\n{traceback.format_exc()}")
|
||
|
||
# 执行平仓(注意:这里会停止监控,所以先更新数据库)
|
||
logger.info(f"{symbol} [自动平仓] 正在执行平仓订单...")
|
||
success = await self.close_position(symbol, reason=exit_reason)
|
||
if success:
|
||
logger.info(f"{symbol} [自动平仓] ✓ 平仓成功完成")
|
||
else:
|
||
logger.error(f"{symbol} [自动平仓] ❌ 平仓失败") |