# Copyright 2025, Niquel Mendoza. # https://www.mql5.com/es/users/nique_372 # trainer_regression.py import sys import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import optuna from catboost import CatBoostRegressor from sklearn.model_selection import KFold, train_test_split from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_absolute_percentage_error from sklearn.feature_selection import SelectKBest, f_regression #+------------------------------------------------------------------+ #| Configurar path para importaciones | #+------------------------------------------------------------------+ from PyBase.Utils import SimpleLogger, Funciones #+------------------------------------------------------------------+ #| Clase Principal de Entrenamiento para Regresión | #+------------------------------------------------------------------+ class CModelTrainerRegression(SimpleLogger.CLoggerBase): def __init__(self, config): super().__init__() #--- Parámetros de configuración self.m_csv_file = config['csv_file'] self.m_target_col = config['target_col'] self.m_output_folder = config['output_folder'] self.m_model_name = config['model_name'] #--- Parámetros opcionales con defaults self.m_num_features = config.get('num_features', 10) self.m_validation_split = config.get('validation_split', 0.2) self.m_n_trials = config.get('n_trials', 50) self.m_k_folds = config.get('k_folds', 5) self.m_random_seed = config.get('random_seed', 42) #--- Variables de estado self.m_dataframe = None self.m_X = None self.m_Y = None self.m_X_train = None self.m_X_test = None self.m_y_train = None self.m_y_test = None self.m_selected_columns = None self.m_best_params = None self.m_model = None self.m_metrics = {} #--- Crear carpeta de salida si no existe os.makedirs(self.m_output_folder, exist_ok=True) self.LogInfo(f"Entrenador de Regresión inicializado: {self.m_model_name}") self.LogInfo(f"Carpeta de salida: {self.m_output_folder}") #+------------------------------------------------------------------+ #| Carga y Validación de Datos | #+------------------------------------------------------------------+ def LoadData(self): try: self.LogInfo(f"Cargando datos desde: {self.m_csv_file}") #--- Cargar CSV self.m_dataframe = pd.read_csv(self.m_csv_file, encoding="utf-16") self.LogInfo(f"Dataset cargado: {self.m_dataframe.shape[0]} filas, {self.m_dataframe.shape[1]} columnas") #--- Convertir a numérico for column in self.m_dataframe.columns: self.m_dataframe[column] = pd.to_numeric(self.m_dataframe[column], errors='coerce') #--- Verificar NaN e infinitos if self.m_dataframe.isnull().any().any(): self.LogWarning("Dataset contiene NaN, serán eliminados") self.m_dataframe = self.m_dataframe.dropna() if np.isinf(self.m_dataframe.select_dtypes(include=[np.number]).values).any(): self.LogWarning("Dataset contiene infinitos, serán eliminados") self.m_dataframe = self.m_dataframe.replace([np.inf, -np.inf], np.nan) self.m_dataframe = self.m_dataframe.dropna() if self.m_dataframe.empty: self.LogCriticalError("Dataset vacío después de limpieza") Funciones.Remover(1) if self.m_target_col not in self.m_dataframe.columns: self.LogCriticalError(f"Columna objetivo '{self.m_target_col}' no existe") Funciones.Remover(1) self.LogInfo(f"Datos validados correctamente: {self.m_dataframe.shape[0]} muestras") return True except Exception as e: self.LogCriticalError(f"Error al cargar datos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Separación de Features y Target | #+------------------------------------------------------------------+ def SeparateData(self): try: self.LogInfo("Separando features y target...") #--- Separar Y (target) - ahora es continuo self.m_Y = self.m_dataframe[self.m_target_col].to_numpy().astype(float) #--- Separar X (features) self.m_X = self.m_dataframe.drop(columns=[self.m_target_col]).to_numpy() #--- Estadísticas del target y_min = np.min(self.m_Y) y_max = np.max(self.m_Y) y_mean = np.mean(self.m_Y) y_std = np.std(self.m_Y) y_median = np.median(self.m_Y) self.LogInfo(f"X shape: {self.m_X.shape}") self.LogInfo(f"Y shape: {self.m_Y.shape}") self.LogInfo("\nEstadísticas del Target:") self.LogInfo(f" Mínimo: {y_min:.6f}") self.LogInfo(f" Máximo: {y_max:.6f}") self.LogInfo(f" Media: {y_mean:.6f}") self.LogInfo(f" Mediana: {y_median:.6f}") self.LogInfo(f" Desv. Estándar: {y_std:.6f}") self.LogInfo(f" Rango: {y_max - y_min:.6f}") return True except Exception as e: self.LogCriticalError(f"Error al separar datos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Feature Selection con SelectKBest | #+------------------------------------------------------------------+ def SelectBestFeatures(self): try: self.LogInfo("Aplicando Feature Selection...") original_columns = self.m_dataframe.drop(columns=[self.m_target_col]).columns n_features_original = len(original_columns) self.LogInfo(f"Features originales: {n_features_original}") #--- Verificar si existe TipoOp tipo_op_col = " tipo de operacion" tiene_tipo_op = tipo_op_col in original_columns if not tiene_tipo_op: self.LogWarning(f"Columna '{tipo_op_col}' no encontrada en el dataset") self.LogInfo("Procediendo sin forzar columna TipoOp") #--- Sin TipoOp: aplicar SelectKBest normal n_features = min(self.m_num_features, len(original_columns)) if n_features != self.m_num_features: self.LogWarning(f"Número de features ajustado de {self.m_num_features} a {n_features}") #--- Aplicar SelectKBest con f_regression selector = SelectKBest(score_func=f_regression, k=n_features) self.m_X = selector.fit_transform(self.m_X, self.m_Y) #--- Obtener nombres de columnas seleccionadas selected_indices = selector.get_support(indices=True) self.m_selected_columns = original_columns[selected_indices].tolist() feature_scores = selector.scores_[selected_indices] else: #--- Con TipoOp: extraer y agregar al final tipo_op_index = original_columns.get_loc(tipo_op_col) tipo_op_data = self.m_X[:, tipo_op_index].reshape(-1, 1) #--- Remover TipoOp temporalmente para SelectKBest X_sin_tipo = np.delete(self.m_X, tipo_op_index, axis=1) cols_sin_tipo = original_columns.drop(tipo_op_col) #--- Ajustar número de features (sin contar TipoOp) n_features = min(self.m_num_features, len(cols_sin_tipo)) if n_features != self.m_num_features: self.LogWarning(f"Número de features ajustado de {self.m_num_features} a {n_features}") #--- Aplicar SelectKBest sin TipoOp (usando f_regression) selector = SelectKBest(score_func=f_regression, k=n_features) X_selected = selector.fit_transform(X_sin_tipo, self.m_Y) #--- Agregar TipoOp al final self.m_X = np.column_stack([X_selected, tipo_op_data]) #--- Obtener nombres de columnas seleccionadas selected_indices = selector.get_support(indices=True) self.m_selected_columns = cols_sin_tipo[selected_indices].tolist() self.m_selected_columns.append(tipo_op_col) feature_scores = selector.scores_[selected_indices] self.LogInfo(f"Features seleccionadas: {len(self.m_selected_columns)}") for i, col in enumerate(self.m_selected_columns[:-1] if tiene_tipo_op else self.m_selected_columns, 1): score = feature_scores[i-1] if i-1 < len(feature_scores) else 0 self.LogInfo(f" {i:2d}. {col} (score: {score:.4f})") if tiene_tipo_op: self.LogInfo(f" {len(self.m_selected_columns):2d}. {tipo_op_col} (FORZADO)") return True except Exception as e: self.LogCriticalError(f"Error en Feature Selection: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Split Train/Test | #+------------------------------------------------------------------+ def SplitTrainTest(self): try: self.LogInfo(f"Dividiendo datos (validation_split={self.m_validation_split})...") self.m_X_train, self.m_X_test, self.m_y_train, self.m_y_test = train_test_split( self.m_X, self.m_Y, test_size=self.m_validation_split, random_state=self.m_random_seed, shuffle=True ) self.LogInfo(f"Train: {self.m_X_train.shape[0]} muestras") self.LogInfo(f"Test: {self.m_X_test.shape[0]} muestras") #--- Estadísticas de train y test self.LogInfo(f"\nTrain - Media: {np.mean(self.m_y_train):.6f}, Std: {np.std(self.m_y_train):.6f}") self.LogInfo(f"Test - Media: {np.mean(self.m_y_test):.6f}, Std: {np.std(self.m_y_test):.6f}") return True except Exception as e: self.LogCriticalError(f"Error al dividir datos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Optimización con Optuna para Regresión | #+------------------------------------------------------------------+ def ObjectiveOptuna(self, trial): params = { 'iterations': trial.suggest_int('iterations', 100, 1000, step=100), 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True), 'depth': trial.suggest_int('depth', 3, 10), 'l2_leaf_reg': trial.suggest_float('l2_leaf_reg', 1e-8, 10.0, log=True), 'border_count': trial.suggest_categorical('border_count', [32, 64, 128, 255]), 'bagging_temperature': trial.suggest_float('bagging_temperature', 0.0, 1.0), 'random_strength': trial.suggest_float('random_strength', 1e-9, 10.0, log=True), 'subsample': trial.suggest_float('subsample', 0.5, 1.0), 'loss_function': 'RMSE', # Función de pérdida para regresión 'random_seed': self.m_random_seed, 'verbose': False, 'allow_writing_files': False } kf = KFold(n_splits=self.m_k_folds, shuffle=True, random_state=self.m_random_seed) scores = [] for train_idx, val_idx in kf.split(self.m_X_train): X_fold_train, X_fold_val = self.m_X_train[train_idx], self.m_X_train[val_idx] y_fold_train, y_fold_val = self.m_y_train[train_idx], self.m_y_train[val_idx] try: model = CatBoostRegressor(**params) model.fit(X_fold_train, y_fold_train) y_pred = model.predict(X_fold_val) rmse = np.sqrt(mean_squared_error(y_fold_val, y_pred)) scores.append(rmse) except Exception: return float('inf') return np.mean(scores) def OptimizeHyperparameters(self): try: self.LogInfo(f"Optimizando hiperparámetros ({self.m_n_trials} trials, {self.m_k_folds}-fold CV)...") study = optuna.create_study( direction='minimize', # Minimizar RMSE sampler=optuna.samplers.TPESampler(seed=self.m_random_seed) ) study.optimize(self.ObjectiveOptuna, n_trials=self.m_n_trials, show_progress_bar=True) self.m_best_params = study.best_params self.LogInfo(f"Mejor RMSE: {study.best_value:.6f}") self.LogInfo("Mejores parámetros:") for key, value in self.m_best_params.items(): self.LogInfo(f" {key}: {value}") return True except Exception as e: self.LogCriticalError(f"Error en optimización: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Entrenamiento Final | #+------------------------------------------------------------------+ def TrainFinalModel(self): try: self.LogInfo("Entrenando modelo final...") final_params = self.m_best_params.copy() final_params.update({ 'random_seed': self.m_random_seed, 'verbose': False, 'allow_writing_files': False, 'loss_function': 'RMSE', 'eval_metric': 'RMSE' }) self.m_model = CatBoostRegressor(**final_params) self.m_model.fit( self.m_X_train, self.m_y_train, eval_set=(self.m_X_test, self.m_y_test), early_stopping_rounds=50, verbose=False, use_best_model=True ) self.LogInfo(f"Modelo entrenado. Mejor iteración: {self.m_model.best_iteration_}") return True except Exception as e: self.LogCriticalError(f"Error al entrenar modelo: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Evaluación y Métricas para Regresión | #+------------------------------------------------------------------+ def EvaluateModel(self): try: self.LogInfo("Evaluando modelo...") #--- Predicciones y_pred_train = self.m_model.predict(self.m_X_train) y_pred_test = self.m_model.predict(self.m_X_test) #--- Métricas de regresión # Train rmse_train = np.sqrt(mean_squared_error(self.m_y_train, y_pred_train)) mae_train = mean_absolute_error(self.m_y_train, y_pred_train) r2_train = r2_score(self.m_y_train, y_pred_train) # Test rmse_test = np.sqrt(mean_squared_error(self.m_y_test, y_pred_test)) mae_test = mean_absolute_error(self.m_y_test, y_pred_test) r2_test = r2_score(self.m_y_test, y_pred_test) # MAPE (Mean Absolute Percentage Error) - solo si no hay valores cero try: mape_test = mean_absolute_percentage_error(self.m_y_test, y_pred_test) * 100 except: mape_test = None #--- Guardar métricas self.m_metrics = { 'rmse_train': rmse_train, 'mae_train': mae_train, 'r2_train': r2_train, 'rmse_test': rmse_test, 'mae_test': mae_test, 'r2_test': r2_test, 'mape_test': mape_test, 'y_pred_train': y_pred_train, 'y_pred_test': y_pred_test } #--- Mostrar métricas self.LogInfo("\n" + "="*60) self.LogInfo("MÉTRICAS DE ENTRENAMIENTO:") self.LogInfo("="*60) self.LogInfo(f" RMSE Train: {rmse_train:.6f}") self.LogInfo(f" MAE Train: {mae_train:.6f}") self.LogInfo(f" R² Train: {r2_train:.6f}") self.LogInfo("\n" + "="*60) self.LogInfo("MÉTRICAS DE TEST:") self.LogInfo("="*60) self.LogInfo(f" RMSE Test: {rmse_test:.6f}") self.LogInfo(f" MAE Test: {mae_test:.6f}") self.LogInfo(f" R² Test: {r2_test:.6f}") if mape_test is not None: self.LogInfo(f" MAPE Test: {mape_test:.2f}%") #--- Análisis de errores errors_test = self.m_y_test - y_pred_test self.LogInfo("\nANÁLISIS DE ERRORES (Test):") self.LogInfo(f" Error medio: {np.mean(errors_test):.6f}") self.LogInfo(f" Error std: {np.std(errors_test):.6f}") self.LogInfo(f" Error mínimo: {np.min(errors_test):.6f}") self.LogInfo(f" Error máximo: {np.max(errors_test):.6f}") return True except Exception as e: self.LogCriticalError(f"Error al evaluar modelo: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Generación de Gráficos para Regresión | #+------------------------------------------------------------------+ def PlotResults(self): try: self.LogInfo("Generando gráficos...") y_pred_all = self.m_model.predict(self.m_X) y_pred_test = self.m_metrics['y_pred_test'] plt.figure(figsize=(16, 12)) #=== GRÁFICO 1: Real vs Predicho (Test) === plt.subplot(3, 3, 1) plt.scatter(self.m_y_test, y_pred_test, alpha=0.5, s=20) # Línea de identidad perfecta min_val = min(np.min(self.m_y_test), np.min(y_pred_test)) max_val = max(np.max(self.m_y_test), np.max(y_pred_test)) plt.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, label='Predicción Perfecta') plt.xlabel('Valor Real', fontsize=10) plt.ylabel('Valor Predicho', fontsize=10) plt.title('Real vs Predicho (Test)', fontsize=12, fontweight='bold') plt.legend(fontsize=9) plt.grid(True, alpha=0.3) #=== GRÁFICO 2: Distribución de Errores === plt.subplot(3, 3, 2) errors = self.m_y_test - y_pred_test plt.hist(errors, bins=50, edgecolor='black', alpha=0.7, color='skyblue') plt.axvline(x=0, color='red', linestyle='--', linewidth=2, label='Error = 0') plt.axvline(x=np.mean(errors), color='green', linestyle='--', linewidth=2, label=f'Media = {np.mean(errors):.4f}') plt.xlabel('Error (Real - Predicho)', fontsize=10) plt.ylabel('Frecuencia', fontsize=10) plt.title('Distribución de Errores', fontsize=12, fontweight='bold') plt.legend(fontsize=9) plt.grid(True, alpha=0.3) #=== GRÁFICO 3: Residuos vs Predicciones === plt.subplot(3, 3, 3) plt.scatter(y_pred_test, errors, alpha=0.5, s=20) plt.axhline(y=0, color='red', linestyle='--', linewidth=2) plt.xlabel('Valor Predicho', fontsize=10) plt.ylabel('Residuo', fontsize=10) plt.title('Residuos vs Predicciones', fontsize=12, fontweight='bold') plt.grid(True, alpha=0.3) #=== GRÁFICO 4: Series Temporales (si aplicable) === plt.subplot(3, 3, 4) indices = np.arange(len(self.m_Y)) plt.plot(indices, self.m_Y, label='Real', color='blue', alpha=0.7, linewidth=1.5) plt.plot(indices, y_pred_all, label='Predicho', color='red', alpha=0.7, linewidth=1.5) plt.xlabel('Índice de Muestra', fontsize=10) plt.ylabel('Valor', fontsize=10) plt.title('Comparación Real vs Predicho (Todo el Dataset)', fontsize=12, fontweight='bold') plt.legend(fontsize=9) plt.grid(True, alpha=0.3) #=== GRÁFICO 5: Feature Importance === plt.subplot(3, 3, 5) feature_importance = self.m_model.get_feature_importance() top_n = min(15, len(feature_importance)) top_indices = np.argsort(feature_importance)[-top_n:] top_features = [self.m_selected_columns[i] for i in top_indices] top_scores = feature_importance[top_indices] plt.barh(range(top_n), top_scores, color='skyblue') plt.yticks(range(top_n), top_features, fontsize=8) plt.xlabel('Importancia', fontsize=10) plt.title(f'Top {top_n} Features más Importantes', fontsize=12, fontweight='bold') plt.grid(True, alpha=0.3, axis='x') #=== GRÁFICO 6: Q-Q Plot (Normalidad de Residuos) === plt.subplot(3, 3, 6) from scipy import stats stats.probplot(errors, dist="norm", plot=plt) plt.title('Q-Q Plot (Normalidad de Residuos)', fontsize=12, fontweight='bold') plt.grid(True, alpha=0.3) #=== GRÁFICO 7: Métricas Resumen === plt.subplot(3, 3, 7) metrics_data = { 'RMSE\nTest': self.m_metrics['rmse_test'], 'MAE\nTest': self.m_metrics['mae_test'], 'R²\nTest': self.m_metrics['r2_test'] } colors = ['#d62728', '#ff7f0e', '#2ca02c'] bars = plt.bar(metrics_data.keys(), metrics_data.values(), color=colors, alpha=0.7) for bar, (key, value) in zip(bars, metrics_data.items()): height = bar.get_height() plt.text(bar.get_x() + bar.get_width()/2., height, f'{value:.4f}', ha='center', va='bottom', fontsize=10, fontweight='bold') plt.title('Resumen de Métricas', fontsize=12, fontweight='bold') plt.ylabel('Valor', fontsize=10) plt.grid(True, alpha=0.3, axis='y') #=== GRÁFICO 8: Errores Absolutos vs Índice === plt.subplot(3, 3, 8) abs_errors = np.abs(errors) plt.plot(abs_errors, color='red', alpha=0.7, linewidth=1) plt.axhline(y=np.mean(abs_errors), color='blue', linestyle='--', linewidth=2, label=f'MAE = {np.mean(abs_errors):.4f}') plt.xlabel('Índice de Test', fontsize=10) plt.ylabel('Error Absoluto', fontsize=10) plt.title('Errores Absolutos', fontsize=12, fontweight='bold') plt.legend(fontsize=9) plt.grid(True, alpha=0.3) #=== GRÁFICO 9: Distribución Real vs Predicho === plt.subplot(3, 3, 9) plt.hist(self.m_y_test, bins=30, alpha=0.6, label='Real', color='blue', edgecolor='black') plt.hist(y_pred_test, bins=30, alpha=0.6, label='Predicho', color='red', edgecolor='black') plt.xlabel('Valor', fontsize=10) plt.ylabel('Frecuencia', fontsize=10) plt.title('Distribución de Valores', fontsize=12, fontweight='bold') plt.legend(fontsize=9) plt.grid(True, alpha=0.3) plt.tight_layout() #--- Guardar gráfico plot_path = os.path.join(self.m_output_folder, f"{self.m_model_name}_metrics.png") plt.savefig(plot_path, dpi=120, bbox_inches='tight') plt.close() self.LogInfo(f"Gráfico guardado: {plot_path}") return True except Exception as e: self.LogCriticalError(f"Error al generar gráficos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Exportación a ONNX | #+------------------------------------------------------------------+ def ExportToONNX(self): try: self.LogInfo("Exportando modelo a ONNX...") onnx_path = os.path.join(self.m_output_folder, f"{self.m_model_name}.onnx") self.m_model.save_model(onnx_path, format="onnx") self.LogInfo(f"Modelo exportado: {onnx_path}") import onnx onnx_model = onnx.load(onnx_path) onnx.checker.check_model(onnx_model) self.LogInfo("Validación ONNX completada correctamente") return True except Exception as e: self.LogError(f"Error al exportar a ONNX: {str(e)}") return False #+------------------------------------------------------------------+ #| Ejecución Principal | #+------------------------------------------------------------------+ def Execute(self): try: self.LogInfo("="*60) self.LogInfo(f"INICIANDO ENTRENAMIENTO DE REGRESIÓN: {self.m_model_name}") self.LogInfo("="*60) if not self.LoadData(): return False if not self.SeparateData(): return False if not self.SelectBestFeatures(): return False if not self.SplitTrainTest(): return False if not self.OptimizeHyperparameters(): return False if not self.TrainFinalModel(): return False if not self.EvaluateModel(): return False if not self.PlotResults(): return False if not self.ExportToONNX(): self.LogWarning("Exportación a ONNX falló, pero el modelo fue entrenado") self.LogInfo("="*60) self.LogInfo("ENTRENAMIENTO COMPLETADO EXITOSAMENTE") self.LogInfo("="*60) self.LogInfo("\nResumen final:") self.LogInfo(f" Modelo: {self.m_model_name}") self.LogInfo(f" Features utilizadas: {len(self.m_selected_columns)}") self.LogInfo(f" RMSE Test: {self.m_metrics['rmse_test']:.6f}") self.LogInfo(f" MAE Test: {self.m_metrics['mae_test']:.6f}") self.LogInfo(f" R² Test: {self.m_metrics['r2_test']:.6f}") if self.m_metrics['mape_test'] is not None: self.LogInfo(f" MAPE Test: {self.m_metrics['mape_test']:.2f}%") return True except Exception as e: self.LogCriticalError(f"Error en Execute: {str(e)}") import traceback traceback.print_exc() return False #+------------------------------------------------------------------+ #| Getters | #+------------------------------------------------------------------+ def GetMetrics(self): return self.m_metrics def GetSelectedFeatures(self): return self.m_selected_columns def GetBestParams(self): return self.m_best_params def GetModel(self): return self.m_model #+------------------------------------------------------------------+ #| Función Main de Ejemplo | #+------------------------------------------------------------------+ def main(): config = { 'csv_file': 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\EasySbAi\\data_tp.csv', 'target_col': ' salida', 'output_folder': 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\EasySbAi', 'model_name': 'XAUUSD-M5-Model', 'num_features': 25, 'validation_split': 0.2, 'n_trials': 75, 'k_folds': 5, 'random_seed': 42 } trainer = CModelTrainerRegression(config) trainer.EnableAllLogs() success = trainer.Execute() if success: print("\n Entrenamiento completado con éxito") print(f" Métricas: {trainer.GetMetrics()}") print(f" Features: {trainer.GetSelectedFeatures()}") else: print("\n Entrenamiento falló") if __name__ == "__main__": main()