auto_trade_sys/trading_system/trade_recommender.py
薇薇安 f2d71d3390 a
2026-01-24 10:32:41 +08:00

870 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐交易对模块 - 生成交易推荐供手动参考
"""
import asyncio
import logging
import time
from typing import List, Dict, Optional
from datetime import datetime, timedelta, timezone
try:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
import config
logger = logging.getLogger(__name__)
# 北京时间UTC+8用于推荐时间戳展示避免无时区 isoformat 导致前端误判)
BEIJING_TZ = timezone(timedelta(hours=8))
# 尝试导入数据库模型
DB_AVAILABLE = False
TradeRecommendation = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import TradeRecommendation
DB_AVAILABLE = True
logger.info("✓ 数据库模型导入成功,推荐记录将保存到数据库")
else:
logger.warning("⚠ backend目录不存在无法使用数据库功能")
DB_AVAILABLE = False
except ImportError as e:
logger.warning(f"⚠ 无法导入数据库模型: {e}")
DB_AVAILABLE = False
except Exception as e:
logger.warning(f"⚠ 数据库初始化失败: {e}")
DB_AVAILABLE = False
class TradeRecommender:
"""推荐交易对生成器"""
def __init__(
self,
client: BinanceClient,
scanner: MarketScanner,
risk_manager: RiskManager
):
"""
初始化推荐器
Args:
client: 币安客户端
scanner: 市场扫描器
risk_manager: 风险管理器
"""
self.client = client
self.scanner = scanner
self.risk_manager = risk_manager
async def generate_recommendations(
self,
min_signal_strength: int = 5,
max_recommendations: int = 20,
add_to_cache: bool = True,
min_quality_score: float = 0.0,
scan_cache_namespace: str = "recommend",
scan_config_override: Optional[Dict] = None,
) -> List[Dict]:
"""
生成交易推荐支持增量添加到Redis缓存
Args:
min_signal_strength: 最小信号强度默认5低于此强度的不推荐
max_recommendations: 最大推荐数量(单次生成)
add_to_cache: 是否添加到Redis缓存默认True
min_quality_score: 最小质量分数用于过滤默认0.0表示不过滤)
Returns:
推荐列表
"""
logger.info("开始生成交易推荐...")
# 推荐服务不需要获取特定账户余额,直接使用固定值用于计算推荐仓位
# 用户可以根据自己的实际余额调整实际下单量
account_balance = 1000 # 固定使用 1000 USDT 作为参考余额
logger.debug(f"推荐服务使用固定参考余额: {account_balance:.2f} USDT用于计算推荐仓位比例")
# 1. 从Redis读取现有推荐如果启用缓存
existing_recommendations = {}
if add_to_cache:
try:
# 从Redis Hash读取所有现有推荐
cache_key = "recommendations:realtime"
existing_data = await self.client.redis_cache.hgetall(cache_key)
for rec_key, rec_data in existing_data.items():
if isinstance(rec_data, dict):
existing_recommendations[rec_key] = rec_data
logger.info(f"从Redis读取到 {len(existing_recommendations)} 个现有推荐")
except Exception as e:
logger.debug(f"从Redis读取现有推荐失败: {e}")
# 2. 扫描市场
# 说明:推荐进程/交易进程可能同时运行,为避免扫描缓存互相覆盖,使用独立 cache_namespace
top_symbols = await self.scanner.scan_market(
cache_namespace=scan_cache_namespace or "recommend",
config_override=scan_config_override,
)
if not top_symbols:
logger.warning("未找到符合条件的交易对")
# 如果市场扫描失败,返回现有缓存推荐
if existing_recommendations:
return list(existing_recommendations.values())
return []
new_recommendations = []
# 3. 对每个交易对进行分析
for symbol_info in top_symbols:
symbol = symbol_info['symbol']
current_price = symbol_info['price']
# 4. 分析交易信号传入min_signal_strength参数
trade_signal = await self._analyze_trade_signal(symbol_info, min_signal_strength=min_signal_strength)
# 5. 如果信号强度足够,生成推荐
if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength:
recommendations_result = await self._create_recommendation(
symbol_info, trade_signal, account_balance
)
if recommendations_result:
# _create_recommendation 返回列表(限价单和市价单)
if isinstance(recommendations_result, list):
new_recommendations.extend(recommendations_result)
else:
new_recommendations.append(recommendations_result)
# 6. 合并新推荐和现有推荐
all_recommendations = {}
current_time = time.time()
max_age = 3600 * 2 # 推荐最大保留时间2小时
# 先添加现有推荐(过滤掉过期的)
for rec_key, rec_data in existing_recommendations.items():
rec_timestamp = rec_data.get('timestamp', 0)
# 如果推荐时间超过2小时跳过
if current_time - rec_timestamp > max_age:
continue
all_recommendations[rec_key] = rec_data
# 添加新推荐(如果质量足够)
for rec in new_recommendations:
# 生成推荐键symbol_orderType
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
# 计算质量分数(信号强度 + 胜率预估)
quality_score = rec.get('signal_strength', 0) * 10
if 'estimated_win_rate' in rec:
quality_score += rec['estimated_win_rate']
# 如果质量分数足够,才添加或更新
if quality_score >= min_quality_score:
# 如果已存在,比较质量分数,只保留更好的
if rec_key in all_recommendations:
existing_quality = all_recommendations[rec_key].get('signal_strength', 0) * 10
if 'estimated_win_rate' in all_recommendations[rec_key]:
existing_quality += all_recommendations[rec_key]['estimated_win_rate']
# 如果新推荐质量更好,更新
if quality_score > existing_quality:
all_recommendations[rec_key] = rec
logger.debug(f"更新推荐 {rec_key}: 质量分数 {existing_quality:.1f} -> {quality_score:.1f}")
else:
# 新推荐,直接添加
all_recommendations[rec_key] = rec
logger.debug(f"添加新推荐 {rec_key}: 质量分数 {quality_score:.1f}")
# 7. 按信号强度排序,保留质量最好的推荐
sorted_recommendations = sorted(
all_recommendations.values(),
key=lambda x: (x.get('signal_strength', 0), x.get('estimated_win_rate', 0)),
reverse=True
)
# 限制总数(保留质量最好的)
max_total = max_recommendations * 3 # 允许更多推荐以便参考
final_recommendations = sorted_recommendations[:max_total]
# 8. 保存到Redis缓存如果启用
if add_to_cache:
try:
cache_key = "recommendations:realtime"
snapshot_key = "recommendations:snapshot"
# 增量更新:只更新或添加新的推荐,保留其他推荐
updated_count = 0
for rec in final_recommendations:
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
# 使用hset更新或添加如果不存在则创建
await self.client.redis_cache.hset(cache_key, rec_key, rec, ttl=3600) # TTL 1小时
updated_count += 1
# 设置整个Hash的TTL1小时
if self.client.redis_cache.redis and self.client.redis_cache._connected:
try:
await self.client.redis_cache.redis.expire(cache_key, 3600)
except:
pass
# 写入“全量快照”(单 key一份数据给所有用户读
try:
now_ms = int(__import__("time").time() * 1000)
snapshot = {
"generated_at_ms": now_ms,
"generated_at": datetime.now().isoformat(),
"ttl_sec": 7200,
"count": len(final_recommendations),
"items": final_recommendations,
}
await self.client.redis_cache.set(snapshot_key, snapshot, ttl=7200) # 2小时有效
except Exception as e:
logger.warning(f"写入 recommendations:snapshot 失败(不影响返回): {e}")
logger.info(f"已更新 {updated_count} 个推荐到Redis缓存总计 {len(final_recommendations)} 个)")
except Exception as e:
logger.warning(f"保存推荐到Redis失败: {e}")
logger.info(f"生成了 {len(new_recommendations)} 个新推荐,总计 {len(final_recommendations)} 个推荐(包含缓存)")
return final_recommendations
async def _analyze_trade_signal(self, symbol_info: Dict, min_signal_strength: int = None) -> Dict:
"""
分析交易信号(复用策略模块的逻辑)
Args:
symbol_info: 交易对信息
Returns:
交易信号字典
"""
symbol = symbol_info['symbol']
current_price = symbol_info['price']
rsi = symbol_info.get('rsi')
macd = symbol_info.get('macd')
bollinger = symbol_info.get('bollinger')
market_regime = symbol_info.get('marketRegime', 'unknown')
ema20 = symbol_info.get('ema20')
ema50 = symbol_info.get('ema50')
ema20_4h = symbol_info.get('ema20_4h')
price_4h = symbol_info.get('price_4h', current_price)
# 判断4H周期趋势方向
trend_4h = None
if ema20_4h is not None:
if price_4h > ema20_4h:
trend_4h = 'up'
elif price_4h < ema20_4h:
trend_4h = 'down'
else:
trend_4h = 'neutral'
# 基础分数即使没有明确信号也给1分基础分推荐系统更宽松
signal_strength = 1
reasons = []
direction = None
# 策略1均值回归震荡市场
if market_regime == 'ranging':
if rsi and rsi < 30:
if trend_4h in ('up', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超卖({rsi:.1f})")
if direction is None:
direction = 'BUY'
elif rsi and rsi > 70:
if trend_4h in ('down', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超买({rsi:.1f})")
if direction is None:
direction = 'SELL'
if bollinger and current_price <= bollinger.get('lower'):
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带下轨")
if direction is None:
direction = 'BUY'
elif bollinger and current_price >= bollinger.get('upper'):
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带上轨")
if direction is None:
direction = 'SELL'
# 策略2趋势跟踪趋势市场
elif market_regime == 'trending':
if macd and macd.get('macd', 0) > macd.get('signal', 0) and macd.get('histogram', 0) > 0:
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("MACD金叉")
if direction is None:
direction = 'BUY'
elif macd and macd.get('macd', 0) < macd.get('signal', 0) and macd.get('histogram', 0) < 0:
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("MACD死叉")
if direction is None:
direction = 'SELL'
if ema20 and ema50:
if current_price > ema20 > ema50:
if trend_4h in ('up', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之上")
if direction is None:
direction = 'BUY'
elif current_price < ema20 < ema50:
if trend_4h in ('down', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之下")
if direction is None:
direction = 'SELL'
# 多周期共振加分
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
signal_strength += 2
reasons.append("4H周期共振确认")
# 推荐系统默认不再输出“逆4H趋势”的推荐用户反馈方向不对的主要来源
# 如果未来需要放开,可做成配置项 ALLOW_COUNTER_TREND_RECOMMENDATIONS。
if direction and trend_4h and trend_4h in ("up", "down"):
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
reasons.append("❌ 逆4H趋势跳过推荐")
return {
'should_trade': False,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '逆趋势',
'strength': max(0, signal_strength - 2),
'trend_4h': trend_4h
}
# 判断是否应该推荐使用传入的参数如果没有则使用config中的值
if min_signal_strength is None:
min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7)
should_trade = (direction is not None) and (signal_strength >= min_signal_strength)
return {
'should_trade': should_trade,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '无明确信号',
'strength': signal_strength,
'trend_4h': trend_4h
}
async def _create_recommendation(
self,
symbol_info: Dict,
trade_signal: Dict,
account_balance: float = 1000
) -> Optional[List[Dict]]:
"""
创建推荐记录
Args:
symbol_info: 交易对信息
trade_signal: 交易信号
Returns:
推荐字典
"""
try:
symbol = symbol_info['symbol']
# 技术分析价K线收盘价用于指标/信号一致性
analysis_price = symbol_info.get('kline_close_price', symbol_info.get('price'))
try:
analysis_price = float(analysis_price) if analysis_price is not None else None
except Exception:
analysis_price = None
# 展示用“当前价”:优先用 ticker_price更贴近用户理解的“当前价格”
current_price = symbol_info.get('ticker_price', None)
try:
current_price = float(current_price) if current_price is not None else None
except Exception:
current_price = None
# 回退:如果拿不到 ticker_price就用分析价兜底
if current_price is None:
current_price = analysis_price
price_ts_ms = symbol_info.get("ticker_ts", None)
try:
price_ts_ms = int(price_ts_ms) if price_ts_ms is not None else None
except Exception:
price_ts_ms = None
direction = trade_signal['direction']
if not direction:
return None
# 先计算建议挂单价(限价单),然后用“挂单价”作为止损/止盈的基准入场价
# 否则会出现:止损按 current_price 算、但挂单按回调价算 → 止损/挂单价不匹配,甚至相等
limit_price_offset_pct = config.TRADING_CONFIG.get('LIMIT_ORDER_OFFSET_PCT', 0.5) # 默认0.5%
if direction == 'BUY':
suggested_limit_price = current_price * (1 - limit_price_offset_pct / 100)
else: # SELL
suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100)
# 计算建议的止损止盈(基于保证金),以“计划入场价=挂单价”为基准
entry_price = suggested_limit_price
# 估算仓位数量和杠杆(用于计算止损止盈)
# 重要语义suggested_position_pct 表示“保证金占用比例”
account_balance = symbol_info.get('account_balance', 1000)
suggested_position_pct = symbol_info.get('suggested_position_pct', 0.05)
leverage = config.TRADING_CONFIG.get('LEVERAGE', 10)
# 估算保证金与名义价值
estimated_margin = account_balance * suggested_position_pct
estimated_notional = estimated_margin * leverage if leverage and leverage > 0 else estimated_margin
estimated_quantity = estimated_notional / entry_price if entry_price > 0 else 0
# 计算基于保证金的止损止盈
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price,
direction,
estimated_quantity,
leverage,
stop_loss_pct=stop_loss_pct_margin,
klines=symbol_info.get('klines'),
bollinger=symbol_info.get('bollinger'),
atr=symbol_info.get('atr')
)
# 计算止损百分比(相对于保证金,用于显示)
stop_loss_amount = estimated_margin * stop_loss_pct_margin
if direction == 'BUY':
stop_loss_pct = (entry_price - stop_loss_price) / entry_price
else:
stop_loss_pct = (stop_loss_price - entry_price) / entry_price
# 第一目标盈亏比1:1相对于保证金
take_profit_1_pct_margin = stop_loss_pct_margin * 1.0 # 1:1 盈亏比
take_profit_1 = self.risk_manager.get_take_profit_price(
entry_price, direction, estimated_quantity, leverage,
take_profit_pct=take_profit_1_pct_margin
)
# 第二目标止损的2.5倍(相对于保证金)
take_profit_2_pct_margin = stop_loss_pct_margin * 2.5
take_profit_2 = self.risk_manager.get_take_profit_price(
entry_price, direction, estimated_quantity, leverage,
take_profit_pct=take_profit_2_pct_margin
)
# 建议仓位(根据信号强度调整)
base_position_pct = config.TRADING_CONFIG.get('MAX_POSITION_PERCENT', 0.05)
signal_strength = trade_signal['strength']
# 信号强度越高建议仓位可以适当增加但不超过1.5倍)
position_multiplier = min(1.0 + (signal_strength - 5) * 0.1, 1.5)
suggested_position_pct = base_position_pct * position_multiplier
# 添加时间戳
timestamp = time.time()
# 用带时区的 ISO 字符串,前端可稳定转换/展示北京时间
recommendation_time = datetime.now(BEIJING_TZ).isoformat()
# 计算胜率预估(基于信号强度、市场状态等)
base_win_rate = 40 + (signal_strength * 3) # 信号强度0-10对应胜率40-70%
market_regime = symbol_info.get('marketRegime', 'ranging')
if market_regime == 'ranging':
base_win_rate += 5
elif market_regime == 'trending':
base_win_rate += 3
trend_4h = trade_signal.get('trend_4h')
if trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
base_win_rate += 3
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
base_win_rate -= 2
estimated_win_rate = max(35, min(75, base_win_rate))
# 计算盈亏USDT评估基于保证金
margin = account_balance * suggested_position_pct
# 止损止盈金额(相对于保证金)
stop_loss_usdt = margin * stop_loss_pct_margin
take_profit_1_usdt = margin * take_profit_1_pct_margin
take_profit_2_usdt = margin * take_profit_2_pct_margin
expected_pnl_1 = (estimated_win_rate / 100) * take_profit_1_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
expected_pnl_2 = (estimated_win_rate / 100) * take_profit_2_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
# 分类推荐类型和风险等级
recommendation_category, risk_level, simple_reason, risk_warning, trading_tutorial = self._classify_recommendation(
market_regime, trend_4h, direction, trade_signal, signal_strength
)
# 计算预期持仓时间(基于策略周期和市场状态)
expected_hold_time = self._estimate_hold_time(market_regime, trend_4h, direction, signal_strength)
# 生成用户指南(人话版计划)
user_guide = self._generate_user_guide(
symbol, direction, suggested_limit_price, stop_loss_price, take_profit_1, take_profit_2,
simple_reason, expected_hold_time, risk_warning, recommendation_category, current_price
)
# 基础推荐数据
base_data = {
'symbol': symbol,
'direction': direction,
'current_price': current_price,
# 价格元信息:前端用于展示“这是不是实时的”
'current_price_source': 'ticker_24h' if symbol_info.get('ticker_price') is not None else 'kline_close',
'current_price_time_ms': price_ts_ms,
'change_percent': symbol_info.get('changePercent', 0),
'recommendation_reason': trade_signal['reason'],
'signal_strength': signal_strength,
'market_regime': market_regime,
'trend_4h': trend_4h,
# 额外保留分析价(用于排查“为什么信号这样算”)
'analysis_price': analysis_price,
'rsi': symbol_info.get('rsi'),
'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None,
'bollinger_middle': symbol_info.get('bollinger', {}).get('middle') if symbol_info.get('bollinger') else None,
'bollinger_lower': symbol_info.get('bollinger', {}).get('lower') if symbol_info.get('bollinger') else None,
'ema20': symbol_info.get('ema20'),
'ema50': symbol_info.get('ema50'),
'ema20_4h': symbol_info.get('ema20_4h'),
'atr': symbol_info.get('atr'),
'suggested_stop_loss': stop_loss_price,
'suggested_take_profit_1': take_profit_1,
'suggested_take_profit_2': take_profit_2,
# 计划入场价(限价挂单价)作为止损/止盈计算基准
'planned_entry_price': entry_price,
'suggested_position_percent': suggested_position_pct,
'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10),
'volume_24h': symbol_info.get('volume24h'),
'volatility': symbol_info.get('volatility'),
'estimated_win_rate': round(estimated_win_rate, 1),
'expected_pnl_1': round(expected_pnl_1, 2),
'expected_pnl_2': round(expected_pnl_2, 2),
'stop_loss_usdt': round(stop_loss_usdt, 2),
'take_profit_1_usdt': round(take_profit_1_usdt, 2),
'take_profit_2_usdt': round(take_profit_2_usdt, 2),
'recommendation_time': recommendation_time,
'timestamp': timestamp,
# 新增字段:用户指南和分类
'user_guide': user_guide,
'recommendation_category': recommendation_category,
'risk_level': risk_level,
'expected_hold_time': expected_hold_time,
'trading_tutorial': trading_tutorial,
'max_hold_days': 3 # 最大持仓天数(超过此天数建议平仓)
}
# 限价单推荐(唯一推荐类型)
limit_recommendation = base_data.copy()
limit_recommendation.update({
'order_type': 'LIMIT',
'suggested_limit_price': suggested_limit_price
})
logger.debug(
f"✓ 生成推荐: {symbol} {direction} "
f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%, "
f"分类: {recommendation_category}, 风险: {risk_level})"
)
recs = [limit_recommendation]
# 将“单条推荐”也写入 Redis统一推荐缓存来源避免只在手动 generate 时才更新)
await self._update_realtime_cache(recs)
return recs
except Exception as e:
logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}", exc_info=True)
return None
async def _update_realtime_cache(self, recs: List[Dict]) -> None:
"""
将推荐写入 Redis
- Hash: recommendations:realtime增量更新
- Snapshot: recommendations:snapshot全量快照给前端统一读取
"""
if not recs:
return
try:
rc = getattr(self.client, "redis_cache", None)
if not rc or not getattr(rc, "redis", None) or not getattr(rc, "_connected", False):
return
cache_key = "recommendations:realtime"
snapshot_key = "recommendations:snapshot"
lock_key = "lock:recommendations:snapshot:update"
# 1) 增量写 Hash每条推荐一个 field
for rec in recs:
if not isinstance(rec, dict):
continue
sym = rec.get("symbol")
if not sym:
continue
rec_key = f"{sym}_{rec.get('order_type', 'LIMIT')}"
await rc.hset(cache_key, rec_key, rec, ttl=3600)
try:
await rc.redis.expire(cache_key, 3600)
except Exception:
pass
# 2) 合并写 Snapshot单 key给所有用户读
now_ms = int(__import__("time").time() * 1000)
try:
got_lock = await rc.redis.set(lock_key, str(now_ms), nx=True, ex=2)
except Exception:
got_lock = False
if not got_lock:
return
existing = None
try:
existing = await rc.get(snapshot_key)
except Exception:
existing = None
# 从 snapshot 取旧 items如果没有就从 Hash 兜底(避免首次写入为空)
items: List[Dict] = []
if isinstance(existing, dict) and isinstance(existing.get("items"), list):
items = [x for x in existing.get("items", []) if isinstance(x, dict)]
# 合并:按 rec_key 覆盖(同 symbol + order_type
merged: Dict[str, Dict] = {}
for x in items:
sym = x.get("symbol")
if not sym:
continue
k = f"{sym}_{x.get('order_type', 'LIMIT')}"
merged[k] = x
for x in recs:
if not isinstance(x, dict):
continue
sym = x.get("symbol")
if not sym:
continue
k = f"{sym}_{x.get('order_type', 'LIMIT')}"
merged[k] = x
# 过滤过期2小时
max_age_sec = 3600 * 2
now_s = __import__("time").time()
filtered: List[Dict] = []
for x in merged.values():
ts = x.get("timestamp", 0)
try:
ts = float(ts) if ts is not None else 0.0
except Exception:
ts = 0.0
if ts and (now_s - ts) > max_age_sec:
continue
filtered.append(x)
filtered.sort(key=lambda x: x.get("timestamp", 0) or 0, reverse=True)
# 防止快照无限长(前端默认 limit=50这里留一点冗余
filtered = filtered[:200]
snapshot = {
"generated_at_ms": now_ms,
"generated_at": datetime.now().isoformat(),
"ttl_sec": 7200,
"count": len(filtered),
"items": filtered,
}
await rc.set(snapshot_key, snapshot, ttl=7200)
except Exception as e:
logger.debug(f"更新推荐 Redis 缓存失败(可忽略): {e}")
def _classify_recommendation(
self,
market_regime: str,
trend_4h: Optional[str],
direction: str,
trade_signal: Dict,
signal_strength: int
) -> tuple:
"""
分类推荐类型和风险等级
Returns:
(recommendation_category, risk_level, simple_reason, risk_warning, trading_tutorial)
"""
# 判断是否为逆趋势交易
is_counter_trend = False
if trend_4h:
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
is_counter_trend = True
# 分类推荐类型
if market_regime == 'trending':
if not is_counter_trend:
# 顺趋势突破
category = "顺趋势突破"
risk_level = "中等"
simple_reason = "趋势明确,顺势而为,胜率较高"
risk_warning = "趋势可能反转,注意止损保护"
tutorial = "此类交易适合跟随趋势,持仓时间可稍长,但需关注趋势是否持续"
else:
# 逆趋势反弹(高风险)
category = "逆势反弹(高风险)"
risk_level = ""
simple_reason = "逆趋势操作,风险较高,适合快进快出"
risk_warning = "⚠️ 高风险:逆趋势交易胜率较低,务必严格止损,快进快出"
tutorial = "此类交易胜率较低,仅适合短线快进快出,务必严格止损。建议持仓不超过数小时,一旦触及止损立即离场"
elif market_regime == 'ranging':
# 震荡区间操作
category = "震荡区间操作"
risk_level = "中等"
simple_reason = "市场震荡,在区间边界操作,适合高抛低吸"
risk_warning = "震荡可能转为趋势,注意突破风险"
tutorial = "此类交易适合在震荡区间边界操作预期持仓时间较短数小时至1天到达目标及时止盈"
else:
# 未知市场状态
category = "市场状态不明"
risk_level = "中高"
simple_reason = "市场状态不明确,谨慎操作"
risk_warning = "市场状态不明,建议降低仓位或等待更明确信号"
tutorial = "市场状态不明确,建议谨慎操作,降低仓位,严格止损"
# 根据信号强度调整风险等级
if signal_strength >= 7:
if risk_level == "":
risk_level = "中高"
elif risk_level == "中等":
risk_level = "低中"
elif signal_strength < 5:
if risk_level == "中等":
risk_level = "中高"
elif risk_level == "低中":
risk_level = "中等"
return category, risk_level, simple_reason, risk_warning, tutorial
def _estimate_hold_time(
self,
market_regime: str,
trend_4h: Optional[str],
direction: str,
signal_strength: int
) -> str:
"""
估算预期持仓时间
Returns:
预期持仓时间的文字描述
"""
# 基于策略周期1H/15min和市场状态估算
if market_regime == 'trending':
# 趋势市场,持仓时间可能较长
if signal_strength >= 7:
return "预期持仓数小时至2天趋势明确可适当延长持仓"
else:
return "预期持仓数小时至1天趋势较弱及时止盈"
elif market_regime == 'ranging':
# 震荡市场,持仓时间较短
return "预期持仓:数小时至半天(震荡市场,快进快出)"
else:
# 未知状态,保守估计
return "预期持仓:数小时(市场状态不明,谨慎持仓)"
def _generate_user_guide(
self,
symbol: str,
direction: str,
limit_price: float,
stop_loss: float,
tp1: float,
tp2: float,
simple_reason: str,
expected_hold_time: str,
risk_warning: str,
category: str,
current_price: float
) -> str:
"""
生成用户指南(人话版计划)
Args:
symbol: 交易对
direction: 方向
limit_price: 建议入场价(限价单)
stop_loss: 止损价
tp1: 第一目标止盈价
tp2: 第二目标止盈价
simple_reason: 简单原因
expected_hold_time: 预期持仓时间
risk_warning: 风险警告
category: 推荐分类
current_price: 当前价格(用于计算反向波动)
Returns:
用户指南文本
"""
direction_cn = "买入" if direction == 'BUY' else "卖出"
direction_action = "挂单买入" if direction == 'BUY' else "挂单卖出"
# 计算反向波动阈值2%
if direction == 'BUY':
# 买单如果价格下跌超过2%,建议取消
reverse_threshold = current_price * 0.98
reverse_direction = "下跌"
else:
# 卖单如果价格上涨超过2%,建议取消
reverse_threshold = current_price * 1.02
reverse_direction = "上涨"
user_guide = f"""【操作计划】{direction_cn} {symbol}
【推荐类型】{category}
【核心理由】{simple_reason}
【明确的入场价】
建议在 {limit_price:.4f} USDT 附近{direction_action}
【具体点位】
• 建议挂单价: {limit_price:.4f} USDT
• 止损价: {stop_loss:.4f} USDT
• 第一目标: {tp1:.4f} USDT盈亏比1:1
• 第二目标: {tp2:.4f} USDT盈亏比2.5:1
【持仓周期】{expected_hold_time}
【退出条件】
• 触及止损:立即平仓
• 触及第一目标:可部分止盈或全部止盈
• 触及第二目标:建议全部止盈
• 持仓超过3天未触及第一目标建议平仓离场重新评估
【订单失效条件】
此限价单建议当日有效。若价格未触及挂单价,但价格直接{reverse_direction}超过2%{reverse_threshold:.4f} USDT则建议取消订单等待新信号。
【关键提醒】{risk_warning}
【给主动交易者的提示】
如果您确信趋势已启动,也可考虑以市价单立即入场,但需承受更高滑点成本,且务必设置好止损。"""
return user_guide
async def get_active_recommendations(self) -> List[Dict]:
"""获取当前有效的推荐"""
if DB_AVAILABLE and TradeRecommendation:
return TradeRecommendation.get_active()
return []
async def mark_recommendation_executed(self, recommendation_id: int, trade_id: int = None):
"""标记推荐已执行"""
if DB_AVAILABLE and TradeRecommendation:
TradeRecommendation.mark_executed(recommendation_id, trade_id)
logger.info(f"推荐 {recommendation_id} 已标记为已执行")