# Copyright 2025, MetaQuotes Ltd.
# https://www.mql5.com/en/users/johnhlomohang/
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import StandardScaler
import joblib
from Model import EntropyModel
from Features import build_features

# ------------------------------
# 1. Load CSV data
# ------------------------------
CSV_FILE = "XAUUSD_H1.csv"


def _column_as_float32(frame, name, row_mask):
    """Return column *name* of *frame* as a float32 array, or None if absent.

    Only the rows selected by the boolean *row_mask* are returned, so every
    column extracted with the same mask stays row-aligned with the others.
    """
    if name not in frame.columns:
        return None
    return frame.loc[row_mask, name].values.astype(np.float32)


# Keep the try body down to the single call that can raise.
try:
    df = pd.read_csv(CSV_FILE)
except FileNotFoundError:
    print(f"Error: {CSV_FILE} not found. Run 'Getting Hist Data.py' first.")
    # SystemExit does not rely on the `site` module (unlike exit()) and the
    # non-zero status lets calling scripts detect the failure.
    raise SystemExit(1)
print(f"Loaded {len(df)} rows from {CSV_FILE}")

# Extract prices
if 'close' not in df.columns:
    print("Error: CSV must contain a 'close' column.")
    raise SystemExit(1)

# Rows without a close price are dropped. The same row mask is applied to
# high/low so that index k refers to the same bar in all three arrays;
# dropping NaNs from 'close' alone would shift it against the other columns.
valid_rows = df['close'].notna()
prices = _column_as_float32(df, 'close', valid_rows)
high_prices = _column_as_float32(df, 'high', valid_rows)
low_prices = _column_as_float32(df, 'low', valid_rows)

print(f"Price data shape: {prices.shape}")
if high_prices is not None:
    print("High/Low data available")
# ------------------------------
# 2. Feature Engineering
# ------------------------------
WINDOW = 50   # number of past bars handed to build_features per sample
HORIZON = 5   # number of bars ahead used for the direction label


def calculate_rsi(prices, period=14):
    """Return a simple (unsmoothed) RSI for the end of *prices*.

    The value is computed from the most recent ``period`` price changes, so
    it describes the state of the series at its last element.

    Args:
        prices: 1-D sequence of prices, oldest first.
        period: number of most recent price changes to average over.

    Returns:
        RSI in the range [0, 100] as a Python float. 50.0 is returned when
        fewer than ``period + 1`` prices are supplied or when the prices did
        not move at all; 100.0 when there were gains and no losses.
    """
    if len(prices) < period + 1:
        return 50.0

    # Use the LAST `period` changes. Taking the first ones of a long window
    # would yield an RSI describing bars far before the prediction point.
    deltas = np.diff(prices)[-period:]
    up = deltas[deltas > 0].sum() / period
    down = -deltas[deltas < 0].sum() / period

    if down == 0:
        # No losses: fully overbought if anything rose, neutral if flat.
        return 100.0 if up > 0 else 50.0

    rs = up / down
    return float(100.0 - (100.0 / (1.0 + rs)))


X, y = [], []

for i in range(WINDOW, len(prices) - HORIZON):
    # The window covers bars i-WINDOW .. i-1 (bar i itself is excluded).
    window = prices[i - WINDOW:i]

    # Get high/low for window if available
    window_high = high_prices[i - WINDOW:i] if high_prices is not None else None
    window_low = low_prices[i - WINDOW:i] if low_prices is not None else None

    # Calculate RSI for the window
    rsi = calculate_rsi(window, 14)

    # Call build_features with all 4 arguments; the second return value is
    # not used by this script.
    features, metrics = build_features(
        prices=window,
        rsi=rsi,
        high_prices=window_high,
        low_prices=window_low,
    )

    # Target: future return direction (1 = price higher HORIZON bars later)
    future_return = np.log(prices[i + HORIZON] / prices[i])
    label = 1 if future_return > 0 else 0

    X.append(features)
    y.append(label)

