#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Volatility Relative-Value Strategy (Minimal Viable) for MetaTrader 5
--------------------------------------------------------------------
This script implements a production-style skeleton for a market-neutral options
strategy using MetaTrader 5's Python API. It focuses on:

 1) Data ingestion (spot + option chain)
 2) Implied vol estimation (ATM IV per expiry via Black–Scholes inversion)
 3) Realized-volatility forecasting with HAR-RV
 4) Signal: Variance Risk Premium (VRP) ≈ IV^2 - RV_forecast^2
 5) Trade templates: short ATM straddle with calendar hedge (if available)
 6) Cost-aware sizing, risk limits, position/risk checks
 7) Execution via MT5 orders

IMPORTANT
- You must have MetaTrader5 terminal installed and logged in to a broker account.
- The Python package 'MetaTrader5' must be installed (pip install MetaTrader5).
- Options availability depends on your broker/symbols. Adjust SYMBOL_MAP/filters.
- This script is a reference implementation; backtest & harden before production.

Author: ChatGPT
Date: 2025-09-29
"""

import logging
import math
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

try:
    import MetaTrader5 as mt5
except Exception as e:
    raise SystemExit("Please install MetaTrader5: pip install MetaTrader5") from e


# ----------------------------- CONFIGURATION ----------------------------- #

@dataclass
class Config:
    """Strategy parameters. Annotated attributes are dataclass fields."""

    # Base underlying symbol (e.g., 'Si-12.25' on MOEX futures, or FX pair like 'EURUSD')
    UNDERLYING: str = "EURUSD"

    # Risk-free rate (annualized, decimal). Adjust or wire to an external source.
    RISK_FREE_RATE: float = 0.01

    # Dividend yield/foreign rate for FX parity; often ~0 for FX. For FX: r_dom - r_for
    DIVIDEND_YIELD: float = 0.00

    # Option symbol filters (broker-specific). Substrings used to detect CALL/PUT.
    # Example patterns must be tailored to your broker's naming.
    # See the 'discover_option_symbols' helper.
    OPTION_CALL_TAGS: Tuple[str, ...] = ("C", "CALL")
    OPTION_PUT_TAGS: Tuple[str, ...] = ("P", "PUT")

    # Expiry locator (very broker-specific). Provide a callable that parses the
    # expiry from a symbol name and returns a datetime (or None if unparsable).
    # Fallback: query mt5.symbol_info() and read custom properties if your broker
    # exposes them. For demo, we assume symbols like EURUSD_C_20251219_1.1000.
    # The placeholder below always returns None, so no option is classified
    # until it is replaced.
    EXPIRY_PARSER = staticmethod(lambda s: None)  # Replace with your parser returning datetime

    # Trading horizon and bars for RV forecasting
    RV_BAR_TIMEFRAME = mt5.TIMEFRAME_M5
    RV_LOOKBACK_DAYS: int = 30
    HAR_FREQS: Tuple[int, int, int] = (1, 5, 22)  # daily, weekly(~5d), monthly(~22d) components

    # Signal / trade thresholds
    MIN_IV: float = 0.02        # 2% floor to avoid degenerate IV
    MAX_IV: float = 1.00        # 100% cap
    VRP_ENTRY_BPS: float = 150  # enter when IV^2 - RV^2 >= 150 bps (0.015)
    VRP_EXIT_BPS: float = 50    # exit when VRP <= 50 bps

    # Risk controls
    MAX_GROSS_VEGA: float = 100000.0
    MAX_NET_GAMMA: float = 0.0  # keep near-neutral by construction
    MAX_POS_PER_EXPIRY: int = 20
    MAX_SLIPPAGE_PIPS: float = 1.0
    MAX_SPREAD_PIPS: float = 1.5
    NOTIONAL_PER_TRADE: float = 100000.0  # adjust for your broker contract sizes

    # Execution
    REHEDGE_DELTA_THRESHOLD: float = 0.05  # rebalance if |delta| exceeds 0.05
    POLL_SEC: int = 60


# Magic number stamped on every order sent by this script; also used to
# recognise the strategy's own open positions.
_MAGIC = 271828

# Matches decimal or integer literals inside a symbol name (used for strikes).
_NUMBER_RE = re.compile(r"(\d+\.\d+|\d+)")


# ------------------------------- LOGGING -------------------------------- #

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)


# -------------------------- UTILS / FINANCE ----------------------------- #

def norm_cdf(x: float) -> float:
    """Return the standard normal cumulative distribution function at x."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def norm_pdf(x: float) -> float:
    """Return the standard normal probability density at x."""
    return math.exp(-0.5 * x * x) / math.sqrt(2 * math.pi)


