auto_trade_sys/trading_system/position_manager.py
薇薇安 8a89592cb5 a
2026-01-13 17:30:59 +08:00

472 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
仓位管理模块 - 管理持仓和订单
"""
import logging
from typing import Dict, List, Optional
try:
from .binance_client import BinanceClient
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from risk_manager import RiskManager
import config
# 尝试导入数据库模型(如果可用)
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import Trade
DB_AVAILABLE = True
else:
DB_AVAILABLE = False
except Exception:
DB_AVAILABLE = False
logger = logging.getLogger(__name__)
class PositionManager:
"""仓位管理类"""
def __init__(self, client: BinanceClient, risk_manager: RiskManager):
"""
初始化仓位管理器
Args:
client: 币安客户端
risk_manager: 风险管理器
"""
self.client = client
self.risk_manager = risk_manager
self.active_positions: Dict[str, Dict] = {}
async def open_position(
self,
symbol: str,
change_percent: float,
leverage: int = 10,
trade_direction: Optional[str] = None,
entry_reason: str = '',
atr: Optional[float] = None
) -> Optional[Dict]:
"""
开仓
Args:
symbol: 交易对
change_percent: 涨跌幅百分比
leverage: 杠杆倍数
Returns:
订单信息失败返回None
"""
try:
# 判断是否应该交易
if not await self.risk_manager.should_trade(symbol, change_percent):
return None
# 设置杠杆
await self.client.set_leverage(symbol, leverage)
# 计算仓位大小
logger.info(f"开始为 {symbol} 计算仓位大小...")
quantity = await self.risk_manager.calculate_position_size(
symbol, change_percent
)
if quantity is None:
logger.warning(f"{symbol} 仓位计算失败,跳过交易")
logger.warning(f" 可能原因:")
logger.warning(f" 1. 账户余额不足")
logger.warning(f" 2. 单笔仓位超过限制")
logger.warning(f" 3. 总仓位超过限制")
logger.warning(f" 4. 无法获取价格数据")
return None
logger.info(f"{symbol} 仓位计算成功: {quantity:.4f}")
# 确定交易方向(优先使用技术指标信号)
if trade_direction:
side = trade_direction
else:
side = 'BUY' if change_percent > 0 else 'SELL'
# 获取当前价格
ticker = await self.client.get_ticker_24h(symbol)
if not ticker:
return None
entry_price = ticker['price']
# 计算动态止损止盈使用ATR或固定比例
if atr and atr > 0:
# 使用ATR计算动态止损2倍ATR
atr_stop_loss_pct = (atr * 2) / entry_price
# 限制在合理范围内1%-5%
atr_stop_loss_pct = max(0.01, min(0.05, atr_stop_loss_pct))
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, stop_loss_pct=atr_stop_loss_pct
)
# 止盈为止损的1.5-2倍
take_profit_pct = atr_stop_loss_pct * 1.8
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, take_profit_pct=take_profit_pct
)
else:
# 使用固定止损止盈
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
# 下单
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET'
)
if order:
# 记录到数据库
trade_id = None
if DB_AVAILABLE:
try:
trade_id = Trade.create(
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
leverage=leverage,
entry_reason=entry_reason
)
logger.info(f"{symbol} 交易记录已保存到数据库 (ID: {trade_id})")
except Exception as e:
logger.warning(f"保存交易记录到数据库失败: {e}")
# 记录持仓信息(包含动态止损止盈)
position_info = {
'symbol': symbol,
'side': side,
'quantity': quantity,
'entryPrice': entry_price,
'changePercent': change_percent,
'orderId': order.get('orderId'),
'tradeId': trade_id, # 数据库交易ID
'stopLoss': stop_loss_price,
'takeProfit': take_profit_price,
'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
'leverage': leverage,
'entryReason': entry_reason,
'atr': atr,
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
'trailingStopActivated': False # 移动止损是否已激活
}
self.active_positions[symbol] = position_info
logger.info(
f"开仓成功: {symbol} {side} {quantity} @ {entry_price:.4f} "
f"(涨跌幅: {change_percent:.2f}%)"
)
return position_info
return None
except Exception as e:
logger.error(f"开仓失败 {symbol}: {e}")
return None
async def close_position(self, symbol: str) -> bool:
"""
平仓
Args:
symbol: 交易对
Returns:
是否成功
"""
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
position = next(
(p for p in positions if p['symbol'] == symbol),
None
)
if not position:
logger.warning(f"{symbol} 没有持仓")
return False
# 确定平仓方向(与开仓相反)
position_amt = position['positionAmt']
side = 'SELL' if position_amt > 0 else 'BUY'
quantity = abs(position_amt)
# 平仓
order = await self.client.place_order(
symbol=symbol,
side=side,
quantity=quantity,
order_type='MARKET'
)
if order:
# 更新数据库记录
if DB_AVAILABLE and symbol in self.active_positions:
position_info = self.active_positions[symbol]
trade_id = position_info.get('tradeId')
if trade_id:
try:
# 计算盈亏
entry_price = position_info['entryPrice']
exit_price = ticker['price'] if 'ticker' in locals() else current_price
if position_info['side'] == 'BUY':
pnl = (exit_price - entry_price) * quantity
pnl_percent = ((exit_price - entry_price) / entry_price) * 100
else:
pnl = (entry_price - exit_price) * quantity
pnl_percent = ((entry_price - exit_price) / entry_price) * 100
Trade.update_exit(
trade_id=trade_id,
exit_price=exit_price,
exit_reason='manual',
pnl=pnl,
pnl_percent=pnl_percent
)
logger.info(f"{symbol} 平仓记录已更新到数据库")
except Exception as e:
logger.warning(f"更新平仓记录到数据库失败: {e}")
# 移除持仓记录
if symbol in self.active_positions:
del self.active_positions[symbol]
logger.info(f"平仓成功: {symbol} {side} {quantity}")
return True
return False
except Exception as e:
logger.error(f"平仓失败 {symbol}: {e}")
return False
async def check_stop_loss_take_profit(self) -> List[str]:
"""
检查止损止盈
Returns:
需要平仓的交易对列表
"""
closed_positions = []
try:
# 获取当前持仓
positions = await self.client.get_open_positions()
position_dict = {p['symbol']: p for p in positions}
for symbol, position_info in list(self.active_positions.items()):
if symbol not in position_dict:
# 持仓已不存在,移除记录
del self.active_positions[symbol]
continue
current_position = position_dict[symbol]
entry_price = position_info['entryPrice']
# 获取当前标记价格
current_price = current_position.get('markPrice', 0)
if current_price == 0:
# 如果标记价格为0尝试从ticker获取
ticker = await self.client.get_ticker_24h(symbol)
if ticker:
current_price = ticker['price']
else:
current_price = entry_price
# 计算当前盈亏
if position_info['side'] == 'BUY':
pnl_percent = ((current_price - entry_price) / entry_price) * 100
else:
pnl_percent = ((entry_price - current_price) / entry_price) * 100
# 更新最大盈利
if pnl_percent > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent
# 移动止损逻辑(盈利后保护利润)
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后,激活移动止损
if pnl_percent > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent:.2f}%)"
)
else:
# 盈利超过2%后,止损移至保护利润位
if pnl_percent > 2.0:
if position_info['side'] == 'BUY':
new_stop_loss = entry_price * (1 + trailing_protect)
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
else:
new_stop_loss = entry_price * (1 - trailing_protect)
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
# 检查止损(使用更新后的止损价)
stop_loss = position_info['stopLoss']
if position_info['side'] == 'BUY' and current_price <= stop_loss:
logger.warning(
f"{symbol} 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
if position_info['side'] == 'SELL' and current_price >= stop_loss:
logger.warning(
f"{symbol} 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
exit_reason = 'trailing_stop' if position_info.get('trailingStopActivated') else 'stop_loss'
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason=exit_reason,
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止损记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
# 检查止盈
take_profit = position_info['takeProfit']
if position_info['side'] == 'BUY' and current_price >= take_profit:
logger.info(
f"{symbol} 触发止盈: {current_price:.4f} >= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason='take_profit',
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
if position_info['side'] == 'SELL' and current_price <= take_profit:
logger.info(
f"{symbol} 触发止盈: {current_price:.4f} <= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
# 更新数据库
if DB_AVAILABLE:
trade_id = position_info.get('tradeId')
if trade_id:
try:
Trade.update_exit(
trade_id=trade_id,
exit_price=current_price,
exit_reason='take_profit',
pnl=pnl_percent * entry_price * quantity / 100,
pnl_percent=pnl_percent
)
except Exception as e:
logger.warning(f"更新止盈记录失败: {e}")
if await self.close_position(symbol):
closed_positions.append(symbol)
continue
except Exception as e:
logger.error(f"检查止损止盈失败: {e}")
return closed_positions
async def get_position_summary(self) -> Dict:
"""
获取持仓摘要
Returns:
持仓摘要信息
"""
try:
positions = await self.client.get_open_positions()
balance = await self.client.get_account_balance()
total_pnl = sum(p['unRealizedProfit'] for p in positions)
return {
'totalPositions': len(positions),
'totalBalance': balance.get('total', 0),
'availableBalance': balance.get('available', 0),
'totalPnL': total_pnl,
'positions': [
{
'symbol': p['symbol'],
'positionAmt': p['positionAmt'],
'entryPrice': p['entryPrice'],
'pnl': p['unRealizedProfit']
}
for p in positions
]
}
except Exception as e:
logger.error(f"获取持仓摘要失败: {e}")
return {}