""" 推荐服务入口(独立进程) 目的: - 推荐生成与自动交易解耦:推荐更高频、只做分析+写入Redis,不下单、不等待成交 - 未来支持“普通用户仅看推荐 / 加V用户自动交易”时,可独立扩容 """ import asyncio import logging import os import sys from pathlib import Path from datetime import datetime, timezone, timedelta # 启动方式兼容: # - python trading_system/recommendations_main.py(__package__ 为空,需从同目录导入) # - python -m trading_system.recommendations_main(__package__='trading_system',必须用相对导入) if __package__ in (None, ""): from binance_client import BinanceClient from market_scanner import MarketScanner from risk_manager import RiskManager from trade_recommender import TradeRecommender import config else: from .binance_client import BinanceClient from .market_scanner import MarketScanner from .risk_manager import RiskManager from .trade_recommender import TradeRecommender from . import config class BeijingTimeFormatter(logging.Formatter): """使用北京时间的日志格式化器""" def formatTime(self, record, datefmt=None): beijing_tz = timezone(timedelta(hours=8)) dt = datetime.fromtimestamp(record.created, tz=beijing_tz) if datefmt: return dt.strftime(datefmt) return dt.strftime("%Y-%m-%d %H:%M:%S") def _setup_logging(): # 单独的日志文件,避免与交易进程混在一起 log_file = os.getenv("RECOMMEND_LOG_FILE", "recommendations_bot.log") if not Path(log_file).is_absolute(): project_root = Path(__file__).parent.parent log_file = project_root / log_file formatter = BeijingTimeFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") file_handler = logging.FileHandler(str(log_file), encoding="utf-8") file_handler.setFormatter(formatter) console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) logging.basicConfig(level=getattr(logging, config.LOG_LEVEL), handlers=[file_handler, console_handler]) # 追加:日志写入 Redis(用于前端日志监控) try: try: from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig except Exception: from redis_log_handler import RedisErrorLogHandler, RedisLogConfig redis_cfg = RedisLogConfig( redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"), use_tls=bool(getattr(config, "REDIS_USE_TLS", False)), ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"), ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None), username=getattr(config, "REDIS_USERNAME", None), password=getattr(config, "REDIS_PASSWORD", None), service="recommendations", ) redis_handler = RedisErrorLogHandler(redis_cfg) redis_handler.setLevel(logging.INFO) logging.getLogger().addHandler(redis_handler) except Exception: pass logger = logging.getLogger(__name__) async def _acquire_reco_lock(client: BinanceClient, ttl_sec: int) -> bool: """ 防止多实例同时生成推荐(避免重复扫描/写Redis风暴)。 """ rc = getattr(client, "redis_cache", None) r = getattr(rc, "redis", None) if rc else None if not r or not getattr(rc, "_connected", False): return True # Redis不可用时无法分布式锁,直接放行 now_ms = int(__import__("time").time() * 1000) try: ok = await r.set("lock:recommendations:generator", str(now_ms), nx=True, ex=max(5, int(ttl_sec))) return bool(ok) except Exception: return True async def main(): _setup_logging() logger.info("=" * 60) logger.info("推荐服务启动(独立进程:只生成推荐,不自动交易)") logger.info("=" * 60) # 建议:推荐频率相对更高(默认60s),且不依赖交易进程的 SCAN_INTERVAL scan_interval = int(os.getenv("RECOMMEND_SCAN_INTERVAL_SEC", "60") or "60") min_signal_strength = int(os.getenv("RECOMMEND_MIN_SIGNAL_STRENGTH", "5") or "5") max_recommendations = int(os.getenv("RECOMMEND_MAX_RECOMMENDATIONS", "60") or "60") min_quality_score = float(os.getenv("RECOMMEND_MIN_QUALITY_SCORE", "0.0") or "0.0") cache_namespace = os.getenv("RECOMMEND_SCAN_CACHE_NAMESPACE", "recommend") or "recommend" logger.info( f"推荐参数: interval={scan_interval}s, min_strength>={min_signal_strength}, " f"max_recos={max_recommendations}, min_quality={min_quality_score}, cache_ns={cache_namespace}" ) client: BinanceClient | None = None try: client = BinanceClient() await client.connect() scanner = MarketScanner(client) risk_manager = RiskManager(client) recommender = TradeRecommender(client, scanner, risk_manager) while True: try: # 每轮都从Redis刷新配置(确保推荐侧也能动态生效) try: if config._config_manager: config._config_manager.reload_from_redis() config.TRADING_CONFIG = config._get_trading_config() except Exception: pass # 分布式锁:避免多实例同时跑 got_lock = await _acquire_reco_lock(client, ttl_sec=max(10, scan_interval)) if not got_lock: logger.debug("推荐生成锁未获取到(可能有其他实例在跑),跳过本轮") await asyncio.sleep(min(5, scan_interval)) continue recos = await recommender.generate_recommendations( min_signal_strength=min_signal_strength, max_recommendations=max_recommendations, add_to_cache=True, min_quality_score=min_quality_score, scan_cache_namespace=cache_namespace, ) logger.info(f"推荐生成完成: {len(recos)} 条(已写入Redis snapshot)") except Exception as e: logger.error(f"推荐生成循环异常: {e}", exc_info=True) await asyncio.sleep(max(5, scan_interval)) finally: if client: try: await client.disconnect() except Exception: pass if __name__ == "__main__": asyncio.run(main())