mql5/Experts/Advisors/DualEA/ML/policy_export.py
2025-08-14 19:25:09 -04:00

194 lines
7.7 KiB
Python

import argparse
import json
import math
import os
import pickle
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from dataset import load_features
from features import enrich_with_yahoo, build_feature_matrix, scale_features
from model import load_model
from policy import write_policy
def _resolve_cols(df: pd.DataFrame):
strat = "strategy" if "strategy" in df.columns else None
sym = "symbol" if "symbol" in df.columns else ("entry_symbol" if "entry_symbol" in df.columns else None)
tf = "timeframe" if "timeframe" in df.columns else ("tf" if "tf" in df.columns else None)
return strat, sym, tf
# Heuristic mapping from probability to scaling multipliers.
# Note: MQL5 policy parser commits after reading p_win, so we must output
# sl_scale/tp_scale/trail_atr_mult BEFORE p_win in each slice object.
def compute_scales(p: Optional[float], min_conf: float, neutral: bool = False):
    """Map a win probability to ``(sl_scale, tp_scale, trail_atr_mult)``.

    Higher-confidence probabilities get a tighter stop, a wider target and
    a wider ATR trail; a missing/NaN probability, ``neutral=True``, or any
    value below the mid band yields neutral ``1.0`` multipliers.
    """
    if neutral or p is None:
        return 1.0, 1.0, 1.0
    try:
        if math.isnan(p):
            return 1.0, 1.0, 1.0
    except Exception:
        # p may not be float-like; fall through and let comparisons decide.
        pass
    # Thresholds track min_conf but never drop below fixed floors.
    hi = max(0.80, float(min_conf) + 0.20)
    mid = max(0.60, float(min_conf) + 0.10)
    if p >= hi:
        return 0.85, 1.35, 1.15
    if p >= mid:
        return 0.90, 1.20, 1.10
    # The original `p >= min_conf` branch returned the same neutral tuple
    # as the fall-through, so both cases collapse into one return.
    return 1.00, 1.00, 1.00
# Known EA strategy names that must always be covered by at least a neutral
# strategy-level aggregate in policy.json.
KNOWN_EA_STRATS = [
    "BollAveragesStrategy",
    "MeanReversionBBStrategy",
    "SuperTrendADXKamaStrategy",
    "RSI2BBReversionStrategy",
    "DonchianATRBreakoutStrategy",
]


def _infer_tf_minutes(df: pd.DataFrame) -> Optional[int]:
    """Best-effort modal timeframe in minutes from df['timeframe'], else None."""
    if "timeframe" not in df.columns:
        return None
    try:
        tf_series = pd.to_numeric(df["timeframe"], errors="coerce").dropna()
        if not tf_series.empty:
            return int(tf_series.mode().iloc[0])
    except Exception:
        pass
    return None


def _make_slice(strategy: str, symbol: str, timeframe: int, p: float,
                min_conf: float, neutral: bool) -> Dict:
    """Build one policy slice dict for the given aggregate probability.

    Key order matters: the MQL5 policy parser commits a slice as soon as it
    reads p_win, so the scale fields must precede p_win in the object.
    """
    sl_s, tp_s, tr_s = compute_scales(p, min_conf, neutral)
    return {
        "strategy": strategy,
        "symbol": symbol,
        "timeframe": timeframe,
        "sl_scale": float(sl_s),
        "tp_scale": float(tp_s),
        "trail_atr_mult": float(tr_s),
        "p_win": p,
    }


