214 lines
7.5 KiB
Python
214 lines
7.5 KiB
Python
|
|
# Copyright 2025, MetaQuotes Ltd.
|
||
|
|
# https://www.mql5.com
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
import json
|
||
|
|
import sys
|
||
|
|
import os
|
||
|
|
import MetaTrader5 as mt5
|
||
|
|
import numpy as np
|
||
|
|
import pandas as pd
|
||
|
|
from arch import arch_model
|
||
|
|
from arch.univariate.base import ARCHModelResult
|
||
|
|
|
||
|
|
MQL5_FILES_FOLDER = "MQL5\\FILES"
|
||
|
|
MQL5_COMMON_FOLDER = "FILES"
|
||
|
|
|
||
|
|
def arch_to_json(fitted_arch_model: ARCHModelResult, file_path: str):
|
||
|
|
"""
|
||
|
|
Serializes a fitted_arch_model ARMA-GARCH model to a JSON file compatible
|
||
|
|
with MQL5 arch library.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
fitted_arch_model: The arch_model result object (e.g., from arch_model().fit()).
|
||
|
|
file_path (str): The path and filename to save the JSON file to.
|
||
|
|
"""
|
||
|
|
|
||
|
|
if not isinstance(fitted_arch_model,ARCHModelResult):
|
||
|
|
raise TypeError(f'invalid type supplied, "fitted_arch_model" should be an instance of ARCHModelResult')
|
||
|
|
if len(file_path) < 1 or not isinstance(file_path,str):
|
||
|
|
raise TypeError(f'invalid filename supplied')
|
||
|
|
print(f"-> Attempting to save model parameters to: {file_path}...")
|
||
|
|
try:
|
||
|
|
params_dict = fitted_arch_model.params.to_dict()
|
||
|
|
mean_model_spec = {
|
||
|
|
'name':fitted_arch_model.model.name,
|
||
|
|
'num_params':fitted_arch_model.model.num_params,
|
||
|
|
'parameter_names':fitted_arch_model.model.parameter_names(),
|
||
|
|
'lags':np.asarray(fitted_arch_model.model.lags).tolist(),
|
||
|
|
'hold_back':fitted_arch_model.model.hold_back,
|
||
|
|
'rescale':str(fitted_arch_model.model.rescale),
|
||
|
|
'scale':fitted_arch_model.model.scale,
|
||
|
|
'constant':str(fitted_arch_model.model.constant),
|
||
|
|
'use_rotated':str(fitted_arch_model.model.use_rotated)
|
||
|
|
}
|
||
|
|
volatility_model_spec = {
|
||
|
|
'name': fitted_arch_model.model.volatility.name,
|
||
|
|
'num_params':fitted_arch_model.model.volatility.num_params,
|
||
|
|
'parameter_names':fitted_arch_model.model.volatility.parameter_names(),
|
||
|
|
'start':fitted_arch_model.model.volatility.start,
|
||
|
|
'stop':fitted_arch_model.model.volatility.stop,
|
||
|
|
'closed_form':str(fitted_arch_model.model.volatility.closed_form),
|
||
|
|
'updateable':str(fitted_arch_model.model.volatility.updateable),
|
||
|
|
'p':getattr(fitted_arch_model.model.volatility, "p", 0),
|
||
|
|
'o':getattr(fitted_arch_model.model.volatility, "o", 0),
|
||
|
|
'q':getattr(fitted_arch_model.model.volatility, "q", 0),
|
||
|
|
'power':getattr(fitted_arch_model.model.volatility,"power",0)
|
||
|
|
}
|
||
|
|
distribution_model_spec = {
|
||
|
|
'name':fitted_arch_model.model.distribution.name,
|
||
|
|
'num_params':fitted_arch_model.model.distribution.num_params,
|
||
|
|
'parameter_names':fitted_arch_model.model.distribution.parameter_names()
|
||
|
|
|
||
|
|
}
|
||
|
|
model_spec = {
|
||
|
|
'arch_model':mean_model_spec,
|
||
|
|
'volatility_process':volatility_model_spec,
|
||
|
|
'distribution':distribution_model_spec
|
||
|
|
}
|
||
|
|
|
||
|
|
# 3. Combine Data and Save to JSON
|
||
|
|
opt = getattr(fitted_arch_model,"optimization_result",None)
|
||
|
|
data_to_save = {
|
||
|
|
'parameters': params_dict,
|
||
|
|
'model_spec': model_spec,
|
||
|
|
'fun': getattr(opt,"fun",None),
|
||
|
|
'jac': list(getattr(opt,"jac","")),
|
||
|
|
'x' : list(getattr(opt,"x",""))
|
||
|
|
}
|
||
|
|
|
||
|
|
with open(file_path, 'w') as file:
|
||
|
|
json.dump(data_to_save, file, indent=None,separators=(',', ':'))
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"!!! Error saving model to JSON: {e}")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
|
||
|
|
def parse_lags(lag_string):
|
||
|
|
"""Helper to convert a comma-separated string of integers into a list."""
|
||
|
|
if not lag_string:
|
||
|
|
return 0
|
||
|
|
try:
|
||
|
|
if "," in lag_string:
|
||
|
|
return [int(x.strip()) for x in lag_string.split(",") if x.strip()]
|
||
|
|
return int(lag_string)
|
||
|
|
except Exception:
|
||
|
|
sys.exit(1)
|
||
|
|
raise ArgumentTypeError("Lags must be an integer or a comma-separated list of integers.")
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
|
||
|
|
file_name = str(sys.argv[1])
|
||
|
|
dtime = str(sys.argv[2])
|
||
|
|
symbol = str(sys.argv[3])
|
||
|
|
timeframe = str(sys.argv[4])
|
||
|
|
scale = float(sys.argv[5])
|
||
|
|
count = int(sys.argv[6])
|
||
|
|
_p = int(sys.argv[7])
|
||
|
|
_o = int(sys.argv[8])
|
||
|
|
_q = int(sys.argv[9])
|
||
|
|
|
||
|
|
_mean = str(sys.argv[10])
|
||
|
|
_include_constant = bool(sys.argv[11])
|
||
|
|
_vol = str(sys.argv[12])
|
||
|
|
_dist = str(sys.argv[13])
|
||
|
|
_lags = str(sys.argv[14])
|
||
|
|
|
||
|
|
|
||
|
|
vol_mapping = {
|
||
|
|
"constant": "constant",
|
||
|
|
"arch": "arch",
|
||
|
|
"garch": "garch",
|
||
|
|
"gjr-garch": "garch", # GJR-GARCH is a GARCH model with asymmetric lags (o > 0)
|
||
|
|
"tarch": "garch", # TARCH is GARCH with absolute values (power=1.0)
|
||
|
|
"avarch": "garch", # Absolute value ARCH
|
||
|
|
"avgarch": "garch", # Absolute value GARCH
|
||
|
|
}
|
||
|
|
|
||
|
|
dist_mapping = {
|
||
|
|
"normal": "normal",
|
||
|
|
"student": "studentsut",
|
||
|
|
"skewed-student": "skewt",
|
||
|
|
"gen-error": "ged"
|
||
|
|
}
|
||
|
|
|
||
|
|
chosen_vol = vol_mapping[_vol]
|
||
|
|
chosen_dist = dist_mapping[_dist]
|
||
|
|
|
||
|
|
power_val = 2.0
|
||
|
|
if _vol in ["tarch", "avarch", "avgarch"]:
|
||
|
|
power_val = 1.0
|
||
|
|
|
||
|
|
if not mt5.initialize():
|
||
|
|
print(f"Error: MT5 initialization failed.")
|
||
|
|
sys.exit(1)
|
||
|
|
return
|
||
|
|
|
||
|
|
tinfo = mt5.terminal_info()
|
||
|
|
if tinfo != None:
|
||
|
|
tdict = mt5.terminal_info()._asdict()
|
||
|
|
file_name = os.path.join(tdict['commondata_path'], MQL5_COMMON_FOLDER, file_name)
|
||
|
|
print(f"file_name {file_name}")
|
||
|
|
else:
|
||
|
|
print("Terminal info failure")
|
||
|
|
mt5.shutdown()
|
||
|
|
sys.exit(1)
|
||
|
|
return
|
||
|
|
|
||
|
|
timeframe_mapping = {
|
||
|
|
"M1": mt5.TIMEFRAME_M1, "M5": mt5.TIMEFRAME_M5, "M15": mt5.TIMEFRAME_M15,
|
||
|
|
"M30": mt5.TIMEFRAME_M30, "H1": mt5.TIMEFRAME_H1, "H4": mt5.TIMEFRAME_H4, "D1": mt5.TIMEFRAME_D1
|
||
|
|
}
|
||
|
|
|
||
|
|
try:
|
||
|
|
tz = timezone.utc
|
||
|
|
dt = datetime.strptime(dtime, "%Y.%m.%d_%H:%M")
|
||
|
|
rates = mt5.copy_rates_from(symbol, timeframe_mapping[timeframe], dt.astimezone(tz), count)
|
||
|
|
df = pd.DataFrame(rates)
|
||
|
|
returns = np.log(df["close"] / df["close"].shift(1)).dropna() * scale
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Error parsing model tuning parameters: {str(e)}")
|
||
|
|
sys.exit(1)
|
||
|
|
finally:
|
||
|
|
mt5.shutdown()
|
||
|
|
|
||
|
|
try:
|
||
|
|
model = arch_model(
|
||
|
|
returns,
|
||
|
|
mean=_mean,
|
||
|
|
lags=parse_lags(_lags),
|
||
|
|
vol=chosen_vol,
|
||
|
|
p=_p,
|
||
|
|
o=_o,
|
||
|
|
q=_q,
|
||
|
|
power=power_val,
|
||
|
|
dist=chosen_dist
|
||
|
|
)
|
||
|
|
|
||
|
|
res = model.fit(disp="off")
|
||
|
|
arch_to_json(res,file_name)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Model Configuration/Fitting Error: {str(e)}")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
"""
|
||
|
|
out = f"Dependent Variable Summary (Scaled x{args.scale})\n"
|
||
|
|
out += f"Mean Model : {_mean.upper()} (Constant: {_include_constant}, Lags: {_lags})\n"
|
||
|
|
out += f"Volatility Model : {_vol.upper()} (p={_p}, o={_o}, q={_q})\n"
|
||
|
|
out += f"Distribution : {_dist.upper()}\n"
|
||
|
|
out += f"Minimum Negative Log-Likelihood: {-res.loglikelihood:.6f}\n"
|
||
|
|
out += f"Estimated Parameters:\n"
|
||
|
|
out += f"{res}"
|
||
|
|
try:
|
||
|
|
gradients = res.model.score(res.params)
|
||
|
|
for param_name, grad_val in zip(res.params.index, gradients):
|
||
|
|
print(f"{param_name:<12} : {grad_val:.10f}")
|
||
|
|
except Exception:
|
||
|
|
print("Gradient calculation unsupported or failed for this specification.")
|
||
|
|
|
||
|
|
print("==================================================")
|
||
|
|
"""
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|