auto_trade_sys/trading_system/strategy.py
薇薇安 f99f508b09 a
2026-01-14 13:43:06 +08:00

349 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交易策略模块 - 实现交易逻辑和优化
"""
import asyncio
import logging
from typing import List, Dict, Optional
try:
from .binance_client import BinanceClient
from .market_scanner import MarketScanner
from .risk_manager import RiskManager
from .position_manager import PositionManager
from . import config
except ImportError:
from binance_client import BinanceClient
from market_scanner import MarketScanner
from risk_manager import RiskManager
from position_manager import PositionManager
import config
logger = logging.getLogger(__name__)
class TradingStrategy:
"""交易策略类"""
def __init__(
self,
client: BinanceClient,
scanner: MarketScanner,
risk_manager: RiskManager,
position_manager: PositionManager
):
"""
初始化交易策略
Args:
client: 币安客户端
scanner: 市场扫描器
risk_manager: 风险管理器
position_manager: 仓位管理器
"""
self.client = client
self.scanner = scanner
self.risk_manager = risk_manager
self.position_manager = position_manager
self.running = False
async def execute_strategy(self):
"""
执行交易策略
"""
self.running = True
logger.info("交易策略开始执行...")
try:
while self.running:
# 1. 扫描市场找出涨跌幅最大的前N个货币对
top_symbols = await self.scanner.scan_market()
if not top_symbols:
logger.warning("未找到符合条件的交易对,等待下次扫描...")
await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL'])
continue
# 2. 对每个交易对执行交易逻辑(使用技术指标)
for symbol_info in top_symbols:
if not self.running:
break
symbol = symbol_info['symbol']
change_percent = symbol_info['changePercent']
direction = symbol_info['direction']
market_regime = symbol_info.get('marketRegime', 'unknown')
logger.info(
f"处理交易对: {symbol} "
f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})"
)
# 检查是否应该交易
if not await self.risk_manager.should_trade(symbol, change_percent):
continue
# 优化:结合成交量确认
if not await self._check_volume_confirmation(symbol_info):
logger.info(f"{symbol} 成交量确认失败,跳过")
continue
# 使用技术指标判断交易信号(高胜率策略)
trade_signal = await self._analyze_trade_signal(symbol_info)
# 记录交易信号到数据库
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import TradingSignal
TradingSignal.create(
symbol=symbol,
signal_direction=trade_signal.get('direction', ''),
signal_strength=trade_signal.get('strength', 0),
signal_reason=trade_signal.get('reason', ''),
rsi=symbol_info.get('rsi'),
macd_histogram=symbol_info.get('macd', {}).get('histogram') if symbol_info.get('macd') else None,
market_regime=symbol_info.get('marketRegime')
)
except Exception as e:
logger.debug(f"记录交易信号失败: {e}")
if not trade_signal['should_trade']:
logger.info(
f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过"
)
continue
# 确定交易方向(基于技术指标)
trade_direction = trade_signal['direction']
entry_reason = trade_signal['reason']
logger.info(
f"{symbol} 交易信号: {trade_direction} | "
f"原因: {entry_reason} | "
f"信号强度: {trade_signal.get('strength', 0)}/10"
)
# 开仓(使用改进的仓位管理)
position = await self.position_manager.open_position(
symbol=symbol,
change_percent=change_percent,
leverage=config.TRADING_CONFIG.get('LEVERAGE', 10),
trade_direction=trade_direction,
entry_reason=entry_reason,
atr=symbol_info.get('atr')
)
if position:
logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})")
else:
logger.warning(f"{symbol} 开仓失败")
# 避免同时处理太多交易对
await asyncio.sleep(1)
# 3. 检查止损止盈
await self.position_manager.check_stop_loss_take_profit()
# 4. 打印持仓摘要并记录账户快照
summary = await self.position_manager.get_position_summary()
if summary:
logger.info(
f"持仓摘要: {summary['totalPositions']} 个持仓, "
f"总盈亏: {summary['totalPnL']:.2f} USDT, "
f"可用余额: {summary['availableBalance']:.2f} USDT"
)
# 输出价格缓存统计
cache_size = len(self.client._price_cache)
if cache_size > 0:
logger.info(f"价格缓存: {cache_size}个交易对")
# 记录账户快照到数据库
try:
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
backend_path = project_root / 'backend'
if backend_path.exists():
sys.path.insert(0, str(backend_path))
from database.models import AccountSnapshot
AccountSnapshot.create(
total_balance=summary['totalBalance'],
available_balance=summary['availableBalance'],
total_position_value=sum(abs(p['positionAmt'] * p['entryPrice']) for p in summary.get('positions', [])),
total_pnl=summary['totalPnL'],
open_positions=summary['totalPositions']
)
except Exception as e:
logger.debug(f"记录账户快照失败: {e}")
# 重新加载配置(确保使用最新配置)
try:
if hasattr(config, 'reload_config'):
config.reload_config()
logger.info("配置已重新加载,将使用最新配置进行下次扫描")
except Exception as e:
logger.debug(f"重新加载配置失败: {e}")
# 等待下次扫描
scan_interval = config.TRADING_CONFIG.get('SCAN_INTERVAL', 3600)
logger.info(f"等待 {scan_interval} 秒后进行下次扫描...")
await asyncio.sleep(scan_interval)
except Exception as e:
logger.error(f"策略执行出错: {e}", exc_info=True)
finally:
self.running = False
logger.info("交易策略已停止")
async def _check_volume_confirmation(self, symbol_info: Dict) -> bool:
"""
成交量确认 - 确保有足够的成交量支撑
Args:
symbol_info: 交易对信息
Returns:
是否通过确认
"""
volume_24h = symbol_info.get('volume24h', 0)
min_volume = config.TRADING_CONFIG['MIN_VOLUME_24H']
if volume_24h < min_volume:
return False
return True
async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict:
"""
使用技术指标分析交易信号(高胜率策略)
Args:
symbol_info: 交易对信息(包含技术指标)
Returns:
交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int}
"""
symbol = symbol_info['symbol']
current_price = symbol_info['price']
rsi = symbol_info.get('rsi')
macd = symbol_info.get('macd')
bollinger = symbol_info.get('bollinger')
market_regime = symbol_info.get('marketRegime', 'unknown')
ema20 = symbol_info.get('ema20')
ema50 = symbol_info.get('ema50')
signal_strength = 0
reasons = []
direction = None
# 策略1均值回归震荡市场高胜率
if market_regime == 'ranging':
# RSI超卖做多信号
if rsi and rsi < 30:
signal_strength += 4
reasons.append(f"RSI超卖({rsi:.1f})")
if direction is None:
direction = 'BUY'
# RSI超买做空信号
elif rsi and rsi > 70:
signal_strength += 4
reasons.append(f"RSI超买({rsi:.1f})")
if direction is None:
direction = 'SELL'
# 布林带下轨,做多信号
if bollinger and current_price <= bollinger['lower']:
signal_strength += 3
reasons.append("触及布林带下轨")
if direction is None:
direction = 'BUY'
# 布林带上轨,做空信号
elif bollinger and current_price >= bollinger['upper']:
signal_strength += 3
reasons.append("触及布林带上轨")
if direction is None:
direction = 'SELL'
# 策略2趋势跟踪趋势市场
elif market_regime == 'trending':
# MACD金叉做多信号
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
signal_strength += 3
reasons.append("MACD金叉")
if direction is None:
direction = 'BUY'
# MACD死叉做空信号
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
signal_strength += 3
reasons.append("MACD死叉")
if direction is None:
direction = 'SELL'
# 均线系统
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
signal_strength += 2
reasons.append("价格在均线之上")
if direction is None:
direction = 'BUY'
elif current_price < ema20 < ema50: # 下降趋势
signal_strength += 2
reasons.append("价格在均线之下")
if direction is None:
direction = 'SELL'
# 策略3综合信号提高胜率
# 多个指标同时确认时,信号更强
confirmations = 0
# RSI确认
if rsi:
if direction == 'BUY' and rsi < 50:
confirmations += 1
elif direction == 'SELL' and rsi > 50:
confirmations += 1
# MACD确认
if macd:
if direction == 'BUY' and macd['histogram'] > 0:
confirmations += 1
elif direction == 'SELL' and macd['histogram'] < 0:
confirmations += 1
# 布林带确认
if bollinger:
if direction == 'BUY' and current_price < bollinger['middle']:
confirmations += 1
elif direction == 'SELL' and current_price > bollinger['middle']:
confirmations += 1
# 多个指标确认时,增加信号强度
if confirmations >= 2:
signal_strength += 2
reasons.append(f"{confirmations}个指标确认")
# 判断是否应该交易(信号强度 >= 5 才交易,提高胜率)
should_trade = signal_strength >= config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 5)
if not should_trade and direction:
reasons.append(f"信号强度不足({signal_strength}/10)")
return {
'should_trade': should_trade,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '无明确信号',
'strength': signal_strength
}
def stop(self):
"""停止策略"""
self.running = False
logger.info("正在停止交易策略...")