markaz_arshy/technical_indicators.py
2025-08-12 14:36:24 +00:00

1368 lines
No EOL
54 KiB
Python

"""
technical_indicators.py (refactor advanced)
===========================================
Versi refaktor: multi-zone detection, scoring, pattern, boundary, pivot,
feature extractor, zone plotting & event logging.
"""
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
import matplotlib.pyplot as plt
# ========== ZONE & STRUCTURE ==========
def calculate_atr_dynamic(df: pd.DataFrame, period: int = 14) -> float:
if len(df) < period + 1:
return 0.0
tr = pd.concat([
df['high'] - df['low'],
abs(df['high'] - df['close'].shift()),
abs(df['low'] - df['close'].shift())
], axis=1).max(axis=1)
atr_val = tr.ewm(span=period, adjust=False).mean().iloc[-1]
return float(atr_val)
def detect_structure(df: pd.DataFrame, swing_lookback: int = 5) -> Tuple[str, Dict[str, Any]]:
if len(df) < swing_lookback * 2:
return "INDECISIVE", {}
df = df.copy()
# Deteksi Swing Highs and Lows yang lebih robust
df['is_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
df['is_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
swing_highs = df[df['is_swing_high']]
swing_lows = df[df['is_swing_low']]
res = []
last_swing_high_price = None
last_swing_low_price = None
last_swing_high_idx = None
last_swing_low_idx = None
# Urutkan swing points berdasarkan waktu (index)
if not swing_highs.empty:
last_sh = swing_highs.iloc[-1]
last_swing_high_price = last_sh['high']
last_swing_high_idx = last_sh.name
if len(swing_highs) > 1:
prev_sh = swing_highs.iloc[-2]
if last_sh['high'] > prev_sh['high']:
res.append('HH')
else:
res.append('LH')
if not swing_lows.empty:
last_sl = swing_lows.iloc[-1]
last_swing_low_price = last_sl['low']
last_swing_low_idx = last_sl.name
if len(swing_lows) > 1:
prev_sl = swing_lows.iloc[-2]
if last_sl['low'] > prev_sl['low']:
res.append('HL')
else:
res.append('LL')
# --- LOGIKA BARU UNTUK BREAK OF STRUCTURE (BOS) ---
# Membutuhkan penutupan candle (candle close) untuk konfirmasi
if last_swing_high_idx is not None:
# Cek candle SETELAH swing high terakhir
subsequent_candles = df.loc[last_swing_high_idx:].iloc[1:]
confirmed_break = subsequent_candles[subsequent_candles['close'] > last_swing_high_price]
if not confirmed_break.empty:
res.append("BULLISH_BOS")
if last_swing_low_idx is not None:
# Cek candle SETELAH swing low terakhir
subsequent_candles = df.loc[last_swing_low_idx:].iloc[1:]
confirmed_break = subsequent_candles[subsequent_candles['close'] < last_swing_low_price]
if not confirmed_break.empty:
res.append("BEARISH_BOS")
structure_str = "/".join(sorted(list(set(res)))) if res else "INDECISIVE"
return structure_str, {'last_high': last_swing_high_price, 'last_low': last_swing_low_price}
def detect_order_blocks_multi(
df: pd.DataFrame, lookback: int = 15, structure_filter: Optional[str] = None, max_age: int = 20
) -> List[Dict[str, Any]]:
order_blocks = []
now_price = df['close'].iloc[-1]
atr = calculate_atr_dynamic(df)
last_idx = df.index[-1]
for i in range(len(df) - lookback - 2, len(df) - 2):
candle = df.iloc[i]
age = last_idx - df.index[i]
if age > max_age:
continue
# Bullish OB
if candle['close'] < candle['open']:
found_bos = False
for j in range(i + 1, min(i + lookback + 1, len(df))):
if df.iloc[j]['close'] > candle['high']:
found_bos = True
break
if found_bos:
if structure_filter and structure_filter != "BULLISH_BOS":
continue
strength = (abs(candle['open'] - candle['close']) / atr) if atr else 1
order_blocks.append({
'type': 'BULLISH_OB',
'high': float(candle['high']),
'low': float(candle['low']),
'time': candle['time'],
'age': age,
'strength': strength,
'distance': abs(now_price - ((candle['open'] + candle['close']) / 2)),
})
# Bearish OB
elif candle['close'] > candle['open']:
found_bos = False
for j in range(i + 1, min(i + lookback + 1, len(df))):
if df.iloc[j]['close'] < candle['low']:
found_bos = True
break
if found_bos:
if structure_filter and structure_filter != "BEARISH_BOS":
continue
strength = (abs(candle['open'] - candle['close']) / atr) if atr else 1
order_blocks.append({
'type': 'BEARISH_OB',
'high': float(candle['high']),
'low': float(candle['low']),
'time': candle['time'],
'age': age,
'strength': strength,
'distance': abs(now_price - ((candle['open'] + candle['close']) / 2)),
})
order_blocks = sorted(order_blocks, key=lambda x: (x['distance'], -x['strength'], x['age']))
return order_blocks
def detect_fvg_multi(df: pd.DataFrame, min_gap: float = 0.0002, max_age: int = 20) -> List[Dict[str, Any]]:
fvg_zones = []
now_price = df['close'].iloc[-1]
atr = calculate_atr_dynamic(df)
last_idx = df.index[-1]
for i in range(1, len(df) - 1):
c_prev, c_now, c_next = df.iloc[i - 1], df.iloc[i], df.iloc[i + 1]
age = last_idx - df.index[i]
if age > max_age:
continue
# Bullish FVG
gap = c_next['low'] - c_prev['high']
if gap > min_gap:
strength = gap / atr if atr else 1
fvg_zones.append({
'type': 'FVG_BULLISH',
'start': float(c_prev['high']),
'end': float(c_next['low']),
'time': c_now['time'],
'age': age,
'strength': strength,
'distance': abs(now_price - ((c_prev['high'] + c_next['low']) / 2))
})
# Bearish FVG
gap = c_prev['low'] - c_next['high']
if gap > min_gap:
strength = gap / atr if atr else 1
fvg_zones.append({
'type': 'FVG_BEARISH',
'start': float(c_next['high']),
'end': float(c_prev['low']),
'time': c_now['time'],
'age': age,
'strength': strength,
'distance': abs(now_price - ((c_next['high'] + c_prev['low']) / 2))
})
fvg_zones = sorted(fvg_zones, key=lambda x: (x['distance'], -x['strength'], x['age']))
return fvg_zones
# ========== PATTERN, VOLUME, BOUNDARY, PIVOT ==========
def detect_pinbar(df: pd.DataFrame, min_ratio: float = 2.0) -> List[Dict[str, Any]]:
"""Pinbar: ekor minimal 2x body."""
pattern = []
for i in range(2, len(df)):
c = df.iloc[i]
body = abs(c['close'] - c['open'])
upper = c['high'] - max(c['close'], c['open'])
lower = min(c['close'], c['open']) - c['low']
# Bullish pinbar
if lower > min_ratio * body and upper < 0.5 * body:
pattern.append({'type': 'PINBAR_BULL', 'time': c['time'], 'idx': i})
# Bearish pinbar
elif upper > min_ratio * body and lower < 0.5 * body:
pattern.append({'type': 'PINBAR_BEAR', 'time': c['time'], 'idx': i})
return pattern
def detect_engulfing(df: pd.DataFrame) -> List[Dict[str, Any]]:
pattern = []
for i in range(1, len(df)):
prev, c = df.iloc[i - 1], df.iloc[i]
# Bullish engulfing
if c['close'] > c['open'] and prev['close'] < prev['open'] and \
c['close'] > prev['open'] and c['open'] < prev['close']:
pattern.append({'type': 'ENGULFING_BULL', 'time': c['time'], 'idx': i})
# Bearish engulfing
elif c['close'] < c['open'] and prev['close'] > prev['open'] and \
c['close'] < prev['open'] and c['open'] > prev['close']:
pattern.append({'type': 'ENGULFING_BEAR', 'time': c['time'], 'idx': i})
return pattern
def detect_volume_spike(df: pd.DataFrame, window: int = 20, threshold: float = 2.0) -> List[Dict[str, Any]]:
if 'tick_volume' not in df.columns:
return []
pattern = []
vol_ma = df['tick_volume'].rolling(window).mean()
for i in range(window, len(df)):
if df['tick_volume'].iloc[i] > threshold * vol_ma.iloc[i]:
pattern.append({'type': 'VOLUME_SPIKE', 'time': df['time'].iloc[i], 'idx': i})
return pattern
def get_daily_high_low(df: pd.DataFrame) -> Dict[str, Any]:
today = pd.to_datetime(df['time'].iloc[-1]).date()
today_data = df[pd.to_datetime(df['time']).dt.date == today]
high = today_data['high'].max()
low = today_data['low'].min()
last_close = today_data['close'].iloc[-1] if len(today_data) else df['close'].iloc[-1]
return {
'daily_high': high,
'daily_low': low,
'distance_to_high': abs(last_close - high),
'distance_to_low': abs(last_close - low)
}
def get_pivot_points(df: pd.DataFrame) -> Dict[str, float]:
# Classic pivot, pakai data daily terakhir
today = pd.to_datetime(df['time'].iloc[-1]).date()
prev_day = today - pd.Timedelta(days=1)
prev_data = df[pd.to_datetime(df['time']).dt.date == prev_day]
if len(prev_data) == 0:
prev_data = df.iloc[-20:] # fallback: 20 bar terakhir
high, low, close = prev_data['high'].max(), prev_data['low'].min(), prev_data['close'].iloc[-1]
pivot = (high + low + close) / 3
r1 = 2 * pivot - low
s1 = 2 * pivot - high
return {'pivot': pivot, 'r1': r1, 's1': s1}
def detect_continuation_patterns(
df: pd.DataFrame, base_max_candles: int = 4, body_threshold: float = 1.2
) -> List[Dict[str, Any]]:
"""Mendeteksi pola RBR (Rally-Base-Rally) dan DBD (Drop-Base-Drop)."""
patterns = []
if len(df) < base_max_candles + 2:
return patterns
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
avg_body = df['body'].rolling(20).mean()
for i in range(len(df) - base_max_candles - 1):
# Cek RBR (Rally-Base-Rally)
rally1 = df.iloc[i]
is_strong_rally1 = rally1['close'] > rally1['open'] and rally1['body'] > avg_body.iloc[i] * body_threshold
if is_strong_rally1:
base_candles = df.iloc[i + 1: i + 1 + base_max_candles]
base_high = base_candles['high'].max()
base_low = base_candles['low'].min()
# Base harus berada dalam rentang rally1 dan tidak terlalu besar
if base_high < rally1['high'] + (rally1['body'] * 0.2) and \
base_low > rally1['low'] - (rally1['body'] * 0.2):
for j in range(1, base_max_candles + 1):
rally2_idx = i + j
if rally2_idx < len(df) - 1:
rally2 = df.iloc[rally2_idx + 1]
is_strong_rally2 = rally2['close'] > rally2['open'] and \
rally2['body'] > avg_body.iloc[rally2_idx + 1] * body_threshold
if is_strong_rally2 and rally2['close'] > rally1['high']:
patterns.append({'type': 'RBR', 'time': rally2['time'], 'idx': rally2_idx + 1})
i = rally2_idx + 1 # Skip untuk menghindari deteksi tumpang tindih
break
# Cek DBD (Drop-Base-Drop)
drop1 = df.iloc[i]
is_strong_drop1 = drop1['close'] < drop1['open'] and drop1['body'] > avg_body.iloc[i] * body_threshold
if is_strong_drop1:
base_candles = df.iloc[i + 1: i + 1 + base_max_candles]
base_high = base_candles['high'].max()
base_low = base_candles['low'].min()
# Base harus berada dalam rentang drop1
if base_high < drop1['high'] + (drop1['body'] * 0.2) and \
base_low > drop1['low'] - (drop1['body'] * 0.2):
for j in range(1, base_max_candles + 1):
drop2_idx = i + j
if drop2_idx < len(df) - 1:
drop2 = df.iloc[drop2_idx + 1]
is_strong_drop2 = drop2['close'] < drop2['open'] and \
drop2['body'] > avg_body.iloc[drop2_idx + 1] * body_threshold
if is_strong_drop2 and drop2['close'] < drop1['low']:
patterns.append({'type': 'DBD', 'time': drop2['time'], 'idx': drop2_idx + 1})
i = drop2_idx + 1 # Skip
break
return patterns
# ========== FEATURE EXTRACTOR ==========
def extract_features_full(
df: pd.DataFrame,
structure: str,
order_blocks: List[Dict[str, Any]],
fvg_zones: List[Dict[str, Any]],
patterns: List[Dict[str, Any]],
boundary: Dict[str, Any],
pivot: Dict[str, float]
) -> np.ndarray:
"""Keluarkan vektor fitur komprehensif untuk AI/ML."""
last_price = df['close'].iloc[-1]
features = [
last_price,
calculate_atr_dynamic(df),
min([ob['distance'] for ob in order_blocks]) if order_blocks else 0,
max([ob['strength'] for ob in order_blocks]) if order_blocks else 0,
min([fvg['distance'] for fvg in fvg_zones]) if fvg_zones else 0,
max([fvg['strength'] for fvg in fvg_zones]) if fvg_zones else 0,
int(any(p['type'].startswith('ENGULFING') for p in patterns)),
boundary.get('distance_to_high', 0),
boundary.get('distance_to_low', 0),
pivot.get('r1', 0) - last_price,
pivot.get('s1', 0) - last_price,
# Tambah fitur lain sesuai kebutuhan...
]
return np.array(features, dtype=float)
# ========== TARGET GENERATOR (LABEL OTOMATIS) ==========
def generate_label_fvg(
df: pd.DataFrame, fvg_zones: List[Dict[str, Any]], horizon: int = 10, pip_thresh: float = 0.001
) -> List[int]:
"""
Label 1 jika X bar setelah FVG harga naik/turun >= pip_thresh, else 0.
"""
label = [0] * len(df)
for fvg in fvg_zones:
idx = df[df['time'] == fvg['time']].index[0]
base_price = (fvg['start'] + fvg['end']) / 2
future_idx = min(idx + horizon, len(df) - 1)
if 'BULLISH' in fvg['type']:
if df['high'].iloc[future_idx] - base_price >= pip_thresh:
label[idx] = 1
elif 'BEARISH' in fvg['type']:
if base_price - df['low'].iloc[future_idx] >= pip_thresh:
label[idx] = 1
return label
# ========== ZONE PLOTTING & EVENT LOGGING ==========
def plot_zones(df, order_blocks, fvg_zones, patterns=None):
plt.figure(figsize=(15, 7))
plt.plot(df['time'], df['close'], label='Close Price', color='black')
for ob in order_blocks:
plt.axhspan(ob['low'], ob['high'], color='green' if 'BULL' in ob['type'] else 'red', alpha=0.25)
for fvg in fvg_zones:
plt.axhspan(fvg['start'], fvg['end'], color='blue' if 'BULL' in fvg['type'] else 'orange', alpha=0.15)
if patterns:
for p in patterns:
idx = p['idx']
marker = '^' if 'BULL' in p['type'] else 'v'
plt.scatter(df['time'].iloc[idx], df['close'].iloc[idx], marker=marker, color='purple')
plt.legend()
plt.title("Price with OB/FVG/Pattern Zones")
plt.show()
def log_zone_events(order_blocks, fvg_zones, patterns):
"""Log bar yang trigger event, useful untuk AI/analitik."""
event_log = []
for ob in order_blocks:
event_log.append({'time': ob['time'], 'price': (ob['high'] + ob['low']) / 2, 'type': ob['type']})
for fvg in fvg_zones:
event_log.append({'time': fvg['time'], 'price': (fvg['start'] + fvg['end']) / 2, 'type': fvg['type']})
for p in patterns:
event_log.append({'time': p['time'], 'price': None, 'type': p['type']})
event_log = sorted(event_log, key=lambda x: x['time'])
return event_log
# ========== EQH/EQL & LIQUIDITY SWEEP ==========
def detect_eqh_eql(
df: pd.DataFrame, window: int = 10, tolerance: float = 0.0003
) -> Tuple[List[Dict[str, float]], List[Dict[str, float]]]:
if len(df) < window:
return [], []
eqh, eql = [], []
for i in range(window, len(df)):
hi = df['high'].iloc[i - window:i]
lo = df['low'].iloc[i - window:i]
if (hi.max() - hi.min()) < (hi.mean() * tolerance):
eqh.append({'time': df['time'].iloc[i], 'value': float(hi.mean())})
if (lo.max() - lo.min()) < (lo.mean() * tolerance):
eql.append({'time': df['time'].iloc[i], 'value': float(lo.mean())})
return eqh, eql
def detect_liquidity_sweep(
df: pd.DataFrame, min_candles_for_swing: int = 5, require_confirmation_candle: bool = True
) -> List[Dict[str, Any]]:
"""
Mendeteksi Liquidity Sweep (LS) dengan konfirmasi candle berikutnya.
Logika diperbaiki untuk efisiensi dan akurasi.
"""
liquidity_sweeps = []
if len(df) < min_candles_for_swing * 2 + 1: # Butuh satu ekstra untuk konfirmasi
return liquidity_sweeps
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
avg_body = df['body'].rolling(20).mean()
df['is_swing_high'] = (df['high'].rolling(min_candles_for_swing, center=True).max() == df['high'])
df['is_swing_low'] = (df['low'].rolling(min_candles_for_swing, center=True).min() == df['low'])
swing_highs_idx = df[df['is_swing_high']].index.tolist()
swing_lows_idx = df[df['is_swing_low']].index.tolist()
start_iter_idx = max(0, len(df) - 50)
for i in range(start_iter_idx, len(df) - 1):
current_candle = df.iloc[i]
# Cek Bullish Sweep
relevant_swing_lows = [sl for sl in swing_lows_idx if sl < i]
if relevant_swing_lows:
prev_swing_low_idx = max(relevant_swing_lows)
prev_swing_low_val = df['low'].loc[prev_swing_low_idx]
if current_candle['low'] < prev_swing_low_val and current_candle['close'] > prev_swing_low_val:
if not require_confirmation_candle:
is_confirmed = True
else:
confirmation_candle = df.iloc[i + 1]
is_confirmed = confirmation_candle['close'] > confirmation_candle['open'] and \
confirmation_candle['body'] > avg_body.iloc[i + 1] * 0.8
if is_confirmed:
strength = (current_candle['close'] - prev_swing_low_val)
strength /= calculate_atr_dynamic(df.loc[:current_candle.name])
liquidity_sweeps.append({
'type': 'BULLISH_LS',
'swept_level': float(prev_swing_low_val),
'sweep_time': current_candle['time'],
'strength': strength
})
# Cek Bearish Sweep
relevant_swing_highs = [sh for sh in swing_highs_idx if sh < i]
if relevant_swing_highs:
prev_swing_high_idx = max(relevant_swing_highs)
prev_swing_high_val = df['high'].loc[prev_swing_high_idx]
if current_candle['high'] > prev_swing_high_val and current_candle['close'] < prev_swing_high_val:
if not require_confirmation_candle:
is_confirmed = True
else:
confirmation_candle = df.iloc[i + 1]
is_confirmed = confirmation_candle['close'] < confirmation_candle['open'] and \
confirmation_candle['body'] > avg_body.iloc[i + 1] * 0.8
if is_confirmed:
strength = (prev_swing_high_val - current_candle['close'])
strength /= calculate_atr_dynamic(df.loc[:current_candle.name])
liquidity_sweeps.append({
'type': 'BEARISH_LS',
'swept_level': float(prev_swing_high_val),
'sweep_time': current_candle['time'],
'strength': strength
})
unique_sweeps = {s['sweep_time']: s for s in liquidity_sweeps}.values()
return list(unique_sweeps)
# ========== LIQUIDITY GRAB (SWEEP ON EQH/EQL) ==========
def detect_liquidity_grab(
df: pd.DataFrame, window: int = 20, tolerance: float = 0.0003, confirmation_wick_ratio: float = 0.5
) -> List[Dict[str, Any]]:
"""
Mendeteksi sweep pada liquidity pool (EQH/EQL).
Ini adalah sinyal probabilitas tinggi karena menargetkan area likuiditas yang jelas.
"""
grabs = []
if len(df) < window + 5: # Butuh beberapa candle setelah window
return grabs
eqh_pools, eql_pools = detect_eqh_eql(df, window=window, tolerance=tolerance)
# Hanya proses pool terbaru untuk efisiensi
recent_eql = eql_pools[-1] if eql_pools else None
recent_eqh = eqh_pools[-1] if eqh_pools else None
# Cek Bullish Liquidity Grab (sweep EQL)
if recent_eql:
pool_level = recent_eql['value']
pool_time = recent_eql['time']
# Cari candle yang melakukan sweep setelah pool terbentuk
candles_after_pool = df[df['time'] > pool_time]
for i in range(len(candles_after_pool)):
sweep_candle = candles_after_pool.iloc[i]
# 1. Harga harus turun di bawah pool
# 2. Harga harus ditutup kembali di atas pool (konfirmasi)
if sweep_candle['low'] < pool_level and sweep_candle['close'] > pool_level:
# 3. Konfirmasi tambahan: wick bawah harus signifikan
lower_wick = sweep_candle['open'] - sweep_candle['low'] \
if sweep_candle['open'] > sweep_candle['close'] else sweep_candle['close'] - sweep_candle['low']
if lower_wick > (sweep_candle['high'] - sweep_candle['low']) * confirmation_wick_ratio:
strength = lower_wick / calculate_atr_dynamic(df.loc[:sweep_candle.name])
grabs.append({
'type': 'LIQUIDITY_GRAB_BULL',
'swept_level': float(pool_level),
'sweep_time': sweep_candle['time'],
'strength': strength # Kekuatan berdasarkan penolakan
})
break # Hanya ambil grab pertama setelah pool
# Cek Bearish Liquidity Grab (sweep EQH)
if recent_eqh:
pool_level = recent_eqh['value']
pool_time = recent_eqh['time']
candles_after_pool = df[df['time'] > pool_time]
for i in range(len(candles_after_pool)):
sweep_candle = candles_after_pool.iloc[i]
# 1. Harga harus naik di atas pool
# 2. Harga harus ditutup kembali di bawah pool
if sweep_candle['high'] > pool_level and sweep_candle['close'] < pool_level:
# 3. Konfirmasi tambahan: wick atas harus signifikan
upper_wick = sweep_candle['high'] - sweep_candle['close'] \
if sweep_candle['open'] > sweep_candle['close'] else sweep_candle['high'] - sweep_candle['open']
if upper_wick > (sweep_candle['high'] - sweep_candle['low']) * confirmation_wick_ratio:
strength = upper_wick / calculate_atr_dynamic(df.loc[:sweep_candle.name])
grabs.append({
'type': 'LIQUIDITY_GRAB_BEAR',
'swept_level': float(pool_level),
'sweep_time': sweep_candle['time'],
'strength': strength
})
break
return grabs
# ========== OTE ZONE ==========
def calculate_optimal_trade_entry(
swing_start_price: float, swing_end_price: float, direction: str
) -> Dict[str, float]:
fib_levels = {
'0.618': 0.618,
'0.705': 0.705,
'0.790': 0.790
}
ote_zone: Dict[str, float] = {}
high_val = max(swing_start_price, swing_end_price)
low_val = min(swing_start_price, swing_end_price)
price_range = high_val - low_val
if direction.upper() == "BUY":
ote_zone['upper'] = high_val - (price_range * fib_levels['0.618'])
ote_zone['mid'] = high_val - (price_range * fib_levels['0.705'])
ote_zone['lower'] = high_val - (price_range * fib_levels['0.790'])
elif direction.upper() == "SELL":
ote_zone['upper'] = low_val + (price_range * fib_levels['0.790'])
ote_zone['mid'] = low_val + (price_range * fib_levels['0.705'])
ote_zone['lower'] = low_val + (price_range * fib_levels['0.618'])
return ote_zone
# ========== CHANGE OF CHARACTER (CHoCH) ==========
def detect_change_of_character(df: pd.DataFrame, swing_lookback: int = 5) -> Optional[Dict[str, Any]]:
"""
Mendeteksi Change of Character (CHoCH), sinyal awal dari potensi reversal.
CHoCH terjadi ketika harga menembus swing point minor terakhir setelah sebuah
Break of Structure (BOS) terjadi.
"""
if len(df) < swing_lookback * 2 + 5: # Butuh beberapa candle ekstra
return None
_, swing_points = detect_structure(df, swing_lookback)
last_high = swing_points.get('last_high')
last_low = swing_points.get('last_low')
# Tidak bisa mendeteksi CHoCH tanpa swing point yang jelas
if not last_high or not last_low:
return None
df = df.copy()
df['is_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
df['is_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
swing_highs = df[df['is_swing_high']]
swing_lows = df[df['is_swing_low']]
if swing_highs.empty or swing_lows.empty:
return None
# Skenario Bearish CHoCH:
# Terjadi setelah Bullish BOS, lalu harga break swing low terakhir.
# 1. Cari Bullish BOS terbaru (higher high).
if len(swing_highs) >= 2:
last_sh = swing_highs.iloc[-1]
prev_sh = swing_highs.iloc[-2]
if last_sh['high'] > prev_sh['high']: # Indikasi Bullish Trend/BOS
# 2. Cari swing low yang terbentuk antara dua swing high ini.
relevant_lows = swing_lows[(swing_lows.index > prev_sh.name) & (swing_lows.index < last_sh.name)]
if not relevant_lows.empty:
choch_level = relevant_lows.iloc[-1]['low']
# 3. Cek apakah harga saat ini atau beberapa candle terakhir break di bawah level itu.
recent_candles = df.iloc[-3:]
if (recent_candles['close'] < choch_level).any():
return {'type': 'BEARISH_CHoCH', 'price': choch_level, 'time': df.iloc[-1]['time']}
# Skenario Bullish CHoCH:
# Terjadi setelah Bearish BOS, lalu harga break swing high terakhir.
# 1. Cari Bearish BOS terbaru (lower low).
if len(swing_lows) >= 2:
last_sl = swing_lows.iloc[-1]
prev_sl = swing_lows.iloc[-2]
if last_sl['low'] < prev_sl['low']: # PERBAIKAN: prev_sh -> prev_sl
# 2. Cari swing high yang terbentuk antara dua swing low ini.
relevant_highs = swing_highs[(swing_highs.index > prev_sl.name) & (swing_highs.index < last_sl.name)]
if not relevant_highs.empty:
choch_level = relevant_highs.iloc[-1]['high']
# 3. Cek apakah harga saat ini atau beberapa candle terakhir break di atas level itu.
recent_candles = df.iloc[-3:]
if (recent_candles['close'] > choch_level).any():
return {'type': 'BULLISH_CHoCH', 'price': choch_level, 'time': df.iloc[-1]['time']}
return None
# ========== RSI & DIVERGENCE ==========
def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series:
"""
Menghitung Relative Strength Index (RSI).
"""
if len(prices) < period + 1:
return pd.Series([np.nan] * len(prices), index=prices.index)
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).ewm(alpha=1/period, adjust=False).mean()
loss = (-delta.where(delta < 0, 0)).ewm(alpha=1/period, adjust=False).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def detect_regular_divergence(df: pd.DataFrame, swing_lookback: int = 5) -> List[Dict[str, Any]]:
"""
Mendeteksi divergensi reguler (sinyal reversal) antara harga dan RSI.
"""
divergences = []
if 'rsi' not in df.columns or df['rsi'].isnull().all():
return divergences
df['price_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
df['price_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
df['rsi_swing_high'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).max())
df['rsi_swing_low'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).min())
price_lows = df[df['price_swing_low']]
price_highs = df[df['price_swing_high']]
rsi_lows = df[df['rsi_swing_low']]
rsi_highs = df[df['rsi_swing_high']]
# Regular Bullish Divergence: Lower Low di Harga, Higher Low di RSI
if len(price_lows) >= 2 and len(rsi_lows) >= 2:
last_price_low = price_lows.iloc[-1]
prev_price_low = price_lows.iloc[-2]
if last_price_low['low'] < prev_price_low['low']:
# Cari RSI low yang relevan
relevant_rsi_lows = rsi_lows[(rsi_lows.index >= prev_price_low.name) & (rsi_lows.index <= last_price_low.name)]
if len(relevant_rsi_lows) >= 2:
if relevant_rsi_lows.iloc[-1]['rsi'] > relevant_rsi_lows.iloc[0]['rsi']:
divergences.append({'type': 'REGULAR_BULLISH_DIVERGENCE', 'time': last_price_low['time']})
# Regular Bearish Divergence: Higher High di Harga, Lower High di RSI
if len(price_highs) >= 2 and len(rsi_highs) >= 2:
last_price_high = price_highs.iloc[-1]
prev_price_high = price_highs.iloc[-2]
if last_price_high['high'] > prev_price_high['high']:
# Cari RSI high yang relevan
relevant_rsi_highs = rsi_highs[(rsi_highs.index >= prev_price_high.name) & (rsi_highs.index <= last_price_high.name)]
if len(relevant_rsi_highs) >= 2:
if relevant_rsi_highs.iloc[-1]['rsi'] < relevant_rsi_highs.iloc[0]['rsi']:
divergences.append({'type': 'REGULAR_BEARISH_DIVERGENCE', 'time': last_price_high['time']})
return divergences
def detect_hidden_divergence(df: pd.DataFrame, swing_lookback: int = 5) -> List[Dict[str, Any]]:
"""
Mendeteksi divergensi tersembunyi (sinyal continuation) antara harga dan RSI.
"""
divergences = []
if 'rsi' not in df.columns or df['rsi'].isnull().all():
return divergences
df['price_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max())
df['price_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min())
df['rsi_swing_high'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).max())
df['rsi_swing_low'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).min())
price_lows = df[df['price_swing_low']]
price_highs = df[df['price_swing_high']]
rsi_lows = df[df['rsi_swing_low']]
rsi_highs = df[df['rsi_swing_high']]
# Hidden Bullish Divergence: Higher Low di Harga, Lower Low di RSI
if len(price_lows) >= 2 and len(rsi_lows) >= 2:
last_price_low = price_lows.iloc[-1]
prev_price_low = price_lows.iloc[-2]
if last_price_low['low'] > prev_price_low['low']:
relevant_rsi_lows = rsi_lows[(rsi_lows.index >= prev_price_low.name) & (rsi_lows.index <= last_price_low.name)]
if len(relevant_rsi_lows) >= 2:
if relevant_rsi_lows.iloc[-1]['rsi'] < relevant_rsi_lows.iloc[0]['rsi']:
divergences.append({'type': 'HIDDEN_BULLISH_DIVERGENCE', 'time': last_price_low['time']})
# Hidden Bearish Divergence: Lower High di Harga, Higher High di RSI
if len(price_highs) >= 2 and len(rsi_highs) >= 2:
last_price_high = price_highs.iloc[-1]
prev_price_high = price_highs.iloc[-2]
if last_price_high['high'] < prev_price_high['high']:
relevant_rsi_highs = rsi_highs[(rsi_highs.index >= prev_price_high.name) & (rsi_highs.index <= last_price_high.name)]
if len(relevant_rsi_highs) >= 2:
if relevant_rsi_highs.iloc[-1]['rsi'] > relevant_rsi_highs.iloc[0]['rsi']:
divergences.append({'type': 'HIDDEN_BEARISH_DIVERGENCE', 'time': last_price_high['time']})
return divergences
# ========== ANALISIS MULTI-TIMEFRAME ==========
def analyze_mtf_trend(timeframes_data: Dict[str, pd.DataFrame]) -> Dict[str, str]:
"""Analisis trend di multiple timeframe."""
trend_analysis = {}
for tf, df in timeframes_data.items():
structure, _ = detect_structure(df)
if "BULLISH_BOS" in structure or "HH/HL" in structure:
trend_analysis[tf] = "BULLISH"
elif "BEARISH_BOS" in structure or "LL/LH" in structure:
trend_analysis[tf] = "BEARISH"
else:
trend_analysis[tf] = "NEUTRAL"
return trend_analysis
def detect_mtf_divergence(timeframes_data: Dict[str, pd.DataFrame]) -> Dict[str, List[Dict[str, Any]]]:
"""Deteksi divergence di multiple timeframe."""
divergences = {}
for tf, df in timeframes_data.items():
df = df.copy()
df['rsi'] = calculate_rsi(df['close'])
# Deteksi regular divergence
reg_div = detect_regular_divergence(df)
# Deteksi hidden divergence
hidden_div = detect_hidden_divergence(df)
divergences[tf] = reg_div + hidden_div
return divergences
def analyze_volatility(df: pd.DataFrame, window: int = 20) -> Dict[str, float]:
"""Analisis volatilitas untuk penyesuaian parameter."""
atr = calculate_atr_dynamic(df)
avg_range = df['high'].sub(df['low']).rolling(window).mean().iloc[-1]
return {
'atr': atr,
'avg_range': avg_range,
'volatility_ratio': atr / avg_range if avg_range > 0 else 1.0
}
def adjust_trade_parameters(
volatility: Dict[str, float],
base_sl_pips: float,
base_tp_pips: float
) -> Tuple[float, float]:
"""Sesuaikan SL/TP berdasarkan volatilitas."""
vol_ratio = volatility['volatility_ratio']
adjusted_sl = base_sl_pips * (1 + (vol_ratio - 1) * 0.5)
adjusted_tp = base_tp_pips * (1 + (vol_ratio - 1) * 0.5)
return adjusted_sl, adjusted_tp
# ========== TRENDLINE DETECTION ==========
def detect_trendline_breaks(
df: pd.DataFrame, length: int = 14, mult: float = 1.0, calc_method: str = 'ATR'
) -> Dict[str, Any]:
"""
Implementasi LuxAlgo Trendline dengan deteksi breakout
"""
df = df.copy()
# Deteksi pivot points
df['ph'] = df['high'].rolling(window=length * 2 + 1, center=True).apply(
lambda x: x[length] == max(x), raw=True
)
df['pl'] = df['low'].rolling(window=length * 2 + 1, center=True).apply(
lambda x: x[length] == min(x), raw=True
)
# Kalkulasi slope berdasarkan metode
if calc_method == 'ATR':
slope = df['high'].sub(df['low']).rolling(length).mean() / length * mult
elif calc_method == 'STDEV':
slope = df['close'].rolling(length).std() / length * mult
else: # LINREG
x = np.arange(length)
slope = df['close'].rolling(length).apply(
lambda y: np.polyfit(x, y, 1)[0] if len(y) > 1 else np.nan
) * mult
# Inisialisasi trendlines
upper = pd.Series(index=df.index, dtype=float)
lower = pd.Series(index=df.index, dtype=float)
last_ph = None
last_pl = None
last_ph_slope = 0
last_pl_slope = 0
# Konstruksi trendlines
for i in range(length, len(df)):
if df['ph'].iloc[i]:
last_ph = df['high'].iloc[i]
last_ph_slope = slope.iloc[i]
elif last_ph is not None:
upper.iloc[i] = last_ph - last_ph_slope * (i - df.index[df['ph']].max())
if df['pl'].iloc[i]:
last_pl = df['low'].iloc[i]
last_pl_slope = slope.iloc[i]
elif last_pl is not None:
lower.iloc[i] = last_pl + last_pl_slope * (i - df.index[df['pl']].max())
# Deteksi breakouts
breaks = {
'upper_break': False,
'lower_break': False,
'upper_line': upper.iloc[-1],
'lower_line': lower.iloc[-1]
}
if len(df) > 1:
current_close = df['close'].iloc[-1]
prev_close = df['close'].iloc[-2]
# Upper break (bullish)
if prev_close <= upper.iloc[-2] and current_close > upper.iloc[-1]:
breaks['upper_break'] = True
# Lower break (bearish)
if prev_close >= lower.iloc[-2] and current_close < lower.iloc[-1]:
breaks['lower_break'] = True
return breaks
# ========== ANALISIS OPPORTUNITY TF ==========
def analyze_tf_opportunity(
df: pd.DataFrame,
symbol: str,
tf: str,
mt5_path: str,
start_date: str,
end_date: str,
profile_type: str = "intraday", # ["scalping", "intraday", "swing"]
atr_length: int = 14,
order_block_lookback: int = 15,
fvg_min_gap: float = 0.0002,
fvg_max_age: int = 20,
pattern_body_threshold: float = 1.2,
volume_window: int = 20,
volume_threshold: float = 2.0,
trendline_length: int = 14,
trendline_mult: float = 1.0,
calc_method: str = 'ATR',
weights: Optional[Dict[str, float]] = None
) -> Optional[Dict[str, Any]]:
"""
Analisis peluang trading berdasarkan profil yang dipilih (scalping/intraday/swing).
"""
# Validasi profile type
if profile_type not in ["scalping", "intraday", "swing"]:
raise ValueError("profile_type harus salah satu dari: scalping, intraday, atau swing")
# Inisialisasi bobot jika None
weights = weights or {}
# Inisialisasi parameter profil
min_score_threshold = 5.0 # Nilai default
# Sesuaikan parameter berdasarkan profil
if profile_type == "scalping":
order_block_lookback = 10
fvg_max_age = 10
pattern_body_threshold = 1.0
volume_threshold = 1.5
min_score_threshold = 3.0
elif profile_type == "intraday":
order_block_lookback = 15
fvg_max_age = 20
pattern_body_threshold = 1.2
volume_threshold = 2.0
min_score_threshold = 5.0
elif profile_type == "swing":
order_block_lookback = 30
fvg_max_age = 40
pattern_body_threshold = 1.5
volume_threshold = 2.5
min_score_threshold = 7.0
# Inisialisasi scoring
score = 0.0
score_components = {}
info_list = []
# Deteksi struktur pasar
structure, swing_points = detect_structure(df)
# Analisis Order Blocks
order_blocks = detect_order_blocks_multi(
df,
lookback=order_block_lookback,
structure_filter=None,
max_age=fvg_max_age
)
# Analisis Fair Value Gaps
fvg_zones = detect_fvg_multi(
df,
min_gap=fvg_min_gap,
max_age=fvg_max_age
)
# Deteksi pattern berdasarkan profil
patterns = []
if profile_type in ["scalping", "intraday"]:
patterns.extend(detect_pinbar(df, min_ratio=pattern_body_threshold))
patterns.extend(detect_engulfing(df))
if profile_type in ["intraday", "swing"]:
patterns.extend(detect_continuation_patterns(df, body_threshold=pattern_body_threshold))
# Analisis volume khusus untuk scalping dan intraday
volume_analysis = None
if profile_type in ["scalping", "intraday"]:
volume_spikes = detect_volume_spike(df, window=volume_window, threshold=volume_threshold)
if volume_spikes:
volume_analysis = volume_spikes[-1] # Ambil spike terakhir
# Analisis trendline dengan sensitivitas berbeda per profil
trendline_breaks = detect_trendline_breaks(
df,
length=trendline_length,
mult=trendline_mult,
calc_method=calc_method
)
# Scoring berdasarkan profil
# Structure scoring
if "BULLISH_BOS" in structure:
structure_score = weights.get('BULLISH_BOS', 3.0)
if profile_type == "swing":
structure_score *= 1.5 # Lebih penting untuk swing
score += structure_score
score_components['structure'] = structure_score
info_list.append(f"Bullish BOS detected ({profile_type} profile)")
elif "BEARISH_BOS" in structure:
structure_score = weights.get('BEARISH_BOS', -3.0)
if profile_type == "swing":
structure_score *= 1.5
score += structure_score
score_components['structure'] = structure_score
info_list.append(f"Bearish BOS detected ({profile_type} profile)")
# Order Block scoring
for ob in order_blocks[:2]: # Consider top 2 closest OBs
ob_score = weights.get(f"{ob['type']}", 1.0)
if profile_type == "scalping":
ob_score *= 0.7 # Less important for scalping
elif profile_type == "swing":
ob_score *= 1.3 # More important for swing
score += ob_score
score_components['order_block'] = ob_score
info_list.append(f"Order Block: {ob['type']} detected")
# Pattern scoring berdasarkan profil
for pattern in patterns:
pattern_score = weights.get(pattern['type'], 1.0)
if profile_type == "scalping":
pattern_score *= 1.3 # More important for scalping
elif profile_type == "swing":
pattern_score *= 0.7 # Less important for swing
score += pattern_score
score_components['pattern'] = pattern_score
info_list.append(f"Pattern: {pattern['type']} detected")
# Volume analysis scoring (terutama untuk scalping/intraday)
if volume_analysis and profile_type in ["scalping", "intraday"]:
volume_score = weights.get('VOLUME_SPIKE', 1.0)
score += volume_score
score_components['volume'] = volume_score
info_list.append("Volume spike detected")
# Trendline break scoring
if trendline_breaks['upper_break']:
trendline_score = weights.get('TRENDLINE_BREAK_BULL', 2.0)
if profile_type == "swing":
trendline_score *= 1.2
score += trendline_score
score_components['trendline'] = trendline_score
info_list.append(f"Bullish trendline break ({profile_type} profile)")
elif trendline_breaks['lower_break']:
trendline_score = weights.get('TRENDLINE_BREAK_BEAR', -2.0)
if profile_type == "swing":
trendline_score *= 1.2
score += trendline_score
score_components['trendline'] = trendline_score
info_list.append(f"Bearish trendline break ({profile_type} profile)")
# Check if score meets minimum threshold for the profile
if abs(score) < min_score_threshold:
return None
return {
'symbol': symbol,
'timeframe': tf,
'profile_type': profile_type,
'score': score,
'score_components': score_components,
'structure': structure,
'order_blocks': order_blocks,
'fvg_zones': fvg_zones,
'patterns': patterns,
'trendline_breaks': trendline_breaks,
'volume_analysis': volume_analysis,
'info': info_list
}
def detect_smc_structure(df: pd.DataFrame, length: int = 14) -> Dict[str, Any]:
"""
Deteksi struktur Smart Money Concepts (SMC)
"""
df = df.copy()
result = {
'internal_structure': {'bullish_bos': False, 'bearish_bos': False,
'bullish_choch': False, 'bearish_choch': False},
'swing_structure': {'bullish_bos': False, 'bearish_bos': False,
'bullish_choch': False, 'bearish_choch': False},
'order_blocks': [],
'fair_value_gaps': [],
'equal_levels': {'highs': [], 'lows': []}
}
# Deteksi Internal Structure
df['internal_high'] = df['high'].rolling(5).max() == df['high']
df['internal_low'] = df['low'].rolling(5).min() == df['low']
# Deteksi Break of Structure (BOS)
for i in range(5, len(df) - 1):
# Internal BOS
if df['close'].iloc[i] > df['high'].iloc[i - 1] and df['internal_high'].iloc[i - 1]:
result['internal_structure']['bullish_bos'] = True
elif df['close'].iloc[i] < df['low'].iloc[i - 1] and df['internal_low'].iloc[i - 1]:
result['internal_structure']['bearish_bos'] = True
# Swing BOS dengan konfirmasi
if i >= 14:
if df['close'].iloc[i] > df['high'].iloc[i - 14:i].max():
result['swing_structure']['bullish_bos'] = True
elif df['close'].iloc[i] < df['low'].iloc[i - 14:i].min():
result['swing_structure']['bearish_bos'] = True
# Deteksi Change of Character (CHoCH)
if len(df) > 20:
last_swing = "none"
for i in range(20, len(df)):
if df['high'].iloc[i] > df['high'].iloc[i - 20:i].max():
if last_swing == "low":
result['swing_structure']['bullish_choch'] = True
last_swing = "high"
elif df['low'].iloc[i] < df['low'].iloc[i - 20:i].min():
if last_swing == "high":
result['swing_structure']['bearish_choch'] = True
last_swing = "low"
# Deteksi Order Blocks
atr = calculate_atr_dynamic(df)
for i in range(len(df) - 3, len(df)):
candle_size = df['high'].iloc[i] - df['low'].iloc[i]
if candle_size > 1.5 * atr: # Significant candle
if df['close'].iloc[i] < df['open'].iloc[i]: # Bearish OB
result['order_blocks'].append({
'type': 'bearish',
'high': df['high'].iloc[i],
'low': df['low'].iloc[i],
'time': df.index[i]
})
elif df['close'].iloc[i] > df['open'].iloc[i]: # Bullish OB
result['order_blocks'].append({
'type': 'bullish',
'high': df['high'].iloc[i],
'low': df['low'].iloc[i],
'time': df.index[i]
})
# Deteksi Fair Value Gaps
for i in range(1, len(df) - 1):
if df['low'].iloc[i + 1] > df['high'].iloc[i - 1]: # Bullish FVG
result['fair_value_gaps'].append({
'type': 'bullish',
'top': df['low'].iloc[i + 1],
'bottom': df['high'].iloc[i - 1],
'time': df.index[i]
})
elif df['high'].iloc[i + 1] < df['low'].iloc[i - 1]: # Bearish FVG
result['fair_value_gaps'].append({
'type': 'bearish',
'top': df['low'].iloc[i - 1],
'bottom': df['high'].iloc[i + 1],
'time': df.index[i]
})
return result
def adapt_weights_to_context(weights: Dict[str, float], context: Dict[str, Any]) -> Dict[str, float]:
"""
Adaptasi bobot berdasarkan kondisi market.
"""
adapted = weights.copy()
# Sesuaikan berdasarkan fase trend
if context['trend_phase'] == 'RANGING':
# Kurangi bobot breakout, tingkatkan bobot reversal
for key in adapted:
if 'BREAKOUT' in key or 'BOS' in key:
adapted[key] *= 0.7
if 'REVERSAL' in key or 'CHoCH' in key:
adapted[key] *= 1.3
elif context['trend_phase'] == 'TRENDING':
# Tingkatkan bobot breakout & continuation
for key in adapted:
if 'BREAKOUT' in key or 'BOS' in key or 'CONTINUATION' in key:
adapted[key] *= 1.3
if 'REVERSAL' in key or 'CHoCH' in key:
adapted[key] *= 0.7
# Sesuaikan berdasarkan volatilitas
vol_ratio = context.get('volatility', {}).get('volatility_ratio', 1.0)
if vol_ratio > 1.5: # High volatility
# Kurangi bobot pattern, tingkatkan bobot struktur
for key in adapted:
if 'PATTERN' in key or 'ENGULFING' in key or 'PINBAR' in key:
adapted[key] *= 0.8
if 'STRUCTURE' in key or 'BOS' in key:
adapted[key] *= 1.2
elif vol_ratio < 0.5: # Low volatility
# Tingkatkan bobot pattern
for key in adapted:
if 'PATTERN' in key or 'ENGULFING' in key or 'PINBAR' in key:
adapted[key] *= 1.2
# Sesuaikan berdasarkan kondisi likuiditas
liquidity = context.get('liquidity_conditions', {})
if liquidity.get('average_spread', 0) > 2.0: # Spread tinggi
# Kurangi semua bobot karena kondisi kurang ideal
for key in adapted:
adapted[key] *= 0.8
return adapted
def validate_scalping_opportunity(
df: pd.DataFrame, profile_settings: Dict[str, Any]
) -> Tuple[bool, float, List[str]]:
"""
Validasi khusus untuk scalping dengan kriteria yang lebih ketat
"""
validations = []
confidence_boost = 0
# 1. Analisis Volume
volume_ma = df['tick_volume'].rolling(20).mean()
current_volume = df['tick_volume'].iloc[-1]
volume_ratio = current_volume / volume_ma.iloc[-1]
if volume_ratio >= profile_settings['entry_rules']['minimum_volume_threshold']:
validations.append(f"Volume valid ({volume_ratio:.2f}x average)")
confidence_boost += 0.5
else:
return False, 0, ["Volume tidak mencukupi"]
# 2. Analisis Volatilitas
volatility_percentile = df['high'].sub(df['low']).rolling(50).mean().rank(pct=True).iloc[-1] * 100
if (profile_settings['entry_rules']['minimum_volatility_percentile'] <=
volatility_percentile <=
profile_settings['entry_rules']['maximum_volatility_percentile']):
validations.append(f"Volatilitas optimal ({volatility_percentile:.1f}%)")
confidence_boost += 0.5
else:
return False, 0, ["Volatilitas di luar range optimal"]
# 3. Momentum Check
rsi = calculate_rsi(df['close'])
last_rsi = rsi.iloc[-1]
if 20 <= last_rsi <= 80: # Avoid extreme conditions
momentum_aligned = (last_rsi > 50 and df['close'].iloc[-1] > df['close'].iloc[-2]) or \
(last_rsi < 50 and df['close'].iloc[-1] < df['close'].iloc[-2])
if momentum_aligned:
validations.append("Momentum aligned")
confidence_boost += 0.7
# 4. Pattern Quality Check
if profile_settings['advanced_filters']['pattern_quality_threshold']:
pattern_quality = analyze_pattern_quality(df)
if pattern_quality >= profile_settings['advanced_filters']['pattern_quality_threshold']:
validations.append(f"Pattern quality good ({pattern_quality:.2f})")
confidence_boost += 0.8
else:
return False, 0, ["Pattern quality insufficient"]
# 5. Price Action Clarity
price_clarity = analyze_price_action_clarity(df)
if price_clarity >= 0.7:
validations.append(f"Clear price action ({price_clarity:.2f})")
confidence_boost += 0.5
# 6. Reward Ratio Check
if 'minimum_reward_ratio' in profile_settings['advanced_filters']:
nearest_resistance = find_nearest_resistance(df)
nearest_support = find_nearest_support(df)
current_price = df['close'].iloc[-1]
potential_reward = abs(nearest_resistance - current_price)
potential_risk = abs(current_price - nearest_support)
if potential_risk > 0:
reward_ratio = potential_reward / potential_risk
if reward_ratio >= profile_settings['advanced_filters']['minimum_reward_ratio']:
validations.append(f"Good reward ratio ({reward_ratio:.2f})")
confidence_boost += 0.6
else:
return False, 0, ["Insufficient reward ratio"]
return len(validations) >= 4, confidence_boost, validations
def analyze_pattern_quality(df: pd.DataFrame) -> float:
"""Analisis kualitas pattern untuk scalping"""
quality_score = 0.0
last_candle = df.iloc[-1]
prev_candle = df.iloc[-2]
# 1. Body to Wick Ratio
body = abs(last_candle['close'] - last_candle['open'])
upper_wick = last_candle['high'] - max(last_candle['open'], last_candle['close'])
lower_wick = min(last_candle['open'], last_candle['close']) - last_candle['low']
if body > (upper_wick + lower_wick):
quality_score += 0.4 # Strong conviction
# 2. Clean Break of Previous Structure
if last_candle['close'] > prev_candle['high'] or last_candle['close'] < prev_candle['low']:
quality_score += 0.3
# 3. Volume Confirmation
if last_candle['tick_volume'] > df['tick_volume'].rolling(20).mean().iloc[-1]:
quality_score += 0.3
return quality_score
def analyze_price_action_clarity(df: pd.DataFrame) -> float:
"""Analisis kejelasan price action untuk scalping"""
clarity_score = 0.0
recent_candles = df.tail(5)
# 1. Consistency in Direction
closes = recent_candles['close']
if (closes.diff().dropna() > 0).all() or (closes.diff().dropna() < 0).all():
clarity_score += 0.4
# 2. Clean Candle Formation
for _, candle in recent_candles.iterrows():
body = abs(candle['close'] - candle['open'])
total_range = candle['high'] - candle['low']
if total_range > 0 and body / total_range > 0.6:
clarity_score += 0.1 # Max 0.5 from this
# 3. Minimal Overlap
for i in range(1, len(recent_candles)):
curr = recent_candles.iloc[i]
prev = recent_candles.iloc[i - 1]
if (curr['high'] > prev['high'] and curr['low'] > prev['low']) or \
(curr['high'] < prev['high'] and curr['low'] < prev['low']):
clarity_score += 0.1 # Max 0.4 from this
return min(clarity_score, 1.0)
def find_nearest_support(df: pd.DataFrame) -> float:
"""Mencari level support terdekat"""
recent_lows = df['low'].rolling(window=20).min()
current_price = df['close'].iloc[-1]
support_levels = recent_lows[recent_lows < current_price].unique()
if len(support_levels) > 0:
return max(support_levels) # Highest support below current price
return current_price - calculate_atr_dynamic(df)
def find_nearest_resistance(df: pd.DataFrame) -> float:
"""Mencari level resistance terdekat"""
recent_highs = df['high'].rolling(window=20).max()
current_price = df['close'].iloc[-1]
resistance_levels = recent_highs[recent_highs > current_price].unique()
if len(resistance_levels) > 0:
return min(resistance_levels) # Lowest resistance above current price
return current_price + calculate_atr_dynamic(df)