2026-01-14 16:17:00 -05:00
|
|
|
import glob
import math
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
|
|
|
|
|
|
|
|
|
|
# Default MetaTrader "Common Files" directory on Windows. On non-Windows
# hosts APPDATA is unset, which yields a relative path; callers can override
# via the snapshot_dir parameters accepted elsewhere in this module.
COMMON_FILES_DIR = os.path.join(
    os.environ.get("APPDATA", ""),
    "MetaQuotes",
    "Terminal",
    "Common",
    "Files",
)

# Glob pattern matching one snapshot file per exported feature batch.
SNAPSHOT_PATTERN = "feature_batch_*.txt"

# Columns coerced to floats during normalization (unparseable -> NaN).
NUMERIC_FIELDS: Sequence[str] = [
    "price",
    "volume",
    "sl",
    "tp",
    "confidence",
    "volatility",
    "correlation",
    "hour",
    "timeframe",
    "order_type",
    "gate_passed_count",
    "adjustment_attempts",
    "volume_change_pct",
    "price_change_pct",
    "sl_scale",
    "tp_scale",
]

# Columns coerced to 0.0/1.0 flags via _to_binary during normalization.
BINARY_FIELDS: Sequence[str] = [
    "executed",
    "is_adjusted",
    "gate1_pass",
    "gate2_pass",
    "gate3_pass",
    "gate4_pass",
    "gate5_pass",
    "gate6_pass",
    "gate7_pass",
    "gate8_pass",
]

# Columns normalized to lowercase stripped strings ("unknown" when missing).
CATEGORICAL_FIELDS: Sequence[str] = [
    "strategy",
    "symbol",
    "status",
    "regime",
    "market_regime",
    "reason",
]