if not X:
    # range(WINDOW, len(prices) - HORIZON) is empty unless there are more
    # than WINDOW + HORIZON rows; fail with a message instead of an
    # IndexError on X.shape[1] below.
    print(f"Error: need more than {WINDOW + HORIZON} price rows to build samples, got {len(prices)}.")
    raise SystemExit(1)

X = np.array(X, dtype=np.float32)
y = np.array(y, dtype=np.float32)

print(f"Dataset shape: X={X.shape}, y={y.shape}")
print(f"Feature vector length: {X.shape[1]}")
print(f"Positive labels: {np.sum(y)} / {len(y)} ({np.sum(y)/len(y)*100:.1f}%)")

# ------------------------------
# 3. Scaling
# ------------------------------
# Fit the scaler on the training portion only, so that validation statistics
# do not leak into the features the model is trained on. The 0.8 fraction
# mirrors the chronological train/validation split in section 4.
scaler_fit_end = max(1, int(len(X) * 0.8))
scaler = StandardScaler()
scaler.fit(X[:scaler_fit_end])
X_scaled = scaler.transform(X)
joblib.dump(scaler, "scaler.pkl")
print("Scaler saved to scaler.pkl")
# ------------------------------
# 4. Train/Val Split
# ------------------------------
# Chronological split: the first 80% of samples train, the rest validate.
split_idx = int(len(X) * 0.8)
X_train, X_val = X_scaled[:split_idx], X_scaled[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]

print(f"Training samples: {len(X_train)}, Validation samples: {len(X_val)}")

# Targets are reshaped to (N, 1) by view(-1, 1) before being passed to the loss.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)

# ------------------------------
# 5. Model Training
# ------------------------------
MODEL_PATH = "entropy_model.pth"

model = EntropyModel()
# NOTE(review): BCELoss requires inputs in [0, 1]; this assumes EntropyModel
# ends in a sigmoid - confirm in Model.py.
criterion = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

EPOCHS = 50
best_val_loss = float('inf')
best_state = None  # in-memory copy of the best weights seen so far

for epoch in range(EPOCHS):
    # Training (one full-batch gradient step per epoch)
    model.train()
    optimizer.zero_grad()
    output = model(X_train_tensor)
    loss = criterion(output, y_train_tensor)
    loss.backward()
    optimizer.step()

    # Validation
    model.eval()
    with torch.no_grad():
        val_output = model(X_val_tensor)
        val_loss = criterion(val_output, y_val_tensor)

    # Save best model. The loss is kept as a plain float so no tensor
    # outlives the epoch that produced it.
    val_loss_value = val_loss.item()
    if val_loss_value < best_val_loss:
        best_val_loss = val_loss_value
        # Clone: state_dict() returns references that later optimizer steps
        # would otherwise keep modifying.
        best_state = {name: tensor.clone() for name, tensor in model.state_dict().items()}
        torch.save(best_state, MODEL_PATH)

    if (epoch + 1) % 5 == 0 or epoch == 0:
        print(f"Epoch {epoch+1}/{EPOCHS} | Train Loss: {loss.item():.6f} | Val Loss: {val_loss_value:.6f}")

print(f"\nTraining complete. Best validation loss: {best_val_loss:.6f}")
if best_state is None:
    # Happens when the validation loss was never below infinity (e.g. NaN).
    print("Warning: validation loss never improved; no model was saved.")
else:
    print(f"Model saved to {MODEL_PATH}")
    # Evaluate the weights that were actually written to disk rather than
    # whatever the last epoch happened to produce.
    model.load_state_dict(best_state)


# ------------------------------
# 6. Quick evaluation
# ------------------------------
def _accuracy(net, inputs, targets):
    """Return the fraction of *inputs* whose output thresholded at 0.5 equals *targets*."""
    with torch.no_grad():
        predictions = net(inputs)
    return ((predictions > 0.5) == targets).float().mean().item()


model.eval()
train_acc = _accuracy(model, X_train_tensor, y_train_tensor)
val_acc = _accuracy(model, X_val_tensor, y_val_tensor)

print("\nFinal Metrics:")
print(f"Train Accuracy: {train_acc:.3f}")
print(f"Val Accuracy: {val_acc:.3f}")