""" technical_indicators.py (refactor advanced) =========================================== Versi refaktor: multi-zone detection, scoring, pattern, boundary, pivot, feature extractor, zone plotting & event logging. """ import pandas as pd import numpy as np from typing import List, Dict, Any, Optional, Tuple import matplotlib.pyplot as plt # ========== ZONE & STRUCTURE ========== def calculate_atr_dynamic(df: pd.DataFrame, period: int = 14) -> float: if len(df) < period + 1: return 0.0 tr = pd.concat([ df['high'] - df['low'], abs(df['high'] - df['close'].shift()), abs(df['low'] - df['close'].shift()) ], axis=1).max(axis=1) atr_val = tr.ewm(span=period, adjust=False).mean().iloc[-1] return float(atr_val) def detect_structure(df: pd.DataFrame, swing_lookback: int = 5) -> Tuple[str, Dict[str, Any]]: if len(df) < swing_lookback * 2: return "INDECISIVE", {} df = df.copy() # Deteksi Swing Highs and Lows yang lebih robust df['is_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max()) df['is_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min()) swing_highs = df[df['is_swing_high']] swing_lows = df[df['is_swing_low']] res = [] last_swing_high_price = None last_swing_low_price = None last_swing_high_idx = None last_swing_low_idx = None # Urutkan swing points berdasarkan waktu (index) if not swing_highs.empty: last_sh = swing_highs.iloc[-1] last_swing_high_price = last_sh['high'] last_swing_high_idx = last_sh.name if len(swing_highs) > 1: prev_sh = swing_highs.iloc[-2] if last_sh['high'] > prev_sh['high']: res.append('HH') else: res.append('LH') if not swing_lows.empty: last_sl = swing_lows.iloc[-1] last_swing_low_price = last_sl['low'] last_swing_low_idx = last_sl.name if len(swing_lows) > 1: prev_sl = swing_lows.iloc[-2] if last_sl['low'] > prev_sl['low']: res.append('HL') else: res.append('LL') # --- LOGIKA BARU UNTUK BREAK OF STRUCTURE (BOS) --- # Membutuhkan penutupan candle (candle close) untuk konfirmasi if last_swing_high_idx is not None: # Cek candle SETELAH swing high terakhir subsequent_candles = df.loc[last_swing_high_idx:].iloc[1:] confirmed_break = subsequent_candles[subsequent_candles['close'] > last_swing_high_price] if not confirmed_break.empty: res.append("BULLISH_BOS") if last_swing_low_idx is not None: # Cek candle SETELAH swing low terakhir subsequent_candles = df.loc[last_swing_low_idx:].iloc[1:] confirmed_break = subsequent_candles[subsequent_candles['close'] < last_swing_low_price] if not confirmed_break.empty: res.append("BEARISH_BOS") structure_str = "/".join(sorted(list(set(res)))) if res else "INDECISIVE" return structure_str, {'last_high': last_swing_high_price, 'last_low': last_swing_low_price} def detect_order_blocks_multi( df: pd.DataFrame, lookback: int = 15, structure_filter: Optional[str] = None, max_age: int = 20 ) -> List[Dict[str, Any]]: order_blocks = [] now_price = df['close'].iloc[-1] atr = calculate_atr_dynamic(df) last_idx = df.index[-1] for i in range(len(df) - lookback - 2, len(df) - 2): candle = df.iloc[i] age = last_idx - df.index[i] if age > max_age: continue # Bullish OB if candle['close'] < candle['open']: found_bos = False for j in range(i + 1, min(i + lookback + 1, len(df))): if df.iloc[j]['close'] > candle['high']: found_bos = True break if found_bos: if structure_filter and structure_filter != "BULLISH_BOS": continue strength = (abs(candle['open'] - candle['close']) / atr) if atr else 1 order_blocks.append({ 'type': 'BULLISH_OB', 'high': float(candle['high']), 'low': float(candle['low']), 'time': candle['time'], 'age': age, 'strength': strength, 'distance': abs(now_price - ((candle['open'] + candle['close']) / 2)), }) # Bearish OB elif candle['close'] > candle['open']: found_bos = False for j in range(i + 1, min(i + lookback + 1, len(df))): if df.iloc[j]['close'] < candle['low']: found_bos = True break if found_bos: if structure_filter and structure_filter != "BEARISH_BOS": continue strength = (abs(candle['open'] - candle['close']) / atr) if atr else 1 order_blocks.append({ 'type': 'BEARISH_OB', 'high': float(candle['high']), 'low': float(candle['low']), 'time': candle['time'], 'age': age, 'strength': strength, 'distance': abs(now_price - ((candle['open'] + candle['close']) / 2)), }) order_blocks = sorted(order_blocks, key=lambda x: (x['distance'], -x['strength'], x['age'])) return order_blocks def detect_fvg_multi(df: pd.DataFrame, min_gap: float = 0.0002, max_age: int = 20) -> List[Dict[str, Any]]: fvg_zones = [] now_price = df['close'].iloc[-1] atr = calculate_atr_dynamic(df) last_idx = df.index[-1] for i in range(1, len(df) - 1): c_prev, c_now, c_next = df.iloc[i - 1], df.iloc[i], df.iloc[i + 1] age = last_idx - df.index[i] if age > max_age: continue # Bullish FVG gap = c_next['low'] - c_prev['high'] if gap > min_gap: strength = gap / atr if atr else 1 fvg_zones.append({ 'type': 'FVG_BULLISH', 'start': float(c_prev['high']), 'end': float(c_next['low']), 'time': c_now['time'], 'age': age, 'strength': strength, 'distance': abs(now_price - ((c_prev['high'] + c_next['low']) / 2)) }) # Bearish FVG gap = c_prev['low'] - c_next['high'] if gap > min_gap: strength = gap / atr if atr else 1 fvg_zones.append({ 'type': 'FVG_BEARISH', 'start': float(c_next['high']), 'end': float(c_prev['low']), 'time': c_now['time'], 'age': age, 'strength': strength, 'distance': abs(now_price - ((c_next['high'] + c_prev['low']) / 2)) }) fvg_zones = sorted(fvg_zones, key=lambda x: (x['distance'], -x['strength'], x['age'])) return fvg_zones # ========== PATTERN, VOLUME, BOUNDARY, PIVOT ========== def detect_pinbar(df: pd.DataFrame, min_ratio: float = 2.0) -> List[Dict[str, Any]]: """Pinbar: ekor minimal 2x body.""" pattern = [] for i in range(2, len(df)): c = df.iloc[i] body = abs(c['close'] - c['open']) upper = c['high'] - max(c['close'], c['open']) lower = min(c['close'], c['open']) - c['low'] # Bullish pinbar if lower > min_ratio * body and upper < 0.5 * body: pattern.append({'type': 'PINBAR_BULL', 'time': c['time'], 'idx': i}) # Bearish pinbar elif upper > min_ratio * body and lower < 0.5 * body: pattern.append({'type': 'PINBAR_BEAR', 'time': c['time'], 'idx': i}) return pattern def detect_engulfing(df: pd.DataFrame) -> List[Dict[str, Any]]: pattern = [] for i in range(1, len(df)): prev, c = df.iloc[i - 1], df.iloc[i] # Bullish engulfing if c['close'] > c['open'] and prev['close'] < prev['open'] and \ c['close'] > prev['open'] and c['open'] < prev['close']: pattern.append({'type': 'ENGULFING_BULL', 'time': c['time'], 'idx': i}) # Bearish engulfing elif c['close'] < c['open'] and prev['close'] > prev['open'] and \ c['close'] < prev['open'] and c['open'] > prev['close']: pattern.append({'type': 'ENGULFING_BEAR', 'time': c['time'], 'idx': i}) return pattern def detect_volume_spike(df: pd.DataFrame, window: int = 20, threshold: float = 2.0) -> List[Dict[str, Any]]: if 'tick_volume' not in df.columns: return [] pattern = [] vol_ma = df['tick_volume'].rolling(window).mean() for i in range(window, len(df)): if df['tick_volume'].iloc[i] > threshold * vol_ma.iloc[i]: pattern.append({'type': 'VOLUME_SPIKE', 'time': df['time'].iloc[i], 'idx': i}) return pattern def get_daily_high_low(df: pd.DataFrame) -> Dict[str, Any]: today = pd.to_datetime(df['time'].iloc[-1]).date() today_data = df[pd.to_datetime(df['time']).dt.date == today] high = today_data['high'].max() low = today_data['low'].min() last_close = today_data['close'].iloc[-1] if len(today_data) else df['close'].iloc[-1] return { 'daily_high': high, 'daily_low': low, 'distance_to_high': abs(last_close - high), 'distance_to_low': abs(last_close - low) } def get_pivot_points(df: pd.DataFrame) -> Dict[str, float]: # Classic pivot, pakai data daily terakhir today = pd.to_datetime(df['time'].iloc[-1]).date() prev_day = today - pd.Timedelta(days=1) prev_data = df[pd.to_datetime(df['time']).dt.date == prev_day] if len(prev_data) == 0: prev_data = df.iloc[-20:] # fallback: 20 bar terakhir high, low, close = prev_data['high'].max(), prev_data['low'].min(), prev_data['close'].iloc[-1] pivot = (high + low + close) / 3 r1 = 2 * pivot - low s1 = 2 * pivot - high return {'pivot': pivot, 'r1': r1, 's1': s1} def detect_continuation_patterns( df: pd.DataFrame, base_max_candles: int = 4, body_threshold: float = 1.2 ) -> List[Dict[str, Any]]: """Mendeteksi pola RBR (Rally-Base-Rally) dan DBD (Drop-Base-Drop).""" patterns = [] if len(df) < base_max_candles + 2: return patterns df = df.copy() df['body'] = abs(df['close'] - df['open']) avg_body = df['body'].rolling(20).mean() for i in range(len(df) - base_max_candles - 1): # Cek RBR (Rally-Base-Rally) rally1 = df.iloc[i] is_strong_rally1 = rally1['close'] > rally1['open'] and rally1['body'] > avg_body.iloc[i] * body_threshold if is_strong_rally1: base_candles = df.iloc[i + 1: i + 1 + base_max_candles] base_high = base_candles['high'].max() base_low = base_candles['low'].min() # Base harus berada dalam rentang rally1 dan tidak terlalu besar if base_high < rally1['high'] + (rally1['body'] * 0.2) and \ base_low > rally1['low'] - (rally1['body'] * 0.2): for j in range(1, base_max_candles + 1): rally2_idx = i + j if rally2_idx < len(df) - 1: rally2 = df.iloc[rally2_idx + 1] is_strong_rally2 = rally2['close'] > rally2['open'] and \ rally2['body'] > avg_body.iloc[rally2_idx + 1] * body_threshold if is_strong_rally2 and rally2['close'] > rally1['high']: patterns.append({'type': 'RBR', 'time': rally2['time'], 'idx': rally2_idx + 1}) i = rally2_idx + 1 # Skip untuk menghindari deteksi tumpang tindih break # Cek DBD (Drop-Base-Drop) drop1 = df.iloc[i] is_strong_drop1 = drop1['close'] < drop1['open'] and drop1['body'] > avg_body.iloc[i] * body_threshold if is_strong_drop1: base_candles = df.iloc[i + 1: i + 1 + base_max_candles] base_high = base_candles['high'].max() base_low = base_candles['low'].min() # Base harus berada dalam rentang drop1 if base_high < drop1['high'] + (drop1['body'] * 0.2) and \ base_low > drop1['low'] - (drop1['body'] * 0.2): for j in range(1, base_max_candles + 1): drop2_idx = i + j if drop2_idx < len(df) - 1: drop2 = df.iloc[drop2_idx + 1] is_strong_drop2 = drop2['close'] < drop2['open'] and \ drop2['body'] > avg_body.iloc[drop2_idx + 1] * body_threshold if is_strong_drop2 and drop2['close'] < drop1['low']: patterns.append({'type': 'DBD', 'time': drop2['time'], 'idx': drop2_idx + 1}) i = drop2_idx + 1 # Skip break return patterns # ========== FEATURE EXTRACTOR ========== def extract_features_full( df: pd.DataFrame, structure: str, order_blocks: List[Dict[str, Any]], fvg_zones: List[Dict[str, Any]], patterns: List[Dict[str, Any]], boundary: Dict[str, Any], pivot: Dict[str, float] ) -> np.ndarray: """Keluarkan vektor fitur komprehensif untuk AI/ML.""" last_price = df['close'].iloc[-1] features = [ last_price, calculate_atr_dynamic(df), min([ob['distance'] for ob in order_blocks]) if order_blocks else 0, max([ob['strength'] for ob in order_blocks]) if order_blocks else 0, min([fvg['distance'] for fvg in fvg_zones]) if fvg_zones else 0, max([fvg['strength'] for fvg in fvg_zones]) if fvg_zones else 0, int(any(p['type'].startswith('ENGULFING') for p in patterns)), boundary.get('distance_to_high', 0), boundary.get('distance_to_low', 0), pivot.get('r1', 0) - last_price, pivot.get('s1', 0) - last_price, # Tambah fitur lain sesuai kebutuhan... ] return np.array(features, dtype=float) # ========== TARGET GENERATOR (LABEL OTOMATIS) ========== def generate_label_fvg( df: pd.DataFrame, fvg_zones: List[Dict[str, Any]], horizon: int = 10, pip_thresh: float = 0.001 ) -> List[int]: """ Label 1 jika X bar setelah FVG harga naik/turun >= pip_thresh, else 0. """ label = [0] * len(df) for fvg in fvg_zones: idx = df[df['time'] == fvg['time']].index[0] base_price = (fvg['start'] + fvg['end']) / 2 future_idx = min(idx + horizon, len(df) - 1) if 'BULLISH' in fvg['type']: if df['high'].iloc[future_idx] - base_price >= pip_thresh: label[idx] = 1 elif 'BEARISH' in fvg['type']: if base_price - df['low'].iloc[future_idx] >= pip_thresh: label[idx] = 1 return label # ========== ZONE PLOTTING & EVENT LOGGING ========== def plot_zones(df, order_blocks, fvg_zones, patterns=None): plt.figure(figsize=(15, 7)) plt.plot(df['time'], df['close'], label='Close Price', color='black') for ob in order_blocks: plt.axhspan(ob['low'], ob['high'], color='green' if 'BULL' in ob['type'] else 'red', alpha=0.25) for fvg in fvg_zones: plt.axhspan(fvg['start'], fvg['end'], color='blue' if 'BULL' in fvg['type'] else 'orange', alpha=0.15) if patterns: for p in patterns: idx = p['idx'] marker = '^' if 'BULL' in p['type'] else 'v' plt.scatter(df['time'].iloc[idx], df['close'].iloc[idx], marker=marker, color='purple') plt.legend() plt.title("Price with OB/FVG/Pattern Zones") plt.show() def log_zone_events(order_blocks, fvg_zones, patterns): """Log bar yang trigger event, useful untuk AI/analitik.""" event_log = [] for ob in order_blocks: event_log.append({'time': ob['time'], 'price': (ob['high'] + ob['low']) / 2, 'type': ob['type']}) for fvg in fvg_zones: event_log.append({'time': fvg['time'], 'price': (fvg['start'] + fvg['end']) / 2, 'type': fvg['type']}) for p in patterns: event_log.append({'time': p['time'], 'price': None, 'type': p['type']}) event_log = sorted(event_log, key=lambda x: x['time']) return event_log # ========== EQH/EQL & LIQUIDITY SWEEP ========== def detect_eqh_eql( df: pd.DataFrame, window: int = 10, tolerance: float = 0.0003 ) -> Tuple[List[Dict[str, float]], List[Dict[str, float]]]: if len(df) < window: return [], [] eqh, eql = [], [] for i in range(window, len(df)): hi = df['high'].iloc[i - window:i] lo = df['low'].iloc[i - window:i] if (hi.max() - hi.min()) < (hi.mean() * tolerance): eqh.append({'time': df['time'].iloc[i], 'value': float(hi.mean())}) if (lo.max() - lo.min()) < (lo.mean() * tolerance): eql.append({'time': df['time'].iloc[i], 'value': float(lo.mean())}) return eqh, eql def detect_liquidity_sweep( df: pd.DataFrame, min_candles_for_swing: int = 5, require_confirmation_candle: bool = True ) -> List[Dict[str, Any]]: """ Mendeteksi Liquidity Sweep (LS) dengan konfirmasi candle berikutnya. Logika diperbaiki untuk efisiensi dan akurasi. """ liquidity_sweeps = [] if len(df) < min_candles_for_swing * 2 + 1: # Butuh satu ekstra untuk konfirmasi return liquidity_sweeps df = df.copy() df['body'] = abs(df['close'] - df['open']) avg_body = df['body'].rolling(20).mean() df['is_swing_high'] = (df['high'].rolling(min_candles_for_swing, center=True).max() == df['high']) df['is_swing_low'] = (df['low'].rolling(min_candles_for_swing, center=True).min() == df['low']) swing_highs_idx = df[df['is_swing_high']].index.tolist() swing_lows_idx = df[df['is_swing_low']].index.tolist() start_iter_idx = max(0, len(df) - 50) for i in range(start_iter_idx, len(df) - 1): current_candle = df.iloc[i] # Cek Bullish Sweep relevant_swing_lows = [sl for sl in swing_lows_idx if sl < i] if relevant_swing_lows: prev_swing_low_idx = max(relevant_swing_lows) prev_swing_low_val = df['low'].loc[prev_swing_low_idx] if current_candle['low'] < prev_swing_low_val and current_candle['close'] > prev_swing_low_val: if not require_confirmation_candle: is_confirmed = True else: confirmation_candle = df.iloc[i + 1] is_confirmed = confirmation_candle['close'] > confirmation_candle['open'] and \ confirmation_candle['body'] > avg_body.iloc[i + 1] * 0.8 if is_confirmed: strength = (current_candle['close'] - prev_swing_low_val) strength /= calculate_atr_dynamic(df.loc[:current_candle.name]) liquidity_sweeps.append({ 'type': 'BULLISH_LS', 'swept_level': float(prev_swing_low_val), 'sweep_time': current_candle['time'], 'strength': strength }) # Cek Bearish Sweep relevant_swing_highs = [sh for sh in swing_highs_idx if sh < i] if relevant_swing_highs: prev_swing_high_idx = max(relevant_swing_highs) prev_swing_high_val = df['high'].loc[prev_swing_high_idx] if current_candle['high'] > prev_swing_high_val and current_candle['close'] < prev_swing_high_val: if not require_confirmation_candle: is_confirmed = True else: confirmation_candle = df.iloc[i + 1] is_confirmed = confirmation_candle['close'] < confirmation_candle['open'] and \ confirmation_candle['body'] > avg_body.iloc[i + 1] * 0.8 if is_confirmed: strength = (prev_swing_high_val - current_candle['close']) strength /= calculate_atr_dynamic(df.loc[:current_candle.name]) liquidity_sweeps.append({ 'type': 'BEARISH_LS', 'swept_level': float(prev_swing_high_val), 'sweep_time': current_candle['time'], 'strength': strength }) unique_sweeps = {s['sweep_time']: s for s in liquidity_sweeps}.values() return list(unique_sweeps) # ========== LIQUIDITY GRAB (SWEEP ON EQH/EQL) ========== def detect_liquidity_grab( df: pd.DataFrame, window: int = 20, tolerance: float = 0.0003, confirmation_wick_ratio: float = 0.5 ) -> List[Dict[str, Any]]: """ Mendeteksi sweep pada liquidity pool (EQH/EQL). Ini adalah sinyal probabilitas tinggi karena menargetkan area likuiditas yang jelas. """ grabs = [] if len(df) < window + 5: # Butuh beberapa candle setelah window return grabs eqh_pools, eql_pools = detect_eqh_eql(df, window=window, tolerance=tolerance) # Hanya proses pool terbaru untuk efisiensi recent_eql = eql_pools[-1] if eql_pools else None recent_eqh = eqh_pools[-1] if eqh_pools else None # Cek Bullish Liquidity Grab (sweep EQL) if recent_eql: pool_level = recent_eql['value'] pool_time = recent_eql['time'] # Cari candle yang melakukan sweep setelah pool terbentuk candles_after_pool = df[df['time'] > pool_time] for i in range(len(candles_after_pool)): sweep_candle = candles_after_pool.iloc[i] # 1. Harga harus turun di bawah pool # 2. Harga harus ditutup kembali di atas pool (konfirmasi) if sweep_candle['low'] < pool_level and sweep_candle['close'] > pool_level: # 3. Konfirmasi tambahan: wick bawah harus signifikan lower_wick = sweep_candle['open'] - sweep_candle['low'] \ if sweep_candle['open'] > sweep_candle['close'] else sweep_candle['close'] - sweep_candle['low'] if lower_wick > (sweep_candle['high'] - sweep_candle['low']) * confirmation_wick_ratio: strength = lower_wick / calculate_atr_dynamic(df.loc[:sweep_candle.name]) grabs.append({ 'type': 'LIQUIDITY_GRAB_BULL', 'swept_level': float(pool_level), 'sweep_time': sweep_candle['time'], 'strength': strength # Kekuatan berdasarkan penolakan }) break # Hanya ambil grab pertama setelah pool # Cek Bearish Liquidity Grab (sweep EQH) if recent_eqh: pool_level = recent_eqh['value'] pool_time = recent_eqh['time'] candles_after_pool = df[df['time'] > pool_time] for i in range(len(candles_after_pool)): sweep_candle = candles_after_pool.iloc[i] # 1. Harga harus naik di atas pool # 2. Harga harus ditutup kembali di bawah pool if sweep_candle['high'] > pool_level and sweep_candle['close'] < pool_level: # 3. Konfirmasi tambahan: wick atas harus signifikan upper_wick = sweep_candle['high'] - sweep_candle['close'] \ if sweep_candle['open'] > sweep_candle['close'] else sweep_candle['high'] - sweep_candle['open'] if upper_wick > (sweep_candle['high'] - sweep_candle['low']) * confirmation_wick_ratio: strength = upper_wick / calculate_atr_dynamic(df.loc[:sweep_candle.name]) grabs.append({ 'type': 'LIQUIDITY_GRAB_BEAR', 'swept_level': float(pool_level), 'sweep_time': sweep_candle['time'], 'strength': strength }) break return grabs # ========== OTE ZONE ========== def calculate_optimal_trade_entry( swing_start_price: float, swing_end_price: float, direction: str ) -> Dict[str, float]: fib_levels = { '0.618': 0.618, '0.705': 0.705, '0.790': 0.790 } ote_zone: Dict[str, float] = {} high_val = max(swing_start_price, swing_end_price) low_val = min(swing_start_price, swing_end_price) price_range = high_val - low_val if direction.upper() == "BUY": ote_zone['upper'] = high_val - (price_range * fib_levels['0.618']) ote_zone['mid'] = high_val - (price_range * fib_levels['0.705']) ote_zone['lower'] = high_val - (price_range * fib_levels['0.790']) elif direction.upper() == "SELL": ote_zone['upper'] = low_val + (price_range * fib_levels['0.790']) ote_zone['mid'] = low_val + (price_range * fib_levels['0.705']) ote_zone['lower'] = low_val + (price_range * fib_levels['0.618']) return ote_zone # ========== CHANGE OF CHARACTER (CHoCH) ========== def detect_change_of_character(df: pd.DataFrame, swing_lookback: int = 5) -> Optional[Dict[str, Any]]: """ Mendeteksi Change of Character (CHoCH), sinyal awal dari potensi reversal. CHoCH terjadi ketika harga menembus swing point minor terakhir setelah sebuah Break of Structure (BOS) terjadi. """ if len(df) < swing_lookback * 2 + 5: # Butuh beberapa candle ekstra return None _, swing_points = detect_structure(df, swing_lookback) last_high = swing_points.get('last_high') last_low = swing_points.get('last_low') # Tidak bisa mendeteksi CHoCH tanpa swing point yang jelas if not last_high or not last_low: return None df = df.copy() df['is_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max()) df['is_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min()) swing_highs = df[df['is_swing_high']] swing_lows = df[df['is_swing_low']] if swing_highs.empty or swing_lows.empty: return None # Skenario Bearish CHoCH: # Terjadi setelah Bullish BOS, lalu harga break swing low terakhir. # 1. Cari Bullish BOS terbaru (higher high). if len(swing_highs) >= 2: last_sh = swing_highs.iloc[-1] prev_sh = swing_highs.iloc[-2] if last_sh['high'] > prev_sh['high']: # Indikasi Bullish Trend/BOS # 2. Cari swing low yang terbentuk antara dua swing high ini. relevant_lows = swing_lows[(swing_lows.index > prev_sh.name) & (swing_lows.index < last_sh.name)] if not relevant_lows.empty: choch_level = relevant_lows.iloc[-1]['low'] # 3. Cek apakah harga saat ini atau beberapa candle terakhir break di bawah level itu. recent_candles = df.iloc[-3:] if (recent_candles['close'] < choch_level).any(): return {'type': 'BEARISH_CHoCH', 'price': choch_level, 'time': df.iloc[-1]['time']} # Skenario Bullish CHoCH: # Terjadi setelah Bearish BOS, lalu harga break swing high terakhir. # 1. Cari Bearish BOS terbaru (lower low). if len(swing_lows) >= 2: last_sl = swing_lows.iloc[-1] prev_sl = swing_lows.iloc[-2] if last_sl['low'] < prev_sl['low']: # PERBAIKAN: prev_sh -> prev_sl # 2. Cari swing high yang terbentuk antara dua swing low ini. relevant_highs = swing_highs[(swing_highs.index > prev_sl.name) & (swing_highs.index < last_sl.name)] if not relevant_highs.empty: choch_level = relevant_highs.iloc[-1]['high'] # 3. Cek apakah harga saat ini atau beberapa candle terakhir break di atas level itu. recent_candles = df.iloc[-3:] if (recent_candles['close'] > choch_level).any(): return {'type': 'BULLISH_CHoCH', 'price': choch_level, 'time': df.iloc[-1]['time']} return None # ========== RSI & DIVERGENCE ========== def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series: """ Menghitung Relative Strength Index (RSI). """ if len(prices) < period + 1: return pd.Series([np.nan] * len(prices), index=prices.index) delta = prices.diff() gain = (delta.where(delta > 0, 0)).ewm(alpha=1/period, adjust=False).mean() loss = (-delta.where(delta < 0, 0)).ewm(alpha=1/period, adjust=False).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi def detect_regular_divergence(df: pd.DataFrame, swing_lookback: int = 5) -> List[Dict[str, Any]]: """ Mendeteksi divergensi reguler (sinyal reversal) antara harga dan RSI. """ divergences = [] if 'rsi' not in df.columns or df['rsi'].isnull().all(): return divergences df['price_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max()) df['price_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min()) df['rsi_swing_high'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).max()) df['rsi_swing_low'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).min()) price_lows = df[df['price_swing_low']] price_highs = df[df['price_swing_high']] rsi_lows = df[df['rsi_swing_low']] rsi_highs = df[df['rsi_swing_high']] # Regular Bullish Divergence: Lower Low di Harga, Higher Low di RSI if len(price_lows) >= 2 and len(rsi_lows) >= 2: last_price_low = price_lows.iloc[-1] prev_price_low = price_lows.iloc[-2] if last_price_low['low'] < prev_price_low['low']: # Cari RSI low yang relevan relevant_rsi_lows = rsi_lows[(rsi_lows.index >= prev_price_low.name) & (rsi_lows.index <= last_price_low.name)] if len(relevant_rsi_lows) >= 2: if relevant_rsi_lows.iloc[-1]['rsi'] > relevant_rsi_lows.iloc[0]['rsi']: divergences.append({'type': 'REGULAR_BULLISH_DIVERGENCE', 'time': last_price_low['time']}) # Regular Bearish Divergence: Higher High di Harga, Lower High di RSI if len(price_highs) >= 2 and len(rsi_highs) >= 2: last_price_high = price_highs.iloc[-1] prev_price_high = price_highs.iloc[-2] if last_price_high['high'] > prev_price_high['high']: # Cari RSI high yang relevan relevant_rsi_highs = rsi_highs[(rsi_highs.index >= prev_price_high.name) & (rsi_highs.index <= last_price_high.name)] if len(relevant_rsi_highs) >= 2: if relevant_rsi_highs.iloc[-1]['rsi'] < relevant_rsi_highs.iloc[0]['rsi']: divergences.append({'type': 'REGULAR_BEARISH_DIVERGENCE', 'time': last_price_high['time']}) return divergences def detect_hidden_divergence(df: pd.DataFrame, swing_lookback: int = 5) -> List[Dict[str, Any]]: """ Mendeteksi divergensi tersembunyi (sinyal continuation) antara harga dan RSI. """ divergences = [] if 'rsi' not in df.columns or df['rsi'].isnull().all(): return divergences df['price_swing_high'] = (df['high'] == df['high'].rolling(swing_lookback * 2 + 1, center=True).max()) df['price_swing_low'] = (df['low'] == df['low'].rolling(swing_lookback * 2 + 1, center=True).min()) df['rsi_swing_high'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).max()) df['rsi_swing_low'] = (df['rsi'] == df['rsi'].rolling(swing_lookback * 2 + 1, center=True).min()) price_lows = df[df['price_swing_low']] price_highs = df[df['price_swing_high']] rsi_lows = df[df['rsi_swing_low']] rsi_highs = df[df['rsi_swing_high']] # Hidden Bullish Divergence: Higher Low di Harga, Lower Low di RSI if len(price_lows) >= 2 and len(rsi_lows) >= 2: last_price_low = price_lows.iloc[-1] prev_price_low = price_lows.iloc[-2] if last_price_low['low'] > prev_price_low['low']: relevant_rsi_lows = rsi_lows[(rsi_lows.index >= prev_price_low.name) & (rsi_lows.index <= last_price_low.name)] if len(relevant_rsi_lows) >= 2: if relevant_rsi_lows.iloc[-1]['rsi'] < relevant_rsi_lows.iloc[0]['rsi']: divergences.append({'type': 'HIDDEN_BULLISH_DIVERGENCE', 'time': last_price_low['time']}) # Hidden Bearish Divergence: Lower High di Harga, Higher High di RSI if len(price_highs) >= 2 and len(rsi_highs) >= 2: last_price_high = price_highs.iloc[-1] prev_price_high = price_highs.iloc[-2] if last_price_high['high'] < prev_price_high['high']: relevant_rsi_highs = rsi_highs[(rsi_highs.index >= prev_price_high.name) & (rsi_highs.index <= last_price_high.name)] if len(relevant_rsi_highs) >= 2: if relevant_rsi_highs.iloc[-1]['rsi'] > relevant_rsi_highs.iloc[0]['rsi']: divergences.append({'type': 'HIDDEN_BEARISH_DIVERGENCE', 'time': last_price_high['time']}) return divergences # ========== ANALISIS MULTI-TIMEFRAME ========== def analyze_mtf_trend(timeframes_data: Dict[str, pd.DataFrame]) -> Dict[str, str]: """Analisis trend di multiple timeframe.""" trend_analysis = {} for tf, df in timeframes_data.items(): structure, _ = detect_structure(df) if "BULLISH_BOS" in structure or "HH/HL" in structure: trend_analysis[tf] = "BULLISH" elif "BEARISH_BOS" in structure or "LL/LH" in structure: trend_analysis[tf] = "BEARISH" else: trend_analysis[tf] = "NEUTRAL" return trend_analysis def detect_mtf_divergence(timeframes_data: Dict[str, pd.DataFrame]) -> Dict[str, List[Dict[str, Any]]]: """Deteksi divergence di multiple timeframe.""" divergences = {} for tf, df in timeframes_data.items(): df = df.copy() df['rsi'] = calculate_rsi(df['close']) # Deteksi regular divergence reg_div = detect_regular_divergence(df) # Deteksi hidden divergence hidden_div = detect_hidden_divergence(df) divergences[tf] = reg_div + hidden_div return divergences def analyze_volatility(df: pd.DataFrame, window: int = 20) -> Dict[str, float]: """Analisis volatilitas untuk penyesuaian parameter.""" atr = calculate_atr_dynamic(df) avg_range = df['high'].sub(df['low']).rolling(window).mean().iloc[-1] return { 'atr': atr, 'avg_range': avg_range, 'volatility_ratio': atr / avg_range if avg_range > 0 else 1.0 } def adjust_trade_parameters( volatility: Dict[str, float], base_sl_pips: float, base_tp_pips: float ) -> Tuple[float, float]: """Sesuaikan SL/TP berdasarkan volatilitas.""" vol_ratio = volatility['volatility_ratio'] adjusted_sl = base_sl_pips * (1 + (vol_ratio - 1) * 0.5) adjusted_tp = base_tp_pips * (1 + (vol_ratio - 1) * 0.5) return adjusted_sl, adjusted_tp # ========== TRENDLINE DETECTION ========== def detect_trendline_breaks( df: pd.DataFrame, length: int = 14, mult: float = 1.0, calc_method: str = 'ATR' ) -> Dict[str, Any]: """ Implementasi LuxAlgo Trendline dengan deteksi breakout """ df = df.copy() # Deteksi pivot points df['ph'] = df['high'].rolling(window=length * 2 + 1, center=True).apply( lambda x: x[length] == max(x), raw=True ) df['pl'] = df['low'].rolling(window=length * 2 + 1, center=True).apply( lambda x: x[length] == min(x), raw=True ) # Kalkulasi slope berdasarkan metode if calc_method == 'ATR': slope = df['high'].sub(df['low']).rolling(length).mean() / length * mult elif calc_method == 'STDEV': slope = df['close'].rolling(length).std() / length * mult else: # LINREG x = np.arange(length) slope = df['close'].rolling(length).apply( lambda y: np.polyfit(x, y, 1)[0] if len(y) > 1 else np.nan ) * mult # Inisialisasi trendlines upper = pd.Series(index=df.index, dtype=float) lower = pd.Series(index=df.index, dtype=float) last_ph = None last_pl = None last_ph_slope = 0 last_pl_slope = 0 # Konstruksi trendlines for i in range(length, len(df)): if df['ph'].iloc[i]: last_ph = df['high'].iloc[i] last_ph_slope = slope.iloc[i] elif last_ph is not None: upper.iloc[i] = last_ph - last_ph_slope * (i - df.index[df['ph']].max()) if df['pl'].iloc[i]: last_pl = df['low'].iloc[i] last_pl_slope = slope.iloc[i] elif last_pl is not None: lower.iloc[i] = last_pl + last_pl_slope * (i - df.index[df['pl']].max()) # Deteksi breakouts breaks = { 'upper_break': False, 'lower_break': False, 'upper_line': upper.iloc[-1], 'lower_line': lower.iloc[-1] } if len(df) > 1: current_close = df['close'].iloc[-1] prev_close = df['close'].iloc[-2] # Upper break (bullish) if prev_close <= upper.iloc[-2] and current_close > upper.iloc[-1]: breaks['upper_break'] = True # Lower break (bearish) if prev_close >= lower.iloc[-2] and current_close < lower.iloc[-1]: breaks['lower_break'] = True return breaks # ========== ANALISIS OPPORTUNITY TF ========== def analyze_tf_opportunity( df: pd.DataFrame, symbol: str, tf: str, mt5_path: str, start_date: str, end_date: str, profile_type: str = "intraday", # ["scalping", "intraday", "swing"] atr_length: int = 14, order_block_lookback: int = 15, fvg_min_gap: float = 0.0002, fvg_max_age: int = 20, pattern_body_threshold: float = 1.2, volume_window: int = 20, volume_threshold: float = 2.0, trendline_length: int = 14, trendline_mult: float = 1.0, calc_method: str = 'ATR', weights: Optional[Dict[str, float]] = None ) -> Optional[Dict[str, Any]]: """ Analisis peluang trading berdasarkan profil yang dipilih (scalping/intraday/swing). """ # Validasi profile type if profile_type not in ["scalping", "intraday", "swing"]: raise ValueError("profile_type harus salah satu dari: scalping, intraday, atau swing") # Inisialisasi bobot jika None weights = weights or {} # Inisialisasi parameter profil min_score_threshold = 5.0 # Nilai default # Sesuaikan parameter berdasarkan profil if profile_type == "scalping": order_block_lookback = 10 fvg_max_age = 10 pattern_body_threshold = 1.0 volume_threshold = 1.5 min_score_threshold = 3.0 elif profile_type == "intraday": order_block_lookback = 15 fvg_max_age = 20 pattern_body_threshold = 1.2 volume_threshold = 2.0 min_score_threshold = 5.0 elif profile_type == "swing": order_block_lookback = 30 fvg_max_age = 40 pattern_body_threshold = 1.5 volume_threshold = 2.5 min_score_threshold = 7.0 # Inisialisasi scoring score = 0.0 score_components = {} info_list = [] # Deteksi struktur pasar structure, swing_points = detect_structure(df) # Analisis Order Blocks order_blocks = detect_order_blocks_multi( df, lookback=order_block_lookback, structure_filter=None, max_age=fvg_max_age ) # Analisis Fair Value Gaps fvg_zones = detect_fvg_multi( df, min_gap=fvg_min_gap, max_age=fvg_max_age ) # Deteksi pattern berdasarkan profil patterns = [] if profile_type in ["scalping", "intraday"]: patterns.extend(detect_pinbar(df, min_ratio=pattern_body_threshold)) patterns.extend(detect_engulfing(df)) if profile_type in ["intraday", "swing"]: patterns.extend(detect_continuation_patterns(df, body_threshold=pattern_body_threshold)) # Analisis volume khusus untuk scalping dan intraday volume_analysis = None if profile_type in ["scalping", "intraday"]: volume_spikes = detect_volume_spike(df, window=volume_window, threshold=volume_threshold) if volume_spikes: volume_analysis = volume_spikes[-1] # Ambil spike terakhir # Analisis trendline dengan sensitivitas berbeda per profil trendline_breaks = detect_trendline_breaks( df, length=trendline_length, mult=trendline_mult, calc_method=calc_method ) # Scoring berdasarkan profil # Structure scoring if "BULLISH_BOS" in structure: structure_score = weights.get('BULLISH_BOS', 3.0) if profile_type == "swing": structure_score *= 1.5 # Lebih penting untuk swing score += structure_score score_components['structure'] = structure_score info_list.append(f"Bullish BOS detected ({profile_type} profile)") elif "BEARISH_BOS" in structure: structure_score = weights.get('BEARISH_BOS', -3.0) if profile_type == "swing": structure_score *= 1.5 score += structure_score score_components['structure'] = structure_score info_list.append(f"Bearish BOS detected ({profile_type} profile)") # Order Block scoring for ob in order_blocks[:2]: # Consider top 2 closest OBs ob_score = weights.get(f"{ob['type']}", 1.0) if profile_type == "scalping": ob_score *= 0.7 # Less important for scalping elif profile_type == "swing": ob_score *= 1.3 # More important for swing score += ob_score score_components['order_block'] = ob_score info_list.append(f"Order Block: {ob['type']} detected") # Pattern scoring berdasarkan profil for pattern in patterns: pattern_score = weights.get(pattern['type'], 1.0) if profile_type == "scalping": pattern_score *= 1.3 # More important for scalping elif profile_type == "swing": pattern_score *= 0.7 # Less important for swing score += pattern_score score_components['pattern'] = pattern_score info_list.append(f"Pattern: {pattern['type']} detected") # Volume analysis scoring (terutama untuk scalping/intraday) if volume_analysis and profile_type in ["scalping", "intraday"]: volume_score = weights.get('VOLUME_SPIKE', 1.0) score += volume_score score_components['volume'] = volume_score info_list.append("Volume spike detected") # Trendline break scoring if trendline_breaks['upper_break']: trendline_score = weights.get('TRENDLINE_BREAK_BULL', 2.0) if profile_type == "swing": trendline_score *= 1.2 score += trendline_score score_components['trendline'] = trendline_score info_list.append(f"Bullish trendline break ({profile_type} profile)") elif trendline_breaks['lower_break']: trendline_score = weights.get('TRENDLINE_BREAK_BEAR', -2.0) if profile_type == "swing": trendline_score *= 1.2 score += trendline_score score_components['trendline'] = trendline_score info_list.append(f"Bearish trendline break ({profile_type} profile)") # Check if score meets minimum threshold for the profile if abs(score) < min_score_threshold: return None return { 'symbol': symbol, 'timeframe': tf, 'profile_type': profile_type, 'score': score, 'score_components': score_components, 'structure': structure, 'order_blocks': order_blocks, 'fvg_zones': fvg_zones, 'patterns': patterns, 'trendline_breaks': trendline_breaks, 'volume_analysis': volume_analysis, 'info': info_list } def detect_smc_structure(df: pd.DataFrame, length: int = 14) -> Dict[str, Any]: """ Deteksi struktur Smart Money Concepts (SMC) """ df = df.copy() result = { 'internal_structure': {'bullish_bos': False, 'bearish_bos': False, 'bullish_choch': False, 'bearish_choch': False}, 'swing_structure': {'bullish_bos': False, 'bearish_bos': False, 'bullish_choch': False, 'bearish_choch': False}, 'order_blocks': [], 'fair_value_gaps': [], 'equal_levels': {'highs': [], 'lows': []} } # Deteksi Internal Structure df['internal_high'] = df['high'].rolling(5).max() == df['high'] df['internal_low'] = df['low'].rolling(5).min() == df['low'] # Deteksi Break of Structure (BOS) for i in range(5, len(df) - 1): # Internal BOS if df['close'].iloc[i] > df['high'].iloc[i - 1] and df['internal_high'].iloc[i - 1]: result['internal_structure']['bullish_bos'] = True elif df['close'].iloc[i] < df['low'].iloc[i - 1] and df['internal_low'].iloc[i - 1]: result['internal_structure']['bearish_bos'] = True # Swing BOS dengan konfirmasi if i >= 14: if df['close'].iloc[i] > df['high'].iloc[i - 14:i].max(): result['swing_structure']['bullish_bos'] = True elif df['close'].iloc[i] < df['low'].iloc[i - 14:i].min(): result['swing_structure']['bearish_bos'] = True # Deteksi Change of Character (CHoCH) if len(df) > 20: last_swing = "none" for i in range(20, len(df)): if df['high'].iloc[i] > df['high'].iloc[i - 20:i].max(): if last_swing == "low": result['swing_structure']['bullish_choch'] = True last_swing = "high" elif df['low'].iloc[i] < df['low'].iloc[i - 20:i].min(): if last_swing == "high": result['swing_structure']['bearish_choch'] = True last_swing = "low" # Deteksi Order Blocks atr = calculate_atr_dynamic(df) for i in range(len(df) - 3, len(df)): candle_size = df['high'].iloc[i] - df['low'].iloc[i] if candle_size > 1.5 * atr: # Significant candle if df['close'].iloc[i] < df['open'].iloc[i]: # Bearish OB result['order_blocks'].append({ 'type': 'bearish', 'high': df['high'].iloc[i], 'low': df['low'].iloc[i], 'time': df.index[i] }) elif df['close'].iloc[i] > df['open'].iloc[i]: # Bullish OB result['order_blocks'].append({ 'type': 'bullish', 'high': df['high'].iloc[i], 'low': df['low'].iloc[i], 'time': df.index[i] }) # Deteksi Fair Value Gaps for i in range(1, len(df) - 1): if df['low'].iloc[i + 1] > df['high'].iloc[i - 1]: # Bullish FVG result['fair_value_gaps'].append({ 'type': 'bullish', 'top': df['low'].iloc[i + 1], 'bottom': df['high'].iloc[i - 1], 'time': df.index[i] }) elif df['high'].iloc[i + 1] < df['low'].iloc[i - 1]: # Bearish FVG result['fair_value_gaps'].append({ 'type': 'bearish', 'top': df['low'].iloc[i - 1], 'bottom': df['high'].iloc[i + 1], 'time': df.index[i] }) return result def adapt_weights_to_context(weights: Dict[str, float], context: Dict[str, Any]) -> Dict[str, float]: """ Adaptasi bobot berdasarkan kondisi market. """ adapted = weights.copy() # Sesuaikan berdasarkan fase trend if context['trend_phase'] == 'RANGING': # Kurangi bobot breakout, tingkatkan bobot reversal for key in adapted: if 'BREAKOUT' in key or 'BOS' in key: adapted[key] *= 0.7 if 'REVERSAL' in key or 'CHoCH' in key: adapted[key] *= 1.3 elif context['trend_phase'] == 'TRENDING': # Tingkatkan bobot breakout & continuation for key in adapted: if 'BREAKOUT' in key or 'BOS' in key or 'CONTINUATION' in key: adapted[key] *= 1.3 if 'REVERSAL' in key or 'CHoCH' in key: adapted[key] *= 0.7 # Sesuaikan berdasarkan volatilitas vol_ratio = context.get('volatility', {}).get('volatility_ratio', 1.0) if vol_ratio > 1.5: # High volatility # Kurangi bobot pattern, tingkatkan bobot struktur for key in adapted: if 'PATTERN' in key or 'ENGULFING' in key or 'PINBAR' in key: adapted[key] *= 0.8 if 'STRUCTURE' in key or 'BOS' in key: adapted[key] *= 1.2 elif vol_ratio < 0.5: # Low volatility # Tingkatkan bobot pattern for key in adapted: if 'PATTERN' in key or 'ENGULFING' in key or 'PINBAR' in key: adapted[key] *= 1.2 # Sesuaikan berdasarkan kondisi likuiditas liquidity = context.get('liquidity_conditions', {}) if liquidity.get('average_spread', 0) > 2.0: # Spread tinggi # Kurangi semua bobot karena kondisi kurang ideal for key in adapted: adapted[key] *= 0.8 return adapted def validate_scalping_opportunity( df: pd.DataFrame, profile_settings: Dict[str, Any] ) -> Tuple[bool, float, List[str]]: """ Validasi khusus untuk scalping dengan kriteria yang lebih ketat """ validations = [] confidence_boost = 0 # 1. Analisis Volume volume_ma = df['tick_volume'].rolling(20).mean() current_volume = df['tick_volume'].iloc[-1] volume_ratio = current_volume / volume_ma.iloc[-1] if volume_ratio >= profile_settings['entry_rules']['minimum_volume_threshold']: validations.append(f"Volume valid ({volume_ratio:.2f}x average)") confidence_boost += 0.5 else: return False, 0, ["Volume tidak mencukupi"] # 2. Analisis Volatilitas volatility_percentile = df['high'].sub(df['low']).rolling(50).mean().rank(pct=True).iloc[-1] * 100 if (profile_settings['entry_rules']['minimum_volatility_percentile'] <= volatility_percentile <= profile_settings['entry_rules']['maximum_volatility_percentile']): validations.append(f"Volatilitas optimal ({volatility_percentile:.1f}%)") confidence_boost += 0.5 else: return False, 0, ["Volatilitas di luar range optimal"] # 3. Momentum Check rsi = calculate_rsi(df['close']) last_rsi = rsi.iloc[-1] if 20 <= last_rsi <= 80: # Avoid extreme conditions momentum_aligned = (last_rsi > 50 and df['close'].iloc[-1] > df['close'].iloc[-2]) or \ (last_rsi < 50 and df['close'].iloc[-1] < df['close'].iloc[-2]) if momentum_aligned: validations.append("Momentum aligned") confidence_boost += 0.7 # 4. Pattern Quality Check if profile_settings['advanced_filters']['pattern_quality_threshold']: pattern_quality = analyze_pattern_quality(df) if pattern_quality >= profile_settings['advanced_filters']['pattern_quality_threshold']: validations.append(f"Pattern quality good ({pattern_quality:.2f})") confidence_boost += 0.8 else: return False, 0, ["Pattern quality insufficient"] # 5. Price Action Clarity price_clarity = analyze_price_action_clarity(df) if price_clarity >= 0.7: validations.append(f"Clear price action ({price_clarity:.2f})") confidence_boost += 0.5 # 6. Reward Ratio Check if 'minimum_reward_ratio' in profile_settings['advanced_filters']: nearest_resistance = find_nearest_resistance(df) nearest_support = find_nearest_support(df) current_price = df['close'].iloc[-1] potential_reward = abs(nearest_resistance - current_price) potential_risk = abs(current_price - nearest_support) if potential_risk > 0: reward_ratio = potential_reward / potential_risk if reward_ratio >= profile_settings['advanced_filters']['minimum_reward_ratio']: validations.append(f"Good reward ratio ({reward_ratio:.2f})") confidence_boost += 0.6 else: return False, 0, ["Insufficient reward ratio"] return len(validations) >= 4, confidence_boost, validations def analyze_pattern_quality(df: pd.DataFrame) -> float: """Analisis kualitas pattern untuk scalping""" quality_score = 0.0 last_candle = df.iloc[-1] prev_candle = df.iloc[-2] # 1. Body to Wick Ratio body = abs(last_candle['close'] - last_candle['open']) upper_wick = last_candle['high'] - max(last_candle['open'], last_candle['close']) lower_wick = min(last_candle['open'], last_candle['close']) - last_candle['low'] if body > (upper_wick + lower_wick): quality_score += 0.4 # Strong conviction # 2. Clean Break of Previous Structure if last_candle['close'] > prev_candle['high'] or last_candle['close'] < prev_candle['low']: quality_score += 0.3 # 3. Volume Confirmation if last_candle['tick_volume'] > df['tick_volume'].rolling(20).mean().iloc[-1]: quality_score += 0.3 return quality_score def analyze_price_action_clarity(df: pd.DataFrame) -> float: """Analisis kejelasan price action untuk scalping""" clarity_score = 0.0 recent_candles = df.tail(5) # 1. Consistency in Direction closes = recent_candles['close'] if (closes.diff().dropna() > 0).all() or (closes.diff().dropna() < 0).all(): clarity_score += 0.4 # 2. Clean Candle Formation for _, candle in recent_candles.iterrows(): body = abs(candle['close'] - candle['open']) total_range = candle['high'] - candle['low'] if total_range > 0 and body / total_range > 0.6: clarity_score += 0.1 # Max 0.5 from this # 3. Minimal Overlap for i in range(1, len(recent_candles)): curr = recent_candles.iloc[i] prev = recent_candles.iloc[i - 1] if (curr['high'] > prev['high'] and curr['low'] > prev['low']) or \ (curr['high'] < prev['high'] and curr['low'] < prev['low']): clarity_score += 0.1 # Max 0.4 from this return min(clarity_score, 1.0) def find_nearest_support(df: pd.DataFrame) -> float: """Mencari level support terdekat""" recent_lows = df['low'].rolling(window=20).min() current_price = df['close'].iloc[-1] support_levels = recent_lows[recent_lows < current_price].unique() if len(support_levels) > 0: return max(support_levels) # Highest support below current price return current_price - calculate_atr_dynamic(df) def find_nearest_resistance(df: pd.DataFrame) -> float: """Mencari level resistance terdekat""" recent_highs = df['high'].rolling(window=20).max() current_price = df['close'].iloc[-1] resistance_levels = recent_highs[recent_highs > current_price].unique() if len(resistance_levels) > 0: return min(resistance_levels) # Lowest resistance above current price return current_price + calculate_atr_dynamic(df)