import argparse
import os
import json
import pickle
import gzip
import shutil
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import TimeSeriesSplit

from dataset import load_features, make_label_from_r_multiple
from features import enrich_with_yahoo, build_feature_matrix, scale_features
from model import build_classifier, build_lstm, save_model
from policy import write_policy


def _resolve_cols(df: pd.DataFrame):
    """Return the (strategy, symbol, timeframe) column names present in df.

    Each element is the detected column name, or None when no candidate
    column exists in the frame.
    """
    strat = "strategy" if "strategy" in df.columns else None
    sym = "symbol" if "symbol" in df.columns else ("entry_symbol" if "entry_symbol" in df.columns else None)
    tf = "timeframe" if "timeframe" in df.columns else ("tf" if "tf" in df.columns else None)
    return strat, sym, tf


def _resolve_time_col(df: pd.DataFrame) -> Optional[str]:
    """Return the first recognized time column name in df, or None."""
    for c in ("close_time", "entry_time", "timestamp", "time"):
        if c in df.columns:
            return c
    return None


def make_sequences(df: pd.DataFrame, X: np.ndarray, y: pd.Series, seq_len: int = 30,
                   group_cols: Optional[list] = None, time_col: Optional[str] = None):
    """Build rolling window sequences per group, ordered by time.

    Returns (Xs, ys) with shapes (N_seq, seq_len, n_features) and (N_seq,).
    The label aligns to the end of each window.
    """
    if group_cols is None:
        group_cols = []
    if time_col is None:
        time_col = _resolve_time_col(df)
    # Attach row indices to preserve alignment between df ordering and X rows.
    df_idx = df.reset_index(drop=True).copy()
    df_idx["__row__"] = np.arange(len(df_idx))
    # Ensure time ordering within each group; best-effort — unsortable time
    # columns (mixed types) simply keep the original order.
    if time_col and time_col in df_idx.columns:
        try:
            df_idx = df_idx.sort_values([*group_cols, time_col]) if group_cols else df_idx.sort_values(time_col)
        except Exception:
            pass

    def _windows(idxs: np.ndarray, Xs: list, ys: list) -> None:
        # Emit every contiguous window of length seq_len; label = last row's y.
        for start in range(0, len(idxs) - seq_len + 1):
            win = idxs[start:start + seq_len]
            Xs.append(X[win, :])
            ys.append(int(y.iloc[win[-1]]))

    Xs, ys = [], []
    if group_cols:
        for _, g in df_idx.groupby(group_cols):
            idxs = g["__row__"].values.astype(int)
            if len(idxs) < seq_len:
                continue  # group too short to form even one window
            _windows(idxs, Xs, ys)
    else:
        idxs = df_idx["__row__"].values.astype(int)
        if len(idxs) >= seq_len:
            _windows(idxs, Xs, ys)
    if not Xs:
        # Empty result keeps the documented shapes so callers can test .shape[0].
        return np.zeros((0, seq_len, X.shape[1]), dtype=float), np.zeros((0,), dtype=int)
    return np.stack(Xs, axis=0).astype(float), np.array(ys, dtype=int)


