# Copyright 2025, Niquel Mendoza. # https://www.mql5.com/es/users/nique_372 # trainer.py import sys import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import optuna from catboost import CatBoostClassifier from sklearn.model_selection import KFold, train_test_split from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report from sklearn.feature_selection import SelectKBest, f_classif #+------------------------------------------------------------------+ #| Configurar path para importaciones | #+------------------------------------------------------------------+ root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.insert(0, root) from PyBase.Utils import SimpleLogger, Funciones #+------------------------------------------------------------------+ #| Clase Principal de Entrenamiento | #+------------------------------------------------------------------+ class CModelTrainer(SimpleLogger.CLoggerBase): def __init__(self, config): super().__init__() #--- Parámetros de configuración self.m_csv_file = config['csv_file'] self.m_target_col = config['target_col'] self.m_output_folder = config['output_folder'] self.m_model_name = config['model_name'] #--- Parámetros opcionales con defaults self.m_num_features = config.get('num_features', 10) self.m_validation_split = config.get('validation_split', 0.2) self.m_n_trials = config.get('n_trials', 50) self.m_k_folds = config.get('k_folds', 5) self.m_random_seed = config.get('random_seed', 42) #--- Variables de estado self.m_dataframe = None self.m_X = None self.m_Y = None self.m_X_train = None self.m_X_test = None self.m_y_train = None self.m_y_test = None self.m_selected_columns = None self.m_best_params = None self.m_model = None self.m_metrics = {} #--- Crear carpeta de salida si no existe os.makedirs(self.m_output_folder, exist_ok=True) self.LogInfo(f"Entrenador inicializado: {self.m_model_name}") self.LogInfo(f"Carpeta de salida: {self.m_output_folder}") #+------------------------------------------------------------------+ #| Carga y Validación de Datos | #+------------------------------------------------------------------+ def LoadData(self): try: self.LogInfo(f"Cargando datos desde: {self.m_csv_file}") #--- Cargar CSV self.m_dataframe = pd.read_csv(self.m_csv_file, encoding="utf-16") self.LogInfo(f"Dataset cargado: {self.m_dataframe.shape[0]} filas, {self.m_dataframe.shape[1]} columnas") #--- Convertir a numérico for column in self.m_dataframe.columns: self.m_dataframe[column] = pd.to_numeric(self.m_dataframe[column], errors='coerce') #--- Verificar NaN e infinitos if self.m_dataframe.isnull().any().any(): self.LogWarning("Dataset contiene NaN, serán eliminados") self.m_dataframe = self.m_dataframe.dropna() if np.isinf(self.m_dataframe.select_dtypes(include=[np.number]).values).any(): self.LogWarning("Dataset contiene infinitos, serán eliminados") self.m_dataframe = self.m_dataframe.replace([np.inf, -np.inf], np.nan) self.m_dataframe = self.m_dataframe.dropna() if self.m_dataframe.empty: self.LogCriticalError("Dataset vacío después de limpieza") Funciones.Remover(1) if self.m_target_col not in self.m_dataframe.columns: self.LogCriticalError(f"Columna objetivo '{self.m_target_col}' no existe") Funciones.Remover(1) self.LogInfo(f"Datos validados correctamente: {self.m_dataframe.shape[0]} muestras") return True except Exception as e: self.LogCriticalError(f"Error al cargar datos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Separación de Features y Target | #+------------------------------------------------------------------+ def SeparateData(self): try: self.LogInfo("Separando features y target...") #--- Separar Y (target) self.m_Y = self.m_dataframe[self.m_target_col].to_numpy().astype(int) #--- Separar X (features) self.m_X = self.m_dataframe.drop(columns=[self.m_target_col]).to_numpy() #--- Verificar que solo haya clases 0 y 1 unique_classes = np.unique(self.m_Y) if not np.array_equal(unique_classes, np.array([0, 1])): self.LogWarning(f"Clases detectadas: {unique_classes}. Se esperaba [0, 1]") #--- Verificar distribución de clases unique, counts = np.unique(self.m_Y, return_counts=True) class_distribution = dict(zip(unique, counts)) self.LogInfo(f"X shape: {self.m_X.shape}") self.LogInfo(f"Y shape: {self.m_Y.shape}") self.LogInfo(f"Distribución de clases: {class_distribution}") total_samples = len(self.m_Y) for cls, count in class_distribution.items(): percentage = (count / total_samples) * 100 class_name = "OPERAR" if cls == 1 else "NO OPERAR" self.LogInfo(f" Clase {cls} ({class_name}): {count} ({percentage:.2f}%)") return True except Exception as e: self.LogCriticalError(f"Error al separar datos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Feature Selection con SelectKBest | #+------------------------------------------------------------------+ def SelectBestFeatures(self): try: self.LogInfo("Aplicando Feature Selection...") original_columns = self.m_dataframe.drop(columns=[self.m_target_col]).columns n_features_original = len(original_columns) self.LogInfo(f"Features originales: {n_features_original}") #--- Verificar si existe TipoOp tipo_op_col = " tipo de operacion" tiene_tipo_op = tipo_op_col in original_columns if not tiene_tipo_op: self.LogCriticalError(f"Columna '{tipo_op_col}' no encontrada en el dataset") print(original_columns) Funciones.Remover(1) #--- Extraer TipoOp antes de SelectKBest tipo_op_index = original_columns.get_loc(tipo_op_col) tipo_op_data = self.m_X[:, tipo_op_index].reshape(-1, 1) #--- Remover TipoOp temporalmente para SelectKBest X_sin_tipo = np.delete(self.m_X, tipo_op_index, axis=1) cols_sin_tipo = original_columns.drop(tipo_op_col) #--- Ajustar número de features (sin contar TipoOp) n_features = min(self.m_num_features, len(cols_sin_tipo)) if n_features != self.m_num_features: self.LogWarning(f"Número de features ajustado de {self.m_num_features} a {n_features}") #--- Aplicar SelectKBest sin TipoOp selector = SelectKBest(score_func=f_classif, k=n_features) X_selected = selector.fit_transform(X_sin_tipo, self.m_Y) #--- Agregar TipoOp al final self.m_X = np.column_stack([X_selected, tipo_op_data]) #--- Obtener nombres de columnas seleccionadas selected_indices = selector.get_support(indices=True) self.m_selected_columns = cols_sin_tipo[selected_indices].tolist() self.m_selected_columns.append(tipo_op_col) feature_scores = selector.scores_[selected_indices] self.LogInfo(f"Features seleccionadas: {len(self.m_selected_columns)}") for i, (col, score) in enumerate(zip(self.m_selected_columns[:-1], feature_scores), 1): self.LogInfo(f" {i:2d}. {col} (score: {score:.4f})") self.LogInfo(f" {len(self.m_selected_columns):2d}. {tipo_op_col} (FORZADO)") return True except Exception as e: self.LogCriticalError(f"Error en Feature Selection: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Split Train/Test | #+------------------------------------------------------------------+ def SplitTrainTest(self): try: self.LogInfo(f"Dividiendo datos (validation_split={self.m_validation_split})...") self.m_X_train, self.m_X_test, self.m_y_train, self.m_y_test = train_test_split( self.m_X, self.m_Y, test_size=self.m_validation_split, random_state=self.m_random_seed, stratify=self.m_Y ) self.LogInfo(f"Train: {self.m_X_train.shape[0]} muestras") self.LogInfo(f"Test: {self.m_X_test.shape[0]} muestras") #--- Verificar distribución en train y test train_unique, train_counts = np.unique(self.m_y_train, return_counts=True) test_unique, test_counts = np.unique(self.m_y_test, return_counts=True) train_dist = dict(zip(train_unique, train_counts)) test_dist = dict(zip(test_unique, test_counts)) self.LogInfo(f"Train - {train_dist}") self.LogInfo(f"Test - {test_dist}") return True except Exception as e: self.LogCriticalError(f"Error al dividir datos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Optimización con Optuna | #+------------------------------------------------------------------+ def ObjectiveOptuna(self, trial): params = { 'iterations': trial.suggest_int('iterations', 100, 1000, step=100), 'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True), 'depth': trial.suggest_int('depth', 3, 10), 'l2_leaf_reg': trial.suggest_float('l2_leaf_reg', 1e-8, 10.0, log=True), 'border_count': trial.suggest_categorical('border_count', [32, 64, 128, 255]), 'bagging_temperature': trial.suggest_float('bagging_temperature', 0.0, 1.0), 'random_strength': trial.suggest_float('random_strength', 1e-9, 10.0, log=True), 'subsample': trial.suggest_float('subsample', 0.5, 1.0), 'auto_class_weights': trial.suggest_categorical('auto_class_weights', ['Balanced', 'None']), 'random_seed': self.m_random_seed, 'verbose': False, 'allow_writing_files': False } kf = KFold(n_splits=self.m_k_folds, shuffle=True, random_state=self.m_random_seed) scores = [] for train_idx, val_idx in kf.split(self.m_X_train): X_fold_train, X_fold_val = self.m_X_train[train_idx], self.m_X_train[val_idx] y_fold_train, y_fold_val = self.m_y_train[train_idx], self.m_y_train[val_idx] try: model = CatBoostClassifier(**params) model.fit(X_fold_train, y_fold_train) y_pred = model.predict(X_fold_val) f1 = f1_score(y_fold_val, y_pred, zero_division=0) scores.append(f1) except Exception: return 0.0 return np.mean(scores) def OptimizeHyperparameters(self): try: self.LogInfo(f"Optimizando hiperparámetros ({self.m_n_trials} trials, {self.m_k_folds}-fold CV)...") study = optuna.create_study( direction='maximize', sampler=optuna.samplers.TPESampler(seed=self.m_random_seed) ) study.optimize(self.ObjectiveOptuna, n_trials=self.m_n_trials, show_progress_bar=True) self.m_best_params = study.best_params self.LogInfo(f"Mejor F1-Score: {study.best_value:.6f}") self.LogInfo("Mejores parámetros:") for key, value in self.m_best_params.items(): self.LogInfo(f" {key}: {value}") return True except Exception as e: self.LogCriticalError(f"Error en optimización: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Entrenamiento Final | #+------------------------------------------------------------------+ def TrainFinalModel(self): try: self.LogInfo("Entrenando modelo final...") final_params = self.m_best_params.copy() final_params.update({ 'random_seed': self.m_random_seed, 'verbose': False, 'allow_writing_files': False, 'eval_metric': 'F1' }) self.m_model = CatBoostClassifier(**final_params) self.m_model.fit( self.m_X_train, self.m_y_train, eval_set=(self.m_X_test, self.m_y_test), early_stopping_rounds=50, verbose=False, use_best_model=True ) self.LogInfo(f"Modelo entrenado. Mejor iteración: {self.m_model.best_iteration_}") return True except Exception as e: self.LogCriticalError(f"Error al entrenar modelo: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Evaluación y Métricas | #+------------------------------------------------------------------+ def EvaluateModel(self): try: self.LogInfo("Evaluando modelo...") #--- Predicciones y_pred = self.m_model.predict(self.m_X_test) y_pred_proba = self.m_model.predict_proba(self.m_X_test)[:, 1] #--- Métricas básicas accuracy = accuracy_score(self.m_y_test, y_pred) precision = precision_score(self.m_y_test, y_pred, zero_division=0) recall = recall_score(self.m_y_test, y_pred, zero_division=0) f1 = f1_score(self.m_y_test, y_pred, zero_division=0) #--- Matriz de confusión cm = confusion_matrix(self.m_y_test, y_pred) #--- Guardar métricas self.m_metrics = { 'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1_score': f1, 'confusion_matrix': cm } #--- Mostrar métricas self.LogInfo("Métricas finales:") self.LogInfo(f" Accuracy: {accuracy:.4f}") self.LogInfo(f" Precision: {precision:.4f}") self.LogInfo(f" Recall: {recall:.4f}") self.LogInfo(f" F1-Score: {f1:.4f}") self.LogInfo("\nMatriz de Confusión:") self.LogInfo(f" TN: {cm[0,0]}, FP: {cm[0,1]}") self.LogInfo(f" FN: {cm[1,0]}, TP: {cm[1,1]}") #--- Classification report self.LogInfo("\nReporte de Clasificación:") target_names = ['NO OPERAR (0)', 'OPERAR (1)'] report = classification_report(self.m_y_test, y_pred, target_names=target_names) print(report) return True except Exception as e: self.LogCriticalError(f"Error al evaluar modelo: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Generación de Gráficos | #+------------------------------------------------------------------+ def PlotResults(self): try: self.LogInfo("Generando gráficos...") y_pred_all = self.m_model.predict(self.m_X) y_pred_proba_all = self.m_model.predict_proba(self.m_X)[:, 1] plt.figure(figsize=(14, 12)) #=== GRÁFICO 1: Predicciones a lo largo del tiempo === plt.subplot(3, 2, 1) plt.plot(self.m_Y, label='Real', color='blue', alpha=0.7, linewidth=2, marker='o', markersize=3) plt.plot(y_pred_all, label='Predicho', color='red', alpha=0.7, linewidth=2, marker='x', markersize=3) plt.title('Comparación Real vs Predicho', fontsize=12, fontweight='bold') plt.xlabel('Índice de Muestra', fontsize=10) plt.ylabel('Clase', fontsize=10) plt.legend(fontsize=9) plt.grid(True, alpha=0.3) plt.ylim(-0.2, 1.2) plt.yticks([0, 1], ['NO OPERAR', 'OPERAR']) #=== GRÁFICO 2: Matriz de Confusión === plt.subplot(3, 2, 2) cm = self.m_metrics['confusion_matrix'] plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues) plt.title('Matriz de Confusión', fontsize=12, fontweight='bold') plt.colorbar() classes = ['NO OPERAR\n(0)', 'OPERAR\n(1)'] tick_marks = np.arange(len(classes)) plt.xticks(tick_marks, classes, fontsize=9) plt.yticks(tick_marks, classes, fontsize=9) thresh = cm.max() / 2. for i in range(cm.shape[0]): for j in range(cm.shape[1]): plt.text(j, i, format(cm[i, j], 'd'), ha="center", va="center", color="white" if cm[i, j] > thresh else "black", fontsize=14, fontweight='bold') plt.ylabel('Clase Real', fontsize=10) plt.xlabel('Clase Predicha', fontsize=10) plt.tight_layout() #=== GRÁFICO 3: Distribución de Probabilidades === plt.subplot(3, 2, 3) plt.hist(y_pred_proba_all[self.m_Y == 0], bins=30, alpha=0.6, label='NO OPERAR (0)', color='red') plt.hist(y_pred_proba_all[self.m_Y == 1], bins=30, alpha=0.6, label='OPERAR (1)', color='green') plt.axvline(x=0.5, color='black', linestyle='--', linewidth=2, label='Umbral') plt.title('Distribución de Probabilidades Predichas', fontsize=12, fontweight='bold') plt.xlabel('Probabilidad de OPERAR (1)', fontsize=10) plt.ylabel('Frecuencia', fontsize=10) plt.legend(fontsize=9) plt.grid(True, alpha=0.3) #=== GRÁFICO 4: Feature Importance === plt.subplot(3, 2, 4) feature_importance = self.m_model.get_feature_importance() top_n = min(15, len(feature_importance)) top_indices = np.argsort(feature_importance)[-top_n:] top_features = [self.m_selected_columns[i] for i in top_indices] top_scores = feature_importance[top_indices] plt.barh(range(top_n), top_scores, color='skyblue') plt.yticks(range(top_n), top_features, fontsize=8) plt.xlabel('Importancia', fontsize=10) plt.title(f'Top {top_n} Features más Importantes', fontsize=12, fontweight='bold') plt.grid(True, alpha=0.3, axis='x') #=== GRÁFICO 5: Curva de Probabilidades === plt.subplot(3, 2, 5) sorted_indices = np.argsort(y_pred_proba_all) plt.plot(y_pred_proba_all[sorted_indices], color='purple', linewidth=2) plt.axhline(y=0.5, color='red', linestyle='--', linewidth=2, label='Umbral 0.5') plt.fill_between(range(len(sorted_indices)), 0, y_pred_proba_all[sorted_indices], where=(y_pred_proba_all[sorted_indices] >= 0.5), color='green', alpha=0.3, label='OPERAR') plt.fill_between(range(len(sorted_indices)), 0, y_pred_proba_all[sorted_indices], where=(y_pred_proba_all[sorted_indices] < 0.5), color='red', alpha=0.3, label='NO OPERAR') plt.title('Probabilidades Predichas Ordenadas', fontsize=12, fontweight='bold') plt.xlabel('Índice Ordenado', fontsize=10) plt.ylabel('Probabilidad OPERAR (1)', fontsize=10) plt.legend(fontsize=9) plt.grid(True, alpha=0.3) #=== GRÁFICO 6: Métricas Resumen === plt.subplot(3, 2, 6) metrics_names = ['Accuracy', 'Precision', 'Recall', 'F1-Score'] metrics_values = [ self.m_metrics['accuracy'], self.m_metrics['precision'], self.m_metrics['recall'], self.m_metrics['f1_score'] ] colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728'] bars = plt.bar(metrics_names, metrics_values, color=colors, alpha=0.7) for bar, value in zip(bars, metrics_values): height = bar.get_height() plt.text(bar.get_x() + bar.get_width()/2., height, f'{value:.3f}', ha='center', va='bottom', fontsize=10, fontweight='bold') plt.ylim(0, 1.1) plt.title('Resumen de Métricas', fontsize=12, fontweight='bold') plt.ylabel('Valor', fontsize=10) plt.grid(True, alpha=0.3, axis='y') plt.tight_layout() #--- Guardar gráfico plot_path = os.path.join(self.m_output_folder, f"{self.m_model_name}_metrics.png") plt.savefig(plot_path, dpi=120, bbox_inches='tight') plt.close() self.LogInfo(f"Gráfico guardado: {plot_path}") return True except Exception as e: self.LogCriticalError(f"Error al generar gráficos: {str(e)}") Funciones.Remover(1) #+------------------------------------------------------------------+ #| Exportación a ONNX | #+------------------------------------------------------------------+ def ExportToONNX(self): try: self.LogInfo("Exportando modelo a ONNX...") onnx_path = os.path.join(self.m_output_folder, f"{self.m_model_name}.onnx") self.m_model.save_model(onnx_path, format="onnx") self.LogInfo(f"Modelo exportado: {onnx_path}") import onnx onnx_model = onnx.load(onnx_path) onnx.checker.check_model(onnx_model) self.LogInfo("Validación ONNX completada correctamente") return True except Exception as e: self.LogError(f"Error al exportar a ONNX: {str(e)}") return False #+------------------------------------------------------------------+ #| Ejecución Principal | #+------------------------------------------------------------------+ def Execute(self): try: self.LogInfo("="*60) self.LogInfo(f"INICIANDO ENTRENAMIENTO: {self.m_model_name}") self.LogInfo("="*60) if not self.LoadData(): return False if not self.SeparateData(): return False if not self.SelectBestFeatures(): return False if not self.SplitTrainTest(): return False if not self.OptimizeHyperparameters(): return False if not self.TrainFinalModel(): return False if not self.EvaluateModel(): return False if not self.PlotResults(): return False if not self.ExportToONNX(): self.LogWarning("Exportación a ONNX falló, pero el modelo fue entrenado") self.LogInfo("="*60) self.LogInfo("ENTRENAMIENTO COMPLETADO EXITOSAMENTE") self.LogInfo("="*60) self.LogInfo("\nRESUMEN FINAL:") self.LogInfo(f" Modelo: {self.m_model_name}") self.LogInfo(f" Features utilizadas: {len(self.m_selected_columns)}") self.LogInfo(f" Accuracy: {self.m_metrics['accuracy']:.4f}") self.LogInfo(f" Precision: {self.m_metrics['precision']:.4f}") self.LogInfo(f" Recall: {self.m_metrics['recall']:.4f}") self.LogInfo(f" F1-Score: {self.m_metrics['f1_score']:.4f}") return True except Exception as e: self.LogCriticalError(f"Error en Execute: {str(e)}") import traceback traceback.print_exc() return False #+------------------------------------------------------------------+ #| Getters | #+------------------------------------------------------------------+ def GetMetrics(self): return self.m_metrics def GetSelectedFeatures(self): return self.m_selected_columns def GetBestParams(self): return self.m_best_params def GetModel(self): return self.m_model #+------------------------------------------------------------------+ #| Función Main de Ejemplo | #+------------------------------------------------------------------+ def main(): config = { 'csv_file': 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\EasySbAi\\data.csv', 'target_col': ' salida', 'output_folder': 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\EasySbAi', 'model_name': 'XAUUSD-M5-PO3', 'num_features': 25, 'validation_split': 0.2, 'n_trials': 75, 'k_folds': 5, 'random_seed': 42 } trainer = CModelTrainer(config) trainer.EnableAllLogs() success = trainer.Execute() if success: print("\n Entrenamiento completado con éxito") print(f" Métricas: {trainer.GetMetrics()}") print(f" Features: {trainer.GetSelectedFeatures()}") else: print("\n Entrenamiento falló") if __name__ == "__main__": main()