auto_trade_sys/trading_system/trade_recommender.py
薇薇安 e81dc33c3c a
2026-01-15 22:48:53 +08:00

504 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
推荐交易对模块 - 生成交易推荐供手动参考
"""
import asyncio
import logging
import time
from typing import List, Dict, Optional
from datetime import datetime, timedelta
try:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from . import config
except ImportError:
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
import config
logger = logging.getLogger(__name__)
# 尝试导入数据库模型
DB_AVAILABLE = False
TradeRecommendation = None
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import TradeRecommendation
DB_AVAILABLE = True
logger.info("✓ 数据库模型导入成功,推荐记录将保存到数据库")
else:
logger.warning("⚠ backend目录不存在无法使用数据库功能")
DB_AVAILABLE = False
except ImportError as e:
logger.warning(f"⚠ 无法导入数据库模型: {e}")
DB_AVAILABLE = False
except Exception as e:
logger.warning(f"⚠ 数据库初始化失败: {e}")
DB_AVAILABLE = False
class TradeRecommender:
"""推荐交易对生成器"""
def __init__(
self,
client: BinanceClient,
scanner: MarketScanner,
risk_manager: RiskManager
):
"""
初始化推荐器
Args:
client: 币安客户端
scanner: 市场扫描器
risk_manager: 风险管理器
"""
self.client = client
self.scanner = scanner
self.risk_manager = risk_manager
async def generate_recommendations(
self,
min_signal_strength: int = 5,
max_recommendations: int = 20,
add_to_cache: bool = True,
min_quality_score: float = 0.0
) -> List[Dict]:
"""
生成交易推荐支持增量添加到Redis缓存
Args:
min_signal_strength: 最小信号强度默认5低于此强度的不推荐
max_recommendations: 最大推荐数量(单次生成)
add_to_cache: 是否添加到Redis缓存默认True
min_quality_score: 最小质量分数用于过滤默认0.0表示不过滤)
Returns:
推荐列表
"""
logger.info("开始生成交易推荐...")
# 获取账户余额(用于计算盈亏评估)
try:
balance = await self.client.get_account_balance()
account_balance = balance.get('total', 1000)
logger.debug(f"账户余额: {account_balance:.2f} USDT")
except Exception as e:
account_balance = 1000
logger.warning(f"获取账户余额失败使用默认值1000 USDT: {e}")
# 1. 从Redis读取现有推荐如果启用缓存
existing_recommendations = {}
if add_to_cache:
try:
# 从Redis Hash读取所有现有推荐
cache_key = "recommendations:realtime"
existing_data = await self.client.redis_cache.hgetall(cache_key)
for rec_key, rec_data in existing_data.items():
if isinstance(rec_data, dict):
existing_recommendations[rec_key] = rec_data
logger.info(f"从Redis读取到 {len(existing_recommendations)} 个现有推荐")
except Exception as e:
logger.debug(f"从Redis读取现有推荐失败: {e}")
# 2. 扫描市场
top_symbols = await self.scanner.scan_market()
if not top_symbols:
logger.warning("未找到符合条件的交易对")
# 如果市场扫描失败,返回现有缓存推荐
if existing_recommendations:
return list(existing_recommendations.values())
return []
new_recommendations = []
# 3. 对每个交易对进行分析
for symbol_info in top_symbols:
symbol = symbol_info['symbol']
current_price = symbol_info['price']
# 4. 分析交易信号传入min_signal_strength参数
trade_signal = await self._analyze_trade_signal(symbol_info, min_signal_strength=min_signal_strength)
# 5. 如果信号强度足够,生成推荐
if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength:
recommendations_result = await self._create_recommendation(
symbol_info, trade_signal, account_balance
)
if recommendations_result:
# _create_recommendation 返回列表(限价单和市价单)
if isinstance(recommendations_result, list):
new_recommendations.extend(recommendations_result)
else:
new_recommendations.append(recommendations_result)
# 6. 合并新推荐和现有推荐
all_recommendations = {}
current_time = time.time()
max_age = 3600 * 2 # 推荐最大保留时间2小时
# 先添加现有推荐(过滤掉过期的)
for rec_key, rec_data in existing_recommendations.items():
rec_timestamp = rec_data.get('timestamp', 0)
# 如果推荐时间超过2小时跳过
if current_time - rec_timestamp > max_age:
continue
all_recommendations[rec_key] = rec_data
# 添加新推荐(如果质量足够)
for rec in new_recommendations:
# 生成推荐键symbol_orderType
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
# 计算质量分数(信号强度 + 胜率预估)
quality_score = rec.get('signal_strength', 0) * 10
if 'estimated_win_rate' in rec:
quality_score += rec['estimated_win_rate']
# 如果质量分数足够,才添加或更新
if quality_score >= min_quality_score:
# 如果已存在,比较质量分数,只保留更好的
if rec_key in all_recommendations:
existing_quality = all_recommendations[rec_key].get('signal_strength', 0) * 10
if 'estimated_win_rate' in all_recommendations[rec_key]:
existing_quality += all_recommendations[rec_key]['estimated_win_rate']
# 如果新推荐质量更好,更新
if quality_score > existing_quality:
all_recommendations[rec_key] = rec
logger.debug(f"更新推荐 {rec_key}: 质量分数 {existing_quality:.1f} -> {quality_score:.1f}")
else:
# 新推荐,直接添加
all_recommendations[rec_key] = rec
logger.debug(f"添加新推荐 {rec_key}: 质量分数 {quality_score:.1f}")
# 7. 按信号强度排序,保留质量最好的推荐
sorted_recommendations = sorted(
all_recommendations.values(),
key=lambda x: (x.get('signal_strength', 0), x.get('estimated_win_rate', 0)),
reverse=True
)
# 限制总数(保留质量最好的)
max_total = max_recommendations * 3 # 允许更多推荐以便参考
final_recommendations = sorted_recommendations[:max_total]
# 8. 保存到Redis缓存如果启用
if add_to_cache:
try:
cache_key = "recommendations:realtime"
# 增量更新:只更新或添加新的推荐,保留其他推荐
updated_count = 0
for rec in final_recommendations:
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
# 使用hset更新或添加如果不存在则创建
await self.client.redis_cache.hset(cache_key, rec_key, rec, ttl=3600) # TTL 1小时
updated_count += 1
# 设置整个Hash的TTL1小时
if self.client.redis_cache.redis and self.client.redis_cache._connected:
try:
await self.client.redis_cache.redis.expire(cache_key, 3600)
except:
pass
logger.info(f"已更新 {updated_count} 个推荐到Redis缓存总计 {len(final_recommendations)} 个)")
except Exception as e:
logger.warning(f"保存推荐到Redis失败: {e}")
logger.info(f"生成了 {len(new_recommendations)} 个新推荐,总计 {len(final_recommendations)} 个推荐(包含缓存)")
return final_recommendations
async def _analyze_trade_signal(self, symbol_info: Dict, min_signal_strength: int = None) -> Dict:
"""
分析交易信号(复用策略模块的逻辑)
Args:
symbol_info: 交易对信息
Returns:
交易信号字典
"""
symbol = symbol_info['symbol']
current_price = symbol_info['price']
rsi = symbol_info.get('rsi')
macd = symbol_info.get('macd')
bollinger = symbol_info.get('bollinger')
market_regime = symbol_info.get('marketRegime', 'unknown')
ema20 = symbol_info.get('ema20')
ema50 = symbol_info.get('ema50')
ema20_4h = symbol_info.get('ema20_4h')
price_4h = symbol_info.get('price_4h', current_price)
# 判断4H周期趋势方向
trend_4h = None
if ema20_4h is not None:
if price_4h > ema20_4h:
trend_4h = 'up'
elif price_4h < ema20_4h:
trend_4h = 'down'
else:
trend_4h = 'neutral'
# 基础分数即使没有明确信号也给1分基础分推荐系统更宽松
signal_strength = 1
reasons = []
direction = None
# 策略1均值回归震荡市场
if market_regime == 'ranging':
if rsi and rsi < 30:
if trend_4h in ('up', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超卖({rsi:.1f})")
if direction is None:
direction = 'BUY'
elif rsi and rsi > 70:
if trend_4h in ('down', 'neutral', None):
signal_strength += 4
reasons.append(f"RSI超买({rsi:.1f})")
if direction is None:
direction = 'SELL'
if bollinger and current_price <= bollinger.get('lower'):
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带下轨")
if direction is None:
direction = 'BUY'
elif bollinger and current_price >= bollinger.get('upper'):
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("触及布林带上轨")
if direction is None:
direction = 'SELL'
# 策略2趋势跟踪趋势市场
elif market_regime == 'trending':
if macd and macd.get('macd', 0) > macd.get('signal', 0) and macd.get('histogram', 0) > 0:
if trend_4h in ('up', 'neutral', None):
signal_strength += 3
reasons.append("MACD金叉")
if direction is None:
direction = 'BUY'
elif macd and macd.get('macd', 0) < macd.get('signal', 0) and macd.get('histogram', 0) < 0:
if trend_4h in ('down', 'neutral', None):
signal_strength += 3
reasons.append("MACD死叉")
if direction is None:
direction = 'SELL'
if ema20 and ema50:
if current_price > ema20 > ema50:
if trend_4h in ('up', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之上")
if direction is None:
direction = 'BUY'
elif current_price < ema20 < ema50:
if trend_4h in ('down', 'neutral', None):
signal_strength += 2
reasons.append("价格在均线之下")
if direction is None:
direction = 'SELL'
# 多周期共振加分
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
signal_strength += 2
reasons.append("4H周期共振确认")
# 判断是否应该交易使用传入的参数如果没有则使用config中的值
if min_signal_strength is None:
min_signal_strength = config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 7)
should_trade = signal_strength >= min_signal_strength
# 对于推荐系统允许逆4H趋势交易但降低信号强度标记为高风险
if direction and trend_4h:
if (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
# 推荐系统允许逆趋势但降低信号强度减2分
signal_strength = max(0, signal_strength - 2)
reasons.append("⚠️ 逆4H趋势高风险")
# 不禁止,但标记为高风险
return {
'should_trade': should_trade,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '无明确信号',
'strength': signal_strength,
'trend_4h': trend_4h
}
async def _create_recommendation(
self,
symbol_info: Dict,
trade_signal: Dict,
account_balance: float = 1000
) -> Optional[List[Dict]]:
"""
创建推荐记录
Args:
symbol_info: 交易对信息
trade_signal: 交易信号
Returns:
推荐字典
"""
try:
symbol = symbol_info['symbol']
current_price = symbol_info['price']
direction = trade_signal['direction']
# 计算建议的止损止盈
entry_price = current_price
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price,
direction,
klines=symbol_info.get('klines'),
bollinger=symbol_info.get('bollinger'),
atr=symbol_info.get('atr')
)
# 计算止损百分比
if direction == 'BUY':
stop_loss_pct = (entry_price - stop_loss_price) / entry_price
else:
stop_loss_pct = (stop_loss_price - entry_price) / entry_price
# 第一目标盈亏比1:1
if direction == 'BUY':
take_profit_1 = entry_price + (entry_price - stop_loss_price)
else:
take_profit_1 = entry_price - (stop_loss_price - entry_price)
# 第二目标止损的2.5倍
take_profit_2_pct = stop_loss_pct * 2.5
take_profit_2 = self.risk_manager.get_take_profit_price(
entry_price, direction, take_profit_pct=take_profit_2_pct
)
# 建议仓位(根据信号强度调整)
base_position_pct = config.TRADING_CONFIG.get('MAX_POSITION_PERCENT', 0.05)
signal_strength = trade_signal['strength']
# 信号强度越高建议仓位可以适当增加但不超过1.5倍)
position_multiplier = min(1.0 + (signal_strength - 5) * 0.1, 1.5)
suggested_position_pct = base_position_pct * position_multiplier
# 计算建议的挂单价(使用限价单,而不是市价单)
# 对于做多建议价格略低于当前价格当前价格的99.5%),以便在回调时买入
# 对于做空建议价格略高于当前价格当前价格的100.5%),以便在反弹时卖出
limit_price_offset_pct = config.TRADING_CONFIG.get('LIMIT_ORDER_OFFSET_PCT', 0.5) # 默认0.5%
if direction == 'BUY':
suggested_limit_price = current_price * (1 - limit_price_offset_pct / 100)
else: # SELL
suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100)
# 添加时间戳
timestamp = time.time()
recommendation_time = datetime.now().isoformat()
# 计算胜率预估(基于信号强度、市场状态等)
base_win_rate = 40 + (signal_strength * 3) # 信号强度0-10对应胜率40-70%
market_regime = symbol_info.get('marketRegime', 'ranging')
if market_regime == 'ranging':
base_win_rate += 5
elif market_regime == 'trending':
base_win_rate += 3
trend_4h = trade_signal.get('trend_4h')
if trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
base_win_rate += 3
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
base_win_rate -= 2
estimated_win_rate = max(35, min(75, base_win_rate))
# 计算盈亏USDT评估
position_value = account_balance * suggested_position_pct * config.TRADING_CONFIG.get('LEVERAGE', 10)
stop_loss_usdt = position_value * stop_loss_pct
take_profit_1_usdt = position_value * stop_loss_pct
take_profit_2_usdt = position_value * take_profit_2_pct
expected_pnl_1 = (estimated_win_rate / 100) * take_profit_1_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
expected_pnl_2 = (estimated_win_rate / 100) * take_profit_2_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
# 基础推荐数据
base_data = {
'symbol': symbol,
'direction': direction,
'current_price': current_price,
'change_percent': symbol_info.get('changePercent', 0),
'recommendation_reason': trade_signal['reason'],
'signal_strength': signal_strength,
'market_regime': market_regime,
'trend_4h': trend_4h,
'rsi': symbol_info.get('rsi'),
'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None,
'bollinger_middle': symbol_info.get('bollinger', {}).get('middle') if symbol_info.get('bollinger') else None,
'bollinger_lower': symbol_info.get('bollinger', {}).get('lower') if symbol_info.get('bollinger') else None,
'ema20': symbol_info.get('ema20'),
'ema50': symbol_info.get('ema50'),
'ema20_4h': symbol_info.get('ema20_4h'),
'atr': symbol_info.get('atr'),
'suggested_stop_loss': stop_loss_price,
'suggested_take_profit_1': take_profit_1,
'suggested_take_profit_2': take_profit_2,
'suggested_position_percent': suggested_position_pct,
'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10),
'volume_24h': symbol_info.get('volume24h'),
'volatility': symbol_info.get('volatility'),
'estimated_win_rate': round(estimated_win_rate, 1),
'expected_pnl_1': round(expected_pnl_1, 2),
'expected_pnl_2': round(expected_pnl_2, 2),
'stop_loss_usdt': round(stop_loss_usdt, 2),
'take_profit_1_usdt': round(take_profit_1_usdt, 2),
'take_profit_2_usdt': round(take_profit_2_usdt, 2),
'recommendation_time': recommendation_time,
'timestamp': timestamp
}
# 限价单推荐
limit_recommendation = base_data.copy()
limit_recommendation.update({
'order_type': 'LIMIT',
'suggested_limit_price': suggested_limit_price
})
# 市价单推荐
market_recommendation = base_data.copy()
market_recommendation.update({
'order_type': 'MARKET',
'suggested_limit_price': None
})
logger.debug(
f"✓ 生成推荐: {symbol} {direction} "
f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%)"
)
return [limit_recommendation, market_recommendation]
except Exception as e:
logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}")
return None
async def get_active_recommendations(self) -> List[Dict]:
"""获取当前有效的推荐"""
if DB_AVAILABLE and TradeRecommendation:
return TradeRecommendation.get_active()
return []
async def mark_recommendation_executed(self, recommendation_id: int, trade_id: int = None):
"""标记推荐已执行"""
if DB_AVAILABLE and TradeRecommendation:
TradeRecommendation.mark_executed(recommendation_id, trade_id)
logger.info(f"推荐 {recommendation_id} 已标记为已执行")