This commit is contained in:
薇薇安 2026-01-20 10:48:32 +08:00
parent d23ed2252c
commit 30cf5d539f
5 changed files with 210 additions and 11 deletions

View File

@ -768,7 +768,8 @@ async def generate_recommendations(
min_signal_strength=max(2, min_signal_strength - 3), # 降低3个等级以获取更多推荐最低2 min_signal_strength=max(2, min_signal_strength - 3), # 降低3个等级以获取更多推荐最低2
max_recommendations=max(max_recommendations, 50), # 至少生成50个推荐 max_recommendations=max(max_recommendations, 50), # 至少生成50个推荐
add_to_cache=True, # 添加到Redis缓存 add_to_cache=True, # 添加到Redis缓存
min_quality_score=0.0 # 不过滤,保留所有推荐 min_quality_score=0.0, # 不过滤,保留所有推荐
scan_cache_namespace="api", # 避免覆盖交易/推荐进程的扫描缓存
) )
return { return {

View File

@ -8,6 +8,7 @@
trading_system/ trading_system/
├── __init__.py ├── __init__.py
├── main.py # 主程序入口 ├── main.py # 主程序入口
├── recommendations_main.py # 推荐服务入口(独立进程:只生成推荐,不自动交易)
├── config.py # 配置文件 ├── config.py # 配置文件
├── binance_client.py # 币安客户端 ├── binance_client.py # 币安客户端
├── market_scanner.py # 市场扫描器 ├── market_scanner.py # 市场扫描器
@ -36,6 +37,24 @@ cd trading_system
python main.py python main.py
``` ```
### 推荐服务(独立进程)
推荐用于“普通用户查看”,建议与自动交易拆开启动(避免自动交易等待成交时拖慢推荐生成)。
```bash
# 从项目根目录运行
python -m trading_system.recommendations_main
```
常用环境变量(可选):
- `RECOMMEND_SCAN_INTERVAL_SEC`: 推荐生成间隔(秒),默认 60
- `RECOMMEND_MIN_SIGNAL_STRENGTH`: 推荐最小强度,默认 5
- `RECOMMEND_MAX_RECOMMENDATIONS`: 单次最大推荐数量,默认 60
- `RECOMMEND_MIN_QUALITY_SCORE`: 质量分过滤,默认 0.0
- `RECOMMEND_SCAN_CACHE_NAMESPACE`: 扫描缓存命名空间,默认 recommend
- `RECOMMEND_LOG_FILE`: 推荐服务日志文件,默认 recommendations_bot.log
### 方式2从项目根目录运行 ### 方式2从项目根目录运行
```bash ```bash

View File

@ -29,7 +29,7 @@ class MarketScanner:
self.client = client self.client = client
self.top_symbols: List[Dict] = [] self.top_symbols: List[Dict] = []
async def scan_market(self) -> List[Dict]: async def scan_market(self, cache_namespace: str = "trade", config_override: Optional[Dict] = None) -> List[Dict]:
""" """
扫描市场找出涨跌幅最大的前N个货币对 扫描市场找出涨跌幅最大的前N个货币对
优先从 Redis 缓存读取扫描结果如果缓存不可用或过期则重新扫描 优先从 Redis 缓存读取扫描结果如果缓存不可用或过期则重新扫描
@ -39,9 +39,15 @@ class MarketScanner:
""" """
import time import time
self._scan_start_time = time.time() self._scan_start_time = time.time()
# 允许“推荐进程”和“交易进程”使用不同的扫描参数/缓存命名空间,互不干扰
cfg = dict(config.TRADING_CONFIG or {})
if config_override and isinstance(config_override, dict):
cfg.update(config_override)
ns = (cache_namespace or "trade").strip() or "trade"
# 先查 Redis 缓存扫描结果缓存TTL: 30秒 # 先查 Redis 缓存扫描结果缓存TTL: 30秒
cache_key = "scan_result:top_symbols" cache_key = f"scan_result:top_symbols:{ns}"
cached = await self.client.redis_cache.get(cache_key) cached = await self.client.redis_cache.get(cache_key)
if cached: if cached:
logger.info(f"从Redis缓存获取扫描结果: {len(cached)} 个交易对") logger.info(f"从Redis缓存获取扫描结果: {len(cached)} 个交易对")
@ -57,7 +63,7 @@ class MarketScanner:
return [] return []
# 根据配置限制扫描的交易对数量 # 根据配置限制扫描的交易对数量
max_scan_symbols = config.TRADING_CONFIG.get('MAX_SCAN_SYMBOLS', 500) max_scan_symbols = cfg.get('MAX_SCAN_SYMBOLS', 500)
if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols): if max_scan_symbols > 0 and max_scan_symbols < len(all_symbols):
symbols = all_symbols[:max_scan_symbols] symbols = all_symbols[:max_scan_symbols]
logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols}") logger.info(f"限制扫描数量: {len(symbols)}/{len(all_symbols)} 个交易对(配置: MAX_SCAN_SYMBOLS={max_scan_symbols}")
@ -76,8 +82,8 @@ class MarketScanner:
if ticker: if ticker:
change_percent = abs(ticker.get('changePercent', 0)) change_percent = abs(ticker.get('changePercent', 0))
volume = ticker.get('volume', 0) volume = ticker.get('volume', 0)
if (change_percent >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] and if (change_percent >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT']) and
volume >= config.TRADING_CONFIG['MIN_VOLUME_24H']): volume >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])):
pre_filtered_symbols.append(symbol) pre_filtered_symbols.append(symbol)
logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)}") logger.info(f"初步筛选后,需要详细分析的交易对: {len(pre_filtered_symbols)}")
@ -102,8 +108,8 @@ class MarketScanner:
# 过滤最小涨跌幅和成交量 # 过滤最小涨跌幅和成交量
filtered_results = [ filtered_results = [
r for r in valid_results r for r in valid_results
if abs(r['changePercent']) >= config.TRADING_CONFIG['MIN_CHANGE_PERCENT'] if abs(r['changePercent']) >= cfg.get('MIN_CHANGE_PERCENT', config.TRADING_CONFIG['MIN_CHANGE_PERCENT'])
and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H'] and r.get('volume24h', 0) >= cfg.get('MIN_VOLUME_24H', config.TRADING_CONFIG['MIN_VOLUME_24H'])
] ]
# 按信号得分和涨跌幅综合排序取前N个 # 按信号得分和涨跌幅综合排序取前N个
@ -117,7 +123,7 @@ class MarketScanner:
reverse=True reverse=True
) )
top_n = sorted_results[:config.TRADING_CONFIG['TOP_N_SYMBOLS']] top_n = sorted_results[:cfg.get('TOP_N_SYMBOLS', config.TRADING_CONFIG['TOP_N_SYMBOLS'])]
self.top_symbols = top_n self.top_symbols = top_n