# Name of the column parsed into tz-aware datetimes during normalization.
TIMESTAMP_FIELD = "timestamp"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_snapshot_files(
    snapshot_dir: Optional[str] = None,
    pattern: str = SNAPSHOT_PATTERN,
) -> List[str]:
    """Return snapshot file paths matching *pattern*, sorted lexicographically.

    Falls back to COMMON_FILES_DIR when *snapshot_dir* is not given.
    """
    target_dir = COMMON_FILES_DIR if not snapshot_dir else snapshot_dir
    matches = glob.glob(os.path.join(target_dir, pattern))
    matches.sort()
    return matches
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_snapshot_line(line: str, idx: int) -> Tuple[str, str]:
|
|
|
|
|
line = line.strip()
|
|
|
|
|
if not line:
|
|
|
|
|
return (f"empty_{idx}", "")
|
|
|
|
|
if ":" not in line:
|
|
|
|
|
return (f"field_{idx}", line)
|
|
|
|
|
key, value = line.split(":", 1)
|
|
|
|
|
return key.strip().lower(), value.strip()
|
|
|
|
|
|
|
|
|
|
|
2026-02-04 14:28:59 -05:00
|
|
|
def _read_text_lines_with_fallback(path: str) -> List[str]:
|
|
|
|
|
"""Read a text file that may be UTF-8 or UTF-16 from MQL5 FILE_TXT.
|
|
|
|
|
|
|
|
|
|
MQL5 FILE_TXT writes Unicode (UTF-16) by default, which will fail if we
|
|
|
|
|
try to open it as UTF-8. To be robust against both historical and new
|
|
|
|
|
snapshots, we read bytes and try several common encodings.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
with open(path, "rb") as handle:
|
|
|
|
|
data = handle.read()
|
|
|
|
|
|
|
|
|
|
# Try the most likely encodings in order. utf-8-sig handles BOM if present.
|
|
|
|
|
for encoding in ("utf-8-sig", "utf-16", "utf-16-le", "utf-16-be", "cp1252"):
|
|
|
|
|
try:
|
|
|
|
|
text = data.decode(encoding)
|
|
|
|
|
return text.splitlines()
|
|
|
|
|
except UnicodeDecodeError:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# If everything fails, re-raise a readable error for diagnostics.
|
|
|
|
|
raise UnicodeDecodeError(
|
|
|
|
|
"snapshot", b"", 0, 1, f"unable to decode {path!r} with common encodings"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
def parse_snapshot_file(path: str) -> Dict[str, str]:
    """Parse a single snapshot file into a flat ``{key: value}`` record.

    Later duplicate keys overwrite earlier ones; ``__source_file`` and
    ``__source_path`` provenance entries are appended.
    """
    parsed: Dict[str, str] = {}
    for line_no, raw_line in enumerate(_read_text_lines_with_fallback(path)):
        field, field_value = _parse_snapshot_line(raw_line, line_no)
        parsed[field] = field_value
    parsed["__source_file"] = os.path.basename(path)
    parsed["__source_path"] = path
    return parsed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_snapshot_dataframe(
    snapshot_dir: Optional[str] = None,
    limit: Optional[int] = None,
) -> pd.DataFrame:
    """Load, parse and normalize every snapshot file into one DataFrame.

    Keeps only the newest *limit* files when given. Files that fail to parse
    are skipped with a warning instead of aborting the whole load; raises
    RuntimeError when no files are found or nothing parses.
    """
    files = list_snapshot_files(snapshot_dir)
    if not files:
        raise RuntimeError(
            f"No snapshot files found in {(snapshot_dir or COMMON_FILES_DIR)!r}"
        )
    if limit:
        files = files[-limit:]

    records: List[Dict[str, str]] = []
    for path in files:
        try:
            records.append(parse_snapshot_file(path))
        except Exception as exc:  # pragma: no cover - diagnostic path
            print(f"[WARN] Failed to parse snapshot {path}: {exc}")

    if not records:
        raise RuntimeError("Snapshot parsing produced zero records")
    return normalize_snapshot_dataframe(pd.DataFrame(records))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _to_binary(value: object) -> float:
|
|
|
|
|
if value is None:
|
|
|
|
|
return 0.0
|
|
|
|
|
if isinstance(value, (int, float)):
|
2026-02-04 14:28:59 -05:00
|
|
|
# Treat NaN as 0.0; otherwise non-zero numeric as 1.0
|
|
|
|
|
try:
|
|
|
|
|
import math
|
|
|
|
|
|
|
|
|
|
if isinstance(value, float) and math.isnan(value):
|
|
|
|
|
return 0.0
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
2026-01-14 16:17:00 -05:00
|
|
|
return float(1.0 if value != 0 else 0.0)
|
|
|
|
|
s = str(value).strip().lower()
|
|
|
|
|
if s in {"1", "true", "yes", "y", "passed", "executed"}:
|
|
|
|
|
return 1.0
|
|
|
|
|
if s in {"0", "false", "no", "n", ""}:
|
|
|
|
|
return 0.0
|
|
|
|
|
try:
|
|
|
|
|
return float(1.0 if float(s) != 0 else 0.0)
|
|
|
|
|
except Exception:
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_snapshot_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce raw snapshot columns into stable dtypes.

    - timestamp -> tz-aware datetime (NaT when missing/unparseable)
    - NUMERIC_FIELDS -> float (NaN on parse failure)
    - BINARY_FIELDS -> 0.0/1.0 via _to_binary
    - CATEGORICAL_FIELDS (which includes "status") -> lowercase stripped
      strings, with missing values mapped to "unknown"

    Returns a copy; the input frame is not mutated.
    """
    df = df.copy()

    if TIMESTAMP_FIELD in df.columns:
        df[TIMESTAMP_FIELD] = pd.to_datetime(
            df[TIMESTAMP_FIELD], errors="coerce", utc=True
        )
    else:
        df[TIMESTAMP_FIELD] = pd.NaT

    for field in NUMERIC_FIELDS:
        df[field] = pd.to_numeric(df.get(field), errors="coerce")

    def _column(field: str, default: object) -> pd.Series:
        # df.get(field, default) returns a bare scalar when the column is
        # missing, which breaks .apply()/.fillna(); always hand back a
        # Series aligned to df.index instead.
        if field in df.columns:
            return df[field]
        return pd.Series(default, index=df.index)

    for field in BINARY_FIELDS:
        df[field] = _column(field, np.nan).apply(_to_binary).astype(float)

    # NOTE: "status" is part of CATEGORICAL_FIELDS, so this loop already
    # normalizes the status column; the previous separate status pass was
    # fully redundant (re-applying fillna/strip/lower is idempotent) and has
    # been removed.
    for field in CATEGORICAL_FIELDS:
        df[field] = (
            _column(field, "unknown")
            .fillna("unknown")
            .astype(str)
            .str.strip()
            .str.lower()
        )

    return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def label_from_status(
    df: pd.DataFrame,
    positive_status: Optional[Sequence[str]] = None,
) -> pd.Series:
    """Binary labels derived from the ``status`` column.

    A row is labeled 1 when its status is one of *positive_status*
    (compared lowercase; defaults to "passed"/"executed"), else 0.
    """
    statuses = ("passed", "executed") if positive_status is None else positive_status
    wanted = {value.lower() for value in statuses}
    return df["status"].isin(wanted).astype(int)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _load_price_frame(price_csv: str) -> pd.DataFrame:
