"""Market-context analysis helpers tailored to different trading profiles.

The module offers:

* ``ProfileAnalyzer`` - per-profile (scalper / intraday / swing / position)
  indicator bundles computed from an OHLC(V) DataFrame.
* Trendline utilities (``calculate_slope``, ``detect_pivot_points``,
  ``analyze_trendlines``).
* ``get_market_context`` - combines zones, structure, equilibrium, trendlines
  and the optional profile analysis into one dict with a ``market_bias``.

All functions read the columns ``open``, ``high``, ``low``, ``close`` and,
where noted, ``volume``, ``spread``, ``atr`` and ``time``.
"""

from datetime import datetime
from enum import Enum
from typing import Dict, Any, Tuple, List, Optional

import numpy as np
import pandas as pd

try:
    import talib as ta
except ImportError:
    # TA-Lib is not used by anything in this module; keep the name available
    # for importers but do not make the whole module unusable without it.
    ta = None


class TradingProfile(Enum):
    SCALPER = "scalper"
    INTRADAY = "intraday"
    SWING = "swing"
    POSITION = "position"


# --- Shared private helpers ---

def _finite_or(value: Any, default: float) -> float:
    """Return ``value`` as a float, or ``default`` when it is NaN/inf/invalid."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if np.isfinite(number) else default


def _true_range(df: pd.DataFrame) -> pd.Series:
    """Return the per-bar true range (max of H-L, |H-prevC|, |L-prevC|)."""
    prev_close = df['close'].shift()
    candidates = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    )
    return candidates.max(axis=1)


def _latest_atr(df: pd.DataFrame, period: int = 14) -> float:
    """Return the latest ATR: the ``atr`` column if usable, else computed.

    Falls back to a simple rolling mean of the true range when the column is
    missing or its last value is not finite. Returns 0.0 if neither works.
    """
    if 'atr' in df.columns and len(df) > 0:
        from_column = _finite_or(df['atr'].iloc[-1], float('nan'))
        if np.isfinite(from_column):
            return from_column
    if len(df) == 0:
        return 0.0
    return _finite_or(_true_range(df).rolling(period).mean().iloc[-1], 0.0)


def _last_bar_hour(df: pd.DataFrame) -> Optional[int]:
    """Return the hour of the last bar from ``time`` or a DatetimeIndex.

    Returns None when the frame carries no timestamp information. The hour is
    used as stored; no timezone conversion is applied.
    """
    if len(df) == 0:
        return None
    if 'time' in df.columns:
        stamp = pd.to_datetime(df['time'].iloc[-1])
    elif isinstance(df.index, pd.DatetimeIndex):
        stamp = df.index[-1]
    else:
        return None
    return None if pd.isna(stamp) else int(stamp.hour)


def _simple_rsi(close: pd.Series, period: int = 14) -> pd.Series:
    """Return an RSI series based on simple rolling means of gains/losses."""
    delta = close.diff().fillna(0.0)
    avg_gain = delta.clip(lower=0.0).rolling(window=period).mean()
    avg_loss = (-delta).clip(lower=0.0).rolling(window=period).mean()
    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))


def _pivot_mask(series: pd.Series, length: int, kind: str) -> np.ndarray:
    """Return a boolean array marking bars that are the extreme of a centered
    window of ``2 * length + 1`` bars (``kind`` is ``'high'`` or ``'low'``).

    Bars without a full window on both sides are never marked.
    """
    rolled = series.rolling(2 * length + 1, center=True)
    extreme = rolled.max() if kind == 'high' else rolled.min()
    return (series == extreme).to_numpy()


def find_swing_points(series: pd.Series, point_type: str,
                      window: int = 5) -> List[float]:
    """Return the last (up to) three swing highs or lows of ``series``.

    A swing high is a value strictly greater than the ``window`` values on
    each side; a swing low is strictly smaller. Any ``point_type`` other than
    ``'high'`` is treated as ``'low'``.

    The scan is positional, so it works for any index type.
    """
    values = np.asarray(series, dtype=float)
    want_high = point_type == 'high'
    found: List[float] = []
    for i in range(window, len(values) - window):
        pivot = values[i]
        left = values[i - window:i]
        right = values[i + 1:i + window + 1]
        if want_high:
            is_swing = bool((pivot > left).all() and (pivot > right).all())
        else:
            is_swing = bool((pivot < left).all() and (pivot < right).all())
        if is_swing:
            found.append(float(pivot))
    return found[-3:]  # only the three most recent


class ProfileAnalyzer:
    """Analyze the market according to a trading profile."""

    def __init__(self, profile: TradingProfile):
        self.profile = profile

    def analyze(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Run the analysis bundle that matches ``self.profile``.

        Any profile that is not SCALPER, INTRADAY or SWING is analyzed as
        POSITION.
        """
        handlers = {
            TradingProfile.SCALPER: self._analyze_scalper,
            TradingProfile.INTRADAY: self._analyze_intraday,
            TradingProfile.SWING: self._analyze_swing,
        }
        return handlers.get(self.profile, self._analyze_position)(df)

    def _analyze_scalper(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Scalper bundle: focus on very short timeframes (1-5 minutes)."""
        return {
            'volatility': self._calculate_micro_volatility(df),
            'momentum': self._calculate_short_momentum(df),
            'spread_impact': self._analyze_spread_impact(df),
            'micro_structure': self._analyze_micro_structure(df),
        }

    def _analyze_intraday(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Intraday bundle: focus on movement within a single day."""
        return {
            'day_momentum': self._calculate_day_momentum(df),
            'session_analysis': self._analyze_trading_session(df),
            'key_levels': self._find_intraday_levels(df),
            'volume_profile': self._analyze_volume_profile(df),
        }

    def _analyze_swing(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Swing bundle: trend strength, key levels, structure, divergence."""
        return {
            'trend_strength': self._calculate_trend_strength(df),
            'key_support_resistance': self._find_key_levels(df),
            'market_structure': self._analyze_market_structure(df),
            'momentum_divergence': self._find_divergences(df),
        }

    def _analyze_position(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Position bundle: long-term trend, regime and phase."""
        return {
            'major_trend': self._analyze_major_trend(df),
            'structural_changes': self._detect_structural_changes(df),
            'volatility_regime': self._analyze_volatility_regime(df),
            'market_phase': self._determine_market_phase(df),
        }

    # --- Helper methods for scalping ---

    def _calculate_micro_volatility(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the last bar's range as % of its low, and its 20-bar mean.

        Returns zeros when fewer than 20 bars are available. The input frame
        is not modified.
        """
        if len(df) < 20:
            return {'current': 0.0, 'average': 0.0}

        # Local series on purpose: writing a column would mutate the caller's
        # DataFrame.
        range_pct = (df['high'] - df['low']) / df['low'] * 100
        return {
            'current': _finite_or(range_pct.iloc[-1], 0.0),
            'average': _finite_or(range_pct.rolling(20).mean().iloc[-1], 0.0),
        }

    def _calculate_short_momentum(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return the 14-period RSI and the 4-bar close-to-close change.

        Returns the neutral ``{'rsi': 50.0, 'momentum': 0.0}`` for fewer than
        14 bars; an undefined RSI (no gains and no losses) is also reported as
        50.0.
        """
        if len(df) < 14:
            return {'rsi': 50.0, 'momentum': 0.0}

        rsi = _simple_rsi(df['close'], 14)
        momentum = df['close'].iloc[-1] - df['close'].iloc[-5]
        return {
            'rsi': _finite_or(rsi.iloc[-1], 50.0),
            'momentum': _finite_or(momentum, 0.0),
        }

    def _analyze_spread_impact(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the last spread divided by the 20-bar average range.

        Returns ``{'spread_ratio': 1.0}`` when there is no ``spread`` column,
        fewer than 20 bars, or the ratio is not finite (e.g. zero range).
        """
        if 'spread' not in df.columns or len(df) < 20:
            return {'spread_ratio': 1.0}

        avg_range = (df['high'] - df['low']).rolling(20).mean().iloc[-1]
        # Division by a zero range yields inf/NaN in numpy; treat as "unknown".
        with np.errstate(divide='ignore', invalid='ignore'):
            ratio = np.float64(df['spread'].iloc[-1]) / np.float64(avg_range)
        return {'spread_ratio': _finite_or(ratio, 1.0)}

    def _analyze_micro_structure(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return the candlestick pattern and strength of the last 3 candles.

        Returns ``{'pattern': 'unknown', 'strength': 0.0}`` below 20 bars.
        """
        if len(df) < 20:
            return {'pattern': 'unknown', 'strength': 0.0}

        last_candles = df.tail(3)
        return {
            'pattern': self._detect_candlestick_pattern(last_candles),
            'strength': self._calculate_pattern_strength(last_candles),
        }

    # --- Helper methods for intraday ---

    def _calculate_day_momentum(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the % change from the first bar's open to the last close.

        The "day open" is the open of the first row of ``df``; callers are
        expected to pass one day's worth of bars (at least 24).
        """
        if len(df) < 24:  # at least 24 candles for daily data
            return {'momentum': 0.0}

        day_open = df['open'].iloc[0]
        current_price = df['close'].iloc[-1]
        if day_open == 0:
            return {'momentum': 0.0}
        momentum = (current_price - day_open) / day_open * 100
        return {'momentum': _finite_or(momentum, 0.0)}

    def _analyze_trading_session(self, df: pd.DataFrame) -> Dict[str, str]:
        """Classify the session of the last bar (Asia, London, New York).

        Hours 0-7 map to ``ASIAN``, 8-15 to ``LONDON``, the rest to
        ``NEW_YORK``. The hour comes from the last bar's timestamp (``time``
        column or DatetimeIndex) so results are reproducible on historical
        data; the machine's local clock is used only when the frame carries
        no timestamps.
        """
        hour = _last_bar_hour(df)
        if hour is None:
            hour = pd.Timestamp.now().hour

        if 0 <= hour < 8:
            session = "ASIAN"
        elif 8 <= hour < 16:
            session = "LONDON"
        else:
            session = "NEW_YORK"
        return {'current_session': session}

    def _find_intraday_levels(self, df: pd.DataFrame) -> Dict[str, List[float]]:
        """Return recent swing lows as support and swing highs as resistance.

        Returns empty lists when fewer than 100 bars are available.
        """
        if len(df) < 100:
            return {'support': [], 'resistance': []}

        highs = self._find_swing_points(df['high'], 'high')
        lows = self._find_swing_points(df['low'], 'low')
        return {
            'support': [float(x) for x in lows],
            'resistance': [float(x) for x in highs],
        }

    def _analyze_volume_profile(self, df: pd.DataFrame) -> Dict[float, float]:
        """Return the module-level volume profile for ``df``."""
        return analyze_volume_profile(df)

    # --- Helper methods for swing trading ---

    def _calculate_trend_strength(self, df: pd.DataFrame,
                                  period: int = 14) -> Dict[str, float]:
        """Return the latest ADX (Wilder smoothing) as ``strength`` (0-100).

        Returns 0.0 when fewer than 50 bars are available or the value is
        undefined.
        """
        if len(df) < 50:
            return {'strength': 0.0}

        up_move = df['high'].diff()
        down_move = -df['low'].diff()
        # Directional movement: only the dominant, positive move counts.
        plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
        minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

        def wilder(values: pd.Series) -> pd.Series:
            return values.ewm(alpha=1.0 / period, adjust=False).mean()

        atr = wilder(_true_range(df))
        plus_di = 100 * wilder(plus_dm) / atr
        minus_di = 100 * wilder(minus_dm) / atr
        di_sum = plus_di + minus_di
        # No directional movement at all means no trend: DX is 0, not NaN.
        dx = (100 * (plus_di - minus_di).abs() / di_sum).where(di_sum > 0, 0.0)
        adx = wilder(dx)
        return {'strength': _finite_or(adx.iloc[-1], 0.0)}

    def _find_key_levels(self, df: pd.DataFrame,
                         step: float = 10.0) -> Dict[str, List[float]]:
        """Return price levels (closes rounded to ``step``) tested more often
        than average, most frequently tested first.

        Args:
            df: OHLC data; needs at least 100 bars, else an empty list.
            step: Rounding grid in price units. The default of 10 suits
                instruments quoted in the hundreds/thousands; pass a smaller
                value for e.g. FX pairs.

        Raises:
            ValueError: If ``step`` is not positive.
        """
        if step <= 0:
            raise ValueError("step must be positive")
        if len(df) < 100:
            return {'major_levels': []}

        rounded = np.round(df['close'] / step) * step
        counts = rounded.value_counts()
        major_levels = counts[counts > counts.mean()].index.tolist()
        return {'major_levels': [float(x) for x in major_levels]}

    def _analyze_market_structure(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return the module-level market-structure analysis for ``df``."""
        return analyze_market_structure(df)

    def _find_divergences(self, df: pd.DataFrame,
                          lookback: int = 14) -> Dict[str, str]:
        """Detect a simple price/RSI divergence over ``lookback`` bars.

        Compares the last close and RSI(14) with their values ``lookback``
        bars earlier: price up with RSI down is ``BEARISH``, price down with
        RSI up is ``BULLISH``, anything else (or too little data) is ``NONE``.
        """
        if lookback < 1 or len(df) < lookback + 14:
            return {'divergence': 'NONE'}

        close = df['close']
        rsi = _simple_rsi(close, 14)
        price_now, price_then = close.iloc[-1], close.iloc[-1 - lookback]
        rsi_now, rsi_then = rsi.iloc[-1], rsi.iloc[-1 - lookback]
        if not all(np.isfinite(v) for v in (price_now, price_then, rsi_now, rsi_then)):
            return {'divergence': 'NONE'}

        if price_now > price_then and rsi_now < rsi_then:
            return {'divergence': 'BEARISH'}
        if price_now < price_then and rsi_now > rsi_then:
            return {'divergence': 'BULLISH'}
        return {'divergence': 'NONE'}

    # --- Helper methods for position trading ---

    def _analyze_major_trend(self, df: pd.DataFrame) -> Dict[str, str]:
        """Return ``BULLISH`` if the last close is above the 200-SMA, else
        ``BEARISH``; ``UNDEFINED`` below 200 bars."""
        if len(df) < 200:
            return {'trend': 'UNDEFINED'}

        sma200 = df['close'].rolling(200).mean()
        current_price = df['close'].iloc[-1]
        trend = 'BULLISH' if current_price > sma200.iloc[-1] else 'BEARISH'
        return {'trend': trend}

    def _detect_structural_changes(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Detect a 50/200-SMA crossover on the last bar.

        Needs 201 bars so that the 200-SMA exists on both the last and the
        previous bar; with less data no change is reported.
        """
        if len(df) < 201:
            return {'change_detected': False, 'type': 'NONE'}

        ma50 = df['close'].rolling(50).mean()
        ma200 = df['close'].rolling(200).mean()

        cross_over = bool(ma50.iloc[-2] <= ma200.iloc[-2]
                          and ma50.iloc[-1] > ma200.iloc[-1])
        cross_under = bool(ma50.iloc[-2] >= ma200.iloc[-2]
                           and ma50.iloc[-1] < ma200.iloc[-1])

        if cross_over:
            change_type = 'BULLISH'
        elif cross_under:
            change_type = 'BEARISH'
        else:
            change_type = 'NONE'
        return {'change_detected': cross_over or cross_under, 'type': change_type}

    def _analyze_volatility_regime(self, df: pd.DataFrame,
                                   short_window: int = 20,
                                   long_window: int = 100) -> Dict[str, Any]:
        """Classify volatility by the ratio of short to long average true range.

        ``ratio`` > 1.25 is ``HIGH``, < 0.75 is ``LOW``, otherwise ``NORMAL``.
        Returns ``UNDEFINED`` with ratio 0.0 when there are fewer than
        ``long_window`` bars or the long average is zero/undefined.
        """
        undefined = {'regime': 'UNDEFINED', 'ratio': 0.0}
        if len(df) < long_window:
            return undefined

        true_range = _true_range(df)
        short_atr = _finite_or(true_range.rolling(short_window).mean().iloc[-1], 0.0)
        long_atr = _finite_or(true_range.rolling(long_window).mean().iloc[-1], 0.0)
        if long_atr <= 0:
            return undefined

        ratio = short_atr / long_atr
        if ratio > 1.25:
            regime = 'HIGH'
        elif ratio < 0.75:
            regime = 'LOW'
        else:
            regime = 'NORMAL'
        return {'regime': regime, 'ratio': float(ratio)}

    def _determine_market_phase(self, df: pd.DataFrame) -> Dict[str, str]:
        """Classify the market phase from price, 50-SMA and 200-SMA.

        Heuristic: price > SMA50 > SMA200 is ``MARKUP``; price < SMA50 <
        SMA200 is ``MARKDOWN``; otherwise ``DISTRIBUTION`` when SMA50 is at or
        above SMA200 and ``ACCUMULATION`` when it is below. ``UNDEFINED``
        below 200 bars.
        """
        if len(df) < 200:
            return {'phase': 'UNDEFINED'}

        close = df['close']
        price = close.iloc[-1]
        sma50 = close.rolling(50).mean().iloc[-1]
        sma200 = close.rolling(200).mean().iloc[-1]
        if not all(np.isfinite(v) for v in (price, sma50, sma200)):
            return {'phase': 'UNDEFINED'}

        if price > sma50 > sma200:
            phase = 'MARKUP'
        elif price < sma50 < sma200:
            phase = 'MARKDOWN'
        elif sma50 >= sma200:
            phase = 'DISTRIBUTION'
        else:
            phase = 'ACCUMULATION'
        return {'phase': phase}

    # --- Generic helpers ---

    def _find_swing_points(self, series: pd.Series, point_type: str,
                           window: int = 5) -> List[float]:
        """Return the last three swing points; see ``find_swing_points``."""
        return find_swing_points(series, point_type, window)

    def _detect_candlestick_pattern(self, candles: pd.DataFrame) -> str:
        """Return ``'bullish_engulfing'``, ``'no_pattern'`` or ``'unknown'``.

        Only the bullish engulfing pattern is recognised; fewer than three
        candles yields ``'unknown'``.
        """
        if len(candles) < 3:
            return "unknown"

        prev_candle = candles.iloc[-2]
        curr_candle = candles.iloc[-1]

        is_bullish_engulfing = (
            prev_candle['close'] < prev_candle['open']       # previous bearish
            and curr_candle['close'] > curr_candle['open']   # current bullish
            and curr_candle['open'] < prev_candle['close']   # opens below prev close
            and curr_candle['close'] > prev_candle['open']   # closes above prev open
        )
        return "bullish_engulfing" if is_bullish_engulfing else "no_pattern"

    def _calculate_pattern_strength(self, candles: pd.DataFrame) -> float:
        """Return the last candle's body relative to the average body, capped
        at 1.0 (0.0 for fewer than three candles)."""
        if len(candles) < 3:
            return 0.0

        curr_candle = candles.iloc[-1]
        body_size = abs(curr_candle['close'] - curr_candle['open'])
        avg_body = abs(candles['close'] - candles['open']).mean()
        strength = body_size / avg_body if avg_body != 0 else 1.0
        return float(min(strength, 1.0))  # normalise to max 1.0


def calculate_slope(df: pd.DataFrame, length: int = 14, method: str = 'atr',
                    multiplier: float = 1.0) -> float:
    """Compute the per-bar slope used to project trendlines.

    Args:
        df: DataFrame with OHLCV data.
        length: Lookback period.
        method: ``'atr'`` (ATR / length), ``'stdev'`` (close stdev / length)
            or ``'linreg'`` (absolute linear-regression slope of the close).
        multiplier: Factor applied to the slope.

    Returns:
        The slope as a float; 0.0 for an unknown method, empty/too-short data
        or a non-finite result.
    """
    if len(df) == 0 or length < 1:
        return 0.0

    if method == 'atr':
        raw = _true_range(df).rolling(length).mean().iloc[-1] / length
    elif method == 'stdev':
        raw = df['close'].rolling(length).std().iloc[-1] / length
    elif method == 'linreg':
        y = df['close'].iloc[-length:].to_numpy(dtype=float)
        if len(y) < 2:  # a line needs at least two points
            return 0.0
        raw = abs(np.polyfit(np.arange(len(y)), y, 1)[0])
    else:
        return 0.0

    return _finite_or(raw * multiplier, 0.0)


def detect_pivot_points(df: pd.DataFrame, length: int = 14) -> Dict[str, Any]:
    """Detect pivot highs and pivot lows.

    A pivot is a bar whose high (low) equals the maximum (minimum) of the
    centered window of ``2 * length + 1`` bars, so the last ``length`` bars
    can never be pivots.

    Args:
        df: DataFrame with OHLCV data.
        length: Number of bars required on each side of a pivot.

    Returns:
        Dict with ``pivot_high`` and ``pivot_low`` Series (indexed like
        ``df``) holding the pivot prices.
    """
    return {
        'pivot_high': df['high'][_pivot_mask(df['high'], length, 'high')],
        'pivot_low': df['low'][_pivot_mask(df['low'], length, 'low')],
    }


def analyze_trendlines(df: pd.DataFrame, length: int = 14,
                       slope_mult: float = 1.0,
                       calc_method: str = 'atr') -> Dict[str, Any]:
    """Project trendlines from the last pivots and detect breakouts.

    The upper line starts at the last pivot high and falls by ``slope`` per
    bar; the lower line starts at the last pivot low and rises by ``slope``
    per bar. A breakout is flagged when the last bar's high (low) is beyond
    the upper (lower) line.

    Args:
        df: DataFrame with OHLCV data.
        length: Period for swing detection.
        slope_mult: Multiplier for the slope calculation.
        calc_method: Slope method (``'atr'``, ``'stdev'``, ``'linreg'``).

    Returns:
        Dict with trendline values, breakout flags, the slope and the last
        pivot prices (``None`` where no pivot exists).
    """
    result: Dict[str, Any] = {
        'upper_trendline': None,
        'lower_trendline': None,
        'upward_break': False,
        'downward_break': False,
        'slope': 0.0,
        'last_pivot_high': None,
        'last_pivot_low': None,
    }
    if len(df) == 0:
        return result

    slope = calculate_slope(df, length, calc_method, slope_mult)
    result['slope'] = slope

    # Work with row positions, not index labels: the distance in bars must be
    # correct for any index type (DatetimeIndex, sliced RangeIndex, ...).
    last_bar = len(df) - 1
    high_positions = np.flatnonzero(_pivot_mask(df['high'], length, 'high'))
    low_positions = np.flatnonzero(_pivot_mask(df['low'], length, 'low'))

    if high_positions.size:
        position = int(high_positions[-1])
        pivot_price = float(df['high'].iloc[position])
        result['last_pivot_high'] = pivot_price
        result['upper_trendline'] = pivot_price - slope * (last_bar - position)

    if low_positions.size:
        position = int(low_positions[-1])
        pivot_price = float(df['low'].iloc[position])
        result['last_pivot_low'] = pivot_price
        result['lower_trendline'] = pivot_price + slope * (last_bar - position)

    upper = result['upper_trendline']
    lower = result['lower_trendline']
    if upper is not None:
        result['upward_break'] = bool(df['high'].iloc[-1] > upper)
    if lower is not None:
        result['downward_break'] = bool(df['low'].iloc[-1] < lower)
    return result


def get_market_context(df: pd.DataFrame,
                       profile: Optional[str] = None) -> Dict[str, Any]:
    """Build the full market context, optionally with a profile analysis.

    Args:
        df: DataFrame with OHLCV data.
        profile: Trading profile name (case-insensitive) or a
            ``TradingProfile``; unknown names yield
            ``profile_analysis = None``.

    Returns:
        Dict with ``zones``, ``structure``, ``equilibrium``, ``trendlines``,
        ``smc_analysis``, optionally ``profile_analysis``, and the overall
        ``market_bias`` (``BULLISH`` / ``BEARISH`` / ``NEUTRAL``).
    """
    context: Dict[str, Any] = {
        'zones': detect_premium_discount_zones(df),
        'structure': analyze_break_of_structure(df),
        'equilibrium': detect_market_equilibrium(df),
    }

    trendline_analysis = analyze_trendlines(
        df,
        length=14,
        slope_mult=1.0,
        calc_method='atr',
    )
    context['trendlines'] = trendline_analysis

    if trendline_analysis['upward_break']:
        context['structure']['trendline_break'] = 'BULLISH'
    elif trendline_analysis['downward_break']:
        context['structure']['trendline_break'] = 'BEARISH'
    else:
        context['structure']['trendline_break'] = None

    # NOTE(review): calculate_smc_zones / detect_zone_transition are not
    # defined in the visible part of this module (presumably they live in
    # technical_indicators_smc.py - confirm and import them). They are looked
    # up at call time so the context can still be built without them.
    zone_fn = globals().get('calculate_smc_zones')
    transition_fn = globals().get('detect_zone_transition')
    if callable(zone_fn) and callable(transition_fn):
        smc_zones = zone_fn(df)
        context['smc_analysis'] = {
            'zones': smc_zones,
            'current_state': transition_fn(df, smc_zones),
        }
    else:
        context['smc_analysis'] = {'zones': None, 'current_state': None}

    if profile:
        try:
            if isinstance(profile, TradingProfile):
                profile_enum = profile
            else:
                profile_enum = TradingProfile(str(profile).lower())
        except ValueError:
            context['profile_analysis'] = None
        else:
            context['profile_analysis'] = ProfileAnalyzer(profile_enum).analyze(df)

    # Overall bias, in order of precedence: break of structure, trendline
    # break, equilibrium, then position relative to the premium/discount zones.
    structure = context['structure']
    if structure['bos_detected']:
        bias = structure['bos_direction']
    elif structure['trendline_break']:
        bias = structure['trendline_break']
    elif context['equilibrium']['equilibrium_detected']:
        bias = 'NEUTRAL'
    else:
        current_price = df['close'].iloc[-1]
        if current_price > context['zones']['premium']:
            bias = 'BULLISH'
        elif current_price < context['zones']['discount']:
            bias = 'BEARISH'
        else:
            bias = 'NEUTRAL'

    context['market_bias'] = bias
    return context


def detect_premium_discount_zones(df: pd.DataFrame) -> Dict[str, float]:
    """Detect the premium and discount price zones.

    With fewer than 20 bars the overall high/low are returned. Otherwise the
    highest and lowest above-average-volume bins of the volume profile are
    used; if fewer than two such bins exist, the zones fall back to the
    20-SMA +/- 1%.
    """
    if len(df) < 20:
        return {'premium': float(df['high'].max()),
                'discount': float(df['low'].min())}

    volume_profile = analyze_volume_profile(df)
    high_vol_zones: List[float] = []
    if volume_profile:
        mean_volume = sum(volume_profile.values()) / len(volume_profile)
        high_vol_zones = sorted(
            (zone for zone, vol in volume_profile.items() if vol > mean_volume),
            reverse=True,
        )

    if len(high_vol_zones) >= 2:
        premium = high_vol_zones[0]
        discount = high_vol_zones[-1]
    else:
        # Fallback: simple moving average band.
        sma20 = df['close'].rolling(20).mean().iloc[-1]
        premium = sma20 * 1.01   # 1% above the SMA
        discount = sma20 * 0.99  # 1% below the SMA

    return {'premium': float(premium), 'discount': float(discount)}


def analyze_break_of_structure(df: pd.DataFrame) -> Dict[str, Any]:
    """Analyze Break of Structure (BOS).

    A bullish (bearish) BOS is reported when the last close is above (below)
    the most extreme of the last three swing highs (lows) found with a
    10-bar window. ``strength`` is the break distance in ATR units (0.0 when
    the ATR is zero or unavailable).
    """
    no_break = {'bos_detected': False, 'bos_direction': None, 'strength': 0.0}
    if len(df) < 50:
        return no_break

    swing_highs = find_swing_points(df['high'], 'high', window=10)
    swing_lows = find_swing_points(df['low'], 'low', window=10)
    current_price = float(df['close'].iloc[-1])
    atr = _latest_atr(df)

    def in_atr_units(distance: float) -> float:
        return float(distance / atr) if atr > 0 else 0.0

    if swing_highs and current_price > max(swing_highs):
        return {
            'bos_detected': True,
            'bos_direction': 'BULLISH',
            'strength': in_atr_units(current_price - max(swing_highs)),
        }
    if swing_lows and current_price < min(swing_lows):
        return {
            'bos_detected': True,
            'bos_direction': 'BEARISH',
            'strength': in_atr_units(min(swing_lows) - current_price),
        }
    return no_break


def detect_market_equilibrium(df: pd.DataFrame) -> Dict[str, Any]:
    """Detect market equilibrium (tight range on declining volume).

    Equilibrium requires the 5-bar range to be below half the ATR *and* the
    last volume to be below 70% of its 20-bar average. Without a ``volume``
    column, or with fewer than 20 bars, no equilibrium is reported.
    """
    not_detected = {'equilibrium_detected': False, 'confidence': 0.0}
    if len(df) < 20 or 'volume' not in df.columns:
        return not_detected

    atr = _latest_atr(df)
    recent_range = df['high'].iloc[-5:].max() - df['low'].iloc[-5:].min()

    avg_volume = df['volume'].rolling(20).mean().iloc[-1]
    current_volume = df['volume'].iloc[-1]

    low_volatility = bool(recent_range < atr * 0.5)
    declining_volume = bool(current_volume < avg_volume * 0.7)
    if not (low_volatility and declining_volume):
        return not_detected

    # Both thresholds are strictly positive here, otherwise the comparisons
    # above could not have succeeded.
    volatility_ratio = 1 - (recent_range / (atr * 0.5))
    volume_ratio = 1 - (current_volume / (avg_volume * 0.7))
    confidence = (volatility_ratio + volume_ratio) / 2
    return {
        'equilibrium_detected': True,
        'confidence': _finite_or(confidence, 0.0),
    }


def analyze_volume_profile(df: pd.DataFrame, bins: int = 20) -> Dict[float, float]:
    """Build a volume profile: traded volume per price bin.

    Each bar's volume is assigned to the bin containing its (high + low) / 2
    midpoint.

    Args:
        df: DataFrame with ``high``, ``low`` and ``volume`` columns.
        bins: Number of equal-width price bins between the lowest low and
            the highest high.

    Returns:
        Mapping of each bin's lower price edge to its total volume; empty
        when there are fewer than 100 bars, no ``volume`` column, or no
        price range to split.
    """
    if len(df) < 100 or bins < 1 or 'volume' not in df.columns:
        return {}

    lowest = float(df['low'].min())
    highest = float(df['high'].max())
    if not (np.isfinite(lowest) and np.isfinite(highest)) or highest <= lowest:
        return {}

    midpoints = ((df['high'] + df['low']) / 2).to_numpy(dtype=float)
    volumes = df['volume'].to_numpy(dtype=float)
    usable = np.isfinite(midpoints) & np.isfinite(volumes)

    totals, edges = np.histogram(
        midpoints[usable], bins=bins, range=(lowest, highest),
        weights=volumes[usable],
    )
    return {float(edge): float(total) for edge, total in zip(edges[:-1], totals)}


def detect_smc_structure(df: pd.DataFrame) -> Dict[str, Any]:
    """Placeholder for SMC structure detection.

    This function should be implemented or imported from
    technical_indicators_smc.py; for now it returns an empty structure.
    """
    return {
        'internal_structure': {},
        'swing_structure': {},
        'order_blocks': [],
        'fair_value_gaps': [],
        'equal_levels': {'highs': [], 'lows': []},
    }


def _bias_from_prediction(prediction: Dict[str, Any],
                          threshold: float = 0.6) -> str:
    """Map a predictor result to ``BULLISH`` / ``BEARISH`` / ``NEUTRAL``.

    Supports both result shapes used in this module: price predictions
    (``confidence``, ``predicted_price``, ``current_price``) and signal pairs
    (``buy_signal``, ``sell_signal``). A bias is only given when the
    confidence (or the stronger signal) exceeds ``threshold``.
    """
    if {'confidence', 'predicted_price', 'current_price'} <= set(prediction):
        if prediction['confidence'] > threshold:
            rising = prediction['predicted_price'] > prediction['current_price']
            return 'BULLISH' if rising else 'BEARISH'
        return 'NEUTRAL'

    buy = _finite_or(prediction.get('buy_signal'), 0.0)
    sell = _finite_or(prediction.get('sell_signal'), 0.0)
    if buy != sell and max(buy, sell) > threshold:
        return 'BULLISH' if buy > sell else 'BEARISH'
    return 'NEUTRAL'


def analyze_market_structure(df: pd.DataFrame, predictor=None,
                             history_analyzer=None) -> Dict[str, Any]:
    """Market structure analysis enriched with prediction and history.

    Args:
        df: DataFrame with OHLCV data.
        predictor: Optional object with ``predict(df) -> dict``; adds
            ``future_prediction`` and ``trading_bias``.
        history_analyzer: Optional object exposing ``profitable_patterns``
            (and ideally ``market_conditions``); adds ``historical_insights``
            and ``pattern_match_score``.
    """
    current_analysis = detect_smc_structure(df)

    if predictor:
        prediction = predictor.predict(df)
        current_analysis['future_prediction'] = prediction
        current_analysis['trading_bias'] = _bias_from_prediction(prediction)

    if history_analyzer and hasattr(history_analyzer, 'profitable_patterns'):
        optimal_conditions = getattr(history_analyzer, 'market_conditions', None) or {}
        current_analysis['historical_insights'] = {
            'profitable_patterns': history_analyzer.profitable_patterns,
            'optimal_conditions': optimal_conditions,
        }

        has_rows = len(df) > 0
        current_conditions = {
            'volatility': df['high'].rolling(20).std().iloc[-1] if has_rows else None,
            'volume': (df['volume'].iloc[-1]
                       if has_rows and 'volume' in df.columns else None),
            'hour': _last_bar_hour(df),
        }
        current_analysis['pattern_match_score'] = _calculate_pattern_match(
            current_conditions, optimal_conditions
        )

    return current_analysis


def _range_score(value: Any, bounds: Dict[str, Any]) -> float:
    """Score ``value`` against ``bounds`` (``min``, ``max``, ``optimal``).

    1.0 inside [min, max]; outside, the score decays linearly with the
    distance to the nearest bound, scaled by ``optimal``, and is clamped to
    [0, 1]. Non-finite values or a missing/zero ``optimal`` score 0.0.
    """
    low, high = bounds['min'], bounds['max']
    if low <= value <= high:
        return 1.0
    scale = bounds.get('optimal')
    if not scale:
        return 0.0
    distance = min(abs(value - low), abs(value - high))
    score = _finite_or(1 - distance / scale, 0.0)
    return min(1.0, max(0.0, score))


def _calculate_pattern_match(current: Dict[str, Any],
                             optimal: Dict[str, Any]) -> float:
    """Return how well current conditions match the historical optimum (0-1).

    Averages the scores of every criterion that can be evaluated: volatility
    and volume against their ranges, and the trading hour against
    ``best_hours``. Criteria whose current value is missing are skipped;
    0.0 is returned when nothing can be scored.
    """
    scores: List[float] = []

    for key, range_key in (('volatility', 'volatility_range'),
                           ('volume', 'volume_range')):
        value = current.get(key)
        bounds = optimal.get(range_key)
        if value is not None and bounds is not None:
            scores.append(_range_score(value, bounds))

    hour = current.get('hour')
    if hour is not None:
        scores.append(1.0 if hour in optimal.get('best_hours', []) else 0.0)

    return sum(scores) / len(scores) if scores else 0.0


class MarketPredictor:
    """Thin wrapper around a (not yet provided) machine-learning model."""

    def __init__(self):
        self.model = None  # placeholder for the machine-learning model

    def train(self, df: pd.DataFrame):
        """Train the prediction model on historical data (not implemented)."""
        pass

    def predict(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return ``buy_signal`` / ``sell_signal`` for the current market.

        Both signals are 0.0 while no model is set.
        """
        if self.model is None:
            return {'buy_signal': 0.0, 'sell_signal': 0.0}

        features = self._prepare_features(df)
        prediction = self.model.predict(features)
        # NOTE(review): the first two entries of the model output are taken as
        # buy and sell signals - confirm this matches the model's output shape.
        return {
            'buy_signal': float(prediction[0]),
            'sell_signal': float(prediction[1]),
        }

    def _prepare_features(self, df: pd.DataFrame) -> np.ndarray:
        """Return the closing prices as a single-column feature matrix."""
        return df['close'].values.reshape(-1, 1)


class HistoryAnalyzer:
    """Evaluates past trades."""

    def __init__(self):
        pass

    def analyze_historical_trades(self, df: pd.DataFrame,
                                  past_trades: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the share of profit and loss in the total P&L of past trades.

        Each trade contributes its ``profit`` and ``loss`` values (missing
        keys count as 0). Both ratios are 0.0 when there are no trades or the
        total is zero. ``df`` is currently unused.
        """
        no_data = {'profit_ratio': 0.0, 'loss_ratio': 0.0}
        if not past_trades:
            return no_data

        total_profit = sum(trade.get('profit', 0) for trade in past_trades)
        total_loss = sum(trade.get('loss', 0) for trade in past_trades)
        total = total_profit + total_loss
        if total == 0:
            return no_data

        profit_ratio = total_profit / total
        return {'profit_ratio': profit_ratio, 'loss_ratio': 1 - profit_ratio}


predictor = MarketPredictor()
analyzer = HistoryAnalyzer()