From b2860f8fdb6e46838c1c55d88bfe9b0b1474670b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=96=87=E8=96=87=E5=AE=89?= Date: Thu, 15 Jan 2026 21:59:18 +0800 Subject: [PATCH] a --- backend/api/routes/recommendations.py | 126 +++++++++++---- trading_system/redis_cache.py | 173 +++++++++++++++++++++ trading_system/trade_recommender.py | 212 ++++++++++++++++++++++---- 3 files changed, 450 insertions(+), 61 deletions(-) diff --git a/backend/api/routes/recommendations.py b/backend/api/routes/recommendations.py index 969879d..2c1e3b8 100644 --- a/backend/api/routes/recommendations.py +++ b/backend/api/routes/recommendations.py @@ -36,7 +36,7 @@ async def get_recommendations( """ try: if type == 'realtime': - # 实时生成推荐 + # 从Redis缓存读取推荐(如果存在) import sys from pathlib import Path current_file = Path(__file__) @@ -58,43 +58,59 @@ async def get_recommendations( sys.path.insert(0, str(project_root)) from binance_client import BinanceClient - from market_scanner import MarketScanner - from risk_manager import RiskManager - from trade_recommender import TradeRecommender import config - # 初始化组件 + # 创建客户端(用于访问Redis) client = BinanceClient( api_key=config.BINANCE_API_KEY, api_secret=config.BINANCE_API_SECRET, testnet=config.USE_TESTNET ) - await client.connect() - + # 连接Redis(如果还没有连接) try: - scanner = MarketScanner(client) - risk_manager = RiskManager(client) - recommender = TradeRecommender(client, scanner, risk_manager) - - # 生成推荐 - recommendations = await recommender.generate_recommendations( - min_signal_strength=min_signal_strength, - max_recommendations=limit - ) - - # 方向过滤 - if direction: - recommendations = [r for r in recommendations if r.get('direction') == direction] - - return { - "success": True, - "count": len(recommendations), - "type": "realtime", - "data": recommendations - } - finally: - await client.disconnect() + await client.redis_cache.connect() + except: + pass + + # 从Redis读取推荐 + recommendations = [] + try: + import time + cache_key = "recommendations:realtime" + cached_data = await client.redis_cache.hgetall(cache_key) + if cached_data: + current_time = time.time() + max_age = 3600 * 2 # 推荐最大保留时间:2小时 + + # 过滤过期推荐 + for rec in cached_data.values(): + if isinstance(rec, dict): + rec_timestamp = rec.get('timestamp', 0) + # 如果推荐时间超过2小时,跳过 + if current_time - rec_timestamp > max_age: + continue + recommendations.append(rec) + + # 按时间戳降序排序(最新的在前) + recommendations.sort(key=lambda x: x.get('timestamp', 0), reverse=True) + logger.info(f"从Redis读取到 {len(recommendations)} 个有效推荐(已过滤过期)") + except Exception as e: + logger.debug(f"从Redis读取推荐失败: {e}") + + # 方向过滤 + if direction: + recommendations = [r for r in recommendations if r.get('direction') == direction] + + # 限制返回数量 + recommendations = recommendations[:limit] + + return { + "success": True, + "count": len(recommendations), + "type": "realtime", + "data": recommendations + } elif type == 'bookmarked': # 从数据库查询已标记的推荐 @@ -470,10 +486,12 @@ async def generate_recommendations( risk_manager = RiskManager(client) recommender = TradeRecommender(client, scanner, risk_manager) - # 生成推荐 + # 生成推荐(增量添加到Redis缓存) recommendations = await recommender.generate_recommendations( min_signal_strength=min_signal_strength, - max_recommendations=max_recommendations + max_recommendations=max_recommendations, + add_to_cache=True, # 添加到Redis缓存 + min_quality_score=0.0 # 不过滤,保留所有推荐 ) return { @@ -494,6 +512,52 @@ async def generate_recommendations( raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}") +@router.post("/clear-cache") +async def clear_recommendations_cache(): + """ + 清理Redis中的推荐缓存 + """ + try: + import sys + from pathlib import Path + current_file = Path(__file__) + backend_path = current_file.parent.parent.parent + project_root = backend_path.parent + trading_system_path = project_root / 'trading_system' + + if not trading_system_path.exists(): + alternative_path = backend_path / 'trading_system' + if alternative_path.exists(): + trading_system_path = alternative_path + + sys.path.insert(0, str(trading_system_path)) + sys.path.insert(0, str(project_root)) + + from binance_client import BinanceClient + import config + + client = BinanceClient( + api_key=config.BINANCE_API_KEY, + api_secret=config.BINANCE_API_SECRET, + testnet=config.USE_TESTNET + ) + + try: + await client.redis_cache.connect() + cache_key = "recommendations:realtime" + await client.redis_cache.delete(cache_key) + + return { + "success": True, + "message": "推荐缓存已清理" + } + finally: + await client.disconnect() + except Exception as e: + logger.error(f"清理推荐缓存失败: {e}") + raise HTTPException(status_code=500, detail=f"清理推荐缓存失败: {str(e)}") + + @router.post("/bookmark") async def bookmark_recommendation(recommendation_data: dict): """ diff --git a/trading_system/redis_cache.py b/trading_system/redis_cache.py index 5643140..b3a604f 100644 --- a/trading_system/redis_cache.py +++ b/trading_system/redis_cache.py @@ -219,3 +219,176 @@ class RedisCache: def is_connected(self) -> bool: """检查是否已连接""" return self._connected and self.redis is not None + + async def hset(self, name: str, key: str, value: Any, ttl: int = None): + """ + 设置Hash字段 + + Args: + name: Hash名称 + key: 字段名 + value: 字段值 + ttl: 过期时间(秒),如果设置,会对整个Hash设置TTL + """ + if self.redis and self._connected: + try: + await self.redis.hset(name, key, json.dumps(value)) + # 如果设置了TTL,对整个Hash设置过期时间 + if ttl: + await self.redis.expire(name, ttl) + return True + except Exception as e: + logger.debug(f"Redis Hash设置失败 {name}.{key}: {e}") + # Redis失败时,尝试重新连接 + if not self._connected: + await self.connect() + if self.redis and self._connected: + try: + await self.redis.hset(name, key, json.dumps(value)) + if ttl: + await self.redis.expire(name, ttl) + return True + except: + pass + + # 降级到内存缓存 + if name not in self._memory_cache: + self._memory_cache[name] = {} + self._memory_cache[name][key] = value + return False + + async def hget(self, name: str, key: str) -> Optional[Any]: + """ + 获取Hash字段 + + Args: + name: Hash名称 + key: 字段名 + + Returns: + 字段值,如果不存在则返回None + """ + if self.redis and self._connected: + try: + data = await self.redis.hget(name, key) + if data: + return json.loads(data) + except Exception as e: + logger.debug(f"Redis Hash获取失败 {name}.{key}: {e}") + + # 降级到内存缓存 + if name in self._memory_cache and key in self._memory_cache[name]: + return self._memory_cache[name][key] + + return None + + async def hgetall(self, name: str) -> Dict[str, Any]: + """ + 获取Hash所有字段 + + Args: + name: Hash名称 + + Returns: + 所有字段的字典 + """ + if self.redis and self._connected: + try: + data = await self.redis.hgetall(name) + result = {} + for k, v in data.items(): + if isinstance(k, bytes): + k = k.decode('utf-8') + if isinstance(v, bytes): + v = json.loads(v.decode('utf-8')) + else: + v = json.loads(v) + result[k] = v + return result + except Exception as e: + logger.debug(f"Redis Hash获取全部失败 {name}: {e}") + + # 降级到内存缓存 + if name in self._memory_cache: + return self._memory_cache[name].copy() + + return {} + + async def hdel(self, name: str, *keys: str): + """ + 删除Hash字段 + + Args: + name: Hash名称 + *keys: 要删除的字段名 + """ + if self.redis and self._connected: + try: + await self.redis.hdel(name, *keys) + except Exception as e: + logger.debug(f"Redis Hash删除失败 {name}: {e}") + + # 同时删除内存缓存 + if name in self._memory_cache: + for key in keys: + if key in self._memory_cache[name]: + del self._memory_cache[name][key] + + async def zadd(self, name: str, mapping: Dict[str, float], ttl: int = None): + """ + 添加Sorted Set成员 + + Args: + name: Sorted Set名称 + mapping: {member: score} 字典 + ttl: 过期时间(秒) + """ + if self.redis and self._connected: + try: + await self.redis.zadd(name, mapping) + if ttl: + await self.redis.expire(name, ttl) + return True + except Exception as e: + logger.debug(f"Redis Sorted Set添加失败 {name}: {e}") + + return False + + async def zrange(self, name: str, start: int = 0, end: int = -1, desc: bool = False, withscores: bool = False) -> List: + """ + 获取Sorted Set成员(按分数排序) + + Args: + name: Sorted Set名称 + start: 起始索引 + end: 结束索引 + desc: 是否降序(默认False,升序) + withscores: 是否包含分数 + + Returns: + 成员列表 + """ + if self.redis and self._connected: + try: + if desc: + return await self.redis.zrevrange(name, start, end, withscores=withscores) + else: + return await self.redis.zrange(name, start, end, withscores=withscores) + except Exception as e: + logger.debug(f"Redis Sorted Set获取失败 {name}: {e}") + + return [] + + async def zrem(self, name: str, *members: str): + """ + 删除Sorted Set成员 + + Args: + name: Sorted Set名称 + *members: 要删除的成员 + """ + if self.redis and self._connected: + try: + await self.redis.zrem(name, *members) + except Exception as e: + logger.debug(f"Redis Sorted Set删除失败 {name}: {e}") diff --git a/trading_system/trade_recommender.py b/trading_system/trade_recommender.py index a4d714d..edc2b10 100644 --- a/trading_system/trade_recommender.py +++ b/trading_system/trade_recommender.py @@ -3,6 +3,7 @@ """ import asyncio import logging +import time from typing import List, Dict, Optional from datetime import datetime, timedelta try: @@ -66,50 +67,155 @@ class TradeRecommender: async def generate_recommendations( self, min_signal_strength: int = 5, - max_recommendations: int = 20 + max_recommendations: int = 20, + add_to_cache: bool = True, + min_quality_score: float = 0.0 ) -> List[Dict]: """ - 生成交易推荐 + 生成交易推荐(支持增量添加到Redis缓存) Args: min_signal_strength: 最小信号强度(默认5,低于此强度的不推荐) - max_recommendations: 最大推荐数量 + max_recommendations: 最大推荐数量(单次生成) + add_to_cache: 是否添加到Redis缓存(默认True) + min_quality_score: 最小质量分数(用于过滤,默认0.0表示不过滤) Returns: 推荐列表 """ logger.info("开始生成交易推荐...") - # 1. 扫描市场 + # 获取账户余额(用于计算盈亏评估) + try: + balance = await self.client.get_account_balance() + account_balance = balance.get('total', 1000) + logger.debug(f"账户余额: {account_balance:.2f} USDT") + except Exception as e: + account_balance = 1000 + logger.warning(f"获取账户余额失败,使用默认值1000 USDT: {e}") + + # 1. 从Redis读取现有推荐(如果启用缓存) + existing_recommendations = {} + if add_to_cache: + try: + # 从Redis Hash读取所有现有推荐 + cache_key = "recommendations:realtime" + existing_data = await self.client.redis_cache.hgetall(cache_key) + for rec_key, rec_data in existing_data.items(): + if isinstance(rec_data, dict): + existing_recommendations[rec_key] = rec_data + logger.info(f"从Redis读取到 {len(existing_recommendations)} 个现有推荐") + except Exception as e: + logger.debug(f"从Redis读取现有推荐失败: {e}") + + # 2. 扫描市场 top_symbols = await self.scanner.scan_market() if not top_symbols: logger.warning("未找到符合条件的交易对") + # 如果市场扫描失败,返回现有缓存推荐 + if existing_recommendations: + return list(existing_recommendations.values()) return [] - recommendations = [] + new_recommendations = [] - # 2. 对每个交易对进行分析 + # 3. 对每个交易对进行分析 for symbol_info in top_symbols: - if len(recommendations) >= max_recommendations: - break - symbol = symbol_info['symbol'] current_price = symbol_info['price'] - change_percent = symbol_info.get('changePercent', 0) - # 3. 分析交易信号(使用策略模块的逻辑) + # 4. 分析交易信号 trade_signal = await self._analyze_trade_signal(symbol_info) - # 4. 如果信号强度足够,生成推荐 + # 5. 如果信号强度足够,生成推荐 if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength: - recommendation = await self._create_recommendation( - symbol_info, trade_signal + recommendations_result = await self._create_recommendation( + symbol_info, trade_signal, account_balance ) - if recommendation: - recommendations.append(recommendation) + if recommendations_result: + # _create_recommendation 返回列表(限价单和市价单) + if isinstance(recommendations_result, list): + new_recommendations.extend(recommendations_result) + else: + new_recommendations.append(recommendations_result) - logger.info(f"生成了 {len(recommendations)} 个交易推荐") - return recommendations + # 6. 合并新推荐和现有推荐 + all_recommendations = {} + current_time = time.time() + max_age = 3600 * 2 # 推荐最大保留时间:2小时 + + # 先添加现有推荐(过滤掉过期的) + for rec_key, rec_data in existing_recommendations.items(): + rec_timestamp = rec_data.get('timestamp', 0) + # 如果推荐时间超过2小时,跳过 + if current_time - rec_timestamp > max_age: + continue + all_recommendations[rec_key] = rec_data + + # 添加新推荐(如果质量足够) + for rec in new_recommendations: + # 生成推荐键:symbol_orderType + rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}" + + # 计算质量分数(信号强度 + 胜率预估) + quality_score = rec.get('signal_strength', 0) * 10 + if 'estimated_win_rate' in rec: + quality_score += rec['estimated_win_rate'] + + # 如果质量分数足够,才添加或更新 + if quality_score >= min_quality_score: + # 如果已存在,比较质量分数,只保留更好的 + if rec_key in all_recommendations: + existing_quality = all_recommendations[rec_key].get('signal_strength', 0) * 10 + if 'estimated_win_rate' in all_recommendations[rec_key]: + existing_quality += all_recommendations[rec_key]['estimated_win_rate'] + + # 如果新推荐质量更好,更新 + if quality_score > existing_quality: + all_recommendations[rec_key] = rec + logger.debug(f"更新推荐 {rec_key}: 质量分数 {existing_quality:.1f} -> {quality_score:.1f}") + else: + # 新推荐,直接添加 + all_recommendations[rec_key] = rec + logger.debug(f"添加新推荐 {rec_key}: 质量分数 {quality_score:.1f}") + + # 7. 按信号强度排序,保留质量最好的推荐 + sorted_recommendations = sorted( + all_recommendations.values(), + key=lambda x: (x.get('signal_strength', 0), x.get('estimated_win_rate', 0)), + reverse=True + ) + + # 限制总数(保留质量最好的) + max_total = max_recommendations * 3 # 允许更多推荐以便参考 + final_recommendations = sorted_recommendations[:max_total] + + # 8. 保存到Redis缓存(如果启用) + if add_to_cache: + try: + cache_key = "recommendations:realtime" + + # 增量更新:只更新或添加新的推荐,保留其他推荐 + updated_count = 0 + for rec in final_recommendations: + rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}" + # 使用hset更新或添加(如果不存在则创建) + await self.client.redis_cache.hset(cache_key, rec_key, rec, ttl=3600) # TTL 1小时 + updated_count += 1 + + # 设置整个Hash的TTL(1小时) + if self.client.redis_cache.redis and self.client.redis_cache._connected: + try: + await self.client.redis_cache.redis.expire(cache_key, 3600) + except: + pass + + logger.info(f"已更新 {updated_count} 个推荐到Redis缓存(总计 {len(final_recommendations)} 个)") + except Exception as e: + logger.warning(f"保存推荐到Redis失败: {e}") + + logger.info(f"生成了 {len(new_recommendations)} 个新推荐,总计 {len(final_recommendations)} 个推荐(包含缓存)") + return final_recommendations async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict: """ @@ -230,8 +336,9 @@ class TradeRecommender: async def _create_recommendation( self, symbol_info: Dict, - trade_signal: Dict - ) -> Optional[Dict]: + trade_signal: Dict, + account_balance: float = 1000 + ) -> Optional[List[Dict]]: """ 创建推荐记录 @@ -291,16 +398,43 @@ class TradeRecommender: else: # SELL suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100) - # 准备推荐数据 - recommendation_data = { + # 添加时间戳 + timestamp = time.time() + recommendation_time = datetime.now().isoformat() + + # 计算胜率预估(基于信号强度、市场状态等) + base_win_rate = 40 + (signal_strength * 3) # 信号强度0-10,对应胜率40-70% + market_regime = symbol_info.get('marketRegime', 'ranging') + if market_regime == 'ranging': + base_win_rate += 5 + elif market_regime == 'trending': + base_win_rate += 3 + trend_4h = trade_signal.get('trend_4h') + if trend_4h: + if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'): + base_win_rate += 3 + elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'): + base_win_rate -= 2 + estimated_win_rate = max(35, min(75, base_win_rate)) + + # 计算盈亏USDT评估 + position_value = account_balance * suggested_position_pct * config.TRADING_CONFIG.get('LEVERAGE', 10) + stop_loss_usdt = position_value * stop_loss_pct + take_profit_1_usdt = position_value * stop_loss_pct + take_profit_2_usdt = position_value * take_profit_2_pct + expected_pnl_1 = (estimated_win_rate / 100) * take_profit_1_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt + expected_pnl_2 = (estimated_win_rate / 100) * take_profit_2_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt + + # 基础推荐数据 + base_data = { 'symbol': symbol, 'direction': direction, 'current_price': current_price, 'change_percent': symbol_info.get('changePercent', 0), 'recommendation_reason': trade_signal['reason'], 'signal_strength': signal_strength, - 'market_regime': symbol_info.get('marketRegime'), - 'trend_4h': trade_signal.get('trend_4h'), + 'market_regime': market_regime, + 'trend_4h': trend_4h, 'rsi': symbol_info.get('rsi'), 'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, 'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None, @@ -317,18 +451,36 @@ class TradeRecommender: 'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10), 'volume_24h': symbol_info.get('volume24h'), 'volatility': symbol_info.get('volatility'), - 'order_type': 'LIMIT', # 使用限价单 - 'suggested_limit_price': suggested_limit_price # 建议的挂单价 + 'estimated_win_rate': round(estimated_win_rate, 1), + 'expected_pnl_1': round(expected_pnl_1, 2), + 'expected_pnl_2': round(expected_pnl_2, 2), + 'stop_loss_usdt': round(stop_loss_usdt, 2), + 'take_profit_1_usdt': round(take_profit_1_usdt, 2), + 'take_profit_2_usdt': round(take_profit_2_usdt, 2), + 'recommendation_time': recommendation_time, + 'timestamp': timestamp } - # 不再自动保存到数据库,只返回推荐数据 - # 只有用户在前端点击"标记"时才会保存到数据库(用于复盘) + # 限价单推荐 + limit_recommendation = base_data.copy() + limit_recommendation.update({ + 'order_type': 'LIMIT', + 'suggested_limit_price': suggested_limit_price + }) + + # 市价单推荐 + market_recommendation = base_data.copy() + market_recommendation.update({ + 'order_type': 'MARKET', + 'suggested_limit_price': None + }) + logger.debug( f"✓ 生成推荐: {symbol} {direction} " - f"(信号强度: {signal_strength}/10)" + f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%)" ) - return recommendation_data + return [limit_recommendation, market_recommendation] except Exception as e: logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}")