# Copyright 2025, Niquel Mendoza.
# https://www.mql5.com/es/users/nique_372
# trainer.py
import sys
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import optuna
from catboost import CatBoostClassifier
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from sklearn.feature_selection import SelectKBest, f_classif
#+------------------------------------------------------------------+
#| Configure path for imports |
#+------------------------------------------------------------------+
root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.insert(0, root)
from PyBase.Utils import SimpleLogger, Funciones
#+------------------------------------------------------------------+
#| Main Training Class |
#+------------------------------------------------------------------+
class CModelTrainer(SimpleLogger.CLoggerBase):
def __init__(self, config):
super().__init__()
#--- Configuration parameters
self.m_csv_file = config['csv_file']
self.m_target_col = config['target_col']
self.m_output_folder = config['output_folder']
self.m_model_name = config['model_name']
#--- Optional parameters with defaults
self.m_num_features = config.get('num_features', 10)
self.m_validation_split = config.get('validation_split', 0.2)
self.m_n_trials = config.get('n_trials', 50)
self.m_k_folds = config.get('k_folds', 5)
self.m_random_seed = config.get('random_seed', 42)
#--- State variables
self.m_dataframe = None
self.m_X = None
self.m_Y = None
self.m_X_train = None
self.m_X_test = None
self.m_y_train = None
self.m_y_test = None
self.m_selected_columns = None
self.m_best_params = None
self.m_model = None
self.m_metrics = {}
#--- Create the output folder if it does not exist
os.makedirs(self.m_output_folder, exist_ok=True)
self.LogInfo(f"Entrenador inicializado: {self.m_model_name}")
self.LogInfo(f"Carpeta de salida: {self.m_output_folder}")
#+------------------------------------------------------------------+
#| Data Loading and Validation |
#+------------------------------------------------------------------+
def LoadData(self):
try:
self.LogInfo(f"Cargando datos desde: {self.m_csv_file}")
#--- Load the CSV
self.m_dataframe = pd.read_csv(self.m_csv_file, encoding="utf-16")
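# Note: encoding="utf-16" is assumed because CSV files written from MQL5 with
# FILE_UNICODE are UTF-16 encoded; adjust this if the data file is produced differently.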
self.LogInfo(f"Dataset cargado: {self.m_dataframe.shape[0]} filas, {self.m_dataframe.shape[1]} columnas")
#--- Convert to numeric
for column in self.m_dataframe.columns:
self.m_dataframe[column] = pd.to_numeric(self.m_dataframe[column], errors='coerce')
#--- Check for NaN and infinite values
if self.m_dataframe.isnull().any().any():
self.LogWarning("Dataset contiene NaN, serán eliminados")
self.m_dataframe = self.m_dataframe.dropna()
if np.isinf(self.m_dataframe.select_dtypes(include=[np.number]).values).any():
self.LogWarning("Dataset contiene infinitos, serán eliminados")
self.m_dataframe = self.m_dataframe.replace([np.inf, -np.inf], np.nan)
self.m_dataframe = self.m_dataframe.dropna()
if self.m_dataframe.empty:
self.LogCriticalError("Dataset vacío después de limpieza")
Funciones.Remover(1)
if self.m_target_col not in self.m_dataframe.columns:
self.LogCriticalError(f"Columna objetivo '{self.m_target_col}' no existe")
Funciones.Remover(1)
self.LogInfo(f"Datos validados correctamente: {self.m_dataframe.shape[0]} muestras")
return True
except Exception as e:
self.LogCriticalError(f"Error al cargar datos: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| Feature / Target Separation |
#+------------------------------------------------------------------+
def SeparateData(self):
try:
self.LogInfo("Separando features y target...")
#--- Separate Y (target)
self.m_Y = self.m_dataframe[self.m_target_col].to_numpy().astype(int)
#--- Separate X (features)
self.m_X = self.m_dataframe.drop(columns=[self.m_target_col]).to_numpy()
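# The column order of m_X matches the dataframe columns with the target removed;
# SelectBestFeatures() relies on this ordering when it looks up the TipoOp column by index.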
#--- Verify that only classes 0 and 1 are present
unique_classes = np.unique(self.m_Y)
if not np.array_equal(unique_classes, np.array([0, 1])):
self.LogWarning(f"Clases detectadas: {unique_classes}. Se esperaba [0, 1]")
#--- Check the class distribution
unique, counts = np.unique(self.m_Y, return_counts=True)
class_distribution = dict(zip(unique, counts))
self.LogInfo(f"X shape: {self.m_X.shape}")
self.LogInfo(f"Y shape: {self.m_Y.shape}")
self.LogInfo(f"Distribución de clases: {class_distribution}")
total_samples = len(self.m_Y)
for cls, count in class_distribution.items():
percentage = (count / total_samples) * 100
class_name = "OPERAR" if cls == 1 else "NO OPERAR"
self.LogInfo(f" Clase {cls} ({class_name}): {count} ({percentage:.2f}%)")
return True
except Exception as e:
self.LogCriticalError(f"Error al separar datos: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| Feature Selection with SelectKBest |
#+------------------------------------------------------------------+
def SelectBestFeatures(self):
try:
self.LogInfo("Aplicando Feature Selection...")
original_columns = self.m_dataframe.drop(columns=[self.m_target_col]).columns
n_features_original = len(original_columns)
self.LogInfo(f"Features originales: {n_features_original}")
#--- Check that the TipoOp column exists
tipo_op_col = " tipo de operacion"
tiene_tipo_op = tipo_op_col in original_columns
if not tiene_tipo_op:
self.LogCriticalError(f"Columna '{tipo_op_col}' no encontrada en el dataset")
print(original_columns)
Funciones.Remover(1)
#--- Extract TipoOp before SelectKBest
tipo_op_index = original_columns.get_loc(tipo_op_col)
tipo_op_data = self.m_X[:, tipo_op_index].reshape(-1, 1)
#--- Temporarily remove TipoOp for SelectKBest
X_sin_tipo = np.delete(self.m_X, tipo_op_index, axis=1)
cols_sin_tipo = original_columns.drop(tipo_op_col)
#--- Adjust the number of features (not counting TipoOp)
n_features = min(self.m_num_features, len(cols_sin_tipo))
if n_features != self.m_num_features:
self.LogWarning(f"Número de features ajustado de {self.m_num_features} a {n_features}")
#--- Apply SelectKBest without TipoOp
selector = SelectKBest(score_func=f_classif, k=n_features)
X_selected = selector.fit_transform(X_sin_tipo, self.m_Y)
#--- Append TipoOp at the end
self.m_X = np.column_stack([X_selected, tipo_op_data])
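# The forced TipoOp column is appended as the LAST feature column; m_selected_columns
# (built below) preserves this order, which is presumably the order the consumer of the
# exported model must replicate when building its input vector.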
#--- Get the names of the selected columns
selected_indices = selector.get_support(indices=True)
self.m_selected_columns = cols_sin_tipo[selected_indices].tolist()
self.m_selected_columns.append(tipo_op_col)
feature_scores = selector.scores_[selected_indices]
self.LogInfo(f"Features seleccionadas: {len(self.m_selected_columns)}")
for i, (col, score) in enumerate(zip(self.m_selected_columns[:-1], feature_scores), 1):
self.LogInfo(f" {i:2d}. {col} (score: {score:.4f})")
self.LogInfo(f" {len(self.m_selected_columns):2d}. {tipo_op_col} (FORZADO)")
return True
except Exception as e:
self.LogCriticalError(f"Error en Feature Selection: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| Split Train/Test |
#+------------------------------------------------------------------+
def SplitTrainTest(self):
try:
self.LogInfo(f"Dividiendo datos (validation_split={self.m_validation_split})...")
self.m_X_train, self.m_X_test, self.m_y_train, self.m_y_test = train_test_split(
self.m_X,
self.m_Y,
test_size=self.m_validation_split,
random_state=self.m_random_seed,
stratify=self.m_Y
)
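# stratify=self.m_Y keeps the 0/1 class ratio (approximately) the same in train and test.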
self.LogInfo(f"Train: {self.m_X_train.shape[0]} muestras")
self.LogInfo(f"Test: {self.m_X_test.shape[0]} muestras")
#--- Check the distribution in train and test
train_unique, train_counts = np.unique(self.m_y_train, return_counts=True)
test_unique, test_counts = np.unique(self.m_y_test, return_counts=True)
train_dist = dict(zip(train_unique, train_counts))
test_dist = dict(zip(test_unique, test_counts))
self.LogInfo(f"Train - {train_dist}")
self.LogInfo(f"Test - {test_dist}")
return True
except Exception as e:
self.LogCriticalError(f"Error al dividir datos: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| Hyperparameter Optimization with Optuna |
#+------------------------------------------------------------------+
def ObjectiveOptuna(self, trial):
params = {
'iterations': trial.suggest_int('iterations', 100, 1000, step=100),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'depth': trial.suggest_int('depth', 3, 10),
'l2_leaf_reg': trial.suggest_float('l2_leaf_reg', 1e-8, 10.0, log=True),
'border_count': trial.suggest_categorical('border_count', [32, 64, 128, 255]),
'bagging_temperature': trial.suggest_float('bagging_temperature', 0.0, 1.0),
'random_strength': trial.suggest_float('random_strength', 1e-9, 10.0, log=True),
'subsample': trial.suggest_float('subsample', 0.5, 1.0),
'auto_class_weights': trial.suggest_categorical('auto_class_weights', ['Balanced', 'None']),
'random_seed': self.m_random_seed,
'verbose': False,
'allow_writing_files': False
}
kf = KFold(n_splits=self.m_k_folds, shuffle=True, random_state=self.m_random_seed)
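# Note: the inner CV uses a plain shuffled KFold, not StratifiedKFold, so fold class
# ratios can drift when the classes are imbalanced; a stratified split would be a
# possible refinement here.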
scores = []
for train_idx, val_idx in kf.split(self.m_X_train):
X_fold_train, X_fold_val = self.m_X_train[train_idx], self.m_X_train[val_idx]
y_fold_train, y_fold_val = self.m_y_train[train_idx], self.m_y_train[val_idx]
try:
model = CatBoostClassifier(**params)
model.fit(X_fold_train, y_fold_train)
y_pred = model.predict(X_fold_val)
f1 = f1_score(y_fold_val, y_pred, zero_division=0)
scores.append(f1)
except Exception:
return 0.0
return np.mean(scores)
def OptimizeHyperparameters(self):
try:
self.LogInfo(f"Optimizando hiperparámetros ({self.m_n_trials} trials, {self.m_k_folds}-fold CV)...")
study = optuna.create_study(
direction='maximize',
sampler=optuna.samplers.TPESampler(seed=self.m_random_seed)
)
study.optimize(self.ObjectiveOptuna, n_trials=self.m_n_trials, show_progress_bar=True)
self.m_best_params = study.best_params
self.LogInfo(f"Mejor F1-Score: {study.best_value:.6f}")
self.LogInfo("Mejores parámetros:")
for key, value in self.m_best_params.items():
self.LogInfo(f" {key}: {value}")
return True
except Exception as e:
self.LogCriticalError(f"Error en optimización: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| Final Training |
#+------------------------------------------------------------------+
def TrainFinalModel(self):
try:
self.LogInfo("Entrenando modelo final...")
final_params = self.m_best_params.copy()
final_params.update({
'random_seed': self.m_random_seed,
'verbose': False,
'allow_writing_files': False,
'eval_metric': 'F1'
})
self.m_model = CatBoostClassifier(**final_params)
self.m_model.fit(
self.m_X_train,
self.m_y_train,
eval_set=(self.m_X_test, self.m_y_test),
early_stopping_rounds=50,
verbose=False,
use_best_model=True
)
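# Note: the held-out test set doubles as the early-stopping eval_set, so the test
# metrics reported later are not computed on data that is completely unseen during training.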
self.LogInfo(f"Modelo entrenado. Mejor iteración: {self.m_model.best_iteration_}")
return True
except Exception as e:
self.LogCriticalError(f"Error al entrenar modelo: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| Evaluation and Metrics |
#+------------------------------------------------------------------+
def EvaluateModel(self):
try:
self.LogInfo("Evaluando modelo...")
#--- Predictions
y_pred = self.m_model.predict(self.m_X_test)
y_pred_proba = self.m_model.predict_proba(self.m_X_test)[:, 1]
#--- Basic metrics
accuracy = accuracy_score(self.m_y_test, y_pred)
precision = precision_score(self.m_y_test, y_pred, zero_division=0)
recall = recall_score(self.m_y_test, y_pred, zero_division=0)
f1 = f1_score(self.m_y_test, y_pred, zero_division=0)
#--- Confusion matrix
cm = confusion_matrix(self.m_y_test, y_pred)
#--- Store metrics
self.m_metrics = {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1,
'confusion_matrix': cm
}
#--- Display metrics
self.LogInfo("Métricas finales:")
self.LogInfo(f" Accuracy: {accuracy:.4f}")
self.LogInfo(f" Precision: {precision:.4f}")
self.LogInfo(f" Recall: {recall:.4f}")
self.LogInfo(f" F1-Score: {f1:.4f}")
self.LogInfo("\nMatriz de Confusión:")
self.LogInfo(f" TN: {cm[0,0]}, FP: {cm[0,1]}")
self.LogInfo(f" FN: {cm[1,0]}, TP: {cm[1,1]}")
#--- Classification report
self.LogInfo("\nReporte de Clasificación:")
target_names = ['NO OPERAR (0)', 'OPERAR (1)']
report = classification_report(self.m_y_test, y_pred, target_names=target_names)
print(report)
return True
except Exception as e:
self.LogCriticalError(f"Error al evaluar modelo: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| Plot Generation |
#+------------------------------------------------------------------+
def PlotResults(self):
try:
self.LogInfo("Generando gráficos...")
y_pred_all = self.m_model.predict(self.m_X)
y_pred_proba_all = self.m_model.predict_proba(self.m_X)[:, 1]
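# Note: the plots below are computed over the FULL dataset (train + test), so they mix
# in-sample and out-of-sample predictions; the metrics panel, by contrast, uses only the
# held-out test set stored in m_metrics.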
plt.figure(figsize=(14, 12))
#=== PLOT 1: Predictions over time ===
plt.subplot(3, 2, 1)
plt.plot(self.m_Y, label='Real', color='blue', alpha=0.7, linewidth=2, marker='o', markersize=3)
plt.plot(y_pred_all, label='Predicho', color='red', alpha=0.7, linewidth=2, marker='x', markersize=3)
plt.title('Comparación Real vs Predicho', fontsize=12, fontweight='bold')
plt.xlabel('Índice de Muestra', fontsize=10)
plt.ylabel('Clase', fontsize=10)
plt.legend(fontsize=9)
plt.grid(True, alpha=0.3)
plt.ylim(-0.2, 1.2)
plt.yticks([0, 1], ['NO OPERAR', 'OPERAR'])
#=== PLOT 2: Confusion Matrix ===
plt.subplot(3, 2, 2)
cm = self.m_metrics['confusion_matrix']
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Matriz de Confusión', fontsize=12, fontweight='bold')
plt.colorbar()
classes = ['NO OPERAR\n(0)', 'OPERAR\n(1)']
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, fontsize=9)
plt.yticks(tick_marks, classes, fontsize=9)
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
plt.text(j, i, format(cm[i, j], 'd'),
ha="center", va="center",
color="white" if cm[i, j] > thresh else "black",
fontsize=14, fontweight='bold')
plt.ylabel('Clase Real', fontsize=10)
plt.xlabel('Clase Predicha', fontsize=10)
plt.tight_layout()
#=== PLOT 3: Predicted Probability Distribution ===
plt.subplot(3, 2, 3)
plt.hist(y_pred_proba_all[self.m_Y == 0], bins=30, alpha=0.6, label='NO OPERAR (0)', color='red')
plt.hist(y_pred_proba_all[self.m_Y == 1], bins=30, alpha=0.6, label='OPERAR (1)', color='green')
plt.axvline(x=0.5, color='black', linestyle='--', linewidth=2, label='Umbral')
plt.title('Distribución de Probabilidades Predichas', fontsize=12, fontweight='bold')
plt.xlabel('Probabilidad de OPERAR (1)', fontsize=10)
plt.ylabel('Frecuencia', fontsize=10)
plt.legend(fontsize=9)
plt.grid(True, alpha=0.3)
#=== PLOT 4: Feature Importance ===
plt.subplot(3, 2, 4)
feature_importance = self.m_model.get_feature_importance()
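# get_feature_importance() with no arguments returns CatBoost's default importance type
# (PredictionValuesChange for non-ranking models); indices line up with m_selected_columns.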
top_n = min(15, len(feature_importance))
top_indices = np.argsort(feature_importance)[-top_n:]
top_features = [self.m_selected_columns[i] for i in top_indices]
top_scores = feature_importance[top_indices]
plt.barh(range(top_n), top_scores, color='skyblue')
plt.yticks(range(top_n), top_features, fontsize=8)
plt.xlabel('Importancia', fontsize=10)
plt.title(f'Top {top_n} Features más Importantes', fontsize=12, fontweight='bold')
plt.grid(True, alpha=0.3, axis='x')
#=== PLOT 5: Probability Curve ===
plt.subplot(3, 2, 5)
sorted_indices = np.argsort(y_pred_proba_all)
plt.plot(y_pred_proba_all[sorted_indices], color='purple', linewidth=2)
plt.axhline(y=0.5, color='red', linestyle='--', linewidth=2, label='Umbral 0.5')
plt.fill_between(range(len(sorted_indices)), 0, y_pred_proba_all[sorted_indices],
where=(y_pred_proba_all[sorted_indices] >= 0.5),
color='green', alpha=0.3, label='OPERAR')
plt.fill_between(range(len(sorted_indices)), 0, y_pred_proba_all[sorted_indices],
where=(y_pred_proba_all[sorted_indices] < 0.5),
color='red', alpha=0.3, label='NO OPERAR')
plt.title('Probabilidades Predichas Ordenadas', fontsize=12, fontweight='bold')
plt.xlabel('Índice Ordenado', fontsize=10)
plt.ylabel('Probabilidad OPERAR (1)', fontsize=10)
plt.legend(fontsize=9)
plt.grid(True, alpha=0.3)
#=== PLOT 6: Metrics Summary ===
plt.subplot(3, 2, 6)
metrics_names = ['Accuracy', 'Precision', 'Recall', 'F1-Score']
metrics_values = [
self.m_metrics['accuracy'],
self.m_metrics['precision'],
self.m_metrics['recall'],
self.m_metrics['f1_score']
]
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
bars = plt.bar(metrics_names, metrics_values, color=colors, alpha=0.7)
for bar, value in zip(bars, metrics_values):
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{value:.3f}',
ha='center', va='bottom', fontsize=10, fontweight='bold')
plt.ylim(0, 1.1)
plt.title('Resumen de Métricas', fontsize=12, fontweight='bold')
plt.ylabel('Valor', fontsize=10)
plt.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
#--- Save the figure
plot_path = os.path.join(self.m_output_folder, f"{self.m_model_name}_metrics.png")
plt.savefig(plot_path, dpi=120, bbox_inches='tight')
plt.close()
self.LogInfo(f"Gráfico guardado: {plot_path}")
return True
except Exception as e:
self.LogCriticalError(f"Error al generar gráficos: {str(e)}")
Funciones.Remover(1)
#+------------------------------------------------------------------+
#| ONNX Export |
#+------------------------------------------------------------------+
def ExportToONNX(self):
try:
self.LogInfo("Exportando modelo a ONNX...")
onnx_path = os.path.join(self.m_output_folder, f"{self.m_model_name}.onnx")
self.m_model.save_model(onnx_path, format="onnx")
self.LogInfo(f"Modelo exportado: {onnx_path}")
import onnx
onnx_model = onnx.load(onnx_path)
onnx.checker.check_model(onnx_model)
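# The onnx.checker call above only validates the graph structure; it does not verify that
# the ONNX predictions match the native CatBoost model.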
self.LogInfo("Validación ONNX completada correctamente")
return True
except Exception as e:
self.LogError(f"Error al exportar a ONNX: {str(e)}")
return False
#+------------------------------------------------------------------+
#| Main Execution |
#+------------------------------------------------------------------+
def Execute(self):
try:
self.LogInfo("="*60)
self.LogInfo(f"INICIANDO ENTRENAMIENTO: {self.m_model_name}")
self.LogInfo("="*60)
if not self.LoadData():
return False
if not self.SeparateData():
return False
if not self.SelectBestFeatures():
return False
if not self.SplitTrainTest():
return False
if not self.OptimizeHyperparameters():
return False
if not self.TrainFinalModel():
return False
if not self.EvaluateModel():
return False
if not self.PlotResults():
return False
if not self.ExportToONNX():
self.LogWarning("Exportación a ONNX falló, pero el modelo fue entrenado")
self.LogInfo("="*60)
self.LogInfo("ENTRENAMIENTO COMPLETADO EXITOSAMENTE")
self.LogInfo("="*60)
self.LogInfo("\nRESUMEN FINAL:")
self.LogInfo(f" Modelo: {self.m_model_name}")
self.LogInfo(f" Features utilizadas: {len(self.m_selected_columns)}")
self.LogInfo(f" Accuracy: {self.m_metrics['accuracy']:.4f}")
self.LogInfo(f" Precision: {self.m_metrics['precision']:.4f}")
self.LogInfo(f" Recall: {self.m_metrics['recall']:.4f}")
self.LogInfo(f" F1-Score: {self.m_metrics['f1_score']:.4f}")
return True
except Exception as e:
self.LogCriticalError(f"Error en Execute: {str(e)}")
import traceback
traceback.print_exc()
return False
#+------------------------------------------------------------------+
#| Getters |
#+------------------------------------------------------------------+
def GetMetrics(self):
return self.m_metrics
def GetSelectedFeatures(self):
return self.m_selected_columns
def GetBestParams(self):
return self.m_best_params
def GetModel(self):
return self.m_model
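#+------------------------------------------------------------------+
#| Illustrative sketch (not part of the original trainer): one way |
#| the exported .onnx file could be sanity-checked with onnxruntime. |
#| Assumes onnxruntime is installed; the function name is only an |
#| example, and the feature vector must follow the order returned by |
#| GetSelectedFeatures(). |
#+------------------------------------------------------------------+
def _example_onnx_inference(onnx_path, feature_row):
    import numpy as np
    import onnxruntime as ort
    # Load the exported model on CPU
    session = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
    input_name = session.get_inputs()[0].name
    # Single sample, float32, shape (1, n_features), in selected-column order
    x = np.asarray(feature_row, dtype=np.float32).reshape(1, -1)
    outputs = session.run(None, {input_name: x})
    # CatBoost binary classifiers exported to ONNX usually expose the predicted
    # label and the class probabilities as separate outputs; inspect
    # session.get_outputs() to confirm the exact names and order.
    return outputs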
#+------------------------------------------------------------------+
#| Example Main Function |
#+------------------------------------------------------------------+
def main():
config = {
'csv_file': 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\EasySbAi\\data.csv',
'target_col': ' salida',
'output_folder': 'C:\\Users\\leoxd\\AppData\\Roaming\\MetaQuotes\\Terminal\\Common\\Files\\EasySbAi',
'model_name': 'XAUUSD-M5-PO3',
'num_features': 25,
'validation_split': 0.2,
'n_trials': 75,
'k_folds': 5,
'random_seed': 42
}
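# Note: the leading spaces in ' salida' and ' tipo de operacion' are intentional; they must
# match the CSV header exactly as written by the data generator. The absolute paths above
# are machine-specific and need adjusting for other environments.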
trainer = CModelTrainer(config)
trainer.EnableAllLogs()
success = trainer.Execute()
if success:
print("\n Entrenamiento completado con éxito")
print(f" Métricas: {trainer.GetMetrics()}")
print(f" Features: {trainer.GetSelectedFeatures()}")
else:
print("\n Entrenamiento falló")
if __name__ == "__main__":
main()