# Copyright 2025, MetaQuotes Ltd.
# https://www.mql5.com/en/users/johnhlomohang/
"""Entropy, volatility and trend features plus an adaptive regime detector."""

from collections import deque

import numpy as np


def compute_returns(prices) -> np.ndarray:
    """Compute log returns from a price series.

    The computation is done in float64: casting prices to float32 first
    quantises large quotes so coarsely that small returns are dominated by
    rounding noise.

    Parameters:
    - prices: array-like of prices

    Returns:
    - numpy float64 array of ``diff(log(prices + 1e-10))``; it has one element
      fewer than ``prices`` along the last axis
    """
    prices = np.asarray(prices, dtype=np.float64)
    # The tiny offset keeps log() finite when a price is exactly zero.
    return np.diff(np.log(prices + 1e-10))


def _normalized_entropy(values, percentiles) -> float:
    """Return the Shannon entropy of percentile-binned values, scaled to [0, 1]."""
    # Percentile-based edges; duplicates are dropped so repeated values do
    # not create empty bins.
    edges = np.unique(np.percentile(values, percentiles))
    if len(edges) < 2:
        return 0.0

    states = np.digitize(values, edges[:-1])
    _, counts = np.unique(states, return_counts=True)
    probs = counts / counts.sum()
    entropy = -np.sum(probs * np.log(probs + 1e-10))

    # Normalise by the entropy of a uniform distribution over the states
    # that were actually observed.
    max_entropy = np.log(len(counts)) if len(counts) > 1 else 1.0

    # The 1e-10 inside the log can push the ratio a hair below zero.
    return float(np.clip(entropy / max_entropy, 0.0, 1.0))


def compute_entropy(returns, n_bins=10) -> tuple:
    """Compute normalized Shannon entropy of the returns distribution.

    Parameters:
    - returns: array-like of returns
    - n_bins: number of percentile bins requested (default 10)

    Returns:
    - (entropy, vol_entropy): normalized entropy of the returns and of the
      squared returns, each in [0, 1]. Both are 0.0 when fewer than two
      returns are supplied; either is 0.0 when its values collapse into
      fewer than two distinct bin edges.
    """
    returns = np.asarray(returns, dtype=np.float64)
    if len(returns) < 2:
        return 0.0, 0.0

    percentiles = np.linspace(0, 100, n_bins + 1)
    entropy = _normalized_entropy(returns, percentiles)
    # Entropy of squared returns (volatility entropy).
    vol_entropy = _normalized_entropy(returns ** 2, percentiles)
    return entropy, vol_entropy


def compute_volatility_metrics(returns) -> dict:
    """Compute multiple volatility metrics of a returns series.

    Parameters:
    - returns: array-like of returns

    Returns:
    - dict with keys 'std', 'mad' (mean absolute deviation from the mean),
      'range' (max - min), 'skewness' and 'kurtosis' (fourth standardized
      moment, not excess kurtosis). All values are 0.0 when fewer than two
      returns are given; the two higher moments stay 0.0 when the standard
      deviation is at most 1e-10.
    """
    returns = np.asarray(returns, dtype=np.float64)
    if len(returns) < 2:
        return {
            'std': 0.0,
            'mad': 0.0,
            'range': 0.0,
            'skewness': 0.0,
            'kurtosis': 0.0,
        }

    centered = returns - np.mean(returns)
    std = float(np.std(returns))
    mad = float(np.mean(np.abs(centered)))
    range_vol = float(np.max(returns) - np.min(returns))

    # Higher moments are only meaningful when there is some dispersion.
    skewness = 0.0
    kurtosis = 0.0
    if std > 1e-10:
        skewness = float(np.mean(centered ** 3) / std ** 3)
        kurtosis = float(np.mean(centered ** 4) / std ** 4)

    return {
        'std': std,
        'mad': mad,
        'range': range_vol,
        'skewness': skewness,
        'kurtosis': kurtosis,
    }


