auto_trade_sys/trading_system/recommendations_main.py
薇薇安 414607d566 a
2026-01-21 22:38:24 +08:00

182 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐服务入口(独立进程)
目的:
- 推荐生成与自动交易解耦:推荐更高频、只做分析+写入Redis不下单、不等待成交
- 未来支持“普通用户仅看推荐 / 加V用户自动交易”时可独立扩容
"""
import asyncio
import logging
import os
import sys
from pathlib import Path
from datetime import datetime, timezone, timedelta
# 启动方式兼容(更鲁棒):
# - supervisor 推荐python -m trading_system.recommendations_main相对导入
# - 手动调试python trading_system/recommendations_main.py同目录导入
try:
from .binance_client import BinanceClient # type: ignore
from .market_scanner import MarketScanner # type: ignore
from .risk_manager import RiskManager # type: ignore
from .trade_recommender import TradeRecommender # type: ignore
from . import config # type: ignore
except Exception:
_here = Path(__file__).resolve().parent
_root = _here.parent
if str(_here) not in sys.path:
sys.path.insert(0, str(_here))
if str(_root) not in sys.path:
sys.path.insert(0, str(_root))
from binance_client import BinanceClient # type: ignore
from market_scanner import MarketScanner # type: ignore
from risk_manager import RiskManager # type: ignore
from trade_recommender import TradeRecommender # type: ignore
import config # type: ignore
class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime("%Y-%m-%d %H:%M:%S")
def _setup_logging():
# 单独的日志文件,避免与交易进程混在一起
log_file = os.getenv("RECOMMEND_LOG_FILE", "recommendations_bot.log")
if not Path(log_file).is_absolute():
project_root = Path(__file__).parent.parent
log_file = project_root / log_file
formatter = BeijingTimeFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler = logging.FileHandler(str(log_file), encoding="utf-8")
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logging.basicConfig(level=getattr(logging, config.LOG_LEVEL), handlers=[file_handler, console_handler])
# 追加:日志写入 Redis用于前端日志监控
try:
try:
from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig
except Exception:
from redis_log_handler import RedisErrorLogHandler, RedisLogConfig
redis_cfg = RedisLogConfig(
redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
use_tls=bool(getattr(config, "REDIS_USE_TLS", False)),
ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"),
ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
username=getattr(config, "REDIS_USERNAME", None),
password=getattr(config, "REDIS_PASSWORD", None),
service="recommendations",
)
redis_handler = RedisErrorLogHandler(redis_cfg)
redis_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(redis_handler)
except Exception:
pass
logger = logging.getLogger(__name__)
async def _acquire_reco_lock(client: BinanceClient, ttl_sec: int) -> bool:
"""
防止多实例同时生成推荐(避免重复扫描/写Redis风暴
"""
rc = getattr(client, "redis_cache", None)
r = getattr(rc, "redis", None) if rc else None
if not r or not getattr(rc, "_connected", False):
return True # Redis不可用时无法分布式锁直接放行
now_ms = int(__import__("time").time() * 1000)
try:
ok = await r.set("lock:recommendations:generator", str(now_ms), nx=True, ex=max(5, int(ttl_sec)))
return bool(ok)
except Exception:
return True
async def main():
_setup_logging()
logger.info("=" * 60)
logger.info("推荐服务启动(独立进程:只生成推荐,不自动交易)")
logger.info("=" * 60)
# 建议推荐频率相对更高默认60s且不依赖交易进程的 SCAN_INTERVAL
scan_interval = int(os.getenv("RECOMMEND_SCAN_INTERVAL_SEC", "60") or "60")
min_signal_strength = int(os.getenv("RECOMMEND_MIN_SIGNAL_STRENGTH", "5") or "5")
max_recommendations = int(os.getenv("RECOMMEND_MAX_RECOMMENDATIONS", "60") or "60")
min_quality_score = float(os.getenv("RECOMMEND_MIN_QUALITY_SCORE", "0.0") or "0.0")
cache_namespace = os.getenv("RECOMMEND_SCAN_CACHE_NAMESPACE", "recommend") or "recommend"
logger.info(
f"推荐参数: interval={scan_interval}s, min_strength>={min_signal_strength}, "
f"max_recos={max_recommendations}, min_quality={min_quality_score}, cache_ns={cache_namespace}"
)
client: BinanceClient | None = None
try:
client = BinanceClient()
await client.connect()
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
recommender = TradeRecommender(client, scanner, risk_manager)
while True:
try:
# 每轮都从Redis刷新配置确保推荐侧也能动态生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
except Exception:
pass
# 分布式锁:避免多实例同时跑
got_lock = await _acquire_reco_lock(client, ttl_sec=max(10, scan_interval))
if not got_lock:
logger.debug("推荐生成锁未获取到(可能有其他实例在跑),跳过本轮")
await asyncio.sleep(min(5, scan_interval))
continue
recos = await recommender.generate_recommendations(
min_signal_strength=min_signal_strength,
max_recommendations=max_recommendations,
add_to_cache=True,
min_quality_score=min_quality_score,
scan_cache_namespace=cache_namespace,
)
logger.info(f"推荐生成完成: {len(recos)}已写入Redis snapshot")
except Exception as e:
logger.error(f"推荐生成循环异常: {e}", exc_info=True)
await asyncio.sleep(max(5, scan_interval))
finally:
if client:
try:
await client.disconnect()
except Exception:
pass
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
except Exception as e:
logging.getLogger(__name__).error(f"推荐进程异常退出: {e}", exc_info=True)
raise