def bs_price(spot: float, strike: float, t: float, r: float, q: float,
             vol: float, call: bool) -> float:
    """
    Black–Scholes price of a European option with continuous yield q.

    t is the time to expiry in years, r and q are annualized decimal rates and
    vol is the annualized volatility. For degenerate inputs (any of t, vol,
    spot, strike <= 0) the discounted intrinsic value, floored at 0, is returned.
    """
    if t <= 0 or vol <= 0 or spot <= 0 or strike <= 0:
        fwd_spot = spot * math.exp(-q * t)
        disc_strike = strike * math.exp(-r * t)
        return max(0.0, (fwd_spot - disc_strike) if call else (disc_strike - fwd_spot))
    d1 = (math.log(spot / strike) + (r - q + 0.5 * vol * vol) * t) / (vol * math.sqrt(t))
    d2 = d1 - vol * math.sqrt(t)
    if call:
        return spot * math.exp(-q * t) * norm_cdf(d1) - strike * math.exp(-r * t) * norm_cdf(d2)
    return strike * math.exp(-r * t) * norm_cdf(-d2) - spot * math.exp(-q * t) * norm_cdf(-d1)


def bs_vega(spot: float, strike: float, t: float, r: float, q: float, vol: float) -> float:
    """Black–Scholes vega (dPrice/dVol); 0.0 for degenerate inputs."""
    if t <= 0 or vol <= 0 or spot <= 0 or strike <= 0:
        return 0.0
    d1 = (math.log(spot / strike) + (r - q + 0.5 * vol * vol) * t) / (vol * math.sqrt(t))
    return spot * math.exp(-q * t) * norm_pdf(d1) * math.sqrt(t)


def iv_from_price(price: float, spot: float, strike: float, t: float, r: float, q: float,
                  call: bool, tol: float = 1e-6, max_iter: int = 100) -> Optional[float]:
    """
    Invert Black–Scholes for implied volatility by bisection on [1e-4, 5.0].

    Returns the volatility whose model price matches `price` within `tol`, or
    None when the inputs are degenerate, the root is not bracketed, or
    `max_iter` iterations are exhausted.
    """
    # With t <= 0 the model price does not depend on vol, so no IV is defined.
    if price <= 0 or t <= 0 or spot <= 0 or strike <= 0:
        return None
    low, high = 1e-4, 5.0
    f_low = bs_price(spot, strike, t, r, q, low, call) - price
    f_high = bs_price(spot, strike, t, r, q, high, call) - price
    if f_low * f_high > 0:
        return None  # market price outside the range attainable on the bracket
    for _ in range(max_iter):
        mid = 0.5 * (low + high)
        f_mid = bs_price(spot, strike, t, r, q, mid, call) - price
        # Stop on price tolerance, or once the vol bracket itself is negligible
        # (the price tolerance can be unreachable in floating point for large prices).
        if abs(f_mid) < tol or (high - low) < 1e-10:
            return mid
        if f_low * f_mid <= 0:
            high = mid
        else:
            low, f_low = mid, f_mid
    return None


def realized_vol_annualized(returns: np.ndarray, freq_per_day: int) -> float:
    """
    Annualized realized volatility from intraday returns.

    Returns are grouped into consecutive blocks of `freq_per_day` observations;
    each block's sum of squared returns is one daily variance. The mean daily
    variance is scaled by 252 and square-rooted. When the length is not a
    multiple of `freq_per_day`, the oldest leftover observations are dropped.
    Returns NaN when there are fewer than 2 returns or no complete day.
    """
    returns = np.asarray(returns, dtype=float)
    if freq_per_day <= 0 or len(returns) < 2:
        return np.nan
    n_days = len(returns) // freq_per_day
    if n_days == 0:
        return np.nan
    whole_days = returns[len(returns) - n_days * freq_per_day:]
    daily_var = (whole_days.reshape(n_days, freq_per_day) ** 2).sum(axis=1)
    ann_var = daily_var.mean() * 252.0
    return float(np.sqrt(max(ann_var, 0.0)))


