# MMAR/step1_check_fractality.py
"""
Step 1: Check if Fractality Exists
Implements partition function analysis to detect moment scaling behavior
Based on paper equation (4.1):
S_q(T, Δt) = Σ |ln(P(iΔt + Δt) / P(iΔt))|^q
CRITICAL: Uses NON-OVERLAPPING intervals as per Zhang's methodology.
If log₁₀(S_q) vs log₁₀(Δt) is linear, then moment scaling exists → fractality confirmed
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from pathlib import Path
import config
from data_loader import DataLoader
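
# Minimal usage sketch (a hedged example, not executed here; the price file
# and load call are placeholders for your own evenly spaced price series):
#
#     prices = np.loadtxt("prices.csv")        # hypothetical input file
#     returns = np.diff(np.log(prices))        # log returns
#     checker = run_fractality_check(returns)  # full Step-1 workflow
#     print("fractal:", checker.is_fractal())
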
class FractalityChecker:
"""
Checks for multifractal behavior using partition function analysis.
The partition function S_q(T, Δt) measures how returns scale across different
time intervals (Δt) and moment orders (q).
IMPORTANT: Uses NON-OVERLAPPING intervals to avoid artificial correlation.
If the log-log plot of S_q vs Δt is linear, this indicates moment scaling,
which is a hallmark of multifractal processes.
"""
def __init__(self, returns, delta_t_values=None, q_values=None, verbose=config.VERBOSE):
"""
Initialize fractality checker.
Parameters:
-----------
returns : np.ndarray
Array of log returns (must be evenly spaced in time)
delta_t_values : np.ndarray, optional
Array of Δt values (in number of observations, NOT seconds)
If None, uses config.generate_delta_t_values()
q_values : np.ndarray, optional
Array of q moment orders
If None, uses config.generate_q_values()
verbose : bool
Print detailed information
"""
self.returns = returns
self.verbose = verbose
# Validate returns data
self._validate_returns()
        # Δt values are expressed in number of observations (not seconds)
if delta_t_values is None:
self.delta_t_values = config.generate_delta_t_values()
else:
self.delta_t_values = delta_t_values
if q_values is None:
self.q_values = config.generate_q_values()
else:
self.q_values = q_values
# Storage for results
self.partition_values = {} # {q: {delta_t: S_q value}}
self.r_squared_values = {} # {q: R²}
self.slopes = {} # {q: slope (which equals τ(q) + 1)}
self.intercepts = {} # {q: intercept}
if self.verbose:
print(f"\nFractality Checker Initialized")
print(f"Number of returns: {len(returns)}")
print(f"Δt range: {self.delta_t_values.min()} to {self.delta_t_values.max()} observations")
print(f"Number of Δt values: {len(self.delta_t_values)}")
print(f"q range: {self.q_values.min():.2f} to {self.q_values.max():.2f}")
print(f"Number of q values: {len(self.q_values)}")

    def _validate_returns(self):
        """
        Validate returns data quality.

        Prints warnings for recoverable quality issues and raises
        ValueError if the returns contain NaN or Inf values.
        """
issues = []
# Check for NaN/Inf
if np.any(np.isnan(self.returns)):
issues.append("Contains NaN values")
if np.any(np.isinf(self.returns)):
issues.append("Contains infinite values")
# Check data length
if len(self.returns) < 1000:
issues.append(f"Short series ({len(self.returns)} points). Recommended: >10,000")
# Check for zeros (which could indicate data issues)
zero_pct = 100 * np.sum(self.returns == 0) / len(self.returns)
if zero_pct > 5:
issues.append(f"High percentage of zero returns: {zero_pct:.1f}%")
if issues:
print("\n⚠️ DATA QUALITY WARNINGS:")
for issue in issues:
print(f" - {issue}")
if np.any(np.isnan(self.returns)) or np.any(np.isinf(self.returns)):
raise ValueError("Cannot proceed with NaN or Inf values in returns")

    def calculate_partition_function(self, q, delta_t):
        """
        Calculate the partition function S_q(T, Δt) for given q and Δt.

        Uses NON-OVERLAPPING intervals, as per Zhang's methodology; Zhang
        explicitly takes N = the number of non-overlapping complete
        intervals.

        Equation (4.1) from the paper:

            S_q(T, Δt) = Σ_{i=0}^{N-1} |ln(P(iΔt + Δt) / P(iΔt))|^q
                       = Σ_{i=0}^{N-1} |r(iΔt, Δt)|^q

        where r(iΔt, Δt) is the return over the interval of length Δt
        starting at iΔt. This implementation returns the MEAN of |r|^q
        rather than the raw sum, so that values are comparable across Δt
        (N shrinks as Δt grows).

        Parameters:
        -----------
        q : float
            Moment order
        delta_t : int
            Time interval (number of observations)

        Returns:
        --------
        float
            S_q value (NaN if no complete interval fits)
        """
        # Number of complete NON-overlapping intervals (floor division).
        # The cast guards against a float Δt coming from a log-spaced grid.
        delta_t = int(delta_t)
        N = len(self.returns) // delta_t
        if N <= 0:
            return np.nan
        # Vectorized implementation: trim the series to a whole number of
        # intervals, then reshape to (N, delta_t) and sum along axis 1 to
        # get the N non-overlapping interval returns.
        trimmed_returns = self.returns[:N * delta_t]
        interval_returns = trimmed_returns.reshape(N, delta_t).sum(axis=1)
        # Partition function: MEAN of |r|^q rather than the raw sum, which
        # removes the N-dependency (as Δt grows, N shrinks, so the raw sum
        # would confound the scaling).
        S_q = np.mean(np.abs(interval_returns) ** q)
        return S_q
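
    # A hedged numeric illustration of the non-overlapping reshape above:
    # with returns = [r0, r1, ..., r6] and delta_t = 3, N = 7 // 3 = 2,
    # the trimmed series is [r0 .. r5] and the interval returns are
    # [r0+r1+r2, r3+r4+r5]; r6 is dropped rather than wrapped into a
    # partial interval. For i.i.d. N(0, σ²) returns, E[sum²] = Δt·σ², so
    # S_2(Δt) scales like Δt: a log-log slope of 1, i.e. τ(2) = 0.
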
def compute_all_partitions(self):
"""
Compute partition function for all combinations of q and Δt.
This is the main computational step that can take some time
for large datasets or many q/Δt values.
"""
if self.verbose:
print("\nComputing partition functions...")
total_combos = len(self.q_values) * len(self.delta_t_values)
print(f"Total combinations: {len(self.q_values)} × {len(self.delta_t_values)} = {total_combos}")
for i, q in enumerate(self.q_values):
if self.verbose and i % max(1, len(self.q_values) // 10) == 0:
progress = 100 * i / len(self.q_values)
print(f"Progress: {progress:.1f}% (q = {q:.2f})")
self.partition_values[q] = {}
for delta_t in self.delta_t_values:
S_q = self.calculate_partition_function(q, delta_t)
self.partition_values[q][delta_t] = S_q
if self.verbose:
print("✓ Partition functions computed")
def fit_partition_plots(self):
"""
Fit OLS regression to log-log partition plots.
For each q, fit: log₁₀(S_q) = slope × log₁₀(Δt) + intercept
The slope equals τ(q) + 1, so τ(q) = slope - 1
The R² value indicates how linear the relationship is (evidence of scaling).
"""
if self.verbose:
print("\nFitting partition plots...")
for q in self.q_values:
# Get delta_t and S_q values
delta_t_list = []
S_q_list = []
for delta_t in sorted(self.partition_values[q].keys()):
S_q = self.partition_values[q][delta_t]
if not np.isnan(S_q) and S_q > 0: # Need positive values for log
delta_t_list.append(delta_t)
S_q_list.append(S_q)
if len(delta_t_list) < 2:
continue
# Convert to log scale
log_delta_t = np.log10(delta_t_list)
log_S_q = np.log10(S_q_list)
# OLS regression
slope, intercept, r_value, p_value, std_err = stats.linregress(log_delta_t, log_S_q)
# Store results
# slope = τ(q) + 1 → τ(q) = slope - 1
self.slopes[q] = slope
self.intercepts[q] = intercept
self.r_squared_values[q] = r_value ** 2
if self.verbose:
mean_r2 = np.mean(list(self.r_squared_values.values()))
min_r2 = np.min(list(self.r_squared_values.values()))
max_r2 = np.max(list(self.r_squared_values.values()))
print(f"✓ Partition plots fitted")
print(f"\nR² Statistics:")
print(f" Mean R²: {mean_r2:.4f}")
print(f" Min R²: {min_r2:.4f}")
print(f" Max R²: {max_r2:.4f}")
print(f" Target R² (paper): 0.66")
if mean_r2 >= config.MIN_R_SQUARED:
print(f"\n✓ FRACTALITY CONFIRMED (R² = {mean_r2:.4f} >= {config.MIN_R_SQUARED})")
else:
print(f"\n✗ Weak evidence of fractality (R² = {mean_r2:.4f} < {config.MIN_R_SQUARED})")
def get_tau_q(self, q):
"""
Extract τ(q) from slope.
From paper: slope = τ(q) + 1
Therefore: τ(q) = slope - 1
Parameters:
-----------
q : float
Moment order
Returns:
--------
float or None
τ(q) value, or None if not available
"""
if q in self.slopes:
return self.slopes[q] - 1
return None
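
    # Hedged benchmark for interpreting τ(q): a monofractal process with
    # Hurst exponent H has τ(q) = qH - 1 (linear in q), so Brownian motion
    # (H = 1/2) gives τ(2) = 0; concavity of τ(q) in q is the multifractal
    # signature. Example, assuming q = 2.0 is on the configured q grid:
    #
    #     tau_2 = checker.get_tau_q(2.0)  # ≈ 0 for Brownian-like data
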
    def detect_crossover_point(self, r_squared_threshold=0.85):
        """
        Detect the high-frequency crossover point where scaling breaks down.

        Zhang highlights this effect (Figure 4 in the paper): scaling
        behavior breaks down at very short timescales, so the fits should
        only be trusted above the crossover Δt.

        Parameters:
        -----------
        r_squared_threshold : float
            Mean R² at or above which the truncated scaling region is
            considered valid

        Returns:
        --------
        int or None
            Smallest Δt from which scaling holds, or None if not detected
        """
if not self.partition_values:
raise ValueError("Must run compute_all_partitions() before detecting crossover")
self.crossover_scores = {}
sorted_dt = sorted(self.delta_t_values)
for start_idx, candidate_dt in enumerate(sorted_dt):
candidate_dts = sorted_dt[start_idx:]
if len(candidate_dts) < 3:
break
r2_values = []
for q in self.q_values:
if q not in self.partition_values:
continue
delta_t_list = []
S_q_list = []
for delta_t in candidate_dts:
S_q = self.partition_values[q].get(delta_t, np.nan)
if not np.isnan(S_q) and S_q > 0:
delta_t_list.append(delta_t)
S_q_list.append(S_q)
if len(delta_t_list) < 3:
continue
log_delta_t = np.log10(delta_t_list)
log_S_q = np.log10(S_q_list)
_, _, r_value, _, _ = stats.linregress(log_delta_t, log_S_q)
r2_values.append(r_value ** 2)
if r2_values:
self.crossover_scores[candidate_dt] = np.mean(r2_values)
        # Return the smallest Δt from which the truncated fit is strong.
for dt, mean_r2 in self.crossover_scores.items():
if mean_r2 >= r_squared_threshold:
if self.verbose:
print(f"\nHigh-frequency crossover detected at Δt = {dt} observations")
print(f"Scaling behavior valid for Δt >= {dt}")
print(f"Mean truncated R² = {mean_r2:.4f}")
return dt
if self.verbose:
print("\nNo high-frequency crossover detected with current threshold")
return None
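
    # Hedged usage note: call this after compute_all_partitions(); the
    # returned Δt bounds the region where the log-log fits can be trusted.
    # Assuming delta_t_values is a NumPy array (as the config generator
    # suggests), downstream fits can be restricted like so:
    #
    #     crossover = checker.detect_crossover_point(r_squared_threshold=0.9)
    #     if crossover is not None:
    #         valid_dt = checker.delta_t_values[checker.delta_t_values >= crossover]
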
def plot_partition_function(self, q_values_to_plot=None, save_path=None):
"""
Create partition plots (log S_q vs log Δt) for selected q values.
Parameters:
-----------
q_values_to_plot : list, optional
List of q values to plot. If None, plots a selection of q values.
save_path : str, optional
Path to save the plot. If None, displays the plot.
"""
if q_values_to_plot is None:
# Select a subset of q values for clearer visualization
q_values_to_plot = [1, 2, 3, 4, 5]
plt.figure(figsize=(12, 8))
colors = plt.cm.viridis(np.linspace(0, 1, len(q_values_to_plot)))
for idx, q in enumerate(q_values_to_plot):
if q not in self.partition_values:
continue
# Get data points
delta_t_list = []
S_q_list = []
for delta_t in sorted(self.partition_values[q].keys()):
S_q = self.partition_values[q][delta_t]
if not np.isnan(S_q) and S_q > 0:
delta_t_list.append(delta_t)
S_q_list.append(S_q)
if len(delta_t_list) == 0:
continue
# Convert to log scale
log_delta_t = np.log10(delta_t_list)
log_S_q = np.log10(S_q_list)
# Plot data points and regression line
if q in self.slopes:
r2 = self.r_squared_values[q]
plt.plot(log_delta_t, log_S_q, 'o-',
color=colors[idx],
label=f'q = {q} (R² = {r2:.3f})',
alpha=0.7, markersize=4)
# Plot regression line
fitted_line = self.slopes[q] * log_delta_t + self.intercepts[q]
plt.plot(log_delta_t, fitted_line, '--',
color=colors[idx], alpha=0.5, linewidth=1)
plt.xlabel('log₁₀(Δt) [number of observations]', fontsize=12)
plt.ylabel('log₁₀(S_q)', fontsize=12)
plt.title('Partition Function: Evidence of Moment Scaling\n(Linear relationship indicates multifractality)',
fontsize=14, fontweight='bold')
plt.legend(fontsize=9)
plt.grid(True, alpha=0.3)
# Add R² information
if len(self.r_squared_values) > 0:
mean_r2 = np.mean(list(self.r_squared_values.values()))
plt.text(0.02, 0.98, f'Mean R² = {mean_r2:.4f}\nThreshold = {config.MIN_R_SQUARED}',
transform=plt.gca().transAxes,
verticalalignment='top',
fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
if self.verbose:
print(f"Plot saved to: {save_path}")
else:
plt.show()
plt.close()
def plot_r_squared_vs_q(self, save_path=None):
"""
Plot R² values as a function of q.
This shows how well the linear scaling holds across different moment orders.
"""
if len(self.r_squared_values) == 0:
print("No R² values available. Run fit_partition_plots() first.")
return
q_vals = sorted(self.r_squared_values.keys())
r2_vals = [self.r_squared_values[q] for q in q_vals]
plt.figure(figsize=(10, 6))
plt.plot(q_vals, r2_vals, 'o-', linewidth=2, markersize=6, color='steelblue')
plt.axhline(y=config.MIN_R_SQUARED, color='r', linestyle='--',
linewidth=2, label=f'Threshold = {config.MIN_R_SQUARED}')
plt.xlabel('q (moment order)', fontsize=12)
plt.ylabel('R² (goodness of fit)', fontsize=12)
plt.title('Linearity of Scaling Relationship across Moment Orders', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)
plt.ylim([0, 1.05])
mean_r2 = np.mean(r2_vals)
plt.text(0.98, 0.02, f'Mean R² = {mean_r2:.4f}',
transform=plt.gca().transAxes,
horizontalalignment='right',
verticalalignment='bottom',
fontsize=11,
bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5))
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
if self.verbose:
print(f"Plot saved to: {save_path}")
else:
plt.show()
plt.close()
def get_results_dataframe(self):
"""
Get results as a pandas DataFrame for further analysis.
Returns:
--------
pd.DataFrame
DataFrame with columns: q, slope, tau_q, intercept, r_squared
"""
results = []
for q in sorted(self.slopes.keys()):
results.append({
'q': q,
'slope': self.slopes[q],
'tau_q': self.slopes[q] - 1, # τ(q) = slope - 1
'intercept': self.intercepts[q],
'r_squared': self.r_squared_values[q]
})
return pd.DataFrame(results)
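
    # Example: inspect the estimated scaling function directly from the
    # results table (a quick check for concavity of tau_q in q):
    #
    #     df = checker.get_results_dataframe()
    #     print(df[['q', 'tau_q', 'r_squared']].to_string(index=False))
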
def save_results(self, filepath):
"""
Save results to CSV file.
Parameters:
-----------
filepath : str
Path to save CSV file
"""
df = self.get_results_dataframe()
df.to_csv(filepath, index=False)
if self.verbose:
print(f"Results saved to: {filepath}")
def is_fractal(self, threshold=None):
"""
Determine if data exhibits fractal behavior.
Parameters:
-----------
threshold : float, optional
R² threshold. If None, uses config.MIN_R_SQUARED
Returns:
--------
bool
True if mean R² exceeds threshold
"""
if threshold is None:
threshold = config.MIN_R_SQUARED
if len(self.r_squared_values) == 0:
return False
mean_r2 = np.mean(list(self.r_squared_values.values()))
return mean_r2 >= threshold
def run_fractality_check(returns, output_dir=None, save_plots=True):
"""
Complete workflow for checking fractality.
Parameters:
-----------
returns : np.ndarray
Array of log returns
output_dir : str, optional
Directory to save results
save_plots : bool
Whether to save plots
Returns:
--------
FractalityChecker
Checker object with results
"""
# Create output directory
if output_dir is None:
output_dir = config.PLOT_DIR
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Initialize checker
checker = FractalityChecker(returns)
# Run analysis
print("\n" + "="*60)
print("STEP 1: CHECKING FOR FRACTALITY")
print("="*60)
checker.compute_all_partitions()
checker.fit_partition_plots()
# Detect crossover point
checker.detect_crossover_point()
# Save results
if config.SAVE_INTERMEDIATE:
results_path = Path(output_dir) / "step1_partition_results.csv"
checker.save_results(results_path)
# Create plots
if save_plots:
partition_plot_path = Path(output_dir) / "step1_partition_plots.png"
checker.plot_partition_function(q_values_to_plot=[1, 2, 3, 4, 5],
save_path=partition_plot_path)
r2_plot_path = Path(output_dir) / "step1_r_squared.png"
checker.plot_r_squared_vs_q(save_path=r2_plot_path)
# Print conclusion
print("\n" + "="*60)
if checker.is_fractal():
print("✓ CONCLUSION: Data exhibits multifractal behavior")
print(" → Proceed to Step 2: Extract scaling function τ(q)")
else:
print("✗ CONCLUSION: Weak evidence of multifractal behavior")
print(" → Consider using different data or parameters")
print("="*60 + "\n")
return checker
if __name__ == "__main__":
print("Step 1: Check Fractality")
print("="*60)
print("\nThis script checks if your price data exhibits multifractal behavior.")
print("You need to provide returns data to run the analysis.\n")
# Demo with synthetic multifractal data
print("Generating synthetic multifractal data for demonstration...")
# Simple synthetic data (not truly multifractal, just for demo)
np.random.seed(42)
n_points = 10000
returns = np.random.normal(0, 0.001, n_points) * (1 + 0.5 * np.sin(np.arange(n_points) / 100))
print(f"Generated {n_points} synthetic returns\n")
# Run analysis
checker = run_fractality_check(returns, save_plots=True)
print("\nTo use with real data:")
print("1. Load your data with DataLoader")
print("2. Extract returns array")
print("3. Call run_fractality_check(returns)")