# Copyright 2026, Niquel Mendoza | Leo.
# https://www.mql5.com/es/users/nique_372
# trainer_regression.py

#+------------------------------------------------------------------+
#| Imports                                                          |
#+------------------------------------------------------------------+
import matplotlib
|
|
matplotlib.use('Agg')
|
|
|
|
import os
|
|
from comunicator import CMqlComunication
|
|
from trainer import CModelTrainer
|
|
from regresion_trainer import CModelTrainerRegression
|
|
from trainer import SimpleLogger
|
|
from trainer import Funciones
|
|
import inspect
|
|
from copy import deepcopy
|
|
from pathlib import Path
|
|
from natsort import natsorted
|
|
import onnx
|
|
from onnx import helper
|
|
from out_fix import fix_onnx_output_shape

#+------------------------------------------------------------------+
#| Base class: trains every model inside a symbol folder            |
#+------------------------------------------------------------------+
class CPipelineTraining(SimpleLogger.CLoggerBase):
|
|
|
|
def __init__(self, general_config : dict, config_regresion : dict,
|
|
config_clasificacion : dict, initial_log_flags : int, comunicador : CMqlComunication):
|
|
super().__init__()
|
|
|
|
# Iniciamos banderas
|
|
self.AddLogFlags(initial_log_flags)
|
|
|
|
|
|
# Comunicador
|
|
self.m_comunicador : CMqlComunication = comunicador
|
|
self.m_comunicador.AddLogFlags(self.LogFlags())
|
|
|
|
# Features init
|
|
self.m_features_pred : list[str] = []
|
|
self.m_features_tp : list[str] = []
|
|
self.m_features_sl : list[str] = []
|
|
|
|
# Rutas relativas (constantes, nunca cambian)
|
|
self.m_features_pred_relativo : str = general_config.get('features_pred_file', '')
|
|
self.m_features_tp_relativo : str = general_config.get('features_tp_file', '')
|
|
self.m_features_sl_relativo : str = general_config.get('features_sl_file', '')
|
|
|
|
# Nombres de archvios
|
|
self.m_filename_idx : str = general_config.get("file_name_idx", "")
|
|
self.m_filename_features_ptr : str = general_config.get("file_name_features_ptr", "")
|
|
|
|
# filenames
|
|
self.m_path_features_pred : str = ""
|
|
self.m_path_features_tp : str = ""
|
|
self.m_path_features_sl : str = ""
|
|
|
|
# init
|
|
self.m_current_timeframe_folder_idx : int = -1
|
|
self.m_current_strategy_folder_idx : int = -1
|
|
|
|
# Seteamos config
|
|
self.m_config_regresion : dict = deepcopy(config_regresion)
|
|
self.m_config_clasificacion : dict = deepcopy(config_clasificacion)
|
|
|
|
# Ruta de la carpeta base para entrenar modelos (normalmente nombre de un simbolo)
|
|
self.m_ruta_entrenmiento : str = general_config.get('path')
|
|
self.m_simbolo = Path(self.m_ruta_entrenmiento).name
|
|
|
|
# file_data
|
|
self.m_archivo_desc : Path = Path(os.path.join(self.m_ruta_entrenmiento,"temp.txt"))
|
|
|
|
if self.m_archivo_desc.exists():
|
|
self.LogInfo("Archivo init existe temp.txt")
|
|
cadena : str = self.m_archivo_desc.read_text('utf-16-le').lstrip('\ufeff').strip()
|
|
cadenas : list[str] = cadena.split("_")
|
|
|
|
# solo si el tamaño es de 2 asingamos
|
|
if len(cadenas) == 2:
|
|
self.m_current_timeframe_folder_idx = int(cadenas[0])
|
|
self.m_current_strategy_folder_idx = int(cadenas[1])
|
|
|
|
self.LogInfo(f"Empezando desde timeframe folder index = {self.m_current_timeframe_folder_idx} y strategy index = {self.m_current_strategy_folder_idx}")
|
|
else:
|
|
self.LogInfo("Archvio temp.txt de init no existe empzando desde inicio")
|
|
|
|
# ahora listamos todos los folders
|
|
self.m_folders : list[str] = []
|
|
|
|
# obteemos todos lo foldes del folder simbolo
|
|
for f in os.listdir(self.m_ruta_entrenmiento):
|
|
full_path : str = os.path.join(self.m_ruta_entrenmiento, f)
|
|
if(os.path.isdir(full_path)):
|
|
self.m_folders.append(full_path)
|
|
|
|
# check
|
|
if(len(self.m_folders) < 1):
|
|
self.LogError("El folder no tiene elementos")
|
|
return
|
|
|
|
# ordenamos
|
|
self.m_folders = natsorted(self.m_folders)
|
|
|
|
# imprimimos los folders
|
|
if(self.IsInfoLogEnabled()):
|
|
self.FastLog(inspect.currentframe().f_code.co_name,SimpleLogger.CLoggerBase.INFO_TEXT, f"Folders encontrados en folder con simbolo {self.m_simbolo}: ")
|
|
print(self.m_folders)
|
|
|
|
#+--------------------------------------------------------------------------+
|
|
#| Leer archivo de features, simple read y split de comma |
|
|
#+--------------------------------------------------------------------------+
|
|
def _LeerFeaturesFile(self, path: str) -> list[str] | None:
|
|
try:
|
|
contenido = Path(path).read_text(encoding='utf-16-le').lstrip('\ufeff').strip()
|
|
features = [f.strip() for f in contenido.split(',')]
|
|
return features if features else None
|
|
except Exception as e:
|
|
self.LogError(f"Fallo leer features: {path} → {e}")
|
|
return None
|
|
|
|
#+--------------------------------------------------------------------------+
|
|
#| Intenta sobreescribir cada array si encuentra el archivo en este nivel |
|
|
#+--------------------------------------------------------------------------+
|
|
def _IntentarCargarFeatures(self, carpeta_nivel: str) -> None:
|
|
|
|
rutas : dict = {
|
|
'pred': os.path.join(carpeta_nivel, self.m_features_pred_relativo),
|
|
'tp': os.path.join(carpeta_nivel, self.m_features_tp_relativo),
|
|
'sl': os.path.join(carpeta_nivel, self.m_features_sl_relativo),
|
|
}
|
|
|
|
# Iteracion por todo el dict
|
|
# La idea es ir viendo si se existe el archivo relativo y si existe sobreecribir el valor
|
|
# de la varialbe miembro correspondiente
|
|
for tipo, path in rutas.items():
|
|
if os.path.exists(path):
|
|
features = self._LeerFeaturesFile(path)
|
|
if features is not None:
|
|
setattr(self, f"m_path_features_{tipo}", path)
|
|
setattr(self, f"m_features_{tipo}", features)
|
|
self.LogInfo(f"Override features [{tipo}] desde: {path}")
|
|
|
|
|
|
#+--------------------------------------------------------------------------+
|
|
#| Check antes del entremiento de cada modelo (por ahora solo features) |
|
|
#+--------------------------------------------------------------------------+
|
|
def _CheckAntesEntrenamiento(self) -> None:
|
|
if (len(self.m_features_sl) < 1 or
|
|
len(self.m_features_pred) < 1 or
|
|
len(self.m_features_tp) < 1):
|
|
self.LogFatalError("No se han logrado cargar las features para el entremiento")
|
|
Funciones.Remover(1) # Salimos fallo fatal
|
|
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| Funcion base para procesar un string array |
|
|
#| La idea es obtener los indices del modelo (feautres elegidas) |
|
|
#+------------------------------------------------------------------+
|
|
def _ProcesarStringArray(self, array_features : list[str], array_base_feautres : list[str]) -> list[int]:
|
|
idx_arr : list[int] = []
|
|
|
|
# Iteracion
|
|
for feature_str in array_features:
|
|
# declracion inciial
|
|
idx : int = -1
|
|
|
|
# tratamos de obtener su indice
|
|
try:
|
|
idx = array_base_feautres.index(feature_str)
|
|
except ValueError:
|
|
if(feature_str != " tipo de operacion"):
|
|
self.LogCriticalError(f"Error al obtener indice de features, features '{feature_str}' invalida")
|
|
self.LogInfo(f' Base features:\n{Funciones.array_to_string(array_base_feautres,"|","[","]")}')
|
|
self.LogInfo(f' Selected features:\n{Funciones.array_to_string(array_features,"|","[","]")}')
|
|
Funciones.Remover(1)
|
|
return None
|
|
|
|
if(idx == -1):
|
|
self.LogInfo(f"Omitiendo features {feature_str} dado que no se encontro en indice, puede ser normal si noe s tipo de operacion")
|
|
continue
|
|
else:
|
|
idx_arr.append(idx) # encontramos el indice de la feautres
|
|
|
|
|
|
return idx_arr # retonramos el array con los idncies encontrados
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| Funcion base para procesar un string array |
|
|
#| La idea es obtener los indices del modelo (feautres elegidas) |
|
|
#+------------------------------------------------------------------+
|
|
def _ProcesarTimeframeFolder(self, path_folder_timeframe: str) -> bool:
|
|
|
|
# Intentemoa cargar la confix de feautures de este nivel
|
|
self._IntentarCargarFeatures(path_folder_timeframe)
|
|
|
|
# Iteramos sobre todos los folsers [Estrategias] de este nivel
|
|
folders : list[str] = self._GetSortedSubFolders(path_folder_timeframe)
|
|
|
|
for index, folder_final in enumerate(folders):
|
|
if index <= self.m_current_strategy_folder_idx:
|
|
continue
|
|
|
|
if not self._ProcesarStrategyFolder(index, folder_final):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
#+----------------------------------------------------------------------------+
|
|
#| Funcion para procesar un folder de estratega (esta si tiene los modelos) |
|
|
#+----------------------------------------------------------------------------+
|
|
def _ProcesarStrategyFolder(self, index: int, folder: str) -> bool:
|
|
# Intentamos cargar las features que pueda exisitir en este nivel
|
|
self._IntentarCargarFeatures(folder)
|
|
|
|
# Check de features
|
|
self._CheckAntesEntrenamiento()
|
|
|
|
# Info
|
|
self.LogInfo(f"Procesando folder: {folder}")
|
|
|
|
contenido_idx : str = ""
|
|
|
|
# Clasificacion
|
|
idx = self._EntrenarClasificacion(folder)
|
|
if idx is None:
|
|
return False
|
|
contenido_idx += self._FormatIdx("modelo_pred_idx", idx)
|
|
|
|
# Regresion TP
|
|
idx = self._EntrenarRegresion(folder, "tp")
|
|
if idx is None:
|
|
return False
|
|
contenido_idx += self._FormatIdx("modelo_pred_tp_idx", idx)
|
|
|
|
# Regresión SL
|
|
idx = self._EntrenarRegresion(folder, "sl")
|
|
if idx is None:
|
|
return False
|
|
contenido_idx += self._FormatIdx("modelo_pred_sl_idx", idx)
|
|
|
|
# Escritura y fix
|
|
if not self._EscribirIdx(folder, contenido_idx):
|
|
return False
|
|
if not self._EscriirFileFeaturesPtr(folder):
|
|
return False
|
|
|
|
self._FixOnnxModelos(folder)
|
|
self._GuardarCheckpoint(index)
|
|
|
|
return True
|
|
|
|
#+----------------------------------------------------------------------------+
|
|
#| Funcion para entnrenar a los modelos de clasificacion |
|
|
#+----------------------------------------------------------------------------+
|
|
def _EntrenarClasificacion(self, folder: str) -> list[int] | None:
|
|
self.m_config_clasificacion['csv_file'] = os.path.join(folder, self.m_config_clasificacion["data_csv_file"])
|
|
self.m_config_clasificacion['output_folder'] = folder
|
|
|
|
# configracuinl del modelo y propagacion de flags
|
|
modelo = CModelTrainer(self.m_config_clasificacion)
|
|
modelo.AddLogFlags(self.LogFlags())
|
|
|
|
if not modelo.Execute():
|
|
self.LogCriticalError(f"Fallo clasificacion, data = : {self.m_config_clasificacion['csv_file']}")
|
|
return None
|
|
else:
|
|
self.m_comunicador.SendClasificacion(folder, modelo.GetMetrics())
|
|
|
|
return self._ProcesarStringArray(modelo.GetSelectedFeatures(), self.m_features_pred)
|
|
|
|
#+----------------------------------------------------------------------------+
|
|
#| Funcion para entnrenar a los modelos de regresion |
|
|
#+----------------------------------------------------------------------------+
|
|
def _EntrenarRegresion(self, folder: str, tipo: str) -> list[int] | None:
|
|
# tipo = "tp" o "sl"
|
|
# configuracion inicial del modelo de regresion
|
|
self.m_config_regresion['csv_file'] = os.path.join(folder, self.m_config_regresion[f"data_csv_file_{tipo}"])
|
|
self.m_config_regresion['output_folder'] = folder
|
|
self.m_config_regresion['model_name'] = self.m_config_regresion[f'model_name_{tipo}']
|
|
|
|
# Creamos el modelo y propagamos sus flags
|
|
modelo = CModelTrainerRegression(self.m_config_regresion)
|
|
modelo.AddLogFlags(self.LogFlags())
|
|
|
|
if not modelo.Execute():
|
|
self.LogCriticalError(f"Fallo regresion {tipo}, archivo = : {self.m_config_regresion['csv_file']}")
|
|
return None
|
|
else:
|
|
self.m_comunicador.SendRegresion(folder, modelo.GetMetrics())
|
|
|
|
return self._ProcesarStringArray(modelo.GetSelectedFeatures(), getattr(self, f"m_features_{tipo}"))
|
|
|
|
#+----------------------------------------------------------------------------+
|
|
#| Funcion para arreglar la salida ouptu de los modelos para hacerlo 1 |
|
|
#+----------------------------------------------------------------------------+
|
|
def _FixOnnxModelos(self, folder: str) -> None:
|
|
for tipo in ["tp", "sl"]:
|
|
nombre = self.m_config_regresion[f'model_name_{tipo}']
|
|
src = os.path.join(folder, f"{nombre}.onnx")
|
|
dest = os.path.join(folder, f"{nombre}_f.onnx")
|
|
fix_onnx_output_shape(src, dest)
|
|
|
|
#+----------------------------------------------------------------------------+
|
|
#| Escribir todos los indices de feaures en el archivo idx luego del training |
|
|
#+----------------------------------------------------------------------------+
|
|
def _EscribirIdx(self, folder: str, contenido: str) -> bool:
|
|
|
|
# Indices de modelos
|
|
path : Path = Path(os.path.join(folder, self.m_filename_idx))
|
|
try:
|
|
path.write_text(contenido, encoding='utf-16-le')
|
|
return True
|
|
except Exception as e:
|
|
self.LogError(f"Fallo escribir en {path.name}, err: {e}")
|
|
return False
|
|
|
|
#+----------------------------------------------------------------------------------------------+
|
|
#| Escribir el archivo pointer que da las rutas de donde se ubican los archivos de features |
|
|
#+----------------------------------------------------------------------------------------------+
|
|
def _EscriirFileFeaturesPtr(self, folder : str) -> bool:
|
|
# Path
|
|
path : Path = Path(os.path.join(folder, self.m_filename_features_ptr))
|
|
data : str = ""
|
|
|
|
# contenido
|
|
data += self.m_path_features_pred + "\n"
|
|
data += self.m_path_features_sl + "\n"
|
|
data += self.m_path_features_tp + "\n"
|
|
|
|
# intentamos la escritura
|
|
try:
|
|
path.write_text(data, encoding='utf-16-le')
|
|
return True
|
|
except Exception as e:
|
|
self.LogError(f"Fallo escribir en {path.name}, err: {e}")
|
|
return False
|
|
|
|
|
|
|
|
#+----------------------------------------------------------------------------+
|
|
#| Guardar el progreso |
|
|
#+----------------------------------------------------------------------------+
|
|
def _GuardarCheckpoint(self, index: int) -> None:
|
|
self.m_current_strategy_folder_idx = index
|
|
cadena = f"{self.m_current_timeframe_folder_idx}_{index}"
|
|
self.m_archivo_desc.write_text(cadena, encoding='utf-16-le')
|
|
|
|
#+----------------------------------------------------------------------------+
|
|
#| Format idx para obtener una cadena |
|
|
#+----------------------------------------------------------------------------+
|
|
def _FormatIdx(self, nombre: str, indices: list[int]) -> str:
|
|
return nombre + ": " + Funciones.array_to_string(indices, ",", "[", "]") + "\n"
|
|
|
|
|
|
# Sort de los folders
|
|
def _GetSortedSubFolders(self, path: str) -> list[str]:
|
|
folders = [
|
|
os.path.join(path, f)
|
|
for f in os.listdir(path)
|
|
if os.path.isdir(os.path.join(path, f))
|
|
]
|
|
return natsorted(folders)
|
|
|
|
#+---------------------------------------------------------------------+
|
|
#| Esta funcion es la principal aqui iteramos sobre todos los folders |
|
|
#| Timeframe y llamaamos a ProcesarTimeframeFolder |
|
|
#| Para que procese todos los modelos |
|
|
#+---------------------------------------------------------------------+
|
|
def Execute(self) -> bool:
|
|
# Check point inicial
|
|
self._GuardarCheckpoint(self.m_current_strategy_folder_idx)
|
|
|
|
# Intenteamos cargar feautres del root
|
|
self._IntentarCargarFeatures(self.m_ruta_entrenmiento)
|
|
|
|
# Iteracion principal
|
|
for index, folder in enumerate(self.m_folders):
|
|
|
|
# Si el indice es menor al inicio previsto omitmimos esta iteracion
|
|
if(index<=self.m_current_timeframe_folder_idx):
|
|
continue
|
|
|
|
# Procesamos archivo
|
|
if(not self._ProcesarTimeframeFolder(folder)):
|
|
self.LogError(f"Fallo al procesar el folder timeframe:\n{folder}")
|
|
Funciones.Remover(1)
|
|
return False
|
|
else: # Exito al procesar
|
|
self.m_current_strategy_folder_idx = -1 # indice de strategy
|
|
self.m_current_timeframe_folder_idx = index
|
|
self._GuardarCheckpoint(self.m_current_strategy_folder_idx)
|
|
|
|
|
|
# Fin del ckecpoint
|
|
self.m_archivo_desc.write_text("finalizado",encoding='utf-16-le')
|
|
|
|
# Retornamos exito
|
|
return True # Exito
def main() -> None:
|
|
config_clasificion : dict = {
|
|
'target_col': ' salida',
|
|
'model_name': 'ModelPred',
|
|
'num_features': 25,
|
|
'validation_split': 0.2,
|
|
'n_trials': 75,
|
|
'k_folds': 5,
|
|
'random_seed': 42,
|
|
'hilos' : 2,
|
|
'jobs_optuna' : 12,
|
|
'final_hilos' : 20,
|
|
'data_csv_file' : "pred.csv",
|
|
}
|
|
|
|
config_regresion : dict = {
|
|
'target_col': ' salida',
|
|
'model_name_tp': 'ModelTP',
|
|
'model_name_sl': 'ModelSL',
|
|
'num_features': 25,
|
|
'validation_split': 0.2,
|
|
'n_trials': 75,
|
|
'k_folds': 5,
|
|
'random_seed': 42,
|
|
'hilos' : 2,
|
|
'jobs_optuna' : 12,
|
|
'final_hilos' : 20,
|
|
'data_csv_file_tp' : "tp_data.csv",
|
|
'data_csv_file_sl' : "sl_data.csv",
|
|
}
|
|
|
|
|
|
# Configuracion general para las features
|
|
# los nombres de los archivos de features son relativos
|
|
# json path sera el archivo donde se pondran los resultados finales (protocolo de ocmunicacion)
|
|
|
|
general_config : dict = {
|
|
'path' : 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\ICTKiller\\NAS100',
|
|
'features_sl_file' : 'Features\\sl.csv',
|
|
'features_tp_file' : 'Features\\tp.csv',
|
|
'features_pred_file' : 'Features\\pred.csv'
|
|
}
|
|
|
|
ejecutor = CPipelineTraining(general_config,config_regresion,config_clasificion, SimpleLogger.CLoggerBase.LOG_ALL)
|
|
|
|
if(not ejecutor.Execute()):
|
|
print("Fallo al ejeuctar el pipineline")