Article-22714-Volatility-Mo.../Scripts/slsqp_article/mt5_volatility_processor.py
2026-06-03 20:14:05 +02:00

214 lines
No EOL
7.5 KiB
Python

# Copyright 2025, MetaQuotes Ltd.
# https://www.mql5.com
from datetime import datetime, timezone
import json
import sys
import os
import MetaTrader5 as mt5
import numpy as np
import pandas as pd
from arch import arch_model
from arch.univariate.base import ARCHModelResult
# Folder-name fragments used when building paths for the MetaTrader 5 terminal.
# NOTE(review): MQL5_FILES_FOLDER is not referenced in the visible part of this
# file — confirm whether it is still needed.
MQL5_FILES_FOLDER = "MQL5\\FILES"
# Joined onto the terminal's `commondata_path` in main() to locate the output file.
MQL5_COMMON_FOLDER = "FILES"
def arch_to_json(fitted_arch_model: ARCHModelResult, file_path: str):
    """
    Serialize a fitted ``arch`` model result to a compact JSON file.

    The file is intended to be read by the MQL5 arch library. It contains the
    estimated parameters, a description of the mean model, the volatility
    process and the distribution, and (when the result exposes an
    ``optimization_result``) the optimizer's ``fun``, ``jac`` and ``x``.

    Args:
        fitted_arch_model: The result object (e.g., from arch_model().fit()).
        file_path (str): The path and filename to save the JSON file to.

    Raises:
        TypeError: If ``fitted_arch_model`` is not an ``ARCHModelResult`` or
            ``file_path`` is not a non-empty string.
        SystemExit: With exit code 1 when collecting, encoding or writing the
            data fails (the error is printed first).
    """
    if not isinstance(fitted_arch_model, ARCHModelResult):
        raise TypeError('invalid type supplied, "fitted_arch_model" should be an instance of ARCHModelResult')
    # Check the type first so that len()/truthiness is only applied to a str.
    if not isinstance(file_path, str) or not file_path:
        raise TypeError('invalid filename supplied')

    print(f"-> Attempting to save model parameters to: {file_path}...")
    try:
        model = fitted_arch_model.model
        volatility = model.volatility
        distribution = model.distribution

        params_dict = fitted_arch_model.params.to_dict()

        # Flag-like attributes are written as their str() text, not as JSON
        # literals; the reader is expected to parse them from text.
        mean_model_spec = {
            'name': model.name,
            'num_params': model.num_params,
            'parameter_names': model.parameter_names(),
            'lags': np.asarray(model.lags).tolist(),
            'hold_back': model.hold_back,
            'rescale': str(model.rescale),
            'scale': model.scale,
            'constant': str(model.constant),
            'use_rotated': str(model.use_rotated),
        }
        # p/o/q/power do not exist on every volatility process, hence the
        # getattr() fallbacks to 0.
        volatility_model_spec = {
            'name': volatility.name,
            'num_params': volatility.num_params,
            'parameter_names': volatility.parameter_names(),
            'start': volatility.start,
            'stop': volatility.stop,
            'closed_form': str(volatility.closed_form),
            'updateable': str(volatility.updateable),
            'p': getattr(volatility, "p", 0),
            'o': getattr(volatility, "o", 0),
            'q': getattr(volatility, "q", 0),
            'power': getattr(volatility, "power", 0),
        }
        distribution_model_spec = {
            'name': distribution.name,
            'num_params': distribution.num_params,
            'parameter_names': distribution.parameter_names(),
        }
        model_spec = {
            'arch_model': mean_model_spec,
            'volatility_process': volatility_model_spec,
            'distribution': distribution_model_spec,
        }

        # The optimizer result is optional; missing pieces become None / [].
        opt = getattr(fitted_arch_model, "optimization_result", None)
        data_to_save = {
            'parameters': params_dict,
            'model_spec': model_spec,
            'fun': getattr(opt, "fun", None),
            'jac': list(getattr(opt, "jac", ())),
            'x': list(getattr(opt, "x", ())),
        }

        # Encode fully before touching the file so that a serialization error
        # cannot leave a truncated JSON file behind.
        payload = json.dumps(data_to_save, indent=None, separators=(',', ':'))
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(payload)
    except Exception as e:
        print(f"!!! Error saving model to JSON: {e}")
        sys.exit(1)
