1. Fix limit order execution in crypto/trading_bot.py by adding auto-price fallback. 2. Update crypto/qwen_client.py prompt to mandate explicit limit prices and MFE/MAE based SL/TP. 3. Update crypto/deepseek_client.py prompt to include performance stats and position context.
422 Zeilen
19 KiB
Python
422 Zeilen
19 KiB
Python
import requests
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Dict, Any, Optional, List
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime, date
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class CustomJSONEncoder(json.JSONEncoder):
|
|
"""自定义JSON编码器,处理Timestamp等非序列化类型"""
|
|
def default(self, o):
|
|
if isinstance(o, (datetime, date, pd.Timestamp)):
|
|
return o.isoformat()
|
|
if isinstance(o, (pd.Series, pd.DataFrame)):
|
|
return o.to_dict()
|
|
if isinstance(o, (np.integer, int)):
|
|
return int(o)
|
|
if isinstance(o, (np.floating, float)):
|
|
return float(o)
|
|
if isinstance(o, np.ndarray):
|
|
return o.tolist()
|
|
return super().default(o)
|
|
|
|
class DeepSeekClient:
|
|
"""
|
|
DeepSeek API客户端,用于市场分析和情绪得分生成
|
|
使用硅基流动API服务,遵循ValueCell的API调用模式
|
|
"""
|
|
def __init__(self, api_key: str, base_url: str = "https://api.siliconflow.cn/v1", model: str = "deepseek-ai/DeepSeek-V3.1-Terminus"):
|
|
"""
|
|
初始化DeepSeek客户端
|
|
|
|
Args:
|
|
api_key (str): 硅基流动API密钥
|
|
base_url (str): API基础URL,默认为https://api.siliconflow.cn/v1
|
|
model (str): 使用的模型名称,默认为deepseek-ai/DeepSeek-V3.1-Terminus
|
|
"""
|
|
self.api_key = api_key
|
|
self.base_url = base_url
|
|
self.model = model
|
|
self.headers = {
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
# 启用JSON模式,遵循ValueCell的实现
|
|
self.enable_json_mode = True
|
|
|
|
def _call_api(self, endpoint: str, payload: Dict[str, Any], max_retries: int = 3) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
调用DeepSeek API,支持重试机制
|
|
基于ValueCell的API调用模式,增强了错误处理和日志记录
|
|
|
|
Args:
|
|
endpoint (str): API端点
|
|
payload (Dict[str, Any]): 请求负载
|
|
max_retries (int): 最大尝试次数,默认为3 (增强稳定性)
|
|
|
|
Returns:
|
|
Optional[Dict[str, Any]]: API响应,失败返回None
|
|
"""
|
|
url = f"{self.base_url}/{endpoint}"
|
|
|
|
for retry in range(max_retries):
|
|
response = None
|
|
try:
|
|
# 增加超时时间到120秒,提高在网络不稳定或模型响应慢情况下的成功率
|
|
# 显式禁用代理,防止因系统代理配置不当导致连接国内API失败
|
|
response = requests.post(
|
|
url,
|
|
headers=self.headers,
|
|
json=payload,
|
|
timeout=120,
|
|
proxies={"http": None, "https": None}
|
|
)
|
|
|
|
# 详细记录响应状态
|
|
logger.debug(f"API响应状态码: {response.status_code}, 模型: {self.model}, 重试: {retry+1}/{max_retries}")
|
|
|
|
# 处理不同状态码
|
|
if response.status_code == 401:
|
|
logger.error(f"API认证失败,状态码: {response.status_code},请检查API密钥是否正确")
|
|
return None
|
|
elif response.status_code == 403:
|
|
logger.error(f"API访问被拒绝,状态码: {response.status_code},请检查API密钥权限")
|
|
return None
|
|
elif response.status_code == 429:
|
|
logger.warning(f"API请求频率过高,状态码: {response.status_code},进入退避重试")
|
|
elif response.status_code >= 500:
|
|
logger.error(f"API服务器错误,状态码: {response.status_code}")
|
|
|
|
response.raise_for_status()
|
|
|
|
# 解析响应并添加调试信息
|
|
response_json = response.json()
|
|
logger.info(f"API调用成功,状态码: {response.status_code}, 模型: {self.model}")
|
|
return response_json
|
|
except requests.exceptions.ConnectionError as e:
|
|
logger.error(f"API连接失败 (重试 {retry+1}/{max_retries}): {e}")
|
|
logger.error(f"请求URL: {url}")
|
|
logger.error("请检查网络连接和API服务可用性")
|
|
except requests.exceptions.Timeout as e:
|
|
logger.error(f"API请求超时 (重试 {retry+1}/{max_retries}): {e}")
|
|
logger.error(f"请求URL: {url}")
|
|
logger.error("请检查网络连接和API服务响应时间")
|
|
except requests.exceptions.HTTPError as e:
|
|
logger.error(f"API HTTP错误 (重试 {retry+1}/{max_retries}): {e}")
|
|
logger.error(f"请求URL: {url}")
|
|
if response:
|
|
logger.error(f"响应内容: {response.text[:200]}...")
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"API请求异常 (重试 {retry+1}/{max_retries}): {e}")
|
|
logger.error(f"请求URL: {url}")
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"JSON解析失败: {e}")
|
|
if response:
|
|
logger.error(f"响应内容: {response.text}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"API调用意外错误: {e}")
|
|
logger.exception("完整错误堆栈:")
|
|
return None
|
|
|
|
if retry < max_retries - 1:
|
|
# 线性延迟重试,提高网络不稳定情况下的成功率
|
|
retry_delay = min(5 * (retry + 1), 30) # 每次增加5秒,最大30秒
|
|
logger.info(f"等待 {retry_delay} 秒后重试...")
|
|
time.sleep(retry_delay)
|
|
else:
|
|
logger.error(f"API调用失败,已达到最大重试次数 {max_retries}")
|
|
return None
|
|
|
|
def analyze_market_structure(self, market_data: Dict[str, Any], current_positions: Optional[List[Dict[str, Any]]] = None, extra_analysis: Optional[Dict[str, Any]] = None, performance_stats: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
|
|
"""
|
|
分析市场结构,识别趋势与震荡行情
|
|
基于ValueCell的实现,支持JSON模式输出
|
|
|
|
Args:
|
|
market_data (Dict[str, Any]): 市场数据,包含价格、成交量、指标等
|
|
current_positions (Optional[List[Dict[str, Any]]]): 当前实时持仓数据
|
|
extra_analysis (Optional[Dict[str, Any]]): 额外的技术分析数据(如CRT、价格方程等)
|
|
performance_stats (Optional[List[Dict[str, Any]]]): 历史交易绩效统计 (MFE/MAE)
|
|
|
|
Returns:
|
|
Dict[str, Any]: 市场结构分析结果
|
|
"""
|
|
extra_context = ""
|
|
if extra_analysis:
|
|
extra_context = f"\n额外技术分析参考:\n{json.dumps(extra_analysis, indent=2, cls=CustomJSONEncoder)}\n"
|
|
|
|
# 添加持仓信息到分析上下文中,使DeepSeek能感知当前风险敞口
|
|
pos_context = ""
|
|
if current_positions:
|
|
pos_context = f"\n当前实时持仓:\n{json.dumps(current_positions, indent=2, cls=CustomJSONEncoder)}\n"
|
|
elif market_data.get('current_positions'):
|
|
pos_context = f"\n当前实时持仓:\n{json.dumps(market_data.get('current_positions'), indent=2, cls=CustomJSONEncoder)}\n"
|
|
|
|
# 处理性能统计 (MFE/MAE)
|
|
perf_context = ""
|
|
if performance_stats:
|
|
# 简单统计
|
|
mfe_list = [t.get('mfe', 0) for t in performance_stats if t.get('mfe') is not None]
|
|
mae_list = [t.get('mae', 0) for t in performance_stats if t.get('mae') is not None]
|
|
avg_mfe = sum(mfe_list)/len(mfe_list) if mfe_list else 0
|
|
avg_mae = sum(mae_list)/len(mae_list) if mae_list else 0
|
|
|
|
perf_context = (
|
|
f"\n历史交易绩效参考 (MFE/MAE):\n"
|
|
f"- 平均最大有利幅度 (Avg MFE): {avg_mfe:.2f}%\n"
|
|
f"- 平均最大不利幅度 (Avg MAE): {avg_mae:.2f}%\n"
|
|
f"- 最近交易记录: {json.dumps(performance_stats[:5], indent=2, cls=CustomJSONEncoder)}\n"
|
|
)
|
|
|
|
opt_algo = "Auto-AO"
|
|
opt_info_str = ""
|
|
if extra_analysis and 'optimization_status' in extra_analysis:
|
|
status = extra_analysis['optimization_status']
|
|
opt_algo = status.get('active_optimizer', 'Auto-AO')
|
|
if 'optimizer_details' in status:
|
|
details = status['optimizer_details']
|
|
opt_info_str = f"""
|
|
[可用优化器池信息]
|
|
当前激活: {opt_algo} (得分: {details.get('last_optimization_score', 'N/A')})
|
|
可用算法: {', '.join(details.get('available_optimizers', []))}
|
|
算法说明:
|
|
{json.dumps(details.get('descriptions', {}), indent=2, ensure_ascii=False)}
|
|
"""
|
|
|
|
prompt = f"""
|
|
作为专业的量化交易分析师,你是混合交易系统的一部分。请分析以下市场数据,识别当前的市场结构。
|
|
|
|
你现在拥有来自多个高级算法策略的信号输入,请仔细综合这些信息,特别是SMC策略:
|
|
1. **SMC (Smart Money Concepts) - 核心策略**:
|
|
- 重点关注订单块(Order Blocks, OB)的位置。
|
|
- 识别流动性缺口(Fair Value Gaps, FVG)。
|
|
- 寻找流动性扫荡(Liquidity Sweeps)和市场结构破坏(Market Structure Break, MSB)。
|
|
- 关注溢价区(Premium)和折价区(Discount)的平衡。
|
|
2. IFVG (Inverse FVG): 关注反向价值缺口作为支撑/阻力互换。
|
|
3. CRT (Candle Range Theory): 关注K线区间操纵行为。
|
|
4. RVGI+CCI: 复合动量策略,确认趋势强度。
|
|
5. MFH (Multiple Forecast Horizons): 多周期预测。
|
|
6. MTF (Multi-Timeframe): 多时间周期一致性,大周期制约小周期。
|
|
8. **基于 MFE/MAE 的结构质量评估**: 结合历史绩效 (Avg MFE/MAE),评估当前市场结构是否有利于产生高盈亏比的交易。
|
|
{opt_info_str}
|
|
|
|
{pos_context}
|
|
{perf_context}
|
|
{extra_context}
|
|
市场数据:
|
|
{json.dumps(market_data, indent=2, cls=CustomJSONEncoder)}
|
|
|
|
请提供以下分析结果:
|
|
1. 市场状态:趋势(上升/下降)、震荡、高波动
|
|
2. 主要支撑位和阻力位 (基于SMC的OB/FVG)。**请参考 Avg MAE,确保识别的关键位在合理的风控范围内。**
|
|
3. 市场结构评分(0-100):高分表示趋势明确,低分表示震荡
|
|
4. 短期预测(1-3天):请提供完整、详细的预测逻辑和目标位描述
|
|
5. 关键指标解读
|
|
6. **综合交易建议 (Preliminary Trade Idea)**: 基于你的结构分析和所有输入的高级算法信号,给出一个初步的交易建议 (Buy/Sell/Wait)。**如果建议挂单(Limit),请基于 SMC 结构给出精确的建议价格。**
|
|
7. **策略一致性评估**: 评估各个高级算法信号之间的一致性。
|
|
|
|
请以JSON格式返回结果,包含以下字段:
|
|
- market_state: str
|
|
- support_levels: list[float]
|
|
- resistance_levels: list[float]
|
|
- structure_score: int
|
|
- short_term_prediction: str
|
|
- indicator_analysis: str
|
|
- preliminary_signal: str ("buy", "sell", "neutral")
|
|
- signal_confidence: int (0-100)
|
|
- consistency_analysis: str
|
|
"""
|
|
|
|
# 构建payload,遵循ValueCell的实现
|
|
payload = {
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": "你是一位专业的量化交易分析师,擅长市场结构分析和趋势识别。"},
|
|
{"role": "user", "content": prompt}
|
|
],
|
|
"temperature": 0.3,
|
|
"max_tokens": 1500,
|
|
"stream": False
|
|
}
|
|
|
|
# 启用JSON模式,ValueCell推荐使用JSON模式处理结构化输出
|
|
if self.enable_json_mode:
|
|
payload["response_format"] = {"type": "json_object"}
|
|
|
|
response = self._call_api("chat/completions", payload)
|
|
if response and "choices" in response:
|
|
try:
|
|
message_content = response["choices"][0]["message"]["content"]
|
|
# Log full response to avoid truncation
|
|
logger.info(f"收到模型响应: {message_content}")
|
|
|
|
analysis_result = json.loads(message_content)
|
|
return analysis_result
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"解析DeepSeek响应失败: {e}")
|
|
logger.error(f"原始响应: {response}")
|
|
# 如果解析失败,返回默认值
|
|
return {
|
|
"market_state": "neutral",
|
|
"support_levels": [],
|
|
"resistance_levels": [],
|
|
"structure_score": 50,
|
|
"short_term_prediction": "neutral",
|
|
"indicator_analysis": "无法解析市场数据"
|
|
}
|
|
return {
|
|
"market_state": "neutral",
|
|
"support_levels": [],
|
|
"resistance_levels": [],
|
|
"structure_score": 50,
|
|
"short_term_prediction": "neutral",
|
|
"indicator_analysis": "API调用失败"
|
|
}
|
|
|
|
def generate_sentiment_score(self, historical_data: Dict[str, Any]) -> float:
|
|
"""
|
|
生成市场情绪得分
|
|
|
|
Args:
|
|
historical_data (Dict[str, Any]): 历史市场数据
|
|
|
|
Returns:
|
|
float: 情绪得分,范围-1到1,-1表示极度看空,1表示极度看多
|
|
"""
|
|
prompt = f"""
|
|
作为专业的市场情绪分析师,请根据以下历史数据生成市场情绪得分:
|
|
|
|
{json.dumps(historical_data, indent=2)}
|
|
|
|
请基于价格走势、成交量变化和指标信号,生成一个情绪得分。
|
|
得分范围为-1到1,其中:
|
|
- -1:极度看空
|
|
- 0:中性
|
|
- 1:极度看多
|
|
|
|
请只返回一个数字,不要包含任何其他文字或解释。
|
|
"""
|
|
|
|
payload = {
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": "你是一位专业的市场情绪分析师,擅长基于历史数据生成准确的情绪得分。"},
|
|
{"role": "user", "content": prompt}
|
|
],
|
|
"temperature": 0.2,
|
|
"max_tokens": 10
|
|
}
|
|
|
|
response = self._call_api("chat/completions", payload)
|
|
if response and "choices" in response:
|
|
try:
|
|
sentiment_score = float(response["choices"][0]["message"]["content"].strip())
|
|
# 确保得分在-1到1之间
|
|
return max(-1.0, min(1.0, sentiment_score))
|
|
except ValueError:
|
|
logger.error("无法解析情绪得分")
|
|
return 0.0
|
|
|
|
def process_data_for_qwen(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
处理数据,为Qwen3提供结构化输入
|
|
|
|
Args:
|
|
raw_data (Dict[str, Any]): 原始市场数据
|
|
|
|
Returns:
|
|
Dict[str, Any]: 结构化的模型输入数据
|
|
"""
|
|
prompt = f"""
|
|
作为专业的数据分析师,请将以下原始市场数据处理为结构化格式,用于量化交易策略生成:
|
|
|
|
{json.dumps(raw_data, indent=2)}
|
|
|
|
请提取关键特征,计算必要的统计指标,并组织成适合输入到交易策略模型的数据格式。
|
|
|
|
处理要求:
|
|
1. 提取关键价格数据:最高价、最低价、收盘价、成交量
|
|
2. 计算技术指标:EMA交叉、ATR波动率、RSI超买超卖
|
|
3. 识别价格模式:支撑位、阻力位、趋势线
|
|
4. 计算风险指标:波动率、最大回撤
|
|
5. 生成特征向量:用于机器学习模型的输入
|
|
|
|
请以JSON格式返回处理后的数据,确保结构清晰,便于后续模型处理。
|
|
"""
|
|
|
|
payload = {
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": "你是一位专业的数据分析师,擅长处理金融市场数据,为量化交易策略生成结构化输入。"},
|
|
{"role": "user", "content": prompt}
|
|
],
|
|
"temperature": 0.3,
|
|
"max_tokens": 1500
|
|
}
|
|
|
|
response = self._call_api("chat/completions", payload)
|
|
if response and "choices" in response:
|
|
try:
|
|
processed_data = json.loads(response["choices"][0]["message"]["content"])
|
|
return processed_data
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"解析处理后的数据失败: {e}")
|
|
return raw_data
|
|
return raw_data
|
|
|
|
|
|
def main():
|
|
"""
|
|
主函数用于测试DeepSeek客户端
|
|
"""
|
|
# 示例使用,实际需要替换为有效的API密钥
|
|
api_key = "your_deepseek_api_key"
|
|
client = DeepSeekClient(api_key)
|
|
|
|
# 示例市场数据
|
|
market_data = {
|
|
"symbol": "EURUSD",
|
|
"timeframe": "H1",
|
|
"prices": {
|
|
"open": 1.0850,
|
|
"high": 1.0875,
|
|
"low": 1.0840,
|
|
"close": 1.0865,
|
|
"volume": 1234567
|
|
},
|
|
"indicators": {
|
|
"ema_fast": 1.0855,
|
|
"ema_slow": 1.0848,
|
|
"rsi": 65.2,
|
|
"atr": 0.0025
|
|
},
|
|
"recent_candles": [
|
|
{"time": "2024-01-01 10:00", "open": 1.0840, "high": 1.0860, "low": 1.0835, "close": 1.0855},
|
|
{"time": "2024-01-01 09:00", "open": 1.0830, "high": 1.0850, "low": 1.0825, "close": 1.0840},
|
|
{"time": "2024-01-01 08:00", "open": 1.0820, "high": 1.0840, "low": 1.0815, "close": 1.0830}
|
|
]
|
|
}
|
|
|
|
# 测试市场结构分析
|
|
structure_analysis = client.analyze_market_structure(market_data)
|
|
print("市场结构分析:")
|
|
print(json.dumps(structure_analysis, indent=2, ensure_ascii=False))
|
|
|
|
# 测试情绪得分生成
|
|
sentiment_score = client.generate_sentiment_score(market_data)
|
|
print(f"\n市场情绪得分: {sentiment_score}")
|
|
|
|
# 测试数据处理
|
|
processed_data = client.process_data_for_qwen(market_data)
|
|
print("\n处理后的数据:")
|
|
print(json.dumps(processed_data, indent=2, ensure_ascii=False))
|
|
|
|
if __name__ == "__main__":
|
|
main()
|