mql5/crypto/advanced_analysis.py

651 lines
35 KiB
Python
Raw Permalink Normal View History

import pandas as pd
import numpy as np
from typing import Dict, List, Optional
import warnings
import logging
warnings.filterwarnings('ignore')
class PEMAnalyzer:
"""
Price Equation Model (PEM) Analyzer
Based on PulseEquation EA logic: Polynomial Prediction + Trend Filter
"""
def __init__(self):
# Coefficients from MQL5 (g_coeffs)
self.coeffs = [0.2752466, 0.01058082, 0.55162082, 0.03687016, 0.27721318, 0.1483476, 0.0008025]
def predict_price(self, price_t1, price_t2):
"""
Calculate predicted price using the polynomial equation
"""
c = self.coeffs
prediction = (c[0] * price_t1 + # Linear t-1
c[1] * (price_t1 ** 2) + # Quadratic t-1
c[2] * price_t2 + # Linear t-2
c[3] * (price_t2 ** 2) + # Quadratic t-2
c[4] * (price_t1 - price_t2) + # Price change
c[5] * np.sin(price_t1) + # Cyclic
c[6]) # Constant
return prediction
def analyze(self, df: pd.DataFrame, ma_fast_period=108, ma_slow_period=60, adx_threshold=20) -> Dict[str, any]:
if len(df) < max(ma_fast_period, ma_slow_period) + 5:
return {"signal": "neutral", "strength": 0, "reason": "Insufficient Data"}
# 1. Price Prediction
# Get prices t-1 and t-2 (assuming df.iloc[-1] is current forming candle, so t-1 is -2)
price_t1 = df['close'].iloc[-2]
price_t2 = df['close'].iloc[-3]
current_price = df['close'].iloc[-1] # Approximation of current Ask/Bid
predicted_price = self.predict_price(price_t1, price_t2)
# Raw Signal
raw_signal = "neutral"
if predicted_price > current_price:
raw_signal = "buy"
elif predicted_price < current_price:
raw_signal = "sell"
# 2. Trend Filter (ADX + MA)
# Calculate MAs
ma_fast = df['close'].rolling(window=ma_fast_period).mean().iloc[-2] # Completed candle
ma_slow = df['close'].rolling(window=ma_slow_period).mean().iloc[-2]
# Calculate ADX (simplified or reuse existing)
adx = self._calculate_adx(df).iloc[-2]
is_strong_trend = adx >= adx_threshold
is_uptrend = ma_fast > ma_slow
is_downtrend = ma_fast < ma_slow
final_signal = "neutral"
strength = 0
reason = []
if raw_signal == "buy":
if is_strong_trend and is_uptrend:
final_signal = "buy"
strength = 85
reason.append(f"PEM: Predicted {predicted_price:.4f} > Curr {current_price:.4f} + Strong Uptrend (ADX={adx:.1f})")
else:
reason.append(f"PEM: Raw Buy filtered (Trend mismatch)")
elif raw_signal == "sell":
if is_strong_trend and is_downtrend:
final_signal = "sell"
strength = 85
reason.append(f"PEM: Predicted {predicted_price:.4f} < Curr {current_price:.4f} + Strong Downtrend (ADX={adx:.1f})")
else:
reason.append(f"PEM: Raw Sell filtered (Trend mismatch)")
return {
"signal": final_signal,
"strength": strength,
"reason": "; ".join(reason),
"prediction": predicted_price,
"adx": adx,
"trend": "bullish" if is_uptrend else "bearish"
}
def _calculate_adx(self, df, period=14):
highs = df['high']
lows = df['low']
closes = df['close']
plus_dm = highs.diff()
minus_dm = -lows.diff()
plus_dm[plus_dm < 0] = 0
minus_dm[minus_dm < 0] = 0
tr1 = highs - lows
tr2 = abs(highs - closes.shift())
tr3 = abs(lows - closes.shift())
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(period).mean()
plus_di = 100 * (plus_dm.rolling(period).mean() / atr)
minus_di = 100 * (minus_dm.rolling(period).mean() / atr)
dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
adx = dx.rolling(period).mean()
return adx
class AdvancedMarketAnalysis:
"""
高级市场分析工具类
基于MQL5文章中的最佳实践利用Python的Pandas和NumPy库
提供更强大的技术分析和机器学习功能
"""
def __init__(self):
self.indicators_cache = {}
def calculate_technical_indicators(self, df: pd.DataFrame) -> Dict[str, float]:
if len(df) < 20:
return self._get_default_indicators()
indicators = {}
# SMA
indicators['sma_20'] = df['close'].tail(20).mean()
indicators['sma_50'] = df['close'].tail(50).mean() if len(df) >= 50 else indicators['sma_20']
# EMA
indicators['ema_12'] = df['close'].ewm(span=12).mean().iloc[-1]
indicators['ema_26'] = df['close'].ewm(span=26).mean().iloc[-1]
# MACD
ema_12 = df['close'].ewm(span=12).mean()
ema_26 = df['close'].ewm(span=26).mean()
macd_line = ema_12 - ema_26
macd_signal = macd_line.ewm(span=9).mean()
indicators['macd'] = macd_line.iloc[-1]
indicators['macd_signal'] = macd_signal.iloc[-1]
indicators['macd_histogram'] = indicators['macd'] - indicators['macd_signal']
# RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
indicators['rsi'] = 100 - (100 / (1 + rs.iloc[-1])) if not pd.isna(loss.iloc[-1]) and loss.iloc[-1] != 0 else 50
# Bollinger Bands
bb_period = 20
bb_std = 2
sma = df['close'].rolling(window=bb_period).mean()
std = df['close'].rolling(window=bb_period).std()
indicators['bb_upper'] = sma.iloc[-1] + (std.iloc[-1] * bb_std)
indicators['bb_lower'] = sma.iloc[-1] - (std.iloc[-1] * bb_std)
indicators['bb_middle'] = sma.iloc[-1]
# ATR
high_low = df['high'] - df['low']
high_close_prev = abs(df['high'] - df['close'].shift())
low_close_prev = abs(df['low'] - df['close'].shift())
true_range = pd.concat([high_low, high_close_prev, low_close_prev], axis=1).max(axis=1)
indicators['atr'] = true_range.rolling(window=14).mean().iloc[-1]
# Volume
indicators['volume_sma'] = df['volume'].tail(20).mean()
indicators['current_volume'] = df['volume'].iloc[-1]
indicators['volume_ratio'] = indicators['current_volume'] / indicators['volume_sma'] if indicators['volume_sma'] > 0 else 1
# Momentum
indicators['momentum_5'] = (df['close'].iloc[-1] / df['close'].iloc[-6] - 1) * 100
indicators['momentum_10'] = (df['close'].iloc[-1] / df['close'].iloc[-11] - 1) * 100
return indicators
def detect_market_regime(self, df: pd.DataFrame) -> Dict[str, any]:
if len(df) < 50:
return {"regime": "unknown", "confidence": 0.5, "description": "数据不足"}
returns = df['close'].pct_change().dropna()
volatility = returns.std() * np.sqrt(252)
price_change = (df['close'].iloc[-1] / df['close'].iloc[0] - 1) * 100
highs = df['high']
lows = df['low']
plus_dm = highs.diff()
minus_dm = -lows.diff()
plus_dm[plus_dm < 0] = 0
minus_dm[minus_dm < 0] = 0
tr = pd.concat([highs - lows, abs(highs - df['close'].shift()), abs(lows - df['close'].shift())], axis=1).max(axis=1)
atr = tr.rolling(window=14).mean()
plus_di = 100 * (plus_dm.rolling(window=14).mean() / atr)
minus_di = 100 * (minus_dm.rolling(window=14).mean() / atr)
dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
adx = dx.rolling(window=14).mean()
current_adx = adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 0
if volatility > 0.3:
regime = "high_volatility"
confidence = min(volatility / 0.5, 0.9)
description = "高波动市场"
elif current_adx > 25:
regime = "trending"
confidence = min(current_adx / 50, 0.9)
description = "趋势市场"
else:
regime = "ranging"
confidence = 0.7
description = "震荡市场"
return {"regime": regime, "confidence": confidence, "description": description, "volatility": volatility, "adx": current_adx, "price_change": price_change}
def generate_support_resistance(self, df: pd.DataFrame, lookback_period: int = 100) -> Dict[str, List[float]]:
if len(df) < lookback_period: lookback_period = len(df)
recent_data = df.tail(lookback_period)
recent_high = recent_data['high'].max()
recent_low = recent_data['low'].min()
price_range = recent_high - recent_low
current_price = df['close'].iloc[-1]
support_levels = []
for i in range(1, 4):
level = current_price - (price_range * 0.1 * i)
if level > recent_low: support_levels.append(float(round(level, 4)))
resistance_levels = []
for i in range(1, 4):
level = current_price + (price_range * 0.1 * i)
if level < recent_high: resistance_levels.append(float(round(level, 4)))
if not support_levels: support_levels = [float(round(recent_low, 4))]
if not resistance_levels: resistance_levels = [float(round(recent_high, 4))]
return {"support_levels": sorted(support_levels), "resistance_levels": sorted(resistance_levels), "current_price": current_price}
def calculate_risk_metrics(self, df: pd.DataFrame) -> Dict[str, float]:
if len(df) < 30: return self._get_default_risk_metrics()
returns = df['close'].pct_change().dropna()
metrics = {
"volatility": returns.std() * np.sqrt(252),
"sharpe_ratio": returns.mean() / returns.std() * np.sqrt(252) if returns.std() > 0 else 0,
"max_drawdown": self._calculate_max_drawdown(df['close']),
"var_95": np.percentile(returns, 5) * 100,
"expected_shortfall": returns[returns <= np.percentile(returns, 5)].mean() * 100,
"skewness": returns.skew(),
"kurtosis": returns.kurtosis()
}
return metrics
def _calculate_max_drawdown(self, prices: pd.Series) -> float:
cumulative_returns = (1 + prices.pct_change()).cumprod()
running_max = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - running_max) / running_max
return drawdown.min() * 100
def _get_default_indicators(self) -> Dict[str, float]:
return {"sma_20": 0, "sma_50": 0, "ema_12": 0, "ema_26": 0, "macd": 0, "macd_signal": 0, "macd_histogram": 0, "rsi": 50, "bb_upper": 0, "bb_lower": 0, "bb_middle": 0, "atr": 0, "volume_sma": 0, "current_volume": 0, "volume_ratio": 1, "momentum_5": 0, "momentum_10": 0}
def generate_signal_from_indicators(self, indicators: Dict[str, float]) -> Dict[str, any]:
signal = "hold"; strength = 0; reasons = []
if indicators['macd'] > indicators['macd_signal'] and indicators['macd_histogram'] > 0: strength += 25; reasons.append("MACD金叉")
elif indicators['macd'] < indicators['macd_signal'] and indicators['macd_histogram'] < 0: strength -= 25; reasons.append("MACD死叉")
if indicators['rsi'] < 30: strength += 20; reasons.append("RSI超卖")
elif indicators['rsi'] > 70: strength -= 20; reasons.append("RSI超买")
if indicators['ema_12'] > indicators['ema_26']: strength += 15; reasons.append("EMA金叉")
elif indicators['ema_12'] < indicators['ema_26']: strength -= 15; reasons.append("EMA死叉")
current_price = indicators.get('current_price', indicators['sma_20'])
if current_price < indicators['bb_lower']: strength += 15; reasons.append("价格触及布林带下轨")
elif current_price > indicators['bb_upper']: strength -= 15; reasons.append("价格触及布林带上轨")
if indicators['volume_ratio'] > 1.5: strength += 10; reasons.append("成交量放大")
if strength >= 40: signal = "buy"
elif strength <= -40: signal = "sell"
return {"signal": signal, "strength": abs(strength), "reasons": reasons, "confidence": min(abs(strength) / 100, 1.0)}
def _get_default_risk_metrics(self) -> Dict[str, float]:
return {"volatility": 0.2, "sharpe_ratio": 0, "max_drawdown": 0, "var_95": -2, "expected_shortfall": -3, "skewness": 0, "kurtosis": 0}
def generate_analysis_summary(self, df: pd.DataFrame) -> Dict[str, any]:
if len(df) < 20: return {"summary": "数据不足", "recommendation": "hold", "confidence": 0.0}
indicators = self.calculate_technical_indicators(df)
market_regime = self.detect_market_regime(df)
risk_metrics = self.calculate_risk_metrics(df)
support_resistance = self.generate_support_resistance(df)
signal_info = self.generate_signal_from_indicators(indicators)
current_price = df['close'].iloc[-1]
recommendation = "持有"
if signal_info['signal'] == 'buy': recommendation = "强烈买入" if signal_info['strength'] > 60 else "买入"
elif signal_info['signal'] == 'sell': recommendation = "强烈卖出" if signal_info['strength'] > 60 else "卖出"
return {
"summary": f"当前价格: {current_price:.4f}",
"market_regime": market_regime['description'],
"recommendation": recommendation,
"confidence": signal_info['confidence'],
"risk_level": "" if risk_metrics['volatility'] > 0.3 else "",
"key_indicators": {"RSI": f"{indicators['rsi']:.1f}", "MACD": f"{indicators['macd']:.4f}"},
"support_levels": support_resistance['support_levels'],
"resistance_levels": support_resistance['resistance_levels']
}
def analyze_ifvg(self, df: pd.DataFrame, min_gap_points: int = 100) -> Dict[str, any]:
if len(df) < 5: return {"signal": "hold", "strength": 0, "reasons": [], "active_zones": []}
lows = df['low'].values; highs = df['high'].values; closes = df['close'].values; times = df.index
point = 1.0; min_gap = min_gap_points * point # For Crypto, points are different. Assuming 1.0 for now or passed value
fvgs = []
for i in range(2, len(df)):
if lows[i] > highs[i-2] and (lows[i] - highs[i-2]) > min_gap:
fvgs.append({'id': f"bull_{i}", 'type': 'bullish', 'top': lows[i], 'bottom': highs[i-2], 'start_time': times[i], 'mitigated': False, 'inverted': False, 'inverted_time': None})
elif lows[i-2] > highs[i] and (lows[i-2] - highs[i]) > min_gap:
fvgs.append({'id': f"bear_{i}", 'type': 'bearish', 'top': lows[i-2], 'bottom': highs[i], 'start_time': times[i], 'mitigated': False, 'inverted': False, 'inverted_time': None})
current_low = lows[i]; current_high = highs[i]; current_close = closes[i]
for fvg in fvgs:
if fvg['inverted']: continue
if not fvg['mitigated']:
if fvg['type'] == 'bullish' and current_low < fvg['bottom']: fvg['mitigated'] = True
elif fvg['type'] == 'bearish' and current_high > fvg['top']: fvg['mitigated'] = True
if fvg['mitigated']:
if fvg['type'] == 'bullish' and current_close < fvg['bottom']: fvg['inverted'] = True; fvg['inverted_time'] = times[i]
elif fvg['type'] == 'bearish' and current_close > fvg['top']: fvg['inverted'] = True; fvg['inverted_time'] = times[i]
last_time = times[-1]; signal = "hold"; strength = 0; reasons = []
latest_inversions = [f for f in fvgs if f['inverted'] and f['inverted_time'] == last_time]
for inv in latest_inversions:
if inv['type'] == 'bearish': signal = "buy"; strength = 80; reasons.append("IFVG Bullish Inversion")
elif inv['type'] == 'bullish': signal = "sell"; strength = 80; reasons.append("IFVG Bearish Inversion")
return {"signal": signal, "strength": strength, "reasons": reasons, "active_zones": [f for f in fvgs if f['start_time'] > times[-50]]}
def analyze_rvgi_cci_strategy(self, df: pd.DataFrame, sma_period: int = 30, cci_period: int = 14) -> Dict[str, any]:
if len(df) < max(sma_period, cci_period) + 5: return {"signal": "hold", "strength": 0, "reasons": []}
closes = df['close']; highs = df['high']; lows = df['low']; opens = df['open']
sma = closes.rolling(window=sma_period).mean()
tp = (highs + lows + closes) / 3
sma_tp = tp.rolling(window=cci_period).mean()
mad = tp.rolling(window=cci_period).apply(lambda x: np.mean(np.abs(x - np.mean(x))), raw=True).replace(0, 0.000001)
cci = (tp - sma_tp) / (0.015 * mad)
co = closes - opens; hl = highs - lows
num_val = (co + 2 * co.shift(1) + 2 * co.shift(2) + co.shift(3)) / 6
den_val = (hl + 2 * hl.shift(1) + 2 * hl.shift(2) + hl.shift(3)) / 6
rvi_period = 10
rvi_num = num_val.rolling(window=rvi_period).mean()
rvi_den = den_val.rolling(window=rvi_period).mean().replace(0, 0.000001)
rvi_main = (rvi_num / rvi_den).fillna(0)
rvi_signal = ((rvi_main + 2 * rvi_main.shift(1) + 2 * rvi_main.shift(2) + rvi_main.shift(3)) / 6).fillna(0)
curr = -1; prev = -2
price = closes.iloc[curr]; sma_val = sma.iloc[curr]; cci_now = cci.iloc[curr]
rvi_m_now = rvi_main.iloc[curr]; rvi_s_now = rvi_signal.iloc[curr]
rvi_m_prev = rvi_main.iloc[prev]; rvi_s_prev = rvi_signal.iloc[prev]
price_above_sma = price > sma_val; price_below_sma = price < sma_val
rvi_cross_up = (rvi_m_prev <= rvi_s_prev) and (rvi_m_now > rvi_s_now)
rvi_cross_down = (rvi_m_prev >= rvi_s_prev) and (rvi_m_now < rvi_s_now)
cci_buy = cci_now <= -100; cci_sell = cci_now >= 100
signal = "hold"; strength = 0; reasons = []
if price_above_sma and cci_sell and rvi_cross_down: signal = "sell"; strength = 75; reasons.append("RVGI死叉+CCI超买+SMA之上")
elif price_below_sma and cci_buy and rvi_cross_up: signal = "buy"; strength = 75; reasons.append("RVGI金叉+CCI超卖+SMA之下")
return {"signal": signal, "strength": strength, "reasons": reasons, "indicators": {"sma": sma_val, "cci": cci_now}}
def analyze_crt_strategy(self, df: pd.DataFrame, range_period: int = 15, confirm_period: int = 1, min_manipulation_pct: float = 5.0) -> Dict[str, any]:
if len(df) < range_period + confirm_period + 5: return {"signal": "hold", "strength": 0, "reasons": []}
prev_candle = df.iloc[-2]
range_high = prev_candle['high']; range_low = prev_candle['low']
range_open = prev_candle['open']; range_close = prev_candle['close']
is_bullish_range = range_close > range_open
current_candle = df.iloc[-1]
current_high = current_candle['high']; current_low = current_candle['low']; current_close = current_candle['close']
breakout = False; manipulation = False; manipulation_depth = 0.0
range_size = range_high - range_low
if range_size == 0: range_size = 0.0001
signal = "hold"; strength = 0; reasons = []
if is_bullish_range:
if current_low < range_low:
breakout = True; manipulation_depth = range_low - current_low
pct = (manipulation_depth / range_size) * 100
if current_close > range_low:
manipulation = True
if pct >= min_manipulation_pct: signal = "buy"; strength = 80; reasons.append(f"CRT Bullish: Downward Manipulation ({pct:.1f}%) & Reclaim")
else:
if current_high > range_high:
breakout = True; manipulation_depth = current_high - range_high
pct = (manipulation_depth / range_size) * 100
if current_close < range_high:
manipulation = True
if pct >= min_manipulation_pct: signal = "sell"; strength = 80; reasons.append(f"CRT Bearish: Upward Manipulation ({pct:.1f}%) & Reclaim")
return {"signal": signal, "strength": strength, "reasons": reasons, "range_info": {"high": range_high, "low": range_low, "direction": "bullish" if is_bullish_range else "bearish", "manipulation_pct": (manipulation_depth / range_size * 100) if breakout else 0}}
class SMCAnalyzer:
def __init__(self):
self.last_structure = "neutral"
self.ma_period = 200
self.swing_lookback = 5
self.atr_threshold = 0.002
self.allow_bos = True; self.allow_ob = True; self.allow_fvg = True; self.use_sentiment = True
def analyze(self, df: pd.DataFrame) -> Dict[str, any]:
if len(df) < 50: return {"signal": "neutral", "structure": "neutral", "reason": "Insufficient Data"}
swings = self._detect_swings(df)
structure = self._analyze_structure(df, swings)
ob_info = self._detect_order_blocks(df)
fvg_info = self._detect_fvg(df)
sweeps = self._detect_liquidity_sweeps(df, swings)
pd_info = self._analyze_premium_discount(df, swings)
signal = "neutral"; strength = 0; reasons = []
is_bullish = structure['trend'] == 'bullish'
is_bearish = structure['trend'] == 'bearish'
in_discount = pd_info['zone'] == 'discount'
in_premium = pd_info['zone'] == 'premium'
if is_bullish and in_discount:
if ob_info['signal'] == 'buy': signal = 'buy'; strength += 40; reasons.append(f"Bullish OB in Discount: {ob_info['reason']}")
if sweeps['signal'] == 'buy': signal = 'buy'; strength += 30; reasons.append(f"Liquidity Sweep (Sell-side): {sweeps['reason']}")
if fvg_info['signal'] == 'buy': signal = 'buy'; strength += 20; reasons.append(f"Bullish FVG Retest: {fvg_info['reason']}")
elif is_bearish and in_premium:
if ob_info['signal'] == 'sell': signal = 'sell'; strength += 40; reasons.append(f"Bearish OB in Premium: {ob_info['reason']}")
if sweeps['signal'] == 'sell': signal = 'sell'; strength += 30; reasons.append(f"Liquidity Sweep (Buy-side): {sweeps['reason']}")
if fvg_info['signal'] == 'sell': signal = 'sell'; strength += 20; reasons.append(f"Bearish FVG Retest: {fvg_info['reason']}")
return {"signal": signal, "strength": strength, "structure": structure['trend'], "reason": "; ".join(reasons), "details": {"swings": swings, "ob": ob_info, "fvg": fvg_info, "sweeps": sweeps, "premium_discount": pd_info}}
def _detect_swings(self, df):
highs = df['high'].values; lows = df['low'].values; n = len(df)
swing_highs = []; swing_lows = []
for i in range(5, n-5):
if all(highs[i] >= highs[i-j] for j in range(1, 6)) and all(highs[i] > highs[i+j] for j in range(1, 6)): swing_highs.append((i, highs[i]))
if all(lows[i] <= lows[i-j] for j in range(1, 6)) and all(lows[i] < lows[i+j] for j in range(1, 6)): swing_lows.append((i, lows[i]))
return {"highs": swing_highs, "lows": swing_lows}
def _analyze_structure(self, df, swings):
if not swings['highs'] or not swings['lows']: return {"trend": "neutral"}
last_high = swings['highs'][-1][1]; prev_high = swings['highs'][-2][1] if len(swings['highs']) > 1 else last_high
last_low = swings['lows'][-1][1]; prev_low = swings['lows'][-2][1] if len(swings['lows']) > 1 else last_low
current_close = df['close'].iloc[-1]
trend = "neutral"
if last_high > prev_high and last_low > prev_low: trend = "bullish"
elif last_high < prev_high and last_low < prev_low: trend = "bearish"
if trend == "bullish" and current_close < last_low: trend = "bearish_change"
elif trend == "bearish" and current_close > last_high: trend = "bullish_change"
return {"trend": trend}
def _detect_order_blocks(self, df):
closes = df['close'].values; opens = df['open'].values; highs = df['high'].values; lows = df['low'].values; current_price = closes[-1]
for i in range(len(df)-3, len(df)-50, -1):
if opens[i-1] > closes[i-1]:
if closes[i] > opens[i] and (closes[i] - opens[i]) > (opens[i-1] - closes[i-1]):
ob_high = highs[i-1]; ob_low = lows[i-1]; violated = False
for k in range(i+1, len(df)):
if closes[k] < ob_low: violated = True; break
if not violated and ob_low <= current_price <= ob_high * 1.01: return {"signal": "buy", "reason": f"Retesting Bullish OB from index {i-1}", "level": [ob_low, ob_high]}
if opens[i-1] < closes[i-1]:
if closes[i] < opens[i] and (opens[i] - closes[i]) > (closes[i-1] - opens[i-1]):
ob_high = highs[i-1]; ob_low = lows[i-1]; violated = False
for k in range(i+1, len(df)):
if closes[k] > ob_high: violated = True; break
if not violated and ob_low * 0.99 <= current_price <= ob_high: return {"signal": "sell", "reason": f"Retesting Bearish OB from index {i-1}", "level": [ob_low, ob_high]}
return {"signal": "neutral", "reason": "", "level": []}
def _detect_fvg(self, df):
highs = df['high'].values; lows = df['low'].values; closes = df['close'].values; current_price = closes[-1]; point = 0.00001
for i in range(len(df)-1, len(df)-20, -1):
if lows[i] > highs[i-2]:
gap_top = lows[i]; gap_bot = highs[i-2]; mitigated = False
for k in range(i+1, len(df)):
if lows[k] < gap_top: mitigated = True
if (not mitigated or (mitigated and current_price >= gap_bot)) and gap_bot <= current_price <= gap_top: return {"signal": "buy", "reason": f"In Bullish FVG {i}", "zone": [gap_bot, gap_top]}
if highs[i] < lows[i-2]:
gap_top = lows[i-2]; gap_bot = highs[i]; mitigated = False
for k in range(i+1, len(df)):
if highs[k] > gap_bot: mitigated = True
if (not mitigated or (mitigated and current_price <= gap_top)) and gap_bot <= current_price <= gap_top: return {"signal": "sell", "reason": f"In Bearish FVG {i}", "zone": [gap_bot, gap_top]}
return {"signal": "neutral", "reason": "", "zone": []}
def _detect_liquidity_sweeps(self, df, swings):
if not swings['highs'] or not swings['lows']: return {"signal": "neutral"}
current_high = df['high'].iloc[-1]; current_low = df['low'].iloc[-1]; current_close = df['close'].iloc[-1]
last_swing_high = swings['highs'][-1][1]; last_swing_low = swings['lows'][-1][1]
if current_high > last_swing_high and current_close < last_swing_high: return {"signal": "sell", "reason": "Buy-side Liquidity Sweep"}
if current_low < last_swing_low and current_close > last_swing_low: return {"signal": "buy", "reason": "Sell-side Liquidity Sweep"}
return {"signal": "neutral", "reason": ""}
def _analyze_premium_discount(self, df, swings):
if not swings['highs'] or not swings['lows']: return {"zone": "equilibrium"}
recent_range_high = max(x[1] for x in swings['highs'][-3:]); recent_range_low = min(x[1] for x in swings['lows'][-3:])
current_price = df['close'].iloc[-1]; mid_point = (recent_range_high + recent_range_low) / 2
zone = "equilibrium"
if current_price > mid_point: zone = "premium"
elif current_price < mid_point: zone = "discount"
return {"zone": zone, "range_high": recent_range_high, "range_low": recent_range_low, "equilibrium": mid_point}
class MFHAnalyzer:
def __init__(self, input_size=16, learning_rate=0.01):
self.input_size = input_size
self.learning_rate = learning_rate
self.horizon = 5
self.ma_period = 5
self.weights = np.random.randn(input_size) * np.sqrt(1 / input_size)
self.bias = 0.0
self.last_features = None
self.last_prediction = 0.0
self.count = 0
self.mean = np.zeros(input_size)
self.m2 = np.zeros(input_size)
def calculate_features(self, df):
if len(df) < (self.ma_period + self.horizon + 1): return None
closes = df['close'].values; opens = df['open'].values; highs = df['high'].values; lows = df['low'].values
ma_close = df['close'].rolling(window=self.ma_period).mean().values
ma_open = df['open'].rolling(window=self.ma_period).mean().values
ma_high = df['high'].rolling(window=self.ma_period).mean().values
ma_low = df['low'].rolling(window=self.ma_period).mean().values
curr = -1; prev_h = -1 - self.horizon
features = np.zeros(16)
features[0] = closes[curr]; features[1] = opens[curr]; features[2] = highs[curr]; features[3] = lows[curr]
features[4] = ma_close[curr]; features[5] = ma_open[curr]; features[6] = ma_high[curr]; features[7] = ma_low[curr]
features[8] = opens[curr] - opens[prev_h]; features[9] = highs[curr] - highs[prev_h]
features[10] = lows[curr] - lows[prev_h]; features[11] = closes[curr] - closes[prev_h]
features[12] = ma_close[curr] - ma_close[prev_h]; features[13] = ma_open[curr] - ma_open[prev_h]
features[14] = ma_high[curr] - ma_high[prev_h]; features[15] = ma_low[curr] - ma_low[prev_h]
if self.count == 0: self.mean = features.copy(); self.m2 = np.zeros_like(features)
else:
delta = features - self.mean; self.mean += delta / (self.count + 1)
delta2 = features - self.mean; self.m2 += delta * delta2
self.count += 1
if self.count < 2: std = np.ones_like(features)
else: variance = self.m2 / (self.count - 1); std = np.sqrt(variance); std[std == 0] = 1.0
return (features - self.mean) / std
def predict(self, df):
features = self.calculate_features(df)
if features is None: return {"signal": "neutral", "slope": 0.0}
self.last_features = features
prediction = np.dot(self.weights, features) + self.bias
self.last_prediction = prediction
slope = prediction
signal = "buy" if slope > 0.001 else "sell" if slope < -0.001 else "neutral"
return {"signal": signal, "slope": float(slope), "features": features.tolist() if features is not None else []}
def train(self, current_price_change):
if self.last_features is None: return
target = current_price_change
error = target - self.last_prediction
self.weights += self.learning_rate * error * self.last_features
self.bias += self.learning_rate * error
return error
class MTFAnalyzer:
def __init__(self):
pass
def analyze(self, current_df, htf_df):
curr_trend = self._get_trend(current_df)
htf_trend = self._get_trend(htf_df)
signal = "neutral"; strength = 0; reason = ""
if curr_trend == htf_trend and curr_trend != 0: signal = "buy" if curr_trend > 0 else "sell"; strength = 80; reason = "MTF Alignment (Trend confirmed)"
elif htf_trend != 0: reason = f"MTF Divergence: HTF {htf_trend}, Curr {curr_trend}"
return {"signal": signal, "strength": strength, "reason": reason, "htf_trend": htf_trend, "curr_trend": curr_trend}
def _get_trend(self, df):
ema20 = df['close'].ewm(span=20).mean().iloc[-1]; ema50 = df['close'].ewm(span=50).mean().iloc[-1]; close = df['close'].iloc[-1]
if close > ema20 > ema50: return 1
if close < ema20 < ema50: return -1
return 0
class MatrixMLAnalyzer:
def __init__(self, input_size=10, learning_rate=0.01):
self.input_size = input_size
self.learning_rate = learning_rate
self.weights = np.random.randn(input_size) * np.sqrt(1 / input_size)
self.bias = 0.0
self.last_inputs = None
self.last_prediction = 0.0
def sigmoid(self, x): return 1 / (1 + np.exp(-x))
def sigmoid_derivative(self, x): s = self.sigmoid(x); return s * (1 - s)
def tanh(self, x): return np.tanh(x)
def tanh_derivative(self, x): return 1.0 - np.tanh(x)**2
def predict(self, tick_data_or_returns):
# Compatible with both tick list (Gold) and direct returns array (Crypto)
if isinstance(tick_data_or_returns, list) and len(tick_data_or_returns) > 0 and isinstance(tick_data_or_returns[0], dict):
prices = np.array([t['ask'] for t in tick_data_or_returns])
returns = np.diff(prices)
elif isinstance(tick_data_or_returns, (np.ndarray, list, pd.Series)):
returns = np.array(tick_data_or_returns)
else:
return {"signal": "neutral", "strength": 0.0, "raw_output": 0.0}
if len(returns) < self.input_size: return {"signal": "neutral", "strength": 0.0, "raw_output": 0.0}
features = returns[-self.input_size:]
std = np.std(features)
if std > 0: features = features / std
else: features = np.zeros_like(features)
self.last_inputs = features
linear_output = np.dot(self.weights, features) + self.bias
prediction = self.tanh(linear_output)
self.last_prediction = prediction
signal = "neutral"; strength = abs(prediction) * 100
if prediction > 0.1: signal = "buy"
elif prediction < -0.1: signal = "sell"
return {"signal": signal, "strength": float(strength), "raw_output": float(prediction)}
def train(self, actual_price_change):
if self.last_inputs is None: return
target = 1.0 if actual_price_change > 0 else -1.0
if actual_price_change == 0: target = 0.0
error = target - self.last_prediction
derivative = self.tanh_derivative(self.last_prediction)
self.weights += self.learning_rate * error * derivative * self.last_inputs
self.bias += self.learning_rate * error * derivative
return error
class AdvancedMarketAnalysisAdapter(AdvancedMarketAnalysis):
def analyze_full(self, df: pd.DataFrame, params: Dict[str, any] = None) -> Dict[str, any]:
if df is None or len(df) < 50: return None
params = params or {}
try:
indicators = self.calculate_technical_indicators(df)
regime = self.detect_market_regime(df)
levels = self.generate_support_resistance(df)
risk = self.calculate_risk_metrics(df)
signal_info = self.generate_signal_from_indicators(indicators)
summary = self.generate_analysis_summary(df)
ifvg = self.analyze_ifvg(df, min_gap_points=params.get('ifvg_gap', 10))
rvgi_cci = self.analyze_rvgi_cci_strategy(df, sma_period=params.get('rvgi_sma', 10), cci_period=params.get('rvgi_cci', 14))
return {
"indicators": indicators,
"regime": regime,
"levels": levels,
"risk": risk,
"signal_info": signal_info,
"summary": summary,
"ifvg": ifvg,
"rvgi_cci": rvgi_cci
}
except Exception as e:
logging.error(f"Full Analysis failed: {e}")
return None