mql5/crypto/qwen_client.py

553 lines
27 KiB
Python
Raw Permalink Normal View History

import requests
import json
import logging
import time
from typing import Dict, Any, Optional, List
import pandas as pd
import numpy as np
from datetime import datetime, date
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class CustomJSONEncoder(json.JSONEncoder):
"""自定义JSON编码器,处理Timestamp等非序列化类型"""
def default(self, o):
if isinstance(o, (datetime, date, pd.Timestamp)):
return o.isoformat()
if isinstance(o, (pd.Series, pd.DataFrame)):
return o.to_dict()
if isinstance(o, (np.integer, int)):
return int(o)
if isinstance(o, (np.floating, float)):
return float(o)
if isinstance(o, np.ndarray):
return o.tolist()
return super().default(o)
class QwenClient:
"""
Qwen3 API客户端用于策略逻辑优化动态止盈止损生成和信号强度判断
使用硅基流动API服务遵循ValueCell的API调用模式
"""
def __init__(self, api_key: str, base_url: str = "https://api.siliconflow.cn/v1", model: str = "Qwen/Qwen3-VL-235B-A22B-Thinking"):
"""
初始化Qwen客户端
Args:
api_key (str): 硅基流动API密钥
base_url (str): API基础URL默认为https://api.siliconflow.cn/v1
model (str): 使用的模型名称默认为Qwen/Qwen3-VL-235B-A22B-Thinking
"""
self.api_key = api_key
self.base_url = base_url
self.model = model
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# 启用JSON模式,遵循ValueCell的实现
self.enable_json_mode = True
def _call_api(self, endpoint: str, payload: Dict[str, Any], max_retries: int = 3) -> Optional[Dict[str, Any]]:
"""
调用Qwen API支持重试机制
基于ValueCell的API调用模式增强了错误处理和日志记录
Args:
endpoint (str): API端点
payload (Dict[str, Any]): 请求负载
max_retries (int): 最大尝试次数默认为3 (增强稳定性)
Returns:
Optional[Dict[str, Any]]: API响应失败返回None
"""
url = f"{self.base_url}/{endpoint}"
for retry in range(max_retries):
response = None
try:
# 增加超时时间到120秒,提高在网络不稳定情况下的成功率
# 显式禁用代理,防止因系统代理配置不当导致连接国内API失败
response = requests.post(
url,
headers=self.headers,
json=payload,
timeout=120
)
# 详细记录响应状态
logger.debug(f"API响应状态码: {response.status_code}, 模型: {self.model}, 重试: {retry+1}/{max_retries}")
# 处理不同状态码
if response.status_code == 401:
logger.error(f"API认证失败,状态码: {response.status_code},请检查API密钥是否正确")
return None
elif response.status_code == 403:
logger.error(f"API访问被拒绝,状态码: {response.status_code},请检查API密钥权限")
return None
elif response.status_code == 429:
logger.warning(f"API请求频率过高,状态码: {response.status_code},进入退避重试")
elif response.status_code >= 500:
logger.error(f"API服务器错误,状态码: {response.status_code}")
response.raise_for_status()
# 解析响应并添加调试信息
response_json = response.json()
logger.info(f"API调用成功,状态码: {response.status_code}, 模型: {self.model}")
return response_json
except requests.exceptions.ConnectionError as e:
logger.error(f"API连接失败 (重试 {retry+1}/{max_retries}): {e}")
logger.error(f"请求URL: {url}")
logger.error("请检查网络连接和API服务可用性")
except requests.exceptions.Timeout as e:
logger.error(f"API请求超时 (重试 {retry+1}/{max_retries}): {e}")
logger.error(f"请求URL: {url}")
logger.error("请检查网络连接和API服务响应时间")
except requests.exceptions.HTTPError as e:
logger.error(f"API HTTP错误 (重试 {retry+1}/{max_retries}): {e}")
logger.error(f"请求URL: {url}")
if response:
logger.error(f"响应内容: {response.text[:200]}...")
except requests.exceptions.RequestException as e:
logger.error(f"API请求异常 (重试 {retry+1}/{max_retries}): {e}")
logger.error(f"请求URL: {url}")
except json.JSONDecodeError as e:
logger.error(f"JSON解析失败: {e}")
if response:
logger.error(f"响应内容: {response.text}")
return None
except Exception as e:
logger.error(f"API调用意外错误: {e}")
logger.exception("完整错误堆栈:")
return None
if retry < max_retries - 1:
# 线性延迟重试,提高网络不稳定情况下的成功率
retry_delay = min(5 * (retry + 1), 30) # 每次增加5秒,最大30秒
logger.info(f"等待 {retry_delay} 秒后重试...")
time.sleep(retry_delay)
else:
logger.error(f"API调用失败,已达到最大重试次数 {max_retries}")
return None
def optimize_strategy_logic(self, deepseek_analysis: Dict[str, Any], current_market_data: Dict[str, Any], technical_signals: Optional[Dict[str, Any]] = None, current_positions: Optional[List[Dict[str, Any]]] = None, performance_stats: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
"""
优化策略逻辑基于DeepSeek的情绪得分调整入场条件
基于ValueCell的实现支持JSON模式输出
Args:
deepseek_analysis (Dict[str, Any]): DeepSeek的市场分析结果
current_market_data (Dict[str, Any]): 当前市场数据
technical_signals (Optional[Dict[str, Any]]): 其他技术模型的信号CRT, Price Equation等
current_positions (Optional[List[Dict[str, Any]]]): 当前持仓信息 (用于决定加仓或平仓)
performance_stats (Optional[List[Dict[str, Any]]]): 历史交易绩效统计 (用于自学习)
Returns:
Dict[str, Any]: 优化后的策略参数
"""
tech_context = ""
perf_context = ""
pos_context = ""
if current_positions:
pos_context = f"\n当前持仓状态 (包含实时 MFE/MAE):\n{json.dumps(current_positions, indent=2, cls=CustomJSONEncoder)}\n"
else:
pos_context = "\n当前无持仓。\n"
# 处理当前挂单信息
open_orders = current_market_data.get('open_orders', [])
orders_context = ""
if open_orders:
orders_context = f"\n当前挂单状态 (Limit/SL/TP):\n{json.dumps(open_orders, indent=2, cls=CustomJSONEncoder)}\n"
else:
orders_context = "\n当前无挂单。\n"
# 处理性能统计 (优先使用 explicit argument)
stats_to_use = performance_stats
# 兼容旧逻辑:如果 explicit 为空,尝试从 technical_signals 中提取
if not stats_to_use and technical_signals and isinstance(technical_signals.get('performance_stats'), list):
stats_to_use = technical_signals.get('performance_stats')
if stats_to_use:
# 构建 MFE/MAE 象限分析数据
# 假设 stats_to_use 是 list of trades, 或者包含 summary dict
# 这里我们需要处理两种情况:
# 1. List of trades (from db.get_trade_performance_stats)
# 2. Dict with 'recent_trades' key
recent_trades = []
summary_stats = {}
if isinstance(stats_to_use, list):
recent_trades = stats_to_use
# Calculate summary on the fly if needed, or just dump trades
if len(recent_trades) > 0:
mfe_list = [t.get('mfe', 0) for t in recent_trades if t.get('mfe') is not None]
mae_list = [t.get('mae', 0) for t in recent_trades if t.get('mae') is not None]
# Calculate Win Rate and Profit Factor
wins = len([t for t in recent_trades if t.get('profit', 0) > 0])
total_profit = sum([t.get('profit', 0) for t in recent_trades if t.get('profit', 0) > 0])
total_loss = abs(sum([t.get('profit', 0) for t in recent_trades if t.get('profit', 0) < 0]))
summary_stats = {
'avg_mfe': sum(mfe_list)/len(mfe_list) if mfe_list else 0,
'avg_mae': sum(mae_list)/len(mae_list) if mae_list else 0,
'trade_count': len(recent_trades),
'win_rate': (wins / len(recent_trades)) * 100 if recent_trades else 0,
'profit_factor': (total_profit / total_loss) if total_loss > 0 else 99.9
}
elif isinstance(stats_to_use, dict):
summary_stats = stats_to_use
recent_trades = stats_to_use.get('recent_trades', [])
trades_summary = ""
if recent_trades:
trades_summary = json.dumps(recent_trades[:10], indent=2, cls=CustomJSONEncoder)
perf_context = (
f"\n历史交易绩效参考 (用于 MFE/MAE 象限分析与 SL/TP 优化):\n"
f"- 样本交易数: {summary_stats.get('trade_count', 0)}\n"
f"- 胜率 (Win Rate): {summary_stats.get('win_rate', 0):.2f}%\n"
f"- 盈亏比 (Profit Factor): {summary_stats.get('profit_factor', 0):.2f}\n"
f"- 平均 MFE: {summary_stats.get('avg_mfe', 0):.2f}%\n"
f"- 平均 MAE: {summary_stats.get('avg_mae', 0):.2f}%\n"
f"- 最近交易详情 (用于分析体质): \n{trades_summary}\n"
)
if technical_signals:
# 从 technical_signals 中移除 stats 以免重复 (浅拷贝处理)
sigs_copy = technical_signals.copy()
if 'performance_stats' in sigs_copy:
del sigs_copy['performance_stats']
tech_context = f"\n其他技术模型信号 (CRT/PriceEq/Hybrid):\n{json.dumps(sigs_copy, indent=2, cls=CustomJSONEncoder)}\n"
prompt = f"""
作为专业的量化交易策略优化专家你是混合交易系统的核心决策层请根据DeepSeek的市场分析结果当前市场数据当前持仓状态以及其他技术模型的信号优化策略逻辑并做出最终执行决定
你现在拥有全套高级算法的信号支持请特别关注SMC策略与其他信号的共振
1. **SMC (Smart Money Concepts) - 核心信号**:
- 确认DeepSeek分析中识别的OB (订单块) FVG (流动性缺口) 是否与当前价格位置匹配
- 寻找流动性扫荡后的反转确认 (例如: 扫荡低点后出现MSB)
- 在溢价区(Premium)寻找卖出机会在折价区(Discount)寻找买入机会
2. **多模型共识**: 结合 IFVG, CRT, RVGI+CCI, MFH, MTF 的信号如果SMC信号与其他模型冲突请依据 DeepSeek 的市场结构分析和 MTF (多周期) 趋势来裁决
DeepSeek市场分析结果
{json.dumps(deepseek_analysis, indent=2, cls=CustomJSONEncoder)}
当前市场数据
{json.dumps(current_market_data, indent=2, cls=CustomJSONEncoder)}
{pos_context}
{orders_context}
{tech_context}
{perf_context}
请综合考虑所有信号并输出最终的交易决策 (Action):
1. DeepSeek 提供宏观结构和趋势判断
2. CRT (Candle Range Theory) 提供流动性猎取和反转信号
3. Price Equation 提供纯数学的动量预测
4. Hybrid Optimizer 提供加权共识
5. **MFE/MAE 象限分析与 SL/TP 优化**:
- **数据**: 请参考提供的历史交易详情 (MFE, MAE, Profit)
- **分析**:
- 观察高盈利交易的 MFE 分布 TP 设定在能捕获大部分 MFE 的位置 ( 80% 分位)
- 观察亏损交易的 MAE 分布 SL 设定在能过滤掉"第一象限 (低MFE 高MAE)"交易的位置
- **动态调整**: 如果近期 MAE 普遍变大且盈利困难说明市场波动剧烈或策略失效请收紧 SL 或暂停交易
6. **持仓管理 (Position Management)**:
- **重要**: 请首先检查 `current_market_data` 中的 `account_info` `pos_context` 中的当前持仓
- 如果已有持仓且方向正确考虑是否加仓 (Add) 或持有 (Hold)
- 如果已有持仓但方向错误或动能减弱考虑平仓 (Close)
- 如果无持仓且信号明确考虑开仓 (Open)
- **资金利用率**: 用户反馈之前的下单量过小 0.2 USDT 保证金请务必根据 `account_info.available_usdt` 计算合理的开仓比例
- **合约张数**: 注意 OKX ETH/USDT 永续合约每张价值 0.1 ETH模型建议的资金比例将转换为具体的张数进行下单
- **杠杆使用**: 如果市场趋势明确且信号强度高请充分利用杠杆 (1-100x)例如如果建议使用 50% 资金和 20x 杠杆则实际下单名义价值 = 可用余额 * 0.5 * 20请确保在 `leverage` 字段返回合适的杠杆倍数
- **资金比例 (Position Size)**: 请在 `position_size` 字段中返回建议使用的资金比例0.0-1.0例如 0.5 代表使用当前可用 USDT 50% 作为**保证金**
- **杠杆比例 (Leverage)**: 请在 `leverage` 字段中返回建议使用的杠杆倍数1-100
请提供以下优化结果并确保分析全面逻辑严密不要使用省略号或简化描述**请务必使用中文进行输出Strategy Logic Rationale 部分**
1. 核心决策买入/卖出/持有/平仓/加仓/挂单(Limit)
2. 入场/加仓条件基于情绪得分和技术指标的优化规则**如果是挂单(Limit/Stop)必须明确给出具体的挂单价格(limit_price)这非常重要**
- 对于 Buy Limit价格应低于当前市价回调买入
- 对于 Sell Limit价格应高于当前市价反弹卖出
- 请结合 SMC FVG OB 区域来设定精确的挂单点位
3. 出场/减仓条件**基于 MFE/MAE 分析的合理优化止盈止损点**请直接给出具体价格sl_price, tp_price
- **Stop Loss (SL)**: 参考 MAE 分布设定在能过滤掉大部分"假突破"但又能控制最大亏损的位置结合市场结构放在最近的 Swing High/Low 或结构位之外
- **Take Profit (TP)**: 参考 MFE 分布设定在能捕获 80% 潜在收益的位置结合市场结构放在下一个 Liquidity Pool (流动性池) OB 之前
- 不要使用 ATR 倍数请给出具体的数值
4. 仓位管理针对当前持仓的具体操作建议如加仓减仓反手
5. 风险管理建议针对当前市场状态的风险控制措施
6. **参数自适应优化建议 (Parameter Optimization)**:
- 请分析当前市场状态 (波动率趋势强度)并评估现有算法参数的适用性
- 给出针对 SMC, MFH, MatrixML Optimization Algorithm (GWO/WOAm/etc) 的具体参数调整建议
- 例如: "SMC ATR 阈值过低,建议提高到 0.003 以过滤噪音" "建议切换到 DE 优化器以增加探索能力"
7. 策略逻辑详解请详细解释做出上述决策的逻辑链条 (Strategy Logic Rationale)**必须包含对 SMC 信号的解读MFE/MAE 数据的分析以及为何选择该 SL/TP 点位**
请以JSON格式返回结果包含以下字段
- action: str ("buy", "sell", "hold", "close_buy", "close_sell", "add_buy", "add_sell", "buy_limit", "sell_limit")
- entry_conditions: dict (包含 "trigger_type", "limit_price", "confirmation") **确保 limit_price 是一个具体的数字**
- exit_conditions: dict (包含 "sl_price", "tp_price", "close_rationale") **确保 sl_price tp_price 是具体的数字**
- position_management: dict (包含 "action", "volume_percent", "reason")
- position_size: float (0.0 - 1.0, representing percentage of available USDT to use)
- leverage: int (1-100)
- signal_strength: int
- risk_management: dict
- parameter_updates: dict (包含 "smc_atr_threshold": float, "mfh_learning_rate": float, "active_optimizer": str (GWO/WOAm/DE/COAm/BBO/TETA), "reason": str)
- strategy_rationale: str (中文)
"""
# 构建payload,遵循ValueCell的实现
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "你是一位专业的量化交易策略优化专家,擅长基于市场分析结果调整交易参数。请始终使用中文回复分析内容。"},
{"role": "user", "content": prompt}
],
"temperature": 0.3,
"max_tokens": 1500,
"stream": False
}
# 启用JSON模式,ValueCell推荐使用JSON模式处理结构化输出
if self.enable_json_mode:
payload["response_format"] = {"type": "json_object"}
response = self._call_api("chat/completions", payload)
if response and "choices" in response:
try:
message_content = response["choices"][0]["message"]["content"]
# Log full response for detailed analysis
logger.info(f"收到模型响应: {message_content}")
optimized_strategy = json.loads(message_content)
# optimized_strategy["position_size"] = 0.01
return optimized_strategy
except json.JSONDecodeError as e:
logger.error(f"解析Qwen响应失败: {e}")
logger.error(f"原始响应: {response}")
# 返回默认策略参数
return {
"action": "hold",
"entry_conditions": {"trigger_type": "market"},
"exit_conditions": {"sl_atr_multiplier": 1.5, "tp_atr_multiplier": 2.5},
"position_size": 0.01,
"signal_strength": 50,
"risk_management": {"max_risk": 0.02},
"strategy_rationale": "解析失败,使用默认参数"
}
return {
"action": "hold",
"entry_conditions": {"trigger_type": "market"},
"exit_conditions": {"sl_atr_multiplier": 1.5, "tp_atr_multiplier": 2.5},
"position_size": 0.01,
"signal_strength": 50,
"risk_management": {"max_risk": 0.02},
"strategy_rationale": "API调用失败,使用默认参数"
}
def generate_dynamic_stoploss_takeprofit(self, volatility: float, market_state: str, signal_strength: int) -> Dict[str, float]:
"""
根据市场波动率生成自适应止盈止损
Args:
volatility (float): 当前波动率(ATR百分比)
market_state (str): 市场状态(趋势/震荡/高波动)
signal_strength (int): 信号强度(0-100)
Returns:
Dict[str, float]: 动态止盈止损参数
"""
prompt = f"""
作为专业的风险管理专家请根据以下参数生成动态止盈止损
当前波动率(ATR百分比){volatility}
市场状态{market_state}
信号强度{signal_strength}
请基于以下原则生成参数
1. 趋势市场止盈较大止损较小
2. 震荡市场止盈较小止损较小
3. 高波动市场止盈较大止损较大
4. 信号强度高止盈较大止损相对较小
5. 波动率高止盈止损都较大
请以ATR倍数返回止盈止损参数格式为
{{"take_profit": X.XX, "stop_loss": X.XX}}
请只返回JSON格式的结果不要包含其他解释
"""
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "你是一位专业的风险管理专家,擅长根据市场条件生成动态止盈止损参数。"},
{"role": "user", "content": prompt}
],
"temperature": 0.2,
"max_tokens": 100
}
response = self._call_api("chat/completions", payload)
if response and "choices" in response:
try:
sl_tp = json.loads(response["choices"][0]["message"]["content"])
return sl_tp
except json.JSONDecodeError as e:
logger.error(f"解析止盈止损参数失败: {e}")
return {"take_profit": 1.5, "stop_loss": 1.0}
def judge_signal_strength(self, deepseek_signal: Dict[str, Any], technical_indicators: Dict[str, Any]) -> int:
"""
对DeepSeek生成的初步信号进行二次验证判断信号强度
Args:
deepseek_signal (Dict[str, Any]): DeepSeek生成的信号
technical_indicators (Dict[str, Any]): 技术指标数据
Returns:
int: 信号强度0-100越高表示信号越可靠
"""
prompt = f"""
作为专业的交易信号分析师请评估以下交易信号的强度
DeepSeek信号
{json.dumps(deepseek_signal, indent=2)}
技术指标
{json.dumps(technical_indicators, indent=2)}
请基于以下因素评估信号强度(0-100)
1. 多指标共振技术指标是否一致支持该信号
2. 市场结构当前市场状态是否有利于该信号
3. 成交量成交量是否支持价格走势
4. 波动率当前波动率是否适合该信号
5. 历史表现类似情况下信号的历史成功率
请只返回一个数字不要包含任何其他文字或解释
"""
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "你是一位专业的交易信号分析师,擅长评估交易信号的强度和可靠性。"},
{"role": "user", "content": prompt}
],
"temperature": 0.2,
"max_tokens": 10
}
response = self._call_api("chat/completions", payload)
if response and "choices" in response:
try:
strength = int(response["choices"][0]["message"]["content"].strip())
# 确保强度在0-100之间
return max(0, min(100, strength))
except ValueError:
logger.error("无法解析信号强度")
return 50
def calculate_kelly_criterion(self, win_rate: float, risk_reward_ratio: float) -> float:
"""
计算凯利准则用于确定最优仓位
Args:
win_rate (float): 胜率(0-1)
risk_reward_ratio (float): 风险回报比
Returns:
float: 最优仓位比例
"""
prompt = f"""
请根据以下参数计算凯利准则
胜率{win_rate}
风险回报比{risk_reward_ratio}
请只返回一个数字表示最优仓位比例(0-1之间)不要包含任何其他文字或解释
"""
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "你是一位专业的资金管理专家,擅长计算凯利准则。"},
{"role": "user", "content": prompt}
],
"temperature": 0.1,
"max_tokens": 10
}
response = self._call_api("chat/completions", payload)
if response and "choices" in response:
try:
kelly = float(response["choices"][0]["message"]["content"].strip())
# 确保凯利比例在0-1之间
return max(0.0, min(1.0, kelly))
except ValueError:
logger.error("无法解析凯利比例")
# 使用传统凯利公式计算默认值
default_kelly = win_rate - ((1 - win_rate) / risk_reward_ratio)
return max(0.0, min(1.0, default_kelly))
def main():
"""
主函数用于测试Qwen客户端
"""
# 示例使用,实际需要替换为有效的API密钥
api_key = "your_qwen_api_key"
client = QwenClient(api_key)
# 示例DeepSeek分析结果
deepseek_analysis = {
"market_state": "trend_up",
"support_levels": [1.0800, 1.0750],
"resistance_levels": [1.0900, 1.0950],
"structure_score": 85,
"short_term_prediction": "bullish",
"indicator_analysis": "EMA黄金交叉,RSI处于中性区域"
}
# 示例当前市场数据
current_market_data = {
"symbol": "EURUSD",
"timeframe": "H1",
"prices": {
"open": 1.0850,
"high": 1.0875,
"low": 1.0840,
"close": 1.0865,
"volume": 1234567
},
"indicators": {
"ema_fast": 1.0855,
"ema_slow": 1.0848,
"rsi": 65.2,
"atr": 0.0025
}
}
# 测试策略优化
optimized_strategy = client.optimize_strategy_logic(deepseek_analysis, current_market_data)
print("优化后的策略参数:")
print(json.dumps(optimized_strategy, indent=2, ensure_ascii=False))
# 测试动态止盈止损生成
sl_tp = client.generate_dynamic_stoploss_takeprofit(0.25, "trend_up", 85)
print(f"\n动态止盈止损: {sl_tp}")
# 测试信号强度判断
deepseek_signal = {"signal": "buy", "confidence": 0.8}
technical_indicators = {"ema_crossover": 1, "rsi": 65.2, "volume_increase": True}
signal_strength = client.judge_signal_strength(deepseek_signal, technical_indicators)
print(f"\n信号强度: {signal_strength}")
# 测试凯利准则计算
kelly = client.calculate_kelly_criterion(0.6, 1.5)
print(f"\n凯利准则: {kelly:.2f}")
if __name__ == "__main__":
main()