def har_rv_forecast(rvs: pd.Series) -> float:
    """
    HAR-RV(1,5,22) on realized variance (square of daily RV).
    Return sqrt of forecasted variance.

    rvs: daily realized volatility (not variance). We square internally.

    Falls back to the most recent observation when fewer than 22 observations
    (or fewer than 30 usable regression rows) are available, and returns NaN
    for an empty series.
    """
    if len(rvs) == 0:
        return float("nan")
    if len(rvs) < 22:
        return float(rvs.iloc[-1])
    rv2 = rvs ** 2
    rv_d = rv2.rolling(1).mean()
    rv_w = rv2.rolling(5).mean()
    rv_m = rv2.rolling(22).mean()
    # OLS coefficients with ridge-like shrinkage for stability
    X = np.column_stack([np.ones(len(rv2)), rv_d, rv_w, rv_m])
    y = rv2.shift(-1)  # predict next day
    mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y.values)
    X, y = X[mask], y.values[mask]
    if len(y) < 30:
        return float(np.sqrt(rv2.iloc[-1]))
    lam = 1e-6  # ridge penalty
    beta = np.linalg.solve(X.T @ X + lam * np.eye(X.shape[1]), X.T @ y)
    last = np.array([1.0, rv_d.iloc[-1], rv_w.iloc[-1], rv_m.iloc[-1]])
    rv2_hat = max(float(last @ beta), 0.0)  # a variance forecast cannot be negative
    return float(np.sqrt(rv2_hat))


# --------------------------- MT5 INTEGRATION ---------------------------- #

def mt5_init() -> None:
    """Initialize the MT5 connection; exit the process if it is unavailable."""
    if not mt5.initialize():
        raise SystemExit(f"MT5 initialize() failed: {mt5.last_error()}")
    account_info = mt5.account_info()
    if account_info is None:
        raise SystemExit("MT5 account_info() returned None. Is terminal logged in?")
    logging.info("Connected to MT5 account: %s | broker: %s",
                 account_info.login, account_info.company)


def get_spot_series(symbol: str, timeframe=mt5.TIMEFRAME_M5, days: int = 30) -> pd.DataFrame:
    """
    Fetch the last `days` days of bars for `symbol` as a DataFrame indexed by
    UTC timestamp. Raises RuntimeError when no bars are returned.
    """
    # Timezone-aware UTC bounds: a naive datetime may be read as local time.
    utc_to = datetime.now(timezone.utc)
    utc_from = utc_to - timedelta(days=days)
    rates = mt5.copy_rates_range(symbol, timeframe, utc_from, utc_to)
    if rates is None or len(rates) == 0:
        raise RuntimeError(f"No rates for {symbol}")
    df = pd.DataFrame(rates)
    df['time'] = pd.to_datetime(df['time'], unit='s', utc=True)
    df.set_index('time', inplace=True)
    return df


def mid_price(symbol_info) -> Optional[float]:
    """Return the bid/ask midpoint of the latest tick, or None if unavailable."""
    tick = mt5.symbol_info_tick(symbol_info.name)
    if tick is None:
        return None
    if tick.ask == 0 or tick.bid == 0:
        return None
    return 0.5 * (tick.ask + tick.bid)


def discover_option_symbols(underlying: str) -> List[str]:
    """
    Attempt to list option symbols linked to underlying. This is BROKER-SPECIFIC.
    Here we gather visible symbols whose name contains the underlying string and
    at least one configured call/put tag. The underlying itself is excluded.
    """
    items = mt5.symbols_get("*" + underlying + "*")
    if not items:  # None on error, or an empty tuple
        return []
    tags = tuple(tag.upper() for tag in CFG.OPTION_CALL_TAGS + CFG.OPTION_PUT_TAGS)
    underlying_up = underlying.upper()
    found = set()
    for s in items:
        name = s.name.upper()
        if name == underlying_up or not s.visible:
            continue
        if any(tag in name for tag in tags):
            found.add(s.name)
    return sorted(found)


def classify_option(symbol_name: str, cfg: Config) -> Optional[Tuple[str, float, datetime]]:
    """
    Attempt to parse (type, strike, expiry) from symbol name using naive rules.
    Replace with robust parser for your broker.

    Returns None when the call/put side is ambiguous or missing, when no number
    is found for the strike, or when the expiry cannot be parsed.
    """
    up = symbol_name.upper()
    call = any(tag in up for tag in cfg.OPTION_CALL_TAGS)
    put = any(tag in up for tag in cfg.OPTION_PUT_TAGS)
    if call == put:
        return None
    # crude strike parse: look for last number in name
    nums = _NUMBER_RE.findall(up)
    if not nums:
        return None
    try:
        strike = float(nums[-1])
    except ValueError:
        return None
    try:
        expiry = cfg.EXPIRY_PARSER(symbol_name)
    except (ValueError, TypeError):
        # A parser choking on an unrelated symbol must not abort the whole scan.
        return None
    if expiry is None:
        return None
    return ("CALL" if call else "PUT", strike, expiry)


