170 lines
6.5 KiB
Python
170 lines
6.5 KiB
Python
"""
|
||
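Recommendation service entry point (standalone process).

Purpose:
- Decouple recommendation generation from automated trading: recommendations
  run at a higher frequency and only analyse + write to Redis; they never
  place orders and never wait for fills.
- Allow this service to be scaled independently once "regular users only view
  recommendations / verified (V) users get automated trading" is supported.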
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
|
||
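# Both launch styles are supported:
# - python trading_system/recommendations_main.py
#   (__package__ is empty, so sibling modules are imported from the same directory)
# - python -m trading_system.recommendations_main
#   (__package__ == 'trading_system', so relative imports are required)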
|
||
if __package__ in (None, ""):
|
||
from binance_client import BinanceClient
|
||
from market_scanner import MarketScanner
|
||
from risk_manager import RiskManager
|
||
from trade_recommender import TradeRecommender
|
||
import config
|
||
else:
|
||
from .binance_client import BinanceClient
|
||
from .market_scanner import MarketScanner
|
||
from .risk_manager import RiskManager
|
||
from .trade_recommender import TradeRecommender
|
||
from . import config
|
||
|
||
|
||
class BeijingTimeFormatter(logging.Formatter):
    """Log formatter that renders record timestamps in Beijing time (UTC+8)."""

    # Fixed UTC+8 offset, built once at class creation instead of once per
    # log record. Subclasses may override it to target another zone.
    BEIJING_TZ = timezone(timedelta(hours=8))

    # Layout used when the caller supplies no (or an empty) ``datefmt``.
    DEFAULT_DATEFMT = "%Y-%m-%d %H:%M:%S"

    def formatTime(self, record, datefmt=None):
        """Return ``record.created`` formatted in the UTC+8 zone.

        Args:
            record: The ``logging.LogRecord`` being formatted; only its
                ``created`` epoch timestamp is read.
            datefmt: Optional ``strftime`` pattern. A falsy value (``None`` or
                ``""``) selects ``DEFAULT_DATEFMT``.

        Returns:
            The formatted timestamp string.
        """
        moment = datetime.fromtimestamp(record.created, tz=self.BEIJING_TZ)
        return moment.strftime(datefmt or self.DEFAULT_DATEFMT)
|
||
|
||
|
||
def _setup_logging():
    """Configure root logging for the recommendation process.

    Installs a UTF-8 file handler and a stdout handler, both using
    ``BeijingTimeFormatter``, via ``logging.basicConfig``. It then tries to
    attach a Redis-backed handler at INFO level (used by the front-end log
    monitor). The Redis handler is best-effort: if importing or constructing
    it fails, a warning is logged and file/console logging continues.

    Environment:
        RECOMMEND_LOG_FILE: Log file path (default
            ``recommendations_bot.log``). A relative path is resolved against
            the parent of this file's directory.

    The level comes from ``config.LOG_LEVEL``; an unknown or missing level
    name falls back to INFO instead of raising.
    """
    default_name = "recommendations_bot.log"
    # Dedicated log file so output is not mixed with the trading process.
    # ``or default_name`` guards against an empty env value, which would
    # otherwise resolve to a directory and make FileHandler fail.
    log_path = Path(os.getenv("RECOMMEND_LOG_FILE", default_name) or default_name)
    if not log_path.is_absolute():
        log_path = Path(__file__).parent.parent / log_path
    # FileHandler does not create missing parent directories.
    log_path.parent.mkdir(parents=True, exist_ok=True)

    formatter = BeijingTimeFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    file_handler = logging.FileHandler(str(log_path), encoding="utf-8")
    file_handler.setFormatter(formatter)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)

    # Accept level names case-insensitively; anything that does not map to a
    # numeric logging level degrades to INFO rather than crashing at startup.
    level_name = str(getattr(config, "LOG_LEVEL", "INFO") or "INFO").upper()
    level = getattr(logging, level_name, None)
    level_is_valid = isinstance(level, int)
    if not level_is_valid:
        level = logging.INFO

    logging.basicConfig(level=level, handlers=[file_handler, console_handler])
    setup_logger = logging.getLogger(__name__)
    if not level_is_valid:
        setup_logger.warning("无效的 LOG_LEVEL=%r,已回退为 INFO", level_name)

    # Additionally ship logs to Redis (consumed by the front-end log monitor).
    try:
        try:
            # Package launch (python -m ...): relative import.
            from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig
        except ImportError:
            # Script launch: the module sits next to this file on sys.path.
            from redis_log_handler import RedisErrorLogHandler, RedisLogConfig

        redis_cfg = RedisLogConfig(
            redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
            use_tls=bool(getattr(config, "REDIS_USE_TLS", False)),
            ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"),
            ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
            username=getattr(config, "REDIS_USERNAME", None),
            password=getattr(config, "REDIS_PASSWORD", None),
            service="recommendations",
        )
        redis_handler = RedisErrorLogHandler(redis_cfg)
        redis_handler.setLevel(logging.INFO)
        logging.getLogger().addHandler(redis_handler)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent: say why Redis log
        # shipping is unavailable.
        setup_logger.warning("Redis 日志处理器初始化失败,已跳过(仅保留文件/控制台日志): %s", exc)
|
||
|
||
|
||
# Module-level logger; the root handlers it relies on are installed by _setup_logging().
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def _acquire_reco_lock(client: "BinanceClient", ttl_sec: int) -> bool:
    """Try to take the distributed "recommendation generator" lock.

    Prevents several instances from generating recommendations at the same
    time (duplicate scans / Redis write storms). The lock is the key
    ``lock:recommendations:generator``, written with ``NX`` and an expiry of
    ``max(5, ttl_sec)`` seconds; its value is the current epoch time in
    milliseconds. This function never deletes the key.

    The function fails open: if Redis is not available, or the ``SET`` call
    raises, it returns ``True`` so a single instance keeps working without
    the lock.

    Args:
        client: Object that may expose ``redis_cache`` with a ``redis``
            connection and a ``_connected`` flag.
        ttl_sec: Requested lock lifetime in seconds (clamped to at least 5).

    Returns:
        ``True`` if the lock was acquired or could not be checked,
        ``False`` if the key is already set.
    """
    import time  # local import: the module header does not import ``time``

    cache = getattr(client, "redis_cache", None)
    conn = getattr(cache, "redis", None) if cache else None
    if not conn or not getattr(cache, "_connected", False):
        # No usable Redis connection -> no distributed lock; let the caller run.
        return True

    stamp_ms = int(time.time() * 1000)
    try:
        acquired = await conn.set(
            "lock:recommendations:generator",
            str(stamp_ms),
            nx=True,
            ex=max(5, int(ttl_sec)),
        )
    except Exception as exc:
        # Still fail open, but leave a trace so repeated Redis failures are
        # visible instead of being swallowed.
        logging.getLogger(__name__).warning("获取推荐生成锁失败(Redis异常),本轮直接放行: %s", exc)
        return True
    return bool(acquired)
|
||
|
||
|
||
async def main():
    """Run the standalone recommendation loop.

    Sets up logging, connects a ``BinanceClient``, builds the scanner / risk
    manager / recommender, then loops forever. Each round:

    1. Reloads configuration from Redis (best-effort).
    2. Tries to take the distributed generator lock; if another instance
       holds it, the round is skipped after a short sleep.
    3. Generates recommendations with ``add_to_cache=True``.

    Errors inside a round are logged and the loop continues. The client is
    disconnected when the coroutine exits for any reason.

    Environment (an empty value falls back to the default):
        RECOMMEND_SCAN_INTERVAL_SEC: seconds between rounds (default 60;
            the loop never sleeps less than 5 seconds between rounds).
        RECOMMEND_MIN_SIGNAL_STRENGTH: int, default 5.
        RECOMMEND_MAX_RECOMMENDATIONS: int, default 60.
        RECOMMEND_MIN_QUALITY_SCORE: float, default 0.0.
        RECOMMEND_SCAN_CACHE_NAMESPACE: default ``recommend``.

    Raises:
        ValueError: If a numeric environment variable cannot be parsed.
    """
    _setup_logging()

    logger.info("=" * 60)
    logger.info("推荐服务启动(独立进程:只生成推荐,不自动交易)")
    logger.info("=" * 60)

    # Recommendations run more often than trading (60s by default) and do not
    # depend on the trading process's SCAN_INTERVAL.
    scan_interval = int(os.getenv("RECOMMEND_SCAN_INTERVAL_SEC", "60") or "60")
    min_signal_strength = int(os.getenv("RECOMMEND_MIN_SIGNAL_STRENGTH", "5") or "5")
    max_recommendations = int(os.getenv("RECOMMEND_MAX_RECOMMENDATIONS", "60") or "60")
    min_quality_score = float(os.getenv("RECOMMEND_MIN_QUALITY_SCORE", "0.0") or "0.0")
    cache_namespace = os.getenv("RECOMMEND_SCAN_CACHE_NAMESPACE", "recommend") or "recommend"

    logger.info(
        f"推荐参数: interval={scan_interval}s, min_strength>={min_signal_strength}, "
        f"max_recos={max_recommendations}, min_quality={min_quality_score}, cache_ns={cache_namespace}"
    )

    client: BinanceClient | None = None
    try:
        client = BinanceClient()
        await client.connect()

        scanner = MarketScanner(client)
        risk_manager = RiskManager(client)
        recommender = TradeRecommender(client, scanner, risk_manager)

        while True:
            try:
                # Refresh config from Redis every round so dynamic changes
                # also take effect on the recommendation side.
                try:
                    if config._config_manager:
                        config._config_manager.reload_from_redis()
                        config.TRADING_CONFIG = config._get_trading_config()
                except Exception as exc:
                    # Best-effort: keep running with the config already in
                    # memory, but record why the refresh failed.
                    logger.warning("从Redis刷新配置失败,沿用当前配置: %s", exc)

                # Distributed lock: avoid several instances running at once.
                got_lock = await _acquire_reco_lock(client, ttl_sec=max(10, scan_interval))
                if not got_lock:
                    logger.debug("推荐生成锁未获取到(可能有其他实例在跑),跳过本轮")
                    # Short retry delay; ``continue`` skips the regular
                    # end-of-round sleep below.
                    await asyncio.sleep(min(5, scan_interval))
                    continue

                recos = await recommender.generate_recommendations(
                    min_signal_strength=min_signal_strength,
                    max_recommendations=max_recommendations,
                    add_to_cache=True,
                    min_quality_score=min_quality_score,
                    scan_cache_namespace=cache_namespace,
                )
                logger.info(f"推荐生成完成: {len(recos)} 条(已写入Redis snapshot)")
            except Exception as e:
                # One failed round must not stop the service.
                logger.error(f"推荐生成循环异常: {e}", exc_info=True)

            await asyncio.sleep(max(5, scan_interval))
    finally:
        if client:
            try:
                await client.disconnect()
            except Exception as exc:
                # Shutdown continues regardless; just make the failure visible.
                logger.warning("断开客户端连接失败: %s", exc)
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: run the main() coroutine under asyncio.run().
    asyncio.run(main())
|
||
|