mql5/Experts/Advisors/DualEA/ML/snapshot_dataset.py
Princeec13 12eac37d58
2026-02-04 14:28:59 -05:00

375 lines
12 KiB
Python

import glob
import math
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
# Default location of the MetaTrader "Common Files" folder on Windows, where
# the EA writes its snapshot files. APPDATA may be unset on non-Windows
# hosts, in which case this resolves relative to the current directory.
COMMON_FILES_DIR = os.path.join(
    os.environ.get("APPDATA", ""),
    "MetaQuotes",
    "Terminal",
    "Common",
    "Files",
)

# Glob pattern matching one snapshot file per feature batch.
SNAPSHOT_PATTERN = "feature_batch_*.txt"

# Columns coerced with pd.to_numeric during normalization (NaN when bad).
NUMERIC_FIELDS: Sequence[str] = [
    "price",
    "volume",
    "sl",
    "tp",
    "confidence",
    "volatility",
    "correlation",
    "hour",
    "timeframe",
    "order_type",
    "gate_passed_count",
    "adjustment_attempts",
    "volume_change_pct",
    "price_change_pct",
    "sl_scale",
    "tp_scale",
]

# Columns mapped to 0.0/1.0 via _to_binary during normalization.
BINARY_FIELDS: Sequence[str] = [
    "executed",
    "is_adjusted",
    "gate1_pass",
    "gate2_pass",
    "gate3_pass",
    "gate4_pass",
    "gate5_pass",
    "gate6_pass",
    "gate7_pass",
    "gate8_pass",
]

# Free-text columns lower-cased/stripped during normalization and later
# label-encoded by build_feature_matrix.
CATEGORICAL_FIELDS: Sequence[str] = [
    "strategy",
    "symbol",
    "status",
    "regime",
    "market_regime",
    "reason",
]

# Snapshot key holding the event time; parsed to tz-aware UTC datetimes.
TIMESTAMP_FIELD = "timestamp"
def list_snapshot_files(
    snapshot_dir: Optional[str] = None,
    pattern: str = SNAPSHOT_PATTERN,
) -> List[str]:
    """Return the sorted paths of snapshot files matching *pattern*.

    Falls back to COMMON_FILES_DIR when *snapshot_dir* is empty or None.
    """
    directory = snapshot_dir if snapshot_dir else COMMON_FILES_DIR
    return sorted(glob.glob(os.path.join(directory, pattern)))
def _parse_snapshot_line(line: str, idx: int) -> Tuple[str, str]:
line = line.strip()
if not line:
return (f"empty_{idx}", "")
if ":" not in line:
return (f"field_{idx}", line)
key, value = line.split(":", 1)
return key.strip().lower(), value.strip()
def _read_text_lines_with_fallback(path: str) -> List[str]:
"""Read a text file that may be UTF-8 or UTF-16 from MQL5 FILE_TXT.
MQL5 FILE_TXT writes Unicode (UTF-16) by default, which will fail if we
try to open it as UTF-8. To be robust against both historical and new
snapshots, we read bytes and try several common encodings.
"""
with open(path, "rb") as handle:
data = handle.read()
# Try the most likely encodings in order. utf-8-sig handles BOM if present.
for encoding in ("utf-8-sig", "utf-16", "utf-16-le", "utf-16-be", "cp1252"):
try:
text = data.decode(encoding)
return text.splitlines()
except UnicodeDecodeError:
continue
# If everything fails, re-raise a readable error for diagnostics.
raise UnicodeDecodeError(
"snapshot", b"", 0, 1, f"unable to decode {path!r} with common encodings"
)
def parse_snapshot_file(path: str) -> Dict[str, str]:
    """Parse one snapshot file into a flat key -> value record.

    Adds __source_file / __source_path bookkeeping keys so each record can
    be traced back to the file it came from. Duplicate keys keep the last
    occurrence, matching dict insertion semantics.
    """
    pairs = (
        _parse_snapshot_line(raw, idx)
        for idx, raw in enumerate(_read_text_lines_with_fallback(path))
    )
    record: Dict[str, str] = {key: value for key, value in pairs}
    record["__source_file"] = os.path.basename(path)
    record["__source_path"] = path
    return record
def load_snapshot_dataframe(
    snapshot_dir: Optional[str] = None,
    limit: Optional[int] = None,
) -> pd.DataFrame:
    """Load every snapshot file into one normalized DataFrame.

    Keeps only the newest *limit* files when a truthy limit is given.
    Raises RuntimeError when no files are found or none of them parse;
    individual parse failures are logged and skipped.
    """
    paths = list_snapshot_files(snapshot_dir)
    if not paths:
        raise RuntimeError(
            f"No snapshot files found in {(snapshot_dir or COMMON_FILES_DIR)!r}"
        )
    if limit:
        paths = paths[-limit:]
    parsed: List[Dict[str, str]] = []
    for path in paths:
        try:
            parsed.append(parse_snapshot_file(path))
        except Exception as exc:  # pragma: no cover - diagnostic path
            print(f"[WARN] Failed to parse snapshot {path}: {exc}")
    if not parsed:
        raise RuntimeError("Snapshot parsing produced zero records")
    return normalize_snapshot_dataframe(pd.DataFrame(parsed))
