463 lines
18 KiB
Python
463 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Volatility Relative-Value Strategy (Minimal Viable) for MetaTrader 5
|
|
--------------------------------------------------------------------
|
|
This script implements a production-style skeleton for a market-neutral options
|
|
strategy using MetaTrader 5's Python API. It focuses on:
|
|
1) Data ingestion (spot + option chain)
|
|
2) Implied vol estimation (ATM IV per expiry via Black–Scholes inversion)
|
|
3) Realized-volatility forecasting with HAR-RV
|
|
4) Signal: Variance Risk Premium (VRP) ≈ IV^2 - RV_forecast^2
|
|
5) Trade templates: short ATM straddle with calendar hedge (if available)
|
|
6) Cost-aware sizing, risk limits, position/risk checks
|
|
7) Execution via MT5 orders
|
|
|
|
IMPORTANT
|
|
- You must have MetaTrader5 terminal installed and logged in to a broker account.
|
|
- The Python package 'MetaTrader5' must be installed (pip install MetaTrader5).
|
|
- Options availability depends on your broker/symbols. Adjust SYMBOL_MAP/filters.
|
|
- This script is a reference implementation; backtest & harden before production.
|
|
|
|
Author: ChatGPT
|
|
Date: 2025-09-29
|
|
"""
|
|
|
|
import math
|
|
import time
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Optional, Tuple
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
try:
|
|
import MetaTrader5 as mt5
|
|
except Exception as e:
|
|
raise SystemExit("Please install MetaTrader5: pip install MetaTrader5") from e
|
|
|
|
# ----------------------------- CONFIGURATION ----------------------------- #
|
|
|
|
@dataclass
class Config:
    """Tunable parameters for the VRP strategy.

    Annotated attributes are dataclass fields and can be overridden through
    the constructor; un-annotated ones (``EXPIRY_PARSER``,
    ``RV_BAR_TIMEFRAME``) are plain class attributes.
    """

    # ---- Underlying and pricing inputs ----
    # Underlying symbol as named by the broker (e.g. 'Si-12.25' for a MOEX
    # future, or an FX pair such as 'EURUSD').
    UNDERLYING: str = "EURUSD"
    # Annualized risk-free rate as a decimal; passed as `r` to the pricer.
    RISK_FREE_RATE: float = 0.01
    # Annualized continuous yield as a decimal; passed as `q` to the pricer.
    # NOTE(review): for FX this is presumably the foreign-currency rate
    # (Garman-Kohlhagen convention) - confirm before changing from zero.
    DIVIDEND_YIELD: float = 0.00

    # ---- Option symbol parsing (broker-specific) ----
    # Substrings that mark a symbol as a call / a put.
    OPTION_CALL_TAGS: Tuple[str, ...] = ("C", "CALL")
    OPTION_PUT_TAGS: Tuple[str, ...] = ("P", "PUT")

    @staticmethod
    def EXPIRY_PARSER(s):
        """Map a symbol name to its expiry datetime.

        Placeholder that always returns None, which makes every symbol
        unparsable. Replace it with a parser for your broker's naming
        scheme (the demo assumes names like EURUSD_C_20251219_1.1000), or
        read the expiry from mt5.symbol_info() if the broker exposes it.
        """
        return None

    # ---- Realized-volatility forecasting ----
    # Bar size and history length requested for the RV calculation.
    RV_BAR_TIMEFRAME = mt5.TIMEFRAME_M5
    RV_LOOKBACK_DAYS: int = 30
    # Daily / weekly (~5d) / monthly (~22d) HAR windows. Declared here, but
    # har_rv_forecast currently hard-codes the same 1/5/22 values.
    HAR_FREQS: Tuple[int, int, int] = (1, 5, 22)

    # ---- Signal thresholds ----
    # Implied vols outside [MIN_IV, MAX_IV] are discarded as degenerate.
    MIN_IV: float = 0.02
    MAX_IV: float = 1.00
    # VRP = IV^2 - RV^2 thresholds in basis points (divided by 10000 before
    # comparison): enter at >= 150 bps (0.015), exit at <= 50 bps (0.005).
    VRP_ENTRY_BPS: float = 150
    VRP_EXIT_BPS: float = 50

    # ---- Risk limits ----
    # Declared limits; none of them is read by the functions in this file yet.
    MAX_GROSS_VEGA: float = 100000.0
    MAX_NET_GAMMA: float = 0.0  # keep near-neutral by construction
    MAX_POS_PER_EXPIRY: int = 20
    MAX_SLIPPAGE_PIPS: float = 1.0
    MAX_SPREAD_PIPS: float = 1.5
    NOTIONAL_PER_TRADE: float = 100000.0  # adjust for your broker contract sizes

    # ---- Execution ----
    # Intended delta re-hedge trigger (|delta| > 0.05); not yet used.
    REHEDGE_DELTA_THRESHOLD: float = 0.05
    # Seconds to sleep between polling cycles of the main loop.
    POLL_SEC: int = 60
|
|
|
|
# ------------------------------- LOGGING -------------------------------- #
|
|
|
|
# Configure the root logger once at import time: INFO and above, written
# through a single stream handler with a timestamped, level-tagged format.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
|
|
|
|
# -------------------------- UTILS / FINANCE ----------------------------- #
|
|
|
|
def norm_cdf(x: float) -> float:
    """Return the standard normal cumulative distribution function at *x*."""
    # Phi(x) = (1 + erf(x / sqrt(2))) / 2
    scaled = x / math.sqrt(2.0)
    return 0.5 * (1.0 + math.erf(scaled))
|
|
|
|
def norm_pdf(x: float) -> float:
    """Return the standard normal probability density at *x*."""
    kernel = math.exp(-0.5 * x * x)
    normalizer = math.sqrt(2 * math.pi)
    return kernel / normalizer
|
|
|
|
def bs_price(spot: float, strike: float, t: float, r: float, q: float, vol: float, call: bool) -> float:
    """Black-Scholes price of a European option with continuous yield *q*.

    Args:
        spot: Current underlying price.
        strike: Option strike.
        t: Time to expiry in years.
        r: Annualized risk-free rate (decimal).
        q: Annualized continuous yield on the underlying (decimal).
        vol: Annualized volatility (decimal).
        call: True for a call, False for a put.

    Returns:
        The model price. When any of t, vol, spot or strike is non-positive
        the discounted intrinsic value (floored at zero) is returned instead.
    """
    degenerate = t <= 0 or vol <= 0 or spot <= 0 or strike <= 0
    if degenerate:
        if call:
            intrinsic = spot * math.exp(-q*t) - strike * math.exp(-r*t)
        else:
            intrinsic = strike * math.exp(-r*t) - spot * math.exp(-q*t)
        return max(0.0, intrinsic)

    vol_sqrt_t = vol*math.sqrt(t)
    d1 = (math.log(spot/strike) + (r - q + 0.5*vol*vol)*t) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    pv_spot = spot*math.exp(-q*t)
    pv_strike = strike*math.exp(-r*t)
    if not call:
        return pv_strike*norm_cdf(-d2) - pv_spot*norm_cdf(-d1)
    return pv_spot*norm_cdf(d1) - pv_strike*norm_cdf(d2)
|
|
|
|
def bs_vega(spot: float, strike: float, t: float, r: float, q: float, vol: float) -> float:
    """Black-Scholes vega: sensitivity of the option price to *vol*.

    Returns 0.0 when any of t, vol, spot or strike is non-positive.
    The value is per unit of volatility (1.0 = 100 vol points).
    """
    inputs_valid = t > 0 and vol > 0 and spot > 0 and strike > 0
    if not inputs_valid:
        return 0.0
    root_t = math.sqrt(t)
    d1 = (math.log(spot/strike) + (r - q + 0.5*vol*vol)*t) / (vol*root_t)
    return spot*math.exp(-q*t)*norm_pdf(d1)*root_t
|
|
|
|
def iv_from_price(price: float, spot: float, strike: float, t: float, r: float, q: float, call: bool,
                  tol: float = 1e-6, max_iter: int = 100) -> Optional[float]:
    """Solve for the Black-Scholes volatility that reproduces *price*.

    Plain bisection on vol over the bracket [1e-4, 5.0].

    Args:
        price: Observed option price (same units as spot/strike).
        spot, strike, t, r, q, call: As for ``bs_price``.
        tol: Absolute price tolerance that counts as converged.
        max_iter: Maximum number of bisection steps.

    Returns:
        The implied volatility, or None when the inputs are unusable
        (non-finite, non-positive price/spot/strike, or no time left to
        expiry) or when *price* is not bracketed by the vol range.
    """
    # Reject unusable inputs up front. Without the t/spot/strike checks an
    # expired option priced exactly at intrinsic value made every trial vol
    # look like a root, so the solver returned the first midpoint (2.5).
    if not all(math.isfinite(v) for v in (price, spot, strike, t, r, q)):
        return None
    if price <= 0 or spot <= 0 or strike <= 0 or t <= 0:
        return None

    low, high = 1e-4, 5.0
    f_low = bs_price(spot, strike, t, r, q, low, call) - price
    f_high = bs_price(spot, strike, t, r, q, high, call) - price
    if f_low * f_high > 0:
        # Same sign at both ends: no root inside the bracket.
        return None

    for _ in range(max_iter):
        mid = 0.5 * (low + high)
        f_mid = bs_price(spot, strike, t, r, q, mid, call) - price
        if abs(f_mid) < tol:
            return mid  # mid always lies inside [low, high]; no clamp needed
        if f_low * f_mid <= 0:
            high = mid
        else:
            low, f_low = mid, f_mid

    # The price tolerance can be unreachable in floating point for very
    # large prices; a fully collapsed bracket still pins the root.
    if high - low < 1e-8:
        return 0.5 * (low + high)
    return None
|
|
|
|
def realized_vol_annualized(returns: np.ndarray, freq_per_day: int) -> float:
    """Annualized realized volatility from intraday returns.

    Squared returns are summed per day (``freq_per_day`` consecutive bars),
    the daily variances are averaged, scaled by 252 and square-rooted.

    Args:
        returns: Intraday returns in chronological order (any array-like).
        freq_per_day: Number of bars that make up one day.

    Returns:
        Annualized volatility, or NaN when there are fewer than two returns
        or not enough returns to fill a single day.

    Raises:
        ValueError: If ``freq_per_day`` is not positive.
    """
    if freq_per_day <= 0:
        raise ValueError("freq_per_day must be a positive integer")
    flat = np.asarray(returns, dtype=float).ravel()
    if flat.size < 2:
        return np.nan
    n_days = flat.size // freq_per_day
    if n_days == 0:
        return np.nan
    # Keep only whole days, dropping the oldest leftover bars; reshaping the
    # raw array raised ValueError whenever its length was not an exact
    # multiple of freq_per_day.
    whole = flat[flat.size - n_days * freq_per_day:]
    daily_var = (whole.reshape(n_days, freq_per_day) ** 2).sum(axis=1)
    ann_var = daily_var.mean() * 252.0
    return float(np.sqrt(max(ann_var, 0.0)))
|
|
|
|
def har_rv_forecast(rvs: pd.Series) -> float:
    """One-step-ahead HAR-RV(1, 5, 22) volatility forecast.

    The regression is run on realized *variance* (``rvs`` squared) with an
    intercept and the 1-, 5- and 22-observation rolling means as regressors;
    the square root of the predicted variance is returned.

    Args:
        rvs: Daily realized volatility (not variance), oldest first.

    Returns:
        The forecast volatility in the same units as ``rvs``.
        * NaN for an empty series.
        * The last observation when fewer than 22 observations exist.
        * The absolute last observation when fewer than 30 usable regression
          rows exist. The 22-day window costs 21 rows and the one-step shift
          one more, so the regression only runs from 52 observations upward.
    """
    if len(rvs) == 0:
        # Previously raised IndexError on iloc[-1].
        return float("nan")
    if len(rvs) < 22:
        return float(rvs.iloc[-1])

    rv2 = rvs**2
    rv_d = rv2.rolling(1).mean()
    rv_w = rv2.rolling(5).mean()
    rv_m = rv2.rolling(22).mean()

    X = np.column_stack([np.ones(len(rv2)), rv_d, rv_w, rv_m])
    y = rv2.shift(-1)  # target: next observation's variance
    mask = ~np.isnan(X).any(axis=1) & ~np.isnan(y.values)
    X, y = X[mask], y.values[mask]
    if len(y) < 30:
        return float(np.sqrt(rv2.iloc[-1]))

    # Tiny ridge term keeps the normal equations solvable when the
    # regressors are nearly collinear.
    lam = 1e-6
    beta = np.linalg.solve(X.T @ X + lam * np.eye(X.shape[1]), X.T @ y)
    last = np.array([1.0, rv_d.iloc[-1], rv_w.iloc[-1], rv_m.iloc[-1]])
    rv2_hat = max(float(last @ beta), 0.0)  # a variance cannot be negative
    return float(np.sqrt(rv2_hat))
|
|
|
|
# --------------------------- MT5 INTEGRATION ---------------------------- #
|
|
|
|
def mt5_init() -> None:
    """Connect to the MetaTrader 5 terminal and verify a logged-in account.

    Raises:
        SystemExit: If the terminal cannot be initialized, or if no account
            information is available (terminal not logged in).
    """
    if not mt5.initialize():
        raise SystemExit(f"MT5 initialize() failed: {mt5.last_error()}")
    account_info = mt5.account_info()
    if account_info is None:
        # The connection was established above; release it before exiting
        # because the caller's cleanup has not been armed yet at this point.
        mt5.shutdown()
        raise SystemExit("MT5 account_info() returned None. Is terminal logged in?")
    logging.info(f"Connected to MT5 account: {account_info.login} | broker: {account_info.company}")
|
|
|
|
def get_spot_series(symbol: str, timeframe=mt5.TIMEFRAME_M5, days: int = 30) -> pd.DataFrame:
    """Fetch the last *days* calendar days of bars for *symbol*.

    Args:
        symbol: Terminal symbol name.
        timeframe: MT5 timeframe constant for the bar size.
        days: Length of the look-back window in calendar days.

    Returns:
        DataFrame of the rates returned by MT5, indexed by UTC bar time.

    Raises:
        RuntimeError: If the terminal returns no rates for the window.
    """
    # Use timezone-aware UTC bounds: a naive utcnow() value is interpreted
    # in the machine's local zone when converted to a timestamp, which
    # shifts the requested window on any host not running in UTC.
    utc_to = datetime.now(timezone.utc)
    utc_from = utc_to - timedelta(days=days)
    rates = mt5.copy_rates_range(symbol, timeframe, utc_from, utc_to)
    if rates is None or len(rates) == 0:
        raise RuntimeError(f"No rates for {symbol}")
    df = pd.DataFrame(rates)
    df['time'] = pd.to_datetime(df['time'], unit='s', utc=True)
    df.set_index('time', inplace=True)
    return df
|
|
|
|
def mid_price(symbol_info) -> Optional[float]:
    """Return the bid/ask midpoint of the latest tick for *symbol_info*.

    Returns None when no tick is available or either side of the quote
    is zero.
    """
    quote = mt5.symbol_info_tick(symbol_info.name)
    if quote is None or quote.ask == 0 or quote.bid == 0:
        return None
    return 0.5*(quote.ask + quote.bid)
|
|
|
|
def discover_option_symbols(underlying: str) -> List[str]:
    """List visible symbols that look like options on *underlying*.

    This is BROKER-SPECIFIC: it collects every visible symbol whose name
    contains the underlying string and at least one of the letters/tags
    C, P, CALL or PUT. The match is deliberately loose; callers are expected
    to filter further.

    Returns:
        Sorted, de-duplicated symbol names; empty when the terminal returns
        nothing.
    """
    items = mt5.symbols_get("*" + underlying + "*")
    if not items:
        # symbols_get yields None on error; iterating it used to raise
        # TypeError.
        if items is None:
            logging.warning(f"symbols_get failed for {underlying}: {mt5.last_error()}")
        return []
    tags = ("C", "P", "CALL", "PUT")
    names = {
        s.name
        for s in items
        if s.visible and any(tag in s.name.upper() for tag in tags)
    }
    return sorted(names)
|
|
|
|
def classify_option(symbol_name: str, cfg: "Config") -> Optional[Tuple[str, float, datetime]]:
    """Parse (type, strike, expiry) from an option symbol name.

    Naive, broker-specific rules; replace with a robust parser for your
    broker.

    * Type: the call/put tags from *cfg* are first matched against the
      delimiter-separated pieces of the upper-cased name. Only if that is
      inconclusive does the function fall back to plain substring matching.
      Matching whole pieces first stops single-letter tags from colliding
      with letters of the underlying (e.g. the C in "USDCAD").
    * Strike: the last number appearing in the name.
    * Expiry: whatever ``cfg.EXPIRY_PARSER(symbol_name)`` returns.

    Returns:
        ("CALL" | "PUT", strike, expiry), or None when the type is
        ambiguous or missing, or when no strike or expiry could be derived.
    """
    import re

    up = symbol_name.upper()
    pieces = set(re.split(r"[^A-Z0-9]+", up))
    call = any(tag in pieces for tag in cfg.OPTION_CALL_TAGS)
    put = any(tag in pieces for tag in cfg.OPTION_PUT_TAGS)
    if call == put:
        # No unique whole-piece match: use the looser substring rule.
        call = any(tag in up for tag in cfg.OPTION_CALL_TAGS)
        put = any(tag in up for tag in cfg.OPTION_PUT_TAGS)
        if call == put:
            return None

    # Crude strike parse: the pattern only matches valid decimal literals,
    # so float() cannot fail here.
    nums = re.findall(r"\d+\.\d+|\d+", up)
    strike = float(nums[-1]) if nums else None

    expiry = cfg.EXPIRY_PARSER(symbol_name)
    if strike is None or expiry is None:
        return None
    return ("CALL" if call else "PUT", strike, expiry)
|
|
|
|
def option_mid_price(sym: str) -> Optional[float]:
    """Return the bid/ask midpoint for symbol *sym*.

    Returns None when the symbol is unknown to the terminal, is not
    visible, or has no usable quote.
    """
    info = mt5.symbol_info(sym)
    usable = info is not None and info.visible
    return mid_price(info) if usable else None
|
|
|
|
def atm_iv_for_expiry(option_syms: List[str], spot: float, t: float, r: float, q: float) -> Optional[Tuple[float, str, float]]:
    """Pick the at-the-money implied vol among *option_syms*.

    The winner is the symbol whose strike is closest to *spot* among those
    that have a positive mid price and an implied vol inside
    [CFG.MIN_IV, CFG.MAX_IV]. Ties keep the first symbol encountered.

    Args:
        option_syms: Option symbols for a single expiry.
        spot: Current underlying price.
        t: Time to expiry in years.
        r: Risk-free rate.
        q: Continuous yield on the underlying.

    Returns:
        (iv, symbol_used, strike), or None when no symbol qualifies.
    """
    best = None
    best_diff = float("inf")
    for sym in option_syms:
        # Parse the name first: it is purely local, and a strike that cannot
        # beat the current best needs no terminal round-trip at all.
        cls = classify_option(sym, CFG)
        if cls is None:
            continue
        _type, strike, _expiry = cls
        diff = abs(strike - spot)
        if diff >= best_diff:
            continue

        # Fetch symbol info once and reuse it for the quote (it used to be
        # requested twice per symbol).
        info = mt5.symbol_info(sym)
        if info is None or not info.visible:
            continue
        mp = mid_price(info)
        if mp is None or mp <= 0:
            continue

        iv = iv_from_price(mp, spot, strike, t, r, q, _type == "CALL")
        if iv is not None and CFG.MIN_IV <= iv <= CFG.MAX_IV:
            best = (iv, sym, strike)
            best_diff = diff
    return best
|
|
|
|
# ----------------------------- STRATEGY --------------------------------- #
|
|
|
|
# Module-wide settings instance read by the strategy and execution functions.
CFG: Config = Config()
|
|
|
|
def build_daily_rv(symbol: str) -> pd.Series:
    """Build a series of annualized daily realized volatilities for *symbol*.

    Per-bar log returns are squared and summed by UTC calendar day, the
    square root gives that day's realized vol, and the result is multiplied
    by sqrt(252).

    Days holding fewer than half the median number of bars are dropped, so a
    partial session (including the day still in progress) cannot show up as
    an artificially quiet observation.

    Returns:
        Series indexed by (timezone-naive) date, oldest first; empty when
        fewer than two bars are available.
    """
    df = get_spot_series(symbol, CFG.RV_BAR_TIMEFRAME, CFG.RV_LOOKBACK_DAYS)
    log_ret = np.log(df['close']).diff().dropna()
    if log_ret.empty:
        return pd.Series(dtype=float)

    # Group on the actual bar dates. Slicing the returns into fixed-size
    # chunks drifted across day boundaries whenever sessions differed in
    # length, mislabeled the chunks, and failed outright if the estimated
    # bars-per-day rounded to zero.
    stamps = pd.DatetimeIndex(log_ret.index)
    stamps = stamps.tz_localize("UTC") if stamps.tz is None else stamps.tz_convert("UTC")
    day_key = stamps.normalize().tz_localize(None)

    squared = pd.Series(log_ret.to_numpy() ** 2, index=day_key)
    per_day = squared.groupby(level=0)
    daily_var = per_day.sum()
    bar_count = per_day.count()

    complete = bar_count >= 0.5 * bar_count.median()
    rv = np.sqrt(daily_var[complete])
    # Annualize daily RV to sigma (multiply by sqrt(252)).
    return rv * np.sqrt(252.0)
|
|
|
|
def next_signal() -> Optional[Dict]:
    """Evaluate the market once and decide what the strategy should do.

    Steps:
        1. Read the underlying's mid price and forecast its realized vol.
        2. Discover option symbols and bucket them by parsed expiry.
        3. For every unexpired bucket compute the ATM implied vol and
           VRP = IV^2 - RV_forecast^2.
        4. Pick the expiry with the highest VRP and compare it with the
           entry/exit thresholds in CFG.

    Returns:
        {"signal": "ENTER_SHORT_STRADDLE" | "EXIT_STRADDLE" | "HOLD",
         "selection": <candidate dict>, "rv_forecast": float},
        or None when any required input is missing.
    """
    # 1) Spot and RV forecast
    spot_info = mt5.symbol_info(CFG.UNDERLYING)
    if spot_info is None or not spot_info.visible:
        logging.error(f"Underlying {CFG.UNDERLYING} not visible")
        return None
    spot = mid_price(spot_info)
    if spot is None:
        logging.error("No spot mid price")
        return None

    rv_daily = build_daily_rv(CFG.UNDERLYING)
    if len(rv_daily) == 0:
        logging.warning("No realized-vol history available; skipping cycle.")
        return None
    rv_forecast = har_rv_forecast(rv_daily)  # annualized
    if not math.isfinite(rv_forecast):
        # A NaN forecast would turn every VRP into NaN and make the
        # ranking and threshold comparisons below meaningless.
        logging.warning("RV forecast is not finite; skipping cycle.")
        return None

    # 2) Option chain discovery and expiry grouping (very broker-specific)
    opt_syms = discover_option_symbols(CFG.UNDERLYING)
    if not opt_syms:
        logging.warning("No option symbols discovered. Check naming filters & broker support.")
        return None

    buckets: Dict[datetime, List[str]] = {}
    for s in opt_syms:
        cls = classify_option(s, CFG)
        if cls is None:
            continue
        _t, _k, expiry = cls
        buckets.setdefault(expiry, []).append(s)

    if not buckets:
        logging.warning("No parsable options. Provide a correct EXPIRY_PARSER and tags.")
        return None

    # 3) For each expiry, compute ATM IV and VRP
    candidates = []
    now = datetime.now(timezone.utc)
    for expiry, syms in buckets.items():
        # A parser may hand back a naive datetime; treat it as UTC so the
        # subtraction from the aware `now` cannot raise TypeError.
        expiry_utc = expiry if expiry.tzinfo is not None else expiry.replace(tzinfo=timezone.utc)
        t = max((expiry_utc - now).total_seconds(), 0) / (365.0*24*3600.0)
        if t <= 0:
            continue  # already expired
        atm = atm_iv_for_expiry(syms, spot, t, CFG.RISK_FREE_RATE, CFG.DIVIDEND_YIELD)
        if atm is None:
            continue
        iv_atm, iv_sym, k_atm = atm
        vrp = iv_atm**2 - rv_forecast**2
        candidates.append({
            "expiry": expiry, "t": t, "iv": iv_atm, "vrp": vrp,
            "atm_symbol": iv_sym, "k_atm": k_atm, "spot": spot
        })

    if not candidates:
        logging.info("No candidate expiries with valid ATM IV.")
        return None

    # Choose the expiry with the highest VRP
    best = max(candidates, key=lambda x: x["vrp"])
    logging.info(f"Best VRP expiry {best['expiry']} | IV={best['iv']:.4f} | RVf={rv_forecast:.4f} | VRP={best['vrp']:.4f}")

    # Entry/exit logic; thresholds are configured in basis points.
    if best["vrp"] >= CFG.VRP_ENTRY_BPS/10000.0:
        action = "ENTER_SHORT_STRADDLE"
    elif best["vrp"] <= CFG.VRP_EXIT_BPS/10000.0:
        action = "EXIT_STRADDLE"
    else:
        action = "HOLD"

    return {"signal": action, "selection": best, "rv_forecast": rv_forecast}
|
|
|
|
# ----------------------------- EXECUTION -------------------------------- #
|
|
|
|
def ensure_symbol(symbol: str) -> bool:
    """Make sure *symbol* exists and is selected in the terminal.

    Returns False for an unknown symbol, True for one that is already
    visible, and otherwise the result of asking the terminal to select it.
    """
    info = mt5.symbol_info(symbol)
    if info is None:
        return False
    return True if info.visible else mt5.symbol_select(symbol, True)
|
|
|
|
def place_market_order(symbol: str, volume: float, order_type: int, comment: str = "") -> Optional[int]:
    """Send an immediate (deal) order and return its ticket.

    Buy-type orders are priced at the current ask, everything else at the
    current bid. The request uses fill-or-kill filling, GTC lifetime and a
    deviation of 10 points.

    Args:
        symbol: Symbol to trade; selected in the terminal if needed.
        volume: Order volume in lots; must be positive.
        order_type: An ``mt5.ORDER_TYPE_*`` constant.
        comment: Free-text order comment.

    Returns:
        The order ticket, or None when the order was rejected or could not
        be sent (the reason is logged).
    """
    if volume <= 0:
        logging.error(f"Invalid volume {volume} for {symbol}")
        return None
    if not ensure_symbol(symbol):
        logging.error(f"Symbol not available: {symbol}")
        return None
    tick = mt5.symbol_info_tick(symbol)
    if tick is None:
        logging.error(f"No tick for {symbol}")
        return None

    buy_types = (mt5.ORDER_TYPE_BUY, mt5.ORDER_TYPE_BUY_LIMIT, mt5.ORDER_TYPE_BUY_STOP)
    price = tick.ask if order_type in buy_types else tick.bid
    if price <= 0:
        # A zero quote means the relevant side of the book is empty; do not
        # submit an order priced at 0.
        logging.error(f"No valid quote for {symbol}")
        return None

    req = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type,
        "price": price,
        "deviation": 10,
        "magic": 271828,
        "comment": comment,
        "type_filling": mt5.ORDER_FILLING_FOK,
        "type_time": mt5.ORDER_TIME_GTC,
    }
    res = mt5.order_send(req)
    if res is None:
        # Include the terminal's error so the failure can be diagnosed.
        logging.error(f"order_send failed for {symbol}: {mt5.last_error()}")
        return None
    if res.retcode != mt5.TRADE_RETCODE_DONE:
        logging.error(f"Order failed: {res.retcode} | {res.comment}")
        return None
    logging.info(f"Order placed: ticket={res.order} {symbol} {volume}")
    return res.order
|
|
|
|
def straddle_symbols(atm_symbol: str, cfg: Config) -> Optional[Tuple[str, str]]:
    """Find the call/put pair that *atm_symbol* belongs to.

    Broker-specific heuristic: the sibling is assumed to differ only by the
    call/put tag. Two passes are made over every call-tag/put-tag pair:

    1. Swap the tag only where it stands alone between non-alphanumeric
       delimiters, keeping the rest of the name (and its case) untouched.
       This avoids rewriting letters inside the underlying, e.g. the C in
       "USDCAD".
    2. Fall back to the legacy rule: upper-case the name and replace every
       occurrence of the tag.

    A candidate is accepted when the terminal knows the resulting symbol.

    Returns:
        (call_symbol, put_symbol), or None when no sibling is found.
    """
    import re

    def _swap_tag(name: str, old: str, new: str) -> Optional[str]:
        # Replace `old` only when it is a standalone, delimiter-bounded piece.
        pattern = r"(?<![A-Za-z0-9])" + re.escape(old) + r"(?![A-Za-z0-9])"
        swapped, hits = re.subn(pattern, lambda _m: new, name, flags=re.IGNORECASE)
        return swapped if hits else None

    # Pass 1: delimiter-bounded tag swap on the original name.
    for ctag in cfg.OPTION_CALL_TAGS:
        for ptag in cfg.OPTION_PUT_TAGS:
            alt = _swap_tag(atm_symbol, ctag, ptag)
            if alt is not None and mt5.symbol_info(alt) is not None:
                return (atm_symbol, alt)  # (call, put)
            alt = _swap_tag(atm_symbol, ptag, ctag)
            if alt is not None and mt5.symbol_info(alt) is not None:
                return (alt, atm_symbol)  # (call, put)

    # Pass 2: legacy blanket replacement, kept for names without delimiters.
    up = atm_symbol.upper()
    for ctag in cfg.OPTION_CALL_TAGS:
        for ptag in cfg.OPTION_PUT_TAGS:
            if ctag in up:
                alt = up.replace(ctag, ptag)
                if mt5.symbol_info(alt) is not None:
                    return (atm_symbol, alt)  # (call, put)
            if ptag in up:
                alt = up.replace(ptag, ctag)
                if mt5.symbol_info(alt) is not None:
                    return (alt, atm_symbol)  # (call, put)
    return None
|
|
|
|
def trade_short_atm_straddle(selection: Dict) -> None:
    """Sell one lot each of the ATM call and put named by *selection*.

    Args:
        selection: Candidate dict produced by the signal step; only
            ``selection["atm_symbol"]`` is read.

    Nothing is traded when the sibling leg cannot be located, when symbol
    info is missing, or when a position already exists in either leg.
    If the put leg fails after the call leg filled, an offsetting buy of
    the call is sent so the book is not left with a naked short call.
    """
    atm_sym = selection["atm_symbol"]
    pair = straddle_symbols(atm_sym, CFG)
    if pair is None:
        logging.error("Could not locate matching call/put for straddle.")
        return
    call_sym, put_sym = pair

    call_info = mt5.symbol_info(call_sym)
    put_info = mt5.symbol_info(put_sym)
    if not call_info or not put_info:
        logging.error("Symbol info missing for straddle legs.")
        return

    # The polling loop re-issues the entry signal every cycle while VRP
    # stays high; without this guard another straddle is sold each time.
    for leg in (call_sym, put_sym):
        if mt5.positions_get(symbol=leg):
            logging.info(f"Position already open in {leg}; not adding another straddle.")
            return

    # Heuristic: 1 lot by default; refine using contract size if available
    vol = 1.0
    logging.info(f"Selling ATM straddle: {call_sym} & {put_sym}, volume={vol}")

    call_ticket = place_market_order(call_sym, vol, mt5.ORDER_TYPE_SELL, comment="VRP short call")
    if call_ticket is None:
        logging.error("Call leg not filled; put leg not sent.")
        return

    put_ticket = place_market_order(put_sym, vol, mt5.ORDER_TYPE_SELL, comment="VRP short put")
    if put_ticket is None:
        # NOTE(review): on a hedging account this opens an opposite position
        # instead of closing the short - confirm the account mode and close
        # by position ticket if needed.
        logging.critical(f"Put leg failed after call leg filled; offsetting {call_sym}.")
        if place_market_order(call_sym, vol, mt5.ORDER_TYPE_BUY, comment="VRP unwind call") is None:
            logging.critical(f"Offset of {call_sym} failed; manual intervention required.")
|
|
|
|
def main_loop():
    """Run the strategy until the process is stopped.

    Connects to the terminal, then repeats every CFG.POLL_SEC seconds:
    compute the signal and act on it. An exception raised inside a single
    cycle (for example missing rates) is logged and the loop carries on;
    the terminal connection is always shut down on exit.
    """
    mt5_init()
    try:
        while True:
            try:
                sig = next_signal()
                if sig is not None:
                    action = sig["signal"]
                    sel = sig["selection"]
                    if action == "ENTER_SHORT_STRADDLE":
                        trade_short_atm_straddle(sel)
                    elif action == "EXIT_STRADDLE":
                        logging.info("Exit logic placeholder: implement position lookup & closing orders.")
                    else:
                        logging.info("No trade action. Holding.")
            except Exception:
                # Top-level boundary: a transient data or terminal error must
                # not kill the daemon. KeyboardInterrupt and SystemExit are
                # not Exception subclasses and still stop the loop.
                logging.exception("Polling cycle failed; will retry after the poll interval.")
            time.sleep(CFG.POLL_SEC)
    finally:
        mt5.shutdown()
|
|
|
|
# Start the polling loop only when the file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main_loop()
|