mql5/Experts/Advisors/DualEA/ML/train.py

import argparse
import os
import json
import pickle
import gzip
import shutil
from datetime import datetime
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import TimeSeriesSplit
from dataset import load_features, make_label_from_r_multiple
from features import enrich_with_yahoo, build_feature_matrix, scale_features
from model import build_classifier, build_lstm, save_model
from policy import write_policy


def _resolve_cols(df: pd.DataFrame):
    strat = "strategy" if "strategy" in df.columns else None
    sym = "symbol" if "symbol" in df.columns else ("entry_symbol" if "entry_symbol" in df.columns else None)
    tf = "timeframe" if "timeframe" in df.columns else ("tf" if "tf" in df.columns else None)
    return strat, sym, tf


def _resolve_time_col(df: pd.DataFrame) -> Optional[str]:
    for c in ("close_time", "entry_time", "timestamp", "time"):
        if c in df.columns:
            return c
    return None
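
# Example (sketch): for a trade log with columns ["strategy", "entry_symbol",
# "tf", "close_time", ...] these helpers would resolve to:
#
#   _resolve_cols(df)     -> ("strategy", "entry_symbol", "tf")
#   _resolve_time_col(df) -> "close_time"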


def make_sequences(df: pd.DataFrame,
                   X: np.ndarray,
                   y: pd.Series,
                   seq_len: int = 30,
                   group_cols: Optional[list] = None,
                   time_col: Optional[str] = None):
    """Build rolling-window sequences per group, ordered by time.

    Returns (Xs, ys) with shapes (N_seq, seq_len, n_features) and (N_seq,).
    Each label aligns to the last row of its window.
    """
    if group_cols is None:
        group_cols = []
    if time_col is None:
        time_col = _resolve_time_col(df)
    # Attach positional row indices to preserve alignment with X and y
    df_idx = df.reset_index(drop=True).copy()
    df_idx["__row__"] = np.arange(len(df_idx))
    # Ensure time ordering within each group
    if time_col and time_col in df_idx.columns:
        try:
            df_idx = df_idx.sort_values([*group_cols, time_col]) if group_cols else df_idx.sort_values(time_col)
        except Exception:
            pass
    Xs, ys = [], []
    if group_cols:
        for _, g in df_idx.groupby(group_cols):
            idxs = g["__row__"].values.astype(int)
            if len(idxs) < seq_len:
                continue
            for start in range(0, len(idxs) - seq_len + 1):
                win = idxs[start:start + seq_len]
                Xs.append(X[win, :])
                ys.append(int(y.iloc[win[-1]]))
    else:
        idxs = df_idx["__row__"].values.astype(int)
        if len(idxs) >= seq_len:
            for start in range(0, len(idxs) - seq_len + 1):
                win = idxs[start:start + seq_len]
                Xs.append(X[win, :])
                ys.append(int(y.iloc[win[-1]]))
    if not Xs:
        return np.zeros((0, seq_len, X.shape[1]), dtype=float), np.zeros((0,), dtype=int)
    return np.stack(Xs, axis=0).astype(float), np.array(ys, dtype=int)
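
# A minimal usage sketch (hypothetical column names; in this trainer `df`, `X`
# and `y` come from load_features / build_feature_matrix / make_label_from_r_multiple):
#
#   X_seq, y_seq = make_sequences(df, X, y, seq_len=30,
#                                 group_cols=["strategy", "symbol"],
#                                 time_col="close_time")
#   # X_seq.shape == (N_seq, 30, X.shape[1]); each label aligns to a window end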


def rotate_and_compress(path, threshold_bytes, compress_exts=(".json", ".pkl")):
    """Rotate an artifact that exceeds threshold_bytes, gzip-compressing known text formats."""
    if os.path.exists(path) and os.path.getsize(path) > threshold_bytes:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        rotated = f"{path}.{ts}.bak"
        shutil.move(path, rotated)
        ext = os.path.splitext(path)[1].lower()
        if ext in compress_exts:
            with open(rotated, "rb") as f_in, gzip.open(rotated + ".gz", "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.remove(rotated)
            print(f"[ROTATE] {path} rotated and compressed to {rotated}.gz")
        else:
            print(f"[ROTATE] {path} rotated to {rotated}")


def train(common_dir: Optional[str],
          epochs: int,
          batch_size: int,
          val_splits: int,
          min_conf: Optional[float],
          model_type: str = "dense",
          seq_len: int = 30) -> None:
    df = load_features(common_dir)
    # Enrich with Yahoo data using the detected timeframe and UTC normalization
    tf_minutes = None
    if "timeframe" in df.columns:
        try:
            tf_series = pd.to_numeric(df["timeframe"], errors="coerce").dropna()
            if not tf_series.empty:
                tf_minutes = int(tf_series.mode().iloc[0])
        except Exception:
            tf_minutes = None
    df = enrich_with_yahoo(df, target_timeframe_minutes=tf_minutes, tz="UTC", fill_holidays=True)
    # Build labels
    df, y = make_label_from_r_multiple(df)
    # Build feature matrix
    X, feat_names = build_feature_matrix(df)
    X, scaler = scale_features(X)
    # Prepare data depending on model type
    use_lstm = str(model_type).lower() == "lstm"
    if use_lstm:
        s_col, y_col, _ = _resolve_cols(df)
        group_cols = [c for c in (s_col, y_col) if c is not None]
        t_col = _resolve_time_col(df)
        X_seq, y_seq = make_sequences(df, X, y, seq_len=seq_len, group_cols=group_cols, time_col=t_col)
        if X_seq.shape[0] == 0:
            raise RuntimeError("Insufficient data to build LSTM sequences; try reducing --seq_len or ensuring time columns exist")
    # Time-aware split
    tscv = TimeSeriesSplit(n_splits=max(2, val_splits))
    best_auc = -1.0
    best_model = None
    best_thresh = 0.5
    if use_lstm:
        # Split over sequence indices
        for fold, (tr_idx, va_idx) in enumerate(tscv.split(X_seq)):
            X_tr, X_va = X_seq[tr_idx], X_seq[va_idx]
            y_tr, y_va = y_seq[tr_idx], y_seq[va_idx]
            model = build_lstm(input_dim=X_seq.shape[2], seq_len=X_seq.shape[1])
            model.fit(X_tr, y_tr, epochs=epochs, batch_size=batch_size, validation_data=(X_va, y_va), verbose=0)
            p_va = model.predict(X_va, verbose=0).ravel()
            auc = roc_auc_score(y_va, p_va) if len(np.unique(y_va)) > 1 else 0.5
            # Choose the threshold that maximizes F1 on the validation fold
            ts = np.linspace(0.3, 0.8, num=21)
            f1s = [f1_score(y_va, (p_va >= t).astype(int)) for t in ts]
            t_star = float(ts[int(np.argmax(f1s))])
            if auc > best_auc:
                best_auc = auc
                best_model = model
                best_thresh = t_star
            print(f"Fold {fold+1}: AUC={auc:.3f} best_thresh={t_star:.2f}")
    else:
        for fold, (tr_idx, va_idx) in enumerate(tscv.split(X)):
            X_tr, X_va = X[tr_idx], X[va_idx]
            y_tr, y_va = y.iloc[tr_idx].values, y.iloc[va_idx].values
            model = build_classifier(X.shape[1])
            model.fit(X_tr, y_tr, epochs=epochs, batch_size=batch_size, validation_data=(X_va, y_va), verbose=0)
            p_va = model.predict(X_va, verbose=0).ravel()
            auc = roc_auc_score(y_va, p_va) if len(np.unique(y_va)) > 1 else 0.5
            # Choose the threshold that maximizes F1 on the validation fold
            ts = np.linspace(0.3, 0.8, num=21)
            f1s = [f1_score(y_va, (p_va >= t).astype(int)) for t in ts]
            t_star = float(ts[int(np.argmax(f1s))])
            if auc > best_auc:
                best_auc = auc
                best_model = model
                best_thresh = t_star
            print(f"Fold {fold+1}: AUC={auc:.3f} best_thresh={t_star:.2f}")
    if best_model is None:
        raise RuntimeError("Training failed to produce a model")
    out_dir = os.path.join(os.getcwd(), "artifacts")
    os.makedirs(out_dir, exist_ok=True)
    # Artifact rotation thresholds
    keras_thresh = 100 * 1024 * 1024  # 100 MB
    json_thresh = 20 * 1024 * 1024  # 20 MB
    pkl_thresh = 20 * 1024 * 1024  # 20 MB
    # Rotate/compress existing artifacts before writing new ones
    rotate_and_compress(os.path.join(out_dir, "tf_model.keras"), keras_thresh, compress_exts=())
    rotate_and_compress(os.path.join(out_dir, "scaler.pkl"), pkl_thresh)
    rotate_and_compress(os.path.join(out_dir, "features.json"), json_thresh)
    save_model(best_model, out_dir)
    # Persist preprocessing artifacts
    with open(os.path.join(out_dir, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)
    with open(os.path.join(out_dir, "features.json"), "w", encoding="utf-8") as f:
        json.dump({"features": feat_names}, f, indent=2)
    # Compute per-slice probabilities using full-dataset predictions
    if use_lstm:
        # Rebuild the full sequence set to score consistently
        X_full_seq, _ = make_sequences(df, X, y, seq_len=seq_len,
                                       group_cols=[c for c in _resolve_cols(df)[:2] if c is not None],
                                       time_col=_resolve_time_col(df))
        p_seq = best_model.predict(X_full_seq, verbose=0).ravel()
        # Approximate row-level scores by broadcasting each sequence score
        # to the last row of its window
        df_probs = df.reset_index(drop=True).copy()  # positional row ids below assume a clean RangeIndex
        df_probs["p_win"] = np.nan
        # Reconstruct the row order exactly as make_sequences does
        df_idx = df.reset_index(drop=True).copy()
        df_idx["__row__"] = np.arange(len(df_idx))
        s_col, y_col, _ = _resolve_cols(df)
        t_col = _resolve_time_col(df)
        seq_last_rows = []
        if s_col or y_col:
            group_cols = [c for c in (s_col, y_col) if c is not None]
            if t_col and t_col in df_idx.columns:
                try:
                    df_idx = df_idx.sort_values([*group_cols, t_col])
                except Exception:
                    pass
            for _, g in df_idx.groupby(group_cols):
                idxs = g["__row__"].values.astype(int)
                for start in range(0, max(0, len(idxs) - seq_len + 1)):
                    seq_last_rows.append(idxs[start + seq_len - 1])
        else:
            idxs = df_idx["__row__"].values.astype(int)
            for start in range(0, max(0, len(idxs) - seq_len + 1)):
                seq_last_rows.append(idxs[start + seq_len - 1])
        # Assign sequence scores to their window-end rows
        for idx, p in zip(seq_last_rows, p_seq):
            if 0 <= idx < len(df_probs):
                df_probs.at[idx, "p_win"] = float(p)
        # Forward-fill within each group so every row carries a probability
        if s_col or y_col:
            group_cols = [c for c in (s_col, y_col) if c is not None]
            df_probs = df_probs.sort_values(group_cols + ([t_col] if t_col else [])).groupby(group_cols).apply(lambda g: g.ffill()).reset_index(drop=True)
        else:
            df_probs = df_probs.ffill()
    else:
        p_all = best_model.predict(X, verbose=0).ravel()
        df_probs = df.copy()
        df_probs["p_win"] = p_all
    s_col, y_col, t_col = _resolve_cols(df_probs)
    slice_probs = []
    if all(c is not None for c in (s_col, y_col, t_col)):
        gb = df_probs.groupby([s_col, y_col, t_col])["p_win"].mean().reset_index()
        for _, row in gb.iterrows():
            slice_probs.append({
                "strategy": str(row[s_col]),
                "symbol": str(row[y_col]),
                "timeframe": int(row[t_col]) if not pd.isna(row[t_col]) else -1,
                "p_win": float(row["p_win"]),
            })
    # Write policy.json to Common Files (with optional slice_probs)
    threshold = float(min_conf) if min_conf is not None else float(best_thresh)
    path = write_policy(min_confidence=threshold, common_dir=common_dir, slice_probs=slice_probs if slice_probs else None)
    print(f"Saved model to {os.path.join(out_dir, 'tf_model.keras')}")
    print(f"Saved scaler to {os.path.join(out_dir, 'scaler.pkl')}")
    print(f"Saved feature list to {os.path.join(out_dir, 'features.json')}")
    print(f"Saved policy to {path} (min_confidence={threshold:.2f}, slices={len(slice_probs)})")


if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("--common", type=str, default=None, help="Override Common Files DualEA dir")
    ap.add_argument("--epochs", type=int, default=15)
    ap.add_argument("--batch", type=int, default=256)
    ap.add_argument("--splits", type=int, default=3)
    ap.add_argument("--min_conf", type=float, default=None, help="Force min_confidence for policy.json")
    ap.add_argument("--model", type=str, default="dense", choices=["dense", "lstm"], help="Model type to train")
    ap.add_argument("--seq_len", type=int, default=30, help="Sequence length for LSTM")
    args = ap.parse_args()
    train(args.common, args.epochs, args.batch, args.splits, args.min_conf, args.model, args.seq_len)
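
# Example invocations (sketch; flags as defined above):
#
#   python train.py --model dense --epochs 15 --batch 256 --splits 3
#   python train.py --model lstm --seq_len 30 --min_conf 0.60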