|
|
|
|
|
price_df = pd.read_csv(price_csv)
|
|
|
|
|
price_df.columns = [
|
|
|
|
|
str(c).strip().lower().replace(" ", "_") for c in price_df.columns
|
|
|
|
|
]
|
|
|
|
|
if "timestamp" not in price_df.columns:
|
|
|
|
|
raise ValueError("price_csv must contain a 'timestamp' column")
|
|
|
|
|
price_df["timestamp"] = pd.to_datetime(price_df["timestamp"], errors="coerce", utc=True)
|
|
|
|
|
if "symbol" not in price_df.columns:
|
|
|
|
|
price_df["symbol"] = "*"
|
|
|
|
|
if "close" not in price_df.columns:
|
|
|
|
|
raise ValueError("price_csv must contain a 'close' column")
|
|
|
|
|
price_df = price_df.dropna(subset=["timestamp", "close"])
|
|
|
|
|
price_df = price_df.sort_values(["symbol", "timestamp"]).reset_index(drop=True)
|
|
|
|
|
return price_df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _future_price_for_symbol(
|
|
|
|
|
price_df: pd.DataFrame,
|
|
|
|
|
symbol: str,
|
|
|
|
|
target_time: pd.Timestamp,
|
|
|
|
|
) -> Optional[float]:
|
|
|
|
|
subset = price_df[price_df["symbol"] == symbol]
|
|
|
|
|
if subset.empty:
|
|
|
|
|
return None
|
|
|
|
|
future = subset[subset["timestamp"] >= target_time]
|
|
|
|
|
if future.empty:
|
|
|
|
|
return None
|
|
|
|
|
return float(future.iloc[0]["close"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def labels_from_future_prices(
    df: pd.DataFrame,
    price_csv: str,
    horizon_minutes: int = 60,
    target_r_multiple: float = 0.0,
) -> pd.Series:
    """
    Compute labels based on future price movement relative to SL distance.

    Label = 1 if future R multiple >= target_r_multiple, else 0.

    Rows with a missing timestamp/price/SL, non-positive risk distance, or
    no future price within the horizon are dropped from the returned Series.
    """
    price_df = _load_price_frame(price_csv)
    labels = pd.Series(index=df.index, dtype=float)

    for idx, row in df.iterrows():
        ts = row.get(TIMESTAMP_FIELD)
        symbol = row.get("symbol", "*")
        price = row.get("price")
        sl = row.get("sl")
        order_type = row.get("order_type", 0)

        if pd.isna(ts) or pd.isna(price) or pd.isna(sl):
            labels.loc[idx] = np.nan
            continue

        target_time = ts + pd.Timedelta(minutes=horizon_minutes)
        future_price = _future_price_for_symbol(price_df, symbol, target_time)
        if future_price is None:
            labels.loc[idx] = np.nan
            continue

        risk = abs(price - sl)
        if risk <= 0:
            labels.loc[idx] = np.nan
            continue

        # BUGFIX: order_type is a numeric field, so a present-but-missing
        # value is NaN (row.get's default only covers an absent key) and
        # int(NaN) raises ValueError. Fall back to the buy default instead.
        side = 0 if pd.isna(order_type) else int(order_type)
        if side == 1:  # sell
            reward = price - future_price
        else:  # buy default
            reward = future_price - price

        r_multiple = reward / risk
        labels.loc[idx] = 1 if r_multiple >= target_r_multiple else 0

    return labels.dropna().astype(int)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class FeatureMatrix:
    """Model-ready feature matrix plus the metadata needed to interpret it."""

    # Dense feature matrix, one row per snapshot record.
    X: np.ndarray
    # Column names aligned with the second axis of X.
    feature_names: List[str]
    # Per categorical field: value -> positive integer code (0 = unmapped).
    categorical_mappings: Dict[str, Dict[str, int]]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_feature_matrix(
    df: pd.DataFrame,
    numeric_fields: Sequence[str] = NUMERIC_FIELDS,
    binary_fields: Sequence[str] = BINARY_FIELDS,
    categorical_fields: Sequence[str] = CATEGORICAL_FIELDS,
) -> FeatureMatrix:
    """Assemble a float32 feature matrix from a normalized snapshot frame.

    Numeric/binary columns are used as-is (NaN -> 0.0); each categorical
    column is integer-coded (codes start at 1, 0 marks unmapped values) and
    appended as ``<field>_code``. Missing columns of any kind are
    synthesized so an arbitrary subset of fields is safe.
    """
    working = df.copy()

    for field in numeric_fields:
        if field not in working.columns:
            working[field] = np.nan
    for field in binary_fields:
        if field not in working.columns:
            working[field] = 0.0
    # BUGFIX: working.get(field, "unknown") returned a bare scalar string
    # when a categorical column was missing, so the .fillna()/.str chain
    # below raised AttributeError — the same scalar-.get pitfall already
    # fixed in normalize_snapshot_dataframe. Create the column explicitly,
    # mirroring the numeric/binary handling above.
    for field in categorical_fields:
        if field not in working.columns:
            working[field] = "unknown"

    numeric_matrix = working[list(numeric_fields)].fillna(0.0).astype(float)
    binary_matrix = working[list(binary_fields)].fillna(0.0).astype(float)

    categorical_mappings: Dict[str, Dict[str, int]] = {}
    categorical_columns: List[pd.Series] = []
    for field in categorical_fields:
        values = (
            working[field]
            .fillna("unknown")
            .astype(str)
            .str.strip()
            .str.lower()
        )
        unique_vals = sorted(values.unique())
        # Codes start at 1 so that 0 can mark "not in mapping" at apply time.
        mapping = {val: idx + 1 for idx, val in enumerate(unique_vals)}
        categorical_mappings[field] = mapping
        categorical_columns.append(values.map(mapping).fillna(0).astype(float))

    arrays = [numeric_matrix.values, binary_matrix.values]
    feature_names = list(numeric_fields) + list(binary_fields)
    for field, series in zip(categorical_fields, categorical_columns):
        arrays.append(series.values.reshape(-1, 1))
        feature_names.append(f"{field}_code")

    # Build raw matrix in float64 for stability, then sanitize and downcast.
    X = np.hstack(arrays).astype(np.float64)
    # Replace NaN/inf with finite values and clip extreme magnitudes so that
    # downstream scalers do not choke on pathological inputs from malformed
    # or legacy snapshots.
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    max_abs = 1e9
    X = np.clip(X, -max_abs, max_abs)
    X = X.astype(np.float32)

    return FeatureMatrix(X=X, feature_names=feature_names, categorical_mappings=categorical_mappings)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def describe_dataframe(df: pd.DataFrame) -> str:
    """One-line human-readable summary of a normalized snapshot frame."""
    symbol_list = sorted(df["symbol"].dropna().unique())
    strategy_list = sorted(df["strategy"].dropna().unique())
    status_counts = df["status"].value_counts().to_dict()
    return (
        f"records={len(df)} symbols={symbol_list} "
        f"strategies={strategy_list} status_counts={status_counts}"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():  # pragma: no cover - convenience CLI
    """CLI entry point: load snapshots and print a quick summary."""
    import argparse

    arg_parser = argparse.ArgumentParser(description="Inspect snapshot dataset")
    arg_parser.add_argument("--snapshot_dir", type=str, default=None)
    arg_parser.add_argument("--limit", type=int, default=None)
    options = arg_parser.parse_args()

    frame = load_snapshot_dataframe(options.snapshot_dir, options.limit)
    print(describe_dataframe(frame))
    print(frame.head())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Allow running this module directly as a quick snapshot inspector.
if __name__ == "__main__":  # pragma: no cover
    main()
|