""" Step 2: Extract Scaling Function τ(q) Extracts the scaling function from Step 1 partition function slopes From paper equation (2.1): E(|X(t)|^q) = c(q) × t^(τ(q)+1) Therefore: slope = τ(q) + 1 → τ(q) = slope - 1 Key parameter to estimate: H (self-affinity index) where τ(1/H) = 0 """ import numpy as np import pandas as pd import matplotlib.pyplot as plt from scipy.interpolate import interp1d from scipy.optimize import brentq from pathlib import Path import config # Try to import nolds for robust Hurst estimation try: import nolds NOLDS_AVAILABLE = True except ImportError: NOLDS_AVAILABLE = False print("⚠️ Warning: nolds not available. Install with: pip install nolds") class ScalingExtractor: """ Extracts scaling function τ(q) from partition function results. The scaling function τ(q) characterizes the multifractal properties of the data. It is extracted from the slopes of partition plots: τ(q) = slope - 1 Key parameter: H (Hurst exponent / self-affinity index) is found where τ(1/H) = 0 """ def __init__(self, checker, verbose=config.VERBOSE): """ Initialize scaling extractor. Parameters: ----------- checker : FractalityChecker Completed checker object from Step 1 with slopes computed verbose : bool Print detailed information """ self.checker = checker self.verbose = verbose # Extract from checker self.q_values = np.array(sorted(checker.slopes.keys())) self.slopes = np.array([checker.slopes[q] for q in self.q_values]) self.r_squared = np.array([checker.r_squared_values[q] for q in self.q_values]) # Calculate τ(q) = slope - 1 self.tau_q = self.slopes - 1 # Storage for results self.H = None # Self-affinity index self.tau_q_func = None # Interpolation function for τ(q) if self.verbose: print(f"\nScaling Extractor Initialized") print(f"Number of q values: {len(self.q_values)}") print(f"q range: {self.q_values.min():.2f} to {self.q_values.max():.2f}") print(f"τ(q) range: {self.tau_q.min():.4f} to {self.tau_q.max():.4f}") def estimate_H_nolds(self): """ Estimate H using nolds R/S (rescaled range) method. This is more robust than finding τ(1/H) = 0 numerically. Uses the original returns from the checker. 
    def estimate_H_nolds(self):
        """
        Estimate H using the nolds R/S (rescaled range) method.

        This is more robust than finding τ(1/H) = 0 numerically.
        Uses the original returns from the checker.

        Returns:
        --------
        float
            Estimated H value using the R/S method.
        """
        if not NOLDS_AVAILABLE:
            if self.verbose:
                print(" ⚠️ nolds not available, falling back to τ(q) method")
            return None

        if self.verbose:
            print("\nEstimating H using nolds R/S method...")
            print(" (More robust than the τ(1/H) = 0 approach)")

        try:
            # Get returns from checker
            returns = self.checker.returns

            # Estimate the Hurst exponent using nolds:
            # corrected=True applies the Anis-Lloyd-Peters correction;
            # fit='RANSAC' makes the fit robust to outliers.
            H_nolds = nolds.hurst_rs(returns, fit='RANSAC', corrected=True)

            if self.verbose:
                print(f" ✓ H estimated: {H_nolds:.4f}")

            return H_nolds

        except Exception as e:
            if self.verbose:
                print(f" ✗ nolds estimation failed: {e}")
            return None

    def _print_H_interpretation(self):
        """Print an interpretation of the estimated H value."""
        if not self.verbose or self.H is None:
            return

        print(f"\n Interpretation of H = {self.H:.4f}:")

        # Check if close to 0.5
        if abs(self.H - 0.5) < 0.05:
            print(f" → H ≈ 0.5: Close to a random walk")
            print(f" (No strong long-term memory)")
        elif self.H > 0.5:
            if self.H > 0.7:
                print(f" → H > 0.7: STRONGLY persistent")
                print(f" (Strong trends, long memory)")
            else:
                print(f" → H > 0.5: Persistent process")
                print(f" (Moderate trends, some long memory)")
        else:  # self.H < 0.5
            if self.H < 0.3:
                print(f" → H < 0.3: STRONGLY anti-persistent")
                print(f" (Sharp mean reversion)")
            else:
                print(f" → H < 0.5: Anti-persistent")
                print(f" (Mean-reverting behavior)")

        # Additional context
        print(f"\n Context:")
        print(f" - Random walk: H = 0.50")
        print(f" - FX markets typically: H = 0.48-0.55")
        print(f" - Stock markets typically: H = 0.52-0.58")

        # Warning if extreme
        if self.H > 0.8 or self.H < 0.2:
            print(f"\n ⚠️ EXTREME H value!")
            print(f" This is unusual for financial data")
            print(f" → Check data quality")
            print(f" → May indicate measurement error")
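    # Optional sanity check — a minimal sketch, assuming nolds is installed;
    # the method name, sample size, and seed below are illustrative choices,
    # not part of the original pipeline. For i.i.d. Gaussian noise the R/S
    # estimator should return a value near H = 0.5.
    @staticmethod
    def _demo_hurst_rs_white_noise(n=5000, seed=0):
        """Run nolds.hurst_rs on synthetic white noise; expect H ≈ 0.5."""
        if not NOLDS_AVAILABLE:
            return None
        rng = np.random.default_rng(seed)
        noise = rng.standard_normal(n)
        # Same settings as estimate_H_nolds: outlier-robust RANSAC fit plus
        # the Anis-Lloyd-Peters small-sample correction.
        return nolds.hurst_rs(noise, fit='RANSAC', corrected=True)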
    def estimate_H(self, use_nolds=True):
        """
        Estimate H (self-affinity index).

        If use_nolds=True, uses the robust R/S method from the nolds
        library. Otherwise, finds where τ(1/H) = 0 from the scaling
        function.

        Parameters:
        -----------
        use_nolds : bool
            If True, use the nolds.hurst_rs() method (recommended).

        Returns:
        --------
        float
            Estimated H value.
        """
        # Try nolds first (more robust)
        if use_nolds and NOLDS_AVAILABLE:
            H_nolds = self.estimate_H_nolds()
            if H_nolds is not None:
                self.H = H_nolds
                self._print_H_interpretation()
                return self.H

        # Fallback to the τ(1/H) = 0 method
        if self.verbose:
            print("\nEstimating H from τ(1/H) = 0...")
            print(" (Finding q where τ(q) = 0)")

        # Create an interpolation function for τ(q).
        # Use a cubic spline for smooth interpolation.
        self.tau_q_func = interp1d(self.q_values, self.tau_q,
                                   kind='cubic',
                                   fill_value='extrapolate',
                                   bounds_error=False)

        # Find where τ(q) = 0. τ(q) should cross zero somewhere between
        # q_min and q_max.
        try:
            # Brent's method needs a sign change over the bracket:
            # τ(q) should be negative for small q and positive for large q
            # (or vice versa, depending on the data).
            tau_min = self.tau_q_func(self.q_values.min())
            tau_max = self.tau_q_func(self.q_values.max())

            if tau_min * tau_max > 0:
                # No sign change — τ(q) does not cross zero in our q range.
                # This can happen with a limited q range.
                if self.verbose:
                    print(f" ⚠️ No zero crossing found in q range")
                    print(f" τ(q_min={self.q_values.min():.2f}) = {tau_min:.4f}")
                    print(f" τ(q_max={self.q_values.max():.2f}) = {tau_max:.4f}")

                # Fall back to the grid point where τ(q) is closest to 0
                idx_closest = np.argmin(np.abs(self.tau_q))
                q_zero = self.q_values[idx_closest]
                self.H = 1.0 / q_zero

                if self.verbose:
                    print(f" Using closest point: q = {q_zero:.4f}, "
                          f"τ(q) = {self.tau_q[idx_closest]:.4f}")
            else:
                # Find the zero crossing using Brent's method
                q_zero = brentq(self.tau_q_func,
                                self.q_values.min(),
                                self.q_values.max())
                self.H = 1.0 / q_zero

                if self.verbose:
                    print(f" ✓ Found: q = {q_zero:.4f} where τ(q) ≈ 0")
                    print(f" ✓ H = 1/q = {self.H:.4f}")

            # Print interpretation
            self._print_H_interpretation()

        except Exception as e:
            if self.verbose:
                print(f" ✗ Error estimating H: {e}")
                print(f" Using fallback estimation method")

            # Fallback: linear interpolation between the two points closest
            # to τ(q) = 0.
            idx = np.argsort(np.abs(self.tau_q))[:2]
            q1, q2 = self.q_values[idx]
            tau1, tau2 = self.tau_q[idx]

            if tau2 != tau1:
                q_zero = q1 + (0 - tau1) * (q2 - q1) / (tau2 - tau1)
            else:
                # Degenerate case: use the average
                q_zero = (q1 + q2) / 2
            self.H = 1.0 / q_zero

            if self.verbose:
                print(f" Estimated H = {self.H:.4f} (fallback method)")

        return self.H
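    # Worked example of the τ(1/H) = 0 logic (hypothetical numbers): if the
    # extracted scaling function were τ(q) = 0.53·q - 1, brentq would locate
    # the zero crossing at q* = 1/0.53 ≈ 1.887, giving H = 1/q* ≈ 0.53 —
    # consistent with τ(q) = q·H - 1 for a monofractal process.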
    def validate_scaling_function(self):
        """
        Validate that τ(q) has the expected properties.

        From the paper:
        1. τ(q) should be concave (d²τ/dq² < 0)
        2. τ(0) = -1 (at q = 0 the moment E(|X(t)|^0) = 1 is constant,
           so the exponent τ(0) + 1 must equal 0)
        3. τ(1) relates to the mean behavior
        4. τ(q) should be smooth
        """
        if self.verbose:
            print("\nValidating scaling function properties...")

        issues = []

        # Check concavity: second differences approximate the second
        # derivative, which should be negative for a concave function.
        d2_tau = np.diff(self.tau_q, 2)
        concave_pct = 100 * np.sum(d2_tau < 0) / len(d2_tau)

        if concave_pct < 70:
            issues.append(f"Low concavity: only {concave_pct:.1f}% of points are concave")

        # Check that τ(q≈0) is reasonable.
        # Theoretically τ(0) = -1; in practice τ(q≈0) should be close to
        # -1.0 (anywhere between -1.5 and -0.5 is acceptable).
        if self.q_values.min() <= 0.1:
            idx_near_zero = np.argmin(np.abs(self.q_values))
            q_near_zero = self.q_values[idx_near_zero]
            tau_at_zero = self.tau_q[idx_near_zero]

            # Flag values far from the expected -1.0
            if tau_at_zero < -1.5 or tau_at_zero > -0.5:
                issues.append(
                    f"τ(q≈{q_near_zero:.2f}) = {tau_at_zero:.4f} "
                    f"(expected ≈ -1.0, acceptable range: -1.5 to -0.5)")

        # Check smoothness (no large jumps between adjacent q values)
        d_tau = np.diff(self.tau_q)
        max_jump = np.max(np.abs(d_tau))

        if max_jump > 1.0:
            issues.append(f"Large jump in τ(q): {max_jump:.4f}")

        if issues:
            print(" ⚠️ Potential issues found:")
            for issue in issues:
                print(f" - {issue}")
        else:
            print(" ✓ Scaling function looks good")
            print(f" Concavity: {concave_pct:.1f}% of points")

        return len(issues) == 0
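    # Illustration of the concavity check (hypothetical τ(q)): for a concave
    # function such as τ(q) = 0.55·q - 0.02·q² - 1, every second difference
    # np.diff(tau, 2) is negative, so concave_pct = 100%. For a linear τ(q)
    # (random walk), the second differences hover around zero, which the
    # multifractality test below treats as evidence against MMAR.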
    def check_multifractality(self):
        """
        CRITICAL TEST: Determine whether the data is truly MULTIFRACTAL
        rather than monofractal.

        This is THE decisive test for MMAR viability!

        Key principle:
        - Linear τ(q)  → monofractal (e.g., random walk) → MMAR will fail
        - Concave τ(q) → multifractal → MMAR worth pursuing

        Returns:
        --------
        dict
            Dictionary with the multifractality assessment and scores.
        """
        print("\n" + "="*60)
        print("CRITICAL TEST: MULTIFRACTALITY CHECK")
        print("="*60)

        # Test 1: Concavity (second derivative)
        d2_tau = np.diff(self.tau_q, 2)
        concave_pct = 100 * np.sum(d2_tau < 0) / len(d2_tau)
        mean_d2 = np.mean(d2_tau)

        print(f"\n1. Concavity Test:")
        print(f" τ(q) concave at {concave_pct:.1f}% of points")
        print(f" Mean d²τ/dq² = {mean_d2:.6f}")

        if concave_pct >= 80:
            concavity_verdict = "STRONGLY CONCAVE ✓✓"
            concavity_score = 3
        elif concave_pct >= 60:
            concavity_verdict = "MODERATELY CONCAVE ✓"
            concavity_score = 2
        elif concave_pct >= 40:
            concavity_verdict = "WEAKLY CONCAVE ~"
            concavity_score = 1
        else:
            concavity_verdict = "NOT CONCAVE ✗"
            concavity_score = 0

        print(f" → {concavity_verdict}")

        # Test 2: Compare with a theoretical random walk.
        # For Brownian motion τ(q) = q*H - 1 with H = 0.5,
        # so τ(q) = 0.5*q - 1 (perfectly linear).
        q_range = self.q_values[self.q_values > 0]
        tau_range = self.tau_q[self.q_values > 0]

        # Fit a linear model
        slope_linear, intercept_linear, r_value_linear, _, _ = linregress(q_range, tau_range)
        r2_linear = r_value_linear ** 2

        # Calculate residuals from the linear fit
        tau_linear_fit = slope_linear * q_range + intercept_linear
        rmse_from_linear = np.sqrt(np.mean((tau_range - tau_linear_fit)**2))

        print(f"\n2. Linearity Test (random walk comparison):")
        print(f" R² of linear fit = {r2_linear:.4f}")
        print(f" RMSE from linear = {rmse_from_linear:.6f}")

        if r2_linear > 0.99 and rmse_from_linear < 0.05:
            linearity_verdict = "HIGHLY LINEAR (like random walk) ✗"
            linearity_score = 0
        elif r2_linear > 0.95:
            linearity_verdict = "MOSTLY LINEAR (weakly multifractal) ~"
            linearity_score = 1
        elif r2_linear > 0.90:
            linearity_verdict = "MODERATELY NONLINEAR ✓"
            linearity_score = 2
        else:
            linearity_verdict = "STRONGLY NONLINEAR ✓✓"
            linearity_score = 3

        print(f" → {linearity_verdict}")

        # Test 3: Range of τ(q) derivatives.
        # Multifractals have varying local slopes.
        d_tau = np.diff(self.tau_q)
        slope_range = np.ptp(d_tau)  # Peak-to-peak
        slope_std = np.std(d_tau)

        print(f"\n3. Slope Variation Test:")
        print(f" Range of dτ/dq = {slope_range:.4f}")
        print(f" Std dev of dτ/dq = {slope_std:.4f}")

        if slope_range > 0.5:
            variation_verdict = "HIGH VARIATION ✓✓"
            variation_score = 3
        elif slope_range > 0.3:
            variation_verdict = "MODERATE VARIATION ✓"
            variation_score = 2
        elif slope_range > 0.1:
            variation_verdict = "LOW VARIATION ~"
            variation_score = 1
        else:
            variation_verdict = "MINIMAL VARIATION (monofractal) ✗"
            variation_score = 0

        print(f" → {variation_verdict}")

        # Overall verdict
        total_score = concavity_score + linearity_score + variation_score
        max_score = 9

        print("\n" + "="*60)
        print(f"OVERALL MULTIFRACTALITY SCORE: {total_score}/{max_score}")
        print("="*60)

        if total_score >= 7:
            overall = "STRONGLY MULTIFRACTAL ✓✓"
            mmar_recommendation = "MMAR highly recommended - proceed with confidence"
        elif total_score >= 5:
            overall = "MODERATELY MULTIFRACTAL ✓"
            mmar_recommendation = "MMAR worth trying - may outperform GARCH"
        elif total_score >= 3:
            overall = "WEAKLY MULTIFRACTAL ~"
            mmar_recommendation = "MMAR uncertain - run the comparison, but GARCH may win"
        else:
            overall = "NOT MULTIFRACTAL (monofractal) ✗"
            mmar_recommendation = "MMAR NOT recommended - data is too close to a random walk"

        print(f"\n{overall}")
        print(f"→ {mmar_recommendation}")
        print("="*60 + "\n")

        return {
            'concavity_pct': concave_pct,
            'concavity_score': concavity_score,
            'linear_r2': r2_linear,
            'linearity_score': linearity_score,
            'slope_range': slope_range,
            'variation_score': variation_score,
            'total_score': total_score,
            'max_score': max_score,
            'is_multifractal': total_score >= 5,
            'recommendation': mmar_recommendation,
        }

    def get_tau_q_at_q(self, q):
        """
        Get the τ(q) value at a specific q (uses interpolation).

        Parameters:
        -----------
        q : float or array-like
            Moment order(s).

        Returns:
        --------
        float or np.ndarray
            τ(q) value(s).
        """
        if self.tau_q_func is None:
            self.tau_q_func = interp1d(self.q_values, self.tau_q,
                                       kind='cubic',
                                       fill_value='extrapolate',
                                       bounds_error=False)
        return self.tau_q_func(q)
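    # Usage sketch for the interpolator (values and grid are hypothetical):
    #
    #     extractor = ScalingExtractor(checker)
    #     extractor.get_tau_q_at_q(2.0)                       # single moment
    #     extractor.get_tau_q_at_q(np.linspace(0.5, 4, 50))   # dense grid
    #
    # Because fill_value='extrapolate' extends the cubic spline outside the
    # fitted q range, queries far beyond [q_min, q_max] should be treated
    # with caution.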
    def plot_scaling_function(self, save_path=None):
        """
        Plot τ(q) vs q.

        Shows the scaling function extracted from the partition plots.

        Key features:
        - Should be concave
        - Crosses zero at q = 1/H
        """
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

        # Left plot: τ(q) data points
        ax1.plot(self.q_values, self.tau_q, 'o-',
                 linewidth=2, markersize=6,
                 label='τ(q) = slope - 1', color='steelblue')

        # Add a horizontal line at τ = 0
        ax1.axhline(y=0, color='red', linestyle='--', linewidth=1,
                    alpha=0.5, label='τ(q) = 0')

        # Mark the H position if estimated
        if self.H is not None:
            q_H = 1.0 / self.H
            ax1.axvline(x=q_H, color='green', linestyle='--', linewidth=1,
                        alpha=0.5, label=f'q = 1/H = {q_H:.3f}')
            ax1.plot([q_H], [0], 'go', markersize=10,
                     label=f'H = {self.H:.4f}')

        ax1.set_xlabel('q (moment order)', fontsize=12)
        ax1.set_ylabel('τ(q)', fontsize=12)
        ax1.set_title('Scaling Function τ(q)\n(Should be concave)',
                      fontsize=13, fontweight='bold')
        ax1.legend(fontsize=9)
        ax1.grid(True, alpha=0.3)

        # Right plot: raw slopes (before subtracting 1)
        ax2.plot(self.q_values, self.slopes, 'o-',
                 linewidth=2, markersize=6, color='darkorange',
                 label='Slope = τ(q) + 1')

        # Add a horizontal line at slope = 1 (where τ = 0)
        ax2.axhline(y=1, color='red', linestyle='--', linewidth=1,
                    alpha=0.5, label='Slope = 1 (τ = 0)')

        if self.H is not None:
            q_H = 1.0 / self.H
            ax2.axvline(x=q_H, color='green', linestyle='--', linewidth=1,
                        alpha=0.5)

        ax2.set_xlabel('q (moment order)', fontsize=12)
        ax2.set_ylabel('Slope', fontsize=12)
        ax2.set_title('Partition Plot Slopes\n(From Step 1)',
                      fontsize=13, fontweight='bold')
        ax2.legend(fontsize=9)
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            if self.verbose:
                print(f"Plot saved to: {save_path}")
        else:
            plt.show()

        plt.close()

    def plot_quality_metrics(self, save_path=None):
        """
        Plot R² values and other quality metrics from Step 1.

        This shows which q values have reliable τ(q) estimates.
        """
        fig, ax = plt.subplots(figsize=(10, 6))

        # Plot R² vs q
        ax.plot(self.q_values, self.r_squared, 'o-',
                linewidth=2, markersize=6, color='purple')

        # Add the threshold line
        ax.axhline(y=config.MIN_R_SQUARED, color='red', linestyle='--',
                   linewidth=2, label=f'Threshold = {config.MIN_R_SQUARED}')

        # Color-code points by fit quality
        good_mask = self.r_squared >= config.MIN_R_SQUARED
        ax.scatter(self.q_values[good_mask], self.r_squared[good_mask],
                   s=100, c='green', alpha=0.5, zorder=5, label='Good fit')
        ax.scatter(self.q_values[~good_mask], self.r_squared[~good_mask],
                   s=100, c='red', alpha=0.5, zorder=5, label='Poor fit')

        ax.set_xlabel('q (moment order)', fontsize=12)
        ax.set_ylabel('R² (goodness of fit)', fontsize=12)
        ax.set_title('Quality of τ(q) Estimates\n(Higher R² = more reliable)',
                     fontsize=13, fontweight='bold')
        ax.legend(fontsize=10)
        ax.grid(True, alpha=0.3)
        ax.set_ylim([0, 1.05])

        # Add summary text
        mean_r2 = self.r_squared.mean()
        good_pct = 100 * np.sum(good_mask) / len(good_mask)
        ax.text(0.02, 0.02,
                f'Mean R² = {mean_r2:.4f}\n{good_pct:.1f}% above threshold',
                transform=ax.transAxes, verticalalignment='bottom',
                fontsize=10,
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            if self.verbose:
                print(f"Plot saved to: {save_path}")
        else:
            plt.show()

        plt.close()

    def get_results_dataframe(self):
        """
        Get the results as a pandas DataFrame.

        Returns:
        --------
        pd.DataFrame
            DataFrame with columns q, slope, tau_q, r_squared.
        """
        return pd.DataFrame({
            'q': self.q_values,
            'slope': self.slopes,
            'tau_q': self.tau_q,
            'r_squared': self.r_squared,
        })
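    # Quick inspection sketch (column values depend on your data):
    #
    #     df = extractor.get_results_dataframe()
    #     print(df.head())   # columns: q, slope, tau_q, r_squared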
    def save_results(self, filepath):
        """
        Save the results to CSV.

        Parameters:
        -----------
        filepath : str
            Path to the CSV file to save.
        """
        df = self.get_results_dataframe()

        # Prepend H as a metadata row if it has been estimated
        if self.H is not None:
            metadata = pd.DataFrame({
                'q': ['H_parameter'],
                'slope': [self.H],
                'tau_q': [f'H={self.H:.6f}'],
                'r_squared': [''],
            })
            df = pd.concat([metadata, df], ignore_index=True)

        df.to_csv(filepath, index=False)

        if self.verbose:
            print(f"Results saved to: {filepath}")


def run_scaling_extraction(checker, output_dir=None, save_plots=True):
    """
    Complete workflow for extracting the scaling function.

    Parameters:
    -----------
    checker : FractalityChecker
        Completed checker from Step 1.
    output_dir : str, optional
        Directory to save results.
    save_plots : bool
        Whether to save plots.

    Returns:
    --------
    ScalingExtractor
        Extractor object with results.
    """
    # Create the output directory
    if output_dir is None:
        output_dir = config.PLOT_DIR
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Initialize the extractor
    extractor = ScalingExtractor(checker)

    # Run the analysis
    print("\n" + "="*60)
    print("STEP 2: EXTRACTING SCALING FUNCTION τ(q)")
    print("="*60)

    # Estimate H
    H = extractor.estimate_H()

    # Validate
    extractor.validate_scaling_function()

    # CRITICAL: check multifractality
    mf_results = extractor.check_multifractality()

    # Save results
    if config.SAVE_INTERMEDIATE:
        results_path = Path(output_dir) / "step2_scaling_results.csv"
        extractor.save_results(results_path)

    # Create plots
    if save_plots:
        scaling_plot_path = Path(output_dir) / "step2_scaling_function.png"
        extractor.plot_scaling_function(save_path=scaling_plot_path)

        quality_plot_path = Path(output_dir) / "step2_quality_metrics.png"
        extractor.plot_quality_metrics(save_path=quality_plot_path)

    # Print summary
    print("\n" + "="*60)
    print("STEP 2 COMPLETE")
    print("="*60)
    print(f"\nKey Results:")
    print(f" H (self-affinity index) = {H:.4f}")
    print(f" τ(q) range: [{extractor.tau_q.min():.4f}, {extractor.tau_q.max():.4f}]")
    print(f" Mean R² from Step 1: {extractor.r_squared.mean():.4f}")
    print(f" Multifractality Score: {mf_results['total_score']}/{mf_results['max_score']}")

    # Decision based on the multifractality check
    print("\n" + "="*60)
    if mf_results['is_multifractal']:
        print("✓✓ DATA IS MULTIFRACTAL - PROCEED TO STEP 3")
        print("="*60)
        print(f"\n{mf_results['recommendation']}")
        print(f"\n→ Next: run python run_step3.py")
    else:
        print("✗✗ DATA IS NOT SUFFICIENTLY MULTIFRACTAL")
        print("="*60)
        print(f"\n{mf_results['recommendation']}")
        print(f"\nConsider:")
        print(f" - Using a longer timeframe (15-min, hourly)")
        print(f" - A different date range or asset")
        print(f" - Comparing MMAR vs GARCH anyway (for research)")
        print(f"\nYou can still proceed to Step 3, but MMAR may not outperform simpler models.")
    print("="*60 + "\n")

    return extractor


if __name__ == "__main__":
    print("Step 2: Extract Scaling Function")
    print("="*60)
    print("\nThis script extracts τ(q) from the Step 1 partition function results.")
    print("You must run Step 1 first to get the FractalityChecker object.\n")
    print("Usage:")
    print(" from step1_check_fractality import run_fractality_check")
    print(" from step2_extract_scaling import run_scaling_extraction")
    print("")
    print(" # Step 1")
    print(" checker = run_fractality_check(returns)")
    print("")
    print(" # Step 2")
    print(" extractor = run_scaling_extraction(checker)")
    print("")
    print(" # Access results")
    print(" H = extractor.H")
    print(" tau_q = extractor.tau_q")
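    # A short extra hint on the verdict dict returned by
    # check_multifractality (keys as defined in that method):
    print("")
    print(" # Multifractality verdict (also run inside run_scaling_extraction)")
    print(" mf = extractor.check_multifractality()")
    print(" print(mf['is_multifractal'], mf['total_score'])")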