def option_mid_price(sym: str) -> Optional[float]:
    """Return the mid price of a visible symbol, or None."""
    info = mt5.symbol_info(sym)
    if info is None or not info.visible:
        return None
    return mid_price(info)


def atm_iv_for_expiry(option_syms: List[str], spot: float, t: float, r: float,
                      q: float) -> Optional[Tuple[float, str, float]]:
    """
    Compute ATM IV using the nearest-to-ATM call or put (mid price) with strike
    closest to spot. Return (iv, symbol_used, strike), or None when no symbol
    yields an IV inside [CFG.MIN_IV, CFG.MAX_IV].
    """
    best = None
    best_diff = float("inf")
    for sym in option_syms:
        # Parse the name first: it needs no terminal round-trip.
        cls = classify_option(sym, CFG)
        if cls is None:
            continue
        opt_type, strike, _expiry = cls
        diff = abs(strike - spot)
        if diff >= best_diff:
            continue
        mp = option_mid_price(sym)  # also verifies the symbol exists and is visible
        if mp is None or mp <= 0:
            continue
        iv = iv_from_price(mp, spot, strike, t, r, q, opt_type == "CALL")
        if iv is not None and CFG.MIN_IV <= iv <= CFG.MAX_IV:
            best = (iv, sym, strike)
            best_diff = diff
    return best


# ----------------------------- STRATEGY --------------------------------- #

CFG = Config()


def build_daily_rv(symbol: str) -> pd.Series:
    """
    Build a daily series of annualized realized volatility for `symbol`.

    Per-bar log returns are squared and summed per UTC calendar day; the square
    root is scaled by sqrt(252). Days with fewer than half the median number of
    bars are dropped so that a partial day does not understate RV. The returned
    index holds timezone-naive dates.
    """
    df = get_spot_series(symbol, CFG.RV_BAR_TIMEFRAME, CFG.RV_LOOKBACK_DAYS)
    log_ret = np.log(df['close']).diff().dropna()
    if log_ret.empty:
        return pd.Series(dtype=float)
    # Group by the actual calendar day rather than by an average bar count:
    # FX/CFD sessions vary, so fixed-size chunks drift across day boundaries.
    day_key = log_ret.index.tz_convert("UTC").normalize()
    grouped = (log_ret ** 2).groupby(day_key)
    daily_var = grouped.sum()
    bar_counts = grouped.count()
    daily_var = daily_var[bar_counts >= 0.5 * bar_counts.median()]
    daily_var.index = daily_var.index.tz_localize(None)
    # annualize daily RV to sigma (multiply by sqrt(252))
    return np.sqrt(daily_var) * np.sqrt(252.0)


