# Copyright 2025, MetaQuotes Ltd.
# https://www.mql5.com/en/users/johnhlomohang/
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import torch
|
|
from sklearn.preprocessing import StandardScaler
|
|
import joblib
|
|
|
|
from Model import EntropyModel
|
|
from Features import build_features
|
|
|
|
# ------------------------------
# 1. Load CSV data
# ------------------------------
CSV_FILE = "XAUUSD_H1.csv"

# Keep the try body minimal: only the read can raise FileNotFoundError.
try:
    df = pd.read_csv(CSV_FILE)
except FileNotFoundError:
    print(f"Error: {CSV_FILE} not found. Run 'Getting Hist Data.py' first.")
    # SystemExit instead of the site-provided exit(): always available, and a
    # non-zero status tells callers the run failed.
    raise SystemExit(1)

print(f"Loaded {len(df)} rows from {CSV_FILE}")

# Extract prices
if 'close' not in df.columns:
    print("Error: CSV must contain a 'close' column.")
    raise SystemExit(1)

# Drop rows with a missing close on the frame itself, so that the optional
# high/low arrays keep exactly the same length and row alignment as `prices`.
# (Dropping NaNs from the 'close' column alone would shift `prices` relative
# to high/low.)
valid_rows = df.dropna(subset=['close'])

prices = valid_rows['close'].values.astype(np.float32)
high_prices = valid_rows['high'].values.astype(np.float32) if 'high' in valid_rows.columns else None
low_prices = valid_rows['low'].values.astype(np.float32) if 'low' in valid_rows.columns else None

print(f"Price data shape: {prices.shape}")
# Only announce high/low when both columns were actually found.
if high_prices is not None and low_prices is not None:
    print("High/Low data available")
|
|
|
|
# ------------------------------
# 2. Feature Engineering
# ------------------------------
WINDOW = 50   # number of consecutive closes passed to build_features per sample
HORIZON = 5   # how many bars ahead the direction label looks

# Per-sample feature vectors (X) and their 0/1 direction labels (y);
# both start empty and are appended to in lockstep.
X, y = [], []
|
|
|
|
# Calculate simple RSI for each point
|
|
def calculate_rsi(prices, period=14):
    """Return a simple (non-smoothed) RSI for the end of *prices*.

    The value is computed from the most recent ``period`` price changes, so
    it describes the latest bars of the series rather than its oldest ones.

    Args:
        prices: 1-D sequence of prices, oldest first.
        period: Number of price changes to average over.

    Returns:
        RSI in the range [0, 100] as a Python float. Returns 50.0 (neutral)
        when fewer than ``period + 1`` prices are supplied or when there was
        no price movement at all, and 100.0 when there were gains but no
        losses.
    """
    if len(prices) < period + 1:
        return 50.0

    # Exactly `period` changes, taken from the END of the series.
    recent = np.diff(np.asarray(prices, dtype=np.float64))[-period:]

    up = recent[recent > 0].sum() / period
    down = -recent[recent < 0].sum() / period

    if down == 0:
        # No losses: saturate at 100 if anything rose, otherwise the series
        # is flat and RS is undefined, so report the neutral midpoint.
        return 100.0 if up > 0 else 50.0

    rs = up / down
    return float(100.0 - (100.0 / (1.0 + rs)))
|
|
|
|
# Fail early with a clear message: with too few rows the loop below produces
# no samples and X.shape[1] would raise an opaque IndexError.
if len(prices) <= WINDOW + HORIZON:
    print(f"Error: need more than {WINDOW + HORIZON} price rows to build samples, got {len(prices)}.")
    raise SystemExit(1)

for i in range(WINDOW, len(prices) - HORIZON):
    # The WINDOW closes immediately before bar i (bar i itself is excluded).
    window = prices[i - WINDOW:i]

    # Get high/low for the same bars if available
    window_high = high_prices[i - WINDOW:i] if high_prices is not None else None
    window_low = low_prices[i - WINDOW:i] if low_prices is not None else None

    # Calculate RSI for the window
    rsi = calculate_rsi(window, 14)

    # build_features returns a (features, metrics) pair; only the feature
    # vector is needed for training.
    features, _ = build_features(
        prices=window,
        rsi=rsi,
        high_prices=window_high,
        low_prices=window_low,
    )

    # Target: direction of the log return from bar i to bar i + HORIZON
    future_return = np.log(prices[i + HORIZON] / prices[i])
    label = 1 if future_return > 0 else 0

    X.append(features)
    y.append(label)