View File

@ -0,0 +1,167 @@
"""
推荐服务入口独立进程
目的
- 推荐生成与自动交易解耦推荐更高频只做分析+写入Redis不下单不等待成交
- 未来支持普通用户仅看推荐 / 加V用户自动交易可独立扩容
"""
import asyncio
import logging
import os
import sys
from pathlib import Path
from datetime import datetime, timezone, timedelta
# 支持直接运行和作为模块导入
if __name__ == "__main__":
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from trade_recommender import TradeRecommender
import config
else:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from .trade_recommender import TradeRecommender
from . import config
class BeijingTimeFormatter(logging.Formatter):
"""使用北京时间的日志格式化器"""
def formatTime(self, record, datefmt=None):
beijing_tz = timezone(timedelta(hours=8))
dt = datetime.fromtimestamp(record.created, tz=beijing_tz)
if datefmt:
return dt.strftime(datefmt)
return dt.strftime("%Y-%m-%d %H:%M:%S")
def _setup_logging():
# 单独的日志文件,避免与交易进程混在一起
log_file = os.getenv("RECOMMEND_LOG_FILE", "recommendations_bot.log")
if not Path(log_file).is_absolute():
project_root = Path(__file__).parent.parent
log_file = project_root / log_file
formatter = BeijingTimeFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler = logging.FileHandler(str(log_file), encoding="utf-8")
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logging.basicConfig(level=getattr(logging, config.LOG_LEVEL), handlers=[file_handler, console_handler])
# 追加:日志写入 Redis用于前端日志监控
try:
try:
from .redis_log_handler import RedisErrorLogHandler, RedisLogConfig
except Exception:
from redis_log_handler import RedisErrorLogHandler, RedisLogConfig
redis_cfg = RedisLogConfig(
redis_url=getattr(config, "REDIS_URL", "redis://localhost:6379"),
use_tls=bool(getattr(config, "REDIS_USE_TLS", False)),
ssl_cert_reqs=str(getattr(config, "REDIS_SSL_CERT_REQS", "required") or "required"),
ssl_ca_certs=getattr(config, "REDIS_SSL_CA_CERTS", None),
username=getattr(config, "REDIS_USERNAME", None),
password=getattr(config, "REDIS_PASSWORD", None),
service="recommendations",
)
redis_handler = RedisErrorLogHandler(redis_cfg)
redis_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(redis_handler)
except Exception:
pass
logger = logging.getLogger(__name__)
async def _acquire_reco_lock(client: BinanceClient, ttl_sec: int) -> bool:
"""
防止多实例同时生成推荐避免重复扫描/写Redis风暴
"""
rc = getattr(client, "redis_cache", None)
r = getattr(rc, "redis", None) if rc else None
if not r or not getattr(rc, "_connected", False):
return True # Redis不可用时无法分布式锁直接放行
now_ms = int(__import__("time").time() * 1000)
try:
ok = await r.set("lock:recommendations:generator", str(now_ms), nx=True, ex=max(5, int(ttl_sec)))
return bool(ok)
except Exception:
return True
async def main():
_setup_logging()
logger.info("=" * 60)
logger.info("推荐服务启动(独立进程:只生成推荐,不自动交易)")
logger.info("=" * 60)
# 建议推荐频率相对更高默认60s且不依赖交易进程的 SCAN_INTERVAL
scan_interval = int(os.getenv("RECOMMEND_SCAN_INTERVAL_SEC", "60") or "60")
min_signal_strength = int(os.getenv("RECOMMEND_MIN_SIGNAL_STRENGTH", "5") or "5")
max_recommendations = int(os.getenv("RECOMMEND_MAX_RECOMMENDATIONS", "60") or "60")
min_quality_score = float(os.getenv("RECOMMEND_MIN_QUALITY_SCORE", "0.0") or "0.0")
cache_namespace = os.getenv("RECOMMEND_SCAN_CACHE_NAMESPACE", "recommend") or "recommend"
logger.info(
f"推荐参数: interval={scan_interval}s, min_strength>={min_signal_strength}, "
f"max_recos={max_recommendations}, min_quality={min_quality_score}, cache_ns={cache_namespace}"
)
client: BinanceClient | None = None
try:
client = BinanceClient()
await client.connect()
scanner = MarketScanner(client)
risk_manager = RiskManager(client)
recommender = TradeRecommender(client, scanner, risk_manager)
while True:
try:
# 每轮都从Redis刷新配置确保推荐侧也能动态生效
try:
if config._config_manager:
config._config_manager.reload_from_redis()
config.TRADING_CONFIG = config._get_trading_config()
except Exception:
pass
# 分布式锁:避免多实例同时跑
got_lock = await _acquire_reco_lock(client, ttl_sec=max(10, scan_interval))
if not got_lock:
logger.debug("推荐生成锁未获取到(可能有其他实例在跑),跳过本轮")
await asyncio.sleep(min(5, scan_interval))
continue
recos = await recommender.generate_recommendations(
min_signal_strength=min_signal_strength,
max_recommendations=max_recommendations,
add_to_cache=True,
min_quality_score=min_quality_score,
scan_cache_namespace=cache_namespace,
)
logger.info(f"推荐生成完成: {len(recos)}已写入Redis snapshot")
except Exception as e:
logger.error(f"推荐生成循环异常: {e}", exc_info=True)
await asyncio.sleep(max(5, scan_interval))
finally:
if client:
try:
await client.disconnect()
except Exception:
pass
if __name__ == "__main__":
asyncio.run(main())

