MMAR/step2_extract_scaling.py
2026-05-03 21:24:14 +00:00

739 lines
25 KiB
Python

"""
Step 2: Extract Scaling Function τ(q)
Extracts the scaling function from Step 1 partition function slopes
From paper equation (2.1):
E(|X(t)|^q) = c(q) × t^(τ(q)+1)
Therefore:
slope = τ(q) + 1 → τ(q) = slope - 1
Key parameter to estimate:
H (self-affinity index) where τ(1/H) = 0
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.optimize import brentq
from pathlib import Path
import config
# Try to import nolds for robust Hurst estimation
try:
import nolds
NOLDS_AVAILABLE = True
except ImportError:
NOLDS_AVAILABLE = False
print("⚠️ Warning: nolds not available. Install with: pip install nolds")
class ScalingExtractor:
"""
Extracts scaling function τ(q) from partition function results.
The scaling function τ(q) characterizes the multifractal properties
of the data. It is extracted from the slopes of partition plots:
τ(q) = slope - 1
Key parameter:
H (Hurst exponent / self-affinity index) is found where τ(1/H) = 0
"""
def __init__(self, checker, verbose=config.VERBOSE):
"""
Initialize scaling extractor.
Parameters:
-----------
checker : FractalityChecker
Completed checker object from Step 1 with slopes computed
verbose : bool
Print detailed information
"""
self.checker = checker
self.verbose = verbose
# Extract from checker
self.q_values = np.array(sorted(checker.slopes.keys()))
self.slopes = np.array([checker.slopes[q] for q in self.q_values])
self.r_squared = np.array([checker.r_squared_values[q] for q in self.q_values])
# Calculate τ(q) = slope - 1
self.tau_q = self.slopes - 1
# Storage for results
self.H = None # Self-affinity index
self.tau_q_func = None # Interpolation function for τ(q)
if self.verbose:
print(f"\nScaling Extractor Initialized")
print(f"Number of q values: {len(self.q_values)}")
print(f"q range: {self.q_values.min():.2f} to {self.q_values.max():.2f}")
print(f"τ(q) range: {self.tau_q.min():.4f} to {self.tau_q.max():.4f}")
def estimate_H_nolds(self):
"""
Estimate H using nolds R/S (rescaled range) method.
This is more robust than finding τ(1/H) = 0 numerically.
Uses the original returns from the checker.
Returns:
--------
float
Estimated H value using R/S method
"""
if not NOLDS_AVAILABLE:
if self.verbose:
print(" ⚠️ nolds not available, falling back to τ(q) method")
return None
if self.verbose:
print("\nEstimating H using nolds R/S method...")
print(" (More robust than τ(1/H) = 0 approach)")
try:
# Get returns from checker
returns = self.checker.returns
# Estimate Hurst using nolds
# corrected=True applies Anis-Lloyd-Peters correction
# fit='RANSAC' is robust to outliers
H_nolds = nolds.hurst_rs(returns, fit='RANSAC', corrected=True)
if self.verbose:
print(f" ✓ H estimated: {H_nolds:.4f}")
return H_nolds
except Exception as e:
if self.verbose:
print(f" ✗ nolds estimation failed: {e}")
return None
def _print_H_interpretation(self):
"""Print interpretation of estimated H value."""
if not self.verbose or self.H is None:
return
print(f"\n Interpretation of H = {self.H:.4f}:")
# Check if close to 0.5
if abs(self.H - 0.5) < 0.05:
print(f" → H ≈ 0.5: Close to random walk")
print(f" (No strong long-term memory)")
elif self.H > 0.5:
if self.H > 0.7:
print(f" → H > 0.7: STRONGLY persistent")
print(f" (Strong trends, long memory)")
else:
print(f" → H > 0.5: Persistent process")
print(f" (Moderate trends, some long memory)")
elif self.H < 0.5:
if self.H < 0.3:
print(f" → H < 0.3: STRONGLY anti-persistent")
print(f" (Sharp mean reversion)")
else:
print(f" → H < 0.5: Anti-persistent")
print(f" (Mean-reverting behavior)")
# Additional context
print(f"\n Context:")
print(f" - Random walk: H = 0.50")
print(f" - FX markets typically: H = 0.48-0.55")
print(f" - Stock markets typically: H = 0.52-0.58")
# Warning if extreme
if self.H > 0.8 or self.H < 0.2:
print(f"\n ⚠️ EXTREME H value!")
print(f" This is unusual for financial data")
print(f" → Check data quality")
print(f" → May indicate measurement error")
def estimate_H(self, use_nolds=True):
"""
Estimate H (self-affinity index).
If use_nolds=True, uses robust R/S method from nolds library.
Otherwise, finds where τ(1/H) = 0 from scaling function.
Parameters:
-----------
use_nolds : bool
If True, use nolds.hurst_rs() method (recommended)
Returns:
--------
float
Estimated H value
"""
# Try nolds first (more robust)
if use_nolds and NOLDS_AVAILABLE:
H_nolds = self.estimate_H_nolds()
if H_nolds is not None:
self.H = H_nolds
self._print_H_interpretation()
return self.H
# Fallback to τ(1/H) = 0 method
if self.verbose:
print("\nEstimating H from τ(1/H) = 0...")
print(" (Finding q where τ(q) = 0)")
# Create interpolation function for τ(q)
# Use cubic spline for smooth interpolation
self.tau_q_func = interp1d(self.q_values, self.tau_q,
kind='cubic',
fill_value='extrapolate',
bounds_error=False)
# Find where τ(q) = 0
# τ(q) should cross zero somewhere between q_min and q_max
try:
# Use Brent's method to find root
# τ(q) should be positive for small q and negative for large q
# or vice versa depending on data
# Check if we have a sign change
tau_min = self.tau_q_func(self.q_values.min())
tau_max = self.tau_q_func(self.q_values.max())
if tau_min * tau_max > 0:
# No sign change - τ(q) doesn't cross zero in our range
# This can happen with limited q range
if self.verbose:
print(f" ⚠️ No zero crossing found in q range")
print(f" τ(q_min={self.q_values.min():.2f}) = {tau_min:.4f}")
print(f" τ(q_max={self.q_values.max():.2f}) = {tau_max:.4f}")
# Estimate H using linear extrapolation
# Find q where τ(q) would be closest to 0
idx_closest = np.argmin(np.abs(self.tau_q))
q_zero = self.q_values[idx_closest]
self.H = 1.0 / q_zero
if self.verbose:
print(f" Using closest point: q = {q_zero:.4f}, τ(q) = {self.tau_q[idx_closest]:.4f}")
else:
# Find zero crossing using Brent's method
q_zero = brentq(self.tau_q_func, self.q_values.min(), self.q_values.max())
self.H = 1.0 / q_zero
if self.verbose:
print(f" ✓ Found: q = {q_zero:.4f} where τ(q) ≈ 0")
print(f" ✓ H = 1/q = {self.H:.4f}")
# Print interpretation
self._print_H_interpretation()
except Exception as e:
if self.verbose:
print(f" ✗ Error estimating H: {e}")
print(f" Using fallback estimation method")
# Fallback: linear fit near τ(q) = 0
# Find the two points closest to τ = 0
idx = np.argsort(np.abs(self.tau_q))[:2]
q1, q2 = self.q_values[idx]
tau1, tau2 = self.tau_q[idx]
# Linear interpolation
if tau2 != tau1:
q_zero = q1 + (0 - tau1) * (q2 - q1) / (tau2 - tau1)
self.H = 1.0 / q_zero
else:
# Use average
q_zero = (q1 + q2) / 2
self.H = 1.0 / q_zero
if self.verbose:
print(f" Estimated H = {self.H:.4f} (fallback method)")
return self.H
def validate_scaling_function(self):
"""
Validate that τ(q) has expected properties.
From paper:
1. τ(q) should be concave (d²τ/dq² < 0)
2. τ(0) = 0 (by definition)
3. τ(1) relates to mean behavior
4. τ(q) should be smooth
"""
if self.verbose:
print("\nValidating scaling function properties...")
issues = []
# Check concavity (second derivative should be negative)
d2_tau = np.diff(self.tau_q, 2) # Second differences approximate second derivative
concave_pct = 100 * np.sum(d2_tau < 0) / len(d2_tau)
if concave_pct < 70:
issues.append(f"Low concavity: only {concave_pct:.1f}% of points are concave")
# Check τ(q≈0) is reasonable
# Theoretically: τ(0) = -1 (by normalization in some formulations)
# In practice: τ(q≈0) should be close to -1.0 (between -0.8 and -1.2 is OK)
if self.q_values.min() <= 0.1:
idx_near_zero = np.argmin(np.abs(self.q_values))
q_near_zero = self.q_values[idx_near_zero]
tau_at_zero = self.tau_q[idx_near_zero]
# Check if it's far from expected -1.0
if tau_at_zero < -1.5 or tau_at_zero > -0.5:
issues.append(f"τ(q≈{q_near_zero:.2f}) = {tau_at_zero:.4f} (expected ≈ -1.0, acceptable range: -0.5 to -1.5)")
# Check smoothness (no large jumps)
d_tau = np.diff(self.tau_q)
max_jump = np.max(np.abs(d_tau))
if max_jump > 1.0:
issues.append(f"Large jump in τ(q): {max_jump:.4f}")
if issues:
print(" ⚠️ Potential issues found:")
for issue in issues:
print(f" - {issue}")
else:
print(" ✓ Scaling function looks good")
print(f" Concavity: {concave_pct:.1f}% of points")
return len(issues) == 0
def check_multifractality(self):
"""
CRITICAL TEST: Determine if data is truly MULTIFRACTAL vs monofractal.
This is THE decisive test for MMAR viability!
Key principle:
- Linear τ(q) → Monofractal (e.g., random walk) → MMAR will fail
- Concave τ(q) → Multifractal → MMAR worth pursuing
Returns:
--------
dict
Dictionary with multifractality assessment and scores
"""
print("\n" + "="*60)
print("CRITICAL TEST: MULTIFRACTALITY CHECK")
print("="*60)
# Test 1: Concavity (second derivative)
d2_tau = np.diff(self.tau_q, 2)
concave_pct = 100 * np.sum(d2_tau < 0) / len(d2_tau)
mean_d2 = np.mean(d2_tau)
print(f"\n1. Concavity Test:")
print(f" τ(q) concave at {concave_pct:.1f}% of points")
print(f" Mean d²τ/dq² = {mean_d2:.6f}")
if concave_pct >= 80:
concavity_verdict = "STRONGLY CONCAVE ✓✓"
concavity_score = 3
elif concave_pct >= 60:
concavity_verdict = "MODERATELY CONCAVE ✓"
concavity_score = 2
elif concave_pct >= 40:
concavity_verdict = "WEAKLY CONCAVE ~"
concavity_score = 1
else:
concavity_verdict = "NOT CONCAVE ✗"
concavity_score = 0
print(f"{concavity_verdict}")
# Test 2: Compare with theoretical random walk
# For Brownian motion: τ(q) = q*H - 1 where H = 0.5
# So τ(q) = 0.5*q - 1 (perfectly linear)
q_range = self.q_values[self.q_values > 0]
tau_range = self.tau_q[self.q_values > 0]
# Fit linear model
from scipy.stats import linregress
slope_linear, intercept_linear, r_value_linear, _, _ = linregress(q_range, tau_range)
r2_linear = r_value_linear ** 2
# Calculate residuals from linear fit
tau_linear_fit = slope_linear * q_range + intercept_linear
rmse_from_linear = np.sqrt(np.mean((tau_range - tau_linear_fit)**2))
print(f"\n2. Linearity Test (random walk comparison):")
print(f" R² of linear fit = {r2_linear:.4f}")
print(f" RMSE from linear = {rmse_from_linear:.6f}")
if r2_linear > 0.99 and rmse_from_linear < 0.05:
linearity_verdict = "HIGHLY LINEAR (like random walk) ✗"
linearity_score = 0
elif r2_linear > 0.95:
linearity_verdict = "MOSTLY LINEAR (weakly multifractal) ~"
linearity_score = 1
elif r2_linear > 0.90:
linearity_verdict = "MODERATELY NONLINEAR ✓"
linearity_score = 2
else:
linearity_verdict = "STRONGLY NONLINEAR ✓✓"
linearity_score = 3
print(f"{linearity_verdict}")
# Test 3: Range of τ(q) derivatives
# Multifractals have varying local slopes
d_tau = np.diff(self.tau_q)
slope_range = np.ptp(d_tau) # Peak-to-peak
slope_std = np.std(d_tau)
print(f"\n3. Slope Variation Test:")
print(f" Range of dτ/dq = {slope_range:.4f}")
print(f" Std dev of dτ/dq = {slope_std:.4f}")
if slope_range > 0.5:
variation_verdict = "HIGH VARIATION ✓✓"
variation_score = 3
elif slope_range > 0.3:
variation_verdict = "MODERATE VARIATION ✓"
variation_score = 2
elif slope_range > 0.1:
variation_verdict = "LOW VARIATION ~"
variation_score = 1
else:
variation_verdict = "MINIMAL VARIATION (monofractal) ✗"
variation_score = 0
print(f"{variation_verdict}")
# Overall verdict
total_score = concavity_score + linearity_score + variation_score
max_score = 9
print(f"\n" + "="*60)
print(f"OVERALL MULTIFRACTALITY SCORE: {total_score}/{max_score}")
print("="*60)
if total_score >= 7:
overall = "STRONGLY MULTIFRACTAL ✓✓"
mmar_recommendation = "MMAR highly recommended - proceed with confidence"
elif total_score >= 5:
overall = "MODERATELY MULTIFRACTAL ✓"
mmar_recommendation = "MMAR worth trying - may outperform GARCH"
elif total_score >= 3:
overall = "WEAKLY MULTIFRACTAL ~"
mmar_recommendation = "MMAR uncertain - run comparison but GARCH may win"
else:
overall = "NOT MULTIFRACTAL (Monofractal) ✗"
mmar_recommendation = "MMAR NOT recommended - data is too close to random walk"
print(f"\n{overall}")
print(f"{mmar_recommendation}")
print("="*60 + "\n")
return {
'concavity_pct': concave_pct,
'concavity_score': concavity_score,
'linear_r2': r2_linear,
'linearity_score': linearity_score,
'slope_range': slope_range,
'variation_score': variation_score,
'total_score': total_score,
'max_score': max_score,
'is_multifractal': total_score >= 5,
'recommendation': mmar_recommendation
}
def get_tau_q_at_q(self, q):
"""
Get τ(q) value at specific q (uses interpolation).
Parameters:
-----------
q : float or array-like
Moment order(s)
Returns:
--------
float or np.ndarray
τ(q) value(s)
"""
if self.tau_q_func is None:
self.tau_q_func = interp1d(self.q_values, self.tau_q,
kind='cubic',
fill_value='extrapolate',
bounds_error=False)
return self.tau_q_func(q)
def plot_scaling_function(self, save_path=None):
"""
Plot τ(q) vs q.
Shows the scaling function extracted from partition plots.
Key features:
- Should be concave
- Crosses zero at q = 1/H
"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
# Left plot: τ(q) with data points and fit
ax1.plot(self.q_values, self.tau_q, 'o-',
linewidth=2, markersize=6,
label='τ(q) = slope - 1', color='steelblue')
# Add horizontal line at τ = 0
ax1.axhline(y=0, color='red', linestyle='--',
linewidth=1, alpha=0.5, label='τ(q) = 0')
# Mark H position if estimated
if self.H is not None:
q_H = 1.0 / self.H
ax1.axvline(x=q_H, color='green', linestyle='--',
linewidth=1, alpha=0.5, label=f'q = 1/H = {q_H:.3f}')
ax1.plot([q_H], [0], 'go', markersize=10,
label=f'H = {self.H:.4f}')
ax1.set_xlabel('q (moment order)', fontsize=12)
ax1.set_ylabel('τ(q)', fontsize=12)
ax1.set_title('Scaling Function τ(q)\n(Should be concave)',
fontsize=13, fontweight='bold')
ax1.legend(fontsize=9)
ax1.grid(True, alpha=0.3)
# Right plot: Slopes (before subtracting 1)
ax2.plot(self.q_values, self.slopes, 'o-',
linewidth=2, markersize=6,
color='darkorange', label='Slope = τ(q) + 1')
# Add horizontal line at slope = 1 (where τ = 0)
ax2.axhline(y=1, color='red', linestyle='--',
linewidth=1, alpha=0.5, label='Slope = 1 (τ = 0)')
if self.H is not None:
q_H = 1.0 / self.H
ax2.axvline(x=q_H, color='green', linestyle='--',
linewidth=1, alpha=0.5)
ax2.set_xlabel('q (moment order)', fontsize=12)
ax2.set_ylabel('Slope', fontsize=12)
ax2.set_title('Partition Plot Slopes\n(From Step 1)',
fontsize=13, fontweight='bold')
ax2.legend(fontsize=9)
ax2.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
if self.verbose:
print(f"Plot saved to: {save_path}")
else:
plt.show()
plt.close()
def plot_quality_metrics(self, save_path=None):
"""
Plot R² values and other quality metrics from Step 1.
This shows which q values have reliable τ(q) estimates.
"""
fig, ax = plt.subplots(figsize=(10, 6))
# Plot R² vs q
ax.plot(self.q_values, self.r_squared, 'o-',
linewidth=2, markersize=6, color='purple')
# Add threshold line
ax.axhline(y=config.MIN_R_SQUARED, color='red',
linestyle='--', linewidth=2,
label=f'Threshold = {config.MIN_R_SQUARED}')
# Color-code points
good_mask = self.r_squared >= config.MIN_R_SQUARED
ax.scatter(self.q_values[good_mask], self.r_squared[good_mask],
s=100, c='green', alpha=0.5, zorder=5, label='Good fit')
ax.scatter(self.q_values[~good_mask], self.r_squared[~good_mask],
s=100, c='red', alpha=0.5, zorder=5, label='Poor fit')
ax.set_xlabel('q (moment order)', fontsize=12)
ax.set_ylabel('R² (goodness of fit)', fontsize=12)
ax.set_title('Quality of τ(q) Estimates\n(Higher R² = more reliable)',
fontsize=13, fontweight='bold')
ax.legend(fontsize=10)
ax.grid(True, alpha=0.3)
ax.set_ylim([0, 1.05])
# Add summary text
mean_r2 = self.r_squared.mean()
good_pct = 100 * np.sum(good_mask) / len(good_mask)
ax.text(0.02, 0.02,
f'Mean R² = {mean_r2:.4f}\n{good_pct:.1f}% above threshold',
transform=ax.transAxes,
verticalalignment='bottom',
fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
if self.verbose:
print(f"Plot saved to: {save_path}")
else:
plt.show()
plt.close()
def get_results_dataframe(self):
"""
Get results as pandas DataFrame.
Returns:
--------
pd.DataFrame
DataFrame with q, slope, tau_q, r_squared
"""
df = pd.DataFrame({
'q': self.q_values,
'slope': self.slopes,
'tau_q': self.tau_q,
'r_squared': self.r_squared
})
return df
def save_results(self, filepath):
"""
Save results to CSV.
Parameters:
-----------
filepath : str
Path to save CSV file
"""
df = self.get_results_dataframe()
# Add H as metadata in first row
if self.H is not None:
# Create metadata row
metadata = pd.DataFrame({
'q': ['H_parameter'],
'slope': [self.H],
'tau_q': [f'H={self.H:.6f}'],
'r_squared': ['']
})
df = pd.concat([metadata, df], ignore_index=True)
df.to_csv(filepath, index=False)
if self.verbose:
print(f"Results saved to: {filepath}")
def run_scaling_extraction(checker, output_dir=None, save_plots=True):
"""
Complete workflow for extracting scaling function.
Parameters:
-----------
checker : FractalityChecker
Completed checker from Step 1
output_dir : str, optional
Directory to save results
save_plots : bool
Whether to save plots
Returns:
--------
ScalingExtractor
Extractor object with results
"""
# Create output directory
if output_dir is None:
output_dir = config.PLOT_DIR
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Initialize extractor
extractor = ScalingExtractor(checker)
# Run analysis
print("\n" + "="*60)
print("STEP 2: EXTRACTING SCALING FUNCTION τ(q)")
print("="*60)
# Estimate H
H = extractor.estimate_H()
# Validate
extractor.validate_scaling_function()
# CRITICAL: Check multifractality
mf_results = extractor.check_multifractality()
# Save results
if config.SAVE_INTERMEDIATE:
results_path = Path(output_dir) / "step2_scaling_results.csv"
extractor.save_results(results_path)
# Create plots
if save_plots:
scaling_plot_path = Path(output_dir) / "step2_scaling_function.png"
extractor.plot_scaling_function(save_path=scaling_plot_path)
quality_plot_path = Path(output_dir) / "step2_quality_metrics.png"
extractor.plot_quality_metrics(save_path=quality_plot_path)
# Print summary
print("\n" + "="*60)
print("STEP 2 COMPLETE")
print("="*60)
print(f"\nKey Results:")
print(f" H (self-affinity index) = {H:.4f}")
print(f" τ(q) range: [{extractor.tau_q.min():.4f}, {extractor.tau_q.max():.4f}]")
print(f" Mean R² from Step 1: {extractor.r_squared.mean():.4f}")
print(f" Multifractality Score: {mf_results['total_score']}/{mf_results['max_score']}")
# Decision based on multifractality check
print(f"\n" + "="*60)
if mf_results['is_multifractal']:
print("✓✓ DATA IS MULTIFRACTAL - PROCEED TO STEP 3")
print("="*60)
print(f"\n{mf_results['recommendation']}")
print(f"\n→ Next: Run python run_step3.py")
else:
print("✗✗ DATA IS NOT SUFFICIENTLY MULTIFRACTAL")
print("="*60)
print(f"\n{mf_results['recommendation']}")
print(f"\nConsider:")
print(f" - Using longer timeframe (15-min, hourly)")
print(f" - Different date range or asset")
print(f" - Comparing MMAR vs GARCH anyway (for research)")
print(f"\nYou can still proceed to Step 3, but MMAR may not outperform simpler models.")
print("="*60 + "\n")
return extractor
if __name__ == "__main__":
print("Step 2: Extract Scaling Function")
print("="*60)
print("\nThis script extracts τ(q) from Step 1 partition function results.")
print("You must run Step 1 first to get the FractalityChecker object.\n")
print("Usage:")
print(" from step1_check_fractality import run_fractality_check")
print(" from step2_extract_scaling import run_scaling_extraction")
print("")
print(" # Step 1")
print(" checker = run_fractality_check(returns)")
print("")
print(" # Step 2")
print(" extractor = run_scaling_extraction(checker)")
print("")
print(" # Access results")
print(" H = extractor.H")
print(" tau_q = extractor.tau_q")