def parse_lags(lag_string):
    """Convert the textual lag argument into an int or a list of ints.

    Args:
        lag_string: Text such as ``"3"`` or ``"1,2,5"``. A falsy value
            (``""`` or ``None``) means "no lags".

    Returns:
        ``0`` for falsy input, an ``int`` when the text holds a single number,
        or a list of ``int`` when the text contains commas (empty items
        between commas are skipped).

    Raises:
        SystemExit: With exit code 1, after printing an explanation, when the
            text cannot be interpreted as integers.
    """
    if not lag_string:
        return 0
    try:
        if "," in lag_string:
            return [int(item.strip()) for item in lag_string.split(",") if item.strip()]
        return int(lag_string)
    except (TypeError, ValueError, AttributeError):
        # Report the reason before exiting; previously the process stopped
        # silently and the explanatory message was never reached.
        print("Lags must be an integer or a comma-separated list of integers.")
        sys.exit(1)
def _parse_flag(text):
    """Return True when a command-line flag is spelled as an affirmative value."""
    return str(text).strip().lower() in ("1", "true", "yes", "y", "on")


def main():
    """
    Fetch closing prices from MetaTrader 5, fit a volatility model, save it as JSON.

    Positional command-line arguments (all required):
        1  file_name         output file name, placed in the terminal's common
                             data folder under MQL5_COMMON_FOLDER
        2  dtime             reference bar time, format ``YYYY.MM.DD_HH:MM``
        3  symbol            symbol passed to ``mt5.copy_rates_from``
        4  timeframe         one of M1, M5, M15, M30, H1, H4, D1
        5  scale             multiplier applied to the log returns
        6  count             number of bars to request
        7  p, 8  o, 9  q     volatility model orders
        10 mean              mean model name, forwarded to ``arch_model``
        11 include_constant  boolean flag (parsed, currently not forwarded)
        12 vol               key of ``vol_mapping`` below
        13 dist              key of ``dist_mapping`` below
        14 lags              integer or comma-separated integers

    Every failure prints a message and exits with status 1.
    """
    expected_args = 14
    if len(sys.argv) < expected_args + 1:
        print(f"Error: expected {expected_args} arguments, received {len(sys.argv) - 1}.")
        sys.exit(1)

    try:
        file_name = str(sys.argv[1])
        # Tag the parsed time as UTC directly. Calling astimezone() on a naive
        # datetime would first interpret it as local time and shift it.
        dt = datetime.strptime(str(sys.argv[2]), "%Y.%m.%d_%H:%M").replace(tzinfo=timezone.utc)
        symbol = str(sys.argv[3])
        timeframe = str(sys.argv[4]).strip().upper()
        scale = float(sys.argv[5])
        count = int(sys.argv[6])
        _p = int(sys.argv[7])
        _o = int(sys.argv[8])
        _q = int(sys.argv[9])
    except ValueError as e:
        print(f"Error parsing model tuning parameters: {str(e)}")
        sys.exit(1)

    _mean = str(sys.argv[10])
    # bool("false") is True, so the flag text is interpreted explicitly.
    # NOTE: the value is parsed but not forwarded to arch_model in this script.
    _include_constant = _parse_flag(sys.argv[11])
    _vol = str(sys.argv[12]).strip().lower()
    _dist = str(sys.argv[13]).strip().lower()
    _lags = str(sys.argv[14])

    vol_mapping = {
        "constant": "constant",
        "arch": "arch",
        "garch": "garch",
        "gjr-garch": "garch",   # GJR-GARCH is a GARCH model with asymmetric lags (o > 0)
        "tarch": "garch",       # TARCH is GARCH with absolute values (power=1.0)
        "avarch": "garch",      # Absolute value ARCH
        "avgarch": "garch",     # Absolute value GARCH
    }
    dist_mapping = {
        "normal": "normal",
        "student": "studentst",  # was misspelled "studentsut", which arch rejects
        "skewed-student": "skewt",
        "gen-error": "ged",
    }
    timeframe_mapping = {
        "M1": mt5.TIMEFRAME_M1, "M5": mt5.TIMEFRAME_M5, "M15": mt5.TIMEFRAME_M15,
        "M30": mt5.TIMEFRAME_M30, "H1": mt5.TIMEFRAME_H1, "H4": mt5.TIMEFRAME_H4,
        "D1": mt5.TIMEFRAME_D1,
    }

    # Reject unknown names with a readable message instead of a KeyError traceback.
    for label, value, known in (
        ("volatility model", _vol, vol_mapping),
        ("distribution", _dist, dist_mapping),
        ("timeframe", timeframe, timeframe_mapping),
    ):
        if value not in known:
            print(f"Error: unsupported {label} '{value}'. Expected one of: {', '.join(known)}")
            sys.exit(1)

    chosen_vol = vol_mapping[_vol]
    chosen_dist = dist_mapping[_dist]
    # The absolute-value variants are fitted as GARCH with power 1.0.
    power_val = 1.0 if _vol in ("tarch", "avarch", "avgarch") else 2.0

    if not mt5.initialize():
        print("Error: MT5 initialization failed.")
        sys.exit(1)

    try:
        tinfo = mt5.terminal_info()
        if tinfo is None:
            print("Terminal info failure")
            sys.exit(1)  # the finally clause below still shuts the terminal link down
        file_name = os.path.join(tinfo.commondata_path, MQL5_COMMON_FOLDER, file_name)
        print(f"file_name {file_name}")

        rates = mt5.copy_rates_from(symbol, timeframe_mapping[timeframe], dt, count)
        # copy_rates_from returns None on failure; at least two bars are
        # needed to form a single return.
        if rates is None or len(rates) < 2:
            raise ValueError(f"no usable rates for {symbol} {timeframe}: {mt5.last_error()}")
        df = pd.DataFrame(rates)
        returns = np.log(df["close"] / df["close"].shift(1)).dropna() * scale
    except Exception as e:
        print(f"Error retrieving price data: {str(e)}")
        sys.exit(1)
    finally:
        mt5.shutdown()

    try:
        model = arch_model(
            returns,
            mean=_mean,
            lags=parse_lags(_lags),
            vol=chosen_vol,
            p=_p,
            o=_o,
            q=_q,
            power=power_val,
            dist=chosen_dist,
        )
        res = model.fit(disp="off")
        arch_to_json(res, file_name)
    except Exception as e:
        print(f"Model Configuration/Fitting Error: {str(e)}")
        sys.exit(1)


# NOTE: The text below is a bare string literal, not executed code. It holds a
# disabled diagnostic summary. It refers to `args.scale`, but no `args` object
# exists in this script, so it must be adapted before being re-enabled.
"""
out = f"Dependent Variable Summary (Scaled x{args.scale})\n"
out += f"Mean Model : {_mean.upper()} (Constant: {_include_constant}, Lags: {_lags})\n"
out += f"Volatility Model : {_vol.upper()} (p={_p}, o={_o}, q={_q})\n"
out += f"Distribution : {_dist.upper()}\n"
out += f"Minimum Negative Log-Likelihood: {-res.loglikelihood:.6f}\n"
out += f"Estimated Parameters:\n"
out += f"{res}"
try:
gradients = res.model.score(res.params)
for param_name, grad_val in zip(res.params.index, gradients):
print(f"{param_name:<12} : {grad_val:.10f}")
except Exception:
print("Gradient calculation unsupported or failed for this specification.")
print("==================================================")
"""
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()