585 lines
20 KiB
Python
585 lines
20 KiB
Python
"""
|
|
Step 3: Fit Multifractal Spectrum f(α)
|
|
Transforms τ(q) to f(α) and fits to 4 distribution types
|
|
|
|
From paper Section 4.c:
|
|
- Transform τ(q) → f(α) using Legendre transform
|
|
- Fit to 4 distributions: Normal, Binomial, Poisson, Gamma
|
|
- Choose best fit (lowest squared error)
|
|
|
|
Legendre transform:
|
|
f_P(α) = αq - τ_P(q)
|
|
where α = dτ/dq
|
|
"""
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
from scipy.optimize import minimize
|
|
from scipy.optimize import approx_fprime
|
|
from pathlib import Path
|
|
import config
|
|
|
|
|
|
class SpectrumFitter:
|
|
"""
|
|
Fits multifractal spectrum f(α) from scaling function τ(q).
|
|
|
|
The multifractal spectrum characterizes the distribution of
|
|
local singularities (Hölder exponents) in the data.
|
|
|
|
Process:
|
|
1. Transform τ(q) → f(α) via Legendre transform
|
|
2. Fit to 4 theoretical distributions
|
|
3. Select best fit (minimum squared error)
|
|
"""
|
|
|
|
def __init__(self, extractor, verbose=config.VERBOSE):
|
|
"""
|
|
Initialize spectrum fitter.
|
|
|
|
Parameters:
|
|
-----------
|
|
extractor : ScalingExtractor
|
|
Completed extractor from Step 2 with τ(q) and H
|
|
verbose : bool
|
|
Print detailed information
|
|
"""
|
|
self.extractor = extractor
|
|
self.verbose = verbose
|
|
|
|
# Extract from previous step
|
|
self.q_values = extractor.q_values
|
|
self.tau_q = extractor.tau_q
|
|
self.H = extractor.H
|
|
self.tau_q_func = extractor.tau_q_func
|
|
|
|
# Storage for spectrum
|
|
self.alpha_values = None
|
|
self.f_alpha_values = None
|
|
|
|
# Storage for fitted distributions
|
|
self.fitted_distributions = {}
|
|
self.best_distribution = None
|
|
self.best_params = None
|
|
|
|
if self.verbose:
|
|
print(f"\nSpectrum Fitter Initialized")
|
|
print(f"H = {self.H:.4f}")
|
|
print(f"τ(q) range: [{self.tau_q.min():.4f}, {self.tau_q.max():.4f}]")
|
|
|
|
def compute_multifractal_spectrum(self):
|
|
"""
|
|
Compute f(α) from τ(q) using Legendre transform.
|
|
|
|
Legendre transform:
|
|
α(q) = dτ/dq
|
|
f(α) = αq - τ(q)
|
|
|
|
This transforms from q-space to α-space (singularity spectrum).
|
|
"""
|
|
if self.verbose:
|
|
print("\nComputing multifractal spectrum f(α)...")
|
|
|
|
# Ensure interpolation function exists
|
|
if self.tau_q_func is None:
|
|
from scipy.interpolate import interp1d
|
|
self.tau_q_func = interp1d(self.extractor.q_values,
|
|
self.extractor.tau_q,
|
|
kind='cubic',
|
|
fill_value='extrapolate',
|
|
bounds_error=False)
|
|
|
|
# Compute α = dτ/dq numerically
|
|
# Use central differences for interior points
|
|
alpha_values = []
|
|
f_alpha_values = []
|
|
q_for_spectrum = []
|
|
|
|
# Use the q_values from the extractor
|
|
q_values = self.extractor.q_values
|
|
tau_q = self.extractor.tau_q
|
|
|
|
for i, q in enumerate(q_values):
|
|
# Skip very small q values (numerical issues with derivative)
|
|
if q < 0.5: # Increased from 0.1 to avoid edge effects
|
|
continue
|
|
|
|
# Skip values too close to the edge
|
|
if i == 0 or i == len(q_values) - 1:
|
|
continue
|
|
|
|
# Numerical derivative using finite difference
|
|
try:
|
|
# Calculate derivative: dτ/dq at point q
|
|
# Use actual neighboring points instead of fixed h
|
|
q_prev = q_values[i-1]
|
|
q_next = q_values[i+1]
|
|
tau_prev = tau_q[i-1]
|
|
tau_next = tau_q[i+1]
|
|
|
|
# Central difference
|
|
alpha = (tau_next - tau_prev) / (q_next - q_prev)
|
|
f_alpha = alpha * q - tau_q[i]
|
|
|
|
# Only keep valid values
|
|
if np.isfinite(alpha) and np.isfinite(f_alpha):
|
|
alpha_values.append(alpha)
|
|
f_alpha_values.append(f_alpha)
|
|
q_for_spectrum.append(q)
|
|
except Exception as e:
|
|
if self.verbose:
|
|
print(f" Warning: Failed to compute spectrum at q={q:.2f}: {e}")
|
|
continue
|
|
|
|
self.alpha_values = np.array(alpha_values)
|
|
self.f_alpha_values = np.array(f_alpha_values)
|
|
self.q_for_spectrum = np.array(q_for_spectrum)
|
|
|
|
if len(self.alpha_values) == 0:
|
|
raise ValueError("Failed to compute any spectrum points! Check τ(q) data quality.")
|
|
|
|
if self.verbose:
|
|
print(f" ✓ Computed {len(self.alpha_values)} spectrum points")
|
|
print(f" α range: [{self.alpha_values.min():.4f}, {self.alpha_values.max():.4f}]")
|
|
print(f" f(α) range: [{self.f_alpha_values.min():.4f}, {self.f_alpha_values.max():.4f}]")
|
|
|
|
# Find peak (should be at f=1 theoretically)
|
|
idx_max = np.argmax(self.f_alpha_values)
|
|
alpha_0 = self.alpha_values[idx_max]
|
|
f_max = self.f_alpha_values[idx_max]
|
|
print(f" Peak: α₀ = {alpha_0:.4f}, f(α₀) = {f_max:.4f}")
|
|
|
|
def normal_spectrum(self, alpha, alpha_0, H):
|
|
"""
|
|
Lognormal distribution spectrum (Appendix b.i).
|
|
|
|
f_P(α) = 1 - (α - α₀)² / [4H(α₀ - H)]
|
|
|
|
Parameters to fit: α₀ (peak location)
|
|
"""
|
|
denominator = 4 * H * (alpha_0 - H)
|
|
if denominator <= 0:
|
|
return np.full_like(alpha, -np.inf)
|
|
return 1 - (alpha - alpha_0)**2 / denominator
|
|
|
|
def binomial_spectrum(self, alpha, alpha_min, alpha_max):
|
|
"""
|
|
Binomial distribution spectrum (Appendix b.ii).
|
|
|
|
f_P(α) = -(α_max - α)/(α_max - α_min) * log₂((α_max - α)/(α_max - α_min))
|
|
-(α - α_min)/(α_max - α_min) * log₂((α - α_min)/(α_max - α_min))
|
|
|
|
Parameters to fit: α_min, α_max
|
|
"""
|
|
if alpha_max <= alpha_min:
|
|
return np.full_like(alpha, -np.inf)
|
|
|
|
# Clip alpha to valid range
|
|
alpha = np.clip(alpha, alpha_min + 1e-10, alpha_max - 1e-10)
|
|
|
|
term1_ratio = (alpha_max - alpha) / (alpha_max - alpha_min)
|
|
term2_ratio = (alpha - alpha_min) / (alpha_max - alpha_min)
|
|
|
|
# Avoid log(0)
|
|
term1_ratio = np.maximum(term1_ratio, 1e-10)
|
|
term2_ratio = np.maximum(term2_ratio, 1e-10)
|
|
|
|
f = -(term1_ratio * np.log2(term1_ratio) + term2_ratio * np.log2(term2_ratio))
|
|
return f
|
|
|
|
def poisson_spectrum(self, alpha, alpha_0, H, b=2):
|
|
"""
|
|
Poisson distribution spectrum (Appendix b.iii).
|
|
|
|
f_P(α) = 1 - α₀/(H·ln(b)) + (α/H)·log_b(α₀·e/α)
|
|
|
|
Parameters to fit: α₀
|
|
"""
|
|
if alpha_0 <= 0:
|
|
return np.full_like(alpha, -np.inf)
|
|
|
|
# Avoid division by zero
|
|
alpha = np.maximum(alpha, 1e-10)
|
|
|
|
term1 = 1 - alpha_0 / (H * np.log(b))
|
|
term2 = (alpha / H) * (np.log(alpha_0 * np.e / alpha) / np.log(b))
|
|
return term1 + term2
|
|
|
|
def gamma_spectrum(self, alpha, alpha_0, gamma, b=2):
|
|
"""
|
|
Gamma distribution spectrum (Appendix b.iv).
|
|
|
|
f_P(α) = 1 + γ·log_b(α/α₀) + γ(α₀ - α)/(α₀·ln(b))
|
|
|
|
Parameters to fit: α₀, γ
|
|
"""
|
|
if alpha_0 <= 0 or gamma <= 0:
|
|
return np.full_like(alpha, -np.inf)
|
|
|
|
# Avoid division by zero
|
|
alpha = np.maximum(alpha, 1e-10)
|
|
|
|
term1 = gamma * np.log(alpha / alpha_0) / np.log(b)
|
|
term2 = gamma * (alpha_0 - alpha) / (alpha_0 * np.log(b))
|
|
return 1 + term1 + term2
|
|
|
|
def fit_normal_distribution(self):
|
|
"""Fit lognormal spectrum."""
|
|
# CRITICAL FIX: For lognormal spectrum, we MUST have alpha_0 > H
|
|
# Otherwise denominator 4*H*(alpha_0 - H) becomes negative → NaN
|
|
|
|
# Bounds: α₀ must be greater than H and within alpha range
|
|
alpha_min_bound = max(self.H + 1e-6, self.alpha_values.min())
|
|
alpha_max_bound = self.alpha_values.max()
|
|
|
|
# Check if we have valid range
|
|
if alpha_min_bound >= alpha_max_bound:
|
|
# No valid range for lognormal fit
|
|
if self.verbose:
|
|
print(f" Warning: Cannot fit Normal spectrum (need α₀ > H={self.H:.4f}, but alpha_max={alpha_max_bound:.4f})")
|
|
return {'alpha_0': np.nan}, np.inf
|
|
|
|
# Initial guess: middle of valid range
|
|
alpha_0_init = (alpha_min_bound + alpha_max_bound) / 2
|
|
|
|
def objective(params):
|
|
alpha_0 = params[0]
|
|
# Double-check constraint within objective
|
|
if alpha_0 <= self.H:
|
|
return 1e10
|
|
f_fit = self.normal_spectrum(self.alpha_values, alpha_0, self.H)
|
|
# Check for NaN/inf
|
|
if not np.all(np.isfinite(f_fit)):
|
|
return 1e10
|
|
return np.sum((self.f_alpha_values - f_fit)**2)
|
|
|
|
bounds = [(alpha_min_bound, alpha_max_bound)]
|
|
|
|
result = minimize(objective, [alpha_0_init], bounds=bounds, method='L-BFGS-B')
|
|
|
|
alpha_0_opt = result.x[0]
|
|
sse = result.fun
|
|
|
|
return {'alpha_0': alpha_0_opt}, sse
|
|
|
|
def fit_binomial_distribution(self):
|
|
"""Fit binomial spectrum."""
|
|
# Initial guess: use min and max of alpha
|
|
alpha_min_init = self.alpha_values.min()
|
|
alpha_max_init = self.alpha_values.max()
|
|
|
|
def objective(params):
|
|
alpha_min, alpha_max = params
|
|
if alpha_max <= alpha_min:
|
|
return 1e10
|
|
f_fit = self.binomial_spectrum(self.alpha_values, alpha_min, alpha_max)
|
|
return np.sum((self.f_alpha_values - f_fit)**2)
|
|
|
|
# Bounds
|
|
bounds = [(self.alpha_values.min() * 0.5, self.alpha_values.max() * 0.5),
|
|
(self.alpha_values.min() * 1.5, self.alpha_values.max() * 1.5)]
|
|
|
|
result = minimize(objective, [alpha_min_init, alpha_max_init], bounds=bounds, method='L-BFGS-B')
|
|
|
|
alpha_min_opt, alpha_max_opt = result.x
|
|
sse = result.fun
|
|
|
|
return {'alpha_min': alpha_min_opt, 'alpha_max': alpha_max_opt}, sse
|
|
|
|
def fit_poisson_distribution(self):
|
|
"""Fit Poisson spectrum."""
|
|
alpha_0_init = np.median(self.alpha_values)
|
|
|
|
def objective(params):
|
|
alpha_0 = params[0]
|
|
f_fit = self.poisson_spectrum(self.alpha_values, alpha_0, self.H)
|
|
return np.sum((self.f_alpha_values - f_fit)**2)
|
|
|
|
bounds = [(self.alpha_values.min(), self.alpha_values.max())]
|
|
|
|
result = minimize(objective, [alpha_0_init], bounds=bounds, method='L-BFGS-B')
|
|
|
|
alpha_0_opt = result.x[0]
|
|
sse = result.fun
|
|
|
|
return {'alpha_0': alpha_0_opt}, sse
|
|
|
|
def fit_gamma_distribution(self):
|
|
"""Fit Gamma spectrum."""
|
|
alpha_0_init = np.median(self.alpha_values)
|
|
gamma_init = 1.0
|
|
|
|
def objective(params):
|
|
alpha_0, gamma = params
|
|
if gamma <= 0:
|
|
return 1e10
|
|
f_fit = self.gamma_spectrum(self.alpha_values, alpha_0, gamma)
|
|
return np.sum((self.f_alpha_values - f_fit)**2)
|
|
|
|
bounds = [(self.alpha_values.min(), self.alpha_values.max()),
|
|
(0.1, 10.0)]
|
|
|
|
result = minimize(objective, [alpha_0_init, gamma_init], bounds=bounds, method='L-BFGS-B')
|
|
|
|
alpha_0_opt, gamma_opt = result.x
|
|
sse = result.fun
|
|
|
|
return {'alpha_0': alpha_0_opt, 'gamma': gamma_opt}, sse
|
|
|
|
def fit_all_distributions(self):
|
|
"""
|
|
Fit all 4 distribution types and select best fit.
|
|
|
|
Returns:
|
|
--------
|
|
str
|
|
Name of best-fitting distribution
|
|
"""
|
|
if self.verbose:
|
|
print("\nFitting multifractal spectrum to 4 distributions...")
|
|
|
|
# Fit each distribution
|
|
distributions = {
|
|
'Normal': self.fit_normal_distribution,
|
|
'Binomial': self.fit_binomial_distribution,
|
|
'Poisson': self.fit_poisson_distribution,
|
|
'Gamma': self.fit_gamma_distribution
|
|
}
|
|
|
|
for name, fit_func in distributions.items():
|
|
try:
|
|
params, sse = fit_func()
|
|
self.fitted_distributions[name] = {
|
|
'params': params,
|
|
'sse': sse,
|
|
'rmse': np.sqrt(sse / len(self.alpha_values))
|
|
}
|
|
if self.verbose:
|
|
print(f" {name:10s}: SSE = {sse:.6f}, RMSE = {self.fitted_distributions[name]['rmse']:.6f}")
|
|
for key, val in params.items():
|
|
print(f" {key} = {val:.6f}")
|
|
except Exception as e:
|
|
if self.verbose:
|
|
print(f" {name:10s}: FAILED ({str(e)})")
|
|
self.fitted_distributions[name] = None
|
|
|
|
# Select best fit (lowest SSE)
|
|
valid_dists = {k: v for k, v in self.fitted_distributions.items() if v is not None}
|
|
if not valid_dists:
|
|
raise ValueError("All distribution fits failed!")
|
|
|
|
self.best_distribution = min(valid_dists.keys(), key=lambda k: valid_dists[k]['sse'])
|
|
self.best_params = valid_dists[self.best_distribution]['params']
|
|
|
|
if self.verbose:
|
|
print(f"\n ✓ BEST FIT: {self.best_distribution}")
|
|
print(f" RMSE = {valid_dists[self.best_distribution]['rmse']:.6f}")
|
|
|
|
return self.best_distribution
|
|
|
|
def generate_fitted_spectrum(self, alpha_grid):
|
|
"""
|
|
Generate fitted spectrum values for plotting.
|
|
|
|
Parameters:
|
|
-----------
|
|
alpha_grid : np.ndarray
|
|
Grid of alpha values
|
|
|
|
Returns:
|
|
--------
|
|
np.ndarray
|
|
Fitted f(α) values
|
|
"""
|
|
if self.best_distribution is None:
|
|
raise ValueError("Must run fit_all_distributions() first")
|
|
|
|
params = self.best_params
|
|
|
|
if self.best_distribution == 'Normal':
|
|
return self.normal_spectrum(alpha_grid, params['alpha_0'], self.H)
|
|
elif self.best_distribution == 'Binomial':
|
|
return self.binomial_spectrum(alpha_grid, params['alpha_min'], params['alpha_max'])
|
|
elif self.best_distribution == 'Poisson':
|
|
return self.poisson_spectrum(alpha_grid, params['alpha_0'], self.H)
|
|
elif self.best_distribution == 'Gamma':
|
|
return self.gamma_spectrum(alpha_grid, params['alpha_0'], params['gamma'])
|
|
|
|
def plot_spectrum_fit(self, save_path=None):
|
|
"""Plot empirical spectrum and fitted distributions."""
|
|
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
|
|
|
|
# Left: Sample spectrum with all fits
|
|
ax1.plot(self.alpha_values, self.f_alpha_values, 'ko',
|
|
markersize=6, label='Empirical f(α)', alpha=0.6)
|
|
|
|
# Generate smooth alpha grid for plotting
|
|
alpha_grid = np.linspace(self.alpha_values.min() * 0.9,
|
|
self.alpha_values.max() * 1.1, 200)
|
|
|
|
colors = {'Normal': 'blue', 'Binomial': 'red', 'Poisson': 'green', 'Gamma': 'orange'}
|
|
|
|
for name, data in self.fitted_distributions.items():
|
|
if data is None:
|
|
continue
|
|
|
|
params = data['params']
|
|
|
|
if name == 'Normal':
|
|
f_fit = self.normal_spectrum(alpha_grid, params['alpha_0'], self.H)
|
|
elif name == 'Binomial':
|
|
f_fit = self.binomial_spectrum(alpha_grid, params['alpha_min'], params['alpha_max'])
|
|
elif name == 'Poisson':
|
|
f_fit = self.poisson_spectrum(alpha_grid, params['alpha_0'], self.H)
|
|
elif name == 'Gamma':
|
|
f_fit = self.gamma_spectrum(alpha_grid, params['alpha_0'], params['gamma'])
|
|
|
|
linewidth = 3 if name == self.best_distribution else 1
|
|
linestyle = '-' if name == self.best_distribution else '--'
|
|
label = f"{name} (RMSE={data['rmse']:.4f})"
|
|
if name == self.best_distribution:
|
|
label += " ★"
|
|
|
|
ax1.plot(alpha_grid, f_fit, color=colors[name],
|
|
linewidth=linewidth, linestyle=linestyle, label=label, alpha=0.7)
|
|
|
|
ax1.set_xlabel('α (Hölder exponent)', fontsize=12)
|
|
ax1.set_ylabel('f(α)', fontsize=12)
|
|
ax1.set_title('Multifractal Spectrum f(α)\nAll Distribution Fits',
|
|
fontsize=13, fontweight='bold')
|
|
ax1.legend(fontsize=9)
|
|
ax1.grid(True, alpha=0.3)
|
|
ax1.set_ylim([0, 1.1])
|
|
|
|
# Right: Best fit only
|
|
ax2.plot(self.alpha_values, self.f_alpha_values, 'ko',
|
|
markersize=8, label='Empirical f(α)', alpha=0.6)
|
|
|
|
f_best = self.generate_fitted_spectrum(alpha_grid)
|
|
ax2.plot(alpha_grid, f_best, 'r-', linewidth=2.5,
|
|
label=f'{self.best_distribution} fit ★', alpha=0.8)
|
|
|
|
ax2.set_xlabel('α (Hölder exponent)', fontsize=12)
|
|
ax2.set_ylabel('f(α)', fontsize=12)
|
|
ax2.set_title(f'Best Fit: {self.best_distribution} Distribution',
|
|
fontsize=13, fontweight='bold')
|
|
ax2.legend(fontsize=10)
|
|
ax2.grid(True, alpha=0.3)
|
|
ax2.set_ylim([0, 1.1])
|
|
|
|
plt.tight_layout()
|
|
|
|
if save_path:
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
|
if self.verbose:
|
|
print(f"Plot saved to: {save_path}")
|
|
else:
|
|
plt.show()
|
|
|
|
plt.close()
|
|
|
|
def get_results_dataframe(self):
|
|
"""Get results as DataFrame."""
|
|
spectrum_df = pd.DataFrame({
|
|
'alpha': self.alpha_values,
|
|
'f_alpha': self.f_alpha_values,
|
|
'q': self.q_for_spectrum
|
|
})
|
|
|
|
return spectrum_df
|
|
|
|
def save_results(self, filepath):
|
|
"""Save results to CSV."""
|
|
df = self.get_results_dataframe()
|
|
|
|
# Add metadata
|
|
metadata = pd.DataFrame({
|
|
'alpha': ['BEST_FIT', 'H_PARAMETER'],
|
|
'f_alpha': [self.best_distribution, self.H],
|
|
'q': ['', '']
|
|
})
|
|
|
|
# Add best fit parameters
|
|
for key, val in self.best_params.items():
|
|
metadata = pd.concat([metadata, pd.DataFrame({
|
|
'alpha': [f'PARAM_{key}'],
|
|
'f_alpha': [val],
|
|
'q': ['']
|
|
})], ignore_index=True)
|
|
|
|
df = pd.concat([metadata, df], ignore_index=True)
|
|
df.to_csv(filepath, index=False)
|
|
|
|
if self.verbose:
|
|
print(f"Results saved to: {filepath}")
|
|
|
|
|
|
def run_spectrum_fitting(extractor, output_dir=None, save_plots=True):
|
|
"""
|
|
Complete workflow for fitting multifractal spectrum.
|
|
|
|
Parameters:
|
|
-----------
|
|
extractor : ScalingExtractor
|
|
Completed extractor from Step 2
|
|
output_dir : str, optional
|
|
Directory to save results
|
|
save_plots : bool
|
|
Whether to save plots
|
|
|
|
Returns:
|
|
--------
|
|
SpectrumFitter
|
|
Fitter object with results
|
|
"""
|
|
# Create output directory
|
|
if output_dir is None:
|
|
output_dir = config.PLOT_DIR
|
|
|
|
Path(output_dir).mkdir(parents=True, exist_ok=True)
|
|
|
|
# Initialize fitter
|
|
fitter = SpectrumFitter(extractor)
|
|
|
|
# Run analysis
|
|
print("\n" + "="*60)
|
|
print("STEP 3: FITTING MULTIFRACTAL SPECTRUM")
|
|
print("="*60)
|
|
|
|
# Compute spectrum
|
|
fitter.compute_multifractal_spectrum()
|
|
|
|
# Fit to distributions
|
|
best_dist = fitter.fit_all_distributions()
|
|
|
|
# Save results
|
|
if config.SAVE_INTERMEDIATE:
|
|
results_path = Path(output_dir) / "step3_spectrum_results.csv"
|
|
fitter.save_results(results_path)
|
|
|
|
# Create plots
|
|
if save_plots:
|
|
spectrum_plot_path = Path(output_dir) / "step3_spectrum_fit.png"
|
|
fitter.plot_spectrum_fit(save_path=spectrum_plot_path)
|
|
|
|
# Print summary
|
|
print("\n" + "="*60)
|
|
print("STEP 3 COMPLETE")
|
|
print("="*60)
|
|
print(f"\nBest-fitting distribution: {best_dist}")
|
|
print(f"Parameters:")
|
|
for key, val in fitter.best_params.items():
|
|
print(f" {key} = {val:.6f}")
|
|
|
|
print(f"\n→ Next: Run python run_step4.py")
|
|
print("="*60 + "\n")
|
|
|
|
return fitter
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("Step 3: Fit Multifractal Spectrum")
|
|
print("="*60)
|
|
print("\nThis script fits the multifractal spectrum f(α) to theoretical distributions.")
|
|
print("You must run Steps 1 & 2 first.\n")
|