This commit is contained in:
薇薇安 2026-01-15 21:59:18 +08:00
parent 5030cbcc98
commit b2860f8fdb
3 changed files with 450 additions and 61 deletions

View File

@ -36,7 +36,7 @@ async def get_recommendations(
""" """
try: try:
if type == 'realtime': if type == 'realtime':
# 实时生成推荐 # 从Redis缓存读取推荐如果存在
import sys import sys
from pathlib import Path from pathlib import Path
current_file = Path(__file__) current_file = Path(__file__)
@ -58,43 +58,59 @@ async def get_recommendations(
sys.path.insert(0, str(project_root)) sys.path.insert(0, str(project_root))
from binance_client import BinanceClient from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from trade_recommender import TradeRecommender
import config import config
# 初始化组件 # 创建客户端用于访问Redis
client = BinanceClient( client = BinanceClient(
api_key=config.BINANCE_API_KEY, api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET, api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET testnet=config.USE_TESTNET
) )
await client.connect() # 连接Redis如果还没有连接
try: try:
scanner = MarketScanner(client) await client.redis_cache.connect()
risk_manager = RiskManager(client) except:
recommender = TradeRecommender(client, scanner, risk_manager) pass
# 生成推荐 # 从Redis读取推荐
recommendations = await recommender.generate_recommendations( recommendations = []
min_signal_strength=min_signal_strength, try:
max_recommendations=limit import time
) cache_key = "recommendations:realtime"
cached_data = await client.redis_cache.hgetall(cache_key)
if cached_data:
current_time = time.time()
max_age = 3600 * 2 # 推荐最大保留时间2小时
# 方向过滤 # 过滤过期推荐
if direction: for rec in cached_data.values():
recommendations = [r for r in recommendations if r.get('direction') == direction] if isinstance(rec, dict):
rec_timestamp = rec.get('timestamp', 0)
# 如果推荐时间超过2小时跳过
if current_time - rec_timestamp > max_age:
continue
recommendations.append(rec)
return { # 按时间戳降序排序(最新的在前)
"success": True, recommendations.sort(key=lambda x: x.get('timestamp', 0), reverse=True)
"count": len(recommendations), logger.info(f"从Redis读取到 {len(recommendations)} 个有效推荐(已过滤过期)")
"type": "realtime", except Exception as e:
"data": recommendations logger.debug(f"从Redis读取推荐失败: {e}")
}
finally: # 方向过滤
await client.disconnect() if direction:
recommendations = [r for r in recommendations if r.get('direction') == direction]
# 限制返回数量
recommendations = recommendations[:limit]
return {
"success": True,
"count": len(recommendations),
"type": "realtime",
"data": recommendations
}
elif type == 'bookmarked': elif type == 'bookmarked':
# 从数据库查询已标记的推荐 # 从数据库查询已标记的推荐
@ -470,10 +486,12 @@ async def generate_recommendations(
risk_manager = RiskManager(client) risk_manager = RiskManager(client)
recommender = TradeRecommender(client, scanner, risk_manager) recommender = TradeRecommender(client, scanner, risk_manager)
# 生成推荐 # 生成推荐增量添加到Redis缓存
recommendations = await recommender.generate_recommendations( recommendations = await recommender.generate_recommendations(
min_signal_strength=min_signal_strength, min_signal_strength=min_signal_strength,
max_recommendations=max_recommendations max_recommendations=max_recommendations,
add_to_cache=True, # 添加到Redis缓存
min_quality_score=0.0 # 不过滤,保留所有推荐
) )
return { return {
@ -494,6 +512,52 @@ async def generate_recommendations(
raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}") raise HTTPException(status_code=500, detail=f"生成推荐失败: {str(e)}")
@router.post("/clear-cache")
async def clear_recommendations_cache():
"""
清理Redis中的推荐缓存
"""
try:
import sys
from pathlib import Path
current_file = Path(__file__)
backend_path = current_file.parent.parent.parent
project_root = backend_path.parent
trading_system_path = project_root / 'trading_system'
if not trading_system_path.exists():
alternative_path = backend_path / 'trading_system'
if alternative_path.exists():
trading_system_path = alternative_path
sys.path.insert(0, str(trading_system_path))
sys.path.insert(0, str(project_root))
from binance_client import BinanceClient
import config
client = BinanceClient(
api_key=config.BINANCE_API_KEY,
api_secret=config.BINANCE_API_SECRET,
testnet=config.USE_TESTNET
)
try:
await client.redis_cache.connect()
cache_key = "recommendations:realtime"
await client.redis_cache.delete(cache_key)
return {
"success": True,
"message": "推荐缓存已清理"
}
finally:
await client.disconnect()
except Exception as e:
logger.error(f"清理推荐缓存失败: {e}")
raise HTTPException(status_code=500, detail=f"清理推荐缓存失败: {str(e)}")
@router.post("/bookmark") @router.post("/bookmark")
async def bookmark_recommendation(recommendation_data: dict): async def bookmark_recommendation(recommendation_data: dict):
""" """

View File

@ -219,3 +219,176 @@ class RedisCache:
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""检查是否已连接""" """检查是否已连接"""
return self._connected and self.redis is not None return self._connected and self.redis is not None
async def hset(self, name: str, key: str, value: Any, ttl: int = None):
"""
设置Hash字段
Args:
name: Hash名称
key: 字段名
value: 字段值
ttl: 过期时间如果设置会对整个Hash设置TTL
"""
if self.redis and self._connected:
try:
await self.redis.hset(name, key, json.dumps(value))
# 如果设置了TTL对整个Hash设置过期时间
if ttl:
await self.redis.expire(name, ttl)
return True
except Exception as e:
logger.debug(f"Redis Hash设置失败 {name}.{key}: {e}")
# Redis失败时尝试重新连接
if not self._connected:
await self.connect()
if self.redis and self._connected:
try:
await self.redis.hset(name, key, json.dumps(value))
if ttl:
await self.redis.expire(name, ttl)
return True
except:
pass
# 降级到内存缓存
if name not in self._memory_cache:
self._memory_cache[name] = {}
self._memory_cache[name][key] = value
return False
async def hget(self, name: str, key: str) -> Optional[Any]:
"""
获取Hash字段
Args:
name: Hash名称
key: 字段名
Returns:
字段值如果不存在则返回None
"""
if self.redis and self._connected:
try:
data = await self.redis.hget(name, key)
if data:
return json.loads(data)
except Exception as e:
logger.debug(f"Redis Hash获取失败 {name}.{key}: {e}")
# 降级到内存缓存
if name in self._memory_cache and key in self._memory_cache[name]:
return self._memory_cache[name][key]
return None
async def hgetall(self, name: str) -> Dict[str, Any]:
"""
获取Hash所有字段
Args:
name: Hash名称
Returns:
所有字段的字典
"""
if self.redis and self._connected:
try:
data = await self.redis.hgetall(name)
result = {}
for k, v in data.items():
if isinstance(k, bytes):
k = k.decode('utf-8')
if isinstance(v, bytes):
v = json.loads(v.decode('utf-8'))
else:
v = json.loads(v)
result[k] = v
return result
except Exception as e:
logger.debug(f"Redis Hash获取全部失败 {name}: {e}")
# 降级到内存缓存
if name in self._memory_cache:
return self._memory_cache[name].copy()
return {}
async def hdel(self, name: str, *keys: str):
"""
删除Hash字段
Args:
name: Hash名称
*keys: 要删除的字段名
"""
if self.redis and self._connected:
try:
await self.redis.hdel(name, *keys)
except Exception as e:
logger.debug(f"Redis Hash删除失败 {name}: {e}")
# 同时删除内存缓存
if name in self._memory_cache:
for key in keys:
if key in self._memory_cache[name]:
del self._memory_cache[name][key]
async def zadd(self, name: str, mapping: Dict[str, float], ttl: int = None):
"""
添加Sorted Set成员
Args:
name: Sorted Set名称
mapping: {member: score} 字典
ttl: 过期时间
"""
if self.redis and self._connected:
try:
await self.redis.zadd(name, mapping)
if ttl:
await self.redis.expire(name, ttl)
return True
except Exception as e:
logger.debug(f"Redis Sorted Set添加失败 {name}: {e}")
return False
async def zrange(self, name: str, start: int = 0, end: int = -1, desc: bool = False, withscores: bool = False) -> List:
"""
获取Sorted Set成员按分数排序
Args:
name: Sorted Set名称
start: 起始索引
end: 结束索引
desc: 是否降序默认False升序
withscores: 是否包含分数
Returns:
成员列表
"""
if self.redis and self._connected:
try:
if desc:
return await self.redis.zrevrange(name, start, end, withscores=withscores)
else:
return await self.redis.zrange(name, start, end, withscores=withscores)
except Exception as e:
logger.debug(f"Redis Sorted Set获取失败 {name}: {e}")
return []
async def zrem(self, name: str, *members: str):
"""
删除Sorted Set成员
Args:
name: Sorted Set名称
*members: 要删除的成员
"""
if self.redis and self._connected:
try:
await self.redis.zrem(name, *members)
except Exception as e:
logger.debug(f"Redis Sorted Set删除失败 {name}: {e}")

View File

@ -3,6 +3,7 @@
""" """
import asyncio import asyncio
import logging import logging
import time
from typing import List, Dict, Optional from typing import List, Dict, Optional
from datetime import datetime, timedelta from datetime import datetime, timedelta
try: try:
@ -66,50 +67,155 @@ class TradeRecommender:
async def generate_recommendations( async def generate_recommendations(
self, self,
min_signal_strength: int = 5, min_signal_strength: int = 5,
max_recommendations: int = 20 max_recommendations: int = 20,
add_to_cache: bool = True,
min_quality_score: float = 0.0
) -> List[Dict]: ) -> List[Dict]:
""" """
生成交易推荐 生成交易推荐支持增量添加到Redis缓存
Args: Args:
min_signal_strength: 最小信号强度默认5低于此强度的不推荐 min_signal_strength: 最小信号强度默认5低于此强度的不推荐
max_recommendations: 最大推荐数量 max_recommendations: 最大推荐数量单次生成
add_to_cache: 是否添加到Redis缓存默认True
min_quality_score: 最小质量分数用于过滤默认0.0表示不过滤
Returns: Returns:
推荐列表 推荐列表
""" """
logger.info("开始生成交易推荐...") logger.info("开始生成交易推荐...")
# 1. 扫描市场 # 获取账户余额(用于计算盈亏评估)
try:
balance = await self.client.get_account_balance()
account_balance = balance.get('total', 1000)
logger.debug(f"账户余额: {account_balance:.2f} USDT")
except Exception as e:
account_balance = 1000
logger.warning(f"获取账户余额失败使用默认值1000 USDT: {e}")
# 1. 从Redis读取现有推荐如果启用缓存
existing_recommendations = {}
if add_to_cache:
try:
# 从Redis Hash读取所有现有推荐
cache_key = "recommendations:realtime"
existing_data = await self.client.redis_cache.hgetall(cache_key)
for rec_key, rec_data in existing_data.items():
if isinstance(rec_data, dict):
existing_recommendations[rec_key] = rec_data
logger.info(f"从Redis读取到 {len(existing_recommendations)} 个现有推荐")
except Exception as e:
logger.debug(f"从Redis读取现有推荐失败: {e}")
# 2. 扫描市场
top_symbols = await self.scanner.scan_market() top_symbols = await self.scanner.scan_market()
if not top_symbols: if not top_symbols:
logger.warning("未找到符合条件的交易对") logger.warning("未找到符合条件的交易对")
# 如果市场扫描失败,返回现有缓存推荐
if existing_recommendations:
return list(existing_recommendations.values())
return [] return []
recommendations = [] new_recommendations = []
# 2. 对每个交易对进行分析 # 3. 对每个交易对进行分析
for symbol_info in top_symbols: for symbol_info in top_symbols:
if len(recommendations) >= max_recommendations:
break
symbol = symbol_info['symbol'] symbol = symbol_info['symbol']
current_price = symbol_info['price'] current_price = symbol_info['price']
change_percent = symbol_info.get('changePercent', 0)
# 3. 分析交易信号(使用策略模块的逻辑) # 4. 分析交易信号
trade_signal = await self._analyze_trade_signal(symbol_info) trade_signal = await self._analyze_trade_signal(symbol_info)
# 4. 如果信号强度足够,生成推荐 # 5. 如果信号强度足够,生成推荐
if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength: if trade_signal['should_trade'] and trade_signal['strength'] >= min_signal_strength:
recommendation = await self._create_recommendation( recommendations_result = await self._create_recommendation(
symbol_info, trade_signal symbol_info, trade_signal, account_balance
) )
if recommendation: if recommendations_result:
recommendations.append(recommendation) # _create_recommendation 返回列表(限价单和市价单)
if isinstance(recommendations_result, list):
new_recommendations.extend(recommendations_result)
else:
new_recommendations.append(recommendations_result)
logger.info(f"生成了 {len(recommendations)} 个交易推荐") # 6. 合并新推荐和现有推荐
return recommendations all_recommendations = {}
current_time = time.time()
max_age = 3600 * 2 # 推荐最大保留时间2小时
# 先添加现有推荐(过滤掉过期的)
for rec_key, rec_data in existing_recommendations.items():
rec_timestamp = rec_data.get('timestamp', 0)
# 如果推荐时间超过2小时跳过
if current_time - rec_timestamp > max_age:
continue
all_recommendations[rec_key] = rec_data
# 添加新推荐(如果质量足够)
for rec in new_recommendations:
# 生成推荐键symbol_orderType
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
# 计算质量分数(信号强度 + 胜率预估)
quality_score = rec.get('signal_strength', 0) * 10
if 'estimated_win_rate' in rec:
quality_score += rec['estimated_win_rate']
# 如果质量分数足够,才添加或更新
if quality_score >= min_quality_score:
# 如果已存在,比较质量分数,只保留更好的
if rec_key in all_recommendations:
existing_quality = all_recommendations[rec_key].get('signal_strength', 0) * 10
if 'estimated_win_rate' in all_recommendations[rec_key]:
existing_quality += all_recommendations[rec_key]['estimated_win_rate']
# 如果新推荐质量更好,更新
if quality_score > existing_quality:
all_recommendations[rec_key] = rec
logger.debug(f"更新推荐 {rec_key}: 质量分数 {existing_quality:.1f} -> {quality_score:.1f}")
else:
# 新推荐,直接添加
all_recommendations[rec_key] = rec
logger.debug(f"添加新推荐 {rec_key}: 质量分数 {quality_score:.1f}")
# 7. 按信号强度排序,保留质量最好的推荐
sorted_recommendations = sorted(
all_recommendations.values(),
key=lambda x: (x.get('signal_strength', 0), x.get('estimated_win_rate', 0)),
reverse=True
)
# 限制总数(保留质量最好的)
max_total = max_recommendations * 3 # 允许更多推荐以便参考
final_recommendations = sorted_recommendations[:max_total]
# 8. 保存到Redis缓存如果启用
if add_to_cache:
try:
cache_key = "recommendations:realtime"
# 增量更新:只更新或添加新的推荐,保留其他推荐
updated_count = 0
for rec in final_recommendations:
rec_key = f"{rec['symbol']}_{rec.get('order_type', 'LIMIT')}"
# 使用hset更新或添加如果不存在则创建
await self.client.redis_cache.hset(cache_key, rec_key, rec, ttl=3600) # TTL 1小时
updated_count += 1
# 设置整个Hash的TTL1小时
if self.client.redis_cache.redis and self.client.redis_cache._connected:
try:
await self.client.redis_cache.redis.expire(cache_key, 3600)
except:
pass
logger.info(f"已更新 {updated_count} 个推荐到Redis缓存总计 {len(final_recommendations)} 个)")
except Exception as e:
logger.warning(f"保存推荐到Redis失败: {e}")
logger.info(f"生成了 {len(new_recommendations)} 个新推荐,总计 {len(final_recommendations)} 个推荐(包含缓存)")
return final_recommendations
async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict: async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict:
""" """
@ -230,8 +336,9 @@ class TradeRecommender:
async def _create_recommendation( async def _create_recommendation(
self, self,
symbol_info: Dict, symbol_info: Dict,
trade_signal: Dict trade_signal: Dict,
) -> Optional[Dict]: account_balance: float = 1000
) -> Optional[List[Dict]]:
""" """
创建推荐记录 创建推荐记录
@ -291,16 +398,43 @@ class TradeRecommender:
else: # SELL else: # SELL
suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100) suggested_limit_price = current_price * (1 + limit_price_offset_pct / 100)
# 准备推荐数据 # 添加时间戳
recommendation_data = { timestamp = time.time()
recommendation_time = datetime.now().isoformat()
# 计算胜率预估(基于信号强度、市场状态等)
base_win_rate = 40 + (signal_strength * 3) # 信号强度0-10对应胜率40-70%
market_regime = symbol_info.get('marketRegime', 'ranging')
if market_regime == 'ranging':
base_win_rate += 5
elif market_regime == 'trending':
base_win_rate += 3
trend_4h = trade_signal.get('trend_4h')
if trend_4h:
if (direction == 'BUY' and trend_4h == 'up') or (direction == 'SELL' and trend_4h == 'down'):
base_win_rate += 3
elif (direction == 'BUY' and trend_4h == 'down') or (direction == 'SELL' and trend_4h == 'up'):
base_win_rate -= 2
estimated_win_rate = max(35, min(75, base_win_rate))
# 计算盈亏USDT评估
position_value = account_balance * suggested_position_pct * config.TRADING_CONFIG.get('LEVERAGE', 10)
stop_loss_usdt = position_value * stop_loss_pct
take_profit_1_usdt = position_value * stop_loss_pct
take_profit_2_usdt = position_value * take_profit_2_pct
expected_pnl_1 = (estimated_win_rate / 100) * take_profit_1_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
expected_pnl_2 = (estimated_win_rate / 100) * take_profit_2_usdt - ((100 - estimated_win_rate) / 100) * stop_loss_usdt
# 基础推荐数据
base_data = {
'symbol': symbol, 'symbol': symbol,
'direction': direction, 'direction': direction,
'current_price': current_price, 'current_price': current_price,
'change_percent': symbol_info.get('changePercent', 0), 'change_percent': symbol_info.get('changePercent', 0),
'recommendation_reason': trade_signal['reason'], 'recommendation_reason': trade_signal['reason'],
'signal_strength': signal_strength, 'signal_strength': signal_strength,
'market_regime': symbol_info.get('marketRegime'), 'market_regime': market_regime,
'trend_4h': trade_signal.get('trend_4h'), 'trend_4h': trend_4h,
'rsi': symbol_info.get('rsi'), 'rsi': symbol_info.get('rsi'),
'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None, 'macd_histogram': symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None, 'bollinger_upper': symbol_info.get('bollinger', {}).get('upper') if symbol_info.get('bollinger') else None,
@ -317,18 +451,36 @@ class TradeRecommender:
'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10), 'suggested_leverage': config.TRADING_CONFIG.get('LEVERAGE', 10),
'volume_24h': symbol_info.get('volume24h'), 'volume_24h': symbol_info.get('volume24h'),
'volatility': symbol_info.get('volatility'), 'volatility': symbol_info.get('volatility'),
'order_type': 'LIMIT', # 使用限价单 'estimated_win_rate': round(estimated_win_rate, 1),
'suggested_limit_price': suggested_limit_price # 建议的挂单价 'expected_pnl_1': round(expected_pnl_1, 2),
'expected_pnl_2': round(expected_pnl_2, 2),
'stop_loss_usdt': round(stop_loss_usdt, 2),
'take_profit_1_usdt': round(take_profit_1_usdt, 2),
'take_profit_2_usdt': round(take_profit_2_usdt, 2),
'recommendation_time': recommendation_time,
'timestamp': timestamp
} }
# 不再自动保存到数据库,只返回推荐数据 # 限价单推荐
# 只有用户在前端点击"标记"时才会保存到数据库(用于复盘) limit_recommendation = base_data.copy()
limit_recommendation.update({
'order_type': 'LIMIT',
'suggested_limit_price': suggested_limit_price
})
# 市价单推荐
market_recommendation = base_data.copy()
market_recommendation.update({
'order_type': 'MARKET',
'suggested_limit_price': None
})
logger.debug( logger.debug(
f"✓ 生成推荐: {symbol} {direction} " f"✓ 生成推荐: {symbol} {direction} "
f"(信号强度: {signal_strength}/10)" f"(信号强度: {signal_strength}/10, 胜率预估: {estimated_win_rate:.1f}%)"
) )
return recommendation_data return [limit_recommendation, market_recommendation]
except Exception as e: except Exception as e:
logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}") logger.error(f"创建推荐失败 {symbol_info.get('symbol', 'unknown')}: {e}")