mql5/Experts/Advisors/DualEA/python/feature_export.py
2025-09-24 15:11:56 -04:00

80 lines
2.9 KiB
Python

import os
import pyarrow as pa
import pyarrow.parquet as pq
from google.protobuf.json_format import MessageToDict
from dual_ea_pb2 import FeatureBatchEnvelope
# Directory that exported feature-batch files are written into. Change as needed.
EXPORT_DIR = r"C:\DualEA_FeatureBatches"
# Create the export directory at import time; exist_ok=True makes this a no-op
# when the directory is already present.
os.makedirs(EXPORT_DIR, exist_ok=True)
# Helper: recursively convert a protobuf FeatureValue into native Python data.
# Handles scalar, array, and nested-object values.
def feature_value_to_py(val):
    """Return the native Python equivalent of a FeatureValue message.

    Fields are probed with ``HasField`` in a fixed order: ``number``,
    ``text``, ``bool_value``, ``array``, ``object``, ``unknown``. The first
    one that is set decides the result:

    * ``number`` / ``text`` / ``bool_value`` -> the stored value as-is
    * ``array`` -> a list with every item converted recursively
    * ``object`` -> a dict with every field value converted recursively
    * ``unknown`` -> the stored value as-is (forward compatibility)

    Returns ``None`` when none of these fields is set.
    """
    # Scalar-like fields need no conversion; hand back the first one present.
    for scalar_field in ("number", "text", "bool_value"):
        if val.HasField(scalar_field):
            return getattr(val, scalar_field)

    if val.HasField("array"):
        return list(map(feature_value_to_py, val.array.items))

    if val.HasField("object"):
        converted = {}
        for key, nested in val.object.fields.items():
            converted[key] = feature_value_to_py(nested)
        return converted

    # Forward compatibility: pass an "unknown" payload through untouched.
    return val.unknown if val.HasField("unknown") else None
def feature_batch_to_py(batch):
    """Convert one feature batch message into a plain dict.

    The result has the keys ``symbol``, ``strategy`` and ``timestamp``
    (copied from the batch), ``features`` (a list of
    ``{"feature_name", "value"}`` dicts, one per entry in
    ``batch.features``) and ``extra`` (a dict built from ``batch.extra``).
    Every feature/extra value goes through ``feature_value_to_py``.
    """
    row = {
        "symbol": batch.symbol,
        "strategy": batch.strategy,
        "timestamp": batch.timestamp,
    }

    feature_rows = []
    for feat in batch.features:
        feature_rows.append(
            {
                "feature_name": feat.feature_name,
                "value": feature_value_to_py(feat.value),
            }
        )
    row["features"] = feature_rows

    extra_fields = {}
    for key, raw in batch.extra.items():
        extra_fields[key] = feature_value_to_py(raw)
    row["extra"] = extra_fields

    return row
def envelope_to_py(envelope):
    """Return a list with one plain dict per batch in ``envelope.batches``.

    Each batch is converted with ``feature_batch_to_py``; order is preserved.
    """
    converted = []
    for batch in envelope.batches:
        converted.append(feature_batch_to_py(batch))
    return converted
def _write_batches_to_csv(py_batches, csv_path):
    """Write converted batches to ``csv_path`` as CSV, one row per batch.

    The nested ``features`` and ``extra`` values are stored as their ``str()``
    representation. Returns ``csv_path``.
    """
    import csv

    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["symbol", "strategy", "timestamp", "features", "extra"])
        writer.writerows(
            [
                row["symbol"], row["strategy"], row["timestamp"],
                str(row["features"]), str(row["extra"]),
            ]
            for row in py_batches
        )
    return csv_path


def write_batches_to_parquet(pb_bytes, fname=None):
    """
    Accepts raw protobuf bytes (FeatureBatchEnvelope), writes Arrow/Parquet.

    The payload is decoded once. Building the Arrow table and writing the
    Parquet file is then attempted up to 3 times (0.2 s pause between
    attempts). If every attempt fails, the batches are written as CSV instead.

    Args:
        pb_bytes: Serialized ``FeatureBatchEnvelope`` message.
        fname: Optional output file name inside ``EXPORT_DIR``. Defaults to
            ``feature_batch_<unix-seconds>.parquet``. The CSV fallback uses
            the same base name with a ``.csv`` extension.

    Returns:
        Path of the file that was written (Parquet, or CSV on fallback).

    Raises:
        RuntimeError: If ``pb_bytes`` cannot be decoded.
        OSError: If the CSV fallback itself cannot be written.
    """
    import logging
    import time

    import dual_ea_pb2
    from google.protobuf.message import DecodeError

    log = logging.getLogger(__name__)

    # Decode outside the retry loop: a malformed payload never becomes valid by
    # retrying, and the CSV fallback needs the converted batches to exist.
    envelope = dual_ea_pb2.FeatureBatchEnvelope()
    try:
        envelope.ParseFromString(pb_bytes)
    except DecodeError as err:
        raise RuntimeError("Failed to decode FeatureBatchEnvelope payload.") from err
    py_batches = envelope_to_py(envelope)

    fname = fname or f"feature_batch_{int(time.time())}.parquet"
    path = os.path.join(EXPORT_DIR, fname)

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            table = pa.Table.from_pylist(py_batches)
            pq.write_table(table, path)
            return path
        except Exception as err:
            # Deliberate best-effort: any failure here leads to a retry and
            # finally to the CSV fallback, but it is logged rather than hidden.
            log.warning(
                "Parquet export attempt %d/%d failed: %s",
                attempt, max_attempts, err,
            )
            if attempt < max_attempts:
                time.sleep(0.2)

    # Fallback to CSV. Swap the extension so CSV text is never stored under a
    # ".parquet" file name.
    csv_fname = os.path.splitext(fname)[0] + ".csv"
    csv_path = os.path.join(EXPORT_DIR, csv_fname)
    log.warning("Falling back to CSV export: %s", csv_path)
    return _write_batches_to_csv(py_batches, csv_path)