"""
Step 1: Check if Fractality Exists

Implements partition function analysis to detect moment scaling behavior.

Based on paper equation (4.1):

    S_q(T, Δt) = Σ |ln(P(iΔt + Δt) / P(iΔt))|^q

CRITICAL: Uses NON-OVERLAPPING intervals as per Zhang's methodology.

If log₁₀(S_q) vs log₁₀(Δt) is linear, then moment scaling exists → fractality confirmed.
"""
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
from scipy import stats
|
|
from pathlib import Path
|
|
import config
|
|
from data_loader import DataLoader
|
|
|
|
|
|
class FractalityChecker:
    """
    Checks for multifractal behavior using partition function analysis.

    The partition function S_q(T, Δt) measures how returns scale across different
    time intervals (Δt) and moment orders (q).

    IMPORTANT: Uses NON-OVERLAPPING intervals to avoid artificial correlation.

    If the log-log plot of S_q vs Δt is linear, this indicates moment scaling,
    which is a hallmark of multifractal processes.
    """

    def __init__(self, returns, delta_t_values=None, q_values=None, verbose=config.VERBOSE):
        """
        Initialize fractality checker.

        Parameters:
        -----------
        returns : array-like
            Log returns (must be evenly spaced in time). Coerced to a NumPy
            array internally.
        delta_t_values : array-like, optional
            Δt values (in number of observations, NOT seconds).
            If None, uses config.generate_delta_t_values()
        q_values : array-like, optional
            q moment orders.
            If None, uses config.generate_q_values()
        verbose : bool
            Print detailed information

        Raises:
        -------
        ValueError
            If returns contain NaN or infinite values (via _validate_returns).
        """
        # FIX: coerce inputs to ndarray so plain Python lists work with the
        # vectorized code below (reshape in calculate_partition_function,
        # .min()/.max() in the verbose summary) instead of raising.
        self.returns = np.asarray(returns)
        self.verbose = verbose

        # Validate returns data (raises on NaN/Inf, warns on other issues)
        self._validate_returns()

        # Delta_t values are in number of observations (not seconds)
        if delta_t_values is None:
            self.delta_t_values = config.generate_delta_t_values()
        else:
            self.delta_t_values = np.asarray(delta_t_values)

        if q_values is None:
            self.q_values = config.generate_q_values()
        else:
            self.q_values = np.asarray(q_values)

        # Storage for results
        self.partition_values = {}   # {q: {delta_t: S_q value}}
        self.r_squared_values = {}   # {q: R²}
        self.slopes = {}             # {q: slope (which equals τ(q) + 1)}
        self.intercepts = {}         # {q: intercept}

        if self.verbose:
            print("\nFractality Checker Initialized")
            print(f"Number of returns: {len(self.returns)}")
            print(f"Δt range: {self.delta_t_values.min()} to {self.delta_t_values.max()} observations")
            print(f"Number of Δt values: {len(self.delta_t_values)}")
            print(f"q range: {self.q_values.min():.2f} to {self.q_values.max():.2f}")
            print(f"Number of q values: {len(self.q_values)}")
