This commit is contained in:
薇薇安 2026-01-13 15:23:16 +08:00
parent 1bb80ee0b0
commit 4266d52bc8
7 changed files with 439 additions and 84 deletions

View File

@ -6,6 +6,7 @@ import logging
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
from binance import AsyncClient, BinanceSocketManager from binance import AsyncClient, BinanceSocketManager
from binance.exceptions import BinanceAPIException from binance.exceptions import BinanceAPIException
from unicorn_websocket import UnicornWebSocketManager
import config import config
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,6 +29,8 @@ class BinanceClient:
self.testnet = testnet or config.USE_TESTNET self.testnet = testnet or config.USE_TESTNET
self.client: Optional[AsyncClient] = None self.client: Optional[AsyncClient] = None
self.socket_manager: Optional[BinanceSocketManager] = None self.socket_manager: Optional[BinanceSocketManager] = None
self.unicorn_manager: Optional[UnicornWebSocketManager] = None
self.use_unicorn = config.TRADING_CONFIG.get('USE_UNICORN_WEBSOCKET', True)
async def connect(self, timeout: int = None, retries: int = None): async def connect(self, timeout: int = None, retries: int = None):
""" """
@ -65,6 +68,21 @@ class BinanceClient:
self.socket_manager = BinanceSocketManager(self.client) self.socket_manager = BinanceSocketManager(self.client)
logger.info(f"✓ 币安客户端连接成功 (测试网: {self.testnet})") logger.info(f"✓ 币安客户端连接成功 (测试网: {self.testnet})")
# 启动Unicorn WebSocket如果启用
if self.use_unicorn:
try:
self.unicorn_manager = UnicornWebSocketManager(testnet=self.testnet)
if self.unicorn_manager.start():
logger.info("✓ Unicorn WebSocket管理器启动成功")
# 启动异步数据处理
asyncio.create_task(self.unicorn_manager.process_stream_data())
else:
logger.warning("Unicorn WebSocket管理器启动失败将使用标准WebSocket")
self.unicorn_manager = None
except Exception as e:
logger.warning(f"启动Unicorn WebSocket失败: {e}将使用标准WebSocket")
self.unicorn_manager = None
# 验证API密钥权限 # 验证API密钥权限
await self._verify_api_permissions() await self._verify_api_permissions()
@ -118,6 +136,11 @@ class BinanceClient:
async def disconnect(self): async def disconnect(self):
"""断开连接""" """断开连接"""
# 停止Unicorn WebSocket
if self.unicorn_manager:
self.unicorn_manager.stop()
self.unicorn_manager = None
if self.client: if self.client:
await self.client.close_connection() await self.client.close_connection()
logger.info("币安客户端已断开连接") logger.info("币安客户端已断开连接")
@ -348,3 +371,40 @@ class BinanceClient:
except BinanceAPIException as e: except BinanceAPIException as e:
logger.error(f"设置杠杆失败: {e}") logger.error(f"设置杠杆失败: {e}")
return False return False
def subscribe_realtime_prices(self, symbols: List[str], callback) -> bool:
"""
订阅实时价格流使用Unicorn
Args:
symbols: 交易对列表
callback: 价格更新回调函数 callback(symbol, price, price_data)
Returns:
是否成功
"""
if not self.unicorn_manager:
logger.warning("Unicorn WebSocket未启用无法订阅实时价格")
return False
try:
self.unicorn_manager.subscribe_ticker(symbols, callback)
logger.info(f"订阅 {len(symbols)} 个交易对的实时价格流")
return True
except Exception as e:
logger.error(f"订阅实时价格流失败: {e}")
return False
def get_realtime_price(self, symbol: str) -> Optional[float]:
"""
获取实时价格从Unicorn WebSocket
Args:
symbol: 交易对
Returns:
实时价格如果未订阅则返回None
"""
if self.unicorn_manager:
return self.unicorn_manager.get_realtime_price(symbol)
return None