def _to_binary(value: object) -> float:
if value is None:
return 0.0
if isinstance(value, (int, float)):
# Treat NaN as 0.0; otherwise non-zero numeric as 1.0
try:
import math
if isinstance(value, float) and math.isnan(value):
return 0.0
except Exception:
pass
return float(1.0 if value != 0 else 0.0)
s = str(value).strip().lower()
if s in {"1", "true", "yes", "y", "passed", "executed"}:
return 1.0
if s in {"0", "false", "no", "n", ""}:
return 0.0
try:
return float(1.0 if float(s) != 0 else 0.0)
except Exception:
return 0.0
def normalize_snapshot_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce raw snapshot columns into typed, analysis-ready columns.

    - TIMESTAMP_FIELD becomes tz-aware UTC datetimes (NaT when missing/bad).
    - NUMERIC_FIELDS become numeric (NaN when missing/bad).
    - BINARY_FIELDS become 0.0/1.0 via _to_binary.
    - CATEGORICAL_FIELDS become stripped, lower-cased strings with "unknown"
      filling gaps. "status" is part of CATEGORICAL_FIELDS, so it is covered
      by that same pass (the original duplicated it in a second, idempotent
      pass, which has been removed).

    Returns a copy; the input frame is not mutated.
    """
    df = df.copy()
    if TIMESTAMP_FIELD in df.columns:
        df[TIMESTAMP_FIELD] = pd.to_datetime(
            df[TIMESTAMP_FIELD], errors="coerce", utc=True
        )
    else:
        df[TIMESTAMP_FIELD] = pd.NaT
    for field in NUMERIC_FIELDS:
        # df.get returns None for a missing column; to_numeric coerces that.
        df[field] = pd.to_numeric(df.get(field), errors="coerce")
    for field in BINARY_FIELDS:
        # df.get(..., np.nan) returns a scalar when the column is missing,
        # which breaks .apply(). Construct a proper Series instead.
        if field in df.columns:
            series = df[field]
        else:
            series = pd.Series(np.nan, index=df.index)
        df[field] = series.apply(_to_binary).astype(float)
    for field in CATEGORICAL_FIELDS:
        # Same scalar-safe pattern: always work with a Series aligned to
        # df.index even when the column is absent.
        if field in df.columns:
            series = df[field]
        else:
            series = pd.Series("unknown", index=df.index)
        df[field] = (
            series
            .fillna("unknown")
            .astype(str)
            .str.strip()
            .str.lower()
        )
    return df
def label_from_status(
    df: pd.DataFrame,
    positive_status: Optional[Sequence[str]] = None,
) -> pd.Series:
    """Return a 0/1 Series marking rows whose status counts as positive.

    Defaults to treating "passed" and "executed" as the positive class;
    matching is case-insensitive on the provided statuses.
    """
    wanted = ("passed", "executed") if positive_status is None else positive_status
    normalized = {status.lower() for status in wanted}
    return df["status"].isin(normalized).astype(int)
def _load_price_frame(price_csv: str) -> pd.DataFrame:
price_df = pd.read_csv(price_csv)
price_df.columns = [
str(c).strip().lower().replace(" ", "_") for c in price_df.columns
]
if "timestamp" not in price_df.columns:
raise ValueError("price_csv must contain a 'timestamp' column")
price_df["timestamp"] = pd.to_datetime(price_df["timestamp"], errors="coerce", utc=True)
if "symbol" not in price_df.columns:
price_df["symbol"] = "*"
if "close" not in price_df.columns:
raise ValueError("price_csv must contain a 'close' column")
price_df = price_df.dropna(subset=["timestamp", "close"])
price_df = price_df.sort_values(["symbol", "timestamp"]).reset_index(drop=True)
return price_df
def _future_price_for_symbol(
price_df: pd.DataFrame,
symbol: str,
target_time: pd.Timestamp,
) -> Optional[float]:
subset = price_df[price_df["symbol"] == symbol]
if subset.empty:
return None
future = subset[subset["timestamp"] >= target_time]
if future.empty:
return None
return float(future.iloc[0]["close"])
def labels_from_future_prices(
    df: pd.DataFrame,
    price_csv: str,
    horizon_minutes: int = 60,
    target_r_multiple: float = 0.0,
) -> pd.Series:
    """
    Compute labels based on future price movement relative to SL distance.
    Label = 1 if future R multiple >= target_r_multiple, else 0.

    Rows with a missing timestamp/price/SL, zero risk distance, or no price
    bar at/after the horizon are dropped from the returned Series.
    """
    price_df = _load_price_frame(price_csv)
    labels = pd.Series(index=df.index, dtype=float)
    for idx, row in df.iterrows():
        ts = row.get(TIMESTAMP_FIELD)
        symbol = row.get("symbol", "*")
        price = row.get("price")
        sl = row.get("sl")
        order_type = row.get("order_type", 0)
        # normalize_snapshot_dataframe leaves NaN for missing/bad numerics;
        # int(NaN) raises ValueError, so default a NaN order_type to buy.
        if pd.isna(order_type):
            order_type = 0
        if pd.isna(ts) or pd.isna(price) or pd.isna(sl):
            labels.loc[idx] = np.nan
            continue
        target_time = ts + pd.Timedelta(minutes=horizon_minutes)
        future_price = _future_price_for_symbol(price_df, symbol, target_time)
        if future_price is None:
            labels.loc[idx] = np.nan
            continue
        risk = abs(price - sl)
        if risk <= 0:
            # Degenerate stop: R multiple is undefined, skip the row.
            labels.loc[idx] = np.nan
            continue
        if int(order_type) == 1:  # sell
            reward = price - future_price
        else:  # buy default
            reward = future_price - price
        r_multiple = reward / risk
        labels.loc[idx] = 1 if r_multiple >= target_r_multiple else 0
    return labels.dropna().astype(int)
@dataclass
class FeatureMatrix:
    """Dense feature matrix plus the metadata needed to interpret it."""

    # Feature matrix; build_feature_matrix emits float32 after sanitizing.
    X: np.ndarray
    # Column names aligned with X's second axis.
    feature_names: List[str]
    # Per categorical field: mapping from normalized value -> integer code.
    categorical_mappings: Dict[str, Dict[str, int]]
def build_feature_matrix(
    df: pd.DataFrame,
    numeric_fields: Sequence[str] = NUMERIC_FIELDS,
    binary_fields: Sequence[str] = BINARY_FIELDS,
    categorical_fields: Sequence[str] = CATEGORICAL_FIELDS,
) -> FeatureMatrix:
    """Assemble a float32 feature matrix from a normalized snapshot frame.

    Numeric and binary columns are used directly (NaN -> 0.0); each
    categorical column is label-encoded with codes starting at 1 (0 is
    reserved for unmapped values). Returns the matrix plus feature names
    and the categorical code mappings.
    """
    working = df.copy()
    for field in numeric_fields:
        if field not in working.columns:
            working[field] = np.nan
    for field in binary_fields:
        if field not in working.columns:
            working[field] = 0.0
    numeric_matrix = working[list(numeric_fields)].fillna(0.0).astype(float)
    binary_matrix = working[list(binary_fields)].fillna(0.0).astype(float)
    categorical_mappings: Dict[str, Dict[str, int]] = {}
    categorical_columns: List[pd.Series] = []
    for field in categorical_fields:
        # BUG FIX: working.get(field, "unknown") returns the scalar string
        # "unknown" when the column is missing, so .fillna() would raise
        # AttributeError. Use the Series-safe pattern the rest of the file
        # already uses (see normalize_snapshot_dataframe).
        if field in working.columns:
            raw = working[field]
        else:
            raw = pd.Series("unknown", index=working.index)
        values = raw.fillna("unknown").astype(str).str.strip().str.lower()
        unique_vals = sorted(values.unique())
        mapping = {val: idx + 1 for idx, val in enumerate(unique_vals)}
        categorical_mappings[field] = mapping
        categorical_columns.append(values.map(mapping).fillna(0).astype(float))
    arrays = [numeric_matrix.values, binary_matrix.values]
    feature_names = list(numeric_fields) + list(binary_fields)
    for field, series in zip(categorical_fields, categorical_columns):
        arrays.append(series.values.reshape(-1, 1))
        feature_names.append(f"{field}_code")
    # Build raw matrix in float64 for stability, then sanitize and downcast.
    X = np.hstack(arrays).astype(np.float64)
    # Replace NaN/inf with finite values and clip extreme magnitudes so that
    # downstream scalers do not choke on pathological inputs from malformed
    # or legacy snapshots.
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    max_abs = 1e9
    X = np.clip(X, -max_abs, max_abs)
    X = X.astype(np.float32)
    return FeatureMatrix(
        X=X, feature_names=feature_names, categorical_mappings=categorical_mappings
    )
def describe_dataframe(df: pd.DataFrame) -> str:
    """Summarize a snapshot frame: record count, symbols, strategies, statuses."""
    record_count = len(df)
    symbol_list = sorted(df["symbol"].dropna().unique())
    strategy_list = sorted(df["strategy"].dropna().unique())
    status_counts = df["status"].value_counts().to_dict()
    return (
        f"records={record_count} symbols={symbol_list} "
        f"strategies={strategy_list} status_counts={status_counts}"
    )
def main():  # pragma: no cover - convenience CLI
    """CLI entry point: load snapshots, print a summary and the frame head."""
    import argparse

    parser = argparse.ArgumentParser(description="Inspect snapshot dataset")
    parser.add_argument("--snapshot_dir", type=str, default=None)
    parser.add_argument("--limit", type=int, default=None)
    options = parser.parse_args()
    frame = load_snapshot_dataframe(options.snapshot_dir, options.limit)
    print(describe_dataframe(frame))
    print(frame.head())


if __name__ == "__main__":  # pragma: no cover
    main()