mql5/Experts/Advisors/DualEA/ML/snapshot_train.py
Princeec13 12eac37d58
2026-02-04 14:28:59 -05:00

228 lines
8.6 KiB
Python

import argparse
import json
import os
import pickle
import warnings
from typing import List, Optional, Sequence
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
import xgboost as xgb
from snapshot_dataset import (
FeatureMatrix,
build_feature_matrix,
describe_dataframe,
label_from_status,
labels_from_future_prices,
load_snapshot_dataframe,
)
# Silence the noisy XGBoost deprecation warning about 'use_label_encoder'
# while leaving other warnings intact.
warnings.filterwarnings(
"ignore",
message=".*use_label_encoder.*",
category=UserWarning,
)
def _determine_labels(
    df: pd.DataFrame,
    mode: str,
    price_csv: Optional[str],
    horizon_minutes: int,
    target_r_multiple: float,
    positive_status: Optional[Sequence[str]],
) -> pd.Series:
    """Build the training label series for *df* according to *mode*.

    ``status`` maps snapshot status values to labels via
    ``label_from_status``; ``future_price`` looks ahead in an external
    price CSV via ``labels_from_future_prices``.

    Raises ``ValueError`` for an unknown mode, or when ``future_price``
    is requested without a price CSV.
    """
    normalized = mode.lower()
    if normalized == "status":
        return label_from_status(df, positive_status)
    if normalized == "future_price":
        if not price_csv:
            raise ValueError("--price_csv required when --label_mode future_price")
        raw = labels_from_future_prices(
            df,
            price_csv=price_csv,
            horizon_minutes=horizon_minutes,
            target_r_multiple=target_r_multiple,
        )
        # Align to the snapshot index; rows without a resolvable future
        # price fall back to the negative class.
        return raw.reindex(df.index).fillna(0).astype(int)
    raise ValueError(f"Unsupported label_mode {normalized}")
def _evaluate_predictions(y_true: np.ndarray, proba: np.ndarray) -> dict:
    """Score probabilistic predictions: ROC-AUC plus F1 at a 0.5 cutoff.

    ROC-AUC is undefined when only one class is present, so chance level
    (0.5) is reported for single-class folds.
    """
    hard_preds = (proba >= 0.5).astype(int)
    has_both_classes = len(np.unique(y_true)) > 1
    auc = roc_auc_score(y_true, proba) if has_both_classes else 0.5
    return {"auc": float(auc), "f1": float(f1_score(y_true, hard_preds))}
def _train_with_timeseries_split(
    X: np.ndarray,
    y: np.ndarray,
    n_splits: int,
    params: dict,
) -> tuple[xgb.XGBClassifier, dict]:
    """Walk-forward training over a TimeSeriesSplit.

    One XGBoost classifier is fitted per fold; the model from the fold
    with the highest validation AUC wins. Returns the winning model and
    a metrics dict with per-fold scores plus the best AUC.
    """
    cv = TimeSeriesSplit(n_splits=max(2, n_splits))
    fold_reports: list = []
    winner: Optional[xgb.XGBClassifier] = None
    winner_auc = -1.0
    for fold_no, (train_idx, valid_idx) in enumerate(cv.split(X), start=1):
        clf = xgb.XGBClassifier(**params, eval_metric="logloss")
        clf.fit(X[train_idx], y[train_idx])
        valid_proba = clf.predict_proba(X[valid_idx])[:, 1]
        report = _evaluate_predictions(y[valid_idx], valid_proba)
        report["fold"] = fold_no
        fold_reports.append(report)
        # Strict '>' keeps the earliest fold on AUC ties.
        if report["auc"] > winner_auc:
            winner_auc = report["auc"]
            winner = clf
        print(f"[Fold {fold_no}] AUC={report['auc']:.3f} F1={report['f1']:.3f}")
    if winner is None:
        raise RuntimeError("Training failed to produce a model")
    return winner, {"folds": fold_reports, "best_auc": winner_auc}
