864 lines
38 KiB
Python
864 lines
38 KiB
Python
"""
|
||
推荐交易对模块 - 生成交易推荐供手动参考
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from typing import List, Dict, Optional
|
||
from datetime import datetime, timedelta
|
||
try:
|
||
from .binance_client import BinanceClient
|
||
from .market_scanner import MarketScanner
|
||
from .risk_manager import RiskManager
|
||
from . import config
|
||
except ImportError:
|
||
from binance_client import BinanceClient
|
||
from market_scanner import MarketScanner
|
||
from risk_manager import RiskManager
|
||
import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 尝试导入数据库模型
|
||
DB_AVAILABLE = False
|
||
TradeRecommendation = None
|
||
try:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).parent.parent
|
||
backend_path = project_root / 'backend'
|
||
if backend_path.exists():
|
||
sys.path.insert(0, str(backend_path))
|
||
from database.models import TradeRecommendation
|
||
DB_AVAILABLE = True
|
||
logger.info("✓ 数据库模型导入成功,推荐记录将保存到数据库")
|
||
else:
|
||
logger.warning("⚠ backend目录不存在,无法使用数据库功能")
|
||
DB_AVAILABLE = False
|
||
except ImportError as e:
|
||
logger.warning(f"⚠ 无法导入数据库模型: {e}")
|
||
DB_AVAILABLE = False
|
||
except Exception as e:
|
||
logger.warning(f"⚠ 数据库初始化失败: {e}")
|
||
DB_AVAILABLE = False
|
||
|
||
|
||
class TradeRecommender:
|
||
"""推荐交易对生成器"""
|
||
|
||
def __init__(
|
||
self,
|
||
client: BinanceClient,
|
||
scanner: MarketScanner,
|
||
risk_manager: RiskManager
|
||
):
|
||
"""
|
||
初始化推荐器
|
||
|
||
Args:
|
||
client: 币安客户端
|
||
scanner: 市场扫描器
|
||
risk_manager: 风险管理器
|
||
"""
|
||
self.client = client
|
||
self.scanner = scanner
|
||
self.risk_manager = risk_manager
|
||
|
||
async def generate_recommendations(
|
||
self,
|
||
min_signal_strength: int = 5,
|
||
max_recommendations: int = 20,
|
||
add_to_cache: bool = True,
|
||
min_quality_score: float = 0.0
|
||
) -> List[Dict]:
|
||
"""
|
||
生成交易推荐(支持增量添加到Redis缓存)
|
||
|
||
Args:
|
||
min_signal_strength: 最小信号强度(默认5,低于此强度的不推荐)
|
||
max_recommendations: 最大推荐数量(单次生成)
|
||
add_to_cache: 是否添加到Redis缓存(默认True)
|
||
min_quality_score: 最小质量分数(用于过滤,默认0.0表示不过滤)
|
||
|
||
Returns:
|
||
推荐列表
|
||
"""
|
||
logger.info("开始生成交易推荐...")
|
||
|
||
# 获取账户余额(用于计算盈亏评估)
|
||
try:
|
||
balance = await self.client.get_account_balance()
|
||
account_balance = balance.get('total', 1000)
|
||
logger.debug(f"账户余额: {account_balance:.2f} USDT")
|
||
except Exception as e:
|
||
account_balance = 1000
|
||
logger.warning(f"获取账户余额失败,使用默认值1000 USDT: {e}")
|
||
|
||
# 1. 从Redis读取现有推荐(如果启用缓存)
|
||
existing_recommendations = {}
|
||
if add_to_cache:
|
||
try:
|
||
# 从Redis Hash读取所有现有推荐
|
||
cache_key = "recommendations:realtime"
|
||
existing_data = await self.client.redis_cache.hgetall(cache_key)
|
||
for rec_key, rec_data in existing_data.items():
|
||
if isinstance(rec_data, dict):
|
||
existing_recommendations[rec_key] = rec_data
|
||
logger.info(f"从Redis读取到 {len(existing_recommendations)} 个现有推荐")
|
||
except Exception as e:
|
||
logger.debug(f"从Redis读取现有推荐失败: {e}")
|
||
|
||
# 2. 扫描市场
|
||
top_symbols = await self.scanner.scan_market()
|
||
if not top_symbols:
|
||
logger.warning("未找到符合条件的交易对")
|
||
# 如果市场扫描失败,返回现有缓存推荐
|
||
if existing_recommendations:
|
||
return list(existing_recommendations.values())
|
||
return []
|
||
|
||
new_recommendations = []
|
||
|
||
# 3. 对每个交易对进行分析
|
||
for symbol_info in top_symbols:
|
||
symbol = symbol_info['symbol']
|
||
current_price = symbol_info['price']
|
||
|
||
# 4. 分析交易信号(传入min_signal_strength参数)
|
||
trade_signal = await self._analyze_trade_signal(symbol_info, min_signal_strength=min_signal_strength)
|
||
|
||
# 5. 如果信号强度足够,生成推荐
|
||
if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength:
|
||
recommendations_result = await self._create_recommendation(
|
||
symbol_info, trade_signal, account_balance
|
||
)
|
||
if recommendations_result:
|
||
# _create_recommendation 返回列表(限价单和市价单)
|
||
if isinstance(recommendations_result, list):
|
||
new_recommendations.extend(recommendations_result)
|
||
else:
|
||
new_recommendations.append(recommendations_result)
|
||
|
||
# 6. 合并新推荐和现有推荐
|
||
all_recommendations = {}
|
||
current_time = time.time()
|
||
max_age = 3600 * 2 # 推荐最大保留时间:2小时
|
||
|
||
# 先添加现有推荐(过滤掉过期的)
|
||
for rec_key, rec_data in existing_recommendations.items():
|
||
rec_timestamp = rec_data.get('timestamp', 0)
|
||
# 如果推荐时间超过2小时,跳过
|
||
if current_time - rec_timestamp > max_age:
|
||
continue
|
||
all_recommendations[rec_key] = rec_data
|
||
|
||
# 添加新推荐(如果质量足够)
|
||
for rec in new_recommendations:
|
||
# 生成推荐键:symbol_orderType
|
||
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
|
||
|
||
# 计算质量分数(信号强度 + 胜率预估)
|
||
quality_score = rec.get('signal_strength', 0) * 10
|
||
if 'estimated_win_rate' in rec:
|
||
quality_score += rec['estimated_win_rate']
|
||
|
||
# 如果质量分数足够,才添加或更新
|
||
if quality_score >= min_quality_score:
|
||
# 如果已存在,比较质量分数,只保留更好的
|
||
if rec_key in all_recommendations:
|
||
existing_quality = all_recommendations[rec_key].get('signal_strength', 0) * 10
|
||
if 'estimated_win_rate' in all_recommendations[rec_key]:
|
||
existing_quality += all_recommendations[rec_key]['estimated_win_rate']
|
||
|
||
# 如果新推荐质量更好,更新
|
||
if quality_score > existing_quality:
|
||
all_recommendations[rec_key] = rec
|
||
logger.debug(f"更新推荐 {rec_key}: 质量分数 {existing_quality:.1f} -> {quality_score:.1f}")
|
||
else:
|
||
# 新推荐,直接添加
|
||
all_recommendations[rec_key] = rec
|
||
logger.debug(f"添加新推荐 {rec_key}: 质量分数 {quality_score:.1f}")
|
||
|
||
# 7. 按信号强度排序,保留质量最好的推荐
|
||
sorted_recommendations = sorted(
|
||
all_recommendations.values(),
|
||
key=lambda x: (x.get('signal_strength', 0), x.get('estimated_win_rate', 0)),
|
||
reverse=True
|
||
)
|
||
|
||
# 限制总数(保留质量最好的)
|
||
max_total = max_recommendations * 3 # 允许更多推荐以便参考
|
||
final_recommendations = sorted_recommendations[:max_total]
|
||
|
||
# 8. 保存到Redis缓存(如果启用)
|
||
if add_to_cache:
|
||
try:
|
||
cache_key = "recommendations:realtime"
|
||
snapshot_key = "recommendations:snapshot"
|
||
|
||
# 增量更新:只更新或添加新的推荐,保留其他推荐
|
||
updated_count = 0
|
||
for rec in final_recommendations:
|
||
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
|
||
# 使用hset更新或添加(如果不存在则创建)
|
||
await self.client.redis_cache.hset(cache_key, rec_key, rec, ttl=3600) # TTL 1小时
|
||
updated_count += 1
|
||
|
||
# 设置整个Hash的TTL(1小时)
|
||
if self.client.redis_cache.redis and self.client.redis_cache._connected:
|
||
try:
|
||
await self.client.redis_cache.redis.expire(cache_key, 3600)
|
||
except:
|
||
pass
|
||
|
||
# 写入“全量快照”(单 key,一份数据给所有用户读)
|
||
try:
|
||
now_ms = int(__import__("time").time() * 1000)
|
||
snapshot = {
|
||
"generated_at_ms": now_ms,
|
||
"generated_at": datetime.now().isoformat(),
|
||
"ttl_sec": 7200,
|
||
"count": len(final_recommendations),
|
||
"items": final_recommendations,
|
||
}
|
||
await self.client.redis_cache.set(snapshot_key, snapshot, ttl=7200) # 2小时有效
|
||
except Exception as e:
|
||
logger.warning(f"写入 recommendations:snapshot 失败(不影响返回): {e}")
|
||
|
||
logger.info(f"已更新 {updated_count} 个推荐到Redis缓存(总计 {len(final_recommendations)} 个)")
|
||
except Exception as e:
|
||
logger.warning(f"保存推荐到Redis失败: {e}")
|
||
|
||
logger.info(f"生成了 {len(new_recommendations)} 个新推荐,总计 {len(final_recommendations)} 个推荐(包含缓存)")
|
||
return final_recommendations
|
||
|
||
async def _analyze_trade_signal(self, symbol_info: Dict, min_signal_strength: int = None) -> Dict:
|
||
"""
|
||
分析交易信号(复用策略模块的逻辑)
|
||
|
||
Args:
|
||
symbol_info: 交易对信息
|
||
|
||
Returns:
|
||
交易信号字典
|
||
"""
|
||
symbol = symbol_info['symbol']
|
||
current_price = symbol_info['price']
|
||
rsi = symbol_info.get('rsi')
|
||
macd = symbol_info.get('macd')
|
||
bollinger = symbol_info.get('bollinger')
|
||
market_regime = symbol_info.get('marketRegime', 'unknown')
|
||
ema20 = symbol_info.get('ema20')
|
||
ema50 = symbol_info.get('ema50')
|
||
ema20_4h = symbol_info.get('ema20_4h')
|
||
price_4h = symbol_info.get('price_4h', current_price)
|
||
|
||
# 判断4H周期趋势方向
|
||
trend_4h = None
|
||
if ema20_4h is not None:
|
||
if price_4h > ema20_4h:
|
||
trend_4h = 'up'
|
||
elif price_4h < ema20_4h:
|
||
trend_4h = 'down'
|
||
else:
|
||
trend_4h = 'neutral'
|
||
|
||
# 基础分数:即使没有明确信号,也给1分基础分(推荐系统更宽松)
|
||
signal_strength = 1
|
||
reasons = []
|
||
direction = None
|
||
|
||
# 策略1:均值回归(震荡市场)
|
||
if market_regime == 'ranging':
|
||
if rsi and rsi < 30:
|
||
if trend_4h in ('up', 'neutral', None):
|
||
signal_strength += 4
|
||
reasons.append(f"RSI超卖({rsi:.1f})")
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
elif rsi and rsi > 70:
|
||
if trend_4h in ('down', 'neutral', None):
|
||
signal_strength += 4
|
||
reasons.append(f"RSI超买({rsi:.1f})")
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
|
||
if bollinger and current_price <= bollinger.get('lower'):
|
||
if trend_4h in ('up', 'neutral', None):
|
||
signal_strength += 3
|
||
reasons.append("触及布林带下轨")
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
elif bollinger and current_price >= bollinger.get('upper'):
|
||
if trend_4h in ('down', 'neutral', None):
|
||
signal_strength += 3
|
||
reasons.append("触及布林带上轨")
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
|
||
# 策略2:趋势跟踪(趋势市场)
|
||
elif market_regime == 'trending':
|
||
if macd and macd.get('macd', 0) > macd.get('signal', 0) and macd.get('histogram', 0) > 0:
|
||
if trend_4h in ('up', 'neutral', None):
|
||
signal_strength += 3
|
||
reasons.append("MACD金叉")
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
elif macd and macd.get('macd', 0) < macd.get('signal', 0) and macd.get('histogram', 0) < 0:
|
||
if trend_4h in ('down', 'neutral', None):
|
||
signal_strength += 3
|
||
reasons.append("MACD死叉")
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
|
||
if ema20 and ema50:
|
||
if current_price > ema20 > ema50:
|
||
if trend_4h in ('up', 'neutral', None):
|
||
signal_strength += 2
|
||
reasons.append("价格在均线之上")
|
||
if direction is None:
|
||
direction = 'BUY'
|
||
elif current_price < ema20 < ema50:
|
||
if trend_4h in ('down', 'neutral', None):
|
||
signal_strength += 2
|
||
reasons.append("价格在均线之下")
|
||
if direction is None:
|
||
direction = 'SELL'
|
||
|
||
# 多周期共振加分
|
||
if direction and trend_4h:
|
||
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
|
||
signal_strength += 2
|
||
reasons.append("4H周期共振确认")
|
||
|
||
# 推荐系统:默认不再输出“逆4H趋势”的推荐(用户反馈方向不对的主要来源)
|
||
# 如果未来需要放开,可做成配置项 ALLOW_COUNTER_TREND_RECOMMENDATIONS。
|
||
if direction and trend_4h and trend_4h in ("up", "down"):
|
||
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
|
||
reasons.append("❌ 逆4H趋势,跳过推荐")
|
||
return {
|
||
'should_trade': False,
|
||
'direction': direction,
|
||
'reason': ', '.join(reasons) if reasons else '逆趋势',
|
||
'strength': max(0, signal_strength - 2),
|
||
'trend_4h': trend_4h
|
||
}
|
||
|
||
# 判断是否应该推荐(使用传入的参数,如果没有则使用config中的值)
|
||
if min_signal_strength is None:
|
||
min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7)
|
||
should_trade = (direction is not None) and (signal_strength >= min_signal_strength)
|
||
|
||
return {
|
||
'should_trade': should_trade,
|
||
'direction': direction,
|
||
'reason': ', '.join(reasons) if reasons else '无明确信号',
|
||
'strength': signal_strength,
|
||
'trend_4h': trend_4h
|
||
}
|
||
|
||
async def _create_recommendation(
|
||
self,
|
||
symbol_info: Dict,
|
||
trade_signal: Dict,
|
||
account_balance: float = 1000
|
||
) -> Optional[List[Dict]]:
|
||
"""
|
||
创建推荐记录
|
||
|
||
Args:
|
||
symbol_info: 交易对信息
|
||
trade_signal: 交易信号
|
||
|
||
Returns:
|
||
推荐字典
|
||
"""
|
||
try:
|
||
symbol = symbol_info['symbol']
|
||
# 技术分析价(K线收盘价):用于指标/信号一致性
|
||
analysis_price = symbol_info.get('kline_close_price', symbol_info.get('price'))
|
||
try:
|
||
analysis_price = float(analysis_price) if analysis_price is not None else None
|
||
except Exception:
|
||
analysis_price = None
|
||
|
||
# 展示用“当前价”:优先用 ticker_price(更贴近用户理解的“当前价格”)
|
||
current_price = symbol_info.get('ticker_price', None)
|
||
try:
|
||
current_price = float(current_price) if current_price is not None else None
|
||
except Exception:
|
||
current_price = None
|
||
|
||
# 回退:如果拿不到 ticker_price,就用分析价兜底
|
||
if current_price is None:
|
||
current_price = analysis_price
|
||
|
||
price_ts_ms = symbol_info.get("ticker_ts", None)
|
||
try:
|
||
price_ts_ms = int(price_ts_ms) if price_ts_ms is not None else None
|
||
except Exception:
|
||
price_ts_ms = None
|
||
|
||
direction = trade_signal['direction']
|
||
if not direction:
|
||
return None
|
||
|
||
# 先计算建议挂单价(限价单),然后用“挂单价”作为止损/止盈的基准入场价
|
||
# 否则会出现:止损按 current_price 算、但挂单按回调价算 → 止损/挂单价不匹配,甚至相等
|
||
limit_price_offset_pct = config.TRADING_CONFIG.get('LIMIT_ORDER_OFFSET_PCT', 0.5) # 默认0.5%
|
||
if direction == 'BUY':
|
||
suggested_limit_price = current_price * (1 - limit_price_offset_pct / 100)
|
||
else: # SELL
|
||
suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100)
|
||
|
||
# 计算建议的止损止盈(基于保证金),以“计划入场价=挂单价”为基准
|
||
entry_price = suggested_limit_price
|
||
|
||
# 估算仓位数量和杠杆(用于计算止损止盈)
|
||
# 重要语义:suggested_position_pct 表示“保证金占用比例”
|
||
account_balance = symbol_info.get('account_balance', 1000)
|
||
suggested_position_pct = symbol_info.get('suggested_position_pct', 0.05)
|
||
leverage = config.TRADING_CONFIG.get('LEVERAGE', 10)
|
||
|
||
# 估算保证金与名义价值
|
||
estimated_margin = account_balance * suggested_position_pct
|
||
estimated_notional = estimated_margin * leverage if leverage and leverage > 0 else estimated_margin
|
||
estimated_quantity = estimated_notional / entry_price if entry_price > 0 else 0
|
||
|
||
# 计算基于保证金的止损止盈
|
||
stop_loss_pct_margin = config.TRADING_CONFIG.get('STOP_LOSS_PERCENT', 0.08)
|
||
take_profit_pct_margin = config.TRADING_CONFIG.get('TAKE_PROFIT_PERCENT', 0.15)
|
||
|
||
stop_loss_price = self.risk_manager.get_stop_loss_price(
|
||
entry_price,
|
||
direction,
|
||
estimated_quantity,
|
||
leverage,
|
||
stop_loss_pct=stop_loss_pct_margin,
|
||
klines=symbol_info.get('klines'),
|
||
bollinger=symbol_info.get('bollinger'),
|
||
atr=symbol_info.get('atr')
|
||
)
|
||
|
||
# 计算止损百分比(相对于保证金,用于显示)
|
||
stop_loss_amount = estimated_margin * stop_loss_pct_margin
|
||
if direction == 'BUY':
|
||
stop_loss_pct = (entry_price - stop_loss_price) / entry_price
|
||
else:
|
||
stop_loss_pct = (stop_loss_price - entry_price) / entry_price
|
||
|
||
# 第一目标:盈亏比1:1(相对于保证金)
|
||
take_profit_1_pct_margin = stop_loss_pct_margin * 1.0 # 1:1 盈亏比
|
||
take_profit_1 = self.risk_manager.get_take_profit_price(
|
||
entry_price, direction, estimated_quantity, leverage,
|
||
take_profit_pct=take_profit_1_pct_margin
|
||
)
|
||
|
||
# 第二目标:止损的2.5倍(相对于保证金)
|
||
take_profit_2_pct_margin = stop_loss_pct_margin * 2.5
|
||
take_profit_2 = self.risk_manager.get_take_profit_price(
|
||
entry_price, direction, estimated_quantity, leverage,
|
||
take_profit_pct=take_profit_2_pct_margin
|
||
)
|
||
|
||
# 建议仓位(根据信号强度调整)
|
||
base_position_pct = config.TRADING_CONFIG.get('MAX_POSITION_PERCENT', 0.05)
|
||
signal_strength = trade_signal['strength']
|
||
# 信号强度越高,建议仓位可以适当增加(但不超过1.5倍)
|
||
position_multiplier = min(1.0 + (signal_strength - 5) * 0.1, 1.5)
|
||
suggested_position_pct = base_position_pct * position_multiplier
|
||
|
||
# 添加时间戳
|
||
timestamp = time.time()
|
||
recommendation_time = datetime.now().isoformat()
|
||
|
||
# 计算胜率预估(基于信号强度、市场状态等)
|
||
base_win_rate = 40 + (signal_strength * 3) # 信号强度0-10,对应胜率40-70%
|
||
market_regime = symbol_info.get('marketRegime', 'ranging')
|
||
if market_regime == 'ranging':
|
||
base_win_rate += 5
|
||
elif market_regime == 'trending':
|
||
base_win_rate += 3
|
||
trend_4h = trade_signal.get('trend_4h')
|
||
if trend_4h:
|
||
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
|
||
base_win_rate += 3
|
||
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
|
||
base_win_rate -= 2
|
||
estimated_win_rate = max(35, min(75, base_win_rate))
|
||
|
||
# 计算盈亏USDT评估(基于保证金)
|
||
margin = account_balance * suggested_position_pct
|
||
|
||
# 止损止盈金额(相对于保证金)
|
||
stop_loss_usdt = margin * stop_loss_pct_margin
|
||
take_profit_1_usdt = margin * take_profit_1_pct_margin
|
||
take_profit_2_usdt = margin * take_profit_2_pct_margin
|
||
|
||
expected_pnl_1 = (estimated_win_rate / 100) * take_profit_1_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
|
||
expected_pnl_2 = (estimated_win_rate / 100) * take_profit_2_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
|
||
|
||
# 分类推荐类型和风险等级
|
||
recommendation_category, risk_level, simple_reason, risk_warning, trading_tutorial = self._classify_recommendation(
|
||
market_regime, trend_4h, direction, trade_signal, signal_strength
|
||
)
|
||
|
||
# 计算预期持仓时间(基于策略周期和市场状态)
|
||
expected_hold_time = self._estimate_hold_time(market_regime, trend_4h, direction, signal_strength)
|
||
|
||
# 生成用户指南(人话版计划)
|
||
user_guide = self._generate_user_guide(
|
||
symbol, direction, suggested_limit_price, stop_loss_price, take_profit_1, take_profit_2,
|
||
simple_reason, expected_hold_time, risk_warning, recommendation_category, current_price
|
||
)
|
||
|
||
# 基础推荐数据
|
||
base_data = {
|
||
'symbol': symbol,
|
||
'direction': direction,
|
||
'current_price': current_price,
|
||
# 价格元信息:前端用于展示“这是不是实时的”
|
||
'current_price_source': 'ticker_24h' if symbol_info.get('ticker_price') is not None else 'kline_close',
|
||
'current_price_time_ms': price_ts_ms,
|
||
'change_percent': symbol_info.get('changePercent', 0),
|
||
'recommendation_reason': trade_signal['reason'],
|
||
'signal_strength': signal_strength,
|
||
'market_regime': market_regime,
|
||
'trend_4h': trend_4h,
|
||
# 额外保留分析价(用于排查“为什么信号这样算”)
|
||
'analysis_price': analysis_price,
|
||
'rsi': symbol_info.get('rsi'),
|
||
'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
|
||
'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None,
|
||
'bollinger_middle': symbol_info.get('bollinger', {}).get('middle') if symbol_info.get('bollinger') else None,
|
||
'bollinger_lower': symbol_info.get('bollinger', {}).get('lower') if symbol_info.get('bollinger') else None,
|
||
'ema20': symbol_info.get('ema20'),
|
||
'ema50': symbol_info.get('ema50'),
|
||
'ema20_4h': symbol_info.get('ema20_4h'),
|
||
'atr': symbol_info.get('atr'),
|
||
'suggested_stop_loss': stop_loss_price,
|
||
'suggested_take_profit_1': take_profit_1,
|
||
'suggested_take_profit_2': take_profit_2,
|
||
# 计划入场价(限价挂单价)作为止损/止盈计算基准
|
||
'planned_entry_price': entry_price,
|
||
'suggested_position_percent': suggested_position_pct,
|
||
'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10),
|
||
'volume_24h': symbol_info.get('volume24h'),
|
||
'volatility': symbol_info.get('volatility'),
|
||
'estimated_win_rate': round(estimated_win_rate, 1),
|
||
'expected_pnl_1': round(expected_pnl_1, 2),
|
||
'expected_pnl_2': round(expected_pnl_2, 2),
|
||
'stop_loss_usdt': round(stop_loss_usdt, 2),
|
||
'take_profit_1_usdt': round(take_profit_1_usdt, 2),
|
||
'take_profit_2_usdt': round(take_profit_2_usdt, 2),
|
||
'recommendation_time': recommendation_time,
|
||
'timestamp': timestamp,
|
||
# 新增字段:用户指南和分类
|
||
'user_guide': user_guide,
|
||
'recommendation_category': recommendation_category,
|
||
'risk_level': risk_level,
|
||
'expected_hold_time': expected_hold_time,
|
||
'trading_tutorial': trading_tutorial,
|
||
'max_hold_days': 3 # 最大持仓天数(超过此天数建议平仓)
|
||
}
|
||
|
||
# 限价单推荐(唯一推荐类型)
|
||
limit_recommendation = base_data.copy()
|
||
limit_recommendation.update({
|
||
'order_type': 'LIMIT',
|
||
'suggested_limit_price': suggested_limit_price
|
||
})
|
||
|
||
logger.debug(
|
||
f"✓ 生成推荐: {symbol} {direction} "
|
||
f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%, "
|
||
f"分类: {recommendation_category}, 风险: {risk_level})"
|
||
)
|
||
|
||
recs = [limit_recommendation]
|
||
# 将“单条推荐”也写入 Redis(统一推荐缓存来源,避免只在手动 generate 时才更新)
|
||
await self._update_realtime_cache(recs)
|
||
return recs
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}", exc_info=True)
|
||
return None
|
||
|
||
async def _update_realtime_cache(self, recs: List[Dict]) -> None:
|
||
"""
|
||
将推荐写入 Redis:
|
||
- Hash: recommendations:realtime(增量更新)
|
||
- Snapshot: recommendations:snapshot(全量快照,给前端统一读取)
|
||
"""
|
||
if not recs:
|
||
return
|
||
try:
|
||
rc = getattr(self.client, "redis_cache", None)
|
||
if not rc or not getattr(rc, "redis", None) or not getattr(rc, "_connected", False):
|
||
return
|
||
|
||
cache_key = "recommendations:realtime"
|
||
snapshot_key = "recommendations:snapshot"
|
||
lock_key = "lock:recommendations:snapshot:update"
|
||
|
||
# 1) 增量写 Hash(每条推荐一个 field)
|
||
for rec in recs:
|
||
if not isinstance(rec, dict):
|
||
continue
|
||
sym = rec.get("symbol")
|
||
if not sym:
|
||
continue
|
||
rec_key = f"{sym}_{rec.get('order_type', 'LIMIT')}"
|
||
await rc.hset(cache_key, rec_key, rec, ttl=3600)
|
||
try:
|
||
await rc.redis.expire(cache_key, 3600)
|
||
except Exception:
|
||
pass
|
||
|
||
# 2) 合并写 Snapshot(单 key,给所有用户读)
|
||
now_ms = int(__import__("time").time() * 1000)
|
||
try:
|
||
got_lock = await rc.redis.set(lock_key, str(now_ms), nx=True, ex=2)
|
||
except Exception:
|
||
got_lock = False
|
||
if not got_lock:
|
||
return
|
||
|
||
existing = None
|
||
try:
|
||
existing = await rc.get(snapshot_key)
|
||
except Exception:
|
||
existing = None
|
||
|
||
# 从 snapshot 取旧 items;如果没有,就从 Hash 兜底(避免首次写入为空)
|
||
items: List[Dict] = []
|
||
if isinstance(existing, dict) and isinstance(existing.get("items"), list):
|
||
items = [x for x in existing.get("items", []) if isinstance(x, dict)]
|
||
|
||
# 合并:按 rec_key 覆盖(同 symbol + order_type)
|
||
merged: Dict[str, Dict] = {}
|
||
for x in items:
|
||
sym = x.get("symbol")
|
||
if not sym:
|
||
continue
|
||
k = f"{sym}_{x.get('order_type', 'LIMIT')}"
|
||
merged[k] = x
|
||
for x in recs:
|
||
if not isinstance(x, dict):
|
||
continue
|
||
sym = x.get("symbol")
|
||
if not sym:
|
||
continue
|
||
k = f"{sym}_{x.get('order_type', 'LIMIT')}"
|
||
merged[k] = x
|
||
|
||
# 过滤过期(2小时)
|
||
max_age_sec = 3600 * 2
|
||
now_s = __import__("time").time()
|
||
filtered: List[Dict] = []
|
||
for x in merged.values():
|
||
ts = x.get("timestamp", 0)
|
||
try:
|
||
ts = float(ts) if ts is not None else 0.0
|
||
except Exception:
|
||
ts = 0.0
|
||
if ts and (now_s - ts) > max_age_sec:
|
||
continue
|
||
filtered.append(x)
|
||
|
||
filtered.sort(key=lambda x: x.get("timestamp", 0) or 0, reverse=True)
|
||
# 防止快照无限长(前端默认 limit=50,这里留一点冗余)
|
||
filtered = filtered[:200]
|
||
|
||
snapshot = {
|
||
"generated_at_ms": now_ms,
|
||
"generated_at": datetime.now().isoformat(),
|
||
"ttl_sec": 7200,
|
||
"count": len(filtered),
|
||
"items": filtered,
|
||
}
|
||
await rc.set(snapshot_key, snapshot, ttl=7200)
|
||
except Exception as e:
|
||
logger.debug(f"更新推荐 Redis 缓存失败(可忽略): {e}")
|
||
|
||
def _classify_recommendation(
|
||
self,
|
||
market_regime: str,
|
||
trend_4h: Optional[str],
|
||
direction: str,
|
||
trade_signal: Dict,
|
||
signal_strength: int
|
||
) -> tuple:
|
||
"""
|
||
分类推荐类型和风险等级
|
||
|
||
Returns:
|
||
(recommendation_category, risk_level, simple_reason, risk_warning, trading_tutorial)
|
||
"""
|
||
# 判断是否为逆趋势交易
|
||
is_counter_trend = False
|
||
if trend_4h:
|
||
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
|
||
is_counter_trend = True
|
||
|
||
# 分类推荐类型
|
||
if market_regime == 'trending':
|
||
if not is_counter_trend:
|
||
# 顺趋势突破
|
||
category = "顺趋势突破"
|
||
risk_level = "中等"
|
||
simple_reason = "趋势明确,顺势而为,胜率较高"
|
||
risk_warning = "趋势可能反转,注意止损保护"
|
||
tutorial = "此类交易适合跟随趋势,持仓时间可稍长,但需关注趋势是否持续"
|
||
else:
|
||
# 逆趋势反弹(高风险)
|
||
category = "逆势反弹(高风险)"
|
||
risk_level = "高"
|
||
simple_reason = "逆趋势操作,风险较高,适合快进快出"
|
||
risk_warning = "⚠️ 高风险:逆趋势交易胜率较低,务必严格止损,快进快出"
|
||
tutorial = "此类交易胜率较低,仅适合短线快进快出,务必严格止损。建议持仓不超过数小时,一旦触及止损立即离场"
|
||
elif market_regime == 'ranging':
|
||
# 震荡区间操作
|
||
category = "震荡区间操作"
|
||
risk_level = "中等"
|
||
simple_reason = "市场震荡,在区间边界操作,适合高抛低吸"
|
||
risk_warning = "震荡可能转为趋势,注意突破风险"
|
||
tutorial = "此类交易适合在震荡区间边界操作,预期持仓时间较短(数小时至1天),到达目标及时止盈"
|
||
else:
|
||
# 未知市场状态
|
||
category = "市场状态不明"
|
||
risk_level = "中高"
|
||
simple_reason = "市场状态不明确,谨慎操作"
|
||
risk_warning = "市场状态不明,建议降低仓位或等待更明确信号"
|
||
tutorial = "市场状态不明确,建议谨慎操作,降低仓位,严格止损"
|
||
|
||
# 根据信号强度调整风险等级
|
||
if signal_strength >= 7:
|
||
if risk_level == "高":
|
||
risk_level = "中高"
|
||
elif risk_level == "中等":
|
||
risk_level = "低中"
|
||
elif signal_strength < 5:
|
||
if risk_level == "中等":
|
||
risk_level = "中高"
|
||
elif risk_level == "低中":
|
||
risk_level = "中等"
|
||
|
||
return category, risk_level, simple_reason, risk_warning, tutorial
|
||
|
||
def _estimate_hold_time(
|
||
self,
|
||
market_regime: str,
|
||
trend_4h: Optional[str],
|
||
direction: str,
|
||
signal_strength: int
|
||
) -> str:
|
||
"""
|
||
估算预期持仓时间
|
||
|
||
Returns:
|
||
预期持仓时间的文字描述
|
||
"""
|
||
# 基于策略周期(1H/15min)和市场状态估算
|
||
if market_regime == 'trending':
|
||
# 趋势市场,持仓时间可能较长
|
||
if signal_strength >= 7:
|
||
return "预期持仓:数小时至2天(趋势明确,可适当延长持仓)"
|
||
else:
|
||
return "预期持仓:数小时至1天(趋势较弱,及时止盈)"
|
||
elif market_regime == 'ranging':
|
||
# 震荡市场,持仓时间较短
|
||
return "预期持仓:数小时至半天(震荡市场,快进快出)"
|
||
else:
|
||
# 未知状态,保守估计
|
||
return "预期持仓:数小时(市场状态不明,谨慎持仓)"
|
||
|
||
def _generate_user_guide(
|
||
self,
|
||
symbol: str,
|
||
direction: str,
|
||
limit_price: float,
|
||
stop_loss: float,
|
||
tp1: float,
|
||
tp2: float,
|
||
simple_reason: str,
|
||
expected_hold_time: str,
|
||
risk_warning: str,
|
||
category: str,
|
||
current_price: float
|
||
) -> str:
|
||
"""
|
||
生成用户指南(人话版计划)
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
direction: 方向
|
||
limit_price: 建议入场价(限价单)
|
||
stop_loss: 止损价
|
||
tp1: 第一目标止盈价
|
||
tp2: 第二目标止盈价
|
||
simple_reason: 简单原因
|
||
expected_hold_time: 预期持仓时间
|
||
risk_warning: 风险警告
|
||
category: 推荐分类
|
||
current_price: 当前价格(用于计算反向波动)
|
||
|
||
Returns:
|
||
用户指南文本
|
||
"""
|
||
direction_cn = "买入" if direction == 'BUY' else "卖出"
|
||
direction_action = "挂单买入" if direction == 'BUY' else "挂单卖出"
|
||
|
||
# 计算反向波动阈值(2%)
|
||
if direction == 'BUY':
|
||
# 买单:如果价格下跌超过2%,建议取消
|
||
reverse_threshold = current_price * 0.98
|
||
reverse_direction = "下跌"
|
||
else:
|
||
# 卖单:如果价格上涨超过2%,建议取消
|
||
reverse_threshold = current_price * 1.02
|
||
reverse_direction = "上涨"
|
||
|
||
user_guide = f"""【操作计划】{direction_cn} {symbol}
|
||
【推荐类型】{category}
|
||
【核心理由】{simple_reason}
|
||
|
||
【明确的入场价】
|
||
建议在 {limit_price:.4f} USDT 附近{direction_action}
|
||
|
||
【具体点位】
|
||
• 建议挂单价: {limit_price:.4f} USDT
|
||
• 止损价: {stop_loss:.4f} USDT
|
||
• 第一目标: {tp1:.4f} USDT(盈亏比1:1)
|
||
• 第二目标: {tp2:.4f} USDT(盈亏比2.5:1)
|
||
|
||
【持仓周期】{expected_hold_time}
|
||
|
||
【退出条件】
|
||
• 触及止损:立即平仓
|
||
• 触及第一目标:可部分止盈或全部止盈
|
||
• 触及第二目标:建议全部止盈
|
||
• 持仓超过3天未触及第一目标:建议平仓离场重新评估
|
||
|
||
【订单失效条件】
|
||
此限价单建议当日有效。若价格未触及挂单价,但价格直接{reverse_direction}超过2%({reverse_threshold:.4f} USDT),则建议取消订单,等待新信号。
|
||
|
||
【关键提醒】{risk_warning}
|
||
|
||
【给主动交易者的提示】
|
||
如果您确信趋势已启动,也可考虑以市价单立即入场,但需承受更高滑点成本,且务必设置好止损。"""
|
||
|
||
return user_guide
|
||
|
||
async def get_active_recommendations(self) -> List[Dict]:
|
||
"""获取当前有效的推荐"""
|
||
if DB_AVAILABLE and TradeRecommendation:
|
||
return TradeRecommendation.get_active()
|
||
return []
|
||
|
||
async def mark_recommendation_executed(self, recommendation_id: int, trade_id: int = None):
|
||
"""标记推荐已执行"""
|
||
if DB_AVAILABLE and TradeRecommendation:
|
||
TradeRecommendation.mark_executed(recommendation_id, trade_id)
|
||
logger.info(f"推荐 {recommendation_id} 已标记为已执行")
|