auto_trade_sys/trading_system/trade_recommender.py
薇薇安 b08d97b442 a
2026-01-15 11:34:53 +08:00

342 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐交易对模块 - 生成交易推荐供手动参考
"""
import asyncio
import logging
from typing import List, Dict, Optional
from datetime import datetime, timedelta
try:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
import config
logger = logging.getLogger(__name__)
# 尝试导入数据库模型
DB_AVAILABLE = False
TradeRecommendation = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import TradeRecommendation
DB_AVAILABLE = True
logger.info("✓ 数据库模型导入成功,推荐记录将保存到数据库")
else:
logger.warning("⚠ backend目录不存在无法使用数据库功能")
DB_AVAILABLE = False
except ImportError as e:
logger.warning(f"⚠ 无法导入数据库模型: {e}")
DB_AVAILABLE = False
except Exception as e:
logger.warning(f"⚠ 数据库初始化失败: {e}")
DB_AVAILABLE = False
class TradeRecommender:
"""推荐交易对生成器"""
def __init__(
self,
client: BinanceClient,
scanner: MarketScanner,
risk_manager: RiskManager
):
"""
初始化推荐器
Args:
client: 币安客户端
scanner: 市场扫描器
risk_manager: 风险管理器
"""
self.client = client
self.scanner = scanner
self.risk_manager = risk_manager
async def generate_recommendations(
self,
min_signal_strength: int = 5,
max_recommendations: int = 20
) -> List[Dict]:
"""
生成交易推荐
Args:
min_signal_strength: 最小信号强度默认5低于此强度的不推荐
max_recommendations: 最大推荐数量
Returns:
推荐列表
"""
logger.info("开始生成交易推荐...")
# 1. 扫描市场
top_symbols = await self.scanner.scan_market()
if not top_symbols:
logger.warning("未找到符合条件的交易对")
return []
recommendations = []
# 2. 对每个交易对进行分析
for symbol_info in top_symbols:
if len(recommendations) >= max_recommendations:
break
symbol = symbol_info['symbol']
current_price = symbol_info['price']
change_percent = symbol_info.get('changePercent', 0)
# 3. 分析交易信号(使用策略模块的逻辑)
trade_signal = await self._analyze_trade_signal(symbol_info)
# 4. 如果信号强度足够,生成推荐
if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength:
recommendation = await self._create_recommendation(
symbol_info, trade_signal
)
if recommendation:
recommendations.append(recommendation)
logger.info(f"生成了 {len(recommendations)} 个交易推荐")
return recommendations
async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict:
"""
分析交易信号(复用策略模块的逻辑)
Args:
symbol_info: 交易对信息
Returns:
交易信号字典
"""
symbol = symbol_info['symbol']
current_price = symbol_info['price']
rsi = symbol_info.get('rsi')
macd = symbol_info.get('macd')
bollinger = symbol_info.get('bollinger')
market_regime = symbol_info.get('marketRegime', 'unknown')
ema20 = symbol_info.get('ema20')
ema50 = symbol_info.get('ema50')
ema20_4h = symbol_info.get('ema20_4h')
price_4h = symbol_info.get('price_4h', current_price)
# 判断4H周期趋势方向
trend_4h = None
if ema20_4h is not None:
if price_4h > ema20_4h:
trend_4h = 'up'
elif price_4h < ema20_4h:
trend_4h = 'down'
else:
trend_4h = 'neutral'
signal_strength = 0
reasons = []
direction = None
# 策略1均值回归震荡市场
if market_regime == 'ranging':
if rsi and rsi < 30:
if trend_4h in ('up', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超卖({rsi:.1f})")
if direction is None:
direction = 'BUY'
elif rsi and rsi > 70:
if trend_4h in ('down', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超买({rsi:.1f})")
if direction is None:
direction = 'SELL'
if bollinger and current_price <= bollinger.get('lower'):
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带下轨")
if direction is None:
direction = 'BUY'
elif bollinger and current_price >= bollinger.get('upper'):
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带上轨")
if direction is None:
direction = 'SELL'
# 策略2趋势跟踪趋势市场
elif market_regime == 'trending':
if macd and macd.get('macd', 0) > macd.get('signal', 0) and macd.get('histogram', 0) > 0:
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("MACD金叉")
if direction is None:
direction = 'BUY'
elif macd and macd.get('macd', 0) < macd.get('signal', 0) and macd.get('histogram', 0) < 0:
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("MACD死叉")
if direction is None:
direction = 'SELL'
if ema20 and ema50:
if current_price > ema20 > ema50:
if trend_4h in ('up', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之上")
if direction is None:
direction = 'BUY'
elif current_price < ema20 < ema50:
if trend_4h in ('down', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之下")
if direction is None:
direction = 'SELL'
# 多周期共振加分
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
signal_strength += 2
reasons.append("4H周期共振确认")
# 判断是否应该交易
min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7)
should_trade = signal_strength >= min_signal_strength
# 禁止逆4H趋势交易
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
should_trade = False
reasons.append("❌ 禁止逆4H趋势交易")
return {
'should_trade': should_trade,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '无明确信号',
'strength': signal_strength,
'trend_4h': trend_4h
}
async def _create_recommendation(
self,
symbol_info: Dict,
trade_signal: Dict
) -> Optional[Dict]:
"""
创建推荐记录
Args:
symbol_info: 交易对信息
trade_signal: 交易信号
Returns:
推荐字典
"""
try:
symbol = symbol_info['symbol']
current_price = symbol_info['price']
direction = trade_signal['direction']
# 计算建议的止损止盈
entry_price = current_price
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price,
direction,
klines=symbol_info.get('klines'),
bollinger=symbol_info.get('bollinger'),
atr=symbol_info.get('atr')
)
# 计算止损百分比
if direction == 'BUY':
stop_loss_pct = (entry_price - stop_loss_price) / entry_price
else:
stop_loss_pct = (stop_loss_price - entry_price) / entry_price
# 第一目标盈亏比1:1
if direction == 'BUY':
take_profit_1 = entry_price + (entry_price - stop_loss_price)
else:
take_profit_1 = entry_price - (stop_loss_price - entry_price)
# 第二目标止损的2.5倍
take_profit_2_pct = stop_loss_pct * 2.5
take_profit_2 = self.risk_manager.get_take_profit_price(
entry_price, direction, take_profit_pct=take_profit_2_pct
)
# 建议仓位(根据信号强度调整)
base_position_pct = config.TRADING_CONFIG.get('MAX_POSITION_PERCENT', 0.05)
signal_strength = trade_signal['strength']
# 信号强度越高建议仓位可以适当增加但不超过1.5倍)
position_multiplier = min(1.0 + (signal_strength - 5) * 0.1, 1.5)
suggested_position_pct = base_position_pct * position_multiplier
# 准备推荐数据
recommendation_data = {
'symbol': symbol,
'direction': direction,
'current_price': current_price,
'change_percent': symbol_info.get('changePercent', 0),
'recommendation_reason': trade_signal['reason'],
'signal_strength': signal_strength,
'market_regime': symbol_info.get('marketRegime'),
'trend_4h': trade_signal.get('trend_4h'),
'rsi': symbol_info.get('rsi'),
'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None,
'bollinger_middle': symbol_info.get('bollinger', {}).get('middle') if symbol_info.get('bollinger') else None,
'bollinger_lower': symbol_info.get('bollinger', {}).get('lower') if symbol_info.get('bollinger') else None,
'ema20': symbol_info.get('ema20'),
'ema50': symbol_info.get('ema50'),
'ema20_4h': symbol_info.get('ema20_4h'),
'atr': symbol_info.get('atr'),
'suggested_stop_loss': stop_loss_price,
'suggested_take_profit_1': take_profit_1,
'suggested_take_profit_2': take_profit_2,
'suggested_position_percent': suggested_position_pct,
'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10),
'volume_24h': symbol_info.get('volume24h'),
'volatility': symbol_info.get('volatility')
}
# 保存到数据库
if DB_AVAILABLE and TradeRecommendation:
try:
recommendation_id = TradeRecommendation.create(**recommendation_data)
logger.info(
f"✓ 推荐已保存: {symbol} {direction} "
f"(信号强度: {signal_strength}/10, ID: {recommendation_id})"
)
recommendation_data['id'] = recommendation_id
except Exception as e:
logger.error(f"保存推荐到数据库失败: {e}")
return recommendation_data
except Exception as e:
logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}")
return None
async def get_active_recommendations(self) -> List[Dict]:
"""获取当前有效的推荐"""
if DB_AVAILABLE and TradeRecommendation:
return TradeRecommendation.get_active()
return []
async def mark_recommendation_executed(self, recommendation_id: int, trade_id: int = None):
"""标记推荐已执行"""
if DB_AVAILABLE and TradeRecommendation:
TradeRecommendation.mark_executed(recommendation_id, trade_id)
logger.info(f"推荐 {recommendation_id} 已标记为已执行")