mql5/python/backtest_engine.py

391 lines
13 KiB
Python
Raw Permalink Normal View History

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from typing import Dict, Any, List
from datetime import datetime
class BacktestEngine:
"""
回测引擎用于策略历史数据回测和性能评估
"""
def __init__(self, initial_capital: float = 100000.0):
"""
初始化回测引擎
Args:
initial_capital (float): 初始资金默认为100,000.0
"""
self.initial_capital = initial_capital
self.data = None
self.strategy = None
self.results = None
self.trades = None
def load_data(self, data: pd.DataFrame):
"""
加载历史数据
Args:
data (pd.DataFrame): 包含OHLCV和特征的数据
"""
self.data = data.copy()
def set_strategy(self, strategy_func):
"""
设置回测策略
Args:
strategy_func: 策略函数接受数据和回测引擎实例返回信号
"""
self.strategy = strategy_func
def run_backtest(self, risk_per_trade: float = 0.01):
"""
执行回测
Args:
risk_per_trade (float): 每笔交易风险占比默认为1%
"""
if self.data is None:
raise ValueError("No data loaded. Please call load_data() first.")
if self.strategy is None:
raise ValueError("No strategy set. Please call set_strategy() first.")
# 初始化回测变量
capital = self.initial_capital
position = 0 # 持仓量,正数为多,负数为空
trades = []
equity_curve = [capital]
# 遍历数据,执行策略
for i in range(len(self.data)):
# 获取当前数据
current_data = self.data.iloc[i]
# 调用策略函数生成信号
signal = self.strategy(self.data.iloc[:i+1], self)
# 计算ATR用于止损
atr = current_data['atr']
# 计算每笔交易的风险金额
risk_amount = capital * risk_per_trade
# 计算仓位大小(假设1手=100000单位)
if atr > 0:
position_size = int(risk_amount / (atr * 100000))
else:
position_size = 1
# 执行交易
if signal == 1 and position <= 0: # 买入信号
# 平仓(如果有空头仓位)
if position < 0:
exit_price = current_data['close']
pnl = (exit_price - current_data['open']) * abs(position) * 100000
capital += pnl
trades.append({
'entry_time': entry_time,
'entry_price': entry_price,
'exit_time': current_data.name,
'exit_price': exit_price,
'signal': 'sell',
'pnl': pnl,
'capital': capital
})
# 开仓
position = position_size
entry_time = current_data.name
entry_price = current_data['open']
elif signal == -1 and position >= 0: # 卖出信号
# 平仓(如果有多头仓位)
if position > 0:
exit_price = current_data['close']
pnl = (exit_price - entry_price) * position * 100000
capital += pnl
trades.append({
'entry_time': entry_time,
'entry_price': entry_price,
'exit_time': current_data.name,
'exit_price': exit_price,
'signal': 'buy',
'pnl': pnl,
'capital': capital
})
# 开仓
position = -position_size
entry_time = current_data.name
entry_price = current_data['open']
# 更新权益曲线
if position != 0:
# 计算未实现盈亏
current_price = current_data['close']
unrealized_pnl = (current_price - entry_price) * position * 100000
current_capital = capital + unrealized_pnl
else:
current_capital = capital
equity_curve.append(current_capital)
# 保存结果
self.results = {
'equity_curve': equity_curve,
'final_capital': equity_curve[-1],
'initial_capital': self.initial_capital
}
self.trades = pd.DataFrame(trades)
if not self.trades.empty:
self.trades.set_index('entry_time', inplace=True)
# 计算性能指标
self.calculate_metrics()
def calculate_metrics(self):
"""
计算回测性能指标
"""
if self.results is None:
raise ValueError("No backtest results available. Please run_backtest() first.")
equity = pd.Series(self.results['equity_curve'][1:], index=self.data.index)
returns = equity.pct_change().dropna()
# 计算基本指标
total_return = (self.results['final_capital'] - self.results['initial_capital']) / self.results['initial_capital']
annual_return = (1 + total_return) ** (252 / len(returns)) - 1 # 假设252个交易日
# 计算风险指标
volatility = returns.std() * np.sqrt(252)
sharpe_ratio = annual_return / volatility if volatility > 0 else 0
# 计算最大回撤
drawdowns = []
peak = equity.iloc[0]
for value in equity:
if value > peak:
peak = value
drawdown = (peak - value) / peak
drawdowns.append(drawdown)
max_drawdown = max(drawdowns)
# 计算胜率
winning_trades = 0
total_trades = len(self.trades) if self.trades is not None and not self.trades.empty else 0
if total_trades > 0:
winning_trades = len(self.trades[self.trades['pnl'] > 0])
win_rate = winning_trades / total_trades if total_trades > 0 else 0
# 计算平均盈亏比
avg_win = self.trades[self.trades['pnl'] > 0]['pnl'].mean() if winning_trades > 0 else 0
avg_loss = abs(self.trades[self.trades['pnl'] < 0]['pnl'].mean()) if (total_trades - winning_trades) > 0 else 0
risk_reward_ratio = avg_win / avg_loss if avg_loss > 0 else 0
# 保存指标
self.results.update({
'total_return': total_return,
'annual_return': annual_return,
'volatility': volatility,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'risk_reward_ratio': risk_reward_ratio,
'total_trades': total_trades,
'winning_trades': winning_trades
})
def generate_report(self, report_path: str = None):
"""
生成回测报告
Args:
report_path (str): 报告保存路径默认为None不保存
Returns:
Dict[str, Any]: 回测报告
"""
if self.results is None:
raise ValueError("No backtest results available. Please run_backtest() first.")
report = {
'回测参数': {
'初始资金': self.initial_capital,
'回测周期': f"{self.data.index[0]}{self.data.index[-1]}",
'数据周期': str(self.data.index.freq),
'交易品种': 'GOLD'
},
'性能指标': {
'总收益率': f"{self.results['total_return']:.2%}",
'年化收益率': f"{self.results['annual_return']:.2%}",
'波动率': f"{self.results['volatility']:.2%}",
'夏普比率': f"{self.results['sharpe_ratio']:.2f}",
'最大回撤': f"{self.results['max_drawdown']:.2%}",
'胜率': f"{self.results['win_rate']:.2%}",
'风险回报比': f"{self.results['risk_reward_ratio']:.2f}",
'总交易次数': self.results['total_trades'],
'盈利交易次数': self.results['winning_trades']
}
}
# 如果有交易记录,添加交易详情
if self.trades is not None and not self.trades.empty:
report['交易记录'] = self.trades.to_dict('records')
# 保存报告
if report_path:
import json
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
return report
def plot_results(self, save_path: str = None):
"""
可视化回测结果
Args:
save_path (str): 图表保存路径默认为None不保存
"""
if self.results is None:
raise ValueError("No backtest results available. Please run_backtest() first.")
equity = pd.Series(self.results['equity_curve'][1:], index=self.data.index)
# 创建画布
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), sharex=True)
# 绘制权益曲线
ax1.plot(equity.index, equity, label='Equity Curve', color='blue')
ax1.set_title('Backtest Results - Equity Curve')
ax1.set_ylabel('Capital')
ax1.grid(True)
ax1.legend()
# 绘制最大回撤
drawdowns = []
peak = equity.iloc[0]
for value in equity:
if value > peak:
peak = value
drawdown = (peak - value) / peak
drawdowns.append(drawdown)
ax2.plot(equity.index, drawdowns, label='Drawdown', color='red')
ax2.set_title('Backtest Results - Drawdown')
ax2.set_xlabel('Date')
ax2.set_ylabel('Drawdown')
ax2.grid(True)
ax2.legend()
# 格式化日期
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
fig.autofmt_xdate()
# 保存图表
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.tight_layout()
plt.show()
def example_strategy(data, engine):
"""
示例策略EMA交叉策略
Args:
data (pd.DataFrame): 历史数据
engine (BacktestEngine): 回测引擎实例
Returns:
int: 信号1=买入-1=卖出0=持有
"""
if len(data) < 26: # 确保有足够的数据计算指标
return 0
# 简单的EMA交叉策略
if data['ema_fast'].iloc[-1] > data['ema_slow'].iloc[-1] and data['ema_fast'].iloc[-2] <= data['ema_slow'].iloc[-2]:
return 1 # 金叉,买入信号
elif data['ema_fast'].iloc[-1] < data['ema_slow'].iloc[-1] and data['ema_fast'].iloc[-2] >= data['ema_slow'].iloc[-2]:
return -1 # 死叉,卖出信号
else:
return 0 # 无信号
def main():
"""
主函数用于测试回测引擎
"""
from data_processor import MT5DataProcessor
# 初始化数据处理器
processor = MT5DataProcessor()
# 获取历史数据
from datetime import datetime, timedelta
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
# 注意:这里使用模拟数据,因为MT5可能没有连接
# 生成模拟数据
dates = pd.date_range(start=start_date, end=end_date, freq='H')
n = len(dates)
# 生成模拟价格数据
np.random.seed(42)
close = 1900 + np.cumsum(np.random.randn(n) * 2)
high = close + np.random.rand(n) * 3
low = close - np.random.rand(n) * 3
open_price = close.shift(1).fillna(1900)
volume = np.random.randint(100000, 1000000, n)
# 创建模拟DataFrame
df = pd.DataFrame({
'open': open_price,
'high': high,
'low': low,
'close': close,
'volume': volume
}, index=dates)
# 生成特征
df = processor.generate_features(df)
# 初始化回测引擎
engine = BacktestEngine(initial_capital=100000.0)
# 加载数据
engine.load_data(df)
# 设置策略
engine.set_strategy(example_strategy)
# 运行回测
engine.run_backtest(risk_per_trade=0.01)
# 生成报告
report = engine.generate_report()
print("回测报告:")
for section, content in report.items():
print(f"\n{section}:")
if isinstance(content, dict):
for key, value in content.items():
print(f" {key}: {value}")
else:
print(content)
# 绘制结果
engine.plot_results(save_path="/Users/lenovo/tmp/quant_trading_strategy/backtest_reports/example_backtest.png")
if __name__ == "__main__":
main()