1368 lines
No EOL
54 KiB
Python
1368 lines
No EOL
54 KiB
Python
"""
|
|
technical_indicators.py (refactor advanced)
|
|
===========================================
|
|
|
|
Versi refaktor: multi-zone detection, scoring, pattern, boundary, pivot,
|
|
feature extractor, zone plotting & event logging.
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
# ========== ZONE & STRUCTURE ==========
|
|
|
|
def calculate_atr_dynamic(df: pd.DataFrame, period: int = 14) -> float:
|
|
if len(df) < period + 1:
|
|
return 0.0
|
|
tr = pd.concat([
|
|
df['high'] - df['low'],
|
|
abs(df['high'] - df['close'].shift()),
|
|
abs(df['low'] - df['close'].shift())
|
|
], axis=1).max(axis=1)
|
|
atr_val = tr.ewm(span=period, adjust=False).mean().iloc[-1]
|
|
return float(atr_val)
|
|
|
|
|
|
def detect_structure(df: pd.DataFrame, swing_lookback: int = 5) -> Tuple[str, Dict[str, Any]]:
|
|
if len(df) < swing_lookback * 2:
|
|
return "INDECISIVE", {}
|
|
|
|
df = df.copy()
|
|
|
|
# Deteksi Swing Highs and Lows yang lebih robust
|
|
df['is_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
|
|
df['is_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
|
|
|
|
swing_highs = df[df['is_swing_high']]
|
|
swing_lows = df[df['is_swing_low']]
|
|
|
|
res = []
|
|
last_swing_high_price = None
|
|
last_swing_low_price = None
|
|
last_swing_high_idx = None
|
|
last_swing_low_idx = None
|
|
|
|
# Urutkan swing points berdasarkan waktu (index)
|
|
if not swing_highs.empty:
|
|
last_sh = swing_highs.iloc[-1]
|
|
last_swing_high_price = last_sh['high']
|
|
last_swing_high_idx = last_sh.name
|
|
if len(swing_highs) > 1:
|
|
prev_sh = swing_highs.iloc[-2]
|
|
if last_sh['high'] > prev_sh['high']:
|
|
res.append('HH')
|
|
else:
|
|
res.append('LH')
|
|
|
|
if not swing_lows.empty:
|
|
last_sl = swing_lows.iloc[-1]
|
|
last_swing_low_price = last_sl['low']
|
|
last_swing_low_idx = last_sl.name
|
|
if len(swing_lows) > 1:
|
|
prev_sl = swing_lows.iloc[-2]
|
|
if last_sl['low'] > prev_sl['low']:
|
|
res.append('HL')
|
|
else:
|
|
res.append('LL')
|
|
|
|
# --- LOGIKA BARU UNTUK BREAK OF STRUCTURE (BOS) ---
|
|
# Membutuhkan penutupan candle (candle close) untuk konfirmasi
|
|
if last_swing_high_idx is not None:
|
|
# Cek candle SETELAH swing high terakhir
|
|
subsequent_candles = df.loc[last_swing_high_idx:].iloc[1:]
|
|
confirmed_break = subsequent_candles[subsequent_candles['close'] > last_swing_high_price]
|
|
if not confirmed_break.empty:
|
|
res.append("BULLISH_BOS")
|
|
|
|
if last_swing_low_idx is not None:
|
|
# Cek candle SETELAH swing low terakhir
|
|
subsequent_candles = df.loc[last_swing_low_idx:].iloc[1:]
|
|
confirmed_break = subsequent_candles[subsequent_candles['close'] < last_swing_low_price]
|
|
if not confirmed_break.empty:
|
|
res.append("BEARISH_BOS")
|
|
|
|
structure_str = "/".join(sorted(list(set(res)))) if res else "INDECISIVE"
|
|
return structure_str, {'last_high': last_swing_high_price, 'last_low': last_swing_low_price}
|
|
|
|
|
|
def detect_order_blocks_multi(
|
|
df: pd.DataFrame, lookback: int = 15, structure_filter: Optional[str] = None, max_age: int = 20
|
|
) -> List[Dict[str, Any]]:
|
|
order_blocks = []
|
|
now_price = df['close'].iloc[-1]
|
|
atr = calculate_atr_dynamic(df)
|
|
last_idx = df.index[-1]
|
|
for i in range(len(df) - lookback - 2, len(df) - 2):
|
|
candle = df.iloc[i]
|
|
age = last_idx - df.index[i]
|
|
if age > max_age:
|
|
continue
|
|
# Bullish OB
|
|
if candle['close'] < candle['open']:
|
|
found_bos = False
|
|
for j in range(i + 1, min(i + lookback + 1, len(df))):
|
|
if df.iloc[j]['close'] > candle['high']:
|
|
found_bos = True
|
|
break
|
|
if found_bos:
|
|
if structure_filter and structure_filter != "BULLISH_BOS":
|
|
continue
|
|
strength = (abs(candle['open'] - candle['close']) / atr) if atr else 1
|
|
order_blocks.append({
|
|
'type': 'BULLISH_OB',
|
|
'high': float(candle['high']),
|
|
'low': float(candle['low']),
|
|
'time': candle['time'],
|
|
'age': age,
|
|
'strength': strength,
|
|
'distance': abs(now_price - ((candle['open'] + candle['close']) / 2)),
|
|
})
|
|
# Bearish OB
|
|
elif candle['close'] > candle['open']:
|
|
found_bos = False
|
|
for j in range(i + 1, min(i + lookback + 1, len(df))):
|
|
if df.iloc[j]['close'] < candle['low']:
|
|
found_bos = True
|
|
break
|
|
if found_bos:
|
|
if structure_filter and structure_filter != "BEARISH_BOS":
|
|
continue
|
|
strength = (abs(candle['open'] - candle['close']) / atr) if atr else 1
|
|
order_blocks.append({
|
|
'type': 'BEARISH_OB',
|
|
'high': float(candle['high']),
|
|
'low': float(candle['low']),
|
|
'time': candle['time'],
|
|
'age': age,
|
|
'strength': strength,
|
|
'distance': abs(now_price - ((candle['open'] + candle['close']) / 2)),
|
|
})
|
|
order_blocks = sorted(order_blocks, key=lambda x: (x['distance'], -x['strength'], x['age']))
|
|
return order_blocks
|
|
|
|
|
|
def detect_fvg_multi(df: pd.DataFrame, min_gap: float = 0.0002, max_age: int = 20) -> List[Dict[str, Any]]:
|
|
fvg_zones = []
|
|
now_price = df['close'].iloc[-1]
|
|
atr = calculate_atr_dynamic(df)
|
|
last_idx = df.index[-1]
|
|
for i in range(1, len(df) - 1):
|
|
c_prev, c_now, c_next = df.iloc[i - 1], df.iloc[i], df.iloc[i + 1]
|
|
age = last_idx - df.index[i]
|
|
if age > max_age:
|
|
continue
|
|
# Bullish FVG
|
|
gap = c_next['low'] - c_prev['high']
|
|
if gap > min_gap:
|
|
strength = gap / atr if atr else 1
|
|
fvg_zones.append({
|
|
'type': 'FVG_BULLISH',
|
|
'start': float(c_prev['high']),
|
|
'end': float(c_next['low']),
|
|
'time': c_now['time'],
|
|
'age': age,
|
|
'strength': strength,
|
|
'distance': abs(now_price - ((c_prev['high'] + c_next['low']) / 2))
|
|
})
|
|
# Bearish FVG
|
|
gap = c_prev['low'] - c_next['high']
|
|
if gap > min_gap:
|
|
strength = gap / atr if atr else 1
|
|
fvg_zones.append({
|
|
'type': 'FVG_BEARISH',
|
|
'start': float(c_next['high']),
|
|
'end': float(c_prev['low']),
|
|
'time': c_now['time'],
|
|
'age': age,
|
|
'strength': strength,
|
|
'distance': abs(now_price - ((c_next['high'] + c_prev['low']) / 2))
|
|
})
|
|
fvg_zones = sorted(fvg_zones, key=lambda x: (x['distance'], -x['strength'], x['age']))
|
|
return fvg_zones
|
|
|
|
|
|
# ========== PATTERN, VOLUME, BOUNDARY, PIVOT ==========
|
|
|
|
def detect_pinbar(df: pd.DataFrame, min_ratio: float = 2.0) -> List[Dict[str, Any]]:
|
|
"""Pinbar: ekor minimal 2x body."""
|
|
pattern = []
|
|
for i in range(2, len(df)):
|
|
c = df.iloc[i]
|
|
body = abs(c['close'] - c['open'])
|
|
upper = c['high'] - max(c['close'], c['open'])
|
|
lower = min(c['close'], c['open']) - c['low']
|
|
# Bullish pinbar
|
|
if lower > min_ratio * body and upper < 0.5 * body:
|
|
pattern.append({'type': 'PINBAR_BULL', 'time': c['time'], 'idx': i})
|
|
# Bearish pinbar
|
|
elif upper > min_ratio * body and lower < 0.5 * body:
|
|
pattern.append({'type': 'PINBAR_BEAR', 'time': c['time'], 'idx': i})
|
|
return pattern
|
|
|
|
|
|
def detect_engulfing(df: pd.DataFrame) -> List[Dict[str, Any]]:
|
|
pattern = []
|
|
for i in range(1, len(df)):
|
|
prev, c = df.iloc[i - 1], df.iloc[i]
|
|
# Bullish engulfing
|
|
if c['close'] > c['open'] and prev['close'] < prev['open'] and \
|
|
c['close'] > prev['open'] and c['open'] < prev['close']:
|
|
pattern.append({'type': 'ENGULFING_BULL', 'time': c['time'], 'idx': i})
|
|
# Bearish engulfing
|
|
elif c['close'] < c['open'] and prev['close'] > prev['open'] and \
|
|
c['close'] < prev['open'] and c['open'] > prev['close']:
|
|
pattern.append({'type': 'ENGULFING_BEAR', 'time': c['time'], 'idx': i})
|
|
return pattern
|
|
|
|
|
|
def detect_volume_spike(df: pd.DataFrame, window: int = 20, threshold: float = 2.0) -> List[Dict[str, Any]]:
|
|
if 'tick_volume' not in df.columns:
|
|
return []
|
|
pattern = []
|
|
vol_ma = df['tick_volume'].rolling(window).mean()
|
|
for i in range(window, len(df)):
|
|
if df['tick_volume'].iloc[i] > threshold * vol_ma.iloc[i]:
|
|
pattern.append({'type': 'VOLUME_SPIKE', 'time': df['time'].iloc[i], 'idx': i})
|
|
return pattern
|
|
|
|
|
|
def get_daily_high_low(df: pd.DataFrame) -> Dict[str, Any]:
|
|
today = pd.to_datetime(df['time'].iloc[-1]).date()
|
|
today_data = df[pd.to_datetime(df['time']).dt.date == today]
|
|
high = today_data['high'].max()
|
|
low = today_data['low'].min()
|
|
last_close = today_data['close'].iloc[-1] if len(today_data) else df['close'].iloc[-1]
|
|
return {
|
|
'daily_high': high,
|
|
'daily_low': low,
|
|
'distance_to_high': abs(last_close - high),
|
|
'distance_to_low': abs(last_close - low)
|
|
}
|
|
|
|
|
|
def get_pivot_points(df: pd.DataFrame) -> Dict[str, float]:
|
|
# Classic pivot, pakai data daily terakhir
|
|
today = pd.to_datetime(df['time'].iloc[-1]).date()
|
|
prev_day = today - pd.Timedelta(days=1)
|
|
prev_data = df[pd.to_datetime(df['time']).dt.date == prev_day]
|
|
if len(prev_data) == 0:
|
|
prev_data = df.iloc[-20:] # fallback: 20 bar terakhir
|
|
high, low, close = prev_data['high'].max(), prev_data['low'].min(), prev_data['close'].iloc[-1]
|
|
pivot = (high + low + close) / 3
|
|
r1 = 2 * pivot - low
|
|
s1 = 2 * pivot - high
|
|
return {'pivot': pivot, 'r1': r1, 's1': s1}
|
|
|
|
|
|
def detect_continuation_patterns(
|
|
df: pd.DataFrame, base_max_candles: int = 4, body_threshold: float = 1.2
|
|
) -> List[Dict[str, Any]]:
|
|
"""Mendeteksi pola RBR (Rally-Base-Rally) dan DBD (Drop-Base-Drop)."""
|
|
patterns = []
|
|
if len(df) < base_max_candles + 2:
|
|
return patterns
|
|
|
|
df = df.copy()
|
|
df['body'] = abs(df['close'] - df['open'])
|
|
avg_body = df['body'].rolling(20).mean()
|
|
|
|
for i in range(len(df) - base_max_candles - 1):
|
|
# Cek RBR (Rally-Base-Rally)
|
|
rally1 = df.iloc[i]
|
|
is_strong_rally1 = rally1['close'] > rally1['open'] and rally1['body'] > avg_body.iloc[i] * body_threshold
|
|
|
|
if is_strong_rally1:
|
|
base_candles = df.iloc[i + 1: i + 1 + base_max_candles]
|
|
base_high = base_candles['high'].max()
|
|
base_low = base_candles['low'].min()
|
|
|
|
# Base harus berada dalam rentang rally1 dan tidak terlalu besar
|
|
if base_high < rally1['high'] + (rally1['body'] * 0.2) and \
|
|
base_low > rally1['low'] - (rally1['body'] * 0.2):
|
|
for j in range(1, base_max_candles + 1):
|
|
rally2_idx = i + j
|
|
if rally2_idx < len(df) - 1:
|
|
rally2 = df.iloc[rally2_idx + 1]
|
|
is_strong_rally2 = rally2['close'] > rally2['open'] and \
|
|
rally2['body'] > avg_body.iloc[rally2_idx + 1] * body_threshold
|
|
if is_strong_rally2 and rally2['close'] > rally1['high']:
|
|
patterns.append({'type': 'RBR', 'time': rally2['time'], 'idx': rally2_idx + 1})
|
|
i = rally2_idx + 1 # Skip untuk menghindari deteksi tumpang tindih
|
|
break
|
|
|
|
# Cek DBD (Drop-Base-Drop)
|
|
drop1 = df.iloc[i]
|
|
is_strong_drop1 = drop1['close'] < drop1['open'] and drop1['body'] > avg_body.iloc[i] * body_threshold
|
|
|
|
if is_strong_drop1:
|
|
base_candles = df.iloc[i + 1: i + 1 + base_max_candles]
|
|
base_high = base_candles['high'].max()
|
|
base_low = base_candles['low'].min()
|
|
|
|
# Base harus berada dalam rentang drop1
|
|
if base_high < drop1['high'] + (drop1['body'] * 0.2) and \
|
|
base_low > drop1['low'] - (drop1['body'] * 0.2):
|
|
for j in range(1, base_max_candles + 1):
|
|
drop2_idx = i + j
|
|
if drop2_idx < len(df) - 1:
|
|
drop2 = df.iloc[drop2_idx + 1]
|
|
is_strong_drop2 = drop2['close'] < drop2['open'] and \
|
|
drop2['body'] > avg_body.iloc[drop2_idx + 1] * body_threshold
|
|
if is_strong_drop2 and drop2['close'] < drop1['low']:
|
|
patterns.append({'type': 'DBD', 'time': drop2['time'], 'idx': drop2_idx + 1})
|
|
i = drop2_idx + 1 # Skip
|
|
break
|
|
|
|
return patterns
|
|
|
|
|
|
# ========== FEATURE EXTRACTOR ==========
|
|
|
|
def extract_features_full(
|
|
df: pd.DataFrame,
|
|
structure: str,
|
|
order_blocks: List[Dict[str, Any]],
|
|
fvg_zones: List[Dict[str, Any]],
|
|
patterns: List[Dict[str, Any]],
|
|
boundary: Dict[str, Any],
|
|
pivot: Dict[str, float]
|
|
) -> np.ndarray:
|
|
"""Keluarkan vektor fitur komprehensif untuk AI/ML."""
|
|
last_price = df['close'].iloc[-1]
|
|
features = [
|
|
last_price,
|
|
calculate_atr_dynamic(df),
|
|
min([ob['distance'] for ob in order_blocks]) if order_blocks else 0,
|
|
max([ob['strength'] for ob in order_blocks]) if order_blocks else 0,
|
|
min([fvg['distance'] for fvg in fvg_zones]) if fvg_zones else 0,
|
|
max([fvg['strength'] for fvg in fvg_zones]) if fvg_zones else 0,
|
|
int(any(p['type'].startswith('ENGULFING') for p in patterns)),
|
|
boundary.get('distance_to_high', 0),
|
|
boundary.get('distance_to_low', 0),
|
|
pivot.get('r1', 0) - last_price,
|
|
pivot.get('s1', 0) - last_price,
|
|
# Tambah fitur lain sesuai kebutuhan...
|
|
]
|
|
return np.array(features, dtype=float)
|
|
|
|
|
|
# ========== TARGET GENERATOR (LABEL OTOMATIS) ==========
|
|
|
|
def generate_label_fvg(
|
|
df: pd.DataFrame, fvg_zones: List[Dict[str, Any]], horizon: int = 10, pip_thresh: float = 0.001
|
|
) -> List[int]:
|
|
"""
|
|
Label 1 jika X bar setelah FVG harga naik/turun >= pip_thresh, else 0.
|
|
"""
|
|
label = [0] * len(df)
|
|
for fvg in fvg_zones:
|
|
idx = df[df['time'] == fvg['time']].index[0]
|
|
base_price = (fvg['start'] + fvg['end']) / 2
|
|
future_idx = min(idx + horizon, len(df) - 1)
|
|
if 'BULLISH' in fvg['type']:
|
|
if df['high'].iloc[future_idx] - base_price >= pip_thresh:
|
|
label[idx] = 1
|
|
elif 'BEARISH' in fvg['type']:
|
|
if base_price - df['low'].iloc[future_idx] >= pip_thresh:
|
|
label[idx] = 1
|
|
return label
|
|
|
|
|
|
# ========== ZONE PLOTTING & EVENT LOGGING ==========
|
|
|
|
def plot_zones(df, order_blocks, fvg_zones, patterns=None):
|
|
plt.figure(figsize=(15, 7))
|
|
plt.plot(df['time'], df['close'], label='Close Price', color='black')
|
|
for ob in order_blocks:
|
|
plt.axhspan(ob['low'], ob['high'], color='green' if 'BULL' in ob['type'] else 'red', alpha=0.25)
|
|
for fvg in fvg_zones:
|
|
plt.axhspan(fvg['start'], fvg['end'], color='blue' if 'BULL' in fvg['type'] else 'orange', alpha=0.15)
|
|
if patterns:
|
|
for p in patterns:
|
|
idx = p['idx']
|
|
marker = '^' if 'BULL' in p['type'] else 'v'
|
|
plt.scatter(df['time'].iloc[idx], df['close'].iloc[idx], marker=marker, color='purple')
|
|
plt.legend()
|
|
plt.title("Price with OB/FVG/Pattern Zones")
|
|
plt.show()
|
|
|
|
|
|
def log_zone_events(order_blocks, fvg_zones, patterns):
|
|
"""Log bar yang trigger event, useful untuk AI/analitik."""
|
|
event_log = []
|
|
for ob in order_blocks:
|
|
event_log.append({'time': ob['time'], 'price': (ob['high'] + ob['low']) / 2, 'type': ob['type']})
|
|
for fvg in fvg_zones:
|
|
event_log.append({'time': fvg['time'], 'price': (fvg['start'] + fvg['end']) / 2, 'type': fvg['type']})
|
|
for p in patterns:
|
|
event_log.append({'time': p['time'], 'price': None, 'type': p['type']})
|
|
event_log = sorted(event_log, key=lambda x: x['time'])
|
|
return event_log
|
|
|
|
|
|
# ========== EQH/EQL & LIQUIDITY SWEEP ==========
|
|
|
|
def detect_eqh_eql(
|
|
df: pd.DataFrame, window: int = 10, tolerance: float = 0.0003
|
|
) -> Tuple[List[Dict[str, float]], List[Dict[str, float]]]:
|
|
if len(df) < window:
|
|
return [], []
|
|
eqh, eql = [], []
|
|
for i in range(window, len(df)):
|
|
hi = df['high'].iloc[i - window:i]
|
|
lo = df['low'].iloc[i - window:i]
|
|
if (hi.max() - hi.min()) < (hi.mean() * tolerance):
|
|
eqh.append({'time': df['time'].iloc[i], 'value': float(hi.mean())})
|
|
if (lo.max() - lo.min()) < (lo.mean() * tolerance):
|
|
eql.append({'time': df['time'].iloc[i], 'value': float(lo.mean())})
|
|
return eqh, eql
|
|
|
|
|
|
def detect_liquidity_sweep(
|
|
df: pd.DataFrame, min_candles_for_swing: int = 5, require_confirmation_candle: bool = True
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Mendeteksi Liquidity Sweep (LS) dengan konfirmasi candle berikutnya.
|
|
Logika diperbaiki untuk efisiensi dan akurasi.
|
|
"""
|
|
liquidity_sweeps = []
|
|
if len(df) < min_candles_for_swing * 2 + 1: # Butuh satu ekstra untuk konfirmasi
|
|
return liquidity_sweeps
|
|
|
|
df = df.copy()
|
|
df['body'] = abs(df['close'] - df['open'])
|
|
avg_body = df['body'].rolling(20).mean()
|
|
|
|
df['is_swing_high'] = (df['high'].rolling(min_candles_for_swing, center=True).max() == df['high'])
|
|
df['is_swing_low'] = (df['low'].rolling(min_candles_for_swing, center=True).min() == df['low'])
|
|
swing_highs_idx = df[df['is_swing_high']].index.tolist()
|
|
swing_lows_idx = df[df['is_swing_low']].index.tolist()
|
|
|
|
start_iter_idx = max(0, len(df) - 50)
|
|
|
|
for i in range(start_iter_idx, len(df) - 1):
|
|
current_candle = df.iloc[i]
|
|
|
|
# Cek Bullish Sweep
|
|
relevant_swing_lows = [sl for sl in swing_lows_idx if sl < i]
|
|
if relevant_swing_lows:
|
|
prev_swing_low_idx = max(relevant_swing_lows)
|
|
prev_swing_low_val = df['low'].loc[prev_swing_low_idx]
|
|
|
|
if current_candle['low'] < prev_swing_low_val and current_candle['close'] > prev_swing_low_val:
|
|
if not require_confirmation_candle:
|
|
is_confirmed = True
|
|
else:
|
|
confirmation_candle = df.iloc[i + 1]
|
|
is_confirmed = confirmation_candle['close'] > confirmation_candle['open'] and \
|
|
confirmation_candle['body'] > avg_body.iloc[i + 1] * 0.8
|
|
|
|
if is_confirmed:
|
|
strength = (current_candle['close'] - prev_swing_low_val)
|
|
strength /= calculate_atr_dynamic(df.loc[:current_candle.name])
|
|
liquidity_sweeps.append({
|
|
'type': 'BULLISH_LS',
|
|
'swept_level': float(prev_swing_low_val),
|
|
'sweep_time': current_candle['time'],
|
|
'strength': strength
|
|
})
|
|
|
|
# Cek Bearish Sweep
|
|
relevant_swing_highs = [sh for sh in swing_highs_idx if sh < i]
|
|
if relevant_swing_highs:
|
|
prev_swing_high_idx = max(relevant_swing_highs)
|
|
prev_swing_high_val = df['high'].loc[prev_swing_high_idx]
|
|
|
|
if current_candle['high'] > prev_swing_high_val and current_candle['close'] < prev_swing_high_val:
|
|
if not require_confirmation_candle:
|
|
is_confirmed = True
|
|
else:
|
|
confirmation_candle = df.iloc[i + 1]
|
|
is_confirmed = confirmation_candle['close'] < confirmation_candle['open'] and \
|
|
confirmation_candle['body'] > avg_body.iloc[i + 1] * 0.8
|
|
|
|
if is_confirmed:
|
|
strength = (prev_swing_high_val - current_candle['close'])
|
|
strength /= calculate_atr_dynamic(df.loc[:current_candle.name])
|
|
liquidity_sweeps.append({
|
|
'type': 'BEARISH_LS',
|
|
'swept_level': float(prev_swing_high_val),
|
|
'sweep_time': current_candle['time'],
|
|
'strength': strength
|
|
})
|
|
|
|
unique_sweeps = {s['sweep_time']: s for s in liquidity_sweeps}.values()
|
|
return list(unique_sweeps)
|
|
|
|
|
|
# ========== LIQUIDITY GRAB (SWEEP ON EQH/EQL) ==========
|
|
|
|
def detect_liquidity_grab(
|
|
df: pd.DataFrame, window: int = 20, tolerance: float = 0.0003, confirmation_wick_ratio: float = 0.5
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Mendeteksi sweep pada liquidity pool (EQH/EQL).
|
|
Ini adalah sinyal probabilitas tinggi karena menargetkan area likuiditas yang jelas.
|
|
"""
|
|
grabs = []
|
|
if len(df) < window + 5: # Butuh beberapa candle setelah window
|
|
return grabs
|
|
|
|
eqh_pools, eql_pools = detect_eqh_eql(df, window=window, tolerance=tolerance)
|
|
|
|
# Hanya proses pool terbaru untuk efisiensi
|
|
recent_eql = eql_pools[-1] if eql_pools else None
|
|
recent_eqh = eqh_pools[-1] if eqh_pools else None
|
|
|
|
# Cek Bullish Liquidity Grab (sweep EQL)
|
|
if recent_eql:
|
|
pool_level = recent_eql['value']
|
|
pool_time = recent_eql['time']
|
|
|
|
# Cari candle yang melakukan sweep setelah pool terbentuk
|
|
candles_after_pool = df[df['time'] > pool_time]
|
|
for i in range(len(candles_after_pool)):
|
|
sweep_candle = candles_after_pool.iloc[i]
|
|
# 1. Harga harus turun di bawah pool
|
|
# 2. Harga harus ditutup kembali di atas pool (konfirmasi)
|
|
if sweep_candle['low'] < pool_level and sweep_candle['close'] > pool_level:
|
|
# 3. Konfirmasi tambahan: wick bawah harus signifikan
|
|
lower_wick = sweep_candle['open'] - sweep_candle['low'] \
|
|
if sweep_candle['open'] > sweep_candle['close'] else sweep_candle['close'] - sweep_candle['low']
|
|
if lower_wick > (sweep_candle['high'] - sweep_candle['low']) * confirmation_wick_ratio:
|
|
strength = lower_wick / calculate_atr_dynamic(df.loc[:sweep_candle.name])
|
|
grabs.append({
|
|
'type': 'LIQUIDITY_GRAB_BULL',
|
|
'swept_level': float(pool_level),
|
|
'sweep_time': sweep_candle['time'],
|
|
'strength': strength # Kekuatan berdasarkan penolakan
|
|
})
|
|
break # Hanya ambil grab pertama setelah pool
|
|
|
|
# Cek Bearish Liquidity Grab (sweep EQH)
|
|
if recent_eqh:
|
|
pool_level = recent_eqh['value']
|
|
pool_time = recent_eqh['time']
|
|
|
|
candles_after_pool = df[df['time'] > pool_time]
|
|
for i in range(len(candles_after_pool)):
|
|
sweep_candle = candles_after_pool.iloc[i]
|
|
# 1. Harga harus naik di atas pool
|
|
# 2. Harga harus ditutup kembali di bawah pool
|
|
if sweep_candle['high'] > pool_level and sweep_candle['close'] < pool_level:
|
|
# 3. Konfirmasi tambahan: wick atas harus signifikan
|
|
upper_wick = sweep_candle['high'] - sweep_candle['close'] \
|
|
if sweep_candle['open'] > sweep_candle['close'] else sweep_candle['high'] - sweep_candle['open']
|
|
if upper_wick > (sweep_candle['high'] - sweep_candle['low']) * confirmation_wick_ratio:
|
|
strength = upper_wick / calculate_atr_dynamic(df.loc[:sweep_candle.name])
|
|
grabs.append({
|
|
'type': 'LIQUIDITY_GRAB_BEAR',
|
|
'swept_level': float(pool_level),
|
|
'sweep_time': sweep_candle['time'],
|
|
'strength': strength
|
|
})
|
|
break
|
|
|
|
return grabs
|
|
|
|
|
|
# ========== OTE ZONE ==========
|
|
|
|
def calculate_optimal_trade_entry(
|
|
swing_start_price: float, swing_end_price: float, direction: str
|
|
) -> Dict[str, float]:
|
|
fib_levels = {
|
|
'0.618': 0.618,
|
|
'0.705': 0.705,
|
|
'0.790': 0.790
|
|
}
|
|
ote_zone: Dict[str, float] = {}
|
|
high_val = max(swing_start_price, swing_end_price)
|
|
low_val = min(swing_start_price, swing_end_price)
|
|
price_range = high_val - low_val
|
|
if direction.upper() == "BUY":
|
|
ote_zone['upper'] = high_val - (price_range * fib_levels['0.618'])
|
|
ote_zone['mid'] = high_val - (price_range * fib_levels['0.705'])
|
|
ote_zone['lower'] = high_val - (price_range * fib_levels['0.790'])
|
|
elif direction.upper() == "SELL":
|
|
ote_zone['upper'] = low_val + (price_range * fib_levels['0.790'])
|
|
ote_zone['mid'] = low_val + (price_range * fib_levels['0.705'])
|
|
ote_zone['lower'] = low_val + (price_range * fib_levels['0.618'])
|
|
return ote_zone
|
|
|
|
|
|
# ========== CHANGE OF CHARACTER (CHoCH) ==========
|
|
|
|
def detect_change_of_character(df: pd.DataFrame, swing_lookback: int = 5) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Mendeteksi Change of Character (CHoCH), sinyal awal dari potensi reversal.
|
|
CHoCH terjadi ketika harga menembus swing point minor terakhir setelah sebuah
|
|
Break of Structure (BOS) terjadi.
|
|
"""
|
|
if len(df) < swing_lookback * 2 + 5: # Butuh beberapa candle ekstra
|
|
return None
|
|
|
|
_, swing_points = detect_structure(df, swing_lookback)
|
|
last_high = swing_points.get('last_high')
|
|
last_low = swing_points.get('last_low')
|
|
|
|
# Tidak bisa mendeteksi CHoCH tanpa swing point yang jelas
|
|
if not last_high or not last_low:
|
|
return None
|
|
|
|
df = df.copy()
|
|
df['is_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
|
|
df['is_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
|
|
|
|
swing_highs = df[df['is_swing_high']]
|
|
swing_lows = df[df['is_swing_low']]
|
|
|
|
if swing_highs.empty or swing_lows.empty:
|
|
return None
|
|
|
|
# Skenario Bearish CHoCH:
|
|
# Terjadi setelah Bullish BOS, lalu harga break swing low terakhir.
|
|
# 1. Cari Bullish BOS terbaru (higher high).
|
|
if len(swing_highs) >= 2:
|
|
last_sh = swing_highs.iloc[-1]
|
|
prev_sh = swing_highs.iloc[-2]
|
|
if last_sh['high'] > prev_sh['high']: # Indikasi Bullish Trend/BOS
|
|
# 2. Cari swing low yang terbentuk antara dua swing high ini.
|
|
relevant_lows = swing_lows[(swing_lows.index > prev_sh.name) & (swing_lows.index < last_sh.name)]
|
|
if not relevant_lows.empty:
|
|
choch_level = relevant_lows.iloc[-1]['low']
|
|
# 3. Cek apakah harga saat ini atau beberapa candle terakhir break di bawah level itu.
|
|
recent_candles = df.iloc[-3:]
|
|
if (recent_candles['close'] < choch_level).any():
|
|
return {'type': 'BEARISH_CHoCH', 'price': choch_level, 'time': df.iloc[-1]['time']}
|
|
|
|
# Skenario Bullish CHoCH:
|
|
# Terjadi setelah Bearish BOS, lalu harga break swing high terakhir.
|
|
# 1. Cari Bearish BOS terbaru (lower low).
|
|
if len(swing_lows) >= 2:
|
|
last_sl = swing_lows.iloc[-1]
|
|
prev_sl = swing_lows.iloc[-2]
|
|
if last_sl['low'] < prev_sl['low']: # PERBAIKAN: prev_sh -> prev_sl
|
|
# 2. Cari swing high yang terbentuk antara dua swing low ini.
|
|
relevant_highs = swing_highs[(swing_highs.index > prev_sl.name) & (swing_highs.index < last_sl.name)]
|
|
if not relevant_highs.empty:
|
|
choch_level = relevant_highs.iloc[-1]['high']
|
|
# 3. Cek apakah harga saat ini atau beberapa candle terakhir break di atas level itu.
|
|
recent_candles = df.iloc[-3:]
|
|
if (recent_candles['close'] > choch_level).any():
|
|
return {'type': 'BULLISH_CHoCH', 'price': choch_level, 'time': df.iloc[-1]['time']}
|
|
|
|
return None
|
|
|
|
|
|
# ========== RSI & DIVERGENCE ==========
|
|
|
|
def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series:
|
|
"""
|
|
Menghitung Relative Strength Index (RSI).
|
|
"""
|
|
if len(prices) < period + 1:
|
|
return pd.Series([np.nan] * len(prices), index=prices.index)
|
|
|
|
delta = prices.diff()
|
|
|
|
gain = (delta.where(delta > 0, 0)).ewm(alpha=1/period, adjust=False).mean()
|
|
loss = (-delta.where(delta < 0, 0)).ewm(alpha=1/period, adjust=False).mean()
|
|
|
|
rs = gain / loss
|
|
rsi = 100 - (100 / (1 + rs))
|
|
|
|
return rsi
|
|
|
|
|
|
def detect_regular_divergence(df: pd.DataFrame, swing_lookback: int = 5) -> List[Dict[str, Any]]:
|
|
"""
|
|
Mendeteksi divergensi reguler (sinyal reversal) antara harga dan RSI.
|
|
"""
|
|
divergences = []
|
|
if 'rsi' not in df.columns or df['rsi'].isnull().all():
|
|
return divergences
|
|
|
|
df['price_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
|
|
df['price_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
|
|
df['rsi_swing_high'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).max())
|
|
df['rsi_swing_low'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).min())
|
|
|
|
price_lows = df[df['price_swing_low']]
|
|
price_highs = df[df['price_swing_high']]
|
|
rsi_lows = df[df['rsi_swing_low']]
|
|
rsi_highs = df[df['rsi_swing_high']]
|
|
|
|
# Regular Bullish Divergence: Lower Low di Harga, Higher Low di RSI
|
|
if len(price_lows) >= 2 and len(rsi_lows) >= 2:
|
|
last_price_low = price_lows.iloc[-1]
|
|
prev_price_low = price_lows.iloc[-2]
|
|
|
|
if last_price_low['low'] < prev_price_low['low']:
|
|
# Cari RSI low yang relevan
|
|
relevant_rsi_lows = rsi_lows[(rsi_lows.index >= prev_price_low.name) & (rsi_lows.index <= last_price_low.name)]
|
|
if len(relevant_rsi_lows) >= 2:
|
|
if relevant_rsi_lows.iloc[-1]['rsi'] > relevant_rsi_lows.iloc[0]['rsi']:
|
|
divergences.append({'type': 'REGULAR_BULLISH_DIVERGENCE', 'time': last_price_low['time']})
|
|
|
|
# Regular Bearish Divergence: Higher High di Harga, Lower High di RSI
|
|
if len(price_highs) >= 2 and len(rsi_highs) >= 2:
|
|
last_price_high = price_highs.iloc[-1]
|
|
prev_price_high = price_highs.iloc[-2]
|
|
|
|
if last_price_high['high'] > prev_price_high['high']:
|
|
# Cari RSI high yang relevan
|
|
relevant_rsi_highs = rsi_highs[(rsi_highs.index >= prev_price_high.name) & (rsi_highs.index <= last_price_high.name)]
|
|
if len(relevant_rsi_highs) >= 2:
|
|
if relevant_rsi_highs.iloc[-1]['rsi'] < relevant_rsi_highs.iloc[0]['rsi']:
|
|
divergences.append({'type': 'REGULAR_BEARISH_DIVERGENCE', 'time': last_price_high['time']})
|
|
|
|
return divergences
|
|
|
|
|
|
def detect_hidden_divergence(df: pd.DataFrame, swing_lookback: int = 5) -> List[Dict[str, Any]]:
|
|
"""
|
|
Mendeteksi divergensi tersembunyi (sinyal continuation) antara harga dan RSI.
|
|
"""
|
|
divergences = []
|
|
if 'rsi' not in df.columns or df['rsi'].isnull().all():
|
|
return divergences
|
|
|
|
df['price_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
|
|
df['price_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
|
|
df['rsi_swing_high'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).max())
|
|
df['rsi_swing_low'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).min())
|
|
|
|
price_lows = df[df['price_swing_low']]
|
|
price_highs = df[df['price_swing_high']]
|
|
rsi_lows = df[df['rsi_swing_low']]
|
|
rsi_highs = df[df['rsi_swing_high']]
|
|
|
|
# Hidden Bullish Divergence: Higher Low di Harga, Lower Low di RSI
|
|
if len(price_lows) >= 2 and len(rsi_lows) >= 2:
|
|
last_price_low = price_lows.iloc[-1]
|
|
prev_price_low = price_lows.iloc[-2]
|
|
|
|
if last_price_low['low'] > prev_price_low['low']:
|
|
relevant_rsi_lows = rsi_lows[(rsi_lows.index >= prev_price_low.name) & (rsi_lows.index <= last_price_low.name)]
|
|
if len(relevant_rsi_lows) >= 2:
|
|
if relevant_rsi_lows.iloc[-1]['rsi'] < relevant_rsi_lows.iloc[0]['rsi']:
|
|
divergences.append({'type': 'HIDDEN_BULLISH_DIVERGENCE', 'time': last_price_low['time']})
|
|
|
|
# Hidden Bearish Divergence: Lower High di Harga, Higher High di RSI
|
|
if len(price_highs) >= 2 and len(rsi_highs) >= 2:
|
|
last_price_high = price_highs.iloc[-1]
|
|
prev_price_high = price_highs.iloc[-2]
|
|
|
|
if last_price_high['high'] < prev_price_high['high']:
|
|
relevant_rsi_highs = rsi_highs[(rsi_highs.index >= prev_price_high.name) & (rsi_highs.index <= last_price_high.name)]
|
|
if len(relevant_rsi_highs) >= 2:
|
|
if relevant_rsi_highs.iloc[-1]['rsi'] > relevant_rsi_highs.iloc[0]['rsi']:
|
|
divergences.append({'type': 'HIDDEN_BEARISH_DIVERGENCE', 'time': last_price_high['time']})
|
|
|
|
return divergences
|
|
|
|
|
|
# ========== ANALISIS MULTI-TIMEFRAME ==========
|
|
|
|
def analyze_mtf_trend(timeframes_data: Dict[str, pd.DataFrame]) -> Dict[str, str]:
|
|
"""Analisis trend di multiple timeframe."""
|
|
trend_analysis = {}
|
|
for tf, df in timeframes_data.items():
|
|
structure, _ = detect_structure(df)
|
|
if "BULLISH_BOS" in structure or "HH/HL" in structure:
|
|
trend_analysis[tf] = "BULLISH"
|
|
elif "BEARISH_BOS" in structure or "LL/LH" in structure:
|
|
trend_analysis[tf] = "BEARISH"
|
|
else:
|
|
trend_analysis[tf] = "NEUTRAL"
|
|
return trend_analysis
|
|
|
|
|
|
def detect_mtf_divergence(timeframes_data: Dict[str, pd.DataFrame]) -> Dict[str, List[Dict[str, Any]]]:
|
|
"""Deteksi divergence di multiple timeframe."""
|
|
divergences = {}
|
|
for tf, df in timeframes_data.items():
|
|
df = df.copy()
|
|
df['rsi'] = calculate_rsi(df['close'])
|
|
# Deteksi regular divergence
|
|
reg_div = detect_regular_divergence(df)
|
|
# Deteksi hidden divergence
|
|
hidden_div = detect_hidden_divergence(df)
|
|
divergences[tf] = reg_div + hidden_div
|
|
return divergences
|
|
|
|
|
|
def analyze_volatility(df: pd.DataFrame, window: int = 20) -> Dict[str, float]:
|
|
"""Analisis volatilitas untuk penyesuaian parameter."""
|
|
atr = calculate_atr_dynamic(df)
|
|
avg_range = df['high'].sub(df['low']).rolling(window).mean().iloc[-1]
|
|
return {
|
|
'atr': atr,
|
|
'avg_range': avg_range,
|
|
'volatility_ratio': atr / avg_range if avg_range > 0 else 1.0
|
|
}
|
|
|
|
|
|
def adjust_trade_parameters(
|
|
volatility: Dict[str, float],
|
|
base_sl_pips: float,
|
|
base_tp_pips: float
|
|
) -> Tuple[float, float]:
|
|
"""Sesuaikan SL/TP berdasarkan volatilitas."""
|
|
vol_ratio = volatility['volatility_ratio']
|
|
adjusted_sl = base_sl_pips * (1 + (vol_ratio - 1) * 0.5)
|
|
adjusted_tp = base_tp_pips * (1 + (vol_ratio - 1) * 0.5)
|
|
return adjusted_sl, adjusted_tp
|
|
|
|
|
|
# ========== TRENDLINE DETECTION ==========
|
|
|
|
def detect_trendline_breaks(
|
|
df: pd.DataFrame, length: int = 14, mult: float = 1.0, calc_method: str = 'ATR'
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Implementasi LuxAlgo Trendline dengan deteksi breakout
|
|
"""
|
|
df = df.copy()
|
|
|
|
# Deteksi pivot points
|
|
df['ph'] = df['high'].rolling(window=length * 2 + 1, center=True).apply(
|
|
lambda x: x[length] == max(x), raw=True
|
|
)
|
|
df['pl'] = df['low'].rolling(window=length * 2 + 1, center=True).apply(
|
|
lambda x: x[length] == min(x), raw=True
|
|
)
|
|
|
|
# Kalkulasi slope berdasarkan metode
|
|
if calc_method == 'ATR':
|
|
slope = df['high'].sub(df['low']).rolling(length).mean() / length * mult
|
|
elif calc_method == 'STDEV':
|
|
slope = df['close'].rolling(length).std() / length * mult
|
|
else: # LINREG
|
|
x = np.arange(length)
|
|
slope = df['close'].rolling(length).apply(
|
|
lambda y: np.polyfit(x, y, 1)[0] if len(y) > 1 else np.nan
|
|
) * mult
|
|
|
|
# Inisialisasi trendlines
|
|
upper = pd.Series(index=df.index, dtype=float)
|
|
lower = pd.Series(index=df.index, dtype=float)
|
|
|
|
last_ph = None
|
|
last_pl = None
|
|
last_ph_slope = 0
|
|
last_pl_slope = 0
|
|
|
|
# Konstruksi trendlines
|
|
for i in range(length, len(df)):
|
|
if df['ph'].iloc[i]:
|
|
last_ph = df['high'].iloc[i]
|
|
last_ph_slope = slope.iloc[i]
|
|
elif last_ph is not None:
|
|
upper.iloc[i] = last_ph - last_ph_slope * (i - df.index[df['ph']].max())
|
|
|
|
if df['pl'].iloc[i]:
|
|
last_pl = df['low'].iloc[i]
|
|
last_pl_slope = slope.iloc[i]
|
|
elif last_pl is not None:
|
|
lower.iloc[i] = last_pl + last_pl_slope * (i - df.index[df['pl']].max())
|
|
|
|
# Deteksi breakouts
|
|
breaks = {
|
|
'upper_break': False,
|
|
'lower_break': False,
|
|
'upper_line': upper.iloc[-1],
|
|
'lower_line': lower.iloc[-1]
|
|
}
|
|
|
|
if len(df) > 1:
|
|
current_close = df['close'].iloc[-1]
|
|
prev_close = df['close'].iloc[-2]
|
|
|
|
# Upper break (bullish)
|
|
if prev_close <= upper.iloc[-2] and current_close > upper.iloc[-1]:
|
|
breaks['upper_break'] = True
|
|
|
|
# Lower break (bearish)
|
|
if prev_close >= lower.iloc[-2] and current_close < lower.iloc[-1]:
|
|
breaks['lower_break'] = True
|
|
|
|
return breaks
|
|
|
|
|
|
# ========== ANALISIS OPPORTUNITY TF ==========
|
|
|
|
def analyze_tf_opportunity(
|
|
df: pd.DataFrame,
|
|
symbol: str,
|
|
tf: str,
|
|
mt5_path: str,
|
|
start_date: str,
|
|
end_date: str,
|
|
profile_type: str = "intraday", # ["scalping", "intraday", "swing"]
|
|
atr_length: int = 14,
|
|
order_block_lookback: int = 15,
|
|
fvg_min_gap: float = 0.0002,
|
|
fvg_max_age: int = 20,
|
|
pattern_body_threshold: float = 1.2,
|
|
volume_window: int = 20,
|
|
volume_threshold: float = 2.0,
|
|
trendline_length: int = 14,
|
|
trendline_mult: float = 1.0,
|
|
calc_method: str = 'ATR',
|
|
weights: Optional[Dict[str, float]] = None
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Analisis peluang trading berdasarkan profil yang dipilih (scalping/intraday/swing).
|
|
"""
|
|
# Validasi profile type
|
|
if profile_type not in ["scalping", "intraday", "swing"]:
|
|
raise ValueError("profile_type harus salah satu dari: scalping, intraday, atau swing")
|
|
|
|
# Inisialisasi bobot jika None
|
|
weights = weights or {}
|
|
|
|
# Inisialisasi parameter profil
|
|
min_score_threshold = 5.0 # Nilai default
|
|
|
|
# Sesuaikan parameter berdasarkan profil
|
|
if profile_type == "scalping":
|
|
order_block_lookback = 10
|
|
fvg_max_age = 10
|
|
pattern_body_threshold = 1.0
|
|
volume_threshold = 1.5
|
|
min_score_threshold = 3.0
|
|
|
|
elif profile_type == "intraday":
|
|
order_block_lookback = 15
|
|
fvg_max_age = 20
|
|
pattern_body_threshold = 1.2
|
|
volume_threshold = 2.0
|
|
min_score_threshold = 5.0
|
|
|
|
elif profile_type == "swing":
|
|
order_block_lookback = 30
|
|
fvg_max_age = 40
|
|
pattern_body_threshold = 1.5
|
|
volume_threshold = 2.5
|
|
min_score_threshold = 7.0
|
|
|
|
# Inisialisasi scoring
|
|
score = 0.0
|
|
score_components = {}
|
|
info_list = []
|
|
|
|
# Deteksi struktur pasar
|
|
structure, swing_points = detect_structure(df)
|
|
|
|
# Analisis Order Blocks
|
|
order_blocks = detect_order_blocks_multi(
|
|
df,
|
|
lookback=order_block_lookback,
|
|
structure_filter=None,
|
|
max_age=fvg_max_age
|
|
)
|
|
|
|
# Analisis Fair Value Gaps
|
|
fvg_zones = detect_fvg_multi(
|
|
df,
|
|
min_gap=fvg_min_gap,
|
|
max_age=fvg_max_age
|
|
)
|
|
|
|
# Deteksi pattern berdasarkan profil
|
|
patterns = []
|
|
if profile_type in ["scalping", "intraday"]:
|
|
patterns.extend(detect_pinbar(df, min_ratio=pattern_body_threshold))
|
|
patterns.extend(detect_engulfing(df))
|
|
|
|
if profile_type in ["intraday", "swing"]:
|
|
patterns.extend(detect_continuation_patterns(df, body_threshold=pattern_body_threshold))
|
|
|
|
# Analisis volume khusus untuk scalping dan intraday
|
|
volume_analysis = None
|
|
if profile_type in ["scalping", "intraday"]:
|
|
volume_spikes = detect_volume_spike(df, window=volume_window, threshold=volume_threshold)
|
|
if volume_spikes:
|
|
volume_analysis = volume_spikes[-1] # Ambil spike terakhir
|
|
|
|
# Analisis trendline dengan sensitivitas berbeda per profil
|
|
trendline_breaks = detect_trendline_breaks(
|
|
df,
|
|
length=trendline_length,
|
|
mult=trendline_mult,
|
|
calc_method=calc_method
|
|
)
|
|
|
|
# Scoring berdasarkan profil
|
|
# Structure scoring
|
|
if "BULLISH_BOS" in structure:
|
|
structure_score = weights.get('BULLISH_BOS', 3.0)
|
|
if profile_type == "swing":
|
|
structure_score *= 1.5 # Lebih penting untuk swing
|
|
score += structure_score
|
|
score_components['structure'] = structure_score
|
|
info_list.append(f"Bullish BOS detected ({profile_type} profile)")
|
|
|
|
elif "BEARISH_BOS" in structure:
|
|
structure_score = weights.get('BEARISH_BOS', -3.0)
|
|
if profile_type == "swing":
|
|
structure_score *= 1.5
|
|
score += structure_score
|
|
score_components['structure'] = structure_score
|
|
info_list.append(f"Bearish BOS detected ({profile_type} profile)")
|
|
|
|
# Order Block scoring
|
|
for ob in order_blocks[:2]: # Consider top 2 closest OBs
|
|
ob_score = weights.get(f"{ob['type']}", 1.0)
|
|
if profile_type == "scalping":
|
|
ob_score *= 0.7 # Less important for scalping
|
|
elif profile_type == "swing":
|
|
ob_score *= 1.3 # More important for swing
|
|
score += ob_score
|
|
score_components['order_block'] = ob_score
|
|
info_list.append(f"Order Block: {ob['type']} detected")
|
|
|
|
# Pattern scoring berdasarkan profil
|
|
for pattern in patterns:
|
|
pattern_score = weights.get(pattern['type'], 1.0)
|
|
if profile_type == "scalping":
|
|
pattern_score *= 1.3 # More important for scalping
|
|
elif profile_type == "swing":
|
|
pattern_score *= 0.7 # Less important for swing
|
|
score += pattern_score
|
|
score_components['pattern'] = pattern_score
|
|
info_list.append(f"Pattern: {pattern['type']} detected")
|
|
|
|
# Volume analysis scoring (terutama untuk scalping/intraday)
|
|
if volume_analysis and profile_type in ["scalping", "intraday"]:
|
|
volume_score = weights.get('VOLUME_SPIKE', 1.0)
|
|
score += volume_score
|
|
score_components['volume'] = volume_score
|
|
info_list.append("Volume spike detected")
|
|
|
|
# Trendline break scoring
|
|
if trendline_breaks['upper_break']:
|
|
trendline_score = weights.get('TRENDLINE_BREAK_BULL', 2.0)
|
|
if profile_type == "swing":
|
|
trendline_score *= 1.2
|
|
score += trendline_score
|
|
score_components['trendline'] = trendline_score
|
|
info_list.append(f"Bullish trendline break ({profile_type} profile)")
|
|
|
|
elif trendline_breaks['lower_break']:
|
|
trendline_score = weights.get('TRENDLINE_BREAK_BEAR', -2.0)
|
|
if profile_type == "swing":
|
|
trendline_score *= 1.2
|
|
score += trendline_score
|
|
score_components['trendline'] = trendline_score
|
|
info_list.append(f"Bearish trendline break ({profile_type} profile)")
|
|
|
|
# Check if score meets minimum threshold for the profile
|
|
if abs(score) < min_score_threshold:
|
|
return None
|
|
|
|
return {
|
|
'symbol': symbol,
|
|
'timeframe': tf,
|
|
'profile_type': profile_type,
|
|
'score': score,
|
|
'score_components': score_components,
|
|
'structure': structure,
|
|
'order_blocks': order_blocks,
|
|
'fvg_zones': fvg_zones,
|
|
'patterns': patterns,
|
|
'trendline_breaks': trendline_breaks,
|
|
'volume_analysis': volume_analysis,
|
|
'info': info_list
|
|
}
|
|
|
|
|
|
def detect_smc_structure(df: pd.DataFrame, length: int = 14) -> Dict[str, Any]:
|
|
"""
|
|
Deteksi struktur Smart Money Concepts (SMC)
|
|
"""
|
|
df = df.copy()
|
|
result = {
|
|
'internal_structure': {'bullish_bos': False, 'bearish_bos': False,
|
|
'bullish_choch': False, 'bearish_choch': False},
|
|
'swing_structure': {'bullish_bos': False, 'bearish_bos': False,
|
|
'bullish_choch': False, 'bearish_choch': False},
|
|
'order_blocks': [],
|
|
'fair_value_gaps': [],
|
|
'equal_levels': {'highs': [], 'lows': []}
|
|
}
|
|
|
|
# Deteksi Internal Structure
|
|
df['internal_high'] = df['high'].rolling(5).max() == df['high']
|
|
df['internal_low'] = df['low'].rolling(5).min() == df['low']
|
|
|
|
# Deteksi Break of Structure (BOS)
|
|
for i in range(5, len(df) - 1):
|
|
# Internal BOS
|
|
if df['close'].iloc[i] > df['high'].iloc[i - 1] and df['internal_high'].iloc[i - 1]:
|
|
result['internal_structure']['bullish_bos'] = True
|
|
elif df['close'].iloc[i] < df['low'].iloc[i - 1] and df['internal_low'].iloc[i - 1]:
|
|
result['internal_structure']['bearish_bos'] = True
|
|
|
|
# Swing BOS dengan konfirmasi
|
|
if i >= 14:
|
|
if df['close'].iloc[i] > df['high'].iloc[i - 14:i].max():
|
|
result['swing_structure']['bullish_bos'] = True
|
|
elif df['close'].iloc[i] < df['low'].iloc[i - 14:i].min():
|
|
result['swing_structure']['bearish_bos'] = True
|
|
|
|
# Deteksi Change of Character (CHoCH)
|
|
if len(df) > 20:
|
|
last_swing = "none"
|
|
for i in range(20, len(df)):
|
|
if df['high'].iloc[i] > df['high'].iloc[i - 20:i].max():
|
|
if last_swing == "low":
|
|
result['swing_structure']['bullish_choch'] = True
|
|
last_swing = "high"
|
|
elif df['low'].iloc[i] < df['low'].iloc[i - 20:i].min():
|
|
if last_swing == "high":
|
|
result['swing_structure']['bearish_choch'] = True
|
|
last_swing = "low"
|
|
|
|
# Deteksi Order Blocks
|
|
atr = calculate_atr_dynamic(df)
|
|
for i in range(len(df) - 3, len(df)):
|
|
candle_size = df['high'].iloc[i] - df['low'].iloc[i]
|
|
if candle_size > 1.5 * atr: # Significant candle
|
|
if df['close'].iloc[i] < df['open'].iloc[i]: # Bearish OB
|
|
result['order_blocks'].append({
|
|
'type': 'bearish',
|
|
'high': df['high'].iloc[i],
|
|
'low': df['low'].iloc[i],
|
|
'time': df.index[i]
|
|
})
|
|
elif df['close'].iloc[i] > df['open'].iloc[i]: # Bullish OB
|
|
result['order_blocks'].append({
|
|
'type': 'bullish',
|
|
'high': df['high'].iloc[i],
|
|
'low': df['low'].iloc[i],
|
|
'time': df.index[i]
|
|
})
|
|
|
|
# Deteksi Fair Value Gaps
|
|
for i in range(1, len(df) - 1):
|
|
if df['low'].iloc[i + 1] > df['high'].iloc[i - 1]: # Bullish FVG
|
|
result['fair_value_gaps'].append({
|
|
'type': 'bullish',
|
|
'top': df['low'].iloc[i + 1],
|
|
'bottom': df['high'].iloc[i - 1],
|
|
'time': df.index[i]
|
|
})
|
|
elif df['high'].iloc[i + 1] < df['low'].iloc[i - 1]: # Bearish FVG
|
|
result['fair_value_gaps'].append({
|
|
'type': 'bearish',
|
|
'top': df['low'].iloc[i - 1],
|
|
'bottom': df['high'].iloc[i + 1],
|
|
'time': df.index[i]
|
|
})
|
|
|
|
return result
|
|
|
|
|
|
def adapt_weights_to_context(weights: Dict[str, float], context: Dict[str, Any]) -> Dict[str, float]:
|
|
"""
|
|
Adaptasi bobot berdasarkan kondisi market.
|
|
"""
|
|
adapted = weights.copy()
|
|
|
|
# Sesuaikan berdasarkan fase trend
|
|
if context['trend_phase'] == 'RANGING':
|
|
# Kurangi bobot breakout, tingkatkan bobot reversal
|
|
for key in adapted:
|
|
if 'BREAKOUT' in key or 'BOS' in key:
|
|
adapted[key] *= 0.7
|
|
if 'REVERSAL' in key or 'CHoCH' in key:
|
|
adapted[key] *= 1.3
|
|
elif context['trend_phase'] == 'TRENDING':
|
|
# Tingkatkan bobot breakout & continuation
|
|
for key in adapted:
|
|
if 'BREAKOUT' in key or 'BOS' in key or 'CONTINUATION' in key:
|
|
adapted[key] *= 1.3
|
|
if 'REVERSAL' in key or 'CHoCH' in key:
|
|
adapted[key] *= 0.7
|
|
|
|
# Sesuaikan berdasarkan volatilitas
|
|
vol_ratio = context.get('volatility', {}).get('volatility_ratio', 1.0)
|
|
if vol_ratio > 1.5: # High volatility
|
|
# Kurangi bobot pattern, tingkatkan bobot struktur
|
|
for key in adapted:
|
|
if 'PATTERN' in key or 'ENGULFING' in key or 'PINBAR' in key:
|
|
adapted[key] *= 0.8
|
|
if 'STRUCTURE' in key or 'BOS' in key:
|
|
adapted[key] *= 1.2
|
|
elif vol_ratio < 0.5: # Low volatility
|
|
# Tingkatkan bobot pattern
|
|
for key in adapted:
|
|
if 'PATTERN' in key or 'ENGULFING' in key or 'PINBAR' in key:
|
|
adapted[key] *= 1.2
|
|
|
|
# Sesuaikan berdasarkan kondisi likuiditas
|
|
liquidity = context.get('liquidity_conditions', {})
|
|
if liquidity.get('average_spread', 0) > 2.0: # Spread tinggi
|
|
# Kurangi semua bobot karena kondisi kurang ideal
|
|
for key in adapted:
|
|
adapted[key] *= 0.8
|
|
|
|
return adapted
|
|
|
|
|
|
def validate_scalping_opportunity(
|
|
df: pd.DataFrame, profile_settings: Dict[str, Any]
|
|
) -> Tuple[bool, float, List[str]]:
|
|
"""
|
|
Validasi khusus untuk scalping dengan kriteria yang lebih ketat
|
|
"""
|
|
validations = []
|
|
confidence_boost = 0
|
|
|
|
# 1. Analisis Volume
|
|
volume_ma = df['tick_volume'].rolling(20).mean()
|
|
current_volume = df['tick_volume'].iloc[-1]
|
|
volume_ratio = current_volume / volume_ma.iloc[-1]
|
|
|
|
if volume_ratio >= profile_settings['entry_rules']['minimum_volume_threshold']:
|
|
validations.append(f"Volume valid ({volume_ratio:.2f}x average)")
|
|
confidence_boost += 0.5
|
|
else:
|
|
return False, 0, ["Volume tidak mencukupi"]
|
|
|
|
# 2. Analisis Volatilitas
|
|
volatility_percentile = df['high'].sub(df['low']).rolling(50).mean().rank(pct=True).iloc[-1] * 100
|
|
|
|
if (profile_settings['entry_rules']['minimum_volatility_percentile'] <=
|
|
volatility_percentile <=
|
|
profile_settings['entry_rules']['maximum_volatility_percentile']):
|
|
validations.append(f"Volatilitas optimal ({volatility_percentile:.1f}%)")
|
|
confidence_boost += 0.5
|
|
else:
|
|
return False, 0, ["Volatilitas di luar range optimal"]
|
|
|
|
# 3. Momentum Check
|
|
rsi = calculate_rsi(df['close'])
|
|
last_rsi = rsi.iloc[-1]
|
|
|
|
if 20 <= last_rsi <= 80: # Avoid extreme conditions
|
|
momentum_aligned = (last_rsi > 50 and df['close'].iloc[-1] > df['close'].iloc[-2]) or \
|
|
(last_rsi < 50 and df['close'].iloc[-1] < df['close'].iloc[-2])
|
|
if momentum_aligned:
|
|
validations.append("Momentum aligned")
|
|
confidence_boost += 0.7
|
|
|
|
# 4. Pattern Quality Check
|
|
if profile_settings['advanced_filters']['pattern_quality_threshold']:
|
|
pattern_quality = analyze_pattern_quality(df)
|
|
if pattern_quality >= profile_settings['advanced_filters']['pattern_quality_threshold']:
|
|
validations.append(f"Pattern quality good ({pattern_quality:.2f})")
|
|
confidence_boost += 0.8
|
|
else:
|
|
return False, 0, ["Pattern quality insufficient"]
|
|
|
|
# 5. Price Action Clarity
|
|
price_clarity = analyze_price_action_clarity(df)
|
|
if price_clarity >= 0.7:
|
|
validations.append(f"Clear price action ({price_clarity:.2f})")
|
|
confidence_boost += 0.5
|
|
|
|
# 6. Reward Ratio Check
|
|
if 'minimum_reward_ratio' in profile_settings['advanced_filters']:
|
|
nearest_resistance = find_nearest_resistance(df)
|
|
nearest_support = find_nearest_support(df)
|
|
current_price = df['close'].iloc[-1]
|
|
|
|
potential_reward = abs(nearest_resistance - current_price)
|
|
potential_risk = abs(current_price - nearest_support)
|
|
|
|
if potential_risk > 0:
|
|
reward_ratio = potential_reward / potential_risk
|
|
if reward_ratio >= profile_settings['advanced_filters']['minimum_reward_ratio']:
|
|
validations.append(f"Good reward ratio ({reward_ratio:.2f})")
|
|
confidence_boost += 0.6
|
|
else:
|
|
return False, 0, ["Insufficient reward ratio"]
|
|
|
|
return len(validations) >= 4, confidence_boost, validations
|
|
|
|
|
|
def analyze_pattern_quality(df: pd.DataFrame) -> float:
|
|
"""Analisis kualitas pattern untuk scalping"""
|
|
quality_score = 0.0
|
|
last_candle = df.iloc[-1]
|
|
prev_candle = df.iloc[-2]
|
|
|
|
# 1. Body to Wick Ratio
|
|
body = abs(last_candle['close'] - last_candle['open'])
|
|
upper_wick = last_candle['high'] - max(last_candle['open'], last_candle['close'])
|
|
lower_wick = min(last_candle['open'], last_candle['close']) - last_candle['low']
|
|
|
|
if body > (upper_wick + lower_wick):
|
|
quality_score += 0.4 # Strong conviction
|
|
|
|
# 2. Clean Break of Previous Structure
|
|
if last_candle['close'] > prev_candle['high'] or last_candle['close'] < prev_candle['low']:
|
|
quality_score += 0.3
|
|
|
|
# 3. Volume Confirmation
|
|
if last_candle['tick_volume'] > df['tick_volume'].rolling(20).mean().iloc[-1]:
|
|
quality_score += 0.3
|
|
|
|
return quality_score
|
|
|
|
|
|
def analyze_price_action_clarity(df: pd.DataFrame) -> float:
|
|
"""Analisis kejelasan price action untuk scalping"""
|
|
clarity_score = 0.0
|
|
recent_candles = df.tail(5)
|
|
|
|
# 1. Consistency in Direction
|
|
closes = recent_candles['close']
|
|
if (closes.diff().dropna() > 0).all() or (closes.diff().dropna() < 0).all():
|
|
clarity_score += 0.4
|
|
|
|
# 2. Clean Candle Formation
|
|
for _, candle in recent_candles.iterrows():
|
|
body = abs(candle['close'] - candle['open'])
|
|
total_range = candle['high'] - candle['low']
|
|
if total_range > 0 and body / total_range > 0.6:
|
|
clarity_score += 0.1 # Max 0.5 from this
|
|
|
|
# 3. Minimal Overlap
|
|
for i in range(1, len(recent_candles)):
|
|
curr = recent_candles.iloc[i]
|
|
prev = recent_candles.iloc[i - 1]
|
|
if (curr['high'] > prev['high'] and curr['low'] > prev['low']) or \
|
|
(curr['high'] < prev['high'] and curr['low'] < prev['low']):
|
|
clarity_score += 0.1 # Max 0.4 from this
|
|
|
|
return min(clarity_score, 1.0)
|
|
|
|
|
|
def find_nearest_support(df: pd.DataFrame) -> float:
|
|
"""Mencari level support terdekat"""
|
|
recent_lows = df['low'].rolling(window=20).min()
|
|
current_price = df['close'].iloc[-1]
|
|
|
|
support_levels = recent_lows[recent_lows < current_price].unique()
|
|
if len(support_levels) > 0:
|
|
return max(support_levels) # Highest support below current price
|
|
return current_price - calculate_atr_dynamic(df)
|
|
|
|
|
|
def find_nearest_resistance(df: pd.DataFrame) -> float:
|
|
"""Mencari level resistance terdekat"""
|
|
recent_highs = df['high'].rolling(window=20).max()
|
|
current_price = df['close'].iloc[-1]
|
|
|
|
resistance_levels = recent_highs[recent_highs > current_price].unique()
|
|
if len(resistance_levels) > 0:
|
|
return min(resistance_levels) # Lowest resistance above current price
|
|
return current_price + calculate_atr_dynamic(df) |