Article-22220-Real-Time-Ent.../Features.py
2026-06-02 15:05:50 +02:00

303 lines
No EOL
10 KiB
Python

# Copyright 2025, MetaQuotes Ltd.
# https://www.mql5.com/en/users/johnhlomohang/
import numpy as np
from collections import deque
def compute_returns(prices):
    """
    Compute log returns from a price series.
    The logarithms and their differences are evaluated in float64 and only
    the final result is narrowed to float32. Taking the log in float32
    quantizes it (and therefore the returns) far more coarsely than the
    price changes themselves when prices are large.
    Parameters:
    - prices: sequence or array of prices
    Returns:
    - float32 numpy array of log returns, one element shorter than the
      input along the last axis (empty when fewer than two prices are given)
    """
    prices = np.asarray(prices, dtype=np.float64)
    # The small offset keeps log() finite when a price is exactly zero.
    log_prices = np.log(prices + 1e-10)
    # Narrow only at the end so callers still receive float32 returns.
    return np.diff(log_prices).astype(np.float32)
def _normalized_bin_entropy(values, percentiles):
    """
    Normalized Shannon entropy of *values* over percentile-based bins.
    Returns None when the percentile edges collapse to fewer than two
    distinct values, i.e. when the data cannot be binned.
    """
    edges = np.unique(np.percentile(values, percentiles))  # drop duplicate edges
    if len(edges) < 2:
        return None
    # The last edge is left out so the largest values land in the top bin
    # instead of an extra overflow bin.
    states = np.digitize(values, edges[:-1])
    occupied, counts = np.unique(states, return_counts=True)
    probs = counts / counts.sum()
    entropy = -np.sum(probs * np.log(probs + 1e-10))
    # Normalize by the log of the number of occupied bins.
    max_entropy = np.log(len(occupied)) if len(occupied) > 1 else 1
    return entropy / max_entropy


def compute_entropy(returns, n_bins=10):
    """
    Compute normalized Shannon entropy of returns distribution.
    Higher entropy = more uncertainty/volatility.
    Parameters:
    - returns: sequence or array of returns (plain lists are accepted)
    - n_bins: number of percentile bins requested (default 10)
    Returns:
    - (entropy, vol_entropy): normalized entropy of the returns and of the
      squared returns. Both are 0.0 when fewer than two returns are given
      or the returns cannot be binned; vol_entropy alone is 0.0 when only
      the squared returns cannot be binned.
    NOTE(review): the bin edges are percentiles of the same data, so bins
    tend to be equally populated and the result tends to sit near 1 -
    confirm this is the intended signal.
    """
    # Convert up front: a plain list supports len() but not `** 2`.
    returns = np.asarray(returns)
    if len(returns) < 2:
        return 0.0, 0.0
    # Use percentile-based bins for better distribution
    percentiles = np.linspace(0, 100, n_bins + 1)
    entropy_norm = _normalized_bin_entropy(returns, percentiles)
    if entropy_norm is None:
        return 0.0, 0.0
    # Also compute entropy of squared returns (volatility entropy)
    vol_entropy_norm = _normalized_bin_entropy(returns ** 2, percentiles)
    if vol_entropy_norm is None:
        vol_entropy_norm = 0.0
    return entropy_norm, vol_entropy_norm
def compute_volatility_metrics(returns):
    """
    Summarize the dispersion and shape of a return series.
    Parameters:
    - returns: sequence or array of returns
    Returns:
    - dictionary with keys 'std' (population standard deviation), 'mad'
      (mean absolute deviation from the mean), 'range' (max minus min),
      'skewness' and 'kurtosis' (third and fourth standardized moments;
      kurtosis is the raw value, not excess). Every entry is 0.0 when
      fewer than two returns are supplied.
    """
    if len(returns) < 2:
        return dict.fromkeys(
            ('std', 'mad', 'range', 'skewness', 'kurtosis'), 0.0
        )

    spread = np.std(returns)
    center = np.mean(returns)
    deviations = returns - center

    # Higher moments are only meaningful when the series actually varies.
    if spread > 1e-10:
        third_moment = np.mean(deviations ** 3) / (spread ** 3)
        fourth_moment = np.mean(deviations ** 4) / (spread ** 4)
    else:
        third_moment = 0.0
        fourth_moment = 0.0

    return dict(
        std=spread,
        mad=np.mean(np.abs(deviations)),
        range=np.max(returns) - np.min(returns),
        skewness=third_moment,
        kurtosis=fourth_moment,
    )
