# MMAR/step1_check_fractality.py

"""
Step 1: Check if Fractality Exists
Implements partition function analysis to detect moment scaling behavior
Based on paper equation (4.1):
S_q(T, Δt) = Σ |ln(P(iΔt + Δt) / P(iΔt))|^q
CRITICAL: Uses NON-OVERLAPPING intervals as per Zhang's methodology.
If log₁₀(S_q) vs log₁₀(Δt) is linear, moment scaling holds and fractality is confirmed.
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from pathlib import Path
import config
from data_loader import DataLoader
class FractalityChecker:
"""
Checks for multifractal behavior using partition function analysis.
The partition function S_q(T, Δt) measures how returns scale across different
time intervals (Δt) and moment orders (q).
IMPORTANT: Uses NON-OVERLAPPING intervals to avoid artificial correlation.
If the log-log plot of S_q vs Δt is linear, this indicates moment scaling,
which is a hallmark of multifractal processes.
"""
def __init__(self, returns, delta_t_values=None, q_values=None, verbose=config.VERBOSE):
"""
Initialize fractality checker.
Parameters:
-----------
returns : np.ndarray
Array of log returns (must be evenly spaced in time)
delta_t_values : np.ndarray, optional
Array of Δt values (in number of observations, NOT seconds)
If None, uses config.generate_delta_t_values()
q_values : np.ndarray, optional
Array of q moment orders
If None, uses config.generate_q_values()
verbose : bool
Print detailed information
"""
self.returns = returns
self.verbose = verbose
# Validate returns data
self._validate_returns()
        # Δt values are in number of observations (not seconds)
if delta_t_values is None:
self.delta_t_values = config.generate_delta_t_values()
else:
self.delta_t_values = delta_t_values
if q_values is None:
self.q_values = config.generate_q_values()
else:
self.q_values = q_values
# Storage for results
self.partition_values = {} # {q: {delta_t: S_q value}}
self.r_squared_values = {} # {q: R²}
self.slopes = {} # {q: slope (which equals τ(q) + 1)}
self.intercepts = {} # {q: intercept}
if self.verbose:
print(f"\nFractality Checker Initialized")
print(f"Number of returns: {len(returns)}")
print(f"Δt range: {self.delta_t_values.min()} to {self.delta_t_values.max()} observations")
print(f"Number of Δt values: {len(self.delta_t_values)}")
print(f"q range: {self.q_values.min():.2f} to {self.q_values.max():.2f}")
print(f"Number of q values: {len(self.q_values)}")
def _validate_returns(self):
"""
Validate returns data quality.
        Raises:
        -------
        ValueError
            If the returns contain NaN or infinite values.
            Non-fatal issues (short series, many zero returns) only print warnings.
"""
issues = []
# Check for NaN/Inf
if np.any(np.isnan(self.returns)):
issues.append("Contains NaN values")
if np.any(np.isinf(self.returns)):
issues.append("Contains infinite values")
# Check data length
if len(self.returns) < 1000:
issues.append(f"Short series ({len(self.returns)} points). Recommended: >10,000")
# Check for zeros (which could indicate data issues)
zero_pct = 100 * np.sum(self.returns == 0) / len(self.returns)
if zero_pct > 5:
issues.append(f"High percentage of zero returns: {zero_pct:.1f}%")
if issues:
print("\n⚠️ DATA QUALITY WARNINGS:")
for issue in issues:
print(f" - {issue}")
if np.any(np.isnan(self.returns)) or np.any(np.isinf(self.returns)):
raise ValueError("Cannot proceed with NaN or Inf values in returns")
def calculate_partition_function(self, q, delta_t):
"""
Calculate partition function S_q(T, Δt) for given q and Δt.
        Uses NON-OVERLAPPING intervals as per Zhang's methodology.
        Equation (4.1) from the paper:
        S_q(T, Δt) = Σ |ln(P(iΔt + Δt) / P(iΔt))|^q = Σ |r(iΔt, Δt)|^q
        where r(iΔt, Δt) is the return over the interval of length Δt starting
        at iΔt, and the sum runs over the N non-overlapping complete intervals.
        Note: this implementation averages |r|^q over the N intervals instead of
        summing, which removes the N-dependence from the scaling fit; the fitted
        log-log slope then equals τ(q) + 1 (see fit_partition_plots).
Parameters:
-----------
q : float
Moment order
delta_t : int
Time interval (number of observations)
Returns:
--------
float
S_q value
"""
# Number of complete NON-overlapping intervals (floor division)
N = len(self.returns) // delta_t
if N <= 0:
return np.nan
        # Vectorized implementation:
# Trim returns to fit complete intervals
trimmed_returns = self.returns[:N * delta_t]
# Reshape into (N, delta_t) and sum along axis 1
# This gives us N non-overlapping interval returns
interval_returns = trimmed_returns.reshape(N, delta_t).sum(axis=1)
        # Partition function: MEAN of |r|^q.
        # Using the mean rather than the sum removes the N-dependency:
        # as Δt increases, N decreases, so a raw sum would confound the scaling.
S_q = np.mean(np.abs(interval_returns) ** q)
return S_q
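    # Worked micro-example of the reshape above (illustrative values only):
    # with returns = [r0, r1, r2, r3, r4, r5] and delta_t = 2, N = 3 and the
    # non-overlapping interval returns are [r0 + r1, r2 + r3, r4 + r5], so
    # S_q = mean(|r0 + r1|^q, |r2 + r3|^q, |r4 + r5|^q).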
def compute_all_partitions(self):
"""
Compute partition function for all combinations of q and Δt.
This is the main computational step that can take some time
for large datasets or many q/Δt values.
"""
if self.verbose:
print("\nComputing partition functions...")
total_combos = len(self.q_values) * len(self.delta_t_values)
print(f"Total combinations: {len(self.q_values)} × {len(self.delta_t_values)} = {total_combos}")
for i, q in enumerate(self.q_values):
if self.verbose and i % max(1, len(self.q_values) // 10) == 0:
progress = 100 * i / len(self.q_values)
print(f"Progress: {progress:.1f}% (q = {q:.2f})")
self.partition_values[q] = {}
for delta_t in self.delta_t_values:
S_q = self.calculate_partition_function(q, delta_t)
self.partition_values[q][delta_t] = S_q
if self.verbose:
print("✓ Partition functions computed")
def fit_partition_plots(self):
"""
Fit OLS regression to log-log partition plots.
For each q, fit: log₁₀(S_q) = slope × log₁₀(Δt) + intercept
The slope equals τ(q) + 1, so τ(q) = slope - 1
        The R² value indicates how linear the relationship is (evidence of scaling).
"""
if self.verbose:
print("\nFitting partition plots...")
for q in self.q_values:
# Get delta_t and S_q values
delta_t_list = []
S_q_list = []
for delta_t in sorted(self.partition_values[q].keys()):
S_q = self.partition_values[q][delta_t]
if not np.isnan(S_q) and S_q > 0: # Need positive values for log
delta_t_list.append(delta_t)
S_q_list.append(S_q)
if len(delta_t_list) < 2:
continue
# Convert to log scale
log_delta_t = np.log10(delta_t_list)
log_S_q = np.log10(S_q_list)
# OLS regression
slope, intercept, r_value, p_value, std_err = stats.linregress(log_delta_t, log_S_q)
# Store results
# slope = τ(q) + 1 → τ(q) = slope - 1
self.slopes[q] = slope
self.intercepts[q] = intercept
self.r_squared_values[q] = r_value ** 2
if self.verbose:
mean_r2 = np.mean(list(self.r_squared_values.values()))
min_r2 = np.min(list(self.r_squared_values.values()))
max_r2 = np.max(list(self.r_squared_values.values()))
print(f"✓ Partition plots fitted")
print(f"\nR² Statistics:")
print(f" Mean R²: {mean_r2:.4f}")
print(f" Min R²: {min_r2:.4f}")
print(f" Max R²: {max_r2:.4f}")
print(f" Target R² (paper): 0.66")
if mean_r2 >= config.MIN_R_SQUARED:
print(f"\n✓ FRACTALITY CONFIRMED (R² = {mean_r2:.4f} >= {config.MIN_R_SQUARED})")
else:
print(f"\n✗ Weak evidence of fractality (R² = {mean_r2:.4f} < {config.MIN_R_SQUARED})")
def get_tau_q(self, q):
"""
Extract τ(q) from slope.
From paper: slope = τ(q) + 1
Therefore: τ(q) = slope - 1
Parameters:
-----------
q : float
Moment order
Returns:
--------
float or None
τ(q) value, or None if not available
"""
if q in self.slopes:
return self.slopes[q] - 1
return None
def detect_crossover_point(self, r_squared_threshold=0.85):
"""
Detect high-frequency crossover point where scaling breaks down.
Zhang mentions this is important (Figure 4 in paper).
Scaling behavior breaks down at very short timescales.
Parameters:
-----------
        r_squared_threshold : float
            Minimum mean R² for a truncated fit to count as a valid scaling region
Returns:
--------
int or None
Delta_t value where crossover occurs, or None if not detected
"""
if not self.partition_values:
raise ValueError("Must run compute_all_partitions() before detecting crossover")
self.crossover_scores = {}
sorted_dt = sorted(self.delta_t_values)
for start_idx, candidate_dt in enumerate(sorted_dt):
candidate_dts = sorted_dt[start_idx:]
if len(candidate_dts) < 3:
break
r2_values = []
for q in self.q_values:
if q not in self.partition_values:
continue
delta_t_list = []
S_q_list = []
for delta_t in candidate_dts:
S_q = self.partition_values[q].get(delta_t, np.nan)
if not np.isnan(S_q) and S_q > 0:
delta_t_list.append(delta_t)
S_q_list.append(S_q)
if len(delta_t_list) < 3:
continue
log_delta_t = np.log10(delta_t_list)
log_S_q = np.log10(S_q_list)
_, _, r_value, _, _ = stats.linregress(log_delta_t, log_S_q)
r2_values.append(r_value ** 2)
if r2_values:
self.crossover_scores[candidate_dt] = np.mean(r2_values)
        # Find the smallest Δt from which the truncated scaling region fits strongly.
for dt, mean_r2 in self.crossover_scores.items():
if mean_r2 >= r_squared_threshold:
if self.verbose:
print(f"\nHigh-frequency crossover detected at Δt = {dt} observations")
print(f"Scaling behavior valid for Δt >= {dt}")
print(f"Mean truncated R² = {mean_r2:.4f}")
return dt
if self.verbose:
print("\nNo high-frequency crossover detected with current threshold")
return None
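    # Note on the search above: candidates are scanned from the smallest Δt
    # upward; each candidate truncates the scaling region to [candidate_dt,
    # max Δt] and refits every q on that range, and the first candidate whose
    # mean truncated R² clears the threshold is reported as the crossover.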
def plot_partition_function(self, q_values_to_plot=None, save_path=None):
"""
Create partition plots (log S_q vs log Δt) for selected q values.
Parameters:
-----------
q_values_to_plot : list, optional
List of q values to plot. If None, plots a selection of q values.
save_path : str, optional
Path to save the plot. If None, displays the plot.
"""
if q_values_to_plot is None:
# Select a subset of q values for clearer visualization
q_values_to_plot = [1, 2, 3, 4, 5]
plt.figure(figsize=(12, 8))
colors = plt.cm.viridis(np.linspace(0, 1, len(q_values_to_plot)))
for idx, q in enumerate(q_values_to_plot):
if q not in self.partition_values:
continue
# Get data points
delta_t_list = []
S_q_list = []
for delta_t in sorted(self.partition_values[q].keys()):
S_q = self.partition_values[q][delta_t]
if not np.isnan(S_q) and S_q > 0:
delta_t_list.append(delta_t)
S_q_list.append(S_q)
if len(delta_t_list) == 0:
continue
# Convert to log scale
log_delta_t = np.log10(delta_t_list)
log_S_q = np.log10(S_q_list)
# Plot data points and regression line
if q in self.slopes:
r2 = self.r_squared_values[q]
plt.plot(log_delta_t, log_S_q, 'o-',
color=colors[idx],
label=f'q = {q} (R² = {r2:.3f})',
alpha=0.7, markersize=4)
# Plot regression line
fitted_line = self.slopes[q] * log_delta_t + self.intercepts[q]
plt.plot(log_delta_t, fitted_line, '--',
color=colors[idx], alpha=0.5, linewidth=1)
plt.xlabel('log₁₀(Δt) [number of observations]', fontsize=12)
plt.ylabel('log₁₀(S_q)', fontsize=12)
plt.title('Partition Function: Evidence of Moment Scaling\n(Linear relationship indicates multifractality)',
fontsize=14, fontweight='bold')
plt.legend(fontsize=9)
plt.grid(True, alpha=0.3)
# Add R² information
if len(self.r_squared_values) > 0:
mean_r2 = np.mean(list(self.r_squared_values.values()))
plt.text(0.02, 0.98, f'Mean R² = {mean_r2:.4f}\nThreshold = {config.MIN_R_SQUARED}',
transform=plt.gca().transAxes,
verticalalignment='top',
fontsize=10,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
if self.verbose:
print(f"Plot saved to: {save_path}")
else:
plt.show()
plt.close()
def plot_r_squared_vs_q(self, save_path=None):
"""
        Plot R² values as a function of q.
This shows how well the linear scaling holds across different moment orders.
"""
if len(self.r_squared_values) == 0:
print("No R² values available. Run fit_partition_plots() first.")
return
q_vals = sorted(self.r_squared_values.keys())
r2_vals = [self.r_squared_values[q] for q in q_vals]
plt.figure(figsize=(10, 6))
plt.plot(q_vals, r2_vals, 'o-', linewidth=2, markersize=6, color='steelblue')
plt.axhline(y=config.MIN_R_SQUARED, color='r', linestyle='--',
linewidth=2, label=f'Threshold = {config.MIN_R_SQUARED}')
plt.xlabel('q (moment order)', fontsize=12)
plt.ylabel('R² (goodness of fit)', fontsize=12)
plt.title('Linearity of Scaling Relationship across Moment Orders', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)
plt.ylim([0, 1.05])
mean_r2 = np.mean(r2_vals)
plt.text(0.98, 0.02, f'Mean R² = {mean_r2:.4f}',
transform=plt.gca().transAxes,
horizontalalignment='right',
verticalalignment='bottom',
fontsize=11,
bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5))
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
if self.verbose:
print(f"Plot saved to: {save_path}")
else:
plt.show()
plt.close()
def get_results_dataframe(self):
"""
Get results as a pandas DataFrame for further analysis.
Returns:
--------
pd.DataFrame
DataFrame with columns: q, slope, tau_q, intercept, r_squared
"""
results = []
for q in sorted(self.slopes.keys()):
results.append({
'q': q,
'slope': self.slopes[q],
'tau_q': self.slopes[q] - 1, # τ(q) = slope - 1
'intercept': self.intercepts[q],
'r_squared': self.r_squared_values[q]
})
return pd.DataFrame(results)
def save_results(self, filepath):
"""
Save results to CSV file.
Parameters:
-----------
filepath : str
Path to save CSV file
"""
df = self.get_results_dataframe()
df.to_csv(filepath, index=False)
if self.verbose:
print(f"Results saved to: {filepath}")
def is_fractal(self, threshold=None):
"""
Determine if data exhibits fractal behavior.
Parameters:
-----------
        threshold : float, optional
            R² threshold. If None, uses config.MIN_R_SQUARED
        Returns:
        --------
        bool
            True if the mean R² meets or exceeds the threshold
"""
if threshold is None:
threshold = config.MIN_R_SQUARED
if len(self.r_squared_values) == 0:
return False
mean_r2 = np.mean(list(self.r_squared_values.values()))
return mean_r2 >= threshold
def run_fractality_check(returns, output_dir=None, save_plots=True):
"""
Complete workflow for checking fractality.
Parameters:
-----------
returns : np.ndarray
Array of log returns
output_dir : str, optional
Directory to save results
save_plots : bool
Whether to save plots
Returns:
--------
FractalityChecker
Checker object with results
"""
# Create output directory
if output_dir is None:
output_dir = config.PLOT_DIR
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Initialize checker
checker = FractalityChecker(returns)
# Run analysis
print("\n" + "="*60)
print("STEP 1: CHECKING FOR FRACTALITY")
print("="*60)
checker.compute_all_partitions()
checker.fit_partition_plots()
# Detect crossover point
checker.detect_crossover_point()
# Save results
if config.SAVE_INTERMEDIATE:
results_path = Path(output_dir) / "step1_partition_results.csv"
checker.save_results(results_path)
# Create plots
if save_plots:
partition_plot_path = Path(output_dir) / "step1_partition_plots.png"
checker.plot_partition_function(q_values_to_plot=[1, 2, 3, 4, 5],
save_path=partition_plot_path)
r2_plot_path = Path(output_dir) / "step1_r_squared.png"
checker.plot_r_squared_vs_q(save_path=r2_plot_path)
# Print conclusion
print("\n" + "="*60)
if checker.is_fractal():
print("✓ CONCLUSION: Data exhibits multifractal behavior")
print(" → Proceed to Step 2: Extract scaling function τ(q)")
else:
print("✗ CONCLUSION: Weak evidence of multifractal behavior")
print(" → Consider using different data or parameters")
print("="*60 + "\n")
return checker
if __name__ == "__main__":
print("Step 1: Check Fractality")
print("="*60)
print("\nThis script checks if your price data exhibits multifractal behavior.")
print("You need to provide returns data to run the analysis.\n")
# Demo with synthetic multifractal data
print("Generating synthetic multifractal data for demonstration...")
# Simple synthetic data (not truly multifractal, just for demo)
np.random.seed(42)
n_points = 10000
returns = np.random.normal(0, 0.001, n_points) * (1 + 0.5 * np.sin(np.arange(n_points) / 100))
print(f"Generated {n_points} synthetic returns\n")
# Run analysis
checker = run_fractality_check(returns, save_plots=True)
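    # Inspect the fitted scaling function on the demo output. This uses only
    # methods defined above in this module, so it runs as-is:
    results_df = checker.get_results_dataframe()
    print("\nFitted slopes and τ(q) by moment order (head):")
    print(results_df.head())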
print("\nTo use with real data:")
print("1. Load your data with DataLoader")
print("2. Extract returns array")
print("3. Call run_fractality_check(returns)")