X = np.array(X, dtype=np.float32)
y = np.array(y, dtype=np.float32)

print(f"Dataset shape: X={X.shape}, y={y.shape}")
print(f"Feature vector length: {X.shape[1]}")
print(f"Positive labels: {np.sum(y)} / {len(y)} ({np.sum(y)/len(y)*100:.1f}%)")
|
|
|
|
# ------------------------------
# 3. Scaling
# ------------------------------
# Chronological 80/20 split point. It is computed before scaling so the
# scaler's mean/std come from training rows only; fitting on the full dataset
# would leak validation statistics into training.
split_idx = int(len(X) * 0.8)

scaler = StandardScaler()
scaler.fit(X[:split_idx])
X_scaled = scaler.transform(X)

joblib.dump(scaler, "scaler.pkl")
print("Scaler saved to scaler.pkl")

# ------------------------------
# 4. Train/Val Split
# ------------------------------
# No shuffling: the first 80% of samples train, the last 20% validate.
X_train, X_val = X_scaled[:split_idx], X_scaled[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]

print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}")

# Labels are reshaped to a column (N, 1) before being passed to the loss.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)
|
|
|
|
# ------------------------------
# 5. Model Training
# ------------------------------
model = EntropyModel()

# NOTE(review): BCELoss requires outputs in [0, 1]; presumably EntropyModel
# ends in a sigmoid - confirm in Model.py.
criterion = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

EPOCHS = 50
best_val_loss = float('inf')
checkpoint_saved = False  # becomes True once entropy_model.pth is written

for epoch in range(EPOCHS):
    # Training: one full-batch gradient step per epoch
    model.train()
    optimizer.zero_grad()
    output = model(X_train_tensor)
    loss = criterion(output, y_train_tensor)
    loss.backward()
    optimizer.step()

    # Validation
    model.eval()
    with torch.no_grad():
        val_output = model(X_val_tensor)
        val_loss = criterion(val_output, y_val_tensor)

    # Track the best loss as a plain float rather than holding on to a tensor.
    val_loss_value = val_loss.item()

    # Save best model (a NaN loss never compares as smaller, so it is skipped)
    if val_loss_value < best_val_loss:
        best_val_loss = val_loss_value
        torch.save(model.state_dict(), "entropy_model.pth")
        checkpoint_saved = True

    if (epoch + 1) % 5 == 0 or epoch == 0:
        print(f"Epoch {epoch+1}/{EPOCHS} | Train Loss: {loss.item():.6f} | Val Loss: {val_loss_value:.6f}")

print(f"\nTraining complete. Best validation loss: {best_val_loss:.6f}")
# Only claim the checkpoint exists if this run actually wrote it.
if checkpoint_saved:
    print("Model saved to entropy_model.pth")
else:
    print("Warning: validation loss never improved; entropy_model.pth was not written.")
|
|
|
|
# ------------------------------
# 6. Quick evaluation
# ------------------------------
# Report metrics for the checkpoint that was saved (lowest validation loss),
# not for whatever weights the final epoch happened to leave in memory.
try:
    model.load_state_dict(torch.load("entropy_model.pth"))
except FileNotFoundError:
    print("Warning: entropy_model.pth not found; evaluating last-epoch weights instead.")

model.eval()
with torch.no_grad():
    # Predictions above 0.5 count as class 1.
    train_pred = model(X_train_tensor)
    train_acc = ((train_pred > 0.5) == y_train_tensor).float().mean().item()

    val_pred = model(X_val_tensor)
    val_acc = ((val_pred > 0.5) == y_val_tensor).float().mean().item()

print("\nFinal Metrics:")
print(f"Train Accuracy: {train_acc:.3f}")
print(f"Val Accuracy: {val_acc:.3f}")
|