def rotate_and_compress(path, threshold_bytes, compress_exts=(".json", ".pkl")):
    """Rotate `path` to a timestamped .bak when it exceeds `threshold_bytes`.

    Files whose extension is in `compress_exts` are additionally gzipped
    (and the uncompressed rotation removed). No-op when the file is absent
    or under the threshold.
    """
    if os.path.exists(path) and os.path.getsize(path) > threshold_bytes:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        rotated = f"{path}.{ts}.bak"
        shutil.move(path, rotated)
        ext = os.path.splitext(path)[1].lower()
        if ext in compress_exts:
            with open(rotated, "rb") as f_in, gzip.open(rotated + ".gz", "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.remove(rotated)
            print(f"[ROTATE] {path} rotated and compressed to {rotated}.gz")
        else:
            print(f"[ROTATE] {path} rotated to {rotated}")


def _best_f1_threshold(y_true: np.ndarray, p: np.ndarray) -> float:
    """Return the threshold in [0.3, 0.8] (21 steps) maximizing F1 on (y_true, p)."""
    ts = np.linspace(0.3, 0.8, num=21)
    f1s = [f1_score(y_true, (p >= t).astype(int)) for t in ts]
    return float(ts[int(np.argmax(f1s))])


def train(common_dir: Optional[str], epochs: int, batch_size: int, val_splits: int,
          min_conf: Optional[float], model_type: str = "dense", seq_len: int = 30) -> None:
    """Train a win-probability classifier and publish model, scaler and policy artifacts.

    Args:
        common_dir: override for the Common Files DualEA directory (None = default).
        epochs / batch_size: Keras fit parameters.
        val_splits: number of TimeSeriesSplit folds (floored at 2).
        min_conf: if given, forces the policy min_confidence instead of the
            validation-selected threshold.
        model_type: "dense" or "lstm".
        seq_len: window length for LSTM sequences.

    Raises:
        RuntimeError: when no LSTM sequences can be built, or no fold yields a model.
    """
    df = load_features(common_dir)

    # Enrich with Yahoo using detected timeframe and UTC normalization.
    # The timeframe is taken as the mode of the numeric "timeframe" column.
    tf_minutes = None
    if "timeframe" in df.columns:
        try:
            tf_series = pd.to_numeric(df["timeframe"], errors="coerce").dropna()
            if not tf_series.empty:
                tf_minutes = int(tf_series.mode().iloc[0])
        except Exception:
            tf_minutes = None
    df = enrich_with_yahoo(df, target_timeframe_minutes=tf_minutes, tz="UTC", fill_holidays=True)

    # Build labels and the scaled feature matrix.
    df, y = make_label_from_r_multiple(df)
    X, feat_names = build_feature_matrix(df)
    X, scaler = scale_features(X)

    # Prepare data depending on model type.
    use_lstm = str(model_type).lower() == "lstm"
    if use_lstm:
        s_col, y_col, _ = _resolve_cols(df)
        group_cols = [c for c in (s_col, y_col) if c is not None]
        t_col = _resolve_time_col(df)
        X_seq, y_seq = make_sequences(df, X, y, seq_len=seq_len, group_cols=group_cols, time_col=t_col)
        if X_seq.shape[0] == 0:
            raise RuntimeError("Insufficient data to build LSTM sequences — try reducing --seq_len or ensuring time columns exist")

    # Time-aware cross-validation; keep the model from the best-AUC fold
    # together with the F1-optimal threshold selected on that same fold.
    tscv = TimeSeriesSplit(n_splits=max(2, val_splits))
    best_auc = -1.0
    best_model = None
    best_thresh = 0.5
    if use_lstm:
        # Split over sequence indices.
        for fold, (tr_idx, va_idx) in enumerate(tscv.split(X_seq)):
            X_tr, X_va = X_seq[tr_idx], X_seq[va_idx]
            y_tr, y_va = y_seq[tr_idx], y_seq[va_idx]
            model = build_lstm(input_dim=X_seq.shape[2], seq_len=X_seq.shape[1])
            model.fit(X_tr, y_tr, epochs=epochs, batch_size=batch_size,
                      validation_data=(X_va, y_va), verbose=0)
            p_va = model.predict(X_va, verbose=0).ravel()
            # AUC is undefined on a single-class fold; score it as chance (0.5).
            auc = roc_auc_score(y_va, p_va) if len(np.unique(y_va)) > 1 else 0.5
            t_star = _best_f1_threshold(y_va, p_va)
            if auc > best_auc:
                best_auc = auc
                best_model = model
                best_thresh = t_star
            print(f"Fold {fold+1}: AUC={auc:.3f} best_thresh={t_star:.2f}")
    else:
        for fold, (tr_idx, va_idx) in enumerate(tscv.split(X)):
            X_tr, X_va = X[tr_idx], X[va_idx]
            y_tr, y_va = y.iloc[tr_idx].values, y.iloc[va_idx].values
            model = build_classifier(X.shape[1])
            model.fit(X_tr, y_tr, epochs=epochs, batch_size=batch_size,
                      validation_data=(X_va, y_va), verbose=0)
            p_va = model.predict(X_va, verbose=0).ravel()
            auc = roc_auc_score(y_va, p_va) if len(np.unique(y_va)) > 1 else 0.5
            # Choose threshold by max F1 on this fold's validation slice.
            t_star = _best_f1_threshold(y_va, p_va)
            if auc > best_auc:
                best_auc = auc
                best_model = model
                best_thresh = t_star
            # BUGFIX: the original duplicated `best_thresh = t_star` here
            # unconditionally, overwriting the best fold's threshold with the
            # last fold's. The threshold now updates only with the best model.
            print(f"Fold {fold+1}: AUC={auc:.3f} best_thresh={t_star:.2f}")

    if best_model is None:
        raise RuntimeError("Training failed to produce a model")

    out_dir = os.path.join(os.getcwd(), "artifacts")
    os.makedirs(out_dir, exist_ok=True)

    # Artifact rotation thresholds.
    keras_thresh = 100 * 1024 * 1024  # 100MB
    json_thresh = 20 * 1024 * 1024    # 20MB
    pkl_thresh = 20 * 1024 * 1024     # 20MB

    # Rotate/compress before writing (keras binaries are rotated but not gzipped).
    rotate_and_compress(os.path.join(out_dir, "tf_model.keras"), keras_thresh, compress_exts=())
    rotate_and_compress(os.path.join(out_dir, "scaler.pkl"), pkl_thresh)
    rotate_and_compress(os.path.join(out_dir, "features.json"), json_thresh)

    save_model(best_model, out_dir)

    # Persist preprocessing artifacts.
    with open(os.path.join(out_dir, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)
    with open(os.path.join(out_dir, "features.json"), "w", encoding="utf-8") as f:
        json.dump({"features": feat_names}, f, indent=2)

    # Compute per-slice probabilities using the full dataset predictions.
    if use_lstm:
        # Build full sequences again to score consistently.
        X_full_seq, _ = make_sequences(
            df, X, y, seq_len=seq_len,
            group_cols=[c for c in _resolve_cols(df)[:2] if c is not None],
            time_col=_resolve_time_col(df),
        )
        # For mapping back to rows, approximate by using last timestep of each sequence.
        p_seq = best_model.predict(X_full_seq, verbose=0).ravel()
        # Broadcast sequence score to last row of each window.
        df_probs = df.copy()
        df_probs["p_win"] = np.nan
        # Reconstruct order the same way as make_sequences so that p_seq[i]
        # corresponds to the i-th emitted window.
        df_idx = df.reset_index(drop=True).copy()
        df_idx["__row__"] = np.arange(len(df_idx))
        s_col, y_col, _ = _resolve_cols(df)
        t_col = _resolve_time_col(df)
        seq_last_rows = []
        if s_col or y_col:
            group_cols = [c for c in (s_col, y_col) if c is not None]
            if t_col and t_col in df_idx.columns:
                try:
                    df_idx = df_idx.sort_values([*group_cols, t_col])
                except Exception:
                    pass
            for _, g in df_idx.groupby(group_cols):
                idxs = g["__row__"].values.astype(int)
                for start in range(0, max(0, len(idxs) - seq_len + 1)):
                    seq_last_rows.append(idxs[start + seq_len - 1])
        else:
            idxs = df_idx["__row__"].values.astype(int)
            for start in range(0, max(0, len(idxs) - seq_len + 1)):
                seq_last_rows.append(idxs[start + seq_len - 1])
        # Assign each sequence's score to the row that ends its window.
        for idx, p in zip(seq_last_rows, p_seq):
            if 0 <= idx < len(df_probs):
                df_probs.at[idx, "p_win"] = float(p)
        # Fill remaining rows by ffill within group to get per-row probabilities.
        # NOTE(review): groupby.apply(g.ffill) keeps the grouping columns in the
        # result; newer pandas may warn here — verify against the pinned version.
        if s_col or y_col:
            group_cols = [c for c in (s_col, y_col) if c is not None]
            df_probs = (
                df_probs.sort_values(group_cols + ([t_col] if t_col else []))
                .groupby(group_cols)
                .apply(lambda g: g.ffill())
                .reset_index(drop=True)
            )
        else:
            df_probs = df_probs.ffill()
    else:
        p_all = best_model.predict(X, verbose=0).ravel()
        df_probs = df.copy()
        df_probs["p_win"] = p_all

    # Aggregate mean win probability per (strategy, symbol, timeframe) slice.
    s_col, y_col, t_col = _resolve_cols(df_probs)
    slice_probs = []
    if all(c is not None for c in (s_col, y_col, t_col)):
        gb = df_probs.groupby([s_col, y_col, t_col])["p_win"].mean().reset_index()
        for _, row in gb.iterrows():
            slice_probs.append({
                "strategy": str(row[s_col]),
                "symbol": str(row[y_col]),
                "timeframe": int(row[t_col]) if not pd.isna(row[t_col]) else -1,
                "p_win": float(row["p_win"]),
            })

    # Write policy.json to Common Files (with optional slice_probs).
    threshold = float(min_conf) if min_conf is not None else float(best_thresh)
    path = write_policy(min_confidence=threshold, common_dir=common_dir,
                        slice_probs=slice_probs if slice_probs else None)
    print(f"Saved model to {os.path.join(out_dir, 'tf_model.keras')}")
    print(f"Saved scaler to {os.path.join(out_dir, 'scaler.pkl')}")
    print(f"Saved feature list to {os.path.join(out_dir, 'features.json')}")
    print(f"Saved policy to {path} (min_confidence={threshold:.2f}, slices={len(slice_probs)})")


if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("--common", type=str, default=None, help="Override Common Files DualEA dir")
    ap.add_argument("--epochs", type=int, default=15)
    ap.add_argument("--batch", type=int, default=256)
    ap.add_argument("--splits", type=int, default=3)
    ap.add_argument("--min_conf", type=float, default=None, help="Force min_confidence for policy.json")
    ap.add_argument("--model", type=str, default="dense", choices=["dense", "lstm"],
                    help="Model type to train")
    ap.add_argument("--seq_len", type=int, default=30, help="Sequence length for LSTM")
    args = ap.parse_args()
    train(args.common, args.epochs, args.batch, args.splits, args.min_conf, args.model, args.seq_len)