def export_policy(common_dir: Optional[str], model_dir: str, min_conf: float,
                  enrich: bool = True, neutral_scales: bool = False) -> str:
    """Score the feature dataset with the trained model and write policy.json.

    Loads features from the Common Files dir, optionally enriches them via
    Yahoo data, aligns/scales the feature matrix exactly as in training,
    predicts per-row win probabilities, aggregates them per
    (strategy, symbol, timeframe) slice plus coarser fallbacks, and writes
    the resulting policy file.

    Args:
        common_dir: Override for the Common Files DualEA directory (or None).
        model_dir: Directory holding the model plus optional scaler.pkl and
            features.json artifacts.
        min_conf: Minimum confidence threshold written into the policy.
        enrich: When True, enrich features with Yahoo data first.
        neutral_scales: When True, emit neutral (1.0) scale multipliers.

    Returns:
        The path of the written policy file.
    """
    # Load features from Common Files.
    df = load_features(common_dir)
    if enrich:
        df = enrich_with_yahoo(df, target_timeframe_minutes=_infer_tf_minutes(df),
                               tz="UTC", fill_holidays=True)
    # Build the feature matrix with the same preprocessing as training.
    X, feat_names = build_feature_matrix(df)
    # Try to load the persisted scaler and training-time feature order.
    scaler_path = os.path.join(model_dir, "scaler.pkl")
    feats_path = os.path.join(model_dir, "features.json")
    scaler = None
    feature_order: List[str] = feat_names
    if os.path.exists(scaler_path):
        try:
            # NOTE: pickle is only safe because model_dir is a trusted
            # local artifacts directory written by our own trainer.
            with open(scaler_path, "rb") as f:
                scaler = pickle.load(f)
        except Exception:
            scaler = None
    if os.path.exists(feats_path):
        try:
            with open(feats_path, "r", encoding="utf-8") as f:
                d = json.load(f)
            if isinstance(d, dict) and isinstance(d.get("features"), list):
                feature_order = [str(x) for x in d["features"]]
        except Exception:
            pass
    # Align current X columns to the training feature order if possible.
    # (The original re-imported numpy here; the top-level import suffices.)
    try:
        col_to_idx = {c: i for i, c in enumerate(feat_names)}
        idxs = [col_to_idx[c] for c in feature_order if c in col_to_idx]
        X = X[:, idxs]
    except Exception:
        pass
    if scaler is not None:
        Xs = scaler.transform(X)
    else:
        # Fallback: fit a fresh scaler (not ideal, but functional).
        Xs, _ = scale_features(X)
    # Load the model and predict per-row win probabilities.
    model = load_model(model_dir)
    df_probs = df.copy()
    df_probs["p_win"] = model.predict(Xs, verbose=0).ravel()
    s_col, y_col, t_col = _resolve_cols(df_probs)
    slice_probs: List[Dict] = []
    if all(c is not None for c in (s_col, y_col, t_col)):
        # Exact slices: strategy + symbol + timeframe.
        gb = df_probs.groupby([s_col, y_col, t_col])["p_win"].mean().reset_index()
        for _, row in gb.iterrows():
            tf = int(row[t_col]) if not pd.isna(row[t_col]) else -1
            slice_probs.append(_make_slice(str(row[s_col]), str(row[y_col]), tf,
                                           float(row["p_win"]), min_conf,
                                           neutral_scales))
        # Aggregated by strategy + symbol across all timeframes (timeframe = -1).
        gb_sym = df_probs.groupby([s_col, y_col])["p_win"].mean().reset_index()
        for _, row in gb_sym.iterrows():
            slice_probs.append(_make_slice(str(row[s_col]), str(row[y_col]), -1,
                                           float(row["p_win"]), min_conf,
                                           neutral_scales))
        # Aggregated by strategy across all symbols/timeframes ("*", -1).
        gb_strat = df_probs.groupby([s_col])["p_win"].mean().reset_index()
        for _, row in gb_strat.iterrows():
            slice_probs.append(_make_slice(str(row[s_col]), "*", -1,
                                           float(row["p_win"]), min_conf,
                                           neutral_scales))
    # Ensure trainer-side default coverage for known EA strategies.
    existing = {
        (str(r.get("strategy", "")), str(r.get("symbol", "")),
         int(r.get("timeframe", -1)))
        for r in slice_probs
    }
    for strat in KNOWN_EA_STRATS:
        if (strat, "*", -1) not in existing:
            # Emit neutral scales explicitly; scales must appear before
            # p_win for the EA parser.
            slice_probs.append({
                "strategy": strat,
                "symbol": "*",
                "timeframe": -1,
                "sl_scale": 1.0,
                "tp_scale": 1.0,
                "trail_atr_mult": 1.0,
                "p_win": 0.50,
            })
    return write_policy(min_confidence=float(min_conf), common_dir=common_dir,
                        slice_probs=slice_probs if slice_probs else None)
if __name__ == "__main__":
    # CLI entry point: score features with the trained model and emit policy.json.
    parser = argparse.ArgumentParser(
        description="Export DualEA policy.json from trained model and features.csv"
    )
    parser.add_argument("--common", type=str, default=None,
                        help="Override Common Files DualEA dir")
    parser.add_argument("--model_dir", type=str,
                        default=os.path.join(os.getcwd(), "artifacts"),
                        help="Directory containing tf_model.keras")
    parser.add_argument("--min_conf", type=float, default=0.55,
                        help="Minimum confidence threshold to enforce in policy.json")
    parser.add_argument("--no_enrich", action="store_true",
                        help="Disable Yahoo enrichment for speed")
    parser.add_argument("--neutral_scales", action="store_true",
                        help="Emit neutral scale fields (1.0)")
    opts = parser.parse_args()
    saved_path = export_policy(
        opts.common,
        opts.model_dir,
        opts.min_conf,
        enrich=not opts.no_enrich,
        neutral_scales=opts.neutral_scales,
    )
    print(f"Saved policy to {saved_path} (min_confidence={opts.min_conf:.2f})")