def _validate_returns(self):
|
|
"""
|
|
Validate returns data quality.
|
|
|
|
Returns:
|
|
--------
|
|
dict
|
|
Dictionary with validation results
|
|
"""
|
|
issues = []
|
|
|
|
# Check for NaN/Inf
|
|
if np.any(np.isnan(self.returns)):
|
|
issues.append("Contains NaN values")
|
|
if np.any(np.isinf(self.returns)):
|
|
issues.append("Contains infinite values")
|
|
|
|
# Check data length
|
|
if len(self.returns) < 1000:
|
|
issues.append(f"Short series ({len(self.returns)} points). Recommended: >10,000")
|
|
|
|
# Check for zeros (which could indicate data issues)
|
|
zero_pct = 100 * np.sum(self.returns == 0) / len(self.returns)
|
|
if zero_pct > 5:
|
|
issues.append(f"High percentage of zero returns: {zero_pct:.1f}%")
|
|
|
|
if issues:
|
|
print("\n⚠️ DATA QUALITY WARNINGS:")
|
|
for issue in issues:
|
|
print(f" - {issue}")
|
|
|
|
if np.any(np.isnan(self.returns)) or np.any(np.isinf(self.returns)):
|
|
raise ValueError("Cannot proceed with NaN or Inf values in returns")
|
|
|
|
def calculate_partition_function(self, q, delta_t):
|
|
"""
|
|
Calculate partition function S_q(T, Δt) for given q and Δt.
|
|
|
|
*** CRITICAL FIX ***:
|
|
Uses NON-OVERLAPPING intervals as per Zhang's methodology.
|
|
|
|
Equation (4.1) from paper:
|
|
S_q(T, Δt) = Σ |ln(P(iΔt + Δt) / P(iΔt))|^q
|
|
= Σ |r(iΔt, Δt)|^q
|
|
|
|
where r(iΔt, Δt) is the return over interval Δt starting at iΔt.
|
|
|
|
Zhang explicitly uses N = number of NON-overlapping complete intervals.
|
|
|
|
Parameters:
|
|
-----------
|
|
q : float
|
|
Moment order
|
|
delta_t : int
|
|
Time interval (number of observations)
|
|
|
|
Returns:
|
|
--------
|
|
float
|
|
S_q value
|
|
"""
|
|
# Number of complete NON-overlapping intervals (floor division)
|
|
N = len(self.returns) // delta_t
|
|
|
|
if N <= 0:
|
|
return np.nan
|
|
|
|
# EFFICIENT VECTORIZED IMPLEMENTATION:
|
|
# Trim returns to fit complete intervals
|
|
trimmed_returns = self.returns[:N * delta_t]
|
|
|
|
# Reshape into (N, delta_t) and sum along axis 1
|
|
# This gives us N non-overlapping interval returns
|
|
interval_returns = trimmed_returns.reshape(N, delta_t).sum(axis=1)
|
|
|
|
# Calculate partition function: MEAN of |r|^q
|
|
# CRITICAL FIX: Use mean instead of sum to remove N-dependency
|
|
# As Δt increases, N decreases, so sum would confound the scaling
|
|
S_q = np.mean(np.abs(interval_returns) ** q)
|
|
|
|
return S_q
|
|
|
|
def compute_all_partitions(self):
|
|
"""
|
|
Compute partition function for all combinations of q and Δt.
|
|
|
|
This is the main computational step that can take some time
|
|
for large datasets or many q/Δt values.
|
|
"""
|
|
if self.verbose:
|
|
print("\nComputing partition functions...")
|
|
total_combos = len(self.q_values) * len(self.delta_t_values)
|
|
print(f"Total combinations: {len(self.q_values)} × {len(self.delta_t_values)} = {total_combos}")
|
|
|
|
for i, q in enumerate(self.q_values):
|
|
if self.verbose and i % max(1, len(self.q_values) // 10) == 0:
|
|
progress = 100 * i / len(self.q_values)
|
|
print(f"Progress: {progress:.1f}% (q = {q:.2f})")
|
|
|
|
self.partition_values[q] = {}
|
|
|
|
for delta_t in self.delta_t_values:
|
|
S_q = self.calculate_partition_function(q, delta_t)
|
|
self.partition_values[q][delta_t] = S_q
|
|
|
|
if self.verbose:
|
|
print("✓ Partition functions computed")
|
|
|
|
def fit_partition_plots(self):
|
|
"""
|
|
Fit OLS regression to log-log partition plots.
|
|
|
|
For each q, fit: log₁₀(S_q) = slope × log₁₀(Δt) + intercept
|
|
|
|
The slope equals τ(q) + 1, so τ(q) = slope - 1
|
|
The R² value indicates how linear the relationship is (evidence of scaling).
|
|
"""
|
|
if self.verbose:
|
|
print("\nFitting partition plots...")
|
|
|
|
for q in self.q_values:
|
|
# Get delta_t and S_q values
|
|
delta_t_list = []
|
|
S_q_list = []
|
|
|
|
for delta_t in sorted(self.partition_values[q].keys()):
|
|
S_q = self.partition_values[q][delta_t]
|
|
if not np.isnan(S_q) and S_q > 0: # Need positive values for log
|
|
delta_t_list.append(delta_t)
|
|
S_q_list.append(S_q)
|
|
|
|
if len(delta_t_list) < 2:
|
|
continue
|
|
|
|
# Convert to log scale
|
|
log_delta_t = np.log10(delta_t_list)
|
|
log_S_q = np.log10(S_q_list)
|
|
|
|
# OLS regression
|
|
slope, intercept, r_value, p_value, std_err = stats.linregress(log_delta_t, log_S_q)
|
|
|
|
# Store results
|
|
# slope = τ(q) + 1 → τ(q) = slope - 1
|
|
self.slopes[q] = slope
|
|
self.intercepts[q] = intercept
|
|
self.r_squared_values[q] = r_value ** 2
|
|
|
|
if self.verbose:
|
|
mean_r2 = np.mean(list(self.r_squared_values.values()))
|
|
min_r2 = np.min(list(self.r_squared_values.values()))
|
|
max_r2 = np.max(list(self.r_squared_values.values()))
|
|
|
|
print(f"✓ Partition plots fitted")
|
|
print(f"\nR² Statistics:")
|
|
print(f" Mean R²: {mean_r2:.4f}")
|
|
print(f" Min R²: {min_r2:.4f}")
|
|
print(f" Max R²: {max_r2:.4f}")
|
|
print(f" Target R² (paper): 0.66")
|
|
|
|
if mean_r2 >= config.MIN_R_SQUARED:
|
|
print(f"\n✓ FRACTALITY CONFIRMED (R² = {mean_r2:.4f} >= {config.MIN_R_SQUARED})")
|
|
else:
|
|
print(f"\n✗ Weak evidence of fractality (R² = {mean_r2:.4f} < {config.MIN_R_SQUARED})")
|
|
|
|
def get_tau_q(self, q):
|
|
"""
|
|
Extract τ(q) from slope.
|
|
|
|
From paper: slope = τ(q) + 1
|
|
Therefore: τ(q) = slope - 1
|
|
|
|
Parameters:
|
|
-----------
|
|
q : float
|
|
Moment order
|
|
|
|
Returns:
|
|
--------
|
|
float or None
|
|
τ(q) value, or None if not available
|
|
"""
|
|
if q in self.slopes:
|
|
return self.slopes[q] - 1
|
|
return None
|
|
|
|
    def detect_crossover_point(self, r_squared_threshold=0.85):
        """
        Detect high-frequency crossover point where scaling breaks down.

        Zhang mentions this is important (Figure 4 in paper).
        Scaling behavior breaks down at very short timescales.

        Strategy: for each candidate minimum Δt (smallest first), refit the
        log-log regression using only Δt values >= that candidate, and record
        the mean R² across all q.  The first (smallest) candidate whose
        truncated fit reaches the threshold is reported as the crossover.
        Side effect: stores all candidate scores in self.crossover_scores.

        Parameters:
        -----------
        r_squared_threshold : float
            R² threshold below which we consider scaling has broken down

        Returns:
        --------
        int or None
            Delta_t value where crossover occurs, or None if not detected

        Raises:
        -------
        ValueError
            If compute_all_partitions() has not been run yet.
        """
        if not self.partition_values:
            raise ValueError("Must run compute_all_partitions() before detecting crossover")

        # {candidate min Δt: mean truncated R² across q}
        self.crossover_scores = {}
        sorted_dt = sorted(self.delta_t_values)

        # Slide the lower bound of the fitting window upward, dropping the
        # shortest timescales one at a time.
        for start_idx, candidate_dt in enumerate(sorted_dt):
            candidate_dts = sorted_dt[start_idx:]
            # Fewer than 3 remaining Δt values cannot support a meaningful fit;
            # all later candidates are shorter still, so stop entirely.
            if len(candidate_dts) < 3:
                break

            r2_values = []

            for q in self.q_values:
                if q not in self.partition_values:
                    continue

                # Collect usable (Δt, S_q) pairs for this truncated window;
                # log10 requires finite, strictly positive S_q.
                delta_t_list = []
                S_q_list = []

                for delta_t in candidate_dts:
                    S_q = self.partition_values[q].get(delta_t, np.nan)
                    if not np.isnan(S_q) and S_q > 0:
                        delta_t_list.append(delta_t)
                        S_q_list.append(S_q)

                if len(delta_t_list) < 3:
                    continue

                log_delta_t = np.log10(delta_t_list)
                log_S_q = np.log10(S_q_list)
                _, _, r_value, _, _ = stats.linregress(log_delta_t, log_S_q)
                r2_values.append(r_value ** 2)

            if r2_values:
                self.crossover_scores[candidate_dt] = np.mean(r2_values)

        # Find first minimum delta_t where the truncated scaling region is strong.
        # NOTE: relies on dict insertion order (ascending Δt, Python 3.7+),
        # so this returns the SMALLEST Δt meeting the threshold.
        for dt, mean_r2 in self.crossover_scores.items():
            if mean_r2 >= r_squared_threshold:
                if self.verbose:
                    print(f"\nHigh-frequency crossover detected at Δt = {dt} observations")
                    print(f"Scaling behavior valid for Δt >= {dt}")
                    print(f"Mean truncated R² = {mean_r2:.4f}")
                return dt

        if self.verbose:
            print("\nNo high-frequency crossover detected with current threshold")

        return None
def plot_partition_function(self, q_values_to_plot=None, save_path=None):
|
|
"""
|
|
Create partition plots (log S_q vs log Δt) for selected q values.
|
|
|
|
Parameters:
|
|
-----------
|
|
q_values_to_plot : list, optional
|
|
List of q values to plot. If None, plots a selection of q values.
|
|
save_path : str, optional
|
|
Path to save the plot. If None, displays the plot.
|
|
"""
|
|
if q_values_to_plot is None:
|
|
# Select a subset of q values for clearer visualization
|
|
q_values_to_plot = [1, 2, 3, 4, 5]
|
|
|
|
plt.figure(figsize=(12, 8))
|
|
|
|
colors = plt.cm.viridis(np.linspace(0, 1, len(q_values_to_plot)))
|
|
|
|
for idx, q in enumerate(q_values_to_plot):
|
|
if q not in self.partition_values:
|
|
continue
|
|
|
|
# Get data points
|
|
delta_t_list = []
|
|
S_q_list = []
|
|
|
|
for delta_t in sorted(self.partition_values[q].keys()):
|
|
S_q = self.partition_values[q][delta_t]
|
|
if not np.isnan(S_q) and S_q > 0:
|
|
delta_t_list.append(delta_t)
|
|
S_q_list.append(S_q)
|
|
|
|
if len(delta_t_list) == 0:
|
|
continue
|
|
|
|
# Convert to log scale
|
|
log_delta_t = np.log10(delta_t_list)
|
|
log_S_q = np.log10(S_q_list)
|
|
|
|
# Plot data points and regression line
|
|
if q in self.slopes:
|
|
r2 = self.r_squared_values[q]
|
|
plt.plot(log_delta_t, log_S_q, 'o-',
|
|
color=colors[idx],
|
|
label=f'q = {q} (R² = {r2:.3f})',
|
|
alpha=0.7, markersize=4)
|
|
|
|
# Plot regression line
|
|
fitted_line = self.slopes[q] * log_delta_t + self.intercepts[q]
|
|
plt.plot(log_delta_t, fitted_line, '--',
|
|
color=colors[idx], alpha=0.5, linewidth=1)
|
|
|
|
plt.xlabel('log₁₀(Δt) [number of observations]', fontsize=12)
|
|
plt.ylabel('log₁₀(S_q)', fontsize=12)
|
|
plt.title('Partition Function: Evidence of Moment Scaling\n(Linear relationship indicates multifractality)',
|
|
fontsize=14, fontweight='bold')
|
|
plt.legend(fontsize=9)
|
|
plt.grid(True, alpha=0.3)
|
|
|
|
# Add R² information
|
|
if len(self.r_squared_values) > 0:
|
|
mean_r2 = np.mean(list(self.r_squared_values.values()))
|
|
plt.text(0.02, 0.98, f'Mean R² = {mean_r2:.4f}\nThreshold = {config.MIN_R_SQUARED}',
|
|
transform=plt.gca().transAxes,
|
|
verticalalignment='top',
|
|
fontsize=10,
|
|
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
|
|
|
|
plt.tight_layout()
|
|
|
|
if save_path:
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
|
if self.verbose:
|
|
print(f"Plot saved to: {save_path}")
|
|
else:
|
|
plt.show()
|
|
|
|
plt.close()
|
|
|
|
def plot_r_squared_vs_q(self, save_path=None):
|
|
"""
|
|
Plot R² values as a function of q.
|
|
|
|
This shows how well the linear scaling holds across different moment orders.
|
|
"""
|
|
if len(self.r_squared_values) == 0:
|
|
print("No R² values available. Run fit_partition_plots() first.")
|
|
return
|
|
|
|
q_vals = sorted(self.r_squared_values.keys())
|
|
r2_vals = [self.r_squared_values[q] for q in q_vals]
|
|
|
|
plt.figure(figsize=(10, 6))
|
|
plt.plot(q_vals, r2_vals, 'o-', linewidth=2, markersize=6, color='steelblue')
|
|
plt.axhline(y=config.MIN_R_SQUARED, color='r', linestyle='--',
|
|
linewidth=2, label=f'Threshold = {config.MIN_R_SQUARED}')
|
|
plt.xlabel('q (moment order)', fontsize=12)
|
|
plt.ylabel('R² (goodness of fit)', fontsize=12)
|
|
plt.title('Linearity of Scaling Relationship across Moment Orders', fontsize=14, fontweight='bold')
|
|
plt.legend(fontsize=10)
|
|
plt.grid(True, alpha=0.3)
|
|
plt.ylim([0, 1.05])
|
|
|
|
mean_r2 = np.mean(r2_vals)
|
|
plt.text(0.98, 0.02, f'Mean R² = {mean_r2:.4f}',
|
|
transform=plt.gca().transAxes,
|
|
horizontalalignment='right',
|
|
verticalalignment='bottom',
|
|
fontsize=11,
|
|
bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5))
|
|
|
|
plt.tight_layout()
|
|
|
|
if save_path:
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
|
if self.verbose:
|
|
print(f"Plot saved to: {save_path}")
|
|
else:
|
|
plt.show()
|
|
|
|
plt.close()
|
|
|
|
def get_results_dataframe(self):
|
|
"""
|
|
Get results as a pandas DataFrame for further analysis.
|
|
|
|
Returns:
|
|
--------
|
|
pd.DataFrame
|
|
DataFrame with columns: q, slope, tau_q, intercept, r_squared
|
|
"""
|
|
results = []
|
|
for q in sorted(self.slopes.keys()):
|
|
results.append({
|
|
'q': q,
|
|
'slope': self.slopes[q],
|
|
'tau_q': self.slopes[q] - 1, # τ(q) = slope - 1
|
|
'intercept': self.intercepts[q],
|
|
'r_squared': self.r_squared_values[q]
|
|
})
|
|
|
|
return pd.DataFrame(results)
|
|
|
|
def save_results(self, filepath):
|
|
"""
|
|
Save results to CSV file.
|
|
|
|
Parameters:
|
|
-----------
|
|
filepath : str
|
|
Path to save CSV file
|
|
"""
|
|
df = self.get_results_dataframe()
|
|
df.to_csv(filepath, index=False)
|
|
|
|
if self.verbose:
|
|
print(f"Results saved to: {filepath}")
|
|
|
|
def is_fractal(self, threshold=None):
|
|
"""
|
|
Determine if data exhibits fractal behavior.
|
|
|
|
Parameters:
|
|
-----------
|
|
threshold : float, optional
|
|
R² threshold. If None, uses config.MIN_R_SQUARED
|
|
|
|
Returns:
|
|
--------
|
|
bool
|
|
True if mean R² exceeds threshold
|
|
"""
|
|
if threshold is None:
|
|
threshold = config.MIN_R_SQUARED
|
|
|
|
if len(self.r_squared_values) == 0:
|
|
return False
|
|
|
|
mean_r2 = np.mean(list(self.r_squared_values.values()))
|
|
return mean_r2 >= threshold
|
|
|
|
|
|
def run_fractality_check(returns, output_dir=None, save_plots=True):
    """
    Complete workflow for checking fractality.

    Computes all partition functions, fits the log-log scaling plots,
    detects the high-frequency crossover, and optionally saves results
    and plots, printing a final verdict.

    Parameters:
    -----------
    returns : np.ndarray
        Array of log returns
    output_dir : str, optional
        Directory to save results
    save_plots : bool
        Whether to save plots

    Returns:
    --------
    FractalityChecker
        Checker object with results
    """
    # Resolve and create the output directory.
    out_dir = Path(config.PLOT_DIR if output_dir is None else output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Initialize checker
    checker = FractalityChecker(returns)

    banner = "=" * 60
    print("\n" + banner)
    print("STEP 1: CHECKING FOR FRACTALITY")
    print(banner)

    # Run analysis: partitions → fits → crossover detection.
    checker.compute_all_partitions()
    checker.fit_partition_plots()
    checker.detect_crossover_point()

    # Persist numeric results if configured to do so.
    if config.SAVE_INTERMEDIATE:
        checker.save_results(out_dir / "step1_partition_results.csv")

    # Create plots
    if save_plots:
        checker.plot_partition_function(q_values_to_plot=[1, 2, 3, 4, 5],
                                        save_path=out_dir / "step1_partition_plots.png")
        checker.plot_r_squared_vs_q(save_path=out_dir / "step1_r_squared.png")

    # Print conclusion
    print("\n" + banner)
    if checker.is_fractal():
        print("✓ CONCLUSION: Data exhibits multifractal behavior")
        print(" → Proceed to Step 2: Extract scaling function τ(q)")
    else:
        print("✗ CONCLUSION: Weak evidence of multifractal behavior")
        print(" → Consider using different data or parameters")
    print(banner + "\n")

    return checker
|
|
if __name__ == "__main__":
    # Demo entry point: runs the full fractality check on synthetic data.
    print("Step 1: Check Fractality")
    print("=" * 60)
    print("\nThis script checks if your price data exhibits multifractal behavior.")
    print("You need to provide returns data to run the analysis.\n")

    print("Generating synthetic multifractal data for demonstration...")

    # Simple synthetic series (not truly multifractal, just for the demo):
    # i.i.d. Gaussian noise with a slow sinusoidal volatility modulation.
    np.random.seed(42)
    n_points = 10000
    modulation = 1 + 0.5 * np.sin(np.arange(n_points) / 100)
    returns = np.random.normal(0, 0.001, n_points) * modulation

    print(f"Generated {n_points} synthetic returns\n")

    checker = run_fractality_check(returns, save_plots=True)

    print("\nTo use with real data:")
    print("1. Load your data with DataLoader")
    print("2. Extract returns array")
    print("3. Call run_fractality_check(returns)")