394 lines
12 KiB
Python
394 lines
12 KiB
Python
"""
|
|
Step 5: Generate Fractional Brownian Motion (FBM)
|
|
Creates FBM B_H(t) with self-affinity index H
|
|
|
|
From paper Section 2.a:
|
|
- FBM is a generalization of Brownian motion with long memory
|
|
- Characterized by Hurst exponent H
|
|
- H > 0.5: Persistent (trending)
|
|
- H = 0.5: Standard Brownian motion
|
|
- H < 0.5: Anti-persistent (mean-reverting)
|
|
|
|
Implementation uses Davies-Harte method (exact simulation)
|
|
"""
|
|
|
|
import numpy as np
|
|
from scipy.linalg import toeplitz, sqrtm
|
|
import matplotlib.pyplot as plt
|
|
from pathlib import Path
|
|
import config
|
|
|
|
|
|
class FBMGenerator:
|
|
"""
|
|
Generates Fractional Brownian Motion using Davies-Harte method.
|
|
|
|
FBM is a Gaussian process with stationary increments and
|
|
long-range dependence controlled by the Hurst parameter H.
|
|
"""
|
|
|
|
def __init__(self, H, n_points, verbose=config.VERBOSE):
|
|
"""
|
|
Initialize FBM generator.
|
|
|
|
Parameters:
|
|
-----------
|
|
H : float
|
|
Hurst exponent (self-affinity index) from Step 2
|
|
n_points : int
|
|
Number of points to generate
|
|
verbose : bool
|
|
Print detailed information
|
|
"""
|
|
self.H = H
|
|
self.n_points = n_points
|
|
self.verbose = verbose
|
|
|
|
# Storage
|
|
self.fbm = None
|
|
self.fbm_increments = None
|
|
|
|
if self.verbose:
|
|
print(f"\nFBM Generator Initialized")
|
|
print(f"H = {H:.4f}")
|
|
print(f"Number of points = {n_points}")
|
|
|
|
# Interpret H
|
|
if abs(H - 0.5) < 0.05:
|
|
print(f" → H ≈ 0.5: Close to standard Brownian motion")
|
|
elif H > 0.5:
|
|
print(f" → H > 0.5: Persistent (trending) behavior")
|
|
else:
|
|
print(f" → H < 0.5: Anti-persistent (mean-reverting) behavior")
|
|
|
|
def fbm_autocovariance(self, k):
|
|
"""
|
|
Calculate autocovariance function of FBM increments.
|
|
|
|
For FBM with Hurst parameter H:
|
|
γ(k) = (1/2) * [|k+1|^(2H) - 2|k|^(2H) + |k-1|^(2H)]
|
|
|
|
This is the covariance between increments separated by k steps.
|
|
|
|
Parameters:
|
|
-----------
|
|
k : int or array
|
|
Lag
|
|
|
|
Returns:
|
|
--------
|
|
float or array
|
|
Autocovariance at lag k
|
|
"""
|
|
k = np.abs(k)
|
|
return 0.5 * (np.abs(k + 1)**(2*self.H) - 2*np.abs(k)**(2*self.H) + np.abs(k - 1)**(2*self.H))
|
|
|
|
def generate_fbm_davies_harte(self):
|
|
"""
|
|
Generate FBM using Davies-Harte (exact) method.
|
|
|
|
This method is exact for FBM and computationally efficient.
|
|
|
|
Algorithm:
|
|
1. Construct circulant covariance matrix from autocovariance
|
|
2. Use FFT to generate correlated Gaussian increments
|
|
3. Cumsum to get FBM path
|
|
|
|
Returns:
|
|
--------
|
|
np.ndarray
|
|
FBM path B_H(t)
|
|
"""
|
|
if self.verbose:
|
|
print("\nGenerating FBM using Davies-Harte method...")
|
|
|
|
n = self.n_points
|
|
|
|
# Build covariance vector for circulant embedding
|
|
# First row of circulant matrix
|
|
r = np.zeros(2 * n)
|
|
for k in range(n):
|
|
r[k] = self.fbm_autocovariance(k)
|
|
|
|
# Mirror for circulant property
|
|
for k in range(1, n):
|
|
r[2*n - k] = r[k]
|
|
|
|
# Eigenvalues via FFT
|
|
lam = np.fft.fft(r).real
|
|
|
|
# Check if all eigenvalues are non-negative (required for valid covariance)
|
|
if np.any(lam < -1e-10):
|
|
if self.verbose:
|
|
print(" ⚠️ Negative eigenvalues detected, using Cholesky fallback")
|
|
return self.generate_fbm_cholesky()
|
|
|
|
# Ensure non-negative
|
|
lam = np.maximum(lam, 0)
|
|
|
|
# Generate complex Gaussian random variables
|
|
# Real and imaginary parts are independent N(0,1)
|
|
Z_real = np.random.randn(2*n)
|
|
Z_imag = np.random.randn(2*n)
|
|
Z = Z_real + 1j * Z_imag
|
|
|
|
# Scale by sqrt of eigenvalues
|
|
W = np.sqrt(lam / (2*n)) * Z
|
|
|
|
# IFFT to get correlated increments
|
|
w = np.fft.ifft(W)
|
|
|
|
# Take real part and first n values
|
|
increments = w[:n].real
|
|
|
|
# Cumulative sum to get FBM
|
|
self.fbm_increments = increments
|
|
self.fbm = np.cumsum(increments)
|
|
|
|
# Center at zero
|
|
self.fbm = self.fbm - self.fbm[0]
|
|
|
|
if self.verbose:
|
|
print(f" ✓ FBM generated: {len(self.fbm)} points")
|
|
print(f" FBM range: [{self.fbm.min():.4f}, {self.fbm.max():.4f}]")
|
|
print(f" Std dev: {np.std(self.fbm):.4f}")
|
|
|
|
return self.fbm
|
|
|
|
def generate_fbm_cholesky(self):
|
|
"""
|
|
Fallback method using Cholesky decomposition.
|
|
|
|
Slower but more stable for some parameter values.
|
|
|
|
Returns:
|
|
--------
|
|
np.ndarray
|
|
FBM path
|
|
"""
|
|
if self.verbose:
|
|
print("\nGenerating FBM using Cholesky method...")
|
|
|
|
n = self.n_points
|
|
|
|
# Build covariance matrix
|
|
cov_matrix = np.zeros((n, n))
|
|
for i in range(n):
|
|
for j in range(n):
|
|
cov_matrix[i, j] = self.fbm_autocovariance(i - j)
|
|
|
|
# Cholesky decomposition
|
|
try:
|
|
L = np.linalg.cholesky(cov_matrix)
|
|
except np.linalg.LinAlgError:
|
|
# Add small diagonal for numerical stability
|
|
cov_matrix += 1e-10 * np.eye(n)
|
|
L = np.linalg.cholesky(cov_matrix)
|
|
|
|
# Generate independent Gaussian random variables
|
|
Z = np.random.randn(n)
|
|
|
|
# Correlated increments
|
|
increments = L @ Z
|
|
|
|
# Cumulative sum
|
|
self.fbm_increments = increments
|
|
self.fbm = np.cumsum(increments)
|
|
self.fbm = self.fbm - self.fbm[0]
|
|
|
|
if self.verbose:
|
|
print(f" ✓ FBM generated: {len(self.fbm)} points")
|
|
|
|
return self.fbm
|
|
|
|
def scale_fbm(self, volatility):
|
|
"""
|
|
Scale FBM by observed volatility.
|
|
|
|
From paper Section 5.b:
|
|
"The FBM is scaled by the sample standard deviation of returns at
|
|
the configured forecast interval"
|
|
|
|
Parameters:
|
|
-----------
|
|
volatility : float
|
|
Sample standard deviation to match
|
|
|
|
Returns:
|
|
--------
|
|
np.ndarray
|
|
Scaled FBM
|
|
"""
|
|
if self.fbm is None:
|
|
raise ValueError("Must generate FBM first")
|
|
|
|
# Current std dev
|
|
current_std = np.std(self.fbm_increments)
|
|
|
|
# Scale factor
|
|
scale_factor = volatility / current_std if current_std > 0 else 1.0
|
|
|
|
# Apply scaling
|
|
self.fbm = self.fbm * scale_factor
|
|
self.fbm_increments = self.fbm_increments * scale_factor
|
|
|
|
if self.verbose:
|
|
print(f"\nScaled FBM to match volatility = {volatility:.6f}")
|
|
print(f" Scale factor = {scale_factor:.4f}")
|
|
|
|
return self.fbm
|
|
|
|
def plot_fbm(self, save_path=None):
|
|
"""Visualize the generated FBM."""
|
|
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
|
|
|
|
t = np.arange(len(self.fbm))
|
|
|
|
# Top left: FBM path
|
|
ax = axes[0, 0]
|
|
ax.plot(t, self.fbm, 'b-', linewidth=1, alpha=0.8)
|
|
ax.set_xlabel('Time', fontsize=11)
|
|
ax.set_ylabel('B_H(t)', fontsize=11)
|
|
ax.set_title(f'Fractional Brownian Motion\n(H = {self.H:.4f})',
|
|
fontsize=12, fontweight='bold')
|
|
ax.grid(True, alpha=0.3)
|
|
ax.axhline(y=0, color='k', linestyle='--', alpha=0.3)
|
|
|
|
# Top right: FBM increments
|
|
ax = axes[0, 1]
|
|
ax.plot(t[:-1], self.fbm_increments[1:], 'r-', linewidth=0.5, alpha=0.7)
|
|
ax.set_xlabel('Time', fontsize=11)
|
|
ax.set_ylabel('dB_H', fontsize=11)
|
|
ax.set_title('FBM Increments\n(Returns)', fontsize=12, fontweight='bold')
|
|
ax.grid(True, alpha=0.3)
|
|
ax.axhline(y=0, color='k', linestyle='--', alpha=0.3)
|
|
|
|
# Bottom left: Histogram of increments
|
|
ax = axes[1, 0]
|
|
ax.hist(self.fbm_increments, bins=50, color='green', alpha=0.6, edgecolor='black', density=True)
|
|
ax.set_xlabel('Increment value', fontsize=11)
|
|
ax.set_ylabel('Density', fontsize=11)
|
|
ax.set_title('Distribution of Increments\n(Should be Gaussian)', fontsize=12, fontweight='bold')
|
|
ax.grid(True, alpha=0.3)
|
|
|
|
# Overlay normal distribution
|
|
x = np.linspace(self.fbm_increments.min(), self.fbm_increments.max(), 100)
|
|
from scipy.stats import norm
|
|
ax.plot(x, norm.pdf(x, np.mean(self.fbm_increments), np.std(self.fbm_increments)),
|
|
'r-', linewidth=2, label='Normal fit')
|
|
ax.legend(fontsize=9)
|
|
|
|
# Bottom right: ACF of increments (should show long memory)
|
|
ax = axes[1, 1]
|
|
max_lag = min(100, len(self.fbm_increments) // 10)
|
|
acf = np.correlate(self.fbm_increments - self.fbm_increments.mean(),
|
|
self.fbm_increments - self.fbm_increments.mean(),
|
|
mode='full')
|
|
acf = acf[len(acf)//2:]
|
|
acf = acf / acf[0] # Normalize
|
|
|
|
ax.plot(range(max_lag), acf[:max_lag], 'o-', markersize=3)
|
|
ax.set_xlabel('Lag', fontsize=11)
|
|
ax.set_ylabel('ACF', fontsize=11)
|
|
ax.set_title('Autocorrelation Function\n(Long memory)', fontsize=12, fontweight='bold')
|
|
ax.grid(True, alpha=0.3)
|
|
ax.axhline(y=0, color='k', linestyle='--', alpha=0.3)
|
|
|
|
# Add confidence interval
|
|
conf_interval = 1.96 / np.sqrt(len(self.fbm_increments))
|
|
ax.axhline(y=conf_interval, color='r', linestyle='--', alpha=0.5)
|
|
ax.axhline(y=-conf_interval, color='r', linestyle='--', alpha=0.5)
|
|
|
|
plt.tight_layout()
|
|
|
|
if save_path:
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
|
if self.verbose:
|
|
print(f"Plot saved to: {save_path}")
|
|
else:
|
|
plt.show()
|
|
|
|
plt.close()
|
|
|
|
|
|
def run_fbm_generation(extractor, n_points=None, output_dir=None,
|
|
save_plots=True, random_seed=config.RANDOM_SEED):
|
|
"""
|
|
Complete workflow for generating FBM.
|
|
|
|
Parameters:
|
|
-----------
|
|
extractor : ScalingExtractor
|
|
Completed extractor from Step 2 (for H parameter)
|
|
n_points : int, optional
|
|
Number of points to generate (default: 2^10 = 1024)
|
|
output_dir : str, optional
|
|
Directory to save results
|
|
save_plots : bool
|
|
Whether to save plots
|
|
random_seed : int, optional
|
|
Seed for reproducible FBM generation
|
|
|
|
Returns:
|
|
--------
|
|
FBMGenerator
|
|
Generator object with FBM
|
|
"""
|
|
# Default to 2^10 points
|
|
if n_points is None:
|
|
n_points = config.CASCADE_B ** config.CASCADE_K_MAX
|
|
|
|
# Create output directory
|
|
if output_dir is None:
|
|
output_dir = config.PLOT_DIR
|
|
|
|
Path(output_dir).mkdir(parents=True, exist_ok=True)
|
|
|
|
if random_seed is not None:
|
|
np.random.seed(random_seed)
|
|
|
|
# Initialize generator
|
|
generator = FBMGenerator(H=extractor.H, n_points=n_points)
|
|
|
|
# Run analysis
|
|
print("\n" + "="*60)
|
|
print("STEP 5: GENERATING FRACTIONAL BROWNIAN MOTION")
|
|
print("="*60)
|
|
|
|
# Generate FBM
|
|
generator.generate_fbm_davies_harte()
|
|
|
|
# Create plots
|
|
if save_plots:
|
|
fbm_plot_path = Path(output_dir) / "step5_fbm.png"
|
|
generator.plot_fbm(save_path=fbm_plot_path)
|
|
|
|
# Save FBM generator
|
|
if config.SAVE_INTERMEDIATE:
|
|
import pickle
|
|
save_path = Path(config.OUTPUT_DIR) / "step5_fbm_generator.pkl"
|
|
with open(save_path, 'wb') as f:
|
|
pickle.dump(generator, f)
|
|
if generator.verbose:
|
|
print(f"\nFBM generator saved to: {save_path}")
|
|
|
|
# Print summary
|
|
print("\n" + "="*60)
|
|
print("STEP 5 COMPLETE")
|
|
print("="*60)
|
|
print(f"\nFractional Brownian Motion generated")
|
|
print(f"H = {generator.H:.4f}")
|
|
print(f"Grid points: {len(generator.fbm)}")
|
|
print(f"FBM std dev: {np.std(generator.fbm_increments):.6f}")
|
|
|
|
print(f"\n→ Next: Run python run_step6.py")
|
|
print("="*60 + "\n")
|
|
|
|
return generator
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("Step 5: Generate Fractional Brownian Motion")
|
|
print("="*60)
|
|
print("\nThis script generates FBM with Hurst parameter H from Step 2.")
|
|
print("You must run Steps 1-2 first.\n")
|