auto_trade_sys/trading_system/recommendations_main.py
薇薇安 f7c68efb3e a
2026-01-24 11:00:32 +08:00

194 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐服务入口(独立进程)
目的:
- 推荐生成与自动交易解耦:推荐更高频、只做分析+写入Redis不下单、不等待成交
- 未来支持“普通用户仅看推荐 / 加V用户自动交易”时可独立扩容
"""
import asyncio
import logging
import os
import sys
from pathlib import Path
from datetime import datetime, timezone, timedelta
# 启动方式兼容(更鲁棒):
# - supervisor 推荐python -m trading_system.recommendations_main相对导入
# - 手动调试python trading_system/recommendations_main.py同目录导入
try:
from .binance_client import BinanceClient # type: ignore
from .market_scanner import MarketScanner # type: ignore
from .risk_manager import RiskManager # type: ignore
from .trade_recommender import TradeRecommender # type: ignore
from . import config # type: ignore
except Exception:
_here = Path(__file__).resolve().parent
_root = _here.parent
if str(_here) not in sys.path:
sys.path.insert(0, str(_here))
if str(_root) not in sys.path:
sys.path.insert(0, str(_root))
from binance_client import BinanceClient # type: ignore
from market_scanner import MarketScanner # type: ignore
from risk_manager import RiskManager # type: ignore
from trade_recommender import TradeRecommender # type: ignore
import config # type: ignore
class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime("%Y-%m-%d %H:%M:%S")
def _setup_logging():
# 单独的日志文件,避免与交易进程混在一起
log_file = os.getenv("RECOMMEND_LOG_FILE", "recommendations_bot.log")
if not Path(log_file).is_absolute():
project_root = Path(__file__).parent.parent
log_file = project_root / log_file
formatter = BeijingTimeFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler = logging.FileHandler(str(log_file), encoding="utf-8")
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logging.basicConfig(level=getattr(logging, config.LOG_LEVEL), handlers=[file_handler, console_handler])
# 追加:日志写入 Redis用于前端日志监控
try:
try:
from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig
except Exception:
from redis_log_handler import RedisErrorLogHandler, RedisLogConfig
redis_cfg = RedisLogConfig(
redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
use_tls=bool(getattr(config, "REDIS_USE_TLS", False)),
ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"),
ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
username=getattr(config, "REDIS_USERNAME", None),
password=getattr(config, "REDIS_PASSWORD", None),
service="recommendations",
)
redis_handler = RedisErrorLogHandler(redis_cfg)
redis_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(redis_handler)
except Exception:
pass
logger = logging.getLogger(__name__)
async def _acquire_reco_lock(client: BinanceClient, ttl_sec: int) -> bool:
"""
防止多实例同时生成推荐(避免重复扫描/写Redis风暴
"""
rc = getattr(client, "redis_cache", None)
r = getattr(rc, "redis", None) if rc else None
if not r or not getattr(rc, "_connected", False):
return True # Redis不可用时无法分布式锁直接放行
now_ms = int(__import__("time").time() * 1000)
try:
ok = await r.set("lock:recommendations:generator", str(now_ms), nx=True, ex=max(5, int(ttl_sec)))
return bool(ok)
except Exception:
return True
async def main():
_setup_logging()
logger.info("=" * 60)
logger.info("推荐服务启动(独立进程:只生成推荐,不自动交易)")
logger.info("=" * 60)
# 建议推荐频率相对更高默认60s且不依赖交易进程的 SCAN_INTERVAL
scan_interval = int(os.getenv("RECOMMEND_SCAN_INTERVAL_SEC", "60") or "60")
min_signal_strength = int(os.getenv("RECOMMEND_MIN_SIGNAL_STRENGTH", "5") or "5")
max_recommendations = int(os.getenv("RECOMMEND_MAX_RECOMMENDATIONS", "60") or "60")
min_quality_score = float(os.getenv("RECOMMEND_MIN_QUALITY_SCORE", "0.0") or "0.0")
cache_namespace = os.getenv("RECOMMEND_SCAN_CACHE_NAMESPACE", "recommend") or "recommend"
logger.info(
f"推荐参数: interval={scan_interval}s, min_strength>={min_signal_strength}, "
f"max_recos={max_recommendations}, min_quality={min_quality_score}, cache_ns={cache_namespace}"
)
client: BinanceClient | None = None
try:
# 推荐服务不需要特定账户的 API 密钥,只需要获取行情数据
# 使用空 API 密钥(公开接口)或使用只读权限的密钥
# 重要:推荐服务不应该读取任何 account_id 的 API key即使环境变量设置了 ATS_ACCOUNT_ID
logger.info("推荐服务初始化(不依赖特定账户,只获取行情数据)")
logger.info("推荐服务明确使用空 API Key确保不会读取任何账户的密钥")
# 使用空字符串明确表示"只使用公开接口",不会被 config 覆盖
client = BinanceClient(api_key="", api_secret="") # 使用空密钥,只获取公开行情
# 验证:确保 API key 确实是空的
if client.api_key:
logger.error(f"❌ 推荐服务 API Key 非空!当前值: {client.api_key[:4]}...{client.api_key[-4:] if len(client.api_key) > 8 else ''}")
logger.error(" 这可能导致推荐服务使用错误的账户密钥,请检查 BinanceClient.__init__ 逻辑")
else:
logger.info("✓ 推荐服务 API Key 确认为空,将只使用公开接口")
await client.connect()
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
recommender = TradeRecommender(client, scanner, risk_manager)
while True:
try:
# 每轮都从Redis刷新配置确保推荐侧也能动态生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
except Exception:
pass
# 分布式锁:避免多实例同时跑
got_lock = await _acquire_reco_lock(client, ttl_sec=max(10, scan_interval))
if not got_lock:
logger.debug("推荐生成锁未获取到(可能有其他实例在跑),跳过本轮")
await asyncio.sleep(min(5, scan_interval))
continue
recos = await recommender.generate_recommendations(
min_signal_strength=min_signal_strength,
max_recommendations=max_recommendations,
add_to_cache=True,
min_quality_score=min_quality_score,
scan_cache_namespace=cache_namespace,
)
logger.info(f"推荐生成完成: {len(recos)}已写入Redis snapshot")
except Exception as e:
logger.error(f"推荐生成循环异常: {e}", exc_info=True)
await asyncio.sleep(max(5, scan_interval))
finally:
if client:
try:
await client.disconnect()
except Exception:
pass
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
except Exception as e:
logging.getLogger(__name__).error(f"推荐进程异常退出: {e}", exc_info=True)
raise