# Copyright 2026, Niquel Mendoza | Leo. # https://www.mql5.com/es/users/nique_372 # trainer_regression.py #+------------------------------------------------------------------+ #| Imports | #+------------------------------------------------------------------+ import matplotlib matplotlib.use('Agg') import os from comunicator import CMqlComunication from trainer import CModelTrainer from regresion_trainer import CModelTrainerRegression from trainer import SimpleLogger from trainer import Funciones import inspect from copy import deepcopy from pathlib import Path from natsort import natsorted import onnx from onnx import helper from out_fix import fix_onnx_output_shape #+------------------------------------------------------------------+ #| Clase base para entrenar todos los modelos de un folder simbolo | #+------------------------------------------------------------------+ class CPipelineTraining(SimpleLogger.CLoggerBase): def __init__(self, general_config : dict, config_regresion : dict, config_clasificacion : dict, initial_log_flags : int, comunicador : CMqlComunication): super().__init__() # Iniciamos banderas self.AddLogFlags(initial_log_flags) # Comunicador self.m_comunicador : CMqlComunication = comunicador self.m_comunicador.AddLogFlags(self.LogFlags()) # Features init self.m_features_pred : list[str] = [] self.m_features_tp : list[str] = [] self.m_features_sl : list[str] = [] # Rutas relativas (constantes, nunca cambian) self.m_features_pred_relativo : str = general_config.get('features_pred_file', '') self.m_features_tp_relativo : str = general_config.get('features_tp_file', '') self.m_features_sl_relativo : str = general_config.get('features_sl_file', '') # Nombres de archvios self.m_filename_idx : str = general_config.get("file_name_idx", "") self.m_filename_features_ptr : str = general_config.get("file_name_features_ptr", "") # filenames self.m_path_features_pred : str = "" self.m_path_features_tp : str = "" self.m_path_features_sl : str = "" # init self.m_current_timeframe_folder_idx : int = -1 self.m_current_strategy_folder_idx : int = -1 # Seteamos config self.m_config_regresion : dict = deepcopy(config_regresion) self.m_config_clasificacion : dict = deepcopy(config_clasificacion) # Ruta de la carpeta base para entrenar modelos (normalmente nombre de un simbolo) self.m_ruta_entrenmiento : str = general_config.get('path') self.m_simbolo = Path(self.m_ruta_entrenmiento).name # file_data self.m_archivo_desc : Path = Path(os.path.join(self.m_ruta_entrenmiento,"temp.txt")) if self.m_archivo_desc.exists(): self.LogInfo("Archivo init existe temp.txt") cadena : str = self.m_archivo_desc.read_text('utf-16-le').lstrip('\ufeff').strip() cadenas : list[str] = cadena.split("_") # solo si el tamaño es de 2 asingamos if len(cadenas) == 2: self.m_current_timeframe_folder_idx = int(cadenas[0]) self.m_current_strategy_folder_idx = int(cadenas[1]) self.LogInfo(f"Empezando desde timeframe folder index = {self.m_current_timeframe_folder_idx} y strategy index = {self.m_current_strategy_folder_idx}") else: self.LogInfo("Archvio temp.txt de init no existe empzando desde inicio") # ahora listamos todos los folders self.m_folders : list[str] = [] # obteemos todos lo foldes del folder simbolo for f in os.listdir(self.m_ruta_entrenmiento): full_path : str = os.path.join(self.m_ruta_entrenmiento, f) if(os.path.isdir(full_path)): self.m_folders.append(full_path) # check if(len(self.m_folders) < 1): self.LogError("El folder no tiene elementos") return # ordenamos self.m_folders = natsorted(self.m_folders) # imprimimos los folders if(self.IsInfoLogEnabled()): self.FastLog(inspect.currentframe().f_code.co_name,SimpleLogger.CLoggerBase.INFO_TEXT, f"Folders encontrados en folder con simbolo {self.m_simbolo}: ") print(self.m_folders) #+--------------------------------------------------------------------------+ #| Leer archivo de features, simple read y split de comma | #+--------------------------------------------------------------------------+ def _LeerFeaturesFile(self, path: str) -> list[str] | None: try: contenido = Path(path).read_text(encoding='utf-16-le').lstrip('\ufeff').strip() features = [f.strip() for f in contenido.split(',')] return features if features else None except Exception as e: self.LogError(f"Fallo leer features: {path} → {e}") return None #+--------------------------------------------------------------------------+ #| Intenta sobreescribir cada array si encuentra el archivo en este nivel | #+--------------------------------------------------------------------------+ def _IntentarCargarFeatures(self, carpeta_nivel: str) -> None: rutas : dict = { 'pred': os.path.join(carpeta_nivel, self.m_features_pred_relativo), 'tp': os.path.join(carpeta_nivel, self.m_features_tp_relativo), 'sl': os.path.join(carpeta_nivel, self.m_features_sl_relativo), } # Iteracion por todo el dict # La idea es ir viendo si se existe el archivo relativo y si existe sobreecribir el valor # de la varialbe miembro correspondiente for tipo, path in rutas.items(): if os.path.exists(path): features = self._LeerFeaturesFile(path) if features is not None: setattr(self, f"m_path_features_{tipo}", path) setattr(self, f"m_features_{tipo}", features) self.LogInfo(f"Override features [{tipo}] desde: {path}") #+--------------------------------------------------------------------------+ #| Check antes del entremiento de cada modelo (por ahora solo features) | #+--------------------------------------------------------------------------+ def _CheckAntesEntrenamiento(self) -> None: if (len(self.m_features_sl) < 1 or len(self.m_features_pred) < 1 or len(self.m_features_tp) < 1): self.LogFatalError("No se han logrado cargar las features para el entremiento") Funciones.Remover(1) # Salimos fallo fatal #+------------------------------------------------------------------+ #| Funcion base para procesar un string array | #| La idea es obtener los indices del modelo (feautres elegidas) | #+------------------------------------------------------------------+ def _ProcesarStringArray(self, array_features : list[str], array_base_feautres : list[str]) -> list[int]: idx_arr : list[int] = [] # Iteracion for feature_str in array_features: # declracion inciial idx : int = -1 # tratamos de obtener su indice try: idx = array_base_feautres.index(feature_str) except ValueError: if(feature_str != " tipo de operacion"): self.LogCriticalError(f"Error al obtener indice de features, features '{feature_str}' invalida") self.LogInfo(f' Base features:\n{Funciones.array_to_string(array_base_feautres,"|","[","]")}') self.LogInfo(f' Selected features:\n{Funciones.array_to_string(array_features,"|","[","]")}') Funciones.Remover(1) return None if(idx == -1): self.LogInfo(f"Omitiendo features {feature_str} dado que no se encontro en indice, puede ser normal si noe s tipo de operacion") continue else: idx_arr.append(idx) # encontramos el indice de la feautres return idx_arr # retonramos el array con los idncies encontrados #+------------------------------------------------------------------+ #| Funcion base para procesar un string array | #| La idea es obtener los indices del modelo (feautres elegidas) | #+------------------------------------------------------------------+ def _ProcesarTimeframeFolder(self, path_folder_timeframe: str) -> bool: # Intentemoa cargar la confix de feautures de este nivel self._IntentarCargarFeatures(path_folder_timeframe) # Iteramos sobre todos los folsers [Estrategias] de este nivel folders : list[str] = self._GetSortedSubFolders(path_folder_timeframe) for index, folder_final in enumerate(folders): if index <= self.m_current_strategy_folder_idx: continue if not self._ProcesarStrategyFolder(index, folder_final): return False return True #+----------------------------------------------------------------------------+ #| Funcion para procesar un folder de estratega (esta si tiene los modelos) | #+----------------------------------------------------------------------------+ def _ProcesarStrategyFolder(self, index: int, folder: str) -> bool: # Intentamos cargar las features que pueda exisitir en este nivel self._IntentarCargarFeatures(folder) # Check de features self._CheckAntesEntrenamiento() # Info self.LogInfo(f"Procesando folder: {folder}") contenido_idx : str = "" # Clasificacion idx = self._EntrenarClasificacion(folder) if idx is None: return False contenido_idx += self._FormatIdx("modelo_pred_idx", idx) # Regresion TP idx = self._EntrenarRegresion(folder, "tp") if idx is None: return False contenido_idx += self._FormatIdx("modelo_pred_tp_idx", idx) # Regresión SL idx = self._EntrenarRegresion(folder, "sl") if idx is None: return False contenido_idx += self._FormatIdx("modelo_pred_sl_idx", idx) # Escritura y fix if not self._EscribirIdx(folder, contenido_idx): return False if not self._EscriirFileFeaturesPtr(folder): return False self._FixOnnxModelos(folder) self._GuardarCheckpoint(index) return True #+----------------------------------------------------------------------------+ #| Funcion para entnrenar a los modelos de clasificacion | #+----------------------------------------------------------------------------+ def _EntrenarClasificacion(self, folder: str) -> list[int] | None: self.m_config_clasificacion['csv_file'] = os.path.join(folder, self.m_config_clasificacion["data_csv_file"]) self.m_config_clasificacion['output_folder'] = folder # configracuinl del modelo y propagacion de flags modelo = CModelTrainer(self.m_config_clasificacion) modelo.AddLogFlags(self.LogFlags()) if not modelo.Execute(): self.LogCriticalError(f"Fallo clasificacion, data = : {self.m_config_clasificacion['csv_file']}") return None else: self.m_comunicador.SendClasificacion(folder, modelo.GetMetrics()) return self._ProcesarStringArray(modelo.GetSelectedFeatures(), self.m_features_pred) #+----------------------------------------------------------------------------+ #| Funcion para entnrenar a los modelos de regresion | #+----------------------------------------------------------------------------+ def _EntrenarRegresion(self, folder: str, tipo: str) -> list[int] | None: # tipo = "tp" o "sl" # configuracion inicial del modelo de regresion self.m_config_regresion['csv_file'] = os.path.join(folder, self.m_config_regresion[f"data_csv_file_{tipo}"]) self.m_config_regresion['output_folder'] = folder self.m_config_regresion['model_name'] = self.m_config_regresion[f'model_name_{tipo}'] # Creamos el modelo y propagamos sus flags modelo = CModelTrainerRegression(self.m_config_regresion) modelo.AddLogFlags(self.LogFlags()) if not modelo.Execute(): self.LogCriticalError(f"Fallo regresion {tipo}, archivo = : {self.m_config_regresion['csv_file']}") return None else: self.m_comunicador.SendRegresion(folder, modelo.GetMetrics()) return self._ProcesarStringArray(modelo.GetSelectedFeatures(), getattr(self, f"m_features_{tipo}")) #+----------------------------------------------------------------------------+ #| Funcion para arreglar la salida ouptu de los modelos para hacerlo 1 | #+----------------------------------------------------------------------------+ def _FixOnnxModelos(self, folder: str) -> None: for tipo in ["tp", "sl"]: nombre = self.m_config_regresion[f'model_name_{tipo}'] src = os.path.join(folder, f"{nombre}.onnx") dest = os.path.join(folder, f"{nombre}_f.onnx") fix_onnx_output_shape(src, dest) #+----------------------------------------------------------------------------+ #| Escribir todos los indices de feaures en el archivo idx luego del training | #+----------------------------------------------------------------------------+ def _EscribirIdx(self, folder: str, contenido: str) -> bool: # Indices de modelos path : Path = Path(os.path.join(folder, self.m_filename_idx)) try: path.write_text(contenido, encoding='utf-16-le') return True except Exception as e: self.LogError(f"Fallo escribir en {path.name}, err: {e}") return False #+----------------------------------------------------------------------------------------------+ #| Escribir el archivo pointer que da las rutas de donde se ubican los archivos de features | #+----------------------------------------------------------------------------------------------+ def _EscriirFileFeaturesPtr(self, folder : str) -> bool: # Path path : Path = Path(os.path.join(folder, self.m_filename_features_ptr)) data : str = "" # contenido data += self.m_path_features_pred + "\n" data += self.m_path_features_sl + "\n" data += self.m_path_features_tp + "\n" # intentamos la escritura try: path.write_text(data, encoding='utf-16-le') return True except Exception as e: self.LogError(f"Fallo escribir en {path.name}, err: {e}") return False #+----------------------------------------------------------------------------+ #| Guardar el progreso | #+----------------------------------------------------------------------------+ def _GuardarCheckpoint(self, index: int) -> None: self.m_current_strategy_folder_idx = index cadena = f"{self.m_current_timeframe_folder_idx}_{index}" self.m_archivo_desc.write_text(cadena, encoding='utf-16-le') #+----------------------------------------------------------------------------+ #| Format idx para obtener una cadena | #+----------------------------------------------------------------------------+ def _FormatIdx(self, nombre: str, indices: list[int]) -> str: return nombre + ": " + Funciones.array_to_string(indices, ",", "[", "]") + "\n" # Sort de los folders def _GetSortedSubFolders(self, path: str) -> list[str]: folders = [ os.path.join(path, f) for f in os.listdir(path) if os.path.isdir(os.path.join(path, f)) ] return natsorted(folders) #+---------------------------------------------------------------------+ #| Esta funcion es la principal aqui iteramos sobre todos los folders | #| Timeframe y llamaamos a ProcesarTimeframeFolder | #| Para que procese todos los modelos | #+---------------------------------------------------------------------+ def Execute(self) -> bool: # Check point inicial self._GuardarCheckpoint(self.m_current_strategy_folder_idx) # Intenteamos cargar feautres del root self._IntentarCargarFeatures(self.m_ruta_entrenmiento) # Iteracion principal for index, folder in enumerate(self.m_folders): # Si el indice es menor al inicio previsto omitmimos esta iteracion if(index<=self.m_current_timeframe_folder_idx): continue # Procesamos archivo if(not self._ProcesarTimeframeFolder(folder)): self.LogError(f"Fallo al procesar el folder timeframe:\n{folder}") Funciones.Remover(1) return False else: # Exito al procesar self.m_current_strategy_folder_idx = -1 # indice de strategy self.m_current_timeframe_folder_idx = index self._GuardarCheckpoint(self.m_current_strategy_folder_idx) # Fin del ckecpoint self.m_archivo_desc.write_text("finalizado",encoding='utf-16-le') # Retornamos exito return True # Exito def main() -> None: config_clasificion : dict = { 'target_col': ' salida', 'model_name': 'ModelPred', 'num_features': 25, 'validation_split': 0.2, 'n_trials': 75, 'k_folds': 5, 'random_seed': 42, 'hilos' : 2, 'jobs_optuna' : 12, 'final_hilos' : 20, 'data_csv_file' : "pred.csv", } config_regresion : dict = { 'target_col': ' salida', 'model_name_tp': 'ModelTP', 'model_name_sl': 'ModelSL', 'num_features': 25, 'validation_split': 0.2, 'n_trials': 75, 'k_folds': 5, 'random_seed': 42, 'hilos' : 2, 'jobs_optuna' : 12, 'final_hilos' : 20, 'data_csv_file_tp' : "tp_data.csv", 'data_csv_file_sl' : "sl_data.csv", } # Configuracion general para las features # los nombres de los archivos de features son relativos # json path sera el archivo donde se pondran los resultados finales (protocolo de ocmunicacion) general_config : dict = { 'path' : 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\ICTKiller\\NAS100', 'features_sl_file' : 'Features\\sl.csv', 'features_tp_file' : 'Features\\tp.csv', 'features_pred_file' : 'Features\\pred.csv' } ejecutor = CPipelineTraining(general_config,config_regresion,config_clasificion, SimpleLogger.CLoggerBase.LOG_ALL) if(not ejecutor.Execute()): print("Fallo al ejeuctar el pipineline")