802 lines
31 KiB
Python
802 lines
31 KiB
Python
# technical_indicators_smc.py
|
|
# Pure Smart Money Concepts (SMC) Analysis - No Traditional Indicators
|
|
# Focus on Market Structure, Liquidity, and Institutional Behavior
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
import logging
|
|
from typing import List, Dict, Any, Optional, Tuple, Union
|
|
from functools import lru_cache
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
|
|
# =========================
|
|
# SMC Enums & Constants
|
|
# =========================
|
|
|
|
class MarketStructure(Enum):
|
|
BOS_BULLISH = "BOS_BULLISH" # Break of Structure Bullish
|
|
BOS_BEARISH = "BOS_BEARISH" # Break of Structure Bearish
|
|
CHOCH_BULLISH = "CHOCH_BULLISH" # Change of Character Bullish
|
|
CHOCH_BEARISH = "CHOCH_BEARISH" # Change of Character Bearish
|
|
HH = "HH" # Higher High
|
|
HL = "HL" # Higher Low
|
|
LH = "LH" # Lower High
|
|
LL = "LL" # Lower Low
|
|
RANGE = "RANGE" # Ranging Market
|
|
|
|
class LiquidityType(Enum):
|
|
BSL = "BSL" # Buy Side Liquidity
|
|
SSL = "SSL" # Sell Side Liquidity
|
|
EQH = "EQH" # Equal Highs
|
|
EQL = "EQL" # Equal Lows
|
|
LIQUIDITY_SWEEP = "LIQUIDITY_SWEEP"
|
|
LIQUIDITY_GRAB = "LIQUIDITY_GRAB"
|
|
|
|
class OrderBlockType(Enum):
|
|
BULLISH_OB = "BULLISH_OB" # Bullish Order Block
|
|
BEARISH_OB = "BEARISH_OB" # Bearish Order Block
|
|
MITIGATION_BLOCK = "MITIGATION_BLOCK" # Mitigated Order Block
|
|
BREAKER_BLOCK = "BREAKER_BLOCK" # Breaker Block
|
|
|
|
class FVGType(Enum):
|
|
BULLISH_FVG = "BULLISH_FVG" # Bullish Fair Value Gap
|
|
BEARISH_FVG = "BEARISH_FVG" # Bearish Fair Value Gap
|
|
BALANCED_PRICE_RANGE = "BALANCED_PRICE_RANGE" # Balanced Price Range
|
|
|
|
# =========================
|
|
# Core SMC Utilities
|
|
# =========================
|
|
|
|
def calculate_atr(df: pd.DataFrame, period: int = 14) -> float:
|
|
"""Calculate Average True Range for volatility measurement"""
|
|
if len(df) < period + 1:
|
|
return 0.0
|
|
|
|
high_low = df['high'] - df['low']
|
|
high_close = (df['high'] - df['close'].shift()).abs()
|
|
low_close = (df['low'] - df['close'].shift()).abs()
|
|
|
|
true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
|
|
return true_range.rolling(window=period).mean().iloc[-1]
|
|
|
|
def identify_swing_points(df: pd.DataFrame, swing_length: int = 5) -> Tuple[pd.DataFrame, pd.DataFrame]:
|
|
"""Identify swing highs and lows with improved accuracy"""
|
|
if len(df) < swing_length * 2 + 1:
|
|
return pd.DataFrame(), pd.DataFrame()
|
|
|
|
df = df.copy()
|
|
|
|
# Enhanced swing detection with confirmation
|
|
swing_high_condition = (
|
|
(df['high'].rolling(window=swing_length*2+1, center=True).max() == df['high']) &
|
|
(df['high'].shift(1) < df['high']) & (df['high'].shift(-1) < df['high'])
|
|
)
|
|
|
|
swing_low_condition = (
|
|
(df['low'].rolling(window=swing_length*2+1, center=True).min() == df['low']) &
|
|
(df['low'].shift(1) > df['low']) & (df['low'].shift(-1) > df['low'])
|
|
)
|
|
|
|
swing_highs = df[swing_high_condition].copy()
|
|
swing_lows = df[swing_low_condition].copy()
|
|
|
|
# Add additional context
|
|
if not swing_highs.empty:
|
|
# Calculate strength for swing highs (simple approach)
|
|
swing_highs_copy = swing_highs.copy()
|
|
strength_list = []
|
|
for idx, row in swing_highs_copy.iterrows():
|
|
strength = row['high'] - min(row['open'], row['close'])
|
|
strength_list.append(strength)
|
|
swing_highs_copy['strength'] = strength_list
|
|
swing_highs = swing_highs_copy
|
|
|
|
if not swing_lows.empty:
|
|
# Calculate strength for swing lows (simple approach)
|
|
swing_lows_copy = swing_lows.copy()
|
|
strength_list = []
|
|
for idx, row in swing_lows_copy.iterrows():
|
|
strength = max(row['open'], row['close']) - row['low']
|
|
strength_list.append(strength)
|
|
swing_lows_copy['strength'] = strength_list
|
|
swing_lows = swing_lows_copy
|
|
|
|
return swing_highs, swing_lows
|
|
|
|
# =========================
|
|
# Market Structure Analysis
|
|
# =========================
|
|
|
|
def analyze_market_structure(df: pd.DataFrame, swing_length: int = 5) -> Dict[str, Any]:
|
|
"""Comprehensive market structure analysis using pure SMC concepts"""
|
|
swing_highs, swing_lows = identify_swing_points(df, swing_length)
|
|
|
|
if swing_highs.empty or swing_lows.empty:
|
|
return {
|
|
'structure_type': MarketStructure.RANGE.value,
|
|
'trend_direction': 'NEUTRAL',
|
|
'bos_detected': False,
|
|
'choch_detected': False,
|
|
'last_swing_high': None,
|
|
'last_swing_low': None,
|
|
'structure_strength': 0.0
|
|
}
|
|
|
|
# Get recent swings
|
|
recent_highs = swing_highs.tail(3)
|
|
recent_lows = swing_lows.tail(3)
|
|
|
|
structure_signals = []
|
|
|
|
# Analyze Higher Highs and Higher Lows (Bullish Structure)
|
|
if len(recent_highs) >= 2:
|
|
if recent_highs.iloc[-1]['high'] > recent_highs.iloc[-2]['high']:
|
|
structure_signals.append('HH')
|
|
|
|
if len(recent_lows) >= 2:
|
|
if recent_lows.iloc[-1]['low'] > recent_lows.iloc[-2]['low']:
|
|
structure_signals.append('HL')
|
|
|
|
# Analyze Lower Highs and Lower Lows (Bearish Structure)
|
|
if len(recent_highs) >= 2:
|
|
if recent_highs.iloc[-1]['high'] < recent_highs.iloc[-2]['high']:
|
|
structure_signals.append('LH')
|
|
|
|
if len(recent_lows) >= 2:
|
|
if recent_lows.iloc[-1]['low'] < recent_lows.iloc[-2]['low']:
|
|
structure_signals.append('LL')
|
|
|
|
# Determine overall structure
|
|
if 'HH' in structure_signals and 'HL' in structure_signals:
|
|
structure_type = MarketStructure.BOS_BULLISH.value
|
|
trend_direction = 'BULLISH'
|
|
elif 'LH' in structure_signals and 'LL' in structure_signals:
|
|
structure_type = MarketStructure.BOS_BEARISH.value
|
|
trend_direction = 'BEARISH'
|
|
else:
|
|
structure_type = MarketStructure.RANGE.value
|
|
trend_direction = 'NEUTRAL'
|
|
|
|
# Check for BOS (Break of Structure)
|
|
current_price = df['close'].iloc[-1]
|
|
bos_detected = False
|
|
choch_detected = False
|
|
|
|
if not swing_highs.empty:
|
|
last_significant_high = swing_highs['high'].max()
|
|
if current_price > last_significant_high:
|
|
bos_detected = True
|
|
structure_type = MarketStructure.BOS_BULLISH.value
|
|
|
|
if not swing_lows.empty:
|
|
last_significant_low = swing_lows['low'].min()
|
|
if current_price < last_significant_low:
|
|
bos_detected = True
|
|
structure_type = MarketStructure.BOS_BEARISH.value
|
|
|
|
# Calculate structure strength
|
|
structure_strength = len([s for s in structure_signals if s in ['HH', 'HL', 'LH', 'LL']]) / 4.0
|
|
|
|
return {
|
|
'structure_type': structure_type,
|
|
'trend_direction': trend_direction,
|
|
'bos_detected': bos_detected,
|
|
'choch_detected': choch_detected,
|
|
'last_swing_high': swing_highs['high'].iloc[-1] if not swing_highs.empty else None,
|
|
'last_swing_low': swing_lows['low'].iloc[-1] if not swing_lows.empty else None,
|
|
'structure_strength': structure_strength,
|
|
'structure_signals': structure_signals
|
|
}
|
|
|
|
# =========================
|
|
# Liquidity Analysis
|
|
# =========================
|
|
|
|
def detect_liquidity_zones(df: pd.DataFrame, lookback: int = 20) -> List[Dict[str, Any]]:
|
|
"""Detect liquidity zones (BSL/SSL) where stops are likely placed"""
|
|
swing_highs, swing_lows = identify_swing_points(df)
|
|
liquidity_zones = []
|
|
|
|
if not swing_highs.empty:
|
|
# Buy Side Liquidity (above swing highs)
|
|
recent_highs = swing_highs.tail(lookback)
|
|
for _, high_point in recent_highs.iterrows():
|
|
age = len(df) - df.index.get_loc(high_point.name)
|
|
if age <= lookback: # Only recent zones
|
|
liquidity_zones.append({
|
|
'type': LiquidityType.BSL.value,
|
|
'price': high_point['high'],
|
|
'time': high_point['time'],
|
|
'age': age,
|
|
'strength': high_point.get('strength', 1.0),
|
|
'status': 'ACTIVE'
|
|
})
|
|
|
|
if not swing_lows.empty:
|
|
# Sell Side Liquidity (below swing lows)
|
|
recent_lows = swing_lows.tail(lookback)
|
|
for _, low_point in recent_lows.iterrows():
|
|
age = len(df) - df.index.get_loc(low_point.name)
|
|
if age <= lookback: # Only recent zones
|
|
liquidity_zones.append({
|
|
'type': LiquidityType.SSL.value,
|
|
'price': low_point['low'],
|
|
'time': low_point['time'],
|
|
'age': age,
|
|
'strength': low_point.get('strength', 1.0),
|
|
'status': 'ACTIVE'
|
|
})
|
|
|
|
return sorted(liquidity_zones, key=lambda x: x['age'])
|
|
|
|
def detect_equal_highs_lows(df: pd.DataFrame, tolerance_pips: float = 5.0) -> Dict[str, List[Dict]]:
|
|
"""Detect Equal Highs and Equal Lows (EQH/EQL)"""
|
|
swing_highs, swing_lows = identify_swing_points(df)
|
|
|
|
equal_highs = []
|
|
equal_lows = []
|
|
|
|
# Convert pips to price difference (assuming 4-digit pricing)
|
|
tolerance = tolerance_pips * 0.0001
|
|
|
|
# Find Equal Highs
|
|
if len(swing_highs) >= 2:
|
|
for i in range(len(swing_highs) - 1):
|
|
for j in range(i + 1, len(swing_highs)):
|
|
high1 = swing_highs.iloc[i]['high']
|
|
high2 = swing_highs.iloc[j]['high']
|
|
if abs(high1 - high2) <= tolerance:
|
|
equal_highs.append({
|
|
'type': LiquidityType.EQH.value,
|
|
'price': (high1 + high2) / 2,
|
|
'time1': swing_highs.iloc[i]['time'],
|
|
'time2': swing_highs.iloc[j]['time'],
|
|
'strength': 2.0 # EQH has higher liquidity
|
|
})
|
|
|
|
# Find Equal Lows
|
|
if len(swing_lows) >= 2:
|
|
for i in range(len(swing_lows) - 1):
|
|
for j in range(i + 1, len(swing_lows)):
|
|
low1 = swing_lows.iloc[i]['low']
|
|
low2 = swing_lows.iloc[j]['low']
|
|
if abs(low1 - low2) <= tolerance:
|
|
equal_lows.append({
|
|
'type': LiquidityType.EQL.value,
|
|
'price': (low1 + low2) / 2,
|
|
'time1': swing_lows.iloc[i]['time'],
|
|
'time2': swing_lows.iloc[j]['time'],
|
|
'strength': 2.0 # EQL has higher liquidity
|
|
})
|
|
|
|
return {
|
|
'equal_highs': equal_highs,
|
|
'equal_lows': equal_lows
|
|
}
|
|
|
|
def detect_liquidity_sweeps(df: pd.DataFrame, liquidity_zones: List[Dict]) -> List[Dict[str, Any]]:
|
|
"""Detect when price sweeps liquidity and reverses"""
|
|
sweeps = []
|
|
current_price = df['close'].iloc[-1]
|
|
recent_candles = df.tail(10) # Look at recent 10 candles
|
|
|
|
for zone in liquidity_zones:
|
|
if zone['status'] != 'ACTIVE':
|
|
continue
|
|
|
|
zone_price = zone['price']
|
|
zone_type = zone['type']
|
|
|
|
# Check for sweep in recent candles
|
|
for i, (idx, candle) in enumerate(recent_candles.iterrows()):
|
|
if zone_type == LiquidityType.BSL.value:
|
|
# Buy Side Liquidity sweep (price goes above then reverses)
|
|
if candle['high'] > zone_price and candle['close'] < zone_price:
|
|
# Confirm reversal in next candles
|
|
if i < len(recent_candles) - 2:
|
|
next_candle = recent_candles.iloc[i + 1]
|
|
if next_candle['close'] < candle['close']:
|
|
sweeps.append({
|
|
'type': LiquidityType.LIQUIDITY_SWEEP.value,
|
|
'direction': 'BEARISH',
|
|
'swept_price': zone_price,
|
|
'sweep_time': candle['time'],
|
|
'strength': zone['strength'],
|
|
'reversal_confirmed': True
|
|
})
|
|
|
|
elif zone_type == LiquidityType.SSL.value:
|
|
# Sell Side Liquidity sweep (price goes below then reverses)
|
|
if candle['low'] < zone_price and candle['close'] > zone_price:
|
|
# Confirm reversal in next candles
|
|
if i < len(recent_candles) - 2:
|
|
next_candle = recent_candles.iloc[i + 1]
|
|
if next_candle['close'] > candle['close']:
|
|
sweeps.append({
|
|
'type': LiquidityType.LIQUIDITY_SWEEP.value,
|
|
'direction': 'BULLISH',
|
|
'swept_price': zone_price,
|
|
'sweep_time': candle['time'],
|
|
'strength': zone['strength'],
|
|
'reversal_confirmed': True
|
|
})
|
|
|
|
return sweeps
|
|
|
|
# =========================
|
|
# Order Block Analysis
|
|
# =========================
|
|
|
|
def detect_order_blocks(df: pd.DataFrame, lookback: int = 50) -> List[Dict[str, Any]]:
|
|
"""Detect Order Blocks using pure SMC methodology"""
|
|
order_blocks = []
|
|
|
|
if len(df) < lookback:
|
|
return order_blocks
|
|
|
|
recent_data = df.tail(lookback)
|
|
atr = calculate_atr(df)
|
|
|
|
for i in range(1, len(recent_data) - 1):
|
|
current = recent_data.iloc[i]
|
|
prev_candle = recent_data.iloc[i - 1]
|
|
next_candle = recent_data.iloc[i + 1] if i + 1 < len(recent_data) else None
|
|
|
|
# Bullish Order Block Detection
|
|
if (current['close'] < current['open'] and # Bearish candle
|
|
next_candle is not None and
|
|
next_candle['close'] > current['high']): # Next candle breaks high
|
|
|
|
# Confirm it's a strong move
|
|
body_size = abs(current['open'] - current['close'])
|
|
if body_size > atr * 0.3: # Significant candle size
|
|
order_blocks.append({
|
|
'type': OrderBlockType.BULLISH_OB.value,
|
|
'high': current['high'],
|
|
'low': current['low'],
|
|
'open': current['open'],
|
|
'close': current['close'],
|
|
'time': current['time'],
|
|
'strength': body_size / atr,
|
|
'status': 'ACTIVE',
|
|
'tested': False
|
|
})
|
|
|
|
# Bearish Order Block Detection
|
|
if (current['close'] > current['open'] and # Bullish candle
|
|
next_candle is not None and
|
|
next_candle['close'] < current['low']): # Next candle breaks low
|
|
|
|
# Confirm it's a strong move
|
|
body_size = abs(current['open'] - current['close'])
|
|
if body_size > atr * 0.3: # Significant candle size
|
|
order_blocks.append({
|
|
'type': OrderBlockType.BEARISH_OB.value,
|
|
'high': current['high'],
|
|
'low': current['low'],
|
|
'open': current['open'],
|
|
'close': current['close'],
|
|
'time': current['time'],
|
|
'strength': body_size / atr,
|
|
'status': 'ACTIVE',
|
|
'tested': False
|
|
})
|
|
|
|
return order_blocks
|
|
|
|
def check_order_block_mitigation(df: pd.DataFrame, order_blocks: List[Dict]) -> List[Dict]:
|
|
"""Check if order blocks have been mitigated (tested)"""
|
|
current_price = df['close'].iloc[-1]
|
|
recent_lows = df.tail(20)['low'].min()
|
|
recent_highs = df.tail(20)['high'].max()
|
|
|
|
for ob in order_blocks:
|
|
if ob['status'] != 'ACTIVE':
|
|
continue
|
|
|
|
ob_high = ob['high']
|
|
ob_low = ob['low']
|
|
|
|
if ob['type'] == OrderBlockType.BULLISH_OB.value:
|
|
# Check if price has returned to test the order block
|
|
if recent_lows <= ob_high and current_price >= ob_low:
|
|
ob['tested'] = True
|
|
if current_price < ob_low: # Failed to hold
|
|
ob['status'] = 'MITIGATED'
|
|
|
|
elif ob['type'] == OrderBlockType.BEARISH_OB.value:
|
|
# Check if price has returned to test the order block
|
|
if recent_highs >= ob_low and current_price <= ob_high:
|
|
ob['tested'] = True
|
|
if current_price > ob_high: # Failed to hold
|
|
ob['status'] = 'MITIGATED'
|
|
|
|
return order_blocks
|
|
|
|
# =========================
|
|
# Fair Value Gap (FVG) Analysis
|
|
# =========================
|
|
|
|
def detect_fair_value_gaps(df: pd.DataFrame, min_gap_atr: float = 0.1) -> List[Dict[str, Any]]:
|
|
"""Detect Fair Value Gaps using SMC methodology"""
|
|
fvgs = []
|
|
|
|
if len(df) < 3:
|
|
return fvgs
|
|
|
|
atr = calculate_atr(df)
|
|
min_gap_size = atr * min_gap_atr
|
|
|
|
for i in range(1, len(df) - 1):
|
|
candle1 = df.iloc[i - 1]
|
|
candle2 = df.iloc[i]
|
|
candle3 = df.iloc[i + 1]
|
|
|
|
# Bullish FVG: candle1 low > candle3 high
|
|
bullish_gap = candle1['low'] - candle3['high']
|
|
if bullish_gap > min_gap_size:
|
|
fvgs.append({
|
|
'type': FVGType.BULLISH_FVG.value,
|
|
'top': candle1['low'],
|
|
'bottom': candle3['high'],
|
|
'time': candle2['time'],
|
|
'gap_size': bullish_gap,
|
|
'strength': bullish_gap / atr,
|
|
'status': 'ACTIVE',
|
|
'filled': False
|
|
})
|
|
|
|
# Bearish FVG: candle1 high < candle3 low
|
|
bearish_gap = candle3['low'] - candle1['high']
|
|
if bearish_gap > min_gap_size:
|
|
fvgs.append({
|
|
'type': FVGType.BEARISH_FVG.value,
|
|
'top': candle3['low'],
|
|
'bottom': candle1['high'],
|
|
'time': candle2['time'],
|
|
'gap_size': bearish_gap,
|
|
'strength': bearish_gap / atr,
|
|
'status': 'ACTIVE',
|
|
'filled': False
|
|
})
|
|
|
|
return fvgs
|
|
|
|
def check_fvg_filling(df: pd.DataFrame, fvgs: List[Dict]) -> List[Dict]:
|
|
"""Check if Fair Value Gaps have been filled"""
|
|
recent_highs = df.tail(10)['high']
|
|
recent_lows = df.tail(10)['low']
|
|
|
|
for fvg in fvgs:
|
|
if fvg['status'] != 'ACTIVE':
|
|
continue
|
|
|
|
fvg_top = fvg['top']
|
|
fvg_bottom = fvg['bottom']
|
|
|
|
if fvg['type'] == FVGType.BULLISH_FVG.value:
|
|
# Check if price has filled the gap
|
|
if any(low <= fvg_bottom for low in recent_lows):
|
|
fvg['filled'] = True
|
|
fvg['status'] = 'FILLED'
|
|
|
|
elif fvg['type'] == FVGType.BEARISH_FVG.value:
|
|
# Check if price has filled the gap
|
|
if any(high >= fvg_top for high in recent_highs):
|
|
fvg['filled'] = True
|
|
fvg['status'] = 'FILLED'
|
|
|
|
return fvgs
|
|
|
|
# =========================
|
|
# Change of Character (CHoCH)
|
|
# =========================
|
|
|
|
def detect_change_of_character(df: pd.DataFrame, structure_data: Dict) -> Optional[Dict[str, Any]]:
|
|
"""Detect Change of Character in market structure"""
|
|
if not structure_data.get('structure_signals'):
|
|
return None
|
|
|
|
recent_structure = structure_data['structure_signals']
|
|
current_trend = structure_data['trend_direction']
|
|
|
|
# CHoCH occurs when we break the previous structure
|
|
swing_highs, swing_lows = identify_swing_points(df)
|
|
|
|
if swing_highs.empty or swing_lows.empty:
|
|
return None
|
|
|
|
current_price = df['close'].iloc[-1]
|
|
|
|
# Bullish CHoCH: In bearish trend, we break previous higher low
|
|
if current_trend == 'BEARISH' and len(swing_lows) >= 2:
|
|
prev_low = swing_lows.iloc[-2]['low']
|
|
last_low = swing_lows.iloc[-1]['low']
|
|
|
|
if last_low > prev_low and current_price > last_low:
|
|
return {
|
|
'type': MarketStructure.CHOCH_BULLISH.value,
|
|
'price': last_low,
|
|
'time': df.iloc[-1]['time'],
|
|
'strength': (current_price - last_low) / calculate_atr(df),
|
|
'previous_trend': 'BEARISH',
|
|
'new_trend': 'BULLISH'
|
|
}
|
|
|
|
# Bearish CHoCH: In bullish trend, we break previous lower high
|
|
if current_trend == 'BULLISH' and len(swing_highs) >= 2:
|
|
prev_high = swing_highs.iloc[-2]['high']
|
|
last_high = swing_highs.iloc[-1]['high']
|
|
|
|
if last_high < prev_high and current_price < last_high:
|
|
return {
|
|
'type': MarketStructure.CHOCH_BEARISH.value,
|
|
'price': last_high,
|
|
'time': df.iloc[-1]['time'],
|
|
'strength': (last_high - current_price) / calculate_atr(df),
|
|
'previous_trend': 'BULLISH',
|
|
'new_trend': 'BEARISH'
|
|
}
|
|
|
|
return None
|
|
|
|
# =========================
|
|
# Premium/Discount Zones
|
|
# =========================
|
|
|
|
def calculate_premium_discount_zones(df: pd.DataFrame, lookback: int = 100) -> Dict[str, Any]:
|
|
"""Calculate Premium and Discount zones based on recent price action"""
|
|
if len(df) < lookback:
|
|
lookback = len(df)
|
|
|
|
recent_data = df.tail(lookback)
|
|
high = recent_data['high'].max()
|
|
low = recent_data['low'].min()
|
|
range_size = high - low
|
|
|
|
# Standard SMC levels
|
|
premium_zone = {
|
|
'top': high,
|
|
'bottom': high - (range_size * 0.2), # Top 20%
|
|
'mid': high - (range_size * 0.1) # Top 10%
|
|
}
|
|
|
|
equilibrium_zone = {
|
|
'top': low + (range_size * 0.6),
|
|
'bottom': low + (range_size * 0.4),
|
|
'mid': low + (range_size * 0.5) # 50% level
|
|
}
|
|
|
|
discount_zone = {
|
|
'top': low + (range_size * 0.2), # Bottom 20%
|
|
'bottom': low,
|
|
'mid': low + (range_size * 0.1) # Bottom 10%
|
|
}
|
|
|
|
current_price = df['close'].iloc[-1]
|
|
|
|
# Determine current zone
|
|
if current_price >= premium_zone['bottom']:
|
|
current_zone = 'PREMIUM'
|
|
elif current_price <= discount_zone['top']:
|
|
current_zone = 'DISCOUNT'
|
|
else:
|
|
current_zone = 'EQUILIBRIUM'
|
|
|
|
return {
|
|
'premium': premium_zone,
|
|
'equilibrium': equilibrium_zone,
|
|
'discount': discount_zone,
|
|
'current_zone': current_zone,
|
|
'range_high': high,
|
|
'range_low': low
|
|
}
|
|
|
|
# =========================
|
|
# Main SMC Analysis Function
|
|
# =========================
|
|
|
|
def analyze_smc_full(df: pd.DataFrame, config: Dict[str, Any] = None) -> Dict[str, Any]:
|
|
"""Complete SMC analysis combining all elements"""
|
|
if config is None:
|
|
config = {
|
|
'swing_length': 5,
|
|
'liquidity_lookback': 20,
|
|
'order_block_lookback': 50,
|
|
'min_gap_atr': 0.1,
|
|
'eq_tolerance_pips': 5.0
|
|
}
|
|
|
|
# Core analysis
|
|
structure = analyze_market_structure(df, config['swing_length'])
|
|
liquidity_zones = detect_liquidity_zones(df, config['liquidity_lookback'])
|
|
equal_levels = detect_equal_highs_lows(df, config['eq_tolerance_pips'])
|
|
liquidity_sweeps = detect_liquidity_sweeps(df, liquidity_zones)
|
|
|
|
order_blocks = detect_order_blocks(df, config['order_block_lookback'])
|
|
order_blocks = check_order_block_mitigation(df, order_blocks)
|
|
|
|
fvgs = detect_fair_value_gaps(df, config['min_gap_atr'])
|
|
fvgs = check_fvg_filling(df, fvgs)
|
|
|
|
choch = detect_change_of_character(df, structure)
|
|
zones = calculate_premium_discount_zones(df)
|
|
|
|
# Calculate overall bias
|
|
bullish_signals = 0
|
|
bearish_signals = 0
|
|
|
|
# Structure bias
|
|
if structure['trend_direction'] == 'BULLISH':
|
|
bullish_signals += 3
|
|
elif structure['trend_direction'] == 'BEARISH':
|
|
bearish_signals += 3
|
|
|
|
# Liquidity sweep bias
|
|
for sweep in liquidity_sweeps:
|
|
if sweep['direction'] == 'BULLISH':
|
|
bullish_signals += 2
|
|
elif sweep['direction'] == 'BEARISH':
|
|
bearish_signals += 2
|
|
|
|
# CHoCH bias
|
|
if choch:
|
|
if 'BULLISH' in choch['type']:
|
|
bullish_signals += 2
|
|
elif 'BEARISH' in choch['type']:
|
|
bearish_signals += 2
|
|
|
|
# Zone bias
|
|
if zones['current_zone'] == 'DISCOUNT':
|
|
bullish_signals += 1 # Discount = potential buy zone
|
|
elif zones['current_zone'] == 'PREMIUM':
|
|
bearish_signals += 1 # Premium = potential sell zone
|
|
|
|
# Determine overall bias
|
|
total_signals = bullish_signals + bearish_signals
|
|
if total_signals > 0:
|
|
bullish_percentage = bullish_signals / total_signals
|
|
if bullish_percentage >= 0.7:
|
|
overall_bias = 'STRONG_BULLISH'
|
|
elif bullish_percentage >= 0.6:
|
|
overall_bias = 'BULLISH'
|
|
elif bullish_percentage <= 0.3:
|
|
overall_bias = 'STRONG_BEARISH'
|
|
elif bullish_percentage <= 0.4:
|
|
overall_bias = 'BEARISH'
|
|
else:
|
|
overall_bias = 'NEUTRAL'
|
|
else:
|
|
overall_bias = 'NEUTRAL'
|
|
|
|
return {
|
|
'timestamp': df.iloc[-1]['time'] if 'time' in df.columns else datetime.now(),
|
|
'current_price': df['close'].iloc[-1],
|
|
'atr': calculate_atr(df),
|
|
'overall_bias': overall_bias,
|
|
'bias_strength': max(bullish_signals, bearish_signals),
|
|
'structure': structure,
|
|
'liquidity_zones': liquidity_zones,
|
|
'equal_levels': equal_levels,
|
|
'liquidity_sweeps': liquidity_sweeps,
|
|
'order_blocks': [ob for ob in order_blocks if ob['status'] == 'ACTIVE'],
|
|
'fair_value_gaps': [fvg for fvg in fvgs if fvg['status'] == 'ACTIVE'],
|
|
'change_of_character': choch,
|
|
'premium_discount_zones': zones,
|
|
'bullish_signals': bullish_signals,
|
|
'bearish_signals': bearish_signals
|
|
}
|
|
|
|
# =========================
|
|
# Entry Point Calculation
|
|
# =========================
|
|
|
|
def calculate_optimal_entry_exit(smc_analysis: Dict[str, Any], risk_reward_ratio: float = 2.0) -> Dict[str, Any]:
|
|
"""Calculate optimal entry and exit points based on SMC analysis"""
|
|
current_price = smc_analysis['current_price']
|
|
atr = smc_analysis['atr']
|
|
bias = smc_analysis['overall_bias']
|
|
|
|
if 'BULLISH' in bias:
|
|
# Look for bullish entry opportunities
|
|
entry_zones = []
|
|
|
|
# Order blocks as entry zones
|
|
for ob in smc_analysis['order_blocks']:
|
|
if ob['type'] == OrderBlockType.BULLISH_OB.value and not ob['tested']:
|
|
entry_zones.append({
|
|
'type': 'ORDER_BLOCK',
|
|
'entry_price': (ob['high'] + ob['low']) / 2,
|
|
'stop_loss': ob['low'] - (atr * 0.5),
|
|
'confidence': ob['strength']
|
|
})
|
|
|
|
# Fair Value Gaps as entry zones
|
|
for fvg in smc_analysis['fair_value_gaps']:
|
|
if fvg['type'] == FVGType.BULLISH_FVG.value and not fvg['filled']:
|
|
entry_price = (fvg['top'] + fvg['bottom']) / 2
|
|
stop_loss = fvg['bottom'] - (atr * 0.5)
|
|
take_profit = entry_price + (abs(entry_price - stop_loss) * risk_reward_ratio)
|
|
|
|
entry_zones.append({
|
|
'type': 'FAIR_VALUE_GAP',
|
|
'entry_price': entry_price,
|
|
'stop_loss': stop_loss,
|
|
'take_profit': take_profit,
|
|
'confidence': fvg['strength']
|
|
})
|
|
|
|
# Discount zones as entry areas
|
|
if smc_analysis['premium_discount_zones']['current_zone'] == 'DISCOUNT':
|
|
discount_zone = smc_analysis['premium_discount_zones']['discount']
|
|
entry_price = discount_zone['mid']
|
|
stop_loss = discount_zone['bottom'] - (atr * 0.5)
|
|
take_profit = entry_price + (abs(entry_price - stop_loss) * risk_reward_ratio)
|
|
|
|
entry_zones.append({
|
|
'type': 'DISCOUNT_ZONE',
|
|
'entry_price': entry_price,
|
|
'stop_loss': stop_loss,
|
|
'take_profit': take_profit,
|
|
'confidence': 1.5
|
|
})
|
|
|
|
return {
|
|
'direction': 'BUY',
|
|
'entry_zones': sorted(entry_zones, key=lambda x: x['confidence'], reverse=True)
|
|
}
|
|
|
|
elif 'BEARISH' in bias:
|
|
# Look for bearish entry opportunities
|
|
entry_zones = []
|
|
|
|
# Order blocks as entry zones
|
|
for ob in smc_analysis['order_blocks']:
|
|
if ob['type'] == OrderBlockType.BEARISH_OB.value and not ob['tested']:
|
|
entry_zones.append({
|
|
'type': 'ORDER_BLOCK',
|
|
'entry_price': (ob['high'] + ob['low']) / 2,
|
|
'stop_loss': ob['high'] + (atr * 0.5),
|
|
'confidence': ob['strength']
|
|
})
|
|
|
|
# Fair Value Gaps as entry zones
|
|
for fvg in smc_analysis['fair_value_gaps']:
|
|
if fvg['type'] == FVGType.BEARISH_FVG.value and not fvg['filled']:
|
|
entry_price = (fvg['top'] + fvg['bottom']) / 2
|
|
stop_loss = fvg['top'] + (atr * 0.5)
|
|
take_profit = entry_price - (abs(stop_loss - entry_price) * risk_reward_ratio)
|
|
|
|
entry_zones.append({
|
|
'type': 'FAIR_VALUE_GAP',
|
|
'entry_price': entry_price,
|
|
'stop_loss': stop_loss,
|
|
'take_profit': take_profit,
|
|
'confidence': fvg['strength']
|
|
})
|
|
|
|
# Premium zones as entry areas
|
|
if smc_analysis['premium_discount_zones']['current_zone'] == 'PREMIUM':
|
|
premium_zone = smc_analysis['premium_discount_zones']['premium']
|
|
entry_price = premium_zone['mid']
|
|
stop_loss = premium_zone['top'] + (atr * 0.5)
|
|
take_profit = entry_price - (abs(stop_loss - entry_price) * risk_reward_ratio)
|
|
|
|
entry_zones.append({
|
|
'type': 'PREMIUM_ZONE',
|
|
'entry_price': entry_price,
|
|
'stop_loss': stop_loss,
|
|
'take_profit': take_profit,
|
|
'confidence': 1.5
|
|
})
|
|
|
|
return {
|
|
'direction': 'SELL',
|
|
'entry_zones': sorted(entry_zones, key=lambda x: x['confidence'], reverse=True)
|
|
}
|
|
|
|
return {
|
|
'direction': 'WAIT',
|
|
'entry_zones': []
|
|
}
|