def next_signal() -> Optional[Dict]:
    """
    Run one evaluation cycle and return a signal dict, or None when no signal
    can be computed.

    The dict has keys "signal" (one of "ENTER_SHORT_STRADDLE", "EXIT_STRADDLE",
    "HOLD"), "selection" (the expiry with the highest VRP) and "rv_forecast".
    """
    # 1) Spot and RV forecast
    spot_info = mt5.symbol_info(CFG.UNDERLYING)
    if spot_info is None or not spot_info.visible:
        logging.error(f"Underlying {CFG.UNDERLYING} not visible")
        return None
    spot = mid_price(spot_info)
    if spot is None:
        logging.error("No spot mid price")
        return None
    rv_daily = build_daily_rv(CFG.UNDERLYING)
    rv_forecast = har_rv_forecast(rv_daily)  # annualized
    if not math.isfinite(rv_forecast):
        # A NaN forecast would silently poison every VRP comparison below.
        logging.warning("RV forecast unavailable (insufficient or invalid data).")
        return None

    # 2) Option chain discovery and expiries grouping (very broker-specific)
    opt_syms = discover_option_symbols(CFG.UNDERLYING)
    if not opt_syms:
        logging.warning("No option symbols discovered. Check naming filters & broker support.")
        return None
    # naive expiry grouping using parser
    buckets: Dict[datetime, List[str]] = {}
    for s in opt_syms:
        cls = classify_option(s, CFG)
        if cls is None:
            continue
        _t, _k, expiry = cls
        buckets.setdefault(expiry, []).append(s)
    if not buckets:
        logging.warning("No parsable options. Provide a correct EXPIRY_PARSER and tags.")
        return None

    # 3) For each expiry, compute ATM IV and VRP
    candidates = []
    now = datetime.now(timezone.utc)
    for expiry, syms in buckets.items():
        # Treat a naive expiry as UTC; subtracting naive from aware raises TypeError.
        expiry_utc = expiry if expiry.tzinfo is not None else expiry.replace(tzinfo=timezone.utc)
        t = max((expiry_utc - now).total_seconds(), 0) / (365.0 * 24 * 3600.0)
        if t <= 0:
            continue
        atm = atm_iv_for_expiry(syms, spot, t, CFG.RISK_FREE_RATE, CFG.DIVIDEND_YIELD)
        if atm is None:
            continue
        iv_atm, iv_sym, k_atm = atm
        vrp = iv_atm ** 2 - rv_forecast ** 2
        candidates.append({
            "expiry": expiry,
            "t": t,
            "iv": iv_atm,
            "vrp": vrp,
            "atm_symbol": iv_sym,
            "k_atm": k_atm,
            "spot": spot,
        })
    if not candidates:
        logging.info("No candidate expiries with valid ATM IV.")
        return None

    # choose best expiry by highest VRP
    best = max(candidates, key=lambda x: x["vrp"])
    logging.info("Best VRP expiry %s | IV=%.4f | RVf=%.4f | VRP=%.4f",
                 best["expiry"], best["iv"], rv_forecast, best["vrp"])

    # Entry/exit logic (thresholds are configured in basis points of variance)
    if best["vrp"] >= CFG.VRP_ENTRY_BPS / 10000.0:
        action = "ENTER_SHORT_STRADDLE"
    elif best["vrp"] <= CFG.VRP_EXIT_BPS / 10000.0:
        action = "EXIT_STRADDLE"
    else:
        action = "HOLD"
    return {"signal": action, "selection": best, "rv_forecast": rv_forecast}


# ----------------------------- EXECUTION -------------------------------- #

def ensure_symbol(symbol: str) -> bool:
    """Return True if the symbol exists and is (or was made) visible in Market Watch."""
    info = mt5.symbol_info(symbol)
    if info is None:
        return False
    if not info.visible:
        return mt5.symbol_select(symbol, True)
    return True


def place_market_order(symbol: str, volume: float, order_type: int,
                       comment: str = "") -> Optional[int]:
    """
    Send a fill-or-kill market order and return the order ticket, or None on
    any failure (unknown symbol, no tick, rejected request).

    Buy-type orders are priced at the ask, all others at the bid.
    """
    if volume <= 0:
        logging.error(f"Invalid volume for {symbol}: {volume}")
        return None
    if not ensure_symbol(symbol):
        logging.error(f"Symbol not available: {symbol}")
        return None
    tick = mt5.symbol_info_tick(symbol)
    if tick is None:
        logging.error(f"No tick for {symbol}")
        return None
    buy_types = (mt5.ORDER_TYPE_BUY, mt5.ORDER_TYPE_BUY_LIMIT, mt5.ORDER_TYPE_BUY_STOP)
    price = tick.ask if order_type in buy_types else tick.bid
    req = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type,
        "price": price,
        "deviation": 10,
        "magic": _MAGIC,
        "comment": comment,
        "type_filling": mt5.ORDER_FILLING_FOK,
        "type_time": mt5.ORDER_TIME_GTC,
    }
    res = mt5.order_send(req)
    if res is None:
        logging.error(f"order_send failed for {symbol}")
        return None
    if res.retcode != mt5.TRADE_RETCODE_DONE:
        logging.error(f"Order failed: {res.retcode} | {res.comment}")
        return None
    logging.info(f"Order placed: ticket={res.order} {symbol} {volume}")
    return res.order


def _swap_tag_candidates(name: str, from_tags: Tuple[str, ...], to_tags: Tuple[str, ...]):
    """Yield names made by replacing exactly ONE occurrence of a from-tag with a to-tag."""
    for src in from_tags:
        for m in re.finditer(re.escape(src), name, flags=re.IGNORECASE):
            for dst in to_tags:
                # Keep the case of the replaced text; symbol names are matched verbatim.
                repl = dst.lower() if m.group().islower() else dst
                yield name[:m.start()] + repl + name[m.end():]


