""" 推荐交易对模块 - 生成交易推荐供手动参考 """ import asyncio import logging import time from typing import List, Dict, Optional from datetime import datetime, timedelta try: from .binance_client import BinanceClient from .market_scanner import MarketScanner from .risk_manager import RiskManager from . import config except ImportError: from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager import config logger = logging.getLogger(__name__) # 尝试导入数据库模型 DB_AVAILABLE = False TradeRecommendation = None try: import sys from pathlib import Path project_root = Path(__file__).parent.parent backend_path = project_root / 'backend' if backend_path.exists(): sys.path.insert(0, str(backend_path)) from database.models import TradeRecommendation DB_AVAILABLE = True logger.info("✓ 数据库模型导入成功,推荐记录将保存到数据库") else: logger.warning("⚠ backend目录不存在,无法使用数据库功能") DB_AVAILABLE = False except ImportError as e: logger.warning(f"⚠ 无法导入数据库模型: {e}") DB_AVAILABLE = False except Exception as e: logger.warning(f"⚠ 数据库初始化失败: {e}") DB_AVAILABLE = False class TradeRecommender: """推荐交易对生成器""" def __init__( self, client: BinanceClient, scanner: MarketScanner, risk_manager: RiskManager ): """ 初始化推荐器 Args: client: 币安客户端 scanner: 市场扫描器 risk_manager: 风险管理器 """ self.client = client self.scanner = scanner self.risk_manager = risk_manager async def generate_recommendations( self, min_signal_strength: int = 5, max_recommendations: int = 20, add_to_cache: bool = True, min_quality_score: float = 0.0 ) -> List[Dict]: """ 生成交易推荐(支持增量添加到Redis缓存) Args: min_signal_strength: 最小信号强度(默认5,低于此强度的不推荐) max_recommendations: 最大推荐数量(单次生成) add_to_cache: 是否添加到Redis缓存(默认True) min_quality_score: 最小质量分数(用于过滤,默认0.0表示不过滤) Returns: 推荐列表 """ logger.info("开始生成交易推荐...") # 获取账户余额(用于计算盈亏评估) try: balance = await self.client.get_account_balance() account_balance = balance.get('total', 1000) logger.debug(f"账户余额: {account_balance:.2f} USDT") except Exception as e: account_balance = 1000 logger.warning(f"获取账户余额失败,使用默认值1000 USDT: {e}") # 1. 从Redis读取现有推荐(如果启用缓存) existing_recommendations = {} if add_to_cache: try: # 从Redis Hash读取所有现有推荐 cache_key = "recommendations:realtime" existing_data = await self.client.redis_cache.hgetall(cache_key) for rec_key, rec_data in existing_data.items(): if isinstance(rec_data, dict): existing_recommendations[rec_key] = rec_data logger.info(f"从Redis读取到 {len(existing_recommendations)} 个现有推荐") except Exception as e: logger.debug(f"从Redis读取现有推荐失败: {e}") # 2. 扫描市场 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对") # 如果市场扫描失败,返回现有缓存推荐 if existing_recommendations: return list(existing_recommendations.values()) return [] new_recommendations = [] # 3. 对每个交易对进行分析 for symbol_info in top_symbols: symbol = symbol_info['symbol'] current_price = symbol_info['price'] # 4. 分析交易信号(传入min_signal_strength参数) trade_signal = await self._analyze_trade_signal(symbol_info, min_signal_strength=min_signal_strength) # 5. 如果信号强度足够,生成推荐 if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength: recommendations_result = await self._create_recommendation( symbol_info, trade_signal, account_balance ) if recommendations_result: # _create_recommendation 返回列表(限价单和市价单) if isinstance(recommendations_result, list): new_recommendations.extend(recommendations_result) else: new_recommendations.append(recommendations_result) # 6. 合并新推荐和现有推荐 all_recommendations = {} current_time = time.time() max_age = 3600 * 2 # 推荐最大保留时间:2小时 # 先添加现有推荐(过滤掉过期的) for rec_key, rec_data in existing_recommendations.items(): rec_timestamp = rec_data.get('timestamp', 0) # 如果推荐时间超过2小时,跳过 if current_time - rec_timestamp > max_age: continue all_recommendations[rec_key] = rec_data # 添加新推荐(如果质量足够) for rec in new_recommendations: # 生成推荐键:symbol_orderType rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}" # 计算质量分数(信号强度 + 胜率预估) quality_score = rec.get('signal_strength', 0) * 10 if 'estimated_win_rate' in rec: quality_score += rec['estimated_win_rate'] # 如果质量分数足够,才添加或更新 if quality_score >= min_quality_score: # 如果已存在,比较质量分数,只保留更好的 if rec_key in all_recommendations: existing_quality = all_recommendations[rec_key].get('signal_strength', 0) * 10 if 'estimated_win_rate' in all_recommendations[rec_key]: existing_quality += all_recommendations[rec_key]['estimated_win_rate'] # 如果新推荐质量更好,更新 if quality_score > existing_quality: all_recommendations[rec_key] = rec logger.debug(f"更新推荐 {rec_key}: 质量分数 {existing_quality:.1f} -> {quality_score:.1f}") else: # 新推荐,直接添加 all_recommendations[rec_key] = rec logger.debug(f"添加新推荐 {rec_key}: 质量分数 {quality_score:.1f}") # 7. 按信号强度排序,保留质量最好的推荐 sorted_recommendations = sorted( all_recommendations.values(), key=lambda x: (x.get('signal_strength', 0), x.get('estimated_win_rate', 0)), reverse=True ) # 限制总数(保留质量最好的) max_total = max_recommendations * 3 # 允许更多推荐以便参考 final_recommendations = sorted_recommendations[:max_total] # 8. 保存到Redis缓存(如果启用) if add_to_cache: try: cache_key = "recommendations:realtime" # 增量更新:只更新或添加新的推荐,保留其他推荐 updated_count = 0 for rec in final_recommendations: rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}" # 使用hset更新或添加(如果不存在则创建) await self.client.redis_cache.hset(cache_key, rec_key, rec, ttl=3600) # TTL 1小时 updated_count += 1 # 设置整个Hash的TTL(1小时) if self.client.redis_cache.redis and self.client.redis_cache._connected: try: await self.client.redis_cache.redis.expire(cache_key, 3600) except: pass logger.info(f"已更新 {updated_count} 个推荐到Redis缓存(总计 {len(final_recommendations)} 个)") except Exception as e: logger.warning(f"保存推荐到Redis失败: {e}") logger.info(f"生成了 {len(new_recommendations)} 个新推荐,总计 {len(final_recommendations)} 个推荐(包含缓存)") return final_recommendations async def _analyze_trade_signal(self, symbol_info: Dict, min_signal_strength: int = None) -> Dict: """ 分析交易信号(复用策略模块的逻辑) Args: symbol_info: 交易对信息 Returns: 交易信号字典 """ symbol = symbol_info['symbol'] current_price = symbol_info['price'] rsi = symbol_info.get('rsi') macd = symbol_info.get('macd') bollinger = symbol_info.get('bollinger') market_regime = symbol_info.get('marketRegime', 'unknown') ema20 = symbol_info.get('ema20') ema50 = symbol_info.get('ema50') ema20_4h = symbol_info.get('ema20_4h') price_4h = symbol_info.get('price_4h', current_price) # 判断4H周期趋势方向 trend_4h = None if ema20_4h is not None: if price_4h > ema20_4h: trend_4h = 'up' elif price_4h < ema20_4h: trend_4h = 'down' else: trend_4h = 'neutral' # 基础分数:即使没有明确信号,也给1分基础分(推荐系统更宽松) signal_strength = 1 reasons = [] direction = None # 策略1:均值回归(震荡市场) if market_regime == 'ranging': if rsi and rsi < 30: if trend_4h in ('up', 'neutral', None): signal_strength += 4 reasons.append(f"RSI超卖({rsi:.1f})") if direction is None: direction = 'BUY' elif rsi and rsi > 70: if trend_4h in ('down', 'neutral', None): signal_strength += 4 reasons.append(f"RSI超买({rsi:.1f})") if direction is None: direction = 'SELL' if bollinger and current_price <= bollinger.get('lower'): if trend_4h in ('up', 'neutral', None): signal_strength += 3 reasons.append("触及布林带下轨") if direction is None: direction = 'BUY' elif bollinger and current_price >= bollinger.get('upper'): if trend_4h in ('down', 'neutral', None): signal_strength += 3 reasons.append("触及布林带上轨") if direction is None: direction = 'SELL' # 策略2:趋势跟踪(趋势市场) elif market_regime == 'trending': if macd and macd.get('macd', 0) > macd.get('signal', 0) and macd.get('histogram', 0) > 0: if trend_4h in ('up', 'neutral', None): signal_strength += 3 reasons.append("MACD金叉") if direction is None: direction = 'BUY' elif macd and macd.get('macd', 0) < macd.get('signal', 0) and macd.get('histogram', 0) < 0: if trend_4h in ('down', 'neutral', None): signal_strength += 3 reasons.append("MACD死叉") if direction is None: direction = 'SELL' if ema20 and ema50: if current_price > ema20 > ema50: if trend_4h in ('up', 'neutral', None): signal_strength += 2 reasons.append("价格在均线之上") if direction is None: direction = 'BUY' elif current_price < ema20 < ema50: if trend_4h in ('down', 'neutral', None): signal_strength += 2 reasons.append("价格在均线之下") if direction is None: direction = 'SELL' # 多周期共振加分 if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): signal_strength += 2 reasons.append("4H周期共振确认") # 判断是否应该交易(使用传入的参数,如果没有则使用config中的值) if min_signal_strength is None: min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7) should_trade = signal_strength >= min_signal_strength # 对于推荐系统,允许逆4H趋势交易(但降低信号强度,标记为高风险) if direction and trend_4h: if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): # 推荐系统允许逆趋势,但降低信号强度(减2分) signal_strength = max(0, signal_strength - 2) reasons.append("⚠️ 逆4H趋势(高风险)") # 不禁止,但标记为高风险 return { 'should_trade': should_trade, 'direction': direction, 'reason': ', '.join(reasons) if reasons else '无明确信号', 'strength': signal_strength, 'trend_4h': trend_4h } async def _create_recommendation( self, symbol_info: Dict, trade_signal: Dict, account_balance: float = 1000 ) -> Optional[List[Dict]]: """ 创建推荐记录 Args: symbol_info: 交易对信息 trade_signal: 交易信号 Returns: 推荐字典 """ try: symbol = symbol_info['symbol'] current_price = symbol_info['price'] direction = trade_signal['direction'] # 计算建议的止损止盈(基于保证金) entry_price = current_price # 估算仓位数量和杠杆(用于计算止损止盈) # 使用建议的仓位比例和账户余额来估算 account_balance = symbol_info.get('account_balance', 1000) suggested_position_pct = symbol_info.get('suggested_position_pct', 0.05) leverage = config.TRADING_CONFIG.get('LEVERAGE', 10) # 估算仓位价值 estimated_position_value = account_balance * suggested_position_pct estimated_quantity = estimated_position_value / entry_price if entry_price > 0 else 0 # 计算基于保证金的止损止盈 stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.03) take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.05) stop_loss_price = self.risk_manager.get_stop_loss_price( entry_price, direction, estimated_quantity, leverage, stop_loss_pct=stop_loss_pct_margin, klines=symbol_info.get('klines'), bollinger=symbol_info.get('bollinger'), atr=symbol_info.get('atr') ) # 计算止损百分比(相对于保证金,用于显示) estimated_margin = estimated_position_value / leverage if leverage > 0 else estimated_position_value stop_loss_amount = estimated_margin * stop_loss_pct_margin if direction == 'BUY': stop_loss_pct = (entry_price - stop_loss_price) / entry_price else: stop_loss_pct = (stop_loss_price - entry_price) / entry_price # 第一目标:盈亏比1:1(相对于保证金) take_profit_1_pct_margin = stop_loss_pct_margin * 1.0 # 1:1 盈亏比 take_profit_1 = self.risk_manager.get_take_profit_price( entry_price, direction, estimated_quantity, leverage, take_profit_pct=take_profit_1_pct_margin ) # 第二目标:止损的2.5倍(相对于保证金) take_profit_2_pct_margin = stop_loss_pct_margin * 2.5 take_profit_2 = self.risk_manager.get_take_profit_price( entry_price, direction, estimated_quantity, leverage, take_profit_pct=take_profit_2_pct_margin ) # 建议仓位(根据信号强度调整) base_position_pct = config.TRADING_CONFIG.get('MAX_POSITION_PERCENT', 0.05) signal_strength = trade_signal['strength'] # 信号强度越高,建议仓位可以适当增加(但不超过1.5倍) position_multiplier = min(1.0 + (signal_strength - 5) * 0.1, 1.5) suggested_position_pct = base_position_pct * position_multiplier # 计算建议的挂单价(使用限价单,而不是市价单) # 对于做多:建议价格略低于当前价格(当前价格的99.5%),以便在回调时买入 # 对于做空:建议价格略高于当前价格(当前价格的100.5%),以便在反弹时卖出 limit_price_offset_pct = config.TRADING_CONFIG.get('LIMIT_ORDER_OFFSET_PCT', 0.5) # 默认0.5% if direction == 'BUY': suggested_limit_price = current_price * (1 - limit_price_offset_pct / 100) else: # SELL suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100) # 添加时间戳 timestamp = time.time() recommendation_time = datetime.now().isoformat() # 计算胜率预估(基于信号强度、市场状态等) base_win_rate = 40 + (signal_strength * 3) # 信号强度0-10,对应胜率40-70% market_regime = symbol_info.get('marketRegime', 'ranging') if market_regime == 'ranging': base_win_rate += 5 elif market_regime == 'trending': base_win_rate += 3 trend_4h = trade_signal.get('trend_4h') if trend_4h: if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): base_win_rate += 3 elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): base_win_rate -= 2 estimated_win_rate = max(35, min(75, base_win_rate)) # 计算盈亏USDT评估(基于保证金) position_value = account_balance * suggested_position_pct margin = position_value / leverage if leverage > 0 else position_value # 止损止盈金额(相对于保证金) stop_loss_usdt = margin * stop_loss_pct_margin take_profit_1_usdt = margin * take_profit_1_pct_margin take_profit_2_usdt = margin * take_profit_2_pct_margin expected_pnl_1 = (estimated_win_rate / 100) * take_profit_1_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt expected_pnl_2 = (estimated_win_rate / 100) * take_profit_2_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt # 基础推荐数据 base_data = { 'symbol': symbol, 'direction': direction, 'current_price': current_price, 'change_percent': symbol_info.get('changePercent', 0), 'recommendation_reason': trade_signal['reason'], 'signal_strength': signal_strength, 'market_regime': market_regime, 'trend_4h': trend_4h, 'rsi': symbol_info.get('rsi'), 'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, 'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None, 'bollinger_middle': symbol_info.get('bollinger', {}).get('middle') if symbol_info.get('bollinger') else None, 'bollinger_lower': symbol_info.get('bollinger', {}).get('lower') if symbol_info.get('bollinger') else None, 'ema20': symbol_info.get('ema20'), 'ema50': symbol_info.get('ema50'), 'ema20_4h': symbol_info.get('ema20_4h'), 'atr': symbol_info.get('atr'), 'suggested_stop_loss': stop_loss_price, 'suggested_take_profit_1': take_profit_1, 'suggested_take_profit_2': take_profit_2, 'suggested_position_percent': suggested_position_pct, 'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10), 'volume_24h': symbol_info.get('volume24h'), 'volatility': symbol_info.get('volatility'), 'estimated_win_rate': round(estimated_win_rate, 1), 'expected_pnl_1': round(expected_pnl_1, 2), 'expected_pnl_2': round(expected_pnl_2, 2), 'stop_loss_usdt': round(stop_loss_usdt, 2), 'take_profit_1_usdt': round(take_profit_1_usdt, 2), 'take_profit_2_usdt': round(take_profit_2_usdt, 2), 'recommendation_time': recommendation_time, 'timestamp': timestamp } # 限价单推荐 limit_recommendation = base_data.copy() limit_recommendation.update({ 'order_type': 'LIMIT', 'suggested_limit_price': suggested_limit_price }) # 市价单推荐 market_recommendation = base_data.copy() market_recommendation.update({ 'order_type': 'MARKET', 'suggested_limit_price': None }) logger.debug( f"✓ 生成推荐: {symbol} {direction} " f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%)" ) return [limit_recommendation, market_recommendation] except Exception as e: logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}") return None async def get_active_recommendations(self) -> List[Dict]: """获取当前有效的推荐""" if DB_AVAILABLE and TradeRecommendation: return TradeRecommendation.get_active() return [] async def mark_recommendation_executed(self, recommendation_id: int, trade_id: int = None): """标记推荐已执行""" if DB_AVAILABLE and TradeRecommendation: TradeRecommendation.mark_executed(recommendation_id, trade_id) logger.info(f"推荐 {recommendation_id} 已标记为已执行")