View File

@ -33,6 +33,16 @@ TRADING_CONFIG = {
# 过滤条件 # 过滤条件
'MIN_VOLUME_24H': 10000000, # 最小24小时成交量1000万USDT 'MIN_VOLUME_24H': 10000000, # 最小24小时成交量1000万USDT
'MIN_VOLATILITY': 0.02, # 最小波动率2% 'MIN_VOLATILITY': 0.02, # 最小波动率2%
# 高胜率策略参数
'MIN_SIGNAL_STRENGTH': 5, # 最小信号强度0-10越高越严格胜率越高
'LEVERAGE': 10, # 杠杆倍数
'USE_TRAILING_STOP': True, # 是否使用移动止损
'TRAILING_STOP_ACTIVATION': 0.01, # 移动止损激活阈值盈利1%后激活)
'TRAILING_STOP_PROTECT': 0.01, # 移动止损保护利润保护1%利润)
# Unicorn WebSocket配置
'USE_UNICORN_WEBSOCKET': True, # 是否使用Unicorn WebSocket高性能实时数据流
} }
# 连接配置 # 连接配置

View File

@ -1,10 +1,11 @@
""" """
市场扫描器 - 发现涨跌幅最大的前N个货币对 市场扫描器 - 发现涨跌幅最大的前N个货币对并分析技术指标
""" """
import asyncio import asyncio
import logging import logging
from typing import List, Dict, Optional from typing import List, Dict, Optional
from binance_client import BinanceClient from binance_client import BinanceClient
from indicators import TechnicalIndicators
import config import config
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -55,10 +56,14 @@ class MarketScanner:
and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H'] and r.get('volume24h', 0) >= config.TRADING_CONFIG['MIN_VOLUME_24H']
] ]
# 按涨跌幅绝对值排序取前N个 # 按信号得分和涨跌幅综合排序取前N个
# 优先考虑技术指标信号得分高的
sorted_results = sorted( sorted_results = sorted(
filtered_results, filtered_results,
key=lambda x: abs(x['changePercent']), key=lambda x: (
x.get('signalScore', 0) * 10, # 信号得分权重更高
abs(x['changePercent']) # 其次考虑涨跌幅
),
reverse=True reverse=True
) )
@ -67,26 +72,30 @@ class MarketScanner:
self.top_symbols = top_n self.top_symbols = top_n
logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对") logger.info(f"扫描完成,找到 {len(top_n)} 个符合条件的交易对")
# 打印结果 # 打印结果(包含技术指标)
for i, symbol_info in enumerate(top_n, 1): for i, symbol_info in enumerate(top_n, 1):
rsi_str = f"RSI:{symbol_info.get('rsi', 0):.1f}" if symbol_info.get('rsi') else "RSI:N/A"
regime_str = symbol_info.get('marketRegime', 'unknown')
score_str = f"信号:{symbol_info.get('signalScore', 0)}"
logger.info( logger.info(
f"{i}. {symbol_info['symbol']}: " f"{i}. {symbol_info['symbol']}: "
f"{symbol_info['changePercent']:.2f}% " f"{symbol_info['changePercent']:.2f}% | "
f"(价格: {symbol_info['price']:.4f}, " f"{rsi_str} | {regime_str} | {score_str} | "
f"成交量: {symbol_info.get('volume24h', 0):.0f})" f"价格: {symbol_info['price']:.4f}"
) )
return top_n return top_n
async def _get_symbol_change(self, symbol: str) -> Optional[Dict]: async def _get_symbol_change(self, symbol: str) -> Optional[Dict]:
""" """
获取单个交易对的涨跌幅 获取单个交易对的涨跌幅和技术指标
Args: Args:
symbol: 交易对 symbol: 交易对
Returns: Returns:
包含涨跌幅信息的字典 包含涨跌幅和技术指标信息的字典
""" """
try: try:
# 获取24小时行情数据 # 获取24小时行情数据
@ -94,35 +103,95 @@ class MarketScanner:
if not ticker: if not ticker:
return None return None
# 获取5分钟K线数据计算精确涨跌幅 # 获取更多K线数据用于技术指标计算
klines = await self.client.get_klines( klines_5m = await self.client.get_klines(
symbol=symbol, symbol=symbol,
interval=config.TRADING_CONFIG['KLINE_INTERVAL'], interval='5m',
limit=2 limit=50 # 获取更多数据用于计算指标
) )
if len(klines) < 2: if len(klines_5m) < 2:
return None return None
# 提取价格数据
close_prices = [float(k[4]) for k in klines_5m] # 收盘价
high_prices = [float(k[2]) for k in klines_5m] # 最高价
low_prices = [float(k[3]) for k in klines_5m] # 最低价
# 计算5分钟涨跌幅 # 计算5分钟涨跌幅
current_price = float(klines[-1][4]) # 最新收盘价 current_price = close_prices[-1]
prev_price = float(klines[-2][4]) # 5分钟前收盘价 prev_price = close_prices[-2] if len(close_prices) >= 2 else close_prices[0]
if prev_price == 0: if prev_price == 0:
return None return None
change_percent = ((current_price - prev_price) / prev_price) * 100 change_percent = ((current_price - prev_price) / prev_price) * 100
# 计算技术指标
rsi = TechnicalIndicators.calculate_rsi(close_prices, period=14)
macd = TechnicalIndicators.calculate_macd(close_prices)
bollinger = TechnicalIndicators.calculate_bollinger_bands(close_prices, period=20)
atr = TechnicalIndicators.calculate_atr(high_prices, low_prices, close_prices, period=14)
ema20 = TechnicalIndicators.calculate_ema(close_prices, period=20)
ema50 = TechnicalIndicators.calculate_ema(close_prices, period=50)
# 判断市场状态
market_regime = TechnicalIndicators.detect_market_regime(close_prices)
# 计算交易信号得分(用于排序)
signal_score = 0
# RSI信号均值回归
if rsi is not None:
if rsi < 30: # 超卖,做多信号
signal_score += 3
elif rsi > 70: # 超买,做空信号
signal_score += 3
elif 30 <= rsi <= 70: # 中性区域
signal_score += 1
# MACD信号
if macd and macd['histogram'] is not None:
if macd['histogram'] > 0 and macd['macd'] > macd['signal']: # 看涨
signal_score += 2
elif macd['histogram'] < 0 and macd['macd'] < macd['signal']: # 看跌
signal_score += 2
# 布林带信号(均值回归)
if bollinger:
if current_price <= bollinger['lower']: # 触及下轨,做多
signal_score += 3
elif current_price >= bollinger['upper']: # 触及上轨,做空
signal_score += 3
elif bollinger['lower'] < current_price < bollinger['upper']:
signal_score += 1
# 均线信号
if ema20 and ema50:
if current_price > ema20 > ema50: # 上升趋势
signal_score += 1
elif current_price < ema20 < ema50: # 下降趋势
signal_score += 1
return { return {
'symbol': symbol, 'symbol': symbol,
'price': current_price, 'price': current_price,
'prevPrice': prev_price, 'prevPrice': prev_price,
'changePercent': change_percent, 'changePercent': change_percent,
'volume24h': ticker.get('volume', 0), 'volume24h': ticker.get('volume', 0),
'direction': 'UP' if change_percent > 0 else 'DOWN' 'direction': 'UP' if change_percent > 0 else 'DOWN',
'rsi': rsi,
'macd': macd,
'bollinger': bollinger,
'atr': atr,
'ema20': ema20,
'ema50': ema50,
'marketRegime': market_regime,
'signalScore': signal_score,
'klines': klines_5m[-10:] # 保留最近10根K线
} }
except Exception as e: except Exception as e:
logger.debug(f"获取 {symbol} 涨跌幅失败: {e}") logger.debug(f"获取 {symbol} 数据失败: {e}")
return None return None
def get_top_symbols(self) -> List[Dict]: def get_top_symbols(self) -> List[Dict]:
@ -137,16 +206,43 @@ class MarketScanner:
async def monitor_price(self, symbol: str, callback) -> None: async def monitor_price(self, symbol: str, callback) -> None:
""" """
监控单个交易对的价格变化WebSocket 监控单个交易对的价格变化WebSocket
优先使用Unicorn如果不可用则使用标准WebSocket
Args: Args:
symbol: 交易对 symbol: 交易对
callback: 价格变化回调函数 callback: 价格变化回调函数
""" """
# 优先使用Unicorn WebSocket
if self.client.unicorn_manager:
try:
# 订阅实时价格
self.client.subscribe_realtime_prices([symbol], callback)
logger.info(f"使用Unicorn监控 {symbol} 价格")
return
except Exception as e:
logger.warning(f"Unicorn监控失败使用标准WebSocket: {e}")
# 回退到标准WebSocket
try: try:
async with self.client.socket_manager.futures_socket(symbol.lower()) as stream: if self.client.socket_manager:
async for msg in stream: async with self.client.socket_manager.futures_socket(symbol.lower()) as stream:
if 'data' in msg: async for msg in stream:
price = float(msg['data']['c']) # 最新价格 if 'data' in msg:
await callback(symbol, price) price = float(msg['data']['c']) # 最新价格
await callback(symbol, price)
except Exception as e: except Exception as e:
logger.error(f"监控 {symbol} 价格失败: {e}") logger.error(f"监控 {symbol} 价格失败: {e}")
def get_realtime_price(self, symbol: str) -> Optional[float]:
"""
获取实时价格从Unicorn WebSocket
Args:
symbol: 交易对
Returns:
实时价格
"""
if self.client:
return self.client.get_realtime_price(symbol)
return None