View File

@ -72,7 +72,9 @@ class TradeRecommender:
min_signal_strength: int = 5, min_signal_strength: int = 5,
max_recommendations: int = 20, max_recommendations: int = 20,
add_to_cache: bool = True, add_to_cache: bool = True,
min_quality_score: float = 0.0 min_quality_score: float = 0.0,
scan_cache_namespace: str = "recommend",
scan_config_override: Optional[Dict] = None,
) -> List[Dict]: ) -> List[Dict]:
""" """
生成交易推荐支持增量添加到Redis缓存 生成交易推荐支持增量添加到Redis缓存
@ -112,7 +114,11 @@ class TradeRecommender:
logger.debug(f"从Redis读取现有推荐失败: {e}") logger.debug(f"从Redis读取现有推荐失败: {e}")
# 2. 扫描市场 # 2. 扫描市场
top_symbols = await self.scanner.scan_market() # 说明:推荐进程/交易进程可能同时运行,为避免扫描缓存互相覆盖,使用独立 cache_namespace
top_symbols = await self.scanner.scan_market(
cache_namespace=scan_cache_namespace or "recommend",
config_override=scan_config_override,
)
if not top_symbols: if not top_symbols:
logger.warning("未找到符合条件的交易对") logger.warning("未找到符合条件的交易对")
# 如果市场扫描失败,返回现有缓存推荐 # 如果市场扫描失败,返回现有缓存推荐