def compute_trend_strength(prices) -> tuple:
    """Compute trend strength using linear regression R².

    Prices are regressed on their index (0, 1, 2, ...).

    Parameters:
    - prices: array-like of prices

    Returns:
    - (slope, r_squared): regression slope per step and R² clamped to
      [0, 1]. (0.0, 0.0) is returned for fewer than two prices, for a
      degenerate regression, or when the result is not finite (for example
      NaN/inf prices). A perfectly flat series yields R² = 1.0.
    """
    y = np.asarray(prices, dtype=np.float64)
    n = len(y)
    if n < 2:
        return 0.0, 0.0

    x = np.arange(n, dtype=np.float64)
    sum_x = np.sum(x)
    sum_y = np.sum(y)

    # Avoid division by zero
    denominator = n * np.sum(x * x) - sum_x * sum_x
    if denominator == 0:
        return 0.0, 0.0

    slope = (n * np.sum(x * y) - sum_x * sum_y) / denominator
    intercept = (sum_y - slope * sum_x) / n

    # R-squared
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    if ss_tot == 0:
        r_squared = 1.0
    else:
        ss_res = np.sum((y - (slope * x + intercept)) ** 2)
        r_squared = 1.0 - ss_res / ss_tot

    # A NaN R² would otherwise slip through the clamp below as 1.0, because
    # every comparison against NaN is False.
    if not (np.isfinite(slope) and np.isfinite(r_squared)):
        return 0.0, 0.0

    return float(slope), float(max(0.0, min(1.0, r_squared)))


def build_features(prices, rsi=50.0, high_prices=None, low_prices=None):
    """
    Build comprehensive feature vector for model input.

    Parameters:
    - prices: array of close prices
    - rsi: RSI value (default 50.0)
    - high_prices: optional array of high prices (accepted but not used by
      the current feature set)
    - low_prices: optional array of low prices (accepted but not used by
      the current feature set)

    Returns:
    - features: numpy float32 array of 8 features
    - metrics: dictionary with all calculated metrics
    """
    # Ensure inputs are flat numpy arrays
    prices = np.asarray(prices, dtype=np.float64).flatten()
    returns = compute_returns(prices)

    # Entropy metrics
    entropy, vol_entropy = compute_entropy(returns)

    # Volatility metrics
    vol_metrics = compute_volatility_metrics(returns)

    # Trend metrics
    slope, r_squared = compute_trend_strength(prices)
    slope_is_finite = bool(np.isfinite(slope))

    # Mean and std of returns
    mean_ret = float(np.mean(returns)) if len(returns) > 0 else 0.0
    std_ret = vol_metrics['std']

    # Squash slope into the [-1, 1] range
    slope_norm = float(np.tanh(slope * 100)) if slope_is_finite else 0.0

    # Build feature vector (8 features)
    features = np.array([
        float(entropy),                  # 0: Market uncertainty
        float(vol_entropy),              # 1: Volatility uncertainty
        float(mean_ret),                 # 2: Directional bias
        float(std_ret),                  # 3: Volatility level
        float(r_squared),                # 4: Trend strength
        float(slope_norm),               # 5: Normalized trend direction
        float(vol_metrics['skewness']),  # 6: Return asymmetry
        float(rsi / 100.0),              # 7: Normalized RSI
    ], dtype=np.float32)

    # Replace NaN with 0 and +/-inf with +/-1 so the vector is always finite
    features = np.nan_to_num(features, nan=0.0, posinf=1.0, neginf=-1.0)

    metrics = {
        'entropy': float(entropy),
        'vol_entropy': float(vol_entropy),
        'mean_ret': float(mean_ret),
        'std_ret': float(std_ret),
        'r_squared': float(r_squared),
        'slope': float(slope) if slope_is_finite else 0.0,
        'skewness': float(vol_metrics['skewness']),
        'kurtosis': float(vol_metrics['kurtosis']),
        'rsi': float(rsi),
    }

    return features, metrics


