""" 推荐交易对模块 - 生成交易推荐供手动参考 """ import asyncio import logging from typing import List, Dict, Optional from datetime import datetime, timedelta try: from .binance_client import BinanceClient from .market_scanner import MarketScanner from .risk_manager import RiskManager from . import config except ImportError: from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager import config logger = logging.getLogger(__name__) # 尝试导入数据库模型 DB_AVAILABLE = False TradeRecommendation = None try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import TradeRecommendation DB_AVAILABLE = True logger.info("✓ 数据库模型导入成功,推荐记录将保存到数据库") else: logger.warning("⚠ backend目录不存在,无法使用数据库功能") DB_AVAILABLE = False except ImportError as e: logger.warning(f"⚠ 无法导入数据库模型: {e}") DB_AVAILABLE = False except Exception as e: logger.warning(f"⚠ 数据库初始化失败: {e}") DB_AVAILABLE = False class TradeRecommender: """推荐交易对生成器""" def __init__( self, client: BinanceClient, scanner: MarketScanner, risk_manager: RiskManager ): """ 初始化推荐器 Args: client: 币安客户端 scanner: 市场扫描器 risk_manager: 风险管理器 """ self.client = client self.scanner = scanner self.risk_manager = risk_manager async def generate_recommendations( self, min_signal_strength: int = 5, max_recommendations: int = 20 ) -> List[Dict]: """ 生成交易推荐 Args: min_signal_strength: 最小信号强度(默认5,低于此强度的不推荐) max_recommendations: 最大推荐数量 Returns: 推荐列表 """ logger.info("开始生成交易推荐...") # 1. 扫描市场 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对") return [] recommendations = [] # 2. 对每个交易对进行分析 for symbol_info in top_symbols: if len(recommendations) >= max_recommendations: break symbol = symbol_info['symbol'] current_price = symbol_info['price'] change_percent = symbol_info.get('changePercent', 0) # 3. 分析交易信号(使用策略模块的逻辑) trade_signal = await self._analyze_trade_signal(symbol_info) # 4. 如果信号强度足够,生成推荐 if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength: recommendation = await self._create_recommendation( symbol_info, trade_signal ) if recommendation: recommendations.append(recommendation) logger.info(f"生成了 {len(recommendations)} 个交易推荐") return recommendations async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict: """ 分析交易信号(复用策略模块的逻辑) Args: symbol_info: 交易对信息 Returns: 交易信号字典 """ symbol = symbol_info['symbol'] current_price = symbol_info['price'] rsi = symbol_info.get('rsi') macd = symbol_info.get('macd') bollinger = symbol_info.get('bollinger') market_regime = symbol_info.get('marketRegime', 'unknown') ema20 = symbol_info.get('ema20') ema50 = symbol_info.get('ema50') ema20_4h = symbol_info.get('ema20_4h') price_4h = symbol_info.get('price_4h', current_price) # 判断4H周期趋势方向 trend_4h = None if ema20_4h is not None: if price_4h > ema20_4h: trend_4h = 'up' elif price_4h < ema20_4h: trend_4h = 'down' else: trend_4h = 'neutral' signal_strength = 0 reasons = [] direction = None # 策略1:均值回归(震荡市场) if market_regime == 'ranging': if rsi and rsi < 30: if trend_4h in ('up', 'neutral', None): signal_strength += 4 reasons.append(f"RSI超卖({rsi:.1f})") if direction is None: direction = 'BUY' elif rsi and rsi > 70: if trend_4h in ('down', 'neutral', None): signal_strength += 4 reasons.append(f"RSI超买({rsi:.1f})") if direction is None: direction = 'SELL' if bollinger and current_price <= bollinger.get('lower'): if trend_4h in ('up', 'neutral', None): signal_strength += 3 reasons.append("触及布林带下轨") if direction is None: direction = 'BUY' elif bollinger and current_price >= bollinger.get('upper'): if trend_4h in ('down', 'neutral', None): signal_strength += 3 reasons.append("触及布林带上轨") if direction is None: direction = 'SELL' # 策略2:趋势跟踪(趋势市场) elif market_regime == 'trending': if macd and macd.get('macd', 0) > macd.get('signal', 0) and macd.get('histogram', 0) > 0: if trend_4h in ('up', 'neutral', None): signal_strength += 3 reasons.append("MACD金叉") if direction is None: direction = 'BUY' elif macd and macd.get('macd', 0) < macd.get('signal', 0) and macd.get('histogram', 0) < 0: if trend_4h in ('down', 'neutral', None): signal_strength += 3 reasons.append("MACD死叉") if direction is None: direction = 'SELL' if ema20 and ema50: if current_price > ema20 > ema50: if trend_4h in ('up', 'neutral', None): signal_strength += 2 reasons.append("价格在均线之上") if direction is None: direction = 'BUY' elif current_price < ema20 < ema50: if trend_4h in ('down', 'neutral', None): signal_strength += 2 reasons.append("价格在均线之下") if direction is None: direction = 'SELL' # 多周期共振加分 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): signal_strength += 2 reasons.append("4H周期共振确认") # 判断是否应该交易 min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) should_trade = signal_strength >= min_signal_strength # 禁止逆4H趋势交易 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): should_trade = False reasons.append("❌ 禁止逆4H趋势交易") return { 'should_trade': should_trade, 'direction': direction, 'reason': ', '.join(reasons) if reasons else '无明确信号', 'strength': signal_strength, 'trend_4h': trend_4h } async def _create_recommendation( self, symbol_info: Dict, trade_signal: Dict ) -> Optional[Dict]: """ 创建推荐记录 Args: symbol_info: 交易对信息 trade_signal: 交易信号 Returns: 推荐字典 """ try: symbol = symbol_info['symbol'] current_price = symbol_info['price'] direction = trade_signal['direction'] # 计算建议的止损止盈 entry_price = current_price stop_loss_price = self.risk_manager.get_stop_loss_price( entry_price, direction, klines=symbol_info.get('klines'), bollinger=symbol_info.get('bollinger'), atr=symbol_info.get('atr') ) # 计算止损百分比 if direction == 'BUY': stop_loss_pct = (entry_price - stop_loss_price) / entry_price else: stop_loss_pct = (stop_loss_price - entry_price) / entry_price # 第一目标:盈亏比1:1 if direction == 'BUY': take_profit_1 = entry_price + (entry_price - stop_loss_price) else: take_profit_1 = entry_price - (stop_loss_price - entry_price) # 第二目标:止损的2.5倍 take_profit_2_pct = stop_loss_pct * 2.5 take_profit_2 = self.risk_manager.get_take_profit_price( entry_price, direction, take_profit_pct=take_profit_2_pct ) # 建议仓位(根据信号强度调整) base_position_pct = config.TRADING_CONFIG.get('MAX_POSITION_PERCENT', 0.05) signal_strength = trade_signal['strength'] # 信号强度越高,建议仓位可以适当增加(但不超过1.5倍) position_multiplier = min(1.0 + (signal_strength - 5) * 0.1, 1.5) suggested_position_pct = base_position_pct * position_multiplier # 计算建议的挂单价(使用限价单,而不是市价单) # 对于做多:建议价格略低于当前价格(当前价格的99.5%),以便在回调时买入 # 对于做空:建议价格略高于当前价格(当前价格的100.5%),以便在反弹时卖出 limit_price_offset_pct = config.TRADING_CONFIG.get('LIMIT_ORDER_OFFSET_PCT', 0.5) # 默认0.5% if direction == 'BUY': suggested_limit_price = current_price * (1 - limit_price_offset_pct / 100) else: # SELL suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100) # 准备推荐数据 recommendation_data = { 'symbol': symbol, 'direction': direction, 'current_price': current_price, 'change_percent': symbol_info.get('changePercent', 0), 'recommendation_reason': trade_signal['reason'], 'signal_strength': signal_strength, 'market_regime': symbol_info.get('marketRegime'), 'trend_4h': trade_signal.get('trend_4h'), 'rsi': symbol_info.get('rsi'), 'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, 'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None, 'bollinger_middle': symbol_info.get('bollinger', {}).get('middle') if symbol_info.get('bollinger') else None, 'bollinger_lower': symbol_info.get('bollinger', {}).get('lower') if symbol_info.get('bollinger') else None, 'ema20': symbol_info.get('ema20'), 'ema50': symbol_info.get('ema50'), 'ema20_4h': symbol_info.get('ema20_4h'), 'atr': symbol_info.get('atr'), 'suggested_stop_loss': stop_loss_price, 'suggested_take_profit_1': take_profit_1, 'suggested_take_profit_2': take_profit_2, 'suggested_position_percent': suggested_position_pct, 'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10), 'volume_24h': symbol_info.get('volume24h'), 'volatility': symbol_info.get('volatility'), 'order_type': 'LIMIT', # 使用限价单 'suggested_limit_price': suggested_limit_price # 建议的挂单价 } # 不再自动保存到数据库,只返回推荐数据 # 只有用户在前端点击"标记"时才会保存到数据库(用于复盘) logger.debug( f"✓ 生成推荐: {symbol} {direction} " f"(信号强度: {signal_strength}/10)" ) return recommendation_data except Exception as e: logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}") return None async def get_active_recommendations(self) -> List[Dict]: """获取当前有效的推荐""" if DB_AVAILABLE and TradeRecommendation: return TradeRecommendation.get_active() return [] async def mark_recommendation_executed(self, recommendation_id: int, trade_id: int = None): """标记推荐已执行""" if DB_AVAILABLE and TradeRecommendation: TradeRecommendation.mark_executed(recommendation_id, trade_id) logger.info(f"推荐 {recommendation_id} 已标记为已执行")