""" 推荐交易对模块 - 生成交易推荐供手动参考 """ import asyncio import logging import time from typing import List, Dict, Optional from datetime import datetime, timedelta try: from .binance_client import BinanceClient from .market_scanner import MarketScanner from .risk_manager import RiskManager from . import config except ImportError: from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager import config logger = logging.getLogger(__name__) # 尝试导入数据库模型 DB_AVAILABLE = False TradeRecommendation = None try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import TradeRecommendation DB_AVAILABLE = True logger.info("✓ 数据库模型导入成功,推荐记录将保存到数据库") else: logger.warning("⚠ backend目录不存在,无法使用数据库功能") DB_AVAILABLE = False except ImportError as e: logger.warning(f"⚠ 无法导入数据库模型: {e}") DB_AVAILABLE = False except Exception as e: logger.warning(f"⚠ 数据库初始化失败: {e}") DB_AVAILABLE = False class TradeRecommender: """推荐交易对生成器""" def __init__( self, client: BinanceClient, scanner: MarketScanner, risk_manager: RiskManager ): """ 初始化推荐器 Args: client: 币安客户端 scanner: 市场扫描器 risk_manager: 风险管理器 """ self.client = client self.scanner = scanner self.risk_manager = risk_manager async def generate_recommendations( self, min_signal_strength: int = 5, max_recommendations: int = 20, add_to_cache: bool = True, min_quality_score: float = 0.0 ) -> List[Dict]: """ 生成交易推荐(支持增量添加到Redis缓存) Args: min_signal_strength: 最小信号强度(默认5,低于此强度的不推荐) max_recommendations: 最大推荐数量(单次生成) add_to_cache: 是否添加到Redis缓存(默认True) min_quality_score: 最小质量分数(用于过滤,默认0.0表示不过滤) Returns: 推荐列表 """ logger.info("开始生成交易推荐...") # 获取账户余额(用于计算盈亏评估) try: balance = await self.client.get_account_balance() account_balance = balance.get('total', 1000) logger.debug(f"账户余额: {account_balance:.2f} USDT") except Exception as e: account_balance = 1000 logger.warning(f"获取账户余额失败,使用默认值1000 USDT: {e}") # 1. 从Redis读取现有推荐(如果启用缓存) existing_recommendations = {} if add_to_cache: try: # 从Redis Hash读取所有现有推荐 cache_key = "recommendations:realtime" existing_data = await self.client.redis_cache.hgetall(cache_key) for rec_key, rec_data in existing_data.items(): if isinstance(rec_data, dict): existing_recommendations[rec_key] = rec_data logger.info(f"从Redis读取到 {len(existing_recommendations)} 个现有推荐") except Exception as e: logger.debug(f"从Redis读取现有推荐失败: {e}") # 2. 扫描市场 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对") # 如果市场扫描失败,返回现有缓存推荐 if existing_recommendations: return list(existing_recommendations.values()) return [] new_recommendations = [] # 3. 对每个交易对进行分析 for symbol_info in top_symbols: symbol = symbol_info['symbol'] current_price = symbol_info['price'] # 4. 分析交易信号(传入min_signal_strength参数) trade_signal = await self._analyze_trade_signal(symbol_info, min_signal_strength=min_signal_strength) # 5. 如果信号强度足够,生成推荐 if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength: recommendations_result = await self._create_recommendation( symbol_info, trade_signal, account_balance ) if recommendations_result: # _create_recommendation 返回列表(限价单和市价单) if isinstance(recommendations_result, list): new_recommendations.extend(recommendations_result) else: new_recommendations.append(recommendations_result) # 6. 合并新推荐和现有推荐 all_recommendations = {} current_time = time.time() max_age = 3600 * 2 # 推荐最大保留时间:2小时 # 先添加现有推荐(过滤掉过期的) for rec_key, rec_data in existing_recommendations.items(): rec_timestamp = rec_data.get('timestamp', 0) # 如果推荐时间超过2小时,跳过 if current_time - rec_timestamp > max_age: continue all_recommendations[rec_key] = rec_data # 添加新推荐(如果质量足够) for rec in new_recommendations: # 生成推荐键:symbol_orderType rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}" # 计算质量分数(信号强度 + 胜率预估) quality_score = rec.get('signal_strength', 0) * 10 if 'estimated_win_rate' in rec: quality_score += rec['estimated_win_rate'] # 如果质量分数足够,才添加或更新 if quality_score >= min_quality_score: # 如果已存在,比较质量分数,只保留更好的 if rec_key in all_recommendations: existing_quality = all_recommendations[rec_key].get('signal_strength', 0) * 10 if 'estimated_win_rate' in all_recommendations[rec_key]: existing_quality += all_recommendations[rec_key]['estimated_win_rate'] # 如果新推荐质量更好,更新 if quality_score > existing_quality: all_recommendations[rec_key] = rec logger.debug(f"更新推荐 {rec_key}: 质量分数 {existing_quality:.1f} -> {quality_score:.1f}") else: # 新推荐,直接添加 all_recommendations[rec_key] = rec logger.debug(f"添加新推荐 {rec_key}: 质量分数 {quality_score:.1f}") # 7. 按信号强度排序,保留质量最好的推荐 sorted_recommendations = sorted( all_recommendations.values(), key=lambda x: (x.get('signal_strength', 0), x.get('estimated_win_rate', 0)), reverse=True ) # 限制总数(保留质量最好的) max_total = max_recommendations * 3 # 允许更多推荐以便参考 final_recommendations = sorted_recommendations[:max_total] # 8. 保存到Redis缓存(如果启用) if add_to_cache: try: cache_key = "recommendations:realtime" # 增量更新:只更新或添加新的推荐,保留其他推荐 updated_count = 0 for rec in final_recommendations: rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}" # 使用hset更新或添加(如果不存在则创建) await self.client.redis_cache.hset(cache_key, rec_key, rec, ttl=3600) # TTL 1小时 updated_count += 1 # 设置整个Hash的TTL(1小时) if self.client.redis_cache.redis and self.client.redis_cache._connected: try: await self.client.redis_cache.redis.expire(cache_key, 3600) except: pass logger.info(f"已更新 {updated_count} 个推荐到Redis缓存(总计 {len(final_recommendations)} 个)") except Exception as e: logger.warning(f"保存推荐到Redis失败: {e}") logger.info(f"生成了 {len(new_recommendations)} 个新推荐,总计 {len(final_recommendations)} 个推荐(包含缓存)") return final_recommendations async def _analyze_trade_signal(self, symbol_info: Dict, min_signal_strength: int = None) -> Dict: """ 分析交易信号(复用策略模块的逻辑) Args: symbol_info: 交易对信息 Returns: 交易信号字典 """ symbol = symbol_info['symbol'] current_price = symbol_info['price'] rsi = symbol_info.get('rsi') macd = symbol_info.get('macd') bollinger = symbol_info.get('bollinger') market_regime = symbol_info.get('marketRegime', 'unknown') ema20 = symbol_info.get('ema20') ema50 = symbol_info.get('ema50') ema20_4h = symbol_info.get('ema20_4h') price_4h = symbol_info.get('price_4h', current_price) # 判断4H周期趋势方向 trend_4h = None if ema20_4h is not None: if price_4h > ema20_4h: trend_4h = 'up' elif price_4h < ema20_4h: trend_4h = 'down' else: trend_4h = 'neutral' # 基础分数:即使没有明确信号,也给1分基础分(推荐系统更宽松) signal_strength = 1 reasons = [] direction = None # 策略1:均值回归(震荡市场) if market_regime == 'ranging': if rsi and rsi < 30: if trend_4h in ('up', 'neutral', None): signal_strength += 4 reasons.append(f"RSI超卖({rsi:.1f})") if direction is None: direction = 'BUY' elif rsi and rsi > 70: if trend_4h in ('down', 'neutral', None): signal_strength += 4 reasons.append(f"RSI超买({rsi:.1f})") if direction is None: direction = 'SELL' if bollinger and current_price <= bollinger.get('lower'): if trend_4h in ('up', 'neutral', None): signal_strength += 3 reasons.append("触及布林带下轨") if direction is None: direction = 'BUY' elif bollinger and current_price >= bollinger.get('upper'): if trend_4h in ('down', 'neutral', None): signal_strength += 3 reasons.append("触及布林带上轨") if direction is None: direction = 'SELL' # 策略2:趋势跟踪(趋势市场) elif market_regime == 'trending': if macd and macd.get('macd', 0) > macd.get('signal', 0) and macd.get('histogram', 0) > 0: if trend_4h in ('up', 'neutral', None): signal_strength += 3 reasons.append("MACD金叉") if direction is None: direction = 'BUY' elif macd and macd.get('macd', 0) < macd.get('signal', 0) and macd.get('histogram', 0) < 0: if trend_4h in ('down', 'neutral', None): signal_strength += 3 reasons.append("MACD死叉") if direction is None: direction = 'SELL' if ema20 and ema50: if current_price > ema20 > ema50: if trend_4h in ('up', 'neutral', None): signal_strength += 2 reasons.append("价格在均线之上") if direction is None: direction = 'BUY' elif current_price < ema20 < ema50: if trend_4h in ('down', 'neutral', None): signal_strength += 2 reasons.append("价格在均线之下") if direction is None: direction = 'SELL' # 多周期共振加分 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): signal_strength += 2 reasons.append("4H周期共振确认") # 判断是否应该交易(使用传入的参数,如果没有则使用config中的值) if min_signal_strength is None: min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) should_trade = signal_strength >= min_signal_strength # 对于推荐系统,允许逆4H趋势交易(但降低信号强度,标记为高风险) if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): # 推荐系统允许逆趋势,但降低信号强度(减2分) signal_strength = max(0, signal_strength - 2) reasons.append("⚠️ 逆4H趋势(高风险)") # 不禁止,但标记为高风险 return { 'should_trade': should_trade, 'direction': direction, 'reason': ', '.join(reasons) if reasons else '无明确信号', 'strength': signal_strength, 'trend_4h': trend_4h } async def _create_recommendation( self, symbol_info: Dict, trade_signal: Dict, account_balance: float = 1000 ) -> Optional[List[Dict]]: """ 创建推荐记录 Args: symbol_info: 交易对信息 trade_signal: 交易信号 Returns: 推荐字典 """ try: symbol = symbol_info['symbol'] current_price = symbol_info['price'] direction = trade_signal['direction'] # 计算建议的止损止盈(基于保证金) entry_price = current_price # 估算仓位数量和杠杆(用于计算止损止盈) # 重要语义:suggested_position_pct 表示“保证金占用比例” account_balance = symbol_info.get('account_balance', 1000) suggested_position_pct = symbol_info.get('suggested_position_pct', 0.05) leverage = config.TRADING_CONFIG.get('LEVERAGE', 10) # 估算保证金与名义价值 estimated_margin = account_balance * suggested_position_pct estimated_notional = estimated_margin * leverage if leverage and leverage > 0 else estimated_margin estimated_quantity = estimated_notional / entry_price if entry_price > 0 else 0 # 计算基于保证金的止损止盈 stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08) take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15) stop_loss_price = self.risk_manager.get_stop_loss_price( entry_price, direction, estimated_quantity, leverage, stop_loss_pct=stop_loss_pct_margin, klines=symbol_info.get('klines'), bollinger=symbol_info.get('bollinger'), atr=symbol_info.get('atr') ) # 计算止损百分比(相对于保证金,用于显示) stop_loss_amount = estimated_margin * stop_loss_pct_margin if direction == 'BUY': stop_loss_pct = (entry_price - stop_loss_price) / entry_price else: stop_loss_pct = (stop_loss_price - entry_price) / entry_price # 第一目标:盈亏比1:1(相对于保证金) take_profit_1_pct_margin = stop_loss_pct_margin * 1.0 # 1:1 盈亏比 take_profit_1 = self.risk_manager.get_take_profit_price( entry_price, direction, estimated_quantity, leverage, take_profit_pct=take_profit_1_pct_margin ) # 第二目标:止损的2.5倍(相对于保证金) take_profit_2_pct_margin = stop_loss_pct_margin * 2.5 take_profit_2 = self.risk_manager.get_take_profit_price( entry_price, direction, estimated_quantity, leverage, take_profit_pct=take_profit_2_pct_margin ) # 建议仓位(根据信号强度调整) base_position_pct = config.TRADING_CONFIG.get('MAX_POSITION_PERCENT', 0.05) signal_strength = trade_signal['strength'] # 信号强度越高,建议仓位可以适当增加(但不超过1.5倍) position_multiplier = min(1.0 + (signal_strength - 5) * 0.1, 1.5) suggested_position_pct = base_position_pct * position_multiplier # 计算建议的挂单价(使用限价单,而不是市价单) # 对于做多:建议价格略低于当前价格(当前价格的99.5%),以便在回调时买入 # 对于做空:建议价格略高于当前价格(当前价格的100.5%),以便在反弹时卖出 limit_price_offset_pct = config.TRADING_CONFIG.get('LIMIT_ORDER_OFFSET_PCT', 0.5) # 默认0.5% if direction == 'BUY': suggested_limit_price = current_price * (1 - limit_price_offset_pct / 100) else: # SELL suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100) # 添加时间戳 timestamp = time.time() recommendation_time = datetime.now().isoformat() # 计算胜率预估(基于信号强度、市场状态等) base_win_rate = 40 + (signal_strength * 3) # 信号强度0-10,对应胜率40-70% market_regime = symbol_info.get('marketRegime', 'ranging') if market_regime == 'ranging': base_win_rate += 5 elif market_regime == 'trending': base_win_rate += 3 trend_4h = trade_signal.get('trend_4h') if trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): base_win_rate += 3 elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): base_win_rate -= 2 estimated_win_rate = max(35, min(75, base_win_rate)) # 计算盈亏USDT评估(基于保证金) margin = account_balance * suggested_position_pct # 止损止盈金额(相对于保证金) stop_loss_usdt = margin * stop_loss_pct_margin take_profit_1_usdt = margin * take_profit_1_pct_margin take_profit_2_usdt = margin * take_profit_2_pct_margin expected_pnl_1 = (estimated_win_rate / 100) * take_profit_1_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt expected_pnl_2 = (estimated_win_rate / 100) * take_profit_2_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt # 分类推荐类型和风险等级 recommendation_category, risk_level, simple_reason, risk_warning, trading_tutorial = self._classify_recommendation( market_regime, trend_4h, direction, trade_signal, signal_strength ) # 计算预期持仓时间(基于策略周期和市场状态) expected_hold_time = self._estimate_hold_time(market_regime, trend_4h, direction, signal_strength) # 生成用户指南(人话版计划) user_guide = self._generate_user_guide( symbol, direction, suggested_limit_price, stop_loss_price, take_profit_1, take_profit_2, simple_reason, expected_hold_time, risk_warning, recommendation_category, current_price ) # 基础推荐数据 base_data = { 'symbol': symbol, 'direction': direction, 'current_price': current_price, 'change_percent': symbol_info.get('changePercent', 0), 'recommendation_reason': trade_signal['reason'], 'signal_strength': signal_strength, 'market_regime': market_regime, 'trend_4h': trend_4h, 'rsi': symbol_info.get('rsi'), 'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, 'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None, 'bollinger_middle': symbol_info.get('bollinger', {}).get('middle') if symbol_info.get('bollinger') else None, 'bollinger_lower': symbol_info.get('bollinger', {}).get('lower') if symbol_info.get('bollinger') else None, 'ema20': symbol_info.get('ema20'), 'ema50': symbol_info.get('ema50'), 'ema20_4h': symbol_info.get('ema20_4h'), 'atr': symbol_info.get('atr'), 'suggested_stop_loss': stop_loss_price, 'suggested_take_profit_1': take_profit_1, 'suggested_take_profit_2': take_profit_2, 'suggested_position_percent': suggested_position_pct, 'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10), 'volume_24h': symbol_info.get('volume24h'), 'volatility': symbol_info.get('volatility'), 'estimated_win_rate': round(estimated_win_rate, 1), 'expected_pnl_1': round(expected_pnl_1, 2), 'expected_pnl_2': round(expected_pnl_2, 2), 'stop_loss_usdt': round(stop_loss_usdt, 2), 'take_profit_1_usdt': round(take_profit_1_usdt, 2), 'take_profit_2_usdt': round(take_profit_2_usdt, 2), 'recommendation_time': recommendation_time, 'timestamp': timestamp, # 新增字段:用户指南和分类 'user_guide': user_guide, 'recommendation_category': recommendation_category, 'risk_level': risk_level, 'expected_hold_time': expected_hold_time, 'trading_tutorial': trading_tutorial, 'max_hold_days': 3 # 最大持仓天数(超过此天数建议平仓) } # 限价单推荐(唯一推荐类型) limit_recommendation = base_data.copy() limit_recommendation.update({ 'order_type': 'LIMIT', 'suggested_limit_price': suggested_limit_price }) logger.debug( f"✓ 生成推荐: {symbol} {direction} " f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%, " f"分类: {recommendation_category}, 风险: {risk_level})" ) return [limit_recommendation] except Exception as e: logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}", exc_info=True) return None def _classify_recommendation( self, market_regime: str, trend_4h: Optional[str], direction: str, trade_signal: Dict, signal_strength: int ) -> tuple: """ 分类推荐类型和风险等级 Returns: (recommendation_category, risk_level, simple_reason, risk_warning, trading_tutorial) """ # 判断是否为逆趋势交易 is_counter_trend = False if trend_4h: if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): is_counter_trend = True # 分类推荐类型 if market_regime == 'trending': if not is_counter_trend: # 顺趋势突破 category = "顺趋势突破" risk_level = "中等" simple_reason = "趋势明确,顺势而为,胜率较高" risk_warning = "趋势可能反转,注意止损保护" tutorial = "此类交易适合跟随趋势,持仓时间可稍长,但需关注趋势是否持续" else: # 逆趋势反弹(高风险) category = "逆势反弹(高风险)" risk_level = "高" simple_reason = "逆趋势操作,风险较高,适合快进快出" risk_warning = "⚠️ 高风险:逆趋势交易胜率较低,务必严格止损,快进快出" tutorial = "此类交易胜率较低,仅适合短线快进快出,务必严格止损。建议持仓不超过数小时,一旦触及止损立即离场" elif market_regime == 'ranging': # 震荡区间操作 category = "震荡区间操作" risk_level = "中等" simple_reason = "市场震荡,在区间边界操作,适合高抛低吸" risk_warning = "震荡可能转为趋势,注意突破风险" tutorial = "此类交易适合在震荡区间边界操作,预期持仓时间较短(数小时至1天),到达目标及时止盈" else: # 未知市场状态 category = "市场状态不明" risk_level = "中高" simple_reason = "市场状态不明确,谨慎操作" risk_warning = "市场状态不明,建议降低仓位或等待更明确信号" tutorial = "市场状态不明确,建议谨慎操作,降低仓位,严格止损" # 根据信号强度调整风险等级 if signal_strength >= 7: if risk_level == "高": risk_level = "中高" elif risk_level == "中等": risk_level = "低中" elif signal_strength < 5: if risk_level == "中等": risk_level = "中高" elif risk_level == "低中": risk_level = "中等" return category, risk_level, simple_reason, risk_warning, tutorial def _estimate_hold_time( self, market_regime: str, trend_4h: Optional[str], direction: str, signal_strength: int ) -> str: """ 估算预期持仓时间 Returns: 预期持仓时间的文字描述 """ # 基于策略周期(1H/15min)和市场状态估算 if market_regime == 'trending': # 趋势市场,持仓时间可能较长 if signal_strength >= 7: return "预期持仓:数小时至2天(趋势明确,可适当延长持仓)" else: return "预期持仓:数小时至1天(趋势较弱,及时止盈)" elif market_regime == 'ranging': # 震荡市场,持仓时间较短 return "预期持仓:数小时至半天(震荡市场,快进快出)" else: # 未知状态,保守估计 return "预期持仓:数小时(市场状态不明,谨慎持仓)" def _generate_user_guide( self, symbol: str, direction: str, limit_price: float, stop_loss: float, tp1: float, tp2: float, simple_reason: str, expected_hold_time: str, risk_warning: str, category: str, current_price: float ) -> str: """ 生成用户指南(人话版计划) Args: symbol: 交易对 direction: 方向 limit_price: 建议入场价(限价单) stop_loss: 止损价 tp1: 第一目标止盈价 tp2: 第二目标止盈价 simple_reason: 简单原因 expected_hold_time: 预期持仓时间 risk_warning: 风险警告 category: 推荐分类 current_price: 当前价格(用于计算反向波动) Returns: 用户指南文本 """ direction_cn = "买入" if direction == 'BUY' else "卖出" direction_action = "挂单买入" if direction == 'BUY' else "挂单卖出" # 计算反向波动阈值(2%) if direction == 'BUY': # 买单:如果价格下跌超过2%,建议取消 reverse_threshold = current_price * 0.98 reverse_direction = "下跌" else: # 卖单:如果价格上涨超过2%,建议取消 reverse_threshold = current_price * 1.02 reverse_direction = "上涨" user_guide = f"""【操作计划】{direction_cn} {symbol} 【推荐类型】{category} 【核心理由】{simple_reason} 【明确的入场价】 建议在 {limit_price:.4f} USDT 附近{direction_action} 【具体点位】 • 建议挂单价: {limit_price:.4f} USDT • 止损价: {stop_loss:.4f} USDT • 第一目标: {tp1:.4f} USDT(盈亏比1:1) • 第二目标: {tp2:.4f} USDT(盈亏比2.5:1) 【持仓周期】{expected_hold_time} 【退出条件】 • 触及止损:立即平仓 • 触及第一目标:可部分止盈或全部止盈 • 触及第二目标:建议全部止盈 • 持仓超过3天未触及第一目标:建议平仓离场重新评估 【订单失效条件】 此限价单建议当日有效。若价格未触及挂单价,但价格直接{reverse_direction}超过2%({reverse_threshold:.4f} USDT),则建议取消订单,等待新信号。 【关键提醒】{risk_warning} 【给主动交易者的提示】 如果您确信趋势已启动,也可考虑以市价单立即入场,但需承受更高滑点成本,且务必设置好止损。""" return user_guide async def get_active_recommendations(self) -> List[Dict]: """获取当前有效的推荐""" if DB_AVAILABLE and TradeRecommendation: return TradeRecommendation.get_active() return [] async def mark_recommendation_executed(self, recommendation_id: int, trade_id: int = None): """标记推荐已执行""" if DB_AVAILABLE and TradeRecommendation: TradeRecommendation.mark_executed(recommendation_id, trade_id) logger.info(f"推荐 {recommendation_id} 已标记为已执行")