class VolatilityRegimeDetector:
    """Adaptive volatility regime detection using entropy history"""

    # regime -> (volatility_multiplier, confidence_multiplier)
    _REGIME_MULTIPLIERS = {
        "EXTREME_VOLATILITY": (2.5, 2.0),
        "HIGH_VOLATILITY": (1.5, 1.3),
        "LOW_VOLATILITY": (0.7, 0.8),
        "NORMAL": (1.0, 1.0),
    }

    # regime -> (stop-loss, take-profit, lot) multipliers; any regime not
    # listed here falls back to _NEUTRAL_TRADE_MULTIPLIERS.
    _TRADE_MULTIPLIERS = {
        "EXTREME_VOLATILITY": (3.0, 2.0, 0.3),
        "HIGH_VOLATILITY": (1.8, 1.5, 0.6),
        "LOW_VOLATILITY": (0.7, 0.8, 1.3),
    }
    _NEUTRAL_TRADE_MULTIPLIERS = (1.0, 1.0, 1.0)

    # Entropy samples required before thresholds become adaptive.
    _MIN_ADAPTIVE_HISTORY = 20
    # Samples required before a percentile is computed from history.
    _MIN_PERCENTILE_HISTORY = 10

    def __init__(self, window_size=50, history_size=100):
        self.window_size = window_size
        self.history_size = history_size
        self.entropy_history = deque(maxlen=history_size)
        self.vol_entropy_history = deque(maxlen=history_size)
        self.std_history = deque(maxlen=history_size)
        self.regime_history = deque(maxlen=20)

    def update(self, metrics):
        """Update history and detect current regime.

        ``metrics`` must provide the keys 'entropy', 'vol_entropy' and
        'std_ret'. Returns the dictionary produced by ``detect_regime``.
        """
        self.entropy_history.append(metrics['entropy'])
        self.vol_entropy_history.append(metrics['vol_entropy'])
        self.std_history.append(metrics['std_ret'])
        return self.detect_regime(metrics)

    def detect_regime(self, metrics):
        """Detect current volatility regime with adaptive thresholds.

        Static entropy thresholds (0.7 / 0.3) are used until 20 entropy
        samples are stored; afterwards the thresholds are derived from the
        mean and standard deviation of the stored entropy history. The
        detected regime is appended to ``regime_history``.

        Returns a dict with keys 'regime', 'volatility_multiplier',
        'confidence_multiplier', 'regime_change', 'entropy_percentile' and
        'vol_entropy_percentile'.
        """
        entropy = metrics['entropy']
        vol_entropy = metrics['vol_entropy']

        regime = self._classify_regime(entropy, vol_entropy)
        multiplier, confidence_adj = self._REGIME_MULTIPLIERS[regime]

        self.regime_history.append(regime)
        regime_change = self._detect_regime_change()

        return {
            'regime': regime,
            'volatility_multiplier': multiplier,
            'confidence_multiplier': confidence_adj,
            'regime_change': regime_change,
            'entropy_percentile': self._get_percentile(entropy),
            'vol_entropy_percentile': self._get_percentile(
                vol_entropy, is_vol=True
            ),
        }

    def _classify_regime(self, entropy, vol_entropy):
        """Map the current entropy readings to a regime name."""
        if len(self.entropy_history) < self._MIN_ADAPTIVE_HISTORY:
            # Not enough history - use static thresholds
            if entropy > 0.7:
                return "HIGH_VOLATILITY"
            if entropy < 0.3:
                return "LOW_VOLATILITY"
            return "NORMAL"

        # Adaptive thresholds based on historical distribution
        entropy_array = np.array(self.entropy_history)
        mean_entropy = np.mean(entropy_array)
        std_entropy = np.std(entropy_array)

        # Dynamic thresholds, capped so they stay inside (0, 1)
        high_threshold = min(0.85, mean_entropy + 1.5 * std_entropy)
        low_threshold = max(0.15, mean_entropy - 1.5 * std_entropy)
        extreme_threshold = min(0.95, mean_entropy + 2.5 * std_entropy)

        if entropy > extreme_threshold or vol_entropy > 0.9:
            return "EXTREME_VOLATILITY"
        if entropy > high_threshold:
            return "HIGH_VOLATILITY"
        if entropy < low_threshold:
            return "LOW_VOLATILITY"
        return "NORMAL"

    def _get_percentile(self, value, is_vol=False):
        """Calculate percentile of current value in history.

        Returns 50.0 until at least 10 samples are stored; otherwise the
        percentage of stored samples strictly below ``value``.
        """
        history = self.vol_entropy_history if is_vol else self.entropy_history
        if len(history) < self._MIN_PERCENTILE_HISTORY:
            return 50.0

        history_array = np.array(history)
        below = np.count_nonzero(history_array < value)
        return float(below / len(history_array) * 100)

    def _detect_regime_change(self):
        """Detect if regime has changed from previous state"""
        if len(self.regime_history) < 2:
            return False
        return self.regime_history[-1] != self.regime_history[-2]

    def get_adaptive_parameters(self, base_sl, base_tp, base_lot):
        """Calculate adaptive trading parameters.

        Scales the base stop-loss, take-profit and lot size by multipliers
        that depend on the most recently detected regime.

        Returns (adaptive_sl, adaptive_tp, adaptive_lot, regime). The
        stop-loss and take-profit are truncated to int. With no regime
        detected yet the base values are returned unchanged together with
        "NORMAL".
        """
        if len(self.regime_history) == 0:
            return base_sl, base_tp, base_lot, "NORMAL"

        current_regime = self.regime_history[-1]
        sl_mult, tp_mult, lot_mult = self._TRADE_MULTIPLIERS.get(
            current_regime, self._NEUTRAL_TRADE_MULTIPLIERS
        )

        adaptive_sl = int(base_sl * sl_mult)
        adaptive_tp = int(base_tp * tp_mult)
        adaptive_lot = base_lot * lot_mult

        return adaptive_sl, adaptive_tp, adaptive_lot, current_regime