View File

@ -29,7 +29,10 @@ class PositionManager:
self, self,
symbol: str, symbol: str,
change_percent: float, change_percent: float,
leverage: int = 10 leverage: int = 10,
trade_direction: Optional[str] = None,
entry_reason: str = '',
atr: Optional[float] = None
) -> Optional[Dict]: ) -> Optional[Dict]:
""" """
开仓 开仓
@ -59,8 +62,11 @@ class PositionManager:
logger.warning(f"{symbol} 仓位计算失败,跳过交易") logger.warning(f"{symbol} 仓位计算失败,跳过交易")
return None return None
# 确定交易方向 # 确定交易方向(优先使用技术指标信号)
side = 'BUY' if change_percent > 0 else 'SELL' if trade_direction:
side = trade_direction
else:
side = 'BUY' if change_percent > 0 else 'SELL'
# 获取当前价格 # 获取当前价格
ticker = await self.client.get_ticker_24h(symbol) ticker = await self.client.get_ticker_24h(symbol)
@ -69,6 +75,25 @@ class PositionManager:
entry_price = ticker['price'] entry_price = ticker['price']
# 计算动态止损止盈使用ATR或固定比例
if atr and atr > 0:
# 使用ATR计算动态止损2倍ATR
atr_stop_loss_pct = (atr * 2) / entry_price
# 限制在合理范围内1%-5%
atr_stop_loss_pct = max(0.01, min(0.05, atr_stop_loss_pct))
stop_loss_price = self.risk_manager.get_stop_loss_price(
entry_price, side, stop_loss_pct=atr_stop_loss_pct
)
# 止盈为止损的1.5-2倍
take_profit_pct = atr_stop_loss_pct * 1.8
take_profit_price = self.risk_manager.get_take_profit_price(
entry_price, side, take_profit_pct=take_profit_pct
)
else:
# 使用固定止损止盈
stop_loss_price = self.risk_manager.get_stop_loss_price(entry_price, side)
take_profit_price = self.risk_manager.get_take_profit_price(entry_price, side)
# 下单 # 下单
order = await self.client.place_order( order = await self.client.place_order(
symbol=symbol, symbol=symbol,
@ -78,7 +103,7 @@ class PositionManager:
) )
if order: if order:
# 记录持仓信息 # 记录持仓信息(包含动态止损止盈)
position_info = { position_info = {
'symbol': symbol, 'symbol': symbol,
'side': side, 'side': side,
@ -86,9 +111,14 @@ class PositionManager:
'entryPrice': entry_price, 'entryPrice': entry_price,
'changePercent': change_percent, 'changePercent': change_percent,
'orderId': order.get('orderId'), 'orderId': order.get('orderId'),
'stopLoss': self.risk_manager.get_stop_loss_price(entry_price, side), 'stopLoss': stop_loss_price,
'takeProfit': self.risk_manager.get_take_profit_price(entry_price, side), 'takeProfit': take_profit_price,
'leverage': leverage 'initialStopLoss': stop_loss_price, # 初始止损(用于移动止损)
'leverage': leverage,
'entryReason': entry_reason,
'atr': atr,
'maxProfit': 0.0, # 记录最大盈利(用于移动止损)
'trailingStopActivated': False # 移动止损是否已激活
} }
self.active_positions[symbol] = position_info self.active_positions[symbol] = position_info
@ -193,16 +223,62 @@ class PositionManager:
else: else:
pnl_percent = ((entry_price - current_price) / entry_price) * 100 pnl_percent = ((entry_price - current_price) / entry_price) * 100
# 检查止损 # 更新最大盈利
if pnl_percent > position_info.get('maxProfit', 0):
position_info['maxProfit'] = pnl_percent
# 移动止损逻辑(盈利后保护利润)
use_trailing = config.TRADING_CONFIG.get('USE_TRAILING_STOP', True)
if use_trailing:
trailing_activation = config.TRADING_CONFIG.get('TRAILING_STOP_ACTIVATION', 0.01)
trailing_protect = config.TRADING_CONFIG.get('TRAILING_STOP_PROTECT', 0.01)
if not position_info.get('trailingStopActivated', False):
# 盈利超过阈值后,激活移动止损
if pnl_percent > trailing_activation * 100:
position_info['trailingStopActivated'] = True
# 将止损移至成本价(保本)
position_info['stopLoss'] = entry_price
logger.info(
f"{symbol} 移动止损激活: 止损移至成本价 {entry_price:.4f} "
f"(盈利: {pnl_percent:.2f}%)"
)
else:
# 盈利超过2%后,止损移至保护利润位
if pnl_percent > 2.0:
if position_info['side'] == 'BUY':
new_stop_loss = entry_price * (1 + trailing_protect)
if new_stop_loss > position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
else:
new_stop_loss = entry_price * (1 - trailing_protect)
if new_stop_loss < position_info['stopLoss']:
position_info['stopLoss'] = new_stop_loss
logger.info(
f"{symbol} 移动止损更新: {new_stop_loss:.4f} "
f"(保护{trailing_protect*100:.1f}%利润)"
)
# 检查止损(使用更新后的止损价)
stop_loss = position_info['stopLoss'] stop_loss = position_info['stopLoss']
if position_info['side'] == 'BUY' and current_price <= stop_loss: if position_info['side'] == 'BUY' and current_price <= stop_loss:
logger.warning(f"{symbol} 触发止损: {current_price:.4f} <= {stop_loss:.4f}") logger.warning(
f"{symbol} 触发止损: {current_price:.4f} <= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
if await self.close_position(symbol): if await self.close_position(symbol):
closed_positions.append(symbol) closed_positions.append(symbol)
continue continue
if position_info['side'] == 'SELL' and current_price >= stop_loss: if position_info['side'] == 'SELL' and current_price >= stop_loss:
logger.warning(f"{symbol} 触发止损: {current_price:.4f} >= {stop_loss:.4f}") logger.warning(
f"{symbol} 触发止损: {current_price:.4f} >= {stop_loss:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
if await self.close_position(symbol): if await self.close_position(symbol):
closed_positions.append(symbol) closed_positions.append(symbol)
continue continue
@ -210,13 +286,19 @@ class PositionManager:
# 检查止盈 # 检查止盈
take_profit = position_info['takeProfit'] take_profit = position_info['takeProfit']
if position_info['side'] == 'BUY' and current_price >= take_profit: if position_info['side'] == 'BUY' and current_price >= take_profit:
logger.info(f"{symbol} 触发止盈: {current_price:.4f} >= {take_profit:.4f}") logger.info(
f"{symbol} 触发止盈: {current_price:.4f} >= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
if await self.close_position(symbol): if await self.close_position(symbol):
closed_positions.append(symbol) closed_positions.append(symbol)
continue continue
if position_info['side'] == 'SELL' and current_price <= take_profit: if position_info['side'] == 'SELL' and current_price <= take_profit:
logger.info(f"{symbol} 触发止盈: {current_price:.4f} <= {take_profit:.4f}") logger.info(
f"{symbol} 触发止盈: {current_price:.4f} <= {take_profit:.4f} "
f"(盈亏: {pnl_percent:.2f}%)"
)
if await self.close_position(symbol): if await self.close_position(symbol):
closed_positions.append(symbol) closed_positions.append(symbol)
continue continue

View File

@ -1,3 +1,4 @@
python-binance==1.0.19 python-binance==1.0.19
websocket-client==1.6.1 websocket-client==1.6.1
aiohttp==3.9.1 aiohttp==3.9.1
unicorn-binance-websocket-api==2.4.0

View File

@ -218,36 +218,48 @@ class RiskManager:
return True return True
def get_stop_loss_price(self, entry_price: float, side: str) -> float: def get_stop_loss_price(
self,
entry_price: float,
side: str,
stop_loss_pct: Optional[float] = None
) -> float:
""" """
计算止损价格 计算止损价格
Args: Args:
entry_price: 入场价格 entry_price: 入场价格
side: 方向 'BUY' 'SELL' side: 方向 'BUY' 'SELL'
stop_loss_pct: 止损百分比如果为None则使用配置值
Returns: Returns:
止损价格 止损价格
""" """
stop_loss_percent = self.config['STOP_LOSS_PERCENT'] stop_loss_percent = stop_loss_pct or self.config['STOP_LOSS_PERCENT']
if side == 'BUY': # 做多,止损价低于入场价 if side == 'BUY': # 做多,止损价低于入场价
return entry_price * (1 - stop_loss_percent) return entry_price * (1 - stop_loss_percent)
else: # 做空,止损价高于入场价 else: # 做空,止损价高于入场价
return entry_price * (1 + stop_loss_percent) return entry_price * (1 + stop_loss_percent)
def get_take_profit_price(self, entry_price: float, side: str) -> float: def get_take_profit_price(
self,
entry_price: float,
side: str,
take_profit_pct: Optional[float] = None
) -> float:
""" """
计算止盈价格 计算止盈价格
Args: Args:
entry_price: 入场价格 entry_price: 入场价格
side: 方向 'BUY' 'SELL' side: 方向 'BUY' 'SELL'
take_profit_pct: 止盈百分比如果为None则使用配置值
Returns: Returns:
止盈价格 止盈价格
""" """
take_profit_percent = self.config['TAKE_PROFIT_PERCENT'] take_profit_percent = take_profit_pct or self.config['TAKE_PROFIT_PERCENT']
if side == 'BUY': # 做多,止盈价高于入场价 if side == 'BUY': # 做多,止盈价高于入场价
return entry_price * (1 + take_profit_percent) return entry_price * (1 + take_profit_percent)

View File

@ -55,7 +55,7 @@ class TradingStrategy:
await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL']) await asyncio.sleep(config.TRADING_CONFIG['SCAN_INTERVAL'])
continue continue
# 2. 对每个交易对执行交易逻辑 # 2. 对每个交易对执行交易逻辑(使用技术指标)
for symbol_info in top_symbols: for symbol_info in top_symbols:
if not self.running: if not self.running:
break break
@ -63,10 +63,11 @@ class TradingStrategy:
symbol = symbol_info['symbol'] symbol = symbol_info['symbol']
change_percent = symbol_info['changePercent'] change_percent = symbol_info['changePercent']
direction = symbol_info['direction'] direction = symbol_info['direction']
market_regime = symbol_info.get('marketRegime', 'unknown')
logger.info( logger.info(
f"处理交易对: {symbol} " f"处理交易对: {symbol} "
f"({direction} {change_percent:.2f}%)" f"({direction} {change_percent:.2f}%, 市场状态: {market_regime})"
) )
# 检查是否应该交易 # 检查是否应该交易
@ -78,20 +79,37 @@ class TradingStrategy:
logger.info(f"{symbol} 成交量确认失败,跳过") logger.info(f"{symbol} 成交量确认失败,跳过")
continue continue
# 优化:趋势确认(可选) # 使用技术指标判断交易信号(高胜率策略)
if not await self._check_trend_confirmation(symbol, change_percent): trade_signal = await self._analyze_trade_signal(symbol_info)
logger.info(f"{symbol} 趋势确认失败,跳过")
if not trade_signal['should_trade']:
logger.info(
f"{symbol} 技术指标分析: {trade_signal['reason']}, 跳过"
)
continue continue
# 开仓 # 确定交易方向(基于技术指标)
trade_direction = trade_signal['direction']
entry_reason = trade_signal['reason']
logger.info(
f"{symbol} 交易信号: {trade_direction} | "
f"原因: {entry_reason} | "
f"信号强度: {trade_signal.get('strength', 0)}/10"
)
# 开仓(使用改进的仓位管理)
position = await self.position_manager.open_position( position = await self.position_manager.open_position(
symbol=symbol, symbol=symbol,
change_percent=change_percent, change_percent=change_percent,
leverage=10 # 默认10倍杠杆 leverage=config.TRADING_CONFIG.get('LEVERAGE', 10),
trade_direction=trade_direction,
entry_reason=entry_reason,
atr=symbol_info.get('atr')
) )
if position: if position:
logger.info(f"{symbol} 开仓成功") logger.info(f"{symbol} 开仓成功: {trade_direction} ({entry_reason})")
else: else:
logger.warning(f"{symbol} 开仓失败") logger.warning(f"{symbol} 开仓失败")
@ -138,54 +156,130 @@ class TradingStrategy:
return True return True
async def _check_trend_confirmation( async def _analyze_trade_signal(self, symbol_info: Dict) -> Dict:
self,
symbol: str,
change_percent: float
) -> bool:
""" """
趋势确认 - 结合多时间周期确认趋势 使用技术指标分析交易信号高胜率策略
Args: Args:
symbol: 交易对 symbol_info: 交易对信息包含技术指标
change_percent: 5分钟涨跌幅
Returns: Returns:
是否通过确认 交易信号字典 {'should_trade': bool, 'direction': str, 'reason': str, 'strength': int}
""" """
try: symbol = symbol_info['symbol']
# 获取15分钟K线数据确认更大周期的趋势 current_price = symbol_info['price']
klines_15m = await self.client.get_klines( rsi = symbol_info.get('rsi')
symbol=symbol, macd = symbol_info.get('macd')
interval='15m', bollinger = symbol_info.get('bollinger')
limit=2 market_regime = symbol_info.get('marketRegime', 'unknown')
) ema20 = symbol_info.get('ema20')
ema50 = symbol_info.get('ema50')
signal_strength = 0
reasons = []
direction = None
# 策略1均值回归震荡市场高胜率
if market_regime == 'ranging':
# RSI超卖做多信号
if rsi and rsi < 30:
signal_strength += 4
reasons.append(f"RSI超卖({rsi:.1f})")
if direction is None:
direction = 'BUY'
if len(klines_15m) < 2: # RSI超买做空信号
return True # 如果无法获取,默认通过 elif rsi and rsi > 70:
signal_strength += 4
reasons.append(f"RSI超买({rsi:.1f})")
if direction is None:
direction = 'SELL'
# 计算15分钟涨跌幅 # 布林带下轨,做多信号
current_price_15m = float(klines_15m[-1][4]) if bollinger and current_price <= bollinger['lower']:
prev_price_15m = float(klines_15m[-2][4]) signal_strength += 3
reasons.append("触及布林带下轨")
if direction is None:
direction = 'BUY'
if prev_price_15m == 0: # 布林带上轨,做空信号
return True elif bollinger and current_price >= bollinger['upper']:
signal_strength += 3
reasons.append("触及布林带上轨")
if direction is None:
direction = 'SELL'
# 策略2趋势跟踪趋势市场
elif market_regime == 'trending':
# MACD金叉做多信号
if macd and macd['macd'] > macd['signal'] and macd['histogram'] > 0:
signal_strength += 3
reasons.append("MACD金叉")
if direction is None:
direction = 'BUY'
change_15m = ((current_price_15m - prev_price_15m) / prev_price_15m) * 100 # MACD死叉做空信号
elif macd and macd['macd'] < macd['signal'] and macd['histogram'] < 0:
signal_strength += 3
reasons.append("MACD死叉")
if direction is None:
direction = 'SELL'
# 5分钟和15分钟趋势一致时确认通过 # 均线系统
if (change_percent > 0 and change_15m > 0) or (change_percent < 0 and change_15m < 0): if ema20 and ema50:
return True if current_price > ema20 > ema50: # 上升趋势
signal_strength += 2
# 如果5分钟涨跌幅很大>5%即使15分钟不一致也允许 reasons.append("价格在均线之上")
if abs(change_percent) > 5: if direction is None:
return True direction = 'BUY'
elif current_price < ema20 < ema50: # 下降趋势
return False signal_strength += 2
reasons.append("价格在均线之下")
except Exception as e: if direction is None:
logger.debug(f"趋势确认失败 {symbol}: {e}") direction = 'SELL'
return True # 出错时默认通过
# 策略3综合信号提高胜率
# 多个指标同时确认时,信号更强
confirmations = 0
# RSI确认
if rsi:
if direction == 'BUY' and rsi < 50:
confirmations += 1
elif direction == 'SELL' and rsi > 50:
confirmations += 1
# MACD确认
if macd:
if direction == 'BUY' and macd['histogram'] > 0:
confirmations += 1
elif direction == 'SELL' and macd['histogram'] < 0:
confirmations += 1
# 布林带确认
if bollinger:
if direction == 'BUY' and current_price < bollinger['middle']:
confirmations += 1
elif direction == 'SELL' and current_price > bollinger['middle']:
confirmations += 1
# 多个指标确认时,增加信号强度
if confirmations >= 2:
signal_strength += 2
reasons.append(f"{confirmations}个指标确认")
# 判断是否应该交易(信号强度 >= 5 才交易,提高胜率)
should_trade = signal_strength >= config.TRADING_CONFIG.get('MIN_SIGNAL_STRENGTH', 5)
if not should_trade and direction:
reasons.append(f"信号强度不足({signal_strength}/10)")
return {
'should_trade': should_trade,
'direction': direction,
'reason': ', '.join(reasons) if reasons else '无明确信号',
'strength': signal_strength
}
def stop(self): def stop(self):
"""停止策略""" """停止策略"""