diff --git a/backend/api/routes/recommendations.py b/backend/api/routes/recommendations.py index bdf6257..698a0c5 100644 --- a/backend/api/routes/recommendations.py +++ b/backend/api/routes/recommendations.py @@ -768,7 +768,8 @@ async def generate_recommendations( min_signal_strength=max(2, min_signal_strength - 3), # 降低3个等级以获取更多推荐(最低2) max_recommendations=max(max_recommendations, 50), # 至少生成50个推荐 add_to_cache=True, # 添加到Redis缓存 - min_quality_score=0.0 # 不过滤,保留所有推荐 + min_quality_score=0.0, # 不过滤,保留所有推荐 + scan_cache_namespace="api", # 避免覆盖交易/推荐进程的扫描缓存 ) return { diff --git a/trading_system/README.md b/trading_system/README.md index b3661d2..c9ad11d 100644 --- a/trading_system/README.md +++ b/trading_system/README.md @@ -8,6 +8,7 @@ trading_system/ ├── __init__.py ├── main.py # 主程序入口 +├── recommendations_main.py # 推荐服务入口(独立进程:只生成推荐,不自动交易) ├── config.py # 配置文件 ├── binance_client.py # 币安客户端 ├── market_scanner.py # 市场扫描器 @@ -36,6 +37,24 @@ cd trading_system python main.py ``` +### 推荐服务(独立进程) + +推荐用于“普通用户查看”,建议与自动交易拆开启动(避免自动交易等待成交时拖慢推荐生成)。 + +```bash +# 从项目根目录运行 +python -m trading_system.recommendations_main +``` + +常用环境变量(可选): + +- `RECOMMEND_SCAN_INTERVAL_SEC`: 推荐生成间隔(秒),默认 60 +- `RECOMMEND_MIN_SIGNAL_STRENGTH`: 推荐最小强度,默认 5 +- `RECOMMEND_MAX_RECOMMENDATIONS`: 单次最大推荐数量,默认 60 +- `RECOMMEND_MIN_QUALITY_SCORE`: 质量分过滤,默认 0.0 +- `RECOMMEND_SCAN_CACHE_NAMESPACE`: 扫描缓存命名空间,默认 recommend +- `RECOMMEND_LOG_FILE`: 推荐服务日志文件,默认 recommendations_bot.log + ### 方式2:从项目根目录运行 ```bash diff --git a/trading_system/market_scanner.py b/trading_system/market_scanner.py index 68f110f..769fe81 100644 --- a/trading_system/market_scanner.py +++ b/trading_system/market_scanner.py @@ -29,7 +29,7 @@ class MarketScanner: self.client = client self.top_symbols: List[Dict] = [] - async def scan_market(self) -> List[Dict]: + async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]: """ 扫描市场,找出涨跌幅最大的前N个货币对 优先从 Redis 缓存读取扫描结果,如果缓存不可用或过期则重新扫描 @@ -39,9 +39,15 @@ class MarketScanner: """ import time self._scan_start_time = time.time() + + # 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰 + cfg = dict(config.TRADING_CONFIG or {}) + if config_override and isinstance(config_override, dict): + cfg.update(config_override) + ns = (cache_namespace or "trade").strip() or "trade" # 先查 Redis 缓存(扫描结果缓存,TTL: 30秒) - cache_key = "scan_result:top_symbols" + cache_key = f"scan_result:top_symbols:{ns}" cached = await self.client.redis_cache.get(cache_key) if cached: logger.info(f"从Redis缓存获取扫描结果: {len(cached)} 个交易对") @@ -57,7 +63,7 @@ class MarketScanner: return [] # 根据配置限制扫描的交易对数量 - max_scan_symbols = config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500) + max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500) if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols): symbols = all_symbols[:max_scan_symbols] logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols})") @@ -76,8 +82,8 @@ class MarketScanner: if ticker: change_percent = abs(ticker.get('changePercent', 0)) volume = ticker.get('volume', 0) - if (change_percent >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and - volume >= config.TRADING_CONFIG['MIN_VOLUME_24H']): + if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and + volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])): pre_filtered_symbols.append(symbol) logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)} 个") @@ -102,8 +108,8 @@ class MarketScanner: # 过滤最小涨跌幅和成交量 filtered_results = [ r for r in valid_results - if abs(r['changePercent']) >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] - and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H'] + if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) + and r.get('volume24h', 0) >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H']) ] # 按信号得分和涨跌幅综合排序,取前N个 @@ -117,7 +123,7 @@ class MarketScanner: reverse=True ) - top_n = sorted_results[:config.TRADING_CONFIG['TOP_N_SYMBOLS']] + top_n = sorted_results[:cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS'])] self.top_symbols = top_n diff --git a/trading_system/recommendations_main.py b/trading_system/recommendations_main.py new file mode 100644 index 0000000..e9c3936 --- /dev/null +++ b/trading_system/recommendations_main.py @@ -0,0 +1,167 @@ +""" +推荐服务入口(独立进程) + +目的: +- 推荐生成与自动交易解耦:推荐更高频、只做分析+写入Redis,不下单、不等待成交 +- 未来支持“普通用户仅看推荐 / 加V用户自动交易”时,可独立扩容 +""" + +import asyncio +import logging +import os +import sys +from pathlib import Path +from datetime import datetime, timezone, timedelta + + +# 支持直接运行和作为模块导入 +if __name__ == "__main__": + from binance_client import BinanceClient + from market_scanner import MarketScanner + from risk_manager import RiskManager + from trade_recommender import TradeRecommender + import config +else: + from .binance_client import BinanceClient + from .market_scanner import MarketScanner + from .risk_manager import RiskManager + from .trade_recommender import TradeRecommender + from . import config + + +class BeijingTimeFormatter(logging.Formatter): + """使用北京时间的日志格式化器""" + + def formatTime(self, record, datefmt=None): + beijing_tz = timezone(timedelta(hours=8)) + dt = datetime.fromtimestamp(record.created, tz=beijing_tz) + if datefmt: + return dt.strftime(datefmt) + return dt.strftime("%Y-%m-%d %H:%M:%S") + + +def _setup_logging(): + # 单独的日志文件,避免与交易进程混在一起 + log_file = os.getenv("RECOMMEND_LOG_FILE", "recommendations_bot.log") + if not Path(log_file).is_absolute(): + project_root = Path(__file__).parent.parent + log_file = project_root / log_file + + formatter = BeijingTimeFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + file_handler = logging.FileHandler(str(log_file), encoding="utf-8") + file_handler.setFormatter(formatter) + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(formatter) + + logging.basicConfig(level=getattr(logging, config.LOG_LEVEL), handlers=[file_handler, console_handler]) + + # 追加:日志写入 Redis(用于前端日志监控) + try: + try: + from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig + except Exception: + from redis_log_handler import RedisErrorLogHandler, RedisLogConfig + + redis_cfg = RedisLogConfig( + redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"), + use_tls=bool(getattr(config, "REDIS_USE_TLS", False)), + ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"), + ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None), + username=getattr(config, "REDIS_USERNAME", None), + password=getattr(config, "REDIS_PASSWORD", None), + service="recommendations", + ) + redis_handler = RedisErrorLogHandler(redis_cfg) + redis_handler.setLevel(logging.INFO) + logging.getLogger().addHandler(redis_handler) + except Exception: + pass + + +logger = logging.getLogger(__name__) + + +async def _acquire_reco_lock(client: BinanceClient, ttl_sec: int) -> bool: + """ + 防止多实例同时生成推荐(避免重复扫描/写Redis风暴)。 + """ + rc = getattr(client, "redis_cache", None) + r = getattr(rc, "redis", None) if rc else None + if not r or not getattr(rc, "_connected", False): + return True # Redis不可用时无法分布式锁,直接放行 + now_ms = int(__import__("time").time() * 1000) + try: + ok = await r.set("lock:recommendations:generator", str(now_ms), nx=True, ex=max(5, int(ttl_sec))) + return bool(ok) + except Exception: + return True + + +async def main(): + _setup_logging() + + logger.info("=" * 60) + logger.info("推荐服务启动(独立进程:只生成推荐,不自动交易)") + logger.info("=" * 60) + + # 建议:推荐频率相对更高(默认60s),且不依赖交易进程的 SCAN_INTERVAL + scan_interval = int(os.getenv("RECOMMEND_SCAN_INTERVAL_SEC", "60") or "60") + min_signal_strength = int(os.getenv("RECOMMEND_MIN_SIGNAL_STRENGTH", "5") or "5") + max_recommendations = int(os.getenv("RECOMMEND_MAX_RECOMMENDATIONS", "60") or "60") + min_quality_score = float(os.getenv("RECOMMEND_MIN_QUALITY_SCORE", "0.0") or "0.0") + cache_namespace = os.getenv("RECOMMEND_SCAN_CACHE_NAMESPACE", "recommend") or "recommend" + + logger.info( + f"推荐参数: interval={scan_interval}s, min_strength>={min_signal_strength}, " + f"max_recos={max_recommendations}, min_quality={min_quality_score}, cache_ns={cache_namespace}" + ) + + client: BinanceClient | None = None + try: + client = BinanceClient() + await client.connect() + + scanner = MarketScanner(client) + risk_manager = RiskManager(client) + recommender = TradeRecommender(client, scanner, risk_manager) + + while True: + try: + # 每轮都从Redis刷新配置(确保推荐侧也能动态生效) + try: + if config._config_manager: + config._config_manager.reload_from_redis() + config.TRADING_CONFIG = config._get_trading_config() + except Exception: + pass + + # 分布式锁:避免多实例同时跑 + got_lock = await _acquire_reco_lock(client, ttl_sec=max(10, scan_interval)) + if not got_lock: + logger.debug("推荐生成锁未获取到(可能有其他实例在跑),跳过本轮") + await asyncio.sleep(min(5, scan_interval)) + continue + + recos = await recommender.generate_recommendations( + min_signal_strength=min_signal_strength, + max_recommendations=max_recommendations, + add_to_cache=True, + min_quality_score=min_quality_score, + scan_cache_namespace=cache_namespace, + ) + logger.info(f"推荐生成完成: {len(recos)} 条(已写入Redis snapshot)") + except Exception as e: + logger.error(f"推荐生成循环异常: {e}", exc_info=True) + + await asyncio.sleep(max(5, scan_interval)) + finally: + if client: + try: + await client.disconnect() + except Exception: + pass + + +if __name__ == "__main__": + asyncio.run(main()) + diff --git a/trading_system/trade_recommender.py b/trading_system/trade_recommender.py index 13f477a..caacd97 100644 --- a/trading_system/trade_recommender.py +++ b/trading_system/trade_recommender.py @@ -72,7 +72,9 @@ class TradeRecommender: min_signal_strength: int = 5, max_recommendations: int = 20, add_to_cache: bool = True, - min_quality_score: float = 0.0 + min_quality_score: float = 0.0, + scan_cache_namespace: str = "recommend", + scan_config_override: Optional[Dict] = None, ) -> List[Dict]: """ 生成交易推荐(支持增量添加到Redis缓存) @@ -112,7 +114,11 @@ class TradeRecommender: logger.debug(f"从Redis读取现有推荐失败: {e}") # 2. 扫描市场 - top_symbols = await self.scanner.scan_market() + # 说明:推荐进程/交易进程可能同时运行,为避免扫描缓存互相覆盖,使用独立 cache_namespace + top_symbols = await self.scanner.scan_market( + cache_namespace=scan_cache_namespace or "recommend", + config_override=scan_config_override, + ) if not top_symbols: logger.warning("未找到符合条件的交易对") # 如果市场扫描失败,返回现有缓存推荐