303 lines
10 KiB
Python
303 lines
10 KiB
Python
|
|
# Copyright 2025, MetaQuotes Ltd.
|
||
|
|
# https://www.mql5.com/en/users/johnhlomohang/
|
||
|
|
|
||
|
|
import numpy as np
|
||
|
|
from collections import deque
|
||
|
|
|
||
|
|
def compute_returns(prices):
|
||
|
|
"""Compute log returns from price series"""
|
||
|
|
prices = np.array(prices, dtype=np.float32)
|
||
|
|
return np.diff(np.log(prices + 1e-10))
|
||
|
|
|
||
|
|
def compute_entropy(returns, n_bins=10):
|
||
|
|
"""
|
||
|
|
Compute normalized Shannon entropy of returns distribution.
|
||
|
|
Higher entropy = more uncertainty/volatility.
|
||
|
|
"""
|
||
|
|
if len(returns) < 2:
|
||
|
|
return 0.0, 0.0
|
||
|
|
|
||
|
|
# Use percentile-based bins for better distribution
|
||
|
|
percentiles = np.linspace(0, 100, n_bins + 1)
|
||
|
|
bins = np.percentile(returns, percentiles)
|
||
|
|
bins = np.unique(bins) # Remove duplicates
|
||
|
|
|
||
|
|
if len(bins) < 2:
|
||
|
|
return 0.0, 0.0
|
||
|
|
|
||
|
|
states = np.digitize(returns, bins[:-1])
|
||
|
|
values, counts = np.unique(states, return_counts=True)
|
||
|
|
probs = counts / counts.sum()
|
||
|
|
|
||
|
|
entropy = -np.sum(probs * np.log(probs + 1e-10))
|
||
|
|
max_entropy = np.log(len(values)) if len(values) > 1 else 1
|
||
|
|
|
||
|
|
# Also compute entropy of squared returns (volatility entropy)
|
||
|
|
squared_returns = returns ** 2
|
||
|
|
vol_bins = np.percentile(squared_returns, percentiles)
|
||
|
|
vol_bins = np.unique(vol_bins)
|
||
|
|
|
||
|
|
if len(vol_bins) >= 2:
|
||
|
|
vol_states = np.digitize(squared_returns, vol_bins[:-1])
|
||
|
|
vol_values, vol_counts = np.unique(vol_states, return_counts=True)
|
||
|
|
vol_probs = vol_counts / vol_counts.sum()
|
||
|
|
vol_entropy = -np.sum(vol_probs * np.log(vol_probs + 1e-10))
|
||
|
|
vol_max = np.log(len(vol_values)) if len(vol_values) > 1 else 1
|
||
|
|
vol_entropy_norm = vol_entropy / vol_max
|
||
|
|
else:
|
||
|
|
vol_entropy_norm = 0.0
|
||
|
|
|
||
|
|
return entropy / max_entropy, vol_entropy_norm
|
||
|
|
|
||
|
|
def compute_volatility_metrics(returns):
|
||
|
|
"""Compute multiple volatility metrics"""
|
||
|
|
if len(returns) < 2:
|
||
|
|
return {
|
||
|
|
'std': 0.0,
|
||
|
|
'mad': 0.0,
|
||
|
|
'range': 0.0,
|
||
|
|
'skewness': 0.0,
|
||
|
|
'kurtosis': 0.0
|
||
|
|
}
|
||
|
|
|
||
|
|
std = np.std(returns)
|
||
|
|
mad = np.mean(np.abs(returns - np.mean(returns)))
|
||
|
|
range_vol = np.max(returns) - np.min(returns)
|
||
|
|
|
||
|
|
# Higher moments
|
||
|
|
skewness = 0.0
|
||
|
|
kurtosis = 0.0
|
||
|
|
if std > 1e-10:
|
||
|
|
skewness = np.mean((returns - np.mean(returns)) ** 3) / (std ** 3)
|
||
|
|
kurtosis = np.mean((returns - np.mean(returns)) ** 4) / (std ** 4)
|
||
|
|
|
||
|
|
return {
|
||
|
|
'std': std,
|
||
|
|
'mad': mad,
|
||
|
|
'range': range_vol,
|
||
|
|
'skewness': skewness,
|
||
|
|
'kurtosis': kurtosis
|
||
|
|
}
|
||
|
|
|
||
|
|
def compute_trend_strength(prices):
|
||
|
|
"""Compute trend strength using linear regression R²"""
|
||
|
|
prices = np.array(prices, dtype=np.float32)
|
||
|
|
if len(prices) < 2:
|
||
|
|
return 0.0, 0.0
|
||
|
|
|
||
|
|
x = np.arange(len(prices))
|
||
|
|
y = prices
|
||
|
|
|
||
|
|
# Linear regression
|
||
|
|
n = len(x)
|
||
|
|
sum_x = np.sum(x)
|
||
|
|
sum_y = np.sum(y)
|
||
|
|
sum_xy = np.sum(x * y)
|
||
|
|
sum_xx = np.sum(x * x)
|
||
|
|
sum_yy = np.sum(y * y)
|
||
|
|
|
||
|
|
# Avoid division by zero
|
||
|
|
denominator = n * sum_xx - sum_x * sum_x
|
||
|
|
if denominator == 0:
|
||
|
|
return 0.0, 0.0
|
||
|
|
|
||
|
|
slope = (n * sum_xy - sum_x * sum_y) / denominator
|
||
|
|
|
||
|
|
# R-squared
|
||
|
|
y_mean = np.mean(y)
|
||
|
|
ss_tot = np.sum((y - y_mean) ** 2)
|
||
|
|
if ss_tot == 0:
|
||
|
|
r_squared = 1.0
|
||
|
|
else:
|
||
|
|
y_pred = slope * x + (sum_y - slope * sum_x) / n
|
||
|
|
ss_res = np.sum((y - y_pred) ** 2)
|
||
|
|
r_squared = 1 - (ss_res / ss_tot)
|
||
|
|
|
||
|
|
return slope, max(0.0, min(1.0, r_squared))
|
||
|
|
|
||
|
|
def build_features(prices, rsi=50.0, high_prices=None, low_prices=None):
|
||
|
|
"""
|
||
|
|
Build comprehensive feature vector for model input.
|
||
|
|
|
||
|
|
Parameters:
|
||
|
|
- prices: array of close prices
|
||
|
|
- rsi: RSI value (default 50.0)
|
||
|
|
- high_prices: optional array of high prices
|
||
|
|
- low_prices: optional array of low prices
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
- features: numpy array of 8 features
|
||
|
|
- metrics: dictionary with all calculated metrics
|
||
|
|
"""
|
||
|
|
# Ensure inputs are numpy arrays
|
||
|
|
prices = np.array(prices, dtype=np.float32).flatten()
|
||
|
|
|
||
|
|
returns = compute_returns(prices)
|
||
|
|
|
||
|
|
# Entropy metrics
|
||
|
|
entropy, vol_entropy = compute_entropy(returns)
|
||
|
|
|
||
|
|
# Volatility metrics
|
||
|
|
vol_metrics = compute_volatility_metrics(returns)
|
||
|
|
|
||
|
|
# Trend metrics
|
||
|
|
slope, r_squared = compute_trend_strength(prices)
|
||
|
|
|
||
|
|
# Mean and std of returns
|
||
|
|
mean_ret = np.mean(returns) if len(returns) > 0 else 0.0
|
||
|
|
std_ret = vol_metrics['std']
|
||
|
|
|
||
|
|
# Normalize slope to [-1, 1] range
|
||
|
|
slope_norm = np.tanh(slope * 100) if not np.isnan(slope) and not np.isinf(slope) else 0.0
|
||
|
|
|
||
|
|
# Build feature vector (8 features)
|
||
|
|
features = np.array([
|
||
|
|
float(entropy), # 0: Market uncertainty
|
||
|
|
float(vol_entropy), # 1: Volatility uncertainty
|
||
|
|
float(mean_ret), # 2: Directional bias
|
||
|
|
float(std_ret), # 3: Volatility level
|
||
|
|
float(r_squared), # 4: Trend strength
|
||
|
|
float(slope_norm), # 5: Normalized trend direction
|
||
|
|
float(vol_metrics['skewness']), # 6: Return asymmetry
|
||
|
|
float(rsi / 100.0) # 7: Normalized RSI
|
||
|
|
], dtype=np.float32)
|
||
|
|
|
||
|
|
# Replace any NaN or inf with 0
|
||
|
|
features = np.nan_to_num(features, nan=0.0, posinf=1.0, neginf=-1.0)
|
||
|
|
|
||
|
|
metrics = {
|
||
|
|
'entropy': float(entropy),
|
||
|
|
'vol_entropy': float(vol_entropy),
|
||
|
|
'mean_ret': float(mean_ret),
|
||
|
|
'std_ret': float(std_ret),
|
||
|
|
'r_squared': float(r_squared),
|
||
|
|
'slope': float(slope) if not np.isnan(slope) else 0.0,
|
||
|
|
'skewness': float(vol_metrics['skewness']),
|
||
|
|
'kurtosis': float(vol_metrics['kurtosis']),
|
||
|
|
'rsi': float(rsi)
|
||
|
|
}
|
||
|
|
|
||
|
|
return features, metrics
|
||
|
|
|
||
|
|
|
||
|
|
class VolatilityRegimeDetector:
|
||
|
|
"""Adaptive volatility regime detection using entropy history"""
|
||
|
|
|
||
|
|
def __init__(self, window_size=50, history_size=100):
|
||
|
|
self.window_size = window_size
|
||
|
|
self.history_size = history_size
|
||
|
|
self.entropy_history = deque(maxlen=history_size)
|
||
|
|
self.vol_entropy_history = deque(maxlen=history_size)
|
||
|
|
self.std_history = deque(maxlen=history_size)
|
||
|
|
self.regime_history = deque(maxlen=20)
|
||
|
|
|
||
|
|
def update(self, metrics):
|
||
|
|
"""Update history and detect current regime"""
|
||
|
|
self.entropy_history.append(metrics['entropy'])
|
||
|
|
self.vol_entropy_history.append(metrics['vol_entropy'])
|
||
|
|
self.std_history.append(metrics['std_ret'])
|
||
|
|
return self.detect_regime(metrics)
|
||
|
|
|
||
|
|
def detect_regime(self, metrics):
|
||
|
|
"""Detect current volatility regime with adaptive thresholds"""
|
||
|
|
entropy = metrics['entropy']
|
||
|
|
vol_entropy = metrics['vol_entropy']
|
||
|
|
|
||
|
|
if len(self.entropy_history) < 20:
|
||
|
|
# Not enough history - use static thresholds
|
||
|
|
if entropy > 0.7:
|
||
|
|
regime = "HIGH_VOLATILITY"
|
||
|
|
multiplier = 1.5
|
||
|
|
confidence_adj = 1.3
|
||
|
|
elif entropy < 0.3:
|
||
|
|
regime = "LOW_VOLATILITY"
|
||
|
|
multiplier = 0.7
|
||
|
|
confidence_adj = 0.8
|
||
|
|
else:
|
||
|
|
regime = "NORMAL"
|
||
|
|
multiplier = 1.0
|
||
|
|
confidence_adj = 1.0
|
||
|
|
else:
|
||
|
|
# Adaptive thresholds based on historical distribution
|
||
|
|
entropy_array = np.array(list(self.entropy_history))
|
||
|
|
mean_entropy = np.mean(entropy_array)
|
||
|
|
std_entropy = np.std(entropy_array)
|
||
|
|
|
||
|
|
# Dynamic thresholds
|
||
|
|
high_threshold = min(0.85, mean_entropy + 1.5 * std_entropy)
|
||
|
|
low_threshold = max(0.15, mean_entropy - 1.5 * std_entropy)
|
||
|
|
extreme_threshold = min(0.95, mean_entropy + 2.5 * std_entropy)
|
||
|
|
|
||
|
|
# Regime detection
|
||
|
|
if entropy > extreme_threshold or vol_entropy > 0.9:
|
||
|
|
regime = "EXTREME_VOLATILITY"
|
||
|
|
multiplier = 2.5
|
||
|
|
confidence_adj = 2.0
|
||
|
|
elif entropy > high_threshold:
|
||
|
|
regime = "HIGH_VOLATILITY"
|
||
|
|
multiplier = 1.5
|
||
|
|
confidence_adj = 1.3
|
||
|
|
elif entropy < low_threshold:
|
||
|
|
regime = "LOW_VOLATILITY"
|
||
|
|
multiplier = 0.7
|
||
|
|
confidence_adj = 0.8
|
||
|
|
else:
|
||
|
|
regime = "NORMAL"
|
||
|
|
multiplier = 1.0
|
||
|
|
confidence_adj = 1.0
|
||
|
|
|
||
|
|
self.regime_history.append(regime)
|
||
|
|
regime_change = self._detect_regime_change()
|
||
|
|
|
||
|
|
return {
|
||
|
|
'regime': regime,
|
||
|
|
'volatility_multiplier': multiplier,
|
||
|
|
'confidence_multiplier': confidence_adj,
|
||
|
|
'regime_change': regime_change,
|
||
|
|
'entropy_percentile': self._get_percentile(entropy),
|
||
|
|
'vol_entropy_percentile': self._get_percentile(vol_entropy, is_vol=True)
|
||
|
|
}
|
||
|
|
|
||
|
|
def _get_percentile(self, value, is_vol=False):
|
||
|
|
"""Calculate percentile of current value in history"""
|
||
|
|
history = self.vol_entropy_history if is_vol else self.entropy_history
|
||
|
|
if len(history) < 10:
|
||
|
|
return 50.0
|
||
|
|
history_array = np.array(list(history))
|
||
|
|
return (np.sum(history_array < value) / len(history_array)) * 100
|
||
|
|
|
||
|
|
def _detect_regime_change(self):
|
||
|
|
"""Detect if regime has changed from previous state"""
|
||
|
|
if len(self.regime_history) < 2:
|
||
|
|
return False
|
||
|
|
return self.regime_history[-1] != self.regime_history[-2]
|
||
|
|
|
||
|
|
def get_adaptive_parameters(self, base_sl, base_tp, base_lot):
|
||
|
|
"""Calculate adaptive trading parameters"""
|
||
|
|
if len(self.regime_history) == 0:
|
||
|
|
return base_sl, base_tp, base_lot, "NORMAL"
|
||
|
|
|
||
|
|
current_regime = self.regime_history[-1]
|
||
|
|
|
||
|
|
if current_regime == "EXTREME_VOLATILITY":
|
||
|
|
sl_mult = 3.0
|
||
|
|
tp_mult = 2.0
|
||
|
|
lot_mult = 0.3
|
||
|
|
elif current_regime == "HIGH_VOLATILITY":
|
||
|
|
sl_mult = 1.8
|
||
|
|
tp_mult = 1.5
|
||
|
|
lot_mult = 0.6
|
||
|
|
elif current_regime == "LOW_VOLATILITY":
|
||
|
|
sl_mult = 0.7
|
||
|
|
tp_mult = 0.8
|
||
|
|
lot_mult = 1.3
|
||
|
|
else:
|
||
|
|
sl_mult = 1.0
|
||
|
|
tp_mult = 1.0
|
||
|
|
lot_mult = 1.0
|
||
|
|
|
||
|
|
adaptive_sl = int(base_sl * sl_mult)
|
||
|
|
adaptive_tp = int(base_tp * tp_mult)
|
||
|
|
adaptive_lot = base_lot * lot_mult
|
||
|
|
|
||
|
|
return adaptive_sl, adaptive_tp, adaptive_lot, current_regime
|