AiDataGenByLeo/Py/final_trainer.py
Nique_372 d3177ce0f5
2026-03-30 09:08:39 -05:00

458 lines
No EOL
20 KiB
Python

# Copyright 2026, Niquel Mendoza | Leo.
# https://www.mql5.com/es/users/nique_372
# trainer_regression.py
#+------------------------------------------------------------------+
#| Imports |
#+------------------------------------------------------------------+
import matplotlib
matplotlib.use('Agg')
import os
from comunicator import CMqlComunication
from trainer import CModelTrainer
from regresion_trainer import CModelTrainerRegression
from trainer import SimpleLogger
from trainer import Funciones
import inspect
from copy import deepcopy
from pathlib import Path
from natsort import natsorted
import onnx
from onnx import helper
from out_fix import fix_onnx_output_shape
#+------------------------------------------------------------------+
#| Clase base para entrenar todos los modelos de un folder simbolo |
#+------------------------------------------------------------------+
class CPipelineTraining(SimpleLogger.CLoggerBase):
def __init__(self, general_config : dict, config_regresion : dict,
config_clasificacion : dict, initial_log_flags : int, comunicador : CMqlComunication):
super().__init__()
# Iniciamos banderas
self.AddLogFlags(initial_log_flags)
# Comunicador
self.m_comunicador : CMqlComunication = comunicador
self.m_comunicador.AddLogFlags(self.LogFlags())
# Features init
self.m_features_pred : list[str] = []
self.m_features_tp : list[str] = []
self.m_features_sl : list[str] = []
# Rutas relativas (constantes, nunca cambian)
self.m_features_pred_relativo : str = general_config.get('features_pred_file', '')
self.m_features_tp_relativo : str = general_config.get('features_tp_file', '')
self.m_features_sl_relativo : str = general_config.get('features_sl_file', '')
# Nombres de archvios
self.m_filename_idx : str = general_config.get("file_name_idx", "")
self.m_filename_features_ptr : str = general_config.get("file_name_features_ptr", "")
# filenames
self.m_path_features_pred : str = ""
self.m_path_features_tp : str = ""
self.m_path_features_sl : str = ""
# init
self.m_current_timeframe_folder_idx : int = -1
self.m_current_strategy_folder_idx : int = -1
# Seteamos config
self.m_config_regresion : dict = deepcopy(config_regresion)
self.m_config_clasificacion : dict = deepcopy(config_clasificacion)
# Ruta de la carpeta base para entrenar modelos (normalmente nombre de un simbolo)
self.m_ruta_entrenmiento : str = general_config.get('path')
self.m_simbolo = Path(self.m_ruta_entrenmiento).name
# file_data
self.m_archivo_desc : Path = Path(os.path.join(self.m_ruta_entrenmiento,"temp.txt"))
if self.m_archivo_desc.exists():
self.LogInfo("Archivo init existe temp.txt")
cadena : str = self.m_archivo_desc.read_text('utf-16-le').lstrip('\ufeff').strip()
cadenas : list[str] = cadena.split("_")
# solo si el tamaño es de 2 asingamos
if len(cadenas) == 2:
self.m_current_timeframe_folder_idx = int(cadenas[0])
self.m_current_strategy_folder_idx = int(cadenas[1])
self.LogInfo(f"Empezando desde timeframe folder index = {self.m_current_timeframe_folder_idx} y strategy index = {self.m_current_strategy_folder_idx}")
else:
self.LogInfo("Archvio temp.txt de init no existe empzando desde inicio")
# ahora listamos todos los folders
self.m_folders : list[str] = []
# obteemos todos lo foldes del folder simbolo
for f in os.listdir(self.m_ruta_entrenmiento):
full_path : str = os.path.join(self.m_ruta_entrenmiento, f)
if(os.path.isdir(full_path)):
self.m_folders.append(full_path)
# check
if(len(self.m_folders) < 1):
self.LogError("El folder no tiene elementos")
return
# ordenamos
self.m_folders = natsorted(self.m_folders)
# imprimimos los folders
if(self.IsInfoLogEnabled()):
self.FastLog(inspect.currentframe().f_code.co_name,SimpleLogger.CLoggerBase.INFO_TEXT, f"Folders encontrados en folder con simbolo {self.m_simbolo}: ")
print(self.m_folders)
#+--------------------------------------------------------------------------+
#| Leer archivo de features, simple read y split de comma |
#+--------------------------------------------------------------------------+
def _LeerFeaturesFile(self, path: str) -> list[str] | None:
try:
contenido = Path(path).read_text(encoding='utf-16-le').lstrip('\ufeff').strip()
features = [f.strip() for f in contenido.split(',')]
return features if features else None
except Exception as e:
self.LogError(f"Fallo leer features: {path}{e}")
return None
#+--------------------------------------------------------------------------+
#| Intenta sobreescribir cada array si encuentra el archivo en este nivel |
#+--------------------------------------------------------------------------+
def _IntentarCargarFeatures(self, carpeta_nivel: str) -> None:
rutas : dict = {
'pred': os.path.join(carpeta_nivel, self.m_features_pred_relativo),
'tp': os.path.join(carpeta_nivel, self.m_features_tp_relativo),
'sl': os.path.join(carpeta_nivel, self.m_features_sl_relativo),
}
# Iteracion por todo el dict
# La idea es ir viendo si se existe el archivo relativo y si existe sobreecribir el valor
# de la varialbe miembro correspondiente
for tipo, path in rutas.items():
if os.path.exists(path):
features = self._LeerFeaturesFile(path)
if features is not None:
setattr(self, f"m_path_features_{tipo}", path)
setattr(self, f"m_features_{tipo}", features)
self.LogInfo(f"Override features [{tipo}] desde: {path}")
#+--------------------------------------------------------------------------+
#| Check antes del entremiento de cada modelo (por ahora solo features) |
#+--------------------------------------------------------------------------+
def _CheckAntesEntrenamiento(self) -> None:
if (len(self.m_features_sl) < 1 or
len(self.m_features_pred) < 1 or
len(self.m_features_tp) < 1):
self.LogFatalError("No se han logrado cargar las features para el entremiento")
Funciones.Remover(1) # Salimos fallo fatal
#+------------------------------------------------------------------+
#| Funcion base para procesar un string array |
#| La idea es obtener los indices del modelo (feautres elegidas) |
#+------------------------------------------------------------------+
def _ProcesarStringArray(self, array_features : list[str], array_base_feautres : list[str]) -> list[int]:
idx_arr : list[int] = []
# Iteracion
for feature_str in array_features:
# declracion inciial
idx : int = -1
# tratamos de obtener su indice
try:
idx = array_base_feautres.index(feature_str)
except ValueError:
if(feature_str != " tipo de operacion"):
self.LogCriticalError(f"Error al obtener indice de features, features '{feature_str}' invalida")
self.LogInfo(f' Base features:\n{Funciones.array_to_string(array_base_feautres,"|","[","]")}')
self.LogInfo(f' Selected features:\n{Funciones.array_to_string(array_features,"|","[","]")}')
Funciones.Remover(1)
return None
if(idx == -1):
self.LogInfo(f"Omitiendo features {feature_str} dado que no se encontro en indice, puede ser normal si noe s tipo de operacion")
continue
else:
idx_arr.append(idx) # encontramos el indice de la feautres
return idx_arr # retonramos el array con los idncies encontrados
#+------------------------------------------------------------------+
#| Funcion base para procesar un string array |
#| La idea es obtener los indices del modelo (feautres elegidas) |
#+------------------------------------------------------------------+
def _ProcesarTimeframeFolder(self, path_folder_timeframe: str) -> bool:
# Intentemoa cargar la confix de feautures de este nivel
self._IntentarCargarFeatures(path_folder_timeframe)
# Iteramos sobre todos los folsers [Estrategias] de este nivel
folders : list[str] = self._GetSortedSubFolders(path_folder_timeframe)
for index, folder_final in enumerate(folders):
if index <= self.m_current_strategy_folder_idx:
continue
if not self._ProcesarStrategyFolder(index, folder_final):
return False
return True
#+----------------------------------------------------------------------------+
#| Funcion para procesar un folder de estratega (esta si tiene los modelos) |
#+----------------------------------------------------------------------------+
def _ProcesarStrategyFolder(self, index: int, folder: str) -> bool:
# Intentamos cargar las features que pueda exisitir en este nivel
self._IntentarCargarFeatures(folder)
# Check de features
self._CheckAntesEntrenamiento()
# Info
self.LogInfo(f"Procesando folder: {folder}")
contenido_idx : str = ""
# Clasificacion
idx = self._EntrenarClasificacion(folder)
if idx is None:
return False
contenido_idx += self._FormatIdx("modelo_pred_idx", idx)
# Regresion TP
idx = self._EntrenarRegresion(folder, "tp")
if idx is None:
return False
contenido_idx += self._FormatIdx("modelo_pred_tp_idx", idx)
# Regresión SL
idx = self._EntrenarRegresion(folder, "sl")
if idx is None:
return False
contenido_idx += self._FormatIdx("modelo_pred_sl_idx", idx)
# Escritura y fix
if not self._EscribirIdx(folder, contenido_idx):
return False
if not self._EscriirFileFeaturesPtr(folder):
return False
self._FixOnnxModelos(folder)
self._GuardarCheckpoint(index)
return True
#+----------------------------------------------------------------------------+
#| Funcion para entnrenar a los modelos de clasificacion |
#+----------------------------------------------------------------------------+
def _EntrenarClasificacion(self, folder: str) -> list[int] | None:
self.m_config_clasificacion['csv_file'] = os.path.join(folder, self.m_config_clasificacion["data_csv_file"])
self.m_config_clasificacion['output_folder'] = folder
# configracuinl del modelo y propagacion de flags
modelo = CModelTrainer(self.m_config_clasificacion)
modelo.AddLogFlags(self.LogFlags())
if not modelo.Execute():
self.LogCriticalError(f"Fallo clasificacion, data = : {self.m_config_clasificacion['csv_file']}")
return None
else:
self.m_comunicador.SendClasificacion(folder, modelo.GetMetrics())
return self._ProcesarStringArray(modelo.GetSelectedFeatures(), self.m_features_pred)
#+----------------------------------------------------------------------------+
#| Funcion para entnrenar a los modelos de regresion |
#+----------------------------------------------------------------------------+
def _EntrenarRegresion(self, folder: str, tipo: str) -> list[int] | None:
# tipo = "tp" o "sl"
# configuracion inicial del modelo de regresion
self.m_config_regresion['csv_file'] = os.path.join(folder, self.m_config_regresion[f"data_csv_file_{tipo}"])
self.m_config_regresion['output_folder'] = folder
self.m_config_regresion['model_name'] = self.m_config_regresion[f'model_name_{tipo}']
# Creamos el modelo y propagamos sus flags
modelo = CModelTrainerRegression(self.m_config_regresion)
modelo.AddLogFlags(self.LogFlags())
if not modelo.Execute():
self.LogCriticalError(f"Fallo regresion {tipo}, archivo = : {self.m_config_regresion['csv_file']}")
return None
else:
self.m_comunicador.SendRegresion(folder, modelo.GetMetrics())
return self._ProcesarStringArray(modelo.GetSelectedFeatures(), getattr(self, f"m_features_{tipo}"))
#+----------------------------------------------------------------------------+
#| Funcion para arreglar la salida ouptu de los modelos para hacerlo 1 |
#+----------------------------------------------------------------------------+
def _FixOnnxModelos(self, folder: str) -> None:
for tipo in ["tp", "sl"]:
nombre = self.m_config_regresion[f'model_name_{tipo}']
src = os.path.join(folder, f"{nombre}.onnx")
dest = os.path.join(folder, f"{nombre}_f.onnx")
fix_onnx_output_shape(src, dest)
#+----------------------------------------------------------------------------+
#| Escribir todos los indices de feaures en el archivo idx luego del training |
#+----------------------------------------------------------------------------+
def _EscribirIdx(self, folder: str, contenido: str) -> bool:
# Indices de modelos
path : Path = Path(os.path.join(folder, self.m_filename_idx))
try:
path.write_text(contenido, encoding='utf-16-le')
return True
except Exception as e:
self.LogError(f"Fallo escribir en {path.name}, err: {e}")
return False
#+----------------------------------------------------------------------------------------------+
#| Escribir el archivo pointer que da las rutas de donde se ubican los archivos de features |
#+----------------------------------------------------------------------------------------------+
def _EscriirFileFeaturesPtr(self, folder : str) -> bool:
# Path
path : Path = Path(os.path.join(folder, self.m_filename_features_ptr))
data : str = ""
# contenido
data += self.m_path_features_pred + "\n"
data += self.m_path_features_sl + "\n"
data += self.m_path_features_tp + "\n"
# intentamos la escritura
try:
path.write_text(data, encoding='utf-16-le')
return True
except Exception as e:
self.LogError(f"Fallo escribir en {path.name}, err: {e}")
return False
#+----------------------------------------------------------------------------+
#| Guardar el progreso |
#+----------------------------------------------------------------------------+
def _GuardarCheckpoint(self, index: int) -> None:
self.m_current_strategy_folder_idx = index
cadena = f"{self.m_current_timeframe_folder_idx}_{index}"
self.m_archivo_desc.write_text(cadena, encoding='utf-16-le')
#+----------------------------------------------------------------------------+
#| Format idx para obtener una cadena |
#+----------------------------------------------------------------------------+
def _FormatIdx(self, nombre: str, indices: list[int]) -> str:
return nombre + ": " + Funciones.array_to_string(indices, ",", "[", "]") + "\n"
# Sort de los folders
def _GetSortedSubFolders(self, path: str) -> list[str]:
folders = [
os.path.join(path, f)
for f in os.listdir(path)
if os.path.isdir(os.path.join(path, f))
]
return natsorted(folders)
#+---------------------------------------------------------------------+
#| Esta funcion es la principal aqui iteramos sobre todos los folders |
#| Timeframe y llamaamos a ProcesarTimeframeFolder |
#| Para que procese todos los modelos |
#+---------------------------------------------------------------------+
def Execute(self) -> bool:
# Check point inicial
self._GuardarCheckpoint(self.m_current_strategy_folder_idx)
# Intenteamos cargar feautres del root
self._IntentarCargarFeatures(self.m_ruta_entrenmiento)
# Iteracion principal
for index, folder in enumerate(self.m_folders):
# Si el indice es menor al inicio previsto omitmimos esta iteracion
if(index<=self.m_current_timeframe_folder_idx):
continue
# Procesamos archivo
if(not self._ProcesarTimeframeFolder(folder)):
self.LogError(f"Fallo al procesar el folder timeframe:\n{folder}")
Funciones.Remover(1)
return False
else: # Exito al procesar
self.m_current_strategy_folder_idx = -1 # indice de strategy
self.m_current_timeframe_folder_idx = index
self._GuardarCheckpoint(self.m_current_strategy_folder_idx)
# Fin del ckecpoint
self.m_archivo_desc.write_text("finalizado",encoding='utf-16-le')
# Retornamos exito
return True # Exito
def main() -> None:
config_clasificion : dict = {
'target_col': ' salida',
'model_name': 'ModelPred',
'num_features': 25,
'validation_split': 0.2,
'n_trials': 75,
'k_folds': 5,
'random_seed': 42,
'hilos' : 2,
'jobs_optuna' : 12,
'final_hilos' : 20,
'data_csv_file' : "pred.csv",
}
config_regresion : dict = {
'target_col': ' salida',
'model_name_tp': 'ModelTP',
'model_name_sl': 'ModelSL',
'num_features': 25,
'validation_split': 0.2,
'n_trials': 75,
'k_folds': 5,
'random_seed': 42,
'hilos' : 2,
'jobs_optuna' : 12,
'final_hilos' : 20,
'data_csv_file_tp' : "tp_data.csv",
'data_csv_file_sl' : "sl_data.csv",
}
# Configuracion general para las features
# los nombres de los archivos de features son relativos
# json path sera el archivo donde se pondran los resultados finales (protocolo de ocmunicacion)
general_config : dict = {
'path' : 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\ICTKiller\\NAS100',
'features_sl_file' : 'Features\\sl.csv',
'features_tp_file' : 'Features\\tp.csv',
'features_pred_file' : 'Features\\pred.csv'
}
ejecutor = CPipelineTraining(general_config,config_regresion,config_clasificion, SimpleLogger.CLoggerBase.LOG_ALL)
if(not ejecutor.Execute()):
print("Fallo al ejeuctar el pipineline")