def compute_trend_strength(prices):
    """
    Compute trend strength using linear regression R² of price against
    bar index (0, 1, 2, ...).
    The fit is carried out in float64: the closed-form sums cancel
    heavily, and float32 visibly distorts slope and R² once prices are
    large compared with their bar-to-bar changes.
    Parameters:
    - prices: sequence or array of prices
    Returns:
    - (slope, r_squared): least-squares slope per bar and R² clipped to
      [0, 1]. (0.0, 0.0) is returned when fewer than two prices are
      given. R² is 0.0 when it cannot be computed (e.g. NaN in the input).
    """
    prices = np.asarray(prices, dtype=np.float64)
    n = len(prices)
    if n < 2:
        return 0.0, 0.0
    x = np.arange(n)
    y = prices
    # Closed-form least-squares sums
    sum_x = np.sum(x)
    sum_y = np.sum(y)
    sum_xy = np.sum(x * y)
    sum_xx = np.sum(x * x)
    # Avoid division by zero
    denominator = n * sum_xx - sum_x * sum_x
    if denominator == 0:
        return 0.0, 0.0
    slope = (n * sum_xy - sum_x * sum_y) / denominator
    # R-squared
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    if ss_tot == 0:
        # NOTE(review): a perfectly flat series is scored as a perfect fit
        # (1.0); kept as-is because callers may rely on it.
        r_squared = 1.0
    else:
        intercept = (sum_y - slope * sum_x) / n
        ss_res = np.sum((y - (slope * x + intercept)) ** 2)
        r_squared = 1 - (ss_res / ss_tot)
    # min()/max() silently turn NaN into 1.0, which would report a perfect
    # trend for invalid data; treat a non-finite R² as "no trend" instead.
    if not np.isfinite(r_squared):
        r_squared = 0.0
    return slope, max(0.0, min(1.0, r_squared))
def build_features(prices, rsi=50.0, high_prices=None, low_prices=None):
    """
    Build comprehensive feature vector for model input.
    Parameters:
    - prices: array of close prices (flattened to 1-D before use)
    - rsi: RSI value (default 50.0); divided by 100 for the feature vector
    - high_prices: optional array of high prices (accepted but not used by
      the current feature set)
    - low_prices: optional array of low prices (accepted but not used by
      the current feature set)
    Returns:
    - features: float32 numpy array of 8 features, with NaN replaced by 0
      and +/-inf replaced by +/-1
    - metrics: dictionary with all calculated metrics as Python floats
      (unlike features, these are not sanitized apart from 'slope')
    """
    # Keep the incoming prices in double precision; only the final feature
    # vector is narrowed to float32.
    prices = np.asarray(prices, dtype=np.float64).flatten()
    returns = compute_returns(prices)
    # Entropy metrics
    entropy, vol_entropy = compute_entropy(returns)
    # Volatility metrics
    vol_metrics = compute_volatility_metrics(returns)
    # Trend metrics
    slope, r_squared = compute_trend_strength(prices)
    # Mean and std of returns
    mean_ret = np.mean(returns) if len(returns) > 0 else 0.0
    std_ret = vol_metrics['std']
    # A single finiteness check drives both the feature and the reported
    # metric, so NaN and +/-inf slopes are handled the same way in each.
    slope_is_finite = bool(np.isfinite(slope))
    # Normalize slope to [-1, 1] range
    slope_norm = np.tanh(slope * 100) if slope_is_finite else 0.0
    # Build feature vector (8 features)
    features = np.array([
        float(entropy),                  # 0: Market uncertainty
        float(vol_entropy),              # 1: Volatility uncertainty
        float(mean_ret),                 # 2: Directional bias
        float(std_ret),                  # 3: Volatility level
        float(r_squared),                # 4: Trend strength
        float(slope_norm),               # 5: Normalized trend direction
        float(vol_metrics['skewness']),  # 6: Return asymmetry
        float(rsi / 100.0),              # 7: Normalized RSI
    ], dtype=np.float32)
    # Replace any NaN or inf so the vector is always finite
    features = np.nan_to_num(features, nan=0.0, posinf=1.0, neginf=-1.0)
    metrics = {
        'entropy': float(entropy),
        'vol_entropy': float(vol_entropy),
        'mean_ret': float(mean_ret),
        'std_ret': float(std_ret),
        'r_squared': float(r_squared),
        'slope': float(slope) if slope_is_finite else 0.0,
        'skewness': float(vol_metrics['skewness']),
        'kurtosis': float(vol_metrics['kurtosis']),
        'rsi': float(rsi)
    }
    return features, metrics