def save_artifacts(
    out_dir: str,
    model: xgb.XGBClassifier,
    scaler: StandardScaler,
    feature_matrix: FeatureMatrix,
    label_mode: str,
    positive_status: Optional[Sequence[str]],
) -> None:
    """Persist everything inference needs under *out_dir*.

    Writes the XGBoost model (JSON), the fitted scaler (pickle), and a
    feature-metadata JSON recording feature names, categorical mappings,
    and the label configuration. The directory is created if missing.
    """
    os.makedirs(out_dir, exist_ok=True)
    model.save_model(os.path.join(out_dir, "xgb_model.json"))
    with open(os.path.join(out_dir, "scaler.pkl"), "wb") as fh:
        pickle.dump(scaler, fh)
    metadata = {
        "features": feature_matrix.feature_names,
        "categorical_mappings": feature_matrix.categorical_mappings,
        "label_mode": label_mode,
        # Mirror the CLI default when no explicit statuses were given.
        "positive_status": list(positive_status) if positive_status else ["passed", "executed"],
    }
    with open(os.path.join(out_dir, "feature_meta.json"), "w", encoding="utf-8") as fh:
        json.dump(metadata, fh, indent=2)
    print(f"[OK] Saved artifacts to {out_dir}")
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface for snapshot training."""
    p = argparse.ArgumentParser(description="Train XGBoost model from signal snapshot exports")
    # Data source and labeling options.
    p.add_argument("--snapshot_dir", type=str, default=None, help="Directory containing feature_batch_*.txt files (defaults to MT5 Common Files)")
    p.add_argument("--limit", type=int, default=None, help="Limit number of snapshots (most recent)")
    p.add_argument("--label_mode", type=str, default="status", choices=("status", "future_price"))
    p.add_argument("--price_csv", type=str, default=None, help="Price CSV for future_price labeling mode")
    p.add_argument("--horizon_minutes", type=int, default=60, help="Minutes ahead for future_price labeling")
    p.add_argument("--target_r_multiple", type=float, default=0.0, help="R multiple threshold for positive label in future_price mode")
    p.add_argument("--positive_status", type=str, nargs="*", default=("passed", "executed"), help="Statuses treated as positive in status label mode")
    # Cross-validation and XGBoost hyperparameters.
    p.add_argument("--n_splits", type=int, default=4, help="TimeSeriesSplit folds")
    p.add_argument("--max_depth", type=int, default=6)
    p.add_argument("--learning_rate", type=float, default=0.1)
    p.add_argument("--n_estimators", type=int, default=300)
    p.add_argument("--subsample", type=float, default=0.9)
    p.add_argument("--colsample_bytree", type=float, default=0.8)
    # Output location for model/scaler/metadata artifacts.
    p.add_argument("--out_dir", type=str, default=os.path.join(os.getcwd(), "artifacts"))
    return p.parse_args()
def _bootstrap_baseline(X: np.ndarray, n_samples: int, params: dict) -> tuple:
    """Fit a placeholder model when only one label class is present.

    With a single snapshot the row is duplicated and given both labels so
    XGBoost sees two classes; otherwise the second half of the data is
    arbitrarily relabeled positive. Metrics are computed on the training
    data itself and flagged with ``baseline: True`` so downstream readers
    can tell this is not a validated model.
    """
    if n_samples == 1:
        X_train = X.repeat(2, axis=0)
        y_train = np.array([0, 1], dtype=int)
    else:
        X_train = X
        y_train = np.zeros(n_samples, dtype=int)
        y_train[n_samples // 2:] = 1
    # 'use_label_encoder=False' was dropped for consistency with the
    # cross-validated path: the flag is deprecated and ignored/warned-on
    # by modern XGBoost (this file already filters that warning).
    model = xgb.XGBClassifier(**params, eval_metric="logloss")
    model.fit(X_train, y_train)
    proba = model.predict_proba(X_train)[:, 1]
    metrics = _evaluate_predictions(y_train, proba)
    metrics["baseline"] = True
    metrics["n_samples"] = int(n_samples)
    return model, metrics


def main():
    """End-to-end training pipeline.

    Loads snapshot exports, builds and sanitizes the feature matrix,
    derives labels, trains either a walk-forward cross-validated model or
    a degenerate-label baseline, then saves artifacts and metrics.

    Raises ``RuntimeError`` when no snapshot records are available.
    """
    args = parse_args()
    df = load_snapshot_dataframe(args.snapshot_dir, limit=args.limit)
    print("[DATA]", describe_dataframe(df))
    feature_matrix = build_feature_matrix(df)
    # Sanitize raw features before scaling: NaN/inf/overflow values can
    # arise from malformed or legacy snapshot exports.
    X_raw = np.asarray(feature_matrix.X, dtype=np.float64)
    X_raw = np.nan_to_num(X_raw, nan=0.0, posinf=0.0, neginf=0.0)
    X_raw = np.clip(X_raw, -1e9, 1e9)
    scaler = StandardScaler()
    X = scaler.fit_transform(X_raw)
    labels = _determine_labels(
        df,
        mode=args.label_mode,
        price_csv=args.price_csv,
        horizon_minutes=args.horizon_minutes,
        target_r_multiple=args.target_r_multiple,
        positive_status=args.positive_status,
    )
    # Convert to integer labels and inspect distribution.
    labels = labels.fillna(0).astype(int)
    y = labels.values
    n_samples = len(y)
    if n_samples == 0:
        raise RuntimeError("No snapshot records available for training")
    unique = sorted(set(int(v) for v in y))
    params = {
        "max_depth": args.max_depth,
        "learning_rate": args.learning_rate,
        "n_estimators": args.n_estimators,
        "subsample": args.subsample,
        "colsample_bytree": args.colsample_bytree,
        "objective": "binary:logistic",
    }
    if len(unique) < 2:
        # Degenerate label distribution (all 0 or all 1): log loudly and
        # fall back to a bootstrap model so the pipeline can proceed.
        print(
            f"[WARN] Degenerate label distribution {unique} for {n_samples} samples; "
            "training a baseline bootstrap model instead of failing."
        )
        model, metrics = _bootstrap_baseline(X, n_samples, params)
    else:
        model, metrics = _train_with_timeseries_split(
            X,
            y,
            n_splits=args.n_splits,
            params=params,
        )
    save_artifacts(
        out_dir=args.out_dir,
        model=model,
        scaler=scaler,
        feature_matrix=feature_matrix,
        label_mode=args.label_mode,
        positive_status=args.positive_status,
    )
    metrics_path = os.path.join(args.out_dir, "metrics.json")
    with open(metrics_path, "w", encoding="utf-8") as handle:
        json.dump(metrics, handle, indent=2)
    print(f"[OK] Metrics written to {metrics_path}")


if __name__ == "__main__":
    main()