HAR-RV/mt5_vol_rv_strategy.py
2025-09-30 01:03:16 +00:00

463 lines
18 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Volatility Relative-Value Strategy (Minimal Viable) for MetaTrader 5
--------------------------------------------------------------------
This script implements a production-style skeleton for a market-neutral options
strategy using MetaTrader 5's Python API. It focuses on:
1) Data ingestion (spot + option chain)
2) Implied vol estimation (ATM IV per expiry via Black–Scholes inversion)
3) Realized-volatility forecasting with HAR-RV
4) Signal: Variance Risk Premium (VRP) ≈ IV^2 - RV_forecast^2
5) Trade templates: short ATM straddle with calendar hedge (if available)
6) Cost-aware sizing, risk limits, position/risk checks
7) Execution via MT5 orders
IMPORTANT
- You must have MetaTrader5 terminal installed and logged in to a broker account.
- The Python package 'MetaTrader5' must be installed (pip install MetaTrader5).
- Options availability depends on your broker/symbols. Adjust the Config option filters (OPTION_CALL_TAGS, OPTION_PUT_TAGS, EXPIRY_PARSER).
- This script is a reference implementation; backtest & harden before production.
Author: ChatGPT
Date: 2025-09-29
"""
import logging
import math
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

try:
    import MetaTrader5 as mt5
except Exception as e:
    raise SystemExit("Please install MetaTrader5: pip install MetaTrader5") from e
# ----------------------------- CONFIGURATION ----------------------------- #
@dataclass
class Config:
    """Static settings for the volatility relative-value strategy.

    Annotated attributes are dataclass fields and can be overridden through the
    constructor. EXPIRY_PARSER and RV_BAR_TIMEFRAME carry no annotation, so they
    are plain class attributes and are NOT constructor parameters.
    """
    # Base underlying symbol (e.g., 'Si-12.25' on MOEX futures, or FX pair like 'EURUSD')
    UNDERLYING: str = "EURUSD"
    # Risk-free rate r (annualized, decimal). Adjust or wire to an external source.
    RISK_FREE_RATE: float = 0.01
    # Carry yield q on the underlying (annualized, decimal): dividend yield, or the
    # foreign interest rate for FX. The pricing functions discount spot at q and use
    # a drift of r - q (i.e. r_dom - r_for for FX).
    DIVIDEND_YIELD: float = 0.00
    # Option symbol filters (broker-specific): tags that mark a symbol as CALL or PUT.
    # The patterns must be tailored to your broker's naming. See 'discover_option_symbols' helper.
    OPTION_CALL_TAGS: Tuple[str, ...] = ("C", "CALL")
    OPTION_PUT_TAGS: Tuple[str, ...] = ("P", "PUT")
    # Expiry locator (very broker-specific): a callable taking the symbol name and
    # returning the expiry datetime, or None when it cannot be parsed.
    # Fallback: query mt5.symbol_info() and read custom properties if your broker exposes them.
    # For demo, we assume symbols like EURUSD_C_20251219_1.1000
    # The default always returns None, so no option is classified until it is replaced.
    EXPIRY_PARSER = staticmethod(lambda s: None) # Replace with your parser returning datetime
    # Bar timeframe and history window used for realized-volatility forecasting
    RV_BAR_TIMEFRAME = mt5.TIMEFRAME_M5
    # NOTE(review): har_rv_forecast needs >= 22 daily observations and then >= 30
    # usable regression rows; 30 calendar days of bars cannot reach that, so the
    # forecast falls back to the latest observation. Consider a longer lookback.
    RV_LOOKBACK_DAYS: int = 30
    HAR_FREQS: Tuple[int, int, int] = (1, 5, 22) # daily, weekly(~5d), monthly(~22d) components
    # Signal / trade thresholds
    MIN_IV: float = 0.02 # 2% floor to avoid degenerate IV
    MAX_IV: float = 1.00 # 100% cap
    VRP_ENTRY_BPS: float = 150 # enter when IV^2 - RV^2 >= 150 bps (0.015)
    VRP_EXIT_BPS: float = 50 # exit when VRP <= 50 bps
    # Risk controls
    # NOTE(review): the risk, cost and re-hedge limits below are declared but are
    # not read by any code visible in this file.
    MAX_GROSS_VEGA: float = 100000.0
    MAX_NET_GAMMA: float = 0.0 # keep near-neutral by construction
    MAX_POS_PER_EXPIRY: int = 20
    MAX_SLIPPAGE_PIPS: float = 1.0
    MAX_SPREAD_PIPS: float = 1.5
    NOTIONAL_PER_TRADE: float = 100000.0 # adjust for your broker contract sizes
    # Execution
    REHEDGE_DELTA_THRESHOLD: float = 0.05 # rebalance if |delta| exceeds 0.05
    POLL_SEC: int = 60 # seconds slept between iterations of the main loop
# ------------------------------- LOGGING -------------------------------- #
# Configure the root logger once at import time: INFO level, timestamped
# "[LEVEL] message" lines written through a StreamHandler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)
# -------------------------- UTILS / FINANCE ----------------------------- #
def norm_cdf(x: float) -> float:
    """Return the standard normal cumulative distribution function at *x*."""
    scaled = x / math.sqrt(2.0)
    return 0.5 * (1.0 + math.erf(scaled))
def norm_pdf(x: float) -> float:
    """Return the standard normal probability density at *x*."""
    kernel = math.exp(-0.5 * x * x)
    normaliser = math.sqrt(2 * math.pi)
    return kernel / normaliser
def bs_price(spot: float, strike: float, t: float, r: float, q: float, vol: float, call: bool) -> float:
    """Black-Scholes price of a European option with continuous carry yield.

    Args:
        spot: Current underlying price.
        strike: Option strike.
        t: Time to expiry in years.
        r: Risk-free rate (annualized, continuous).
        q: Carry/dividend yield (annualized, continuous).
        vol: Volatility (annualized).
        call: True for a call, False for a put.

    Returns:
        The model price. When any of t, vol, spot or strike is non-positive the
        discounted intrinsic value floored at zero is returned instead.
    """
    if t <= 0 or vol <= 0 or spot <= 0 or strike <= 0:
        # Degenerate inputs: no diffusion term, only discounted intrinsic value.
        if call:
            intrinsic = spot * math.exp(-q*t) - strike * math.exp(-r*t)
        else:
            intrinsic = strike * math.exp(-r*t) - spot * math.exp(-q*t)
        return max(0.0, intrinsic)

    sigma_root_t = vol * math.sqrt(t)
    d1 = (math.log(spot / strike) + (r - q + 0.5 * vol * vol) * t) / sigma_root_t
    d2 = d1 - sigma_root_t
    disc_spot = spot * math.exp(-q * t)
    disc_strike = strike * math.exp(-r * t)
    if not call:
        return disc_strike * norm_cdf(-d2) - disc_spot * norm_cdf(-d1)
    return disc_spot * norm_cdf(d1) - disc_strike * norm_cdf(d2)
def bs_vega(spot: float, strike: float, t: float, r: float, q: float, vol: float) -> float:
    """Black-Scholes vega (price sensitivity to a unit change in volatility).

    Returns 0.0 when any of t, vol, spot or strike is non-positive.
    """
    if t <= 0 or vol <= 0 or spot <= 0 or strike <= 0:
        return 0.0
    root_t = math.sqrt(t)
    drift_term = (r - q + 0.5 * vol * vol) * t
    d1 = (math.log(spot / strike) + drift_term) / (vol * root_t)
    return spot * math.exp(-q * t) * norm_pdf(d1) * root_t
def iv_from_price(price: float, spot: float, strike: float, t: float, r: float, q: float, call: bool,
                  tol: float = 1e-6, max_iter: int = 100) -> Optional[float]:
    """Invert the Black-Scholes price for volatility by bisection.

    The root of ``bs_price(vol) - price`` is bracketed on [1e-4, 5.0] and the
    bracket is halved until the price error drops below *tol*.

    Args:
        price: Observed option price.
        spot: Current underlying price.
        strike: Option strike.
        t: Time to expiry in years.
        r: Risk-free rate (annualized).
        q: Carry/dividend yield (annualized).
        call: True for a call, False for a put.
        tol: Absolute price tolerance for convergence.
        max_iter: Maximum number of bisection steps.

    Returns:
        The implied volatility, or None when the price is not positive, the
        inputs admit no volatility (t, spot or strike non-positive), the root is
        not bracketed, or the search did not converge.
    """
    # `not (price > 0)` also rejects NaN, which `price <= 0` would let through.
    if not (price > 0):
        return None
    # Without time value the model price is independent of vol, so any "root"
    # found by the search would be meaningless.
    if t <= 0 or spot <= 0 or strike <= 0:
        return None

    low, high = 1e-4, 5.0
    f_low = bs_price(spot, strike, t, r, q, low, call) - price
    f_high = bs_price(spot, strike, t, r, q, high, call) - price
    if not (math.isfinite(f_low) and math.isfinite(f_high)):
        return None
    if f_low * f_high > 0:
        return None

    for _ in range(max_iter):
        mid = 0.5 * (low + high)
        f_mid = bs_price(spot, strike, t, r, q, mid, call) - price
        if abs(f_mid) < tol:
            return mid
        if f_low * f_mid <= 0:
            high = mid
        else:
            low, f_low = mid, f_mid

    # The bracket may have collapsed to floating-point resolution while the
    # absolute price tolerance is still unreachable (large notionals). The vol
    # is then pinned down as precisely as it can be, so return it.
    if high - low <= 1e-8:
        return 0.5 * (low + high)
    return None
def realized_vol_annualized(returns: np.ndarray, freq_per_day: int) -> float:
    """Annualized realized volatility from equally spaced intraday returns.

    Squared returns are summed per block of *freq_per_day* observations (one
    block per day), the daily sums are averaged, scaled by 252 and square-rooted.

    Args:
        returns: Intraday returns in chronological order.
        freq_per_day: Number of return observations that make up one day.

    Returns:
        The annualized volatility, or NaN when fewer than two returns are
        supplied, *freq_per_day* is not positive, or there is no complete day.
        A trailing incomplete day is ignored.
    """
    values = np.asarray(returns, dtype=float).ravel()
    if freq_per_day <= 0 or len(values) < 2:
        return np.nan
    whole_days = len(values) // freq_per_day
    if whole_days == 0:
        return np.nan
    # Trim to a whole number of days so the reshape below cannot fail.
    trimmed = values[: whole_days * freq_per_day]
    daily_var = (trimmed.reshape(-1, freq_per_day) ** 2).sum(axis=1)
    ann_var = daily_var.mean() * 252.0
    return float(np.sqrt(max(ann_var, 0.0)))
def har_rv_forecast(rvs: pd.Series) -> float:
    """One-step-ahead HAR-RV(1,5,22) volatility forecast.

    The model is fitted on realized variance (the square of *rvs*) with an
    intercept plus daily, 5-day and 22-day rolling means as regressors, and the
    square root of the predicted variance is returned.

    Args:
        rvs: Daily realized volatility (not variance), oldest first.

    Returns:
        The forecast volatility in the same units as *rvs*. NaN for an empty
        series. With fewer than 22 observations, or fewer than 30 usable
        regression rows, the latest observation is returned unchanged in
        magnitude instead of a fitted forecast.
    """
    if len(rvs) == 0:
        # Nothing to forecast from; avoid indexing an empty series.
        return float("nan")
    if len(rvs) < 22:
        return float(rvs.iloc[-1])

    rv2 = rvs**2
    rv_d = rv2.rolling(1).mean()
    rv_w = rv2.rolling(5).mean()
    rv_m = rv2.rolling(22).mean()

    # Regress next-day variance on today's daily/weekly/monthly components.
    X = np.column_stack([np.ones(len(rv2)), rv_d, rv_w, rv_m])
    y = rv2.shift(-1)  # target: next day's variance
    mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y.values)
    X, y = X[mask], y.values[mask]
    if len(y) < 30:
        return float(np.sqrt(rv2.iloc[-1]))

    # Tiny ridge penalty keeps the normal equations solvable when the
    # regressors are (nearly) collinear.
    lam = 1e-6
    beta = np.linalg.solve(X.T @ X + lam * np.eye(X.shape[1]), X.T @ y)
    last = np.array([1.0, rv_d.iloc[-1], rv_w.iloc[-1], rv_m.iloc[-1]])
    # A variance forecast cannot be negative.
    rv2_hat = max(float(last @ beta), 0.0)
    return float(np.sqrt(rv2_hat))
# --------------------------- MT5 INTEGRATION ---------------------------- #
def mt5_init() -> None:
    """Connect to the MetaTrader 5 terminal and verify that an account is logged in.

    Raises:
        SystemExit: If the terminal cannot be initialized or no account
            information is available. In the latter case the connection that
            was just opened is shut down before exiting.
    """
    if not mt5.initialize():
        raise SystemExit(f"MT5 initialize() failed: {mt5.last_error()}")
    account_info = mt5.account_info()
    if account_info is None:
        # initialize() succeeded, so release the terminal connection before
        # bailing out; callers only reach their own cleanup after this returns.
        mt5.shutdown()
        raise SystemExit("MT5 account_info() returned None. Is terminal logged in?")
    logging.info(
        "Connected to MT5 account: %s | broker: %s",
        account_info.login,
        account_info.company,
    )
def get_spot_series(symbol: str, timeframe=mt5.TIMEFRAME_M5, days: int = 30) -> pd.DataFrame:
    """Fetch the last *days* days of bars for *symbol* as a time-indexed frame.

    Args:
        symbol: MT5 symbol name.
        timeframe: MT5 timeframe constant for the bars.
        days: Length of the history window, counted back from now.

    Returns:
        A DataFrame of the rates returned by MT5, indexed by a UTC-aware
        ``time`` index built from the epoch-second ``time`` column.

    Raises:
        RuntimeError: If MT5 returns no rates for the symbol.
    """
    # Use timezone-aware UTC datetimes: a naive datetime would be treated as
    # local time and shift the requested window by the local UTC offset.
    utc_to = datetime.now(timezone.utc)
    utc_from = utc_to - timedelta(days=days)
    rates = mt5.copy_rates_range(symbol, timeframe, utc_from, utc_to)
    if rates is None or len(rates) == 0:
        raise RuntimeError(f"No rates for {symbol}")
    df = pd.DataFrame(rates)
    df['time'] = pd.to_datetime(df['time'], unit='s', utc=True)
    return df.set_index('time')
def mid_price(symbol_info) -> Optional[float]:
    """Return the bid/ask midpoint of the latest tick for *symbol_info*.

    Returns None when no tick is available or either side of the quote is zero.
    """
    latest = mt5.symbol_info_tick(symbol_info.name)
    if latest is None or latest.ask == 0 or latest.bid == 0:
        return None
    return 0.5 * (latest.ask + latest.bid)
def discover_option_symbols(underlying: str) -> List[str]:
    """List visible symbols that look like options on *underlying*.

    This is BROKER-SPECIFIC. Symbols whose name contains the underlying string
    are fetched, and those whose upper-cased name contains one of the tags
    "C", "P", "CALL" or "PUT" are kept.

    Returns:
        Sorted, de-duplicated symbol names; an empty list when the symbol
        query fails.
    """
    items = mt5.symbols_get("*" + underlying + "*")
    if items is None:
        # symbols_get signals an error with None; iterating it would raise.
        logging.warning("symbols_get failed for %s: %s", underlying, mt5.last_error())
        return []
    tags = ("C", "P", "CALL", "PUT")
    matches = {
        s.name
        for s in items
        if s.visible and any(tag in s.name.upper() for tag in tags)
    }
    return sorted(matches)
def classify_option(symbol_name: str, cfg: "Config") -> Optional[Tuple[str, float, datetime]]:
    """Parse (type, strike, expiry) from an option symbol name using naive rules.

    The option type is taken from the configured call/put tags. A tag that
    appears as a stand-alone token (delimited by non-alphanumeric characters,
    e.g. the ``C`` in ``USDJPY_C_20251219_150.5``) takes precedence, so letters
    inside the underlying's own name do not confuse the classification. If no
    stand-alone tag decides it, plain substring matching is used. The strike is
    the last number in the name and the expiry comes from ``cfg.EXPIRY_PARSER``.
    Replace with a robust parser for your broker.

    Returns:
        ``("CALL" | "PUT", strike, expiry)``, or None when the type is
        ambiguous or missing, no number is present, or the expiry is unknown.
    """
    up = symbol_name.upper()

    # First look for tags that stand alone between delimiters.
    tokens = set(re.split(r"[^A-Z0-9]+", up))
    call = any(tag in tokens for tag in cfg.OPTION_CALL_TAGS)
    put = any(tag in tokens for tag in cfg.OPTION_PUT_TAGS)
    if call == put:
        # Undecided: fall back to substring matching for names without delimiters.
        call = any(tag in up for tag in cfg.OPTION_CALL_TAGS)
        put = any(tag in up for tag in cfg.OPTION_PUT_TAGS)
    if call == put:
        return None

    # Crude strike parse: the last number in the name.
    nums = re.findall(r"\d+\.\d+|\d+", up)
    if not nums:
        return None
    strike = float(nums[-1])

    expiry = cfg.EXPIRY_PARSER(symbol_name)
    if expiry is None:
        return None
    return ("CALL" if call else "PUT", strike, expiry)
def option_mid_price(sym: str) -> Optional[float]:
    """Return the mid quote of *sym*, or None if it is unknown, hidden or unquoted."""
    info = mt5.symbol_info(sym)
    usable = info is not None and info.visible
    return mid_price(info) if usable else None
def atm_iv_for_expiry(option_syms: List[str], spot: float, t: float, r: float, q: float) -> Optional[Tuple[float, str, float]]:
    """Estimate the at-the-money implied vol for one expiry.

    Among the given symbols, the call or put whose strike is closest to *spot*
    and whose mid price inverts to an implied vol inside
    [CFG.MIN_IV, CFG.MAX_IV] is selected.

    Returns:
        ``(iv, symbol_used, strike)``, or None if no symbol qualifies.
    """
    closest = None
    closest_gap = float("inf")
    for name in option_syms:
        sym_info = mt5.symbol_info(name)
        if sym_info is None or not sym_info.visible:
            continue
        quote = option_mid_price(name)
        if quote is None or quote <= 0:
            continue
        # Type and strike come from the (broker-specific) symbol name parser.
        parsed = classify_option(name, CFG)
        if parsed is None:
            continue
        kind, strike, _ = parsed
        gap = abs(strike - spot)
        # Only invert prices for strikes closer to spot than the current best.
        if not gap < closest_gap:
            continue
        vol = iv_from_price(quote, spot, strike, t, r, q, kind == "CALL")
        if vol is None or not (CFG.MIN_IV <= vol <= CFG.MAX_IV):
            continue
        closest, closest_gap = (vol, name, strike), gap
    return closest
# ----------------------------- STRATEGY --------------------------------- #
# Module-wide configuration instance, built with the Config defaults and read
# at call time by the pricing, signal and execution functions in this file.
CFG = Config()
def build_daily_rv(symbol: str) -> pd.Series:
    """Build a series of annualized daily realized volatilities for *symbol*.

    Bar-to-bar log returns of the close are squared and summed per UTC calendar
    day; the square root of each daily sum is scaled by sqrt(252).

    Days holding fewer than half the median number of bars (a partial first or
    current day, a short session at the weekly open) are dropped, because
    their realized variance is not comparable with that of a full session.

    Returns:
        Annualized realized volatility indexed by naive calendar date, oldest
        first. Empty when fewer than two bars are available.
    """
    df = get_spot_series(symbol, CFG.RV_BAR_TIMEFRAME, CFG.RV_LOOKBACK_DAYS)
    log_ret = np.log(df['close']).diff().dropna()
    if log_ret.empty:
        return pd.Series(dtype=float)

    # Group by the actual calendar day each bar belongs to, rather than by
    # fixed-size chunks that drift across day boundaries when sessions vary.
    day_key = log_ret.index.tz_convert("UTC").normalize().tz_localize(None)
    by_day = (log_ret ** 2).groupby(day_key)
    daily_var = by_day.sum()
    bars_in_day = by_day.count()

    full_enough = bars_in_day >= 0.5 * bars_in_day.median()
    daily_var = daily_var[full_enough]

    # Annualize daily RV to sigma (multiply by sqrt(252)).
    rv_annual = np.sqrt(daily_var) * np.sqrt(252.0)
    return rv_annual.rename(None)
def next_signal() -> Optional[Dict]:
    """Compute the current trading signal from the variance risk premium.

    Steps: read the spot mid price, forecast realized volatility, discover and
    bucket option symbols by expiry, compute the ATM implied vol per expiry,
    and pick the expiry with the largest VRP = IV^2 - RV_forecast^2.

    Returns:
        ``{"signal", "selection", "rv_forecast"}`` where ``signal`` is one of
        ``"ENTER_SHORT_STRADDLE"``, ``"EXIT_STRADDLE"`` or ``"HOLD"``; or None
        when spot, the forecast, or a usable option chain is unavailable.
    """
    # 1) Spot and RV forecast
    spot_info = mt5.symbol_info(CFG.UNDERLYING)
    if spot_info is None or not spot_info.visible:
        logging.error(f"Underlying {CFG.UNDERLYING} not visible")
        return None
    spot = mid_price(spot_info)
    if spot is None:
        logging.error("No spot mid price")
        return None
    rv_daily = build_daily_rv(CFG.UNDERLYING)
    rv_forecast = har_rv_forecast(rv_daily)  # annualized
    if not math.isfinite(rv_forecast):
        # A NaN forecast would make every VRP comparison silently false.
        logging.warning("RV forecast unavailable (insufficient or invalid data).")
        return None

    # 2) Option chain discovery and expiries grouping (very broker-specific)
    opt_syms = discover_option_symbols(CFG.UNDERLYING)
    if not opt_syms:
        logging.warning("No option symbols discovered. Check naming filters & broker support.")
        return None
    # Naive expiry grouping using the configured parser.
    buckets: Dict[datetime, List[str]] = {}
    for s in opt_syms:
        cls = classify_option(s, CFG)
        if cls is None:
            continue
        _t, _k, expiry = cls
        buckets.setdefault(expiry, []).append(s)
    if not buckets:
        logging.warning("No parsable options. Provide a correct EXPIRY_PARSER and tags.")
        return None

    # 3) For each expiry, compute ATM IV and VRP
    candidates = []
    now = datetime.now(timezone.utc)
    for expiry, syms in buckets.items():
        # A parser may return naive datetimes; treat those as UTC so the
        # subtraction against the aware `now` cannot raise TypeError.
        expiry_utc = expiry if expiry.tzinfo is not None else expiry.replace(tzinfo=timezone.utc)
        t = max((expiry_utc - now).total_seconds(), 0) / (365.0*24*3600.0)
        if t <= 0:
            continue
        atm = atm_iv_for_expiry(syms, spot, t, CFG.RISK_FREE_RATE, CFG.DIVIDEND_YIELD)
        if atm is None:
            continue
        iv_atm, iv_sym, k_atm = atm
        vrp = iv_atm**2 - rv_forecast**2
        candidates.append({
            "expiry": expiry, "t": t, "iv": iv_atm, "vrp": vrp,
            "atm_symbol": iv_sym, "k_atm": k_atm, "spot": spot
        })
    if not candidates:
        logging.info("No candidate expiries with valid ATM IV.")
        return None

    # Choose the expiry with the highest VRP.
    best = max(candidates, key=lambda x: x["vrp"])
    logging.info(f"Best VRP expiry {best['expiry']} | IV={best['iv']:.4f} | RVf={rv_forecast:.4f} | VRP={best['vrp']:.4f}")

    # Entry/exit logic: thresholds are configured in basis points of variance.
    if best["vrp"] >= CFG.VRP_ENTRY_BPS/10000.0:
        action = "ENTER_SHORT_STRADDLE"
    elif best["vrp"] <= CFG.VRP_EXIT_BPS/10000.0:
        action = "EXIT_STRADDLE"
    else:
        action = "HOLD"
    return {"signal": action, "selection": best, "rv_forecast": rv_forecast}
# ----------------------------- EXECUTION -------------------------------- #
def ensure_symbol(symbol: str) -> bool:
    """Make sure *symbol* exists and is selected in the terminal.

    Returns False for an unknown symbol, True for an already visible one, and
    otherwise the result of asking the terminal to select it.
    """
    info = mt5.symbol_info(symbol)
    if info is None:
        return False
    return True if info.visible else mt5.symbol_select(symbol, True)
def place_market_order(symbol: str, volume: float, order_type: int, comment: str = "") -> Optional[int]:
    """Send an immediate (deal) order for *symbol* at the current quote.

    Buy-type orders are priced at the ask, all others at the bid.

    Args:
        symbol: MT5 symbol to trade.
        volume: Order volume in lots.
        order_type: An ``mt5.ORDER_TYPE_*`` constant.
        comment: Free-text order comment.

    Returns:
        The order ticket on success, or None when the symbol or quote is
        unavailable or the order is rejected.
    """
    if not ensure_symbol(symbol):
        logging.error(f"Symbol not available: {symbol}")
        return None
    tick = mt5.symbol_info_tick(symbol)
    if tick is None:
        logging.error(f"No tick for {symbol}")
        return None

    is_buy = order_type in (mt5.ORDER_TYPE_BUY, mt5.ORDER_TYPE_BUY_LIMIT, mt5.ORDER_TYPE_BUY_STOP)
    price = tick.ask if is_buy else tick.bid
    # A zero quote means that side of the book is missing; never send it as a price.
    if not price or price <= 0:
        logging.error(f"No valid quote for {symbol}")
        return None

    req = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type,
        "price": price,
        "deviation": 10,
        "magic": 271828,
        "comment": comment,
        "type_filling": mt5.ORDER_FILLING_FOK,
        "type_time": mt5.ORDER_TIME_GTC,
    }
    res = mt5.order_send(req)
    if res is None:
        logging.error(f"order_send failed for {symbol}: {mt5.last_error()}")
        return None
    if res.retcode != mt5.TRADE_RETCODE_DONE:
        logging.error(f"Order failed: {res.retcode} | {res.comment}")
        return None
    logging.info(f"Order placed: ticket={res.order} {symbol} {volume}")
    return res.order
def straddle_symbols(atm_symbol: str, cfg: Config) -> Optional[Tuple[str, str]]:
    """Find the call/put pair that shares strike and expiry with *atm_symbol*.

    If *atm_symbol* is a call, the matching put is looked up, and vice versa.
    Implementation is broker-specific: the sibling name is derived by swapping
    the call/put tag and is accepted only if the terminal knows that symbol.

    The swap is first attempted on a stand-alone tag token only (e.g. the
    ``P`` in ``GBPUSD_P_20251219_1.3000``), leaving the rest of the name
    intact. If that finds nothing, the tag is replaced everywhere in the
    upper-cased name.

    Returns:
        ``(call_symbol, put_symbol)``, or None if no sibling exists.
    """
    # Pass 1: swap only a tag that stands alone between delimiters. The
    # capturing group keeps the delimiters so the name can be re-joined.
    parts = re.split(r"([^A-Za-z0-9]+)", atm_symbol)
    for idx, part in enumerate(parts):
        token = part.upper()
        for ctag in cfg.OPTION_CALL_TAGS:
            for ptag in cfg.OPTION_PUT_TAGS:
                if token == ctag:
                    alt = "".join(parts[:idx] + [ptag] + parts[idx + 1:])
                    if mt5.symbol_info(alt) is not None:
                        return (atm_symbol, alt)  # (call, put)
                elif token == ptag:
                    alt = "".join(parts[:idx] + [ctag] + parts[idx + 1:])
                    if mt5.symbol_info(alt) is not None:
                        return (alt, atm_symbol)  # (call, put)

    # Pass 2: assume naming differs only by the C/P tag and replace it
    # wherever it occurs (for names without delimiters around the tag).
    up = atm_symbol.upper()
    for ctag in cfg.OPTION_CALL_TAGS:
        for ptag in cfg.OPTION_PUT_TAGS:
            if ctag in up:
                alt = up.replace(ctag, ptag)
                if mt5.symbol_info(alt) is not None:
                    return (atm_symbol, alt)  # (call, put)
            if ptag in up:
                alt = up.replace(ptag, ctag)
                if mt5.symbol_info(alt) is not None:
                    return (alt, atm_symbol)  # (call, put)
    return None
def trade_short_atm_straddle(selection: Dict) -> None:
    """Sell one lot each of the ATM call and put named by *selection*.

    Args:
        selection: The ``"selection"`` dict produced by ``next_signal``; only
            its ``"atm_symbol"`` entry is read.

    No order is sent when the call/put pair cannot be resolved or when a
    position is already open in either leg. The put is only sold after the
    call order succeeded; if the put then fails, the one-sided call position
    is reported at CRITICAL level for manual handling.
    """
    atm_sym = selection["atm_symbol"]
    pair = straddle_symbols(atm_sym, CFG)
    if pair is None:
        logging.error("Could not locate matching call/put for straddle.")
        return
    call_sym, put_sym = pair

    call_info = mt5.symbol_info(call_sym)
    put_info = mt5.symbol_info(put_sym)
    if not call_info or not put_info:
        logging.error("Symbol info missing for straddle legs.")
        return

    # The entry signal re-fires on every poll while VRP stays high; without
    # this guard a fresh straddle would be sold each cycle.
    for leg in (call_sym, put_sym):
        if mt5.positions_get(symbol=leg):
            logging.info("Position already open in %s; skipping new straddle entry.", leg)
            return

    # Heuristic: 1 lot by default; refine using contract size if available
    vol = 1.0
    logging.info(f"Selling ATM straddle: {call_sym} & {put_sym}, volume={vol}")

    call_ticket = place_market_order(call_sym, vol, mt5.ORDER_TYPE_SELL, comment="VRP short call")
    if call_ticket is None:
        # Do not sell the put on its own: that would leave a one-sided position.
        logging.error("Call leg failed; put leg not sent.")
        return
    put_ticket = place_market_order(put_sym, vol, mt5.ORDER_TYPE_SELL, comment="VRP short put")
    if put_ticket is None:
        logging.critical(
            "Put leg failed after call leg was sold (ticket=%s): %s is unhedged, manual action required.",
            call_ticket,
            call_sym,
        )
def main_loop():
    """Run the strategy: connect, then poll for a signal every CFG.POLL_SEC seconds.

    An error inside a single iteration is logged with its traceback and the
    loop continues with the next poll. Ctrl-C stops the loop. The MT5
    connection is shut down on the way out in every case.
    """
    mt5_init()
    try:
        while True:
            try:
                sig = next_signal()
                if sig is not None:
                    action = sig["signal"]
                    sel = sig["selection"]
                    if action == "ENTER_SHORT_STRADDLE":
                        trade_short_atm_straddle(sel)
                    elif action == "EXIT_STRADDLE":
                        logging.info("Exit logic placeholder: implement position lookup & closing orders.")
                    else:
                        logging.info("No trade action. Holding.")
            except Exception:
                # A transient data or terminal failure must not kill a
                # long-running loop; record it and try again next cycle.
                logging.exception("Iteration failed; retrying in %s s", CFG.POLL_SEC)
            time.sleep(CFG.POLL_SEC)
    except KeyboardInterrupt:
        logging.info("Interrupted by user; shutting down.")
    finally:
        mt5.shutdown()
# Start the polling loop only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main_loop()