import time import sys import os import logging from datetime import datetime import pandas as pd import numpy as np from dotenv import load_dotenv # Try importing MetaTrader5 try: import MetaTrader5 as mt5 except ImportError: print("Error: MetaTrader5 module not found.") sys.exit(1) # Configure Logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('windows_bot.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger("WindowsBot") # Load Environment Variables load_dotenv() # Add current directory to sys.path to ensure local imports work current_dir = os.path.dirname(os.path.abspath(__file__)) if current_dir not in sys.path: sys.path.append(current_dir) # Import Local Modules try: from .ai_client_factory import AIClientFactory from .mt5_data_processor import MT5DataProcessor from .database_manager import DatabaseManager from .optimization import GWO, WOAm, DE, COAm, BBO, TETA from .advanced_analysis import ( AdvancedMarketAnalysis, AdvancedMarketAnalysisAdapter, MFHAnalyzer, SMCAnalyzer, MatrixMLAnalyzer, CRTAnalyzer, PriceEquationModel, TimeframeVisualAnalyzer, MTFAnalyzer ) except ImportError: # Fallback for direct script execution try: from ai_client_factory import AIClientFactory from mt5_data_processor import MT5DataProcessor from database_manager import DatabaseManager from optimization import GWO, WOAm, DE, COAm, BBO, TETA from advanced_analysis import ( AdvancedMarketAnalysis, AdvancedMarketAnalysisAdapter, MFHAnalyzer, SMCAnalyzer, MatrixMLAnalyzer, CRTAnalyzer, PriceEquationModel, TimeframeVisualAnalyzer, MTFAnalyzer ) except ImportError as e: logger.error(f"Failed to import modules: {e}") sys.exit(1) class HybridOptimizer: def __init__(self): self.weights = { "deepseek": 1.0, "qwen": 1.2, "crt": 0.8, "price_equation": 0.6, "tf_visual": 0.5, "advanced_tech": 0.7, "matrix_ml": 0.9, "smc": 1.1, "mfh": 0.8, "mtf": 0.8, "ifvg": 0.7, "rvgi_cci": 0.6 } self.history = [] def combine_signals(self, signals): weighted_sum = 0 total_weight = 0 details = {} for source, signal in signals.items(): weight = self.weights.get(source, 0.5) val = 0 if signal == 'buy': val = 1 elif signal == 'sell': val = -1 # DeepSeek/Qwen 信号包含强度,可以进一步加权? # 这里简化处理,只看方向 weighted_sum += val * weight total_weight += weight details[source] = val * weight if total_weight == 0: return "neutral", 0, self.weights final_score = weighted_sum / total_weight final_signal = "neutral" if final_score > 0.15: final_signal = "buy" # 降低阈值,更灵敏 elif final_score < -0.15: final_signal = "sell" return final_signal, final_score, self.weights class AI_MT5_Bot: def __init__(self, symbol="XAUUSD", timeframe=mt5.TIMEFRAME_M15): self.symbol = symbol self.timeframe = timeframe self.tf_name = "M15" if timeframe == mt5.TIMEFRAME_H1: self.tf_name = "H1" elif timeframe == mt5.TIMEFRAME_H4: self.tf_name = "H4" self.magic_number = 123456 self.lot_size = 0.01 self.max_drawdown_pct = 0.05 self.db_manager = DatabaseManager() self.ai_factory = AIClientFactory() self.deepseek_client = self.ai_factory.create_client("deepseek") self.qwen_client = self.ai_factory.create_client("qwen") self.crt_analyzer = CRTAnalyzer(timeframe_htf=mt5.TIMEFRAME_H1) self.mtf_analyzer = MTFAnalyzer(htf1=mt5.TIMEFRAME_M30, htf2=mt5.TIMEFRAME_H1) self.price_model = PriceEquationModel() self.tf_analyzer = TimeframeVisualAnalyzer() self.advanced_adapter = AdvancedMarketAnalysisAdapter() self.matrix_ml = MatrixMLAnalyzer() self.smc_analyzer = SMCAnalyzer() self.mfh_analyzer = MFHAnalyzer() self.optimizer = HybridOptimizer() self.last_bar_time = 0 self.last_analysis_time = 0 self.signal_history = [] self.last_optimization_time = 0 self.last_realtime_save = 0 self.latest_strategy = None self.latest_signal = "neutral" self.optimizers = { "GWO": GWO(), "WOAm": WOAm(), "DE": DE(), "COAm": COAm(), "BBO": BBO(), "TETA": TETA() } self.active_optimizer_name = "WOAm" def initialize_mt5(self): """初始化 MT5 连接""" # 尝试使用指定账户登录 account = 89633982 server = "Ava-Real 1-MT5" password = "Clj568741230#" if not mt5.initialize(login=account, server=server, password=password): logger.error(f"MT5 初始化失败, 错误码: {mt5.last_error()}") # 尝试不带账号初始化 if not mt5.initialize(): return False # 确保数据库路径设置正确 current_dir = os.path.dirname(os.path.abspath(__file__)) db_path = os.path.join(current_dir, 'trading_data.db') # 检查是否需要更新 db_manager 的路径 # DatabaseManager 默认初始化时可能使用了不同的路径,这里强制覆盖 if self.db_manager.db_path != db_path: logger.info(f"重新定向数据库路径到: {db_path}") self.db_manager = DatabaseManager(db_path=db_path) # 检查终端状态 term_info = mt5.terminal_info() if term_info is None: logger.error("无法获取终端信息") return False if not term_info.trade_allowed: logger.warning("⚠️ 警告: 终端 '自动交易' (Algo Trading) 未开启,无法执行交易!请在 MT5 工具栏点击 'Algo Trading' 按钮。") if not term_info.connected: logger.warning("⚠️ 警告: 终端未连接到交易服务器,请检查网络或账号设置。") # 确认交易品种存在 symbol_info = mt5.symbol_info(self.symbol) if symbol_info is None: logger.error(f"找不到交易品种 {self.symbol}") return False if not symbol_info.visible: logger.info(f"交易品种 {self.symbol} 不可见,尝试选中") if not mt5.symbol_select(self.symbol, True): logger.error(f"无法选中交易品种 {self.symbol}") return False # 检查品种是否允许交易 if symbol_info.trade_mode == mt5.SYMBOL_TRADE_MODE_DISABLED: logger.error(f"交易品种 {self.symbol} 禁止交易") return False logger.info(f"MT5 初始化成功,已连接到账户: {mt5.account_info().login}") return True def get_market_data(self, num_candles=100): """直接从 MT5 获取历史数据""" rates = mt5.copy_rates_from_pos(self.symbol, self.timeframe, 0, num_candles) if rates is None or len(rates) == 0: logger.error("无法获取 K 线数据") return None # 转换为 DataFrame df = pd.DataFrame(rates) df['time'] = pd.to_datetime(df['time'], unit='s') df.set_index('time', inplace=True) # 将 tick_volume 重命名为 volume 以保持一致性 if 'tick_volume' in df.columns: df.rename(columns={'tick_volume': 'volume'}, inplace=True) return df def get_position_stats(self, pos): """ 计算持仓的 MFE (最大潜在收益) 和 MAE (最大潜在亏损) """ try: # 获取持仓期间的 M1 数据 now = datetime.now() # pos.time 是时间戳,转换为 datetime open_time = datetime.fromtimestamp(pos.time) # 获取数据 rates = mt5.copy_rates_range(self.symbol, mt5.TIMEFRAME_M1, open_time, now) if rates is None or len(rates) == 0: # 如果获取不到数据,尝试只用当前价格估算 # 这种情况可能发生在刚刚开仓的一瞬间 current_price = pos.price_current if pos.type == mt5.POSITION_TYPE_BUY: mfe_price = max(0, current_price - pos.price_open) mae_price = max(0, pos.price_open - current_price) else: mfe_price = max(0, pos.price_open - current_price) mae_price = max(0, current_price - pos.price_open) if pos.price_open > 0: return (mfe_price / pos.price_open) * 100, (mae_price / pos.price_open) * 100 return 0.0, 0.0 df = pd.DataFrame(rates) # 计算期间最高价和最低价 # 注意: 还需要考虑当前价格,因为 M1 数据可能还没包含当前的 tick period_high = max(df['high'].max(), pos.price_current) period_low = min(df['low'].min(), pos.price_current) mfe = 0.0 mae = 0.0 if pos.type == mt5.POSITION_TYPE_BUY: # 买入: MFE = High - Open, MAE = Open - Low mfe_price = max(0, period_high - pos.price_open) mae_price = max(0, pos.price_open - period_low) else: # 卖出: MFE = Open - Low, MAE = High - Open mfe_price = max(0, pos.price_open - period_low) mae_price = max(0, period_high - pos.price_open) # 转换为百分比 if pos.price_open > 0: mfe = (mfe_price / pos.price_open) * 100 mae = (mae_price / pos.price_open) * 100 return mfe, mae except Exception as e: logger.error(f"计算持仓统计时出错: {e}") return 0.0, 0.0 def close_position(self, position, comment="AI-Bot Close"): """辅助函数: 平仓""" request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": position.symbol, "volume": position.volume, "type": mt5.ORDER_TYPE_SELL if position.type == mt5.POSITION_TYPE_BUY else mt5.ORDER_TYPE_BUY, "position": position.ticket, "price": mt5.symbol_info_tick(self.symbol).bid if position.type == mt5.POSITION_TYPE_BUY else mt5.symbol_info_tick(self.symbol).ask, "deviation": 20, "magic": self.magic_number, "comment": comment, "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_FOK, } result = mt5.order_send(request) if result.retcode != mt5.TRADE_RETCODE_DONE: logger.error(f"平仓失败 #{position.ticket}: {result.comment}") return False else: logger.info(f"平仓成功 #{position.ticket}") profit = getattr(result, 'profit', 0.0) self.send_telegram_message(f"🔄 *Position Closed*\nTicket: `{position.ticket}`\nReason: {comment}\nProfit: {profit}") return True def execute_trade(self, signal, strength, sl_tp_params, entry_params=None): """ 执行交易指令,完全由大模型驱动 """ # 允许所有相关指令进入 valid_actions = ['buy', 'sell', 'limit_buy', 'limit_sell', 'close', 'add_buy', 'add_sell', 'hold'] # 注意: signal 参数这里传入的是 final_signal,已经被归一化为 buy/sell/close/hold # 但我们更关心 entry_params 中的具体 action # --- 1. 获取市场状态 --- positions = mt5.positions_get(symbol=self.symbol) tick = mt5.symbol_info_tick(self.symbol) if not tick: logger.error("无法获取 Tick 数据") return # 解析 LLM 指令 # 这里的 entry_params 是从 strategy 字典中提取的 'entry_conditions' # 但 strategy 字典本身也有 'action' # 为了更准确,我们应该直接使用 self.latest_strategy (在 run 循环中更新) # 兼容性处理 llm_action = "hold" if self.latest_strategy: llm_action = self.latest_strategy.get('action', 'hold').lower() elif entry_params and 'action' in entry_params: llm_action = entry_params.get('action', 'hold').lower() else: llm_action = signal if signal in valid_actions else 'hold' # 显式 MFE/MAE 止损止盈 # LLM 应该返回具体的 sl_price 和 tp_price,或者 MFE/MAE 的百分比建议 # 如果 LLM 提供了具体的 SL/TP 价格,优先使用 explicit_sl = None explicit_tp = None if self.latest_strategy: explicit_sl = self.latest_strategy.get('sl') explicit_tp = self.latest_strategy.get('tp') # 如果没有具体价格,回退到 sl_tp_params (通常也是 LLM 生成的) if explicit_sl is None and sl_tp_params: explicit_sl = sl_tp_params.get('sl_price') if explicit_tp is None and sl_tp_params: explicit_tp = sl_tp_params.get('tp_price') logger.info(f"执行逻辑: Action={llm_action}, Signal={signal}, Explicit SL={explicit_sl}, TP={explicit_tp}") # --- 2. 持仓管理 (已开仓状态) --- if positions and len(positions) > 0: for pos in positions: pos_type = pos.type # 0: Buy, 1: Sell is_buy_pos = (pos_type == mt5.POSITION_TYPE_BUY) # A. 平仓/减仓逻辑 (Close) should_close = False close_reason = "" if llm_action in ['close', 'close_buy', 'close_sell']: # 检查方向匹配 if llm_action == 'close': should_close = True elif llm_action == 'close_buy' and is_buy_pos: should_close = True elif llm_action == 'close_sell' and not is_buy_pos: should_close = True if should_close: close_reason = "LLM Close Instruction" # 反向信号平仓 (Reversal) elif (llm_action in ['buy', 'add_buy'] and not is_buy_pos): should_close = True close_reason = "Reversal (Sell -> Buy)" elif (llm_action in ['sell', 'add_sell'] and is_buy_pos): should_close = True close_reason = "Reversal (Buy -> Sell)" if should_close: logger.info(f"执行平仓 #{pos.ticket}: {close_reason}") self.close_position(pos, comment=f"AI: {close_reason}") continue # B. 加仓逻辑 (Add Position) should_add = False if llm_action == 'add_buy' and is_buy_pos: should_add = True elif llm_action == 'add_sell' and not is_buy_pos: should_add = True # 如果是单纯的 buy/sell 信号,且已有同向仓位,通常视为 hold,除非明确 add # 但如果用户希望 "完全交给大模型",那么如果大模型在有仓位时发出了 buy,可能意味着加仓 # 为了安全,我们严格限制只有 'add_xxx' 才加仓,或者 signal 极强 if should_add: logger.info(f"执行加仓 #{pos.ticket} 方向") # 加仓逻辑复用开仓逻辑,但可能调整手数 self._send_order( "buy" if is_buy_pos else "sell", tick.ask if is_buy_pos else tick.bid, explicit_sl, explicit_tp, comment="AI: Add Position" ) # C. 持仓 (Hold) - 默认行为 # 更新 SL/TP (如果 LLM 给出了新的优化值) # 只有当新给出的 SL/TP 与当前差别较大时才修改 if explicit_sl is not None and explicit_tp is not None: # 简单的阈值检查,避免频繁修改 point = mt5.symbol_info(self.symbol).point if abs(pos.sl - explicit_sl) > 10 * point or abs(pos.tp - explicit_tp) > 10 * point: logger.info(f"更新持仓 SL/TP #{pos.ticket}: SL {pos.sl}->{explicit_sl}, TP {pos.tp}->{explicit_tp}") request = { "action": mt5.TRADE_ACTION_SLTP, "position": pos.ticket, "sl": explicit_sl, "tp": explicit_tp } mt5.order_send(request) # --- 3. 开仓/挂单逻辑 (未开仓 或 加仓) --- # 注意: 上面的循环处理了已有仓位的 Close 和 Add。 # 如果当前没有仓位,或者上面的逻辑没有触发 Close (即是 Hold), # 或者是 Reversal (Close 之后),我们需要看是否需要开新仓。 # 重新检查持仓数 (因为刚才可能平仓了) # 仅检查由本机器人 (Magic Number) 管理的持仓 all_positions = mt5.positions_get(symbol=self.symbol) bot_positions = [p for p in all_positions if p.magic == self.magic_number] if all_positions else [] has_position = len(bot_positions) > 0 # 如果有持仓且不是加仓指令,则不再开新仓 if has_position and 'add' not in llm_action: logger.info(f"已有持仓 ({len(bot_positions)}), 且非加仓指令 ({llm_action}), 跳过开仓") return # 执行开仓/挂单 trade_type = None price = 0.0 if llm_action == 'buy': trade_type = "buy" price = tick.ask elif llm_action == 'sell': trade_type = "sell" price = tick.bid elif llm_action in ['limit_buy', 'buy_limit']: # 优先使用 limit_price (与 prompt 一致),回退使用 entry_price price = entry_params.get('limit_price', entry_params.get('entry_price', 0.0)) if entry_params else 0.0 # 增强:如果价格无效,尝试自动修复 if price <= 0: logger.warning(f"LLM 建议 Limit Buy 但未提供价格,尝试使用 ATR 自动计算") # 获取 ATR rates = mt5.copy_rates_from_pos(self.symbol, self.timeframe, 0, 20) if rates is not None and len(rates) > 14: df_temp = pd.DataFrame(rates) high_low = df_temp['high'] - df_temp['low'] atr = high_low.rolling(14).mean().iloc[-1] if atr > 0: price = tick.ask - (atr * 0.5) # 默认在当前价格下方 0.5 ATR 处挂单 logger.info(f"自动设定 Limit Buy 价格: {price:.2f} (Ask: {tick.ask}, ATR: {atr:.4f})") # 智能判断 Limit vs Stop if price > 0: if price > tick.ask: trade_type = "stop_buy" # 价格高于当前价 -> 突破买入 else: trade_type = "limit_buy" # 价格低于当前价 -> 回调买入 elif llm_action in ['limit_sell', 'sell_limit']: price = entry_params.get('limit_price', entry_params.get('entry_price', 0.0)) if entry_params else 0.0 # 增强:如果价格无效,尝试自动修复 if price <= 0: logger.warning(f"LLM 建议 Limit Sell 但未提供价格,尝试使用 ATR 自动计算") # 获取 ATR rates = mt5.copy_rates_from_pos(self.symbol, self.timeframe, 0, 20) if rates is not None and len(rates) > 14: df_temp = pd.DataFrame(rates) high_low = df_temp['high'] - df_temp['low'] atr = high_low.rolling(14).mean().iloc[-1] if atr > 0: price = tick.bid + (atr * 0.5) # 默认在当前价格上方 0.5 ATR 处挂单 logger.info(f"自动设定 Limit Sell 价格: {price:.2f} (Bid: {tick.bid}, ATR: {atr:.4f})") # 智能判断 Limit vs Stop if price > 0: if price < tick.bid: trade_type = "stop_sell" # 价格低于当前价 -> 突破卖出 else: trade_type = "limit_sell" # 价格高于当前价 -> 反弹卖出 if trade_type and price > 0: # 再次确认 SL/TP 是否存在 if explicit_sl is None or explicit_tp is None: # 策略优化: 如果 LLM 未提供明确价格,则使用基于 MFE/MAE 的统计优化值 # 移除旧的 ATR 动态计算,确保策略的一致性和基于绩效的优化 logger.info("LLM 未提供明确 SL/TP,使用 MFE/MAE 统计优化值") # 计算 ATR rates = mt5.copy_rates_from_pos(self.symbol, self.timeframe, 0, 20) atr = 0.0 if rates is not None and len(rates) > 14: df_temp = pd.DataFrame(rates) high_low = df_temp['high'] - df_temp['low'] atr = high_low.rolling(14).mean().iloc[-1] explicit_sl, explicit_tp = self.calculate_optimized_sl_tp(trade_type, price, atr) if explicit_sl == 0 or explicit_tp == 0: logger.error("无法计算优化 SL/TP,放弃交易") return comment = f"AI: {llm_action.upper()}" self._send_order(trade_type, price, explicit_sl, explicit_tp, comment=comment) else: if llm_action not in ['hold', 'neutral']: logger.warning(f"无法执行交易: Action={llm_action}, TradeType={trade_type}, Price={price}") def _get_filling_mode(self): """ Get the correct order filling mode for the symbol. Checks broker support for FOK/IOC. """ symbol_info = mt5.symbol_info(self.symbol) if symbol_info is None: return mt5.ORDER_FILLING_FOK # Default # filling_mode is a flag property # 1: FOK, 2: IOC modes = symbol_info.filling_mode if modes & mt5.SYMBOL_FILLING_FOK: return mt5.ORDER_FILLING_FOK elif modes & mt5.SYMBOL_FILLING_IOC: return mt5.ORDER_FILLING_IOC else: return mt5.ORDER_FILLING_RETURN def _send_order(self, type_str, price, sl, tp, comment=""): """底层下单函数""" order_type = mt5.ORDER_TYPE_BUY action = mt5.TRADE_ACTION_DEAL if type_str == "buy": order_type = mt5.ORDER_TYPE_BUY action = mt5.TRADE_ACTION_DEAL elif type_str == "sell": order_type = mt5.ORDER_TYPE_SELL action = mt5.TRADE_ACTION_DEAL elif type_str == "limit_buy": order_type = mt5.ORDER_TYPE_BUY_LIMIT action = mt5.TRADE_ACTION_PENDING elif type_str == "limit_sell": order_type = mt5.ORDER_TYPE_SELL_LIMIT action = mt5.TRADE_ACTION_PENDING elif type_str == "stop_buy": order_type = mt5.ORDER_TYPE_BUY_STOP action = mt5.TRADE_ACTION_PENDING elif type_str == "stop_sell": order_type = mt5.ORDER_TYPE_SELL_STOP action = mt5.TRADE_ACTION_PENDING request = { "action": action, "symbol": self.symbol, "volume": self.lot_size, "type": order_type, "price": price, "sl": sl, "tp": tp, "deviation": 20, "magic": self.magic_number, "comment": comment, "type_time": mt5.ORDER_TIME_GTC, "type_filling": self._get_filling_mode(), } # 挂单需要不同的 filling type? 通常 Pending 订单不用 FOK,用 RETURN 或默认 if "limit" in type_str or "stop" in type_str: if 'type_filling' in request: del request['type_filling'] request['type_filling'] = mt5.ORDER_FILLING_RETURN logger.info(f"发送订单请求: Action={action}, Type={order_type}, Price={price:.2f}, SL={sl:.2f}, TP={tp:.2f}") result = mt5.order_send(request) if result is None: logger.error("order_send 返回 None") return if result.retcode != mt5.TRADE_RETCODE_DONE: logger.error(f"下单失败 ({type_str}): {result.comment}, retcode={result.retcode}") else: logger.info(f"下单成功 ({type_str}) #{result.order}") self.send_telegram_message(f"✅ *Order Executed*\nType: `{type_str.upper()}`\nPrice: `{price}`\nSL: `{sl}`\nTP: `{tp}`") def escape_markdown(self, text): """Helper to escape Markdown special characters for Telegram""" if not isinstance(text, str): text = str(text) # Escaping for Markdown (V1) escape_chars = '_*[`' for char in escape_chars: text = text.replace(char, f'\\{char}') return text def send_telegram_message(self, message): """发送消息到 Telegram""" token = "8253887074:AAE_o7hfEb6iJCZ2MdVIezOC_E0OnTCvCzY" chat_id = "5254086791" url = f"https://api.telegram.org/bot{token}/sendMessage" data = { "chat_id": chat_id, "text": message, "parse_mode": "Markdown" } # 配置代理 (针对中国大陆用户) # 如果您使用 Clash,通常端口是 7890 # 如果您使用 v2rayN,通常端口是 10809 proxies = { "http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890" } try: import requests try: # 尝试通过代理发送 response = requests.post(url, json=data, timeout=10, proxies=proxies) except (requests.exceptions.ProxyError, requests.exceptions.ConnectionError): # 如果代理失败,尝试直连 (虽然可能也会被墙) logger.warning("代理连接失败,尝试直连 Telegram...") response = requests.post(url, json=data, timeout=10) if response.status_code != 200: logger.error(f"Telegram 发送失败: {response.text}") except Exception as e: logger.error(f"Telegram 发送异常: {e}") def manage_positions(self, signal=None, strategy_params=None): """ 根据最新分析结果管理持仓: 1. 更新止损止盈 (覆盖旧设置) - 基于 strategy_params 2. 执行移动止损 (Trailing Stop) 3. 检查是否需要平仓 (非反转情况,例如信号转弱) """ positions = mt5.positions_get(symbol=self.symbol) if positions is None or len(positions) == 0: return # 获取 ATR 用于计算移动止损距离 (动态调整) rates = mt5.copy_rates_from_pos(self.symbol, self.timeframe, 0, 20) atr = 0.0 if rates is not None and len(rates) > 14: df_temp = pd.DataFrame(rates) high_low = df_temp['high'] - df_temp['low'] atr = high_low.rolling(14).mean().iloc[-1] if atr <= 0: return # 无法计算 ATR,跳过 trailing_dist = atr * 1.5 # 默认移动止损距离 # 如果有策略参数,尝试解析最新的 SL/TP 设置 new_sl_multiplier = 1.5 new_tp_multiplier = 2.5 has_new_params = False if strategy_params: exit_cond = strategy_params.get('exit_conditions') if exit_cond: new_sl_multiplier = exit_cond.get('sl_atr_multiplier', 1.5) new_tp_multiplier = exit_cond.get('tp_atr_multiplier', 2.5) has_new_params = True symbol_info = mt5.symbol_info(self.symbol) if not symbol_info: return point = symbol_info.point # 遍历所有持仓,独立管理 for pos in positions: if pos.magic != self.magic_number: continue symbol = pos.symbol type_pos = pos.type # 0: Buy, 1: Sell price_open = pos.price_open sl = pos.sl tp = pos.tp current_price = pos.price_current # 针对每个订单独立计算最优 SL/TP # 如果是挂单成交后的新持仓,或者老持仓,都统一处理 request = { "action": mt5.TRADE_ACTION_SLTP, "symbol": symbol, "position": pos.ticket, "sl": sl, "tp": tp } changed = False # --- 1. 基于最新策略更新 SL/TP (全量覆盖更新) --- # 用户指令: "止盈和止损也需要根据大模型的最后整合分析结果来进行移动...而不是只有当新计算的 Trailing SL ... 还要高时,才再次更新" # 解读: 允许 SL/TP 动态调整,既可以收紧也可以放宽 (Breathing Stop),以适应 LLM 对市场波动率和结构的最新判断。 if has_new_params: current_sl_dist = atr * new_sl_multiplier current_tp_dist = atr * new_tp_multiplier # 计算建议的 SL/TP 价格 (基于当前价格) suggested_sl = 0.0 suggested_tp = 0.0 if type_pos == mt5.POSITION_TYPE_BUY: suggested_sl = current_price - current_sl_dist suggested_tp = current_price + current_tp_dist # 更新 SL: 始终更新 (移除 > sl 的限制) # 注意: 这意味着如果 ATR 变大或 Multiplier 变大,SL 可能会下移 (放宽) if abs(suggested_sl - sl) > point * 5: # 避免微小抖动 request['sl'] = suggested_sl changed = True # 更新 TP if abs(suggested_tp - tp) > point * 10: request['tp'] = suggested_tp changed = True elif type_pos == mt5.POSITION_TYPE_SELL: suggested_sl = current_price + current_sl_dist suggested_tp = current_price - current_tp_dist # 更新 SL: 始终更新 (移除 < sl 的限制) if abs(suggested_sl - sl) > point * 5: request['sl'] = suggested_sl changed = True # 更新 TP if abs(suggested_tp - tp) > point * 10: request['tp'] = suggested_tp changed = True # --- 2. 兜底移动止损 (Trailing Stop) --- # 如果上面没有因为 LLM 参数变化而更新,我们依然执行常规的 Trailing 逻辑 (仅收紧) # 只有当 'changed' 为 False 时才检查,避免冲突 if not changed: if type_pos == mt5.POSITION_TYPE_BUY: target_sl = current_price - (atr * new_sl_multiplier) # 常规 Trailing: 仅收紧 current_req_sl = request['sl'] if request['sl'] > 0 else sl if target_sl > current_req_sl: if (current_price - target_sl) >= point * 10: request['sl'] = target_sl changed = True elif type_pos == mt5.POSITION_TYPE_SELL: target_sl = current_price + (atr * new_sl_multiplier) # 常规 Trailing: 仅收紧 current_req_sl = request['sl'] if current_req_sl == 0 or target_sl < current_req_sl: if (target_sl - current_price) >= point * 10: request['sl'] = target_sl changed = True # 2. 移动止盈 (Trailing Take Profit) dist_to_tp = current_price - tp if dist_to_tp > 0 and dist_to_tp < (atr * 0.5): if signal == 'sell': new_tp = current_price - (atr * max(new_tp_multiplier, 1.0)) if new_tp < tp: request['tp'] = new_tp changed = True logger.info(f"🚀 移动止盈触发 (Sell): TP 延伸至 {new_tp:.2f}") if changed: logger.info(f"更新持仓 #{pos.ticket}: SL={request['sl']:.2f}, TP={request['tp']:.2f} (ATR x {new_sl_multiplier})") result = mt5.order_send(request) if result.retcode != mt5.TRADE_RETCODE_DONE: logger.error(f"持仓修改失败: {result.comment}") # --- 3. 检查信号平仓 --- # 如果最新信号转为反向或中立,且强度足够,可以考虑提前平仓 # 但 execute_trade 已经处理了反向开仓(会先平仓)。 # 这里只处理: 信号变 Weak/Neutral 时的防御性平仓 (如果需要) # 用户: "operate SL/TP, or close, open" if signal == 'neutral' and strategy_params: # 检查是否应该平仓 # 简单逻辑: 如果盈利 > 0 且信号消失,落袋为安? # 或者依靠 Trailing Stop 自然离场。 pass def analyze_closed_trades(self): """ 分析已平仓的交易,计算 MFE (最大有利波动) and MAE (最大不利波动) 用于后续 AI 学习和策略优化 """ try: # 1. 获取数据库中尚未标记为 CLOSED 的交易 open_trades = self.db_manager.get_open_trades() if not open_trades: return for trade in open_trades: ticket = trade['ticket'] # 这是 Order Ticket symbol = trade['symbol'] # 2. 检查该订单是否已完全平仓 # 我们通过 Order Ticket 查找对应的 History Orders 或 Deals # 注意: 在 MT5 中,一个 Position 可能由多个 Deal 组成 (In, Out) # 我们需要找到该 Order 开启的 Position ID # 尝试通过 Order Ticket 获取 Position ID # history_orders_get 可以通过 ticket 获取指定历史订单 # 但我们需要的是 Deals 来确定是否平仓 # 方法 A: 获取该 Order 的 Deal,得到 Position ID,然后查询 Position 的所有 Deals # 假设 Order Ticket 也是 Position ID (通常情况) position_id = ticket # 获取该 Position ID 的所有历史交易 # from_date 设为很久以前,确保能找到 deals = mt5.history_deals_get(position=position_id) if deals is None or len(deals) == 0: # 可能还没平仓,或者 Ticket 不是 Position ID # 如果是 Netting 账户,PositionID 通常等于开仓 Deal 的 Ticket continue # 检查是否有 ENTRY_OUT (平仓) 类型的 Deal has_out = False close_time = 0 close_price = 0.0 profit = 0.0 open_price = trade['price'] # 使用 DB 中的开仓价 open_time_ts = 0 # 重新计算利润和确认平仓 total_profit = 0.0 for deal in deals: total_profit += deal.profit + deal.swap + deal.commission if deal.entry == mt5.DEAL_ENTRY_IN: open_time_ts = deal.time # 如果 DB 中没有准确的开仓价,可以用这个: open_price = deal.price if deal.entry == mt5.DEAL_ENTRY_OUT: has_out = True close_time = deal.time close_price = deal.price # 如果有 OUT deal,说明已平仓 (或部分平仓,这里简化为只要有 OUT 就视为结束分析) # 并且要确保此时持仓量为 0 (完全平仓) # 通过 positions_get(ticket=position_id) 检查是否还存在不要简化 active_pos = mt5.positions_get(ticket=position_id) is_fully_closed = True if active_pos is not None and len(active_pos) > 0: # Position still exists is_fully_closed = False if has_out and is_fully_closed: # 这是一个已平仓的完整交易 # 获取该时段的 M1 数据来计算 MFE/MAE # 确保时间范围有效 if open_time_ts == 0: open_time_ts = int(pd.to_datetime(trade['time']).timestamp()) start_dt = datetime.fromtimestamp(open_time_ts) end_dt = datetime.fromtimestamp(close_time) if start_dt >= end_dt: continue rates = mt5.copy_rates_range(symbol, mt5.TIMEFRAME_M1, start_dt, end_dt) if rates is not None and len(rates) > 0: df_rates = pd.DataFrame(rates) max_high = df_rates['high'].max() min_low = df_rates['low'].min() mfe = 0.0 mae = 0.0 action = trade['action'] if action == 'BUY': mfe = (max_high - open_price) / open_price * 100 # % mae = abs((min_low - open_price) / open_price * 100) # % (Absolute) elif action == 'SELL': mfe = (open_price - min_low) / open_price * 100 # % mae = abs((open_price - max_high) / open_price * 100) # % (Absolute) # 更新数据库 self.db_manager.update_trade_performance(ticket, { "close_price": close_price, "close_time": end_dt, "profit": total_profit, "mfe": mfe, "mae": mae }) logger.info(f"分析交易 #{ticket} 完成: MFE={mfe:.2f}%, MAE={mae:.2f}%, Profit={total_profit:.2f}") except Exception as e: logger.error(f"分析历史交易失败: {e}") def evaluate_comprehensive_params(self, params, df): """ Comprehensive Objective Function: Evaluates ALL dataframe-based strategy parameters together. params: Vector of parameter values corresponding to the defined structure. """ # Global counter for progress logging if not hasattr(self, '_opt_counter'): self._opt_counter = 0 self._opt_counter += 1 if self._opt_counter % 50 == 0: logger.info(f"Optimization Progress: {self._opt_counter} evaluations...") # 1. Decode Parameters # 0: smc_ma (int) # 1: smc_atr (float) # 2: mfh_lr (float) # 3: mfh_horizon (int) # 4: pem_fast (int) # 5: pem_slow (int) # 6: pem_adx (float) # 7: rvgi_sma (int) # 8: rvgi_cci (int) # 9: ifvg_gap (int) try: p_smc_ma = int(params[0]) p_smc_atr = params[1] p_mfh_lr = params[2] p_mfh_horizon = int(params[3]) p_pem_fast = int(params[4]) p_pem_slow = int(params[5]) p_pem_adx = params[6] p_rvgi_sma = int(params[7]) p_rvgi_cci = int(params[8]) p_ifvg_gap = int(params[9]) # 2. Initialize Temporary Analyzers (Fresh State) tmp_smc = SMCAnalyzer() tmp_smc.ma_period = p_smc_ma tmp_smc.atr_threshold = p_smc_atr tmp_mfh = MFHAnalyzer(learning_rate=p_mfh_lr) tmp_mfh.horizon = p_mfh_horizon tmp_pem = PriceEquationModel() tmp_pem.ma_fast_period = p_pem_fast tmp_pem.ma_slow_period = p_pem_slow tmp_pem.adx_threshold = p_pem_adx tmp_adapter = AdvancedMarketAnalysisAdapter() # 3. Run Simulation start_idx = max(p_smc_ma, p_pem_slow, 50) + 10 if len(df) < start_idx + 50: return -9999 balance = 10000.0 closes = df['close'].values trades_count = 0 wins = 0 # Optimization: Step size > 1 to speed up (e.g., check every 4th candle ~ 1 hour) # But MFH needs continuous training. # We will run full loop but simplified logic. for i in range(start_idx, len(df)-1): sub_df = df.iloc[:i+1] curr_price = closes[i] next_price = closes[i+1] # MFH Train (Must happen every step for consistency) if i > p_mfh_horizon: past_ret = (closes[i] - closes[i-p_mfh_horizon]) / closes[i-p_mfh_horizon] tmp_mfh.train(past_ret) # Skip some heavy analysis for speed, only check every 2 bars? # No, we need accuracy. # Signals smc_sig = tmp_smc.analyze(sub_df)['signal'] mfh_sig = tmp_mfh.predict(sub_df)['signal'] tmp_pem.update(curr_price) pem_sig = tmp_pem.predict(sub_df)['signal'] # Short Term ifvg_sig = tmp_adapter.analyze_ifvg(sub_df, min_gap_points=p_ifvg_gap)['signal'] rvgi_sig = tmp_adapter.analyze_rvgi_cci_strategy(sub_df, sma_period=p_rvgi_sma, cci_period=p_rvgi_cci)['signal'] # Combine votes = 0 for s in [smc_sig, mfh_sig, pem_sig, ifvg_sig, rvgi_sig]: if s == 'buy': votes += 1 elif s == 'sell': votes -= 1 final_sig = "neutral" if votes >= 2: final_sig = "buy" elif votes <= -2: final_sig = "sell" # Evaluate if final_sig == "buy": trades_count += 1 if next_price > curr_price: wins += 1 balance += (next_price - curr_price) elif final_sig == "sell": trades_count += 1 if next_price < curr_price: wins += 1 balance += (curr_price - next_price) if trades_count == 0: return -100 # Simple Profit Metric score = (balance - 10000.0) return score except Exception: return -9999 def optimize_strategy_parameters(self): """ Comprehensive Optimization: Tunes ALL strategy parameters using Auto-AO. """ logger.info("开始执行全策略参数优化 (Comprehensive Auto-AO)...") # Reset progress counter self._opt_counter = 0 # 1. 获取历史数据 # Reduce data size for speed (300 candles ~ 75 hours of M15 data) df = self.get_market_data(300) if df is None or len(df) < 200: logger.warning("数据不足,跳过优化") return # 2. Define Search Space (10 Dimensions) # smc_ma, smc_atr, mfh_lr, mfh_horizon, pem_fast, pem_slow, pem_adx, rvgi_sma, rvgi_cci, ifvg_gap bounds = [ (100, 300), # smc_ma (0.001, 0.005), # smc_atr (0.001, 0.1), # mfh_lr (3, 10), # mfh_horizon (10, 50), # pem_fast (100, 300), # pem_slow (15.0, 30.0), # pem_adx (10, 50), # rvgi_sma (10, 30), # rvgi_cci (10, 100) # ifvg_gap ] steps = [10, 0.0005, 0.005, 1, 5, 10, 1.0, 2, 2, 5] # 3. Objective def objective(params): return self.evaluate_comprehensive_params(params, df) # 4. Optimizer import random algo_name = random.choice(list(self.optimizers.keys())) optimizer = self.optimizers[algo_name] # Adjust population size for realtime performance # Default is 50, which is too slow for 10-dim complex sim if hasattr(optimizer, 'pop_size'): optimizer.pop_size = 20 logger.info(f"本次选择的优化算法: {algo_name} (Pop: {optimizer.pop_size})") # 5. Run # Increase epochs slightly as space is larger, but keep low for realtime best_params, best_score = optimizer.optimize( objective, bounds, steps=steps, epochs=4 # Reduced from 8 to 4 for speed ) # 6. Apply Results if best_score > -1000: logger.info(f"全策略优化完成! Best Score: {best_score:.2f}") # Extract p_smc_ma = int(best_params[0]) p_smc_atr = best_params[1] p_mfh_lr = best_params[2] p_mfh_horizon = int(best_params[3]) p_pem_fast = int(best_params[4]) p_pem_slow = int(best_params[5]) p_pem_adx = best_params[6] p_rvgi_sma = int(best_params[7]) p_rvgi_cci = int(best_params[8]) p_ifvg_gap = int(best_params[9]) # Apply self.smc_analyzer.ma_period = p_smc_ma self.smc_analyzer.atr_threshold = p_smc_atr self.mfh_analyzer.learning_rate = p_mfh_lr self.mfh_analyzer.horizon = p_mfh_horizon # Re-init MFH buffers if horizon changed? # MFHAnalyzer uses horizon in calculate_features. # Ideally we should re-init but learning rate update is fine. self.price_model.ma_fast_period = p_pem_fast self.price_model.ma_slow_period = p_pem_slow self.price_model.adx_threshold = p_pem_adx self.short_term_params = { 'rvgi_sma': p_rvgi_sma, 'rvgi_cci': p_rvgi_cci, 'ifvg_gap': p_ifvg_gap } msg = ( f"🧬 *Comprehensive Optimization ({algo_name})*\n" f"Score: {best_score:.2f}\n" f"• SMC: MA={p_smc_ma}, ATR={p_smc_atr:.4f}\n" f"• MFH: LR={p_mfh_lr:.3f}, H={p_mfh_horizon}\n" f"• PEM: Fast={p_pem_fast}, Slow={p_pem_slow}, ADX={p_pem_adx:.1f}\n" f"• ST: RVGI({p_rvgi_sma},{p_rvgi_cci}), IFVG({p_ifvg_gap})" ) self.send_telegram_message(msg) logger.info(f"已更新所有策略参数: {msg}") else: logger.warning("优化失败,保持原有参数") def optimize_weights(self): """ 使用激活的优化算法 (GWO, WOAm, etc.) 实时优化 HybridOptimizer 的权重 解决优化算法一直为负数的问题:确保有实际运行并使用正向的适应度函数 (准确率) """ if len(self.signal_history) < 20: # 需要一定的历史数据 return logger.info(f"正在运行权重优化 ({self.active_optimizer_name})... 样本数: {len(self.signal_history)}") # 1. 准备数据 # 提取历史信号和实际结果 # history items: (timestamp, signals_dict, close_price) # 我们需要计算每个样本的实际涨跌: price[i+1] - price[i] samples = [] for i in range(len(self.signal_history) - 1): curr = self.signal_history[i] next_bar = self.signal_history[i+1] signals = curr[1] price_change = next_bar[2] - curr[2] actual_dir = 0 if price_change > 0: actual_dir = 1 elif price_change < 0: actual_dir = -1 if actual_dir != 0: samples.append((signals, actual_dir)) if len(samples) < 10: return # 2. 定义目标函数 (适应度函数) # 输入: 权重向量 [w1, w2, ...] # 输出: 准确率 (0.0 - 1.0) -> 保证非负 strategy_keys = list(self.optimizer.weights.keys()) def objective(weights_vec): correct = 0 total = 0 # 构建临时权重字典 temp_weights = {k: w for k, w in zip(strategy_keys, weights_vec)} for signals, actual_dir in samples: # 模拟 combine_signals weighted_sum = 0 total_w = 0 for strat, sig in signals.items(): w = temp_weights.get(strat, 1.0) if sig == 'buy': weighted_sum += w total_w += w elif sig == 'sell': weighted_sum -= w total_w += w if total_w > 0: norm_score = weighted_sum / total_w pred_dir = 0 if norm_score > 0.3: pred_dir = 1 elif norm_score < -0.3: pred_dir = -1 if pred_dir == actual_dir: correct += 1 total += 1 if total == 0: return 0.0 return correct / total # 返回准确率 # 3. 运行优化 optimizer = self.optimizers[self.active_optimizer_name] # 定义边界: 权重范围 [0.0, 2.0] bounds = [(0.0, 2.0) for _ in range(len(strategy_keys))] try: best_weights_vec, best_score = optimizer.optimize( objective_function=objective, bounds=bounds, epochs=20 # 实时运行不宜过久 ) # 4. 应用最佳权重 if best_score > 0: # 确保结果有效 for i, k in enumerate(strategy_keys): self.optimizer.weights[k] = best_weights_vec[i] logger.info(f"权重优化完成! 最佳准确率: {best_score:.2%}") logger.info(f"新权重: {self.optimizer.weights}") self.last_optimization_time = time.time() else: logger.warning("优化结果得分过低,未更新权重") except Exception as e: logger.error(f"权重优化失败: {e}") def calculate_optimized_sl_tp(self, trade_type, price, atr, market_context=None): """ 计算基于综合因素的优化止损止盈点 结合: 14天 ATR, MFE/MAE 统计, 市场分析(Supply/Demand/FVG) """ # 1. 基础波动率 (14天 ATR) # 确保传入的 ATR 是有效的 14周期 ATR if atr <= 0: atr = price * 0.005 # Fallback # 2. 历史绩效 (MFE/MAE) mfe_tp_dist = atr * 2.0 # 默认 mae_sl_dist = atr * 1.5 # 默认 try: stats = self.db_manager.get_trade_performance_stats(limit=100) trades = [] if isinstance(stats, list): trades = stats elif isinstance(stats, dict) and 'recent_trades' in stats: trades = stats['recent_trades'] if trades and len(trades) > 10: mfes = [t.get('mfe', 0) for t in trades if t.get('mfe', 0) > 0] maes = [abs(t.get('mae', 0)) for t in trades if abs(t.get('mae', 0)) > 0] if mfes and maes: # 使用 ATR 倍数来标准化 MFE/MAE (假设 MFE/MAE 也是以 ATR 为单位存储,或者我们需要转换) # 如果 DB 存的是百分比,我们需要将其转换为当前 ATR 倍数 # 这里简化处理:直接取百分比的中位数,然后转换为价格距离 opt_tp_pct = np.percentile(mfes, 60) / 100.0 # 60分位数 opt_sl_pct = np.percentile(maes, 90) / 100.0 # 90分位数 mfe_tp_dist = price * opt_tp_pct mae_sl_dist = price * opt_sl_pct except Exception as e: logger.warning(f"MFE/MAE 计算失败: {e}") # 3. 市场结构调整 (Supply/Demand/FVG) # 从 market_context 中获取关键位 struct_tp_price = 0.0 struct_sl_price = 0.0 if market_context: # 获取最近的 Supply/Demand 区间 # 假设 market_context 包含 advanced_tech 或 ifvg 结果 is_buy = 'buy' in trade_type # 寻找止盈点 (最近的阻力位/FVG) if is_buy: # 买入 TP: 最近的 Supply Zone 或 Bearish FVG 的下沿 resistance_candidates = [] if 'supply_zones' in market_context: # 找出所有高于当前价格的 Supply Zone bottom # 注意: zones 可能是 [(top, bottom), ...] 或其他结构,需要类型检查 raw_zones = market_context['supply_zones'] if raw_zones and isinstance(raw_zones, list): try: # 尝试解析可能的元组/列表结构 valid_zones = [] for z in raw_zones: if isinstance(z, (list, tuple)) and len(z) >= 2: # 假设结构是 (top, bottom, ...) if z[1] > price: valid_zones.append(z[1]) elif isinstance(z, dict): # 假设结构是 {'top': ..., 'bottom': ...} btm = z.get('bottom') if btm and btm > price: valid_zones.append(btm) if valid_zones: resistance_candidates.append(min(valid_zones)) except Exception as e: logger.warning(f"解析 Supply Zones 失败: {e}") if 'bearish_fvgs' in market_context: raw_fvgs = market_context['bearish_fvgs'] if raw_fvgs and isinstance(raw_fvgs, list): try: valid_fvgs = [] for f in raw_fvgs: if isinstance(f, dict): btm = f.get('bottom') if btm and btm > price: valid_fvgs.append(btm) if valid_fvgs: resistance_candidates.append(min(valid_fvgs)) except Exception as e: logger.warning(f"解析 Bearish FVG 失败: {e}") if resistance_candidates: struct_tp_price = min(resistance_candidates) else: # 卖出 TP: 最近的 Demand Zone 或 Bullish FVG 的上沿 support_candidates = [] if 'demand_zones' in market_context: raw_zones = market_context['demand_zones'] if raw_zones and isinstance(raw_zones, list): try: valid_zones = [] for z in raw_zones: if isinstance(z, (list, tuple)) and len(z) >= 2: # 假设结构是 (top, bottom, ...) if z[0] < price: valid_zones.append(z[0]) elif isinstance(z, dict): top = z.get('top') if top and top < price: valid_zones.append(top) if valid_zones: support_candidates.append(max(valid_zones)) except Exception as e: logger.warning(f"解析 Demand Zones 失败: {e}") if 'bullish_fvgs' in market_context: raw_fvgs = market_context['bullish_fvgs'] if raw_fvgs and isinstance(raw_fvgs, list): try: valid_fvgs = [] for f in raw_fvgs: if isinstance(f, dict): top = f.get('top') if top and top < price: valid_fvgs.append(top) if valid_fvgs: support_candidates.append(max(valid_fvgs)) except Exception as e: logger.warning(f"解析 Bullish FVG 失败: {e}") if support_candidates: struct_tp_price = max(support_candidates) # 寻找止损点 (最近的支撑位/结构点) # 这里简化逻辑,通常 SL 放在结构点外侧 # 可以使用 recent swing high/low pass # 4. 综合计算 # 逻辑: # TP: 优先使用结构位 (Struct TP),如果结构位太远或太近,使用 MFE/ATR 修正 # SL: 使用 MAE/ATR 保护,但如果结构位 (如 Swing Low) 在附近,可以参考 final_sl = 0.0 final_tp = 0.0 # 基础计算 if 'buy' in trade_type: base_tp = price + mfe_tp_dist base_sl = price - mae_sl_dist # TP 融合 if struct_tp_price > price: # 如果结构位比基础 TP 近,说明上方有阻力,保守起见设在阻力前 # 如果结构位比基础 TP 远,可以尝试去拿,但最好分批。这里取加权平均或保守值 if struct_tp_price < base_tp: final_tp = struct_tp_price - (atr * 0.1) # 阻力下方一点点 else: final_tp = base_tp # 保持 MFE 目标,比较稳健 else: final_tp = base_tp final_sl = base_sl # SL 主要靠统计风控 else: # Sell base_tp = price - mfe_tp_dist base_sl = price + mae_sl_dist if struct_tp_price > 0 and struct_tp_price < price: if struct_tp_price > base_tp: # 支撑位在目标上方 (更近) final_tp = struct_tp_price + (atr * 0.1) else: final_tp = base_tp else: final_tp = base_tp final_sl = base_sl return final_sl, final_tp def optimize_short_term_params(self): """ Optimize short-term strategy parameters (RVGI+CCI, IFVG) Executed every 1 hour """ logger.info("Running Short-Term Parameter Optimization (WOAm)...") # 1. Get Data (Last 200 M15 candles) df = self.get_market_data(200) if df is None or len(df) < 100: return # 2. Define Objective Function # Optimize for RVGI+CCI and IFVG parameters def objective(params): # Params: [rvgi_sma, rvgi_cci, ifvg_gap] p_rvgi_sma = int(params[0]) p_rvgi_cci = int(params[1]) p_ifvg_gap = int(params[2]) # Run simulation # We use a simplified simulation here for speed # Calculate indicators on the whole dataframe try: # RVGI+CCI # We need to call analyze_rvgi_cci_strategy for each candle? No, too slow. # We rely on the fact that we can calculate the whole series. # But the Analyzer methods currently return a single snapshot for the last candle. # To do this efficiently without rewriting everything, we iterate over the last 50 candles. score = 0 trades = 0 wins = 0 # Iterate through the last 50 candles as "current" for i in range(len(df)-50, len(df)): sub_df = df.iloc[:i+1] future_close = df.iloc[i+1]['close'] if i+1 < len(df) else df.iloc[i]['close'] current_close = df.iloc[i]['close'] # RVGI Signal res_rvgi = self.advanced_adapter.analyze_rvgi_cci_strategy( sub_df, sma_period=p_rvgi_sma, cci_period=p_rvgi_cci ) # IFVG Signal res_ifvg = self.advanced_adapter.analyze_ifvg( sub_df, min_gap_points=p_ifvg_gap ) signal = 0 if res_rvgi['signal'] == 'buy': signal += 1 elif res_rvgi['signal'] == 'sell': signal -= 1 if res_ifvg['signal'] == 'buy': signal += 1 elif res_ifvg['signal'] == 'sell': signal -= 1 if signal > 0: # Buy trades += 1 if future_close > current_close: wins += 1 else: score -= 1 elif signal < 0: # Sell trades += 1 if future_close < current_close: wins += 1 else: score -= 1 if trades == 0: return 0 win_rate = wins / trades score += (win_rate * 100) return score except Exception: return -100 # 3. Optimization optimizer = WOAm() bounds = [(10, 50), (7, 21), (10, 100)] # [sma, cci, gap] steps = [1, 1, 5] best_params, best_score = optimizer.optimize(objective, bounds, steps=steps, epochs=3) # 4. Apply if best_score > 0: logger.info(f"Short-Term Optimization Complete. Score: {best_score}") logger.info(f"New Params: RVGI_SMA={int(best_params[0])}, RVGI_CCI={int(best_params[1])}, IFVG_GAP={int(best_params[2])}") # Store these params in a property to be used by analyze_full # We need to add a property to store these or pass them self.short_term_params = { 'rvgi_sma': int(best_params[0]), 'rvgi_cci': int(best_params[1]), 'ifvg_gap': int(best_params[2]) } # We also need to update the analyze call in run() to use these else: logger.info("Short-Term Optimization found no improvement.") def run(self): """主循环""" if not self.initialize_mt5(): return logger.info(f"启动 AI 自动交易机器人 - {self.symbol}") self.send_telegram_message(f"🤖 *AI Bot Started*\nSymbol: `{self.symbol}`\nTimeframe: `{self.timeframe}`") try: while True: # 0. 管理持仓 (移动止损) - 使用最新策略 if self.latest_strategy: self.manage_positions(self.latest_signal, self.latest_strategy) else: self.manage_positions() # 降级为默认 # 0.5 分析已平仓交易 (每 60 次循环 / 约 1 分钟执行一次) if int(time.time()) % 60 == 0: self.analyze_closed_trades() # 0.6 执行策略参数优化 (每 4 小时一次) if time.time() - self.last_optimization_time > 14400: self.optimize_strategy_parameters() self.last_optimization_time = time.time() # 0.7 执行短线参数优化 (每 1 小时一次) if int(time.time()) % 3600 == 0: self.optimize_short_term_params() # 1. 检查新 K 线 # 获取最后一根 K 线的时间 rates = mt5.copy_rates_from_pos(self.symbol, self.timeframe, 0, 1) if rates is None: time.sleep(1) continue current_bar_time = rates[0]['time'] # --- Real-time Data Update (Added for Dashboard) --- # 每隔 3 秒保存一次当前正在形成的 K 线数据到数据库 # 这样 Dashboard 就可以看到实时价格跳动 if time.time() - self.last_realtime_save > 3: try: df_current = pd.DataFrame(rates) df_current['time'] = pd.to_datetime(df_current['time'], unit='s') df_current.set_index('time', inplace=True) if 'tick_volume' in df_current.columns: df_current.rename(columns={'tick_volume': 'volume'}, inplace=True) self.db_manager.save_market_data(df_current.copy(), self.symbol, self.tf_name) self.last_realtime_save = time.time() # --- 实时保存账户信息 (新增) --- try: account_info = mt5.account_info() if account_info: # 计算当前品种的浮动盈亏 positions = mt5.positions_get(symbol=self.symbol) symbol_pnl = 0.0 magic_positions_count = 0 if positions: for pos in positions: # 仅统计和计算属于本策略ID的持仓 if pos.magic == self.magic_number: magic_positions_count += 1 # Handle different position object structures safely profit = getattr(pos, 'profit', 0.0) swap = getattr(pos, 'swap', 0.0) commission = getattr(pos, 'commission', 0.0) # Check attribute existence symbol_pnl += profit + swap + commission # 显示当前 ID 的持仓状态 # if magic_positions_count > 0: # logger.info(f"ID {self.magic_number} 当前持仓: {magic_positions_count} 个") # else: # pass metrics = { "timestamp": datetime.now(), "balance": account_info.balance, "equity": account_info.equity, "margin": account_info.margin, "free_margin": account_info.margin_free, "margin_level": account_info.margin_level, "total_profit": account_info.profit, "symbol_pnl": symbol_pnl } self.db_manager.save_account_metrics(metrics) except Exception as e: logger.error(f"Failed to save account metrics: {e}") # ------------------------------ # 实时更新持仓 SL/TP (使用最近一次分析的策略) if self.latest_strategy: self.manage_positions(self.latest_signal, self.latest_strategy) except Exception as e: logger.error(f"Real-time data save failed: {e}") # --------------------------------------------------- # 如果是新 K 线 或者 这是第一次运行 (last_bar_time 为 0) # 用户需求: 每15分钟执行一次分析 (即跟随 M15 K 线收盘) is_new_bar = current_bar_time != self.last_bar_time if is_new_bar: if self.last_bar_time == 0: logger.info("首次运行,立即执行分析...") else: logger.info(f"发现新 K 线: {datetime.fromtimestamp(current_bar_time)}") self.last_bar_time = current_bar_time self.last_analysis_time = time.time() # 2. 获取数据并分析 # ... 这里的代码保持不变 ... # PEM 需要至少 108 根 K 线 (ma_fast_period),MTF 更新 Zones 需要 500 根 # 为了确保所有模块都有足够数据,我们获取 300 根 (MTF Zones 在 update_zones 内部单独获取) df = self.get_market_data(300) # 获取最近的 Tick 数据用于 Matrix ML # 尝试获取最近 20 个 tick ticks = mt5.copy_ticks_from(self.symbol, current_bar_time, 20, mt5.COPY_TICKS_ALL) if ticks is None: ticks = [] if df is not None: # 保存市场数据到DB self.db_manager.save_market_data(df, self.symbol, self.tf_name) # 使用 data_processor 计算指标 processor = MT5DataProcessor() df_features = processor.generate_features(df) # 3. 调用 AI 与高级分析 # 构建市场快照 current_price = df.iloc[-1] latest_features = df_features.iloc[-1].to_dict() market_snapshot = { "symbol": self.symbol, "timeframe": self.tf_name, "prices": { "open": float(current_price['open']), "high": float(current_price['high']), "low": float(current_price['low']), "close": float(current_price['close']), "volume": int(current_price['volume']) }, "indicators": { "rsi": float(latest_features.get('rsi', 50)), "atr": float(latest_features.get('atr', 0)), "ema_fast": float(latest_features.get('ema_fast', 0)), "ema_slow": float(latest_features.get('ema_slow', 0)), "volatility": float(latest_features.get('volatility', 0)) } } # --- 3.1 CRT 分析 --- crt_result = self.crt_analyzer.analyze(self.symbol, current_price, current_bar_time) logger.info(f"CRT 分析: {crt_result['signal']} ({crt_result['reason']})") # --- 3.2 价格方程模型 (PEM) --- self.price_model.update(float(current_price['close'])) price_eq_result = self.price_model.predict(df) # 传入 df 进行分析 logger.info(f"PEM 预测: {price_eq_result['signal']} (目标: {price_eq_result['predicted_price']:.2f})") # --- 3.2.1 多时间周期分析 (新增) --- tf_result = self.tf_analyzer.analyze(self.symbol, current_bar_time) logger.info(f"TF 分析: {tf_result['signal']} ({tf_result['reason']})") # --- 3.2.2 高级技术分析 (新增) --- # Use optimized parameters if available st_params = getattr(self, 'short_term_params', {}) adv_result = self.advanced_adapter.analyze_full(df, params=st_params) adv_signal = "neutral" if adv_result: adv_signal = adv_result['signal_info']['signal'] logger.info(f"高级技术分析: {adv_signal} (强度: {adv_result['signal_info']['strength']})") logger.info(f"市场状态: {adv_result['regime']['description']}") # --- 3.2.3 Matrix ML 分析 (新增) --- # 首先进行训练 (基于上一次预测和当前价格变动) price_change = float(current_price['close']) - float(df.iloc[-2]['close']) if len(df) > 1 else 0 loss = self.matrix_ml.train(price_change) if loss: logger.info(f"Matrix ML 训练 Loss: {loss:.4f}") # 进行预测 ml_result = self.matrix_ml.predict(ticks) logger.info(f"Matrix ML 预测: {ml_result['signal']} (Raw: {ml_result.get('raw_output', 0.0):.2f})") # --- 3.2.4 SMC 分析 (新增) --- smc_result = self.smc_analyzer.analyze(df, self.symbol) logger.info(f"SMC 结构: {smc_result['structure']} (信号: {smc_result['signal']})") # --- 3.2.5 MFH 分析 (新增) --- # 计算真实收益率用于训练 (t - t_horizon) # 我们需要足够的数据来计算 Horizon 收益 horizon = 5 mfh_slope = 0.0 mfh_signal = "neutral" if len(df) > horizon + 10: # 1. 训练 (Delayed Training) # 实际发生的 Horizon 收益: (Close[t] - Close[t-5]) / Close[t-5] current_close = float(current_price['close']) past_close = float(df.iloc[-1 - horizon]['close']) if past_close > 0: actual_return = (current_close - past_close) / past_close self.mfh_analyzer.train(actual_return) # 2. 预测 mfh_result = self.mfh_analyzer.predict(df) mfh_slope = mfh_result['slope'] mfh_signal = mfh_result['signal'] logger.info(f"MFH 斜率: {mfh_slope:.4f} (信号: {mfh_signal})") else: mfh_result = {"signal": "neutral", "slope": 0.0} # --- 3.2.6 MTF 分析 (新增) --- mtf_result = self.mtf_analyzer.analyze(self.symbol, current_price, current_bar_time) logger.info(f"MTF 分析: {mtf_result['signal']} ({mtf_result['reason']})") # --- 3.2.7 IFVG 分析 (新增) --- # 在 AdvancedAnalysisAdapter 中已调用,但这里需要单独提取结果供后续使用 # 我们之前在步骤 3.2.1 的 AdvancedAnalysisAdapter.analyze 中已经获取了 ifvg_result # 但由于 analyze 方法返回的是一个包含多个子结果的字典,我们需要确保 ifvg_result 变量被正确定义 if adv_result and 'ifvg' in adv_result: ifvg_result = adv_result['ifvg'] else: # Fallback if advanced analysis failed or ifvg key missing ifvg_result = {"signal": "hold", "strength": 0, "reasons": [], "active_zones": []} logger.info(f"IFVG 分析: {ifvg_result['signal']} (Strength: {ifvg_result['strength']})") # --- 3.2.8 RVGI+CCI 分析 (新增) --- if adv_result and 'rvgi_cci' in adv_result: rvgi_cci_result = adv_result['rvgi_cci'] else: rvgi_cci_result = {"signal": "hold", "strength": 0, "reasons": []} logger.info(f"RVGI+CCI 分析: {rvgi_cci_result['signal']} (Strength: {rvgi_cci_result['strength']})") # 准备优化器池信息供 AI 参考 optimizer_info = { "available_optimizers": list(self.optimizers.keys()), "active_optimizer": self.active_optimizer_name, "last_optimization_score": self.optimizers[self.active_optimizer_name].best_score if self.optimizers[self.active_optimizer_name].best_score > -90000 else None, "descriptions": { "GWO": "Grey Wolf Optimizer - 模拟灰狼捕猎行为", "WOAm": "Whale Optimization Algorithm (Modified) - 模拟座头鲸气泡网捕猎", "DE": "Differential Evolution - 差分进化算法", "COAm": "Cuckoo Optimization Algorithm (Modified) - 模拟布谷鸟寄生繁殖", "BBO": "Biogeography-Based Optimization - 生物地理学优化", "TETA": "Time Evolution Travel Algorithm - 时间演化旅行算法 (无参)" } } # --- 3.3 DeepSeek 分析 --- logger.info("正在调用 DeepSeek 分析市场结构...") # 获取历史交易绩效 (MFE/MAE) - 提前获取供 DeepSeek 使用 trade_stats = self.db_manager.get_trade_performance_stats(limit=50) # 获取当前持仓状态 (供 DeepSeek 和 Qwen 决策) - 提前获取 positions = mt5.positions_get(symbol=self.symbol) current_positions_list = [] if positions: for pos in positions: cur_mfe, cur_mae = self.get_position_stats(pos) current_positions_list.append({ "ticket": pos.ticket, "type": "buy" if pos.type == mt5.POSITION_TYPE_BUY else "sell", "volume": pos.volume, "open_price": pos.price_open, "current_price": pos.price_current, "profit": pos.profit, "sl": pos.sl, "tp": pos.tp, "mfe_pct": cur_mfe, "mae_pct": cur_mae }) # 准备当前优化状态上下文 optimization_status = { "active_optimizer": self.active_optimizer_name, "optimizer_details": optimizer_info, # 注入详细优化器信息 "smc_params": { "ma_period": self.smc_analyzer.ma_period, "atr_threshold": self.smc_analyzer.atr_threshold }, "mfh_params": { "learning_rate": self.mfh_analyzer.learning_rate } } # 传入 CRT, PriceEq, TF 和 高级分析 的结果作为额外上下文 extra_analysis = { "crt": crt_result, "price_equation": price_eq_result, "timeframe_analysis": tf_result, "advanced_tech": adv_result['summary'] if adv_result else None, "matrix_ml": ml_result, "smc": smc_result, "mfh": mfh_result, "mtf": mtf_result, "ifvg": ifvg_result, "rvgi_cci": rvgi_cci_result, "optimization_status": optimization_status # 新增: 当前参数状态 } # 调用 DeepSeek,传入性能数据和持仓信息 structure = self.deepseek_client.analyze_market_structure( market_snapshot, current_positions=current_positions_list, extra_analysis=extra_analysis, performance_stats=trade_stats ) logger.info(f"DeepSeek 分析完成: {structure.get('market_state')}") # DeepSeek 信号转换 ds_signal = structure.get('preliminary_signal', 'neutral') ds_pred = structure.get('short_term_prediction', 'neutral') ds_score = structure.get('structure_score', 50) # 如果 DeepSeek 没有返回 preliminary_signal (旧版本兼容),使用简单的规则 if ds_signal == 'neutral': if ds_pred == 'bullish' and ds_score > 60: ds_signal = "buy" elif ds_pred == 'bearish' and ds_score > 60: ds_signal = "sell" # --- 3.4 Qwen 策略 --- logger.info("正在调用 Qwen 生成策略...") # 准备混合信号供 Qwen 参考 technical_signals = { "crt": crt_result, "price_equation": price_eq_result, "timeframe_analysis": tf_result, "advanced_tech": adv_signal, "matrix_ml": ml_result['signal'], "smc": smc_result['signal'], "mfh": mfh_result['signal'], "mtf": mtf_result['signal'], "deepseek_analysis": { # 传入完整的 DeepSeek 分析结果 "market_state": structure.get('market_state'), "preliminary_signal": ds_signal, "confidence": structure.get('signal_confidence'), "consistency": structure.get('consistency_analysis'), "prediction": ds_pred }, "performance_stats": trade_stats # 传入历史绩效 } strategy = self.qwen_client.optimize_strategy_logic(structure, market_snapshot, technical_signals=technical_signals, current_positions=current_positions_list) # --- 参数自适应优化 (Feedback Loop) --- # 将大模型的参数优化建议应用到当前运行的算法中 param_updates = strategy.get('parameter_updates', {}) if param_updates: try: update_reason = param_updates.get('reason', 'AI Optimized') logger.info(f"应用参数优化 ({update_reason}): {param_updates}") # 1. SMC 参数 if 'smc_atr_threshold' in param_updates: new_val = float(param_updates['smc_atr_threshold']) self.smc_analyzer.atr_threshold = new_val logger.info(f"Updated SMC ATR Threshold -> {new_val}") # 2. MFH 参数 if 'mfh_learning_rate' in param_updates: new_val = float(param_updates['mfh_learning_rate']) self.mfh_analyzer.learning_rate = new_val logger.info(f"Updated MFH Learning Rate -> {new_val}") # 3. 切换优化器 if 'active_optimizer' in param_updates: new_opt = str(param_updates['active_optimizer']) if new_opt in self.optimizers and new_opt != self.active_optimizer_name: self.active_optimizer_name = new_opt logger.info(f"Switched Optimizer -> {new_opt}") # 4. Matrix ML 参数 (如需) if 'matrix_ml_learning_rate' in param_updates: self.matrix_ml.learning_rate = float(param_updates['matrix_ml_learning_rate']) except Exception as e: logger.error(f"参数动态更新失败: {e}") # Qwen 信号转换 # 如果没有明确 action 字段,我们假设它作为 DeepSeek 的确认层 # 现在我们优先使用 Qwen 返回的 action qw_action = strategy.get('action', 'neutral').lower() # 扩展 Action 解析,支持加仓/减仓/平仓/挂单指令 final_signal = "neutral" if qw_action in ['buy', 'add_buy']: final_signal = "buy" elif qw_action in ['sell', 'add_sell']: final_signal = "sell" elif qw_action in ['buy_limit', 'limit_buy']: final_signal = "limit_buy" elif qw_action in ['sell_limit', 'limit_sell']: final_signal = "limit_sell" elif qw_action in ['close_buy', 'close_sell', 'close']: final_signal = "close" # 特殊信号: 平仓 elif qw_action == 'hold': final_signal = "hold" qw_signal = final_signal if final_signal not in ['hold', 'close'] else 'neutral' # --- 3.5 最终决策 (LLM Centric) --- # 依据用户指令:完全基于大模型的最终决策 (以 Qwen 的 Action 为主) # Qwen 已经接收了所有技术指标(technical_signals)作为输入,因此它的输出即为"集合最终分析结果" # final_signal 已在上面由 qw_action 解析得出 reason = strategy.get('reason', 'LLM Decision') # 计算置信度/强度 (Strength) # 我们使用技术指标的一致性作为置信度评分 tech_consensus_score = 0 matching_count = 0 valid_tech_count = 0 tech_signals_list = [ crt_result['signal'], price_eq_result['signal'], tf_result['signal'], adv_signal, ml_result['signal'], smc_result['signal'], mfh_result['signal'], mtf_result['signal'], ifvg_result['signal'], rvgi_cci_result['signal'] ] for sig in tech_signals_list: if sig != 'neutral': valid_tech_count += 1 if sig == final_signal: matching_count += 1 if final_signal in ['buy', 'sell', 'limit_buy', 'limit_sell']: # 基础分 60 (既然 LLM 敢喊单) base_strength = 60 # 技术面加成 if valid_tech_count > 0: tech_boost = (matching_count / valid_tech_count) * 40 # 最高 +40 strength = base_strength + tech_boost else: strength = base_strength # DeepSeek 加成 if ds_signal == final_signal: strength = min(100, strength + 10) else: strength = 0 all_signals = { "deepseek": ds_signal, "qwen": qw_signal, "crt": crt_result['signal'], "price_equation": price_eq_result['signal'], "tf_visual": tf_result['signal'], "advanced_tech": adv_signal, "matrix_ml": ml_result['signal'], "smc": smc_result['signal'], "mfh": mfh_result['signal'], "mtf": mtf_result['signal'], "ifvg": ifvg_result['signal'], "rvgi_cci": rvgi_cci_result['signal'] } # 仅保留 weights 用于记录,不再用于计算信号 _, _, weights = self.optimizer.combine_signals(all_signals) # --- 3.6 记录信号历史用于实时优化 --- # 解决优化算法未运行的问题:收集数据并定期调用 optimize_weights self.signal_history.append((current_bar_time, all_signals, float(current_price['close']))) if len(self.signal_history) > 1000: self.signal_history.pop(0) # 每 15 分钟尝试优化一次权重 if time.time() - self.last_optimization_time > 900: self.optimize_weights() logger.info(f"AI 最终决定 (LLM-Driven): {final_signal.upper()} (强度: {strength:.1f})") logger.info(f"LLM Reason: {reason}") logger.info(f"技术面支持: {matching_count}/{valid_tech_count}") # 保存分析结果到DB self.db_manager.save_signal(self.symbol, self.tf_name, { "final_signal": final_signal, "strength": strength, "details": { "source": "LLM_Centric", "weights": weights, "signals": all_signals, "market_state": structure.get('market_state'), "prediction": structure.get('short_term_prediction'), "crt_reason": crt_result['reason'], "mtf_reason": mtf_result['reason'], "adv_summary": adv_result['summary'] if adv_result else None, "matrix_ml_raw": ml_result['raw_output'], "smc_structure": smc_result['structure'], "smc_reason": smc_result['reason'], "mfh_slope": mfh_result['slope'], "ifvg_reason": ", ".join(ifvg_result['reasons']) if ifvg_result['reasons'] else "N/A" } }) # 更新全局缓存,供 manage_positions 使用 self.latest_strategy = strategy self.latest_signal = final_signal # --- 发送分析报告到 Telegram --- # 构建更详细的报告 regime_info = adv_result['regime']['description'] if adv_result else "N/A" volatility_info = f"{adv_result['risk']['volatility']:.2%}" if adv_result else "N/A" # 获取当前持仓概览 pos_summary = "No Open Positions" positions = mt5.positions_get(symbol=self.symbol) if positions: pos_details = [] for p in positions: type_str = "BUY" if p.type == mt5.POSITION_TYPE_BUY else "SELL" pnl = p.profit pos_details.append(f"{type_str} {p.volume} (PnL: {pnl:.2f})") pos_summary = "\n".join(pos_details) # 获取建议的 SL/TP (用于展示最优 SL/TP) # 逻辑: 优先展示 Qwen 策略中明确的 SL/TP,如果没有,则展示基于 MFE/MAE 优化的计算值 current_bid = mt5.symbol_info_tick(self.symbol).bid current_ask = mt5.symbol_info_tick(self.symbol).ask ref_price = current_ask # 默认参考价 trade_dir_for_calc = "buy" if final_signal in ['sell', 'limit_sell']: trade_dir_for_calc = "sell" ref_price = current_bid # 1. 尝试从 Qwen 策略获取 exit_conds = strategy.get('exit_conditions', {}) opt_sl = exit_conds.get('sl_price') opt_tp = exit_conds.get('tp_price') # 2. 如果 Qwen 未提供,使用内部优化算法计算 if not opt_sl or not opt_tp: # 准备市场上下文 sl_tp_context = { "supply_zones": adv_result.get('ifvg', {}).get('active_zones', []), "demand_zones": [], "bearish_fvgs": [], "bullish_fvgs": [] } # 计算 ATR (复用) atr_val = float(latest_features.get('atr', 0)) if atr_val == 0: atr_val = ref_price * 0.005 calc_sl, calc_tp = self.calculate_optimized_sl_tp(trade_dir_for_calc, ref_price, atr_val, market_context=sl_tp_context) if not opt_sl: opt_sl = calc_sl if not opt_tp: opt_tp = calc_tp # 计算盈亏比 (R:R) rr_str = "N/A" if opt_sl and opt_tp and ref_price: risk = abs(ref_price - opt_sl) reward = abs(opt_tp - ref_price) if risk > 0: rr = reward / risk rr_str = f"1:{rr:.2f}" # 优化显示逻辑: 如果是 Hold 且无持仓,显示为 "Waiting for Market Direction" display_decision = final_signal.upper() if final_signal == 'hold' and (not positions or len(positions) == 0): display_decision = "WAITING FOR MARKET DIRECTION ⏳" # 格式化 DeepSeek 和 Qwen 的详细分析 # DeepSeek Report ds_analysis_text = f"• Market State: {self.escape_markdown(structure.get('market_state', 'N/A'))}\n" ds_analysis_text += f"• Signal: {self.escape_markdown(ds_signal.upper())} (Conf: {ds_score}/100)\n" ds_analysis_text += f"• Prediction: {self.escape_markdown(ds_pred)}" # Qwen Report qw_reason = strategy.get('reason', strategy.get('rationale', 'Strategy Optimization')) qw_analysis_text = f"• Action: {self.escape_markdown(qw_action.upper())}\n" qw_analysis_text += f"• Logic: _{self.escape_markdown(qw_reason)}_\n" if param_updates: qw_analysis_text += f"• Params Updated: {len(param_updates)} items" safe_reason = self.escape_markdown(reason) safe_volatility = self.escape_markdown(volatility_info) safe_pos_summary = self.escape_markdown(pos_summary) analysis_msg = ( f"🤖 *AI Gold Strategy Comprehensive Report*\n" f"Symbol: `{self.symbol}` | TF: `{self.tf_name}`\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"🕵️ *DeepSeek Analysis (Structure)*\n" f"{ds_analysis_text}\n\n" f"🧙‍♂️ *Qwen Analysis (Strategy)*\n" f"{qw_analysis_text}\n\n" f"🧠 *Final Consolidated Result*\n" f"• Decision: *{display_decision}* (Strength: {strength:.0f}%)\n" f"• Direction: `{trade_dir_for_calc.upper()}`\n" f"• Reason: _{safe_reason}_\n\n" f"🎯 *Optimal Trade Setup (Best SL/TP)*\n" f"• Ref Entry: `{ref_price:.2f}`\n" f"• 🛑 Stop Loss: `{opt_sl:.2f}`\n" f"• 🏆 Take Profit: `{opt_tp:.2f}`\n" f"• R:R Ratio: `{rr_str}`\n\n" f"📊 *Market Context*\n" f"• Volatility: `{safe_volatility}`\n" f"• Tech Consensus: {matching_count}/{valid_tech_count} agree\n" f"• Signals: SMC[{smc_result['signal']}] | CRT[{crt_result['signal']}] | MTF[{mtf_result['signal']}]\n\n" f"💼 *Account Status*\n" f"{safe_pos_summary}" ) self.send_telegram_message(analysis_msg) # 4. 执行交易 # 修正逻辑: 优先尊重 Qwen 的信号和参数 (大模型集合最终结果) # 如果 Qwen 明确说 "hold" 或 "neutral",即使 final_signal 是 buy/sell,也应该谨慎 # 但如果 final_signal 极强 (如 100.0),我们可能还是想交易 # 现在的逻辑是: 交易方向以 final_signal 为准 (因为它是混合投票的结果,Qwen 也是其中一票) # 但 参数 (Entry/Exit) 必须优先使用 Qwen 的建议 if final_signal != 'hold': logger.info(f">>> 准备执行 AI 集合决策: {final_signal.upper()} <<<") entry_params = strategy.get('entry_conditions') exit_params = strategy.get('exit_conditions') # 强制使用 Qwen 的参数,不再进行一致性回退检查 # 除非 Qwen 建议的参数明显不可用 (如 None) # 日志记录差异,但不阻止使用参数 if qw_signal != final_signal and qw_signal not in ['neutral', 'hold']: logger.warning(f"Qwen 信号 ({qw_signal}) 与最终决策 ({final_signal}) 不一致,但仍优先使用 Qwen 参数") trade_res = self.execute_trade( final_signal, strength, exit_params, entry_params ) time.sleep(1) # 避免 CPU 占用过高 except KeyboardInterrupt: logger.info("用户停止机器人") mt5.shutdown() except Exception as e: logger.error(f"发生未捕获异常: {e}", exc_info=True) mt5.shutdown() if __name__ == "__main__": # 可以通过命令行参数传入品种 symbol = "GOLD" if len(sys.argv) > 1: symbol = sys.argv[1].upper() else: symbol = "GOLD" # 默认改为黄金 bot = AI_MT5_Bot(symbol=symbol) bot.run()