class VolatilityRegimeDetector:
    """
    Classify market conditions into volatility regimes.

    Keeps bounded histories of entropy, volatility entropy and return
    standard deviation. Until 20 entropy samples are available the regime
    is chosen with fixed cut-offs; afterwards the cut-offs are derived from
    the mean and standard deviation of the stored entropy values.
    """

    # regime -> (volatility multiplier, confidence multiplier)
    _REGIME_SCALING = {
        "EXTREME_VOLATILITY": (2.5, 2.0),
        "HIGH_VOLATILITY": (1.5, 1.3),
        "LOW_VOLATILITY": (0.7, 0.8),
        "NORMAL": (1.0, 1.0),
    }

    # regime -> (stop-loss, take-profit, lot size) multipliers; any regime
    # not listed here leaves the base values unscaled
    _TRADE_SCALING = {
        "EXTREME_VOLATILITY": (3.0, 2.0, 0.3),
        "HIGH_VOLATILITY": (1.8, 1.5, 0.6),
        "LOW_VOLATILITY": (0.7, 0.8, 1.3),
    }

    def __init__(self, window_size=50, history_size=100):
        """
        Parameters:
        - window_size: stored on the instance; not read by any method of
          this class
        - history_size: maximum number of samples kept in each history
        """
        self.window_size = window_size
        self.history_size = history_size
        self.entropy_history = deque(maxlen=history_size)
        self.vol_entropy_history = deque(maxlen=history_size)
        self.std_history = deque(maxlen=history_size)
        # Only the 20 most recent regime labels are retained.
        self.regime_history = deque(maxlen=20)

    def update(self, metrics):
        """
        Record the latest metrics, then classify the current regime.

        Reads the 'entropy', 'vol_entropy' and 'std_ret' keys of *metrics*
        and returns the dictionary produced by detect_regime().
        """
        self.entropy_history.append(metrics['entropy'])
        self.vol_entropy_history.append(metrics['vol_entropy'])
        self.std_history.append(metrics['std_ret'])
        return self.detect_regime(metrics)

    @staticmethod
    def _classify_static(entropy):
        """Pick a regime from fixed entropy cut-offs (short history)."""
        if entropy > 0.7:
            return "HIGH_VOLATILITY"
        if entropy < 0.3:
            return "LOW_VOLATILITY"
        return "NORMAL"

    def _classify_adaptive(self, entropy, vol_entropy):
        """Pick a regime from cut-offs derived from the entropy history."""
        samples = np.array(list(self.entropy_history))
        mean_entropy = np.mean(samples)
        std_entropy = np.std(samples)
        # Cut-offs follow the history but are capped / floored.
        high_threshold = min(0.85, mean_entropy + 1.5 * std_entropy)
        low_threshold = max(0.15, mean_entropy - 1.5 * std_entropy)
        extreme_threshold = min(0.95, mean_entropy + 2.5 * std_entropy)
        if entropy > extreme_threshold or vol_entropy > 0.9:
            return "EXTREME_VOLATILITY"
        if entropy > high_threshold:
            return "HIGH_VOLATILITY"
        if entropy < low_threshold:
            return "LOW_VOLATILITY"
        return "NORMAL"

    def detect_regime(self, metrics):
        """
        Classify the current regime and append it to the regime history.

        Returns a dictionary with the regime label, its volatility and
        confidence multipliers, whether the label differs from the previous
        one, and the percentile of the current entropy / volatility entropy
        within their histories.
        """
        entropy = metrics['entropy']
        vol_entropy = metrics['vol_entropy']

        if len(self.entropy_history) >= 20:
            regime = self._classify_adaptive(entropy, vol_entropy)
        else:
            # Not enough history yet - fall back to fixed cut-offs
            regime = self._classify_static(entropy)

        multiplier, confidence_adj = self._REGIME_SCALING[regime]
        self.regime_history.append(regime)

        return {
            'regime': regime,
            'volatility_multiplier': multiplier,
            'confidence_multiplier': confidence_adj,
            'regime_change': self._detect_regime_change(),
            'entropy_percentile': self._get_percentile(entropy),
            'vol_entropy_percentile': self._get_percentile(vol_entropy, is_vol=True)
        }

    def _get_percentile(self, value, is_vol=False):
        """
        Percentage of stored samples strictly below *value*.

        Uses the volatility-entropy history when is_vol is true, otherwise
        the entropy history; returns 50.0 while fewer than 10 samples exist.
        """
        source = self.entropy_history
        if is_vol:
            source = self.vol_entropy_history
        if len(source) >= 10:
            samples = np.array(list(source))
            below = np.sum(samples < value)
            return (below / len(samples)) * 100
        return 50.0

    def _detect_regime_change(self):
        """Report whether the two most recent regime labels differ."""
        if len(self.regime_history) >= 2:
            latest = self.regime_history[-1]
            previous = self.regime_history[-2]
            return latest != previous
        return False

    def get_adaptive_parameters(self, base_sl, base_tp, base_lot):
        """
        Scale base trading parameters by the most recent regime.

        Returns (stop_loss, take_profit, lot, regime). Stop loss and take
        profit are truncated to int after scaling. When no regime has been
        recorded yet the base values are returned unchanged with "NORMAL".
        """
        if not self.regime_history:
            return base_sl, base_tp, base_lot, "NORMAL"

        current_regime = self.regime_history[-1]
        sl_mult, tp_mult, lot_mult = self._TRADE_SCALING.get(
            current_regime, (1.0, 1.0, 1.0)
        )
        adaptive_sl = int(base_sl * sl_mult)
        adaptive_tp = int(base_tp * tp_mult)
        return adaptive_sl, adaptive_tp, base_lot * lot_mult, current_regime