2026-01-14 16:17:00 -05:00
|
|
|
import argparse
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import pickle
|
2026-02-04 14:28:59 -05:00
|
|
|
import warnings
|
2026-01-14 16:17:00 -05:00
|
|
|
from typing import List, Optional, Sequence
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
import pandas as pd
|
|
|
|
|
from sklearn.metrics import f1_score, roc_auc_score
|
|
|
|
|
from sklearn.model_selection import TimeSeriesSplit
|
|
|
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
|
|
import xgboost as xgb
|
|
|
|
|
|
|
|
|
|
from snapshot_dataset import (
|
|
|
|
|
FeatureMatrix,
|
|
|
|
|
build_feature_matrix,
|
|
|
|
|
describe_dataframe,
|
|
|
|
|
label_from_status,
|
|
|
|
|
labels_from_future_prices,
|
|
|
|
|
load_snapshot_dataframe,
|
|
|
|
|
)
|
|
|
|
|
|
2026-02-04 14:28:59 -05:00
|
|
|
# Silence the noisy XGBoost deprecation warning about 'use_label_encoder'
|
|
|
|
|
# while leaving other warnings intact.
|
|
|
|
|
warnings.filterwarnings(
|
|
|
|
|
"ignore",
|
|
|
|
|
message=".*use_label_encoder.*",
|
|
|
|
|
category=UserWarning,
|
|
|
|
|
)
|
|
|
|
|
|
2026-01-14 16:17:00 -05:00
|
|
|
|
|
|
|
|
def _determine_labels(
    df: pd.DataFrame,
    mode: str,
    price_csv: Optional[str],
    horizon_minutes: int,
    target_r_multiple: float,
    positive_status: Optional[Sequence[str]],
) -> pd.Series:
    """Build the binary label series for *df* according to *mode*.

    "status" mode labels rows from their snapshot status column via
    ``label_from_status``; "future_price" mode labels them from forward
    returns in *price_csv* via ``labels_from_future_prices``.

    Raises:
        ValueError: on an unknown mode, or when "future_price" is requested
            without a price CSV.
    """
    normalized = mode.lower()
    if normalized == "status":
        return label_from_status(df, positive_status)
    if normalized != "future_price":
        raise ValueError(f"Unsupported label_mode {normalized}")
    if not price_csv:
        raise ValueError("--price_csv required when --label_mode future_price")
    raw = labels_from_future_prices(
        df,
        price_csv=price_csv,
        horizon_minutes=horizon_minutes,
        target_r_multiple=target_r_multiple,
    )
    # Align to the snapshot index; rows with no price match become negatives.
    return raw.reindex(df.index).fillna(0).astype(int)
def _evaluate_predictions(y_true: np.ndarray, proba: np.ndarray) -> dict:
    """Score probabilistic predictions thresholded at 0.5.

    Returns a dict with "auc" and "f1". ROC-AUC is undefined when *y_true*
    contains a single class; in that case the chance level 0.5 is reported.
    """
    hard_calls = (proba >= 0.5).astype(int)
    if len(np.unique(y_true)) > 1:
        auc = roc_auc_score(y_true, proba)
    else:
        # Only one class present: fall back to the chance-level AUC.
        auc = 0.5
    return {"auc": float(auc), "f1": float(f1_score(y_true, hard_calls))}