def straddle_symbols(atm_symbol: str, cfg: Config) -> Optional[Tuple[str, str]]:
    """
    Given an ATM option symbol used for IV, try to find its matching call/put pair.
    If atm_symbol is a call, find the corresponding put with same strike/expiry,
    and vice versa. Implementation is broker-specific; here we assume the two
    names differ only by one call/put tag.

    Each occurrence of a tag is swapped individually, so a tag letter that also
    appears in the underlying's name (e.g. the "C" in "USDCAD") does not corrupt
    the candidate. Returns (call_symbol, put_symbol) or None.
    """
    for alt in _swap_tag_candidates(atm_symbol, cfg.OPTION_CALL_TAGS, cfg.OPTION_PUT_TAGS):
        if mt5.symbol_info(alt) is not None:
            return (atm_symbol, alt)  # (call, put)
    for alt in _swap_tag_candidates(atm_symbol, cfg.OPTION_PUT_TAGS, cfg.OPTION_CALL_TAGS):
        if mt5.symbol_info(alt) is not None:
            return (alt, atm_symbol)  # (call, put)
    return None


def _has_strategy_position(symbol: str) -> bool:
    """Return True if an open position on `symbol` carries this script's magic number."""
    positions = mt5.positions_get(symbol=symbol)
    if not positions:  # None on error, or an empty tuple
        return False
    return any(p.magic == _MAGIC for p in positions)


def trade_short_atm_straddle(selection: Dict) -> None:
    """
    Sell one lot of the ATM call and put identified by selection["atm_symbol"].

    Does nothing when a strategy position is already open on either leg, so a
    persistent entry signal does not add a new straddle on every poll. If the
    put leg fails after the call leg filled, the call is bought back.
    """
    atm_sym = selection["atm_symbol"]
    pair = straddle_symbols(atm_sym, CFG)
    if pair is None:
        logging.error("Could not locate matching call/put for straddle.")
        return
    call_sym, put_sym = pair
    call_info = mt5.symbol_info(call_sym)
    put_info = mt5.symbol_info(put_sym)
    if not call_info or not put_info:
        logging.error("Symbol info missing for straddle legs.")
        return
    if _has_strategy_position(call_sym) or _has_strategy_position(put_sym):
        logging.info("Straddle already open on %s / %s; not adding.", call_sym, put_sym)
        return
    # Heuristic: 1 lot by default; refine using contract size if available
    vol = 1.0
    logging.info(f"Selling ATM straddle: {call_sym} & {put_sym}, volume={vol}")
    call_ticket = place_market_order(call_sym, vol, mt5.ORDER_TYPE_SELL, comment="VRP short call")
    if call_ticket is None:
        logging.error("Call leg failed; straddle not opened.")
        return
    put_ticket = place_market_order(put_sym, vol, mt5.ORDER_TYPE_SELL, comment="VRP short put")
    if put_ticket is None:
        # Never stay legged-in with a naked short call.
        # NOTE(review): on hedging accounts this opens an offsetting position
        # rather than closing the original one - confirm for your account mode.
        logging.error("Put leg failed after call leg filled; buying back %s.", call_sym)
        unwind = place_market_order(call_sym, vol, mt5.ORDER_TYPE_BUY, comment="VRP unwind call")
        if unwind is None:
            logging.critical("Unwind of %s FAILED - manual intervention required.", call_sym)


def main_loop():
    """Connect to MT5 and poll for signals every CFG.POLL_SEC seconds until interrupted."""
    mt5_init()
    try:
        while True:
            try:
                sig = next_signal()
                if sig is not None:
                    action = sig["signal"]
                    sel = sig["selection"]
                    if action == "ENTER_SHORT_STRADDLE":
                        trade_short_atm_straddle(sel)
                    elif action == "EXIT_STRADDLE":
                        logging.info("Exit logic placeholder: implement position lookup & closing orders.")
                    else:
                        logging.info("No trade action. Holding.")
            except Exception:
                # Top-level boundary: a transient data/terminal error in one cycle
                # must not stop the loop. KeyboardInterrupt/SystemExit still propagate.
                logging.exception("Polling cycle failed; retrying after sleep.")
            time.sleep(CFG.POLL_SEC)
    finally:
        mt5.shutdown()


if __name__ == "__main__":
    main_loop()