mql5/crypto/advanced_analysis.py

651 righe
35 KiB
Python

import pandas as pd
import numpy as np
from typing import Dict, List, Optional
import warnings
import logging
warnings.filterwarnings('ignore')
class PEMAnalyzer:
"""
Price Equation Model (PEM) Analyzer
Based on PulseEquation EA logic: Polynomial Prediction + Trend Filter
"""
def __init__(self):
# Coefficients from MQL5 (g_coeffs)
self.coeffs = [0.2752466, 0.01058082, 0.55162082, 0.03687016, 0.27721318, 0.1483476, 0.0008025]
def predict_price(self, price_t1, price_t2):
"""
Calculate predicted price using the polynomial equation
"""
c = self.coeffs
prediction = (c[0] * price_t1 + # Linear t-1
c[1] * (price_t1 ** 2) + # Quadratic t-1
c[2] * price_t2 + # Linear t-2
c[3] * (price_t2 ** 2) + # Quadratic t-2
c[4] * (price_t1 - price_t2) + # Price change
c[5] * np.sin(price_t1) + # Cyclic
c[6]) # Constant
return prediction
def analyze(self, df: pd.DataFrame, ma_fast_period=108, ma_slow_period=60, adx_threshold=20) -> Dict[str, any]:
if len(df) < max(ma_fast_period, ma_slow_period) + 5:
return {"signal": "neutral", "strength": 0, "reason": "Insufficient Data"}
# 1. Price Prediction
# Get prices t-1 and t-2 (assuming df.iloc[-1] is current forming candle, so t-1 is -2)
price_t1 = df['close'].iloc[-2]
price_t2 = df['close'].iloc[-3]
current_price = df['close'].iloc[-1] # Approximation of current Ask/Bid
predicted_price = self.predict_price(price_t1, price_t2)
# Raw Signal
raw_signal = "neutral"
if predicted_price > current_price:
raw_signal = "buy"
elif predicted_price < current_price:
raw_signal = "sell"
# 2. Trend Filter (ADX + MA)
# Calculate MAs
ma_fast = df['close'].rolling(window=ma_fast_period).mean().iloc[-2] # Completed candle
ma_slow = df['close'].rolling(window=ma_slow_period).mean().iloc[-2]
# Calculate ADX (simplified or reuse existing)
adx = self._calculate_adx(df).iloc[-2]
is_strong_trend = adx >= adx_threshold
is_uptrend = ma_fast > ma_slow
is_downtrend = ma_fast < ma_slow
final_signal = "neutral"
strength = 0
reason = []
if raw_signal == "buy":
if is_strong_trend and is_uptrend:
final_signal = "buy"
strength = 85
reason.append(f"PEM: Predicted {predicted_price:.4f} > Curr {current_price:.4f} + Strong Uptrend (ADX={adx:.1f})")
else:
reason.append(f"PEM: Raw Buy filtered (Trend mismatch)")
elif raw_signal == "sell":
if is_strong_trend and is_downtrend:
final_signal = "sell"
strength = 85
reason.append(f"PEM: Predicted {predicted_price:.4f} < Curr {current_price:.4f} + Strong Downtrend (ADX={adx:.1f})")
else:
reason.append(f"PEM: Raw Sell filtered (Trend mismatch)")
return {
"signal": final_signal,
"strength": strength,
"reason": "; ".join(reason),
"prediction": predicted_price,
"adx": adx,
"trend": "bullish" if is_uptrend else "bearish"
}
def _calculate_adx(self, df, period=14):
highs = df['high']
lows = df['low']
closes = df['close']
plus_dm = highs.diff()
minus_dm = -lows.diff()
plus_dm[plus_dm < 0] = 0
minus_dm[minus_dm < 0] = 0
tr1 = highs - lows
tr2 = abs(highs - closes.shift())
tr3 = abs(lows - closes.shift())
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(period).mean()
plus_di = 100 * (plus_dm.rolling(period).mean() / atr)
minus_di = 100 * (minus_dm.rolling(period).mean() / atr)
dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
adx = dx.rolling(period).mean()
return adx
class AdvancedMarketAnalysis:
"""
高级市场分析工具类
基于MQL5文章中的最佳实践,利用Python的Pandas和NumPy库
提供更强大的技术分析和机器学习功能
"""
def __init__(self):
self.indicators_cache = {}
def calculate_technical_indicators(self, df: pd.DataFrame) -> Dict[str, float]:
if len(df) < 20:
return self._get_default_indicators()
indicators = {}
# SMA
indicators['sma_20'] = df['close'].tail(20).mean()
indicators['sma_50'] = df['close'].tail(50).mean() if len(df) >= 50 else indicators['sma_20']
# EMA
indicators['ema_12'] = df['close'].ewm(span=12).mean().iloc[-1]
indicators['ema_26'] = df['close'].ewm(span=26).mean().iloc[-1]
# MACD
ema_12 = df['close'].ewm(span=12).mean()
ema_26 = df['close'].ewm(span=26).mean()
macd_line = ema_12 - ema_26
macd_signal = macd_line.ewm(span=9).mean()
indicators['macd'] = macd_line.iloc[-1]
indicators['macd_signal'] = macd_signal.iloc[-1]
indicators['macd_histogram'] = indicators['macd'] - indicators['macd_signal']
# RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
indicators['rsi'] = 100 - (100 / (1 + rs.iloc[-1])) if not pd.isna(loss.iloc[-1]) and loss.iloc[-1] != 0 else 50
# Bollinger Bands
bb_period = 20
bb_std = 2
sma = df['close'].rolling(window=bb_period).mean()
std = df['close'].rolling(window=bb_period).std()
indicators['bb_upper'] = sma.iloc[-1] + (std.iloc[-1] * bb_std)
indicators['bb_lower'] = sma.iloc[-1] - (std.iloc[-1] * bb_std)
indicators['bb_middle'] = sma.iloc[-1]
# ATR
high_low = df['high'] - df['low']
high_close_prev = abs(df['high'] - df['close'].shift())
low_close_prev = abs(df['low'] - df['close'].shift())
true_range = pd.concat([high_low, high_close_prev, low_close_prev], axis=1).max(axis=1)
indicators['atr'] = true_range.rolling(window=14).mean().iloc[-1]
# Volume
indicators['volume_sma'] = df['volume'].tail(20).mean()
indicators['current_volume'] = df['volume'].iloc[-1]
indicators['volume_ratio'] = indicators['current_volume'] / indicators['volume_sma'] if indicators['volume_sma'] > 0 else 1
# Momentum
indicators['momentum_5'] = (df['close'].iloc[-1] / df['close'].iloc[-6] - 1) * 100
indicators['momentum_10'] = (df['close'].iloc[-1] / df['close'].iloc[-11] - 1) * 100
return indicators
def detect_market_regime(self, df: pd.DataFrame) -> Dict[str, any]:
if len(df) < 50:
return {"regime": "unknown", "confidence": 0.5, "description": "数据不足"}
returns = df['close'].pct_change().dropna()
volatility = returns.std() * np.sqrt(252)
price_change = (df['close'].iloc[-1] / df['close'].iloc[0] - 1) * 100
highs = df['high']
lows = df['low']
plus_dm = highs.diff()
minus_dm = -lows.diff()
plus_dm[plus_dm < 0] = 0
minus_dm[minus_dm < 0] = 0
tr = pd.concat([highs - lows, abs(highs - df['close'].shift()), abs(lows - df['close'].shift())], axis=1).max(axis=1)
atr = tr.rolling(window=14).mean()
plus_di = 100 * (plus_dm.rolling(window=14).mean() / atr)
minus_di = 100 * (minus_dm.rolling(window=14).mean() / atr)
dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
adx = dx.rolling(window=14).mean()
current_adx = adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 0
if volatility > 0.3:
regime = "high_volatility"
confidence = min(volatility / 0.5, 0.9)
description = "高波动市场"
elif current_adx > 25:
regime = "trending"
confidence = min(current_adx / 50, 0.9)
description = "趋势市场"
else:
regime = "ranging"
confidence = 0.7
description = "震荡市场"
return {"regime": regime, "confidence": confidence, "description": description, "volatility": volatility, "adx": current_adx, "price_change": price_change}
def generate_support_resistance(self, df: pd.DataFrame, lookback_period: int = 100) -> Dict[str, List[float]]:
if len(df) < lookback_period: lookback_period = len(df)
recent_data = df.tail(lookback_period)
recent_high = recent_data['high'].max()
recent_low = recent_data['low'].min()
price_range = recent_high - recent_low
current_price = df['close'].iloc[-1]
support_levels = []
for i in range(1, 4):
level = current_price - (price_range * 0.1 * i)
if level > recent_low: support_levels.append(float(round(level, 4)))
resistance_levels = []
for i in range(1, 4):
level = current_price + (price_range * 0.1 * i)
if level < recent_high: resistance_levels.append(float(round(level, 4)))
if not support_levels: support_levels = [float(round(recent_low, 4))]
if not resistance_levels: resistance_levels = [float(round(recent_high, 4))]
return {"support_levels": sorted(support_levels), "resistance_levels": sorted(resistance_levels), "current_price": current_price}
def calculate_risk_metrics(self, df: pd.DataFrame) -> Dict[str, float]:
if len(df) < 30: return self._get_default_risk_metrics()
returns = df['close'].pct_change().dropna()
metrics = {
"volatility": returns.std() * np.sqrt(252),
"sharpe_ratio": returns.mean() / returns.std() * np.sqrt(252) if returns.std() > 0 else 0,
"max_drawdown": self._calculate_max_drawdown(df['close']),
"var_95": np.percentile(returns, 5) * 100,
"expected_shortfall": returns[returns <= np.percentile(returns, 5)].mean() * 100,
"skewness": returns.skew(),
"kurtosis": returns.kurtosis()
}
return metrics
def _calculate_max_drawdown(self, prices: pd.Series) -> float:
cumulative_returns = (1 + prices.pct_change()).cumprod()
running_max = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - running_max) / running_max
return drawdown.min() * 100
def _get_default_indicators(self) -> Dict[str, float]:
return {"sma_20": 0, "sma_50": 0, "ema_12": 0, "ema_26": 0, "macd": 0, "macd_signal": 0, "macd_histogram": 0, "rsi": 50, "bb_upper": 0, "bb_lower": 0, "bb_middle": 0, "atr": 0, "volume_sma": 0, "current_volume": 0, "volume_ratio": 1, "momentum_5": 0, "momentum_10": 0}
def generate_signal_from_indicators(self, indicators: Dict[str, float]) -> Dict[str, any]:
signal = "hold"; strength = 0; reasons = []
if indicators['macd'] > indicators['macd_signal'] and indicators['macd_histogram'] > 0: strength += 25; reasons.append("MACD金叉")
elif indicators['macd'] < indicators['macd_signal'] and indicators['macd_histogram'] < 0: strength -= 25; reasons.append("MACD死叉")
if indicators['rsi'] < 30: strength += 20; reasons.append("RSI超卖")
elif indicators['rsi'] > 70: strength -= 20; reasons.append("RSI超买")
if indicators['ema_12'] > indicators['ema_26']: strength += 15; reasons.append("EMA金叉")
elif indicators['ema_12'] < indicators['ema_26']: strength -= 15; reasons.append("EMA死叉")
current_price = indicators.get('current_price', indicators['sma_20'])
if current_price < indicators['bb_lower']: strength += 15; reasons.append("价格触及布林带下轨")
elif current_price > indicators['bb_upper']: strength -= 15; reasons.append("价格触及布林带上轨")
if indicators['volume_ratio'] > 1.5: strength += 10; reasons.append("成交量放大")
if strength >= 40: signal = "buy"
elif strength <= -40: signal = "sell"
return {"signal": signal, "strength": abs(strength), "reasons": reasons, "confidence": min(abs(strength) / 100, 1.0)}
def _get_default_risk_metrics(self) -> Dict[str, float]:
return {"volatility": 0.2, "sharpe_ratio": 0, "max_drawdown": 0, "var_95": -2, "expected_shortfall": -3, "skewness": 0, "kurtosis": 0}
def generate_analysis_summary(self, df: pd.DataFrame) -> Dict[str, any]:
if len(df) < 20: return {"summary": "数据不足", "recommendation": "hold", "confidence": 0.0}
indicators = self.calculate_technical_indicators(df)
market_regime = self.detect_market_regime(df)
risk_metrics = self.calculate_risk_metrics(df)
support_resistance = self.generate_support_resistance(df)
signal_info = self.generate_signal_from_indicators(indicators)
current_price = df['close'].iloc[-1]
recommendation = "持有"
if signal_info['signal'] == 'buy': recommendation = "强烈买入" if signal_info['strength'] > 60 else "买入"
elif signal_info['signal'] == 'sell': recommendation = "强烈卖出" if signal_info['strength'] > 60 else "卖出"
return {
"summary": f"当前价格: {current_price:.4f}",
"market_regime": market_regime['description'],
"recommendation": recommendation,
"confidence": signal_info['confidence'],
"risk_level": "" if risk_metrics['volatility'] > 0.3 else "",
"key_indicators": {"RSI": f"{indicators['rsi']:.1f}", "MACD": f"{indicators['macd']:.4f}"},
"support_levels": support_resistance['support_levels'],
"resistance_levels": support_resistance['resistance_levels']
}
def analyze_ifvg(self, df: pd.DataFrame, min_gap_points: int = 100) -> Dict[str, any]:
if len(df) < 5: return {"signal": "hold", "strength": 0, "reasons": [], "active_zones": []}
lows = df['low'].values; highs = df['high'].values; closes = df['close'].values; times = df.index
point = 1.0; min_gap = min_gap_points * point # For Crypto, points are different. Assuming 1.0 for now or passed value
fvgs = []
for i in range(2, len(df)):
if lows[i] > highs[i-2] and (lows[i] - highs[i-2]) > min_gap:
fvgs.append({'id': f"bull_{i}", 'type': 'bullish', 'top': lows[i], 'bottom': highs[i-2], 'start_time': times[i], 'mitigated': False, 'inverted': False, 'inverted_time': None})
elif lows[i-2] > highs[i] and (lows[i-2] - highs[i]) > min_gap:
fvgs.append({'id': f"bear_{i}", 'type': 'bearish', 'top': lows[i-2], 'bottom': highs[i], 'start_time': times[i], 'mitigated': False, 'inverted': False, 'inverted_time': None})
current_low = lows[i]; current_high = highs[i]; current_close = closes[i]
for fvg in fvgs:
if fvg['inverted']: continue
if not fvg['mitigated']:
if fvg['type'] == 'bullish' and current_low < fvg['bottom']: fvg['mitigated'] = True
elif fvg['type'] == 'bearish' and current_high > fvg['top']: fvg['mitigated'] = True
if fvg['mitigated']:
if fvg['type'] == 'bullish' and current_close < fvg['bottom']: fvg['inverted'] = True; fvg['inverted_time'] = times[i]
elif fvg['type'] == 'bearish' and current_close > fvg['top']: fvg['inverted'] = True; fvg['inverted_time'] = times[i]
last_time = times[-1]; signal = "hold"; strength = 0; reasons = []
latest_inversions = [f for f in fvgs if f['inverted'] and f['inverted_time'] == last_time]
for inv in latest_inversions:
if inv['type'] == 'bearish': signal = "buy"; strength = 80; reasons.append("IFVG Bullish Inversion")
elif inv['type'] == 'bullish': signal = "sell"; strength = 80; reasons.append("IFVG Bearish Inversion")
return {"signal": signal, "strength": strength, "reasons": reasons, "active_zones": [f for f in fvgs if f['start_time'] > times[-50]]}
def analyze_rvgi_cci_strategy(self, df: pd.DataFrame, sma_period: int = 30, cci_period: int = 14) -> Dict[str, any]:
if len(df) < max(sma_period, cci_period) + 5: return {"signal": "hold", "strength": 0, "reasons": []}
closes = df['close']; highs = df['high']; lows = df['low']; opens = df['open']
sma = closes.rolling(window=sma_period).mean()
tp = (highs + lows + closes) / 3
sma_tp = tp.rolling(window=cci_period).mean()
mad = tp.rolling(window=cci_period).apply(lambda x: np.mean(np.abs(x - np.mean(x))), raw=True).replace(0, 0.000001)
cci = (tp - sma_tp) / (0.015 * mad)
co = closes - opens; hl = highs - lows
num_val = (co + 2 * co.shift(1) + 2 * co.shift(2) + co.shift(3)) / 6
den_val = (hl + 2 * hl.shift(1) + 2 * hl.shift(2) + hl.shift(3)) / 6
rvi_period = 10
rvi_num = num_val.rolling(window=rvi_period).mean()
rvi_den = den_val.rolling(window=rvi_period).mean().replace(0, 0.000001)
rvi_main = (rvi_num / rvi_den).fillna(0)
rvi_signal = ((rvi_main + 2 * rvi_main.shift(1) + 2 * rvi_main.shift(2) + rvi_main.shift(3)) / 6).fillna(0)
curr = -1; prev = -2
price = closes.iloc[curr]; sma_val = sma.iloc[curr]; cci_now = cci.iloc[curr]
rvi_m_now = rvi_main.iloc[curr]; rvi_s_now = rvi_signal.iloc[curr]
rvi_m_prev = rvi_main.iloc[prev]; rvi_s_prev = rvi_signal.iloc[prev]
price_above_sma = price > sma_val; price_below_sma = price < sma_val
rvi_cross_up = (rvi_m_prev <= rvi_s_prev) and (rvi_m_now > rvi_s_now)
rvi_cross_down = (rvi_m_prev >= rvi_s_prev) and (rvi_m_now < rvi_s_now)
cci_buy = cci_now <= -100; cci_sell = cci_now >= 100
signal = "hold"; strength = 0; reasons = []
if price_above_sma and cci_sell and rvi_cross_down: signal = "sell"; strength = 75; reasons.append("RVGI死叉+CCI超买+SMA之上")
elif price_below_sma and cci_buy and rvi_cross_up: signal = "buy"; strength = 75; reasons.append("RVGI金叉+CCI超卖+SMA之下")
return {"signal": signal, "strength": strength, "reasons": reasons, "indicators": {"sma": sma_val, "cci": cci_now}}
def analyze_crt_strategy(self, df: pd.DataFrame, range_period: int = 15, confirm_period: int = 1, min_manipulation_pct: float = 5.0) -> Dict[str, any]:
if len(df) < range_period + confirm_period + 5: return {"signal": "hold", "strength": 0, "reasons": []}
prev_candle = df.iloc[-2]
range_high = prev_candle['high']; range_low = prev_candle['low']
range_open = prev_candle['open']; range_close = prev_candle['close']
is_bullish_range = range_close > range_open
current_candle = df.iloc[-1]
current_high = current_candle['high']; current_low = current_candle['low']; current_close = current_candle['close']
breakout = False; manipulation = False; manipulation_depth = 0.0
range_size = range_high - range_low
if range_size == 0: range_size = 0.0001
signal = "hold"; strength = 0; reasons = []
if is_bullish_range:
if current_low < range_low:
breakout = True; manipulation_depth = range_low - current_low
pct = (manipulation_depth / range_size) * 100
if current_close > range_low:
manipulation = True
if pct >= min_manipulation_pct: signal = "buy"; strength = 80; reasons.append(f"CRT Bullish: Downward Manipulation ({pct:.1f}%) & Reclaim")
else:
if current_high > range_high:
breakout = True; manipulation_depth = current_high - range_high
pct = (manipulation_depth / range_size) * 100
if current_close < range_high:
manipulation = True
if pct >= min_manipulation_pct: signal = "sell"; strength = 80; reasons.append(f"CRT Bearish: Upward Manipulation ({pct:.1f}%) & Reclaim")
return {"signal": signal, "strength": strength, "reasons": reasons, "range_info": {"high": range_high, "low": range_low, "direction": "bullish" if is_bullish_range else "bearish", "manipulation_pct": (manipulation_depth / range_size * 100) if breakout else 0}}
class SMCAnalyzer:
def __init__(self):
self.last_structure = "neutral"
self.ma_period = 200
self.swing_lookback = 5
self.atr_threshold = 0.002
self.allow_bos = True; self.allow_ob = True; self.allow_fvg = True; self.use_sentiment = True
def analyze(self, df: pd.DataFrame) -> Dict[str, any]:
if len(df) < 50: return {"signal": "neutral", "structure": "neutral", "reason": "Insufficient Data"}
swings = self._detect_swings(df)
structure = self._analyze_structure(df, swings)
ob_info = self._detect_order_blocks(df)
fvg_info = self._detect_fvg(df)
sweeps = self._detect_liquidity_sweeps(df, swings)
pd_info = self._analyze_premium_discount(df, swings)
signal = "neutral"; strength = 0; reasons = []
is_bullish = structure['trend'] == 'bullish'
is_bearish = structure['trend'] == 'bearish'
in_discount = pd_info['zone'] == 'discount'
in_premium = pd_info['zone'] == 'premium'
if is_bullish and in_discount:
if ob_info['signal'] == 'buy': signal = 'buy'; strength += 40; reasons.append(f"Bullish OB in Discount: {ob_info['reason']}")
if sweeps['signal'] == 'buy': signal = 'buy'; strength += 30; reasons.append(f"Liquidity Sweep (Sell-side): {sweeps['reason']}")
if fvg_info['signal'] == 'buy': signal = 'buy'; strength += 20; reasons.append(f"Bullish FVG Retest: {fvg_info['reason']}")
elif is_bearish and in_premium:
if ob_info['signal'] == 'sell': signal = 'sell'; strength += 40; reasons.append(f"Bearish OB in Premium: {ob_info['reason']}")
if sweeps['signal'] == 'sell': signal = 'sell'; strength += 30; reasons.append(f"Liquidity Sweep (Buy-side): {sweeps['reason']}")
if fvg_info['signal'] == 'sell': signal = 'sell'; strength += 20; reasons.append(f"Bearish FVG Retest: {fvg_info['reason']}")
return {"signal": signal, "strength": strength, "structure": structure['trend'], "reason": "; ".join(reasons), "details": {"swings": swings, "ob": ob_info, "fvg": fvg_info, "sweeps": sweeps, "premium_discount": pd_info}}
def _detect_swings(self, df):
highs = df['high'].values; lows = df['low'].values; n = len(df)
swing_highs = []; swing_lows = []
for i in range(5, n-5):
if all(highs[i] >= highs[i-j] for j in range(1, 6)) and all(highs[i] > highs[i+j] for j in range(1, 6)): swing_highs.append((i, highs[i]))
if all(lows[i] <= lows[i-j] for j in range(1, 6)) and all(lows[i] < lows[i+j] for j in range(1, 6)): swing_lows.append((i, lows[i]))
return {"highs": swing_highs, "lows": swing_lows}
def _analyze_structure(self, df, swings):
if not swings['highs'] or not swings['lows']: return {"trend": "neutral"}
last_high = swings['highs'][-1][1]; prev_high = swings['highs'][-2][1] if len(swings['highs']) > 1 else last_high
last_low = swings['lows'][-1][1]; prev_low = swings['lows'][-2][1] if len(swings['lows']) > 1 else last_low
current_close = df['close'].iloc[-1]
trend = "neutral"
if last_high > prev_high and last_low > prev_low: trend = "bullish"
elif last_high < prev_high and last_low < prev_low: trend = "bearish"
if trend == "bullish" and current_close < last_low: trend = "bearish_change"
elif trend == "bearish" and current_close > last_high: trend = "bullish_change"
return {"trend": trend}
def _detect_order_blocks(self, df):
closes = df['close'].values; opens = df['open'].values; highs = df['high'].values; lows = df['low'].values; current_price = closes[-1]
for i in range(len(df)-3, len(df)-50, -1):
if opens[i-1] > closes[i-1]:
if closes[i] > opens[i] and (closes[i] - opens[i]) > (opens[i-1] - closes[i-1]):
ob_high = highs[i-1]; ob_low = lows[i-1]; violated = False
for k in range(i+1, len(df)):
if closes[k] < ob_low: violated = True; break
if not violated and ob_low <= current_price <= ob_high * 1.01: return {"signal": "buy", "reason": f"Retesting Bullish OB from index {i-1}", "level": [ob_low, ob_high]}
if opens[i-1] < closes[i-1]:
if closes[i] < opens[i] and (opens[i] - closes[i]) > (closes[i-1] - opens[i-1]):
ob_high = highs[i-1]; ob_low = lows[i-1]; violated = False
for k in range(i+1, len(df)):
if closes[k] > ob_high: violated = True; break
if not violated and ob_low * 0.99 <= current_price <= ob_high: return {"signal": "sell", "reason": f"Retesting Bearish OB from index {i-1}", "level": [ob_low, ob_high]}
return {"signal": "neutral", "reason": "", "level": []}
def _detect_fvg(self, df):
highs = df['high'].values; lows = df['low'].values; closes = df['close'].values; current_price = closes[-1]; point = 0.00001
for i in range(len(df)-1, len(df)-20, -1):
if lows[i] > highs[i-2]:
gap_top = lows[i]; gap_bot = highs[i-2]; mitigated = False
for k in range(i+1, len(df)):
if lows[k] < gap_top: mitigated = True
if (not mitigated or (mitigated and current_price >= gap_bot)) and gap_bot <= current_price <= gap_top: return {"signal": "buy", "reason": f"In Bullish FVG {i}", "zone": [gap_bot, gap_top]}
if highs[i] < lows[i-2]:
gap_top = lows[i-2]; gap_bot = highs[i]; mitigated = False
for k in range(i+1, len(df)):
if highs[k] > gap_bot: mitigated = True
if (not mitigated or (mitigated and current_price <= gap_top)) and gap_bot <= current_price <= gap_top: return {"signal": "sell", "reason": f"In Bearish FVG {i}", "zone": [gap_bot, gap_top]}
return {"signal": "neutral", "reason": "", "zone": []}
def _detect_liquidity_sweeps(self, df, swings):
if not swings['highs'] or not swings['lows']: return {"signal": "neutral"}
current_high = df['high'].iloc[-1]; current_low = df['low'].iloc[-1]; current_close = df['close'].iloc[-1]
last_swing_high = swings['highs'][-1][1]; last_swing_low = swings['lows'][-1][1]
if current_high > last_swing_high and current_close < last_swing_high: return {"signal": "sell", "reason": "Buy-side Liquidity Sweep"}
if current_low < last_swing_low and current_close > last_swing_low: return {"signal": "buy", "reason": "Sell-side Liquidity Sweep"}
return {"signal": "neutral", "reason": ""}
def _analyze_premium_discount(self, df, swings):
if not swings['highs'] or not swings['lows']: return {"zone": "equilibrium"}
recent_range_high = max(x[1] for x in swings['highs'][-3:]); recent_range_low = min(x[1] for x in swings['lows'][-3:])
current_price = df['close'].iloc[-1]; mid_point = (recent_range_high + recent_range_low) / 2
zone = "equilibrium"
if current_price > mid_point: zone = "premium"
elif current_price < mid_point: zone = "discount"
return {"zone": zone, "range_high": recent_range_high, "range_low": recent_range_low, "equilibrium": mid_point}
class MFHAnalyzer:
def __init__(self, input_size=16, learning_rate=0.01):
self.input_size = input_size
self.learning_rate = learning_rate
self.horizon = 5
self.ma_period = 5
self.weights = np.random.randn(input_size) * np.sqrt(1 / input_size)
self.bias = 0.0
self.last_features = None
self.last_prediction = 0.0
self.count = 0
self.mean = np.zeros(input_size)
self.m2 = np.zeros(input_size)
def calculate_features(self, df):
if len(df) < (self.ma_period + self.horizon + 1): return None
closes = df['close'].values; opens = df['open'].values; highs = df['high'].values; lows = df['low'].values
ma_close = df['close'].rolling(window=self.ma_period).mean().values
ma_open = df['open'].rolling(window=self.ma_period).mean().values
ma_high = df['high'].rolling(window=self.ma_period).mean().values
ma_low = df['low'].rolling(window=self.ma_period).mean().values
curr = -1; prev_h = -1 - self.horizon
features = np.zeros(16)
features[0] = closes[curr]; features[1] = opens[curr]; features[2] = highs[curr]; features[3] = lows[curr]
features[4] = ma_close[curr]; features[5] = ma_open[curr]; features[6] = ma_high[curr]; features[7] = ma_low[curr]
features[8] = opens[curr] - opens[prev_h]; features[9] = highs[curr] - highs[prev_h]
features[10] = lows[curr] - lows[prev_h]; features[11] = closes[curr] - closes[prev_h]
features[12] = ma_close[curr] - ma_close[prev_h]; features[13] = ma_open[curr] - ma_open[prev_h]
features[14] = ma_high[curr] - ma_high[prev_h]; features[15] = ma_low[curr] - ma_low[prev_h]
if self.count == 0: self.mean = features.copy(); self.m2 = np.zeros_like(features)
else:
delta = features - self.mean; self.mean += delta / (self.count + 1)
delta2 = features - self.mean; self.m2 += delta * delta2
self.count += 1
if self.count < 2: std = np.ones_like(features)
else: variance = self.m2 / (self.count - 1); std = np.sqrt(variance); std[std == 0] = 1.0
return (features - self.mean) / std
def predict(self, df):
features = self.calculate_features(df)
if features is None: return {"signal": "neutral", "slope": 0.0}
self.last_features = features
prediction = np.dot(self.weights, features) + self.bias
self.last_prediction = prediction
slope = prediction
signal = "buy" if slope > 0.001 else "sell" if slope < -0.001 else "neutral"
return {"signal": signal, "slope": float(slope), "features": features.tolist() if features is not None else []}
def train(self, current_price_change):
if self.last_features is None: return
target = current_price_change
error = target - self.last_prediction
self.weights += self.learning_rate * error * self.last_features
self.bias += self.learning_rate * error
return error
class MTFAnalyzer:
def __init__(self):
pass
def analyze(self, current_df, htf_df):
curr_trend = self._get_trend(current_df)
htf_trend = self._get_trend(htf_df)
signal = "neutral"; strength = 0; reason = ""
if curr_trend == htf_trend and curr_trend != 0: signal = "buy" if curr_trend > 0 else "sell"; strength = 80; reason = "MTF Alignment (Trend confirmed)"
elif htf_trend != 0: reason = f"MTF Divergence: HTF {htf_trend}, Curr {curr_trend}"
return {"signal": signal, "strength": strength, "reason": reason, "htf_trend": htf_trend, "curr_trend": curr_trend}
def _get_trend(self, df):
ema20 = df['close'].ewm(span=20).mean().iloc[-1]; ema50 = df['close'].ewm(span=50).mean().iloc[-1]; close = df['close'].iloc[-1]
if close > ema20 > ema50: return 1
if close < ema20 < ema50: return -1
return 0
class MatrixMLAnalyzer:
def __init__(self, input_size=10, learning_rate=0.01):
self.input_size = input_size
self.learning_rate = learning_rate
self.weights = np.random.randn(input_size) * np.sqrt(1 / input_size)
self.bias = 0.0
self.last_inputs = None
self.last_prediction = 0.0
def sigmoid(self, x): return 1 / (1 + np.exp(-x))
def sigmoid_derivative(self, x): s = self.sigmoid(x); return s * (1 - s)
def tanh(self, x): return np.tanh(x)
def tanh_derivative(self, x): return 1.0 - np.tanh(x)**2
def predict(self, tick_data_or_returns):
# Compatible with both tick list (Gold) and direct returns array (Crypto)
if isinstance(tick_data_or_returns, list) and len(tick_data_or_returns) > 0 and isinstance(tick_data_or_returns[0], dict):
prices = np.array([t['ask'] for t in tick_data_or_returns])
returns = np.diff(prices)
elif isinstance(tick_data_or_returns, (np.ndarray, list, pd.Series)):
returns = np.array(tick_data_or_returns)
else:
return {"signal": "neutral", "strength": 0.0, "raw_output": 0.0}
if len(returns) < self.input_size: return {"signal": "neutral", "strength": 0.0, "raw_output": 0.0}
features = returns[-self.input_size:]
std = np.std(features)
if std > 0: features = features / std
else: features = np.zeros_like(features)
self.last_inputs = features
linear_output = np.dot(self.weights, features) + self.bias
prediction = self.tanh(linear_output)
self.last_prediction = prediction
signal = "neutral"; strength = abs(prediction) * 100
if prediction > 0.1: signal = "buy"
elif prediction < -0.1: signal = "sell"
return {"signal": signal, "strength": float(strength), "raw_output": float(prediction)}
def train(self, actual_price_change):
if self.last_inputs is None: return
target = 1.0 if actual_price_change > 0 else -1.0
if actual_price_change == 0: target = 0.0
error = target - self.last_prediction
derivative = self.tanh_derivative(self.last_prediction)
self.weights += self.learning_rate * error * derivative * self.last_inputs
self.bias += self.learning_rate * error * derivative
return error
class AdvancedMarketAnalysisAdapter(AdvancedMarketAnalysis):
def analyze_full(self, df: pd.DataFrame, params: Dict[str, any] = None) -> Dict[str, any]:
if df is None or len(df) < 50: return None
params = params or {}
try:
indicators = self.calculate_technical_indicators(df)
regime = self.detect_market_regime(df)
levels = self.generate_support_resistance(df)
risk = self.calculate_risk_metrics(df)
signal_info = self.generate_signal_from_indicators(indicators)
summary = self.generate_analysis_summary(df)
ifvg = self.analyze_ifvg(df, min_gap_points=params.get('ifvg_gap', 10))
rvgi_cci = self.analyze_rvgi_cci_strategy(df, sma_period=params.get('rvgi_sma', 10), cci_period=params.get('rvgi_cci', 14))
return {
"indicators": indicators,
"regime": regime,
"levels": levels,
"risk": risk,
"signal_info": signal_info,
"summary": summary,
"ifvg": ifvg,
"rvgi_cci": rvgi_cci
}
except Exception as e:
logging.error(f"Full Analysis failed: {e}")
return None