mql5/Experts/Advisors/DualEA/ML/policy_export.py

194 lines
7.7 KiB
Python
Raw Permalink Normal View History

2025-08-10 17:43:21 -04:00
import argparse
import json
import math
import os
import pickle
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from dataset import load_features
from features import build_feature_matrix, enrich_with_yahoo, scale_features
from model import load_model
from policy import write_policy
def _resolve_cols(df: pd.DataFrame):
strat = "strategy" if "strategy" in df.columns else None
sym = "symbol" if "symbol" in df.columns else ("entry_symbol" if "entry_symbol" in df.columns else None)
tf = "timeframe" if "timeframe" in df.columns else ("tf" if "tf" in df.columns else None)
return strat, sym, tf
# Heuristic mapping from probability to scaling multipliers.
# Note: MQL5 policy parser commits after reading p_win, so we must output
# sl_scale/tp_scale/trail_atr_mult BEFORE p_win in each slice object.
def compute_scales(p: float, min_conf: float, neutral: bool = False):
if neutral or p is None:
return 1.0, 1.0, 1.0
try:
import math
if math.isnan(p):
return 1.0, 1.0, 1.0
except Exception:
pass
hi = max(0.80, float(min_conf) + 0.20)
mid = max(0.60, float(min_conf) + 0.10)
if p >= hi:
return 0.85, 1.35, 1.15
if p >= mid:
return 0.90, 1.20, 1.10
if p >= float(min_conf):
return 1.00, 1.00, 1.00
return 1.00, 1.00, 1.00
def export_policy(common_dir: Optional[str], model_dir: str, min_conf: float, enrich: bool = True, neutral_scales: bool = False) -> str:
    """Score the features file with the trained model and write policy.json.

    Loads features from the Common Files dir, optionally enriches them via
    Yahoo data, rebuilds the training-time feature matrix and scaling,
    predicts per-row win probabilities, aggregates them into per-slice
    entries (strategy+symbol+timeframe, strategy+symbol with timeframe=-1,
    strategy-only with symbol="*"), backfills neutral defaults for known EA
    strategies, and writes the result via write_policy.

    Args:
        common_dir: Override for the Common Files DualEA directory, or None.
        model_dir: Directory holding the trained model plus optional
            scaler.pkl and features.json artifacts.
        min_conf: Minimum confidence threshold embedded in policy.json and
            used to derive the scale cut points.
        enrich: Whether to run the Yahoo enrichment step.
        neutral_scales: Emit neutral (1.0) scale fields for every slice.

    Returns:
        Path of the written policy file, as returned by write_policy.
    """
    # Load features from Common Files
    df = load_features(common_dir)
    if enrich:
        tf_minutes = None
        if "timeframe" in df.columns:
            try:
                tf_series = pd.to_numeric(df["timeframe"], errors="coerce").dropna()
                if not tf_series.empty:
                    # Use the most common timeframe value as the enrichment target.
                    tf_minutes = int(tf_series.mode().iloc[0])
            except Exception:
                tf_minutes = None
        df = enrich_with_yahoo(df, target_timeframe_minutes=tf_minutes, tz="UTC", fill_holidays=True)
    # Build features matrix with the same preprocessing as training
    X, feat_names = build_feature_matrix(df)
    # Try to load persisted scaler and feature order
    scaler_path = os.path.join(model_dir, "scaler.pkl")
    feats_path = os.path.join(model_dir, "features.json")
    scaler = None
    feature_order: List[str] = feat_names
    if os.path.exists(scaler_path):
        try:
            with open(scaler_path, "rb") as f:
                # NOTE(review): pickle.load on a local training artifact —
                # assumes model_dir contents are trusted.
                scaler = pickle.load(f)
        except Exception:
            scaler = None
    if os.path.exists(feats_path):
        try:
            with open(feats_path, "r", encoding="utf-8") as f:
                d = json.load(f)
                if isinstance(d, dict) and "features" in d and isinstance(d["features"], list):
                    feature_order = [str(x) for x in d["features"]]
        except Exception:
            pass
    # Align current X to training feature order if possible
    try:
        # NOTE(review): redundant — numpy is already imported at module level.
        import numpy as np
        # Build a mapping from current feature list to desired order
        col_to_idx = {c: i for i, c in enumerate(feat_names)}
        idxs = [col_to_idx[c] for c in feature_order if c in col_to_idx]
        X = X[:, idxs]
    except Exception:
        pass
    if scaler is not None:
        Xs = scaler.transform(X)
    else:
        # Fallback: fit a fresh scaler (not ideal, but functional)
        Xs, _ = scale_features(X)
    # Load model and predict probabilities
    model = load_model(model_dir)
    p_all = model.predict(Xs, verbose=0).ravel()
    df_probs = df.copy()
    df_probs["p_win"] = p_all
    s_col, y_col, t_col = _resolve_cols(df_probs)
    slice_probs: List[Dict] = []
    # Only build per-slice aggregates when all three slice columns exist.
    if all(c is not None for c in (s_col, y_col, t_col)):
        # Exact slices: strat+symbol+timeframe
        gb = df_probs.groupby([s_col, y_col, t_col])["p_win"].mean().reset_index()
        for _, row in gb.iterrows():
            p = float(row["p_win"])
            sl_s, tp_s, tr_s = compute_scales(p, min_conf, neutral_scales)
            # Ensure scales come before p_win for the EA parser
            slice_probs.append({
                "strategy": str(row[s_col]),
                "symbol": str(row[y_col]),
                "timeframe": int(row[t_col]) if not pd.isna(row[t_col]) else -1,
                "sl_scale": float(sl_s),
                "tp_scale": float(tp_s),
                "trail_atr_mult": float(tr_s),
                "p_win": p,
            })
        # Aggregated by strat+symbol across all timeframes (timeframe = -1)
        gb_sym = df_probs.groupby([s_col, y_col])["p_win"].mean().reset_index()
        for _, row in gb_sym.iterrows():
            p = float(row["p_win"])
            sl_s, tp_s, tr_s = compute_scales(p, min_conf, neutral_scales)
            # Skip if an exact slice with tf=-1 already exists (shouldn't) — just add
            slice_probs.append({
                "strategy": str(row[s_col]),
                "symbol": str(row[y_col]),
                "timeframe": -1,
                "sl_scale": float(sl_s),
                "tp_scale": float(tp_s),
                "trail_atr_mult": float(tr_s),
                "p_win": p,
            })
        # Aggregated by strategy across all symbols/timeframes (symbol = "*", timeframe = -1)
        gb_strat = df_probs.groupby([s_col])["p_win"].mean().reset_index()
        for _, row in gb_strat.iterrows():
            p = float(row["p_win"])
            sl_s, tp_s, tr_s = compute_scales(p, min_conf, neutral_scales)
            slice_probs.append({
                "strategy": str(row[s_col]),
                "symbol": "*",
                "timeframe": -1,
                "sl_scale": float(sl_s),
                "tp_scale": float(tp_s),
                "trail_atr_mult": float(tr_s),
                "p_win": p,
            })
    # Ensure trainer-side default coverage for known EA strategies
    KNOWN_EA_STRATS = [
        "BollAveragesStrategy",
        "MeanReversionBBStrategy",
        "SuperTrendADXKamaStrategy",
        "RSI2BBReversionStrategy",
        "DonchianATRBreakoutStrategy",
    ]
    existing = set()
    for r in slice_probs:
        key = (str(r.get("strategy","")), str(r.get("symbol","")), int(r.get("timeframe", -1)))
        existing.add(key)
    # Add neutral strat-only aggregates where missing
    for strat in KNOWN_EA_STRATS:
        key = (strat, "*", -1)
        if key not in existing:
            # Emit neutral scales explicitly and ensure they appear before p_win
            slice_probs.append({
                "strategy": strat,
                "symbol": "*",
                "timeframe": -1,
                "sl_scale": 1.0,
                "tp_scale": 1.0,
                "trail_atr_mult": 1.0,
                "p_win": 0.50,
            })
    path = write_policy(min_confidence=float(min_conf), common_dir=common_dir, slice_probs=slice_probs if slice_probs else None)
    return path
if __name__ == "__main__":
    # CLI entry point: build the argument parser, then delegate to export_policy.
    parser = argparse.ArgumentParser(
        description="Export DualEA policy.json from trained model and features.csv"
    )
    parser.add_argument(
        "--common", type=str, default=None,
        help="Override Common Files DualEA dir",
    )
    parser.add_argument(
        "--model_dir", type=str,
        default=os.path.join(os.getcwd(), "artifacts"),
        help="Directory containing tf_model.keras",
    )
    parser.add_argument(
        "--min_conf", type=float, default=0.55,
        help="Minimum confidence threshold to enforce in policy.json",
    )
    parser.add_argument(
        "--no_enrich", action="store_true",
        help="Disable Yahoo enrichment for speed",
    )
    parser.add_argument(
        "--neutral_scales", action="store_true",
        help="Emit neutral scale fields (1.0)",
    )
    cli = parser.parse_args()
    policy_path = export_policy(
        cli.common,
        cli.model_dir,
        cli.min_conf,
        enrich=not cli.no_enrich,
        neutral_scales=cli.neutral_scales,
    )
    print(f"Saved policy to {policy_path} (min_confidence={cli.min_conf:.2f})")