def _train_with_timeseries_split(
    X: np.ndarray,
    y: np.ndarray,
    n_splits: int,
    params: dict,
) -> tuple[xgb.XGBClassifier, dict]:
    """Run walk-forward cross-validation and keep the best-AUC fold model.

    Each TimeSeriesSplit fold trains a fresh ``XGBClassifier`` with *params*
    and is scored on its validation slice; per-fold metrics are printed and
    collected.

    Returns:
        (best_model, {"folds": [per-fold metric dicts], "best_auc": float})

    Raises:
        RuntimeError: if no fold produced a model.
    """
    cv = TimeSeriesSplit(n_splits=max(2, n_splits))
    fold_reports: List[dict] = []
    champion: Optional[xgb.XGBClassifier] = None
    champion_auc = -1.0

    for fold, (tr_idx, va_idx) in enumerate(cv.split(X)):
        candidate = xgb.XGBClassifier(**params, eval_metric="logloss")
        candidate.fit(X[tr_idx], y[tr_idx])
        va_proba = candidate.predict_proba(X[va_idx])[:, 1]

        report = _evaluate_predictions(y[va_idx], va_proba)
        report["fold"] = fold + 1
        fold_reports.append(report)

        if report["auc"] > champion_auc:
            champion_auc = report["auc"]
            champion = candidate
        print(f"[Fold {fold+1}] AUC={report['auc']:.3f} F1={report['f1']:.3f}")

    if champion is None:
        raise RuntimeError("Training failed to produce a model")
    return champion, {"folds": fold_reports, "best_auc": champion_auc}
def save_artifacts(
    out_dir: str,
    model: xgb.XGBClassifier,
    scaler: StandardScaler,
    feature_matrix: FeatureMatrix,
    label_mode: str,
    positive_status: Optional[Sequence[str]],
) -> None:
    """Persist the trained model plus everything inference needs.

    Writes ``xgb_model.json`` (XGBoost's own format), ``scaler.pkl``
    (pickled StandardScaler) and ``feature_meta.json`` (feature names,
    categorical mappings and labeling config) into *out_dir*, creating the
    directory if necessary.
    """
    os.makedirs(out_dir, exist_ok=True)
    model.save_model(os.path.join(out_dir, "xgb_model.json"))

    with open(os.path.join(out_dir, "scaler.pkl"), "wb") as handle:
        pickle.dump(scaler, handle)

    # An empty or missing positive_status falls back to the CLI defaults.
    statuses = list(positive_status) if positive_status else ["passed", "executed"]
    meta = {
        "features": feature_matrix.feature_names,
        "categorical_mappings": feature_matrix.categorical_mappings,
        "label_mode": label_mode,
        "positive_status": statuses,
    }
    with open(os.path.join(out_dir, "feature_meta.json"), "w", encoding="utf-8") as handle:
        json.dump(meta, handle, indent=2)
    print(f"[OK] Saved artifacts to {out_dir}")
def parse_args() -> argparse.Namespace:
    """Define and parse the trainer's command-line interface."""
    parser = argparse.ArgumentParser(description="Train XGBoost model from signal snapshot exports")
    add = parser.add_argument

    # Data source
    add("--snapshot_dir", type=str, default=None, help="Directory containing feature_batch_*.txt files (defaults to MT5 Common Files)")
    add("--limit", type=int, default=None, help="Limit number of snapshots (most recent)")

    # Labeling
    add("--label_mode", type=str, default="status", choices=("status", "future_price"))
    add("--price_csv", type=str, default=None, help="Price CSV for future_price labeling mode")
    add("--horizon_minutes", type=int, default=60, help="Minutes ahead for future_price labeling")
    add("--target_r_multiple", type=float, default=0.0, help="R multiple threshold for positive label in future_price mode")
    add("--positive_status", type=str, nargs="*", default=("passed", "executed"), help="Statuses treated as positive in status label mode")

    # Cross-validation and XGBoost hyper-parameters
    add("--n_splits", type=int, default=4, help="TimeSeriesSplit folds")
    add("--max_depth", type=int, default=6)
    add("--learning_rate", type=float, default=0.1)
    add("--n_estimators", type=int, default=300)
    add("--subsample", type=float, default=0.9)
    add("--colsample_bytree", type=float, default=0.8)

    # Output
    add("--out_dir", type=str, default=os.path.join(os.getcwd(), "artifacts"))
    return parser.parse_args()
def main():
    """End-to-end training entry point.

    Loads snapshot exports, builds and sanitizes the feature matrix, derives
    labels per --label_mode, trains an XGBoost classifier (falling back to a
    bootstrap baseline when only one label class is present), then writes
    model artifacts and metrics to --out_dir.
    """
    args = parse_args()

    df = load_snapshot_dataframe(args.snapshot_dir, limit=args.limit)
    print("[DATA]", describe_dataframe(df))
    feature_matrix = build_feature_matrix(df)

    # Sanitize raw feature matrix before scaling to avoid NaN/inf/overflow
    # issues that can arise from malformed or legacy snapshot values.
    X_raw = np.asarray(feature_matrix.X, dtype=np.float64)
    X_raw = np.nan_to_num(X_raw, nan=0.0, posinf=0.0, neginf=0.0)
    X_raw = np.clip(X_raw, -1e9, 1e9)

    scaler = StandardScaler()
    X = scaler.fit_transform(X_raw)

    labels = _determine_labels(
        df,
        mode=args.label_mode,
        price_csv=args.price_csv,
        horizon_minutes=args.horizon_minutes,
        target_r_multiple=args.target_r_multiple,
        positive_status=args.positive_status,
    )
    # Convert to integer labels and inspect distribution.
    labels = labels.fillna(0).astype(int)
    y = labels.values
    n_samples = len(y)
    if n_samples == 0:
        raise RuntimeError("No snapshot records available for training")

    unique = sorted(set(int(v) for v in y))
    params = {
        "max_depth": args.max_depth,
        "learning_rate": args.learning_rate,
        "n_estimators": args.n_estimators,
        "subsample": args.subsample,
        "colsample_bytree": args.colsample_bytree,
        "objective": "binary:logistic",
    }

    if len(unique) < 2:
        # Degenerate label distribution (all 0 or all 1). Fall back to a
        # small bootstrap model so the pipeline can proceed, while clearly
        # logging the situation.
        print(
            f"[WARN] Degenerate label distribution {unique} for {n_samples} samples; "
            "training a baseline bootstrap model instead of failing."
        )
        if n_samples == 1:
            # Duplicate the single snapshot to create two samples with
            # opposite labels, ensuring XGBoost sees both classes.
            X_train = X.repeat(2, axis=0)
            y_train = np.array([0, 1], dtype=int)
        else:
            # Fabricate a half/half label split so both classes are present.
            X_train = X
            y_train = np.zeros(n_samples, dtype=int)
            y_train[n_samples // 2 :] = 1

        # FIX: dropped the deprecated 'use_label_encoder=False' argument for
        # consistency with _train_with_timeseries_split; recent XGBoost
        # releases ignore it with a warning (the very warning this module
        # filters at import time).
        model = xgb.XGBClassifier(
            **params,
            eval_metric="logloss",
        )
        model.fit(X_train, y_train)
        # Metrics here are in-sample on fabricated labels; flag them so
        # downstream consumers don't mistake them for validated scores.
        proba = model.predict_proba(X_train)[:, 1]
        metrics = _evaluate_predictions(y_train, proba)
        metrics["baseline"] = True
        metrics["n_samples"] = int(n_samples)
    else:
        model, metrics = _train_with_timeseries_split(
            X,
            y,
            n_splits=args.n_splits,
            params=params,
        )

    save_artifacts(
        out_dir=args.out_dir,
        model=model,
        scaler=scaler,
        feature_matrix=feature_matrix,
        label_mode=args.label_mode,
        positive_status=args.positive_status,
    )

    # save_artifacts has already created out_dir above.
    metrics_path = os.path.join(args.out_dir, "metrics.json")
    with open(metrics_path, "w", encoding="utf-8") as handle:
        json.dump(metrics, handle, indent=2)
    print(f"[OK] Metrics written to {metrics_path}")
# Run the training pipeline only when executed as a script, so the module
# stays importable (e.g. for reuse of its helpers) without side effects.
if __name__ == "__main__":
    main()