# signal_generator.py (VERSI FINAL YANG SUDAH DIPERBAIKI) from __future__ import annotations import logging import json from typing import Dict, List, Tuple, Optional, Any from collections import Counter import MetaTrader5 as mt5 from data_fetching import get_candlestick_data from technical_indicators import ( detect_structure, detect_order_blocks_multi, detect_fvg_multi, detect_eqh_eql, detect_liquidity_sweep, detect_liquidity_grab, calculate_optimal_trade_entry, detect_engulfing, detect_pinbar, detect_continuation_patterns, analyze_volatility, # Tambahkan import ini adjust_trade_parameters, # Tambahkan import ini detect_smc_structure, # Tambahkan import ini ) from gng_model import ( get_gng_input_features_full, get_gng_context, ) def get_open_positions_per_tf(symbol: str, tf: str, mt5_path: str) -> int: if not mt5.initialize(path=mt5_path): return 99 positions = mt5.positions_get(symbol=symbol) mt5.shutdown() return len(positions) if positions is not None else 0 def get_active_orders(symbol: str, mt5_path: str) -> List[float]: if not mt5.initialize(path=mt5_path): return [] active_prices: List[float] = [] try: positions = mt5.positions_get(symbol=symbol) if positions: for pos in positions: active_prices.append(pos.price_open) orders = mt5.orders_get(symbol=symbol) if orders: for order in orders: active_prices.append(order.price_open) except Exception as e: logging.error(f"Error saat mengambil order/posisi aktif: {e}") finally: mt5.shutdown() return active_prices def is_far_enough(entry_price: float, existing_prices: List[float], point_value: float, min_distance_pips: float) -> bool: min_distance_points = min_distance_pips * point_value for price in existing_prices: if abs(entry_price - price) < min_distance_points: logging.warning(f"Sinyal DITOLAK: Entry {entry_price:.3f} terlalu dekat dengan order aktif di {price:.3f}.") return False return True def build_signal_format(symbol: str, entry_price: float, direction: str, sl: float, tp: float, order_type: str) -> dict: signal = {"Symbol": symbol} # Disesuaikan dengan case-sensitivity dari MQL dan tambahkan kunci yang hilang order_keys = [ "BuyEntry", "BuySL", "BuyTP", "SellEntry", "SellSL", "SellTP", "BuyStop", "BuyStopSL", "BuyStopTP", "SellStop", "SellStopSL", "SellStopTP", "Buylimit", "BuylimitSL", "BuylimitTP", "Selllimit", "SelllimitSL", "SellLimitTP", "DeleteLimit/Stop" ] for key in order_keys: signal[key] = "" order_type_upper = order_type.upper() if order_type_upper == 'BUY': signal.update({"BuyEntry": str(entry_price), "BuySL": str(sl), "BuyTP": str(tp)}) elif order_type_upper == 'SELL': signal.update({"SellEntry": str(entry_price), "SellSL": str(sl), "SellTP": str(tp)}) elif order_type_upper == 'BUY_LIMIT': signal.update({"Buylimit": str(entry_price), "BuylimitSL": str(sl), "BuylimitTP": str(tp)}) elif order_type_upper == 'SELL_LIMIT': signal.update({"Selllimit": str(entry_price), "SelllimitSL": str(sl), "SellLimitTP": str(tp)}) elif order_type_upper == 'BUY_STOP': signal.update({"BuyStop": str(entry_price), "BuyStopSL": str(sl), "BuyStopTP": str(tp)}) elif order_type_upper == 'SELL_STOP': signal.update({"SellStop": str(entry_price), "SellStopSL": str(sl), "SellStopTP": str(tp)}) return signal def make_signal_id(signal_json: Dict[str, str]) -> str: return str(abs(hash(json.dumps(signal_json, sort_keys=True)))) def analyze_tf_opportunity( symbol: str, tf: str, mt5_path: str, gng_model, gng_feature_stats: Dict[str, Any], confidence_threshold: float, min_distance_pips_per_tf: Dict[str, float], weights: Dict[str, float], atr_multiplier: Dict[str, float] = None, htf_bias: str = 'NEUTRAL', htf_config: Dict[str, Any] = None ) -> Optional[Dict[str, Any]]: # Definisikan htf_config jika None if htf_config is None: htf_config = { 'enabled': False, 'bias_influence_score': 2.5, 'penalty_score': -5.0 } df = get_candlestick_data(symbol, tf, 200, mt5_path) if df is None or len(df) < 50: logging.warning(f"Data TF {tf} tidak cukup untuk analisis.") return None # --- Inisialisasi Skor & Komponen --- score = 0.0 score_components = {} info_list: List[str] = [] logging.info(f"[Arshy | {tf}] --- Memulai Analisis Konfluensi ---") # --- Analisis Komponen --- current_price = df['close'].iloc[-1] structure_str, swing_points = detect_structure(df) atr = df['high'].sub(df['low']).rolling(14).mean().iloc[-1] order_blocks = detect_order_blocks_multi(df, structure_filter=structure_str) fvg_zones = detect_fvg_multi(df) liquidity_sweep = detect_liquidity_sweep(df) liquidity_grabs = detect_liquidity_grab(df) patterns = detect_engulfing(df) + detect_pinbar(df) + detect_continuation_patterns(df) # --- Analisis SMC --- smc_analysis = detect_smc_structure(df) # Tambahkan skor untuk struktur SMC if smc_analysis['internal_structure']['bullish_bos']: score += weights.get('SMC_INTERNAL_BULL_BOS', 2.0) score_components['smc_internal'] = weights.get('SMC_INTERNAL_BULL_BOS', 2.0) info_list.append("SMC Internal Bullish BOS") elif smc_analysis['internal_structure']['bearish_bos']: score += weights.get('SMC_INTERNAL_BEAR_BOS', -2.0) score_components['smc_internal'] = weights.get('SMC_INTERNAL_BEAR_BOS', -2.0) info_list.append("SMC Internal Bearish BOS") if smc_analysis['swing_structure']['bullish_choch']: score += weights.get('SMC_SWING_BULL_CHOCH', 3.0) score_components['smc_swing'] = weights.get('SMC_SWING_BULL_CHOCH', 3.0) info_list.append("SMC Swing Bullish CHoCH") elif smc_analysis['swing_structure']['bearish_choch']: score += weights.get('SMC_SWING_BEAR_CHOCH', -3.0) score_components['smc_swing'] = weights.get('SMC_SWING_BEAR_CHOCH', -3.0) info_list.append("SMC Swing Bearish CHoCH") # Analisis Order Blocks for ob in smc_analysis['order_blocks']: if ob['type'] == 'bullish' and current_price < ob['high']: score += weights.get('SMC_BULL_OB', 1.5) score_components['smc_ob'] = weights.get('SMC_BULL_OB', 1.5) info_list.append("Inside Bullish Order Block") elif ob['type'] == 'bearish' and current_price > ob['low']: score += weights.get('SMC_BEAR_OB', -1.5) score_components['smc_ob'] = weights.get('SMC_BEAR_OB', -1.5) info_list.append("Inside Bearish Order Block") # Analisis Fair Value Gaps for fvg in smc_analysis['fair_value_gaps'][-3:]: # Cek 3 FVG terakhir if fvg['type'] == 'bullish' and current_price < fvg['top']: score += weights.get('SMC_BULL_FVG', 2.0) score_components['smc_fvg'] = weights.get('SMC_BULL_FVG', 2.0) info_list.append("Below Bullish FVG") elif fvg['type'] == 'bearish' and current_price > fvg['bottom']: score += weights.get('SMC_BEAR_FVG', -2.0) score_components['smc_fvg'] = weights.get('SMC_BEAR_FVG', -2.0) info_list.append("Above Bearish FVG") # --- Kalkulasi Skor --- score = 0.0 score_components = {} # <-- DITAMBAHKAN DI SINI info_list: List[str] = [] logging.info(f"[Arshy | {tf}] --- Memulai Analisis Konfluensi ---") # Skor Struktur structure_score = 0 if "BULLISH_BOS" in structure_str: structure_score += weights.get("BULLISH_BOS", 3.0) if "BEARISH_BOS" in structure_str: structure_score += weights.get("BEARISH_BOS", -3.0) if "HH" in structure_str: structure_score += weights.get("HH", 1.0) if "LL" in structure_str: structure_score += weights.get("LL", -1.0) if "HL" in structure_str: structure_score += weights.get("HL", 1.0) if "LH" in structure_str: structure_score += weights.get("LH", -1.0) score_components['structure'] = structure_score score += structure_score logging.info(f"[Arshy | {tf}] Analisis Struktur: Teridentifikasi '{structure_str}' (Skor: {structure_score:+.2f})") # Skor Zona (FVG, OB) & Event (LS) if fvg_zones: nearest_fvg = fvg_zones[0] fvg_score = (weights.get('FVG_BULLISH', 3.0) if 'BULLISH' in nearest_fvg['type'] else weights.get('FVG_BEARISH', -3.0)) * nearest_fvg['strength'] score_components['fvg'] = fvg_score score += fvg_score logging.info(f"[Arshy | {tf}] Zona Inefisiensi (FVG): {nearest_fvg['type']} @ {nearest_fvg['start']:.3f}-{nearest_fvg['end']:.3f} terdeteksi (Skor: {fvg_score:+.2f})") if liquidity_sweep: ls_event = liquidity_sweep[-1] ls_score = weights.get(ls_event.get('type')) if ls_score: score_components['liquidity_sweep'] = ls_score score += ls_score logging.info(f"[Arshy | {tf}] Perburuan Likuiditas (Sweep): {ls_event.get('type')} @ {ls_event.get('swept_level'):.3f} terdeteksi (Skor: {ls_score:+.2f})") if liquidity_grabs: grab = liquidity_grabs[-1] # Ambil yang terbaru grab_score = weights.get(grab.get('type'), 0.0) * grab.get('strength', 1.0) if grab_score != 0: score_components['liquidity_grab'] = grab_score score += grab_score logging.info(f"[Arshy | {tf}] Perburuan Likuiditas (Grab): {grab.get('type')} pada level {grab.get('swept_level'):.4f} terdeteksi (Skor: {grab_score:+.2f})") if order_blocks: nearest_ob = order_blocks[0] ob_score = (weights.get('BULLISH_OB', 1.0) if 'BULLISH' in nearest_ob['type'] else weights.get('BEARISH_OB', -1.0)) * nearest_ob['strength'] score_components['order_block'] = ob_score score += ob_score logging.info(f"[Arshy | {tf}] Zona Order Block: {nearest_ob['type']} @ {nearest_ob['low']:.3f}-{nearest_ob['high']:.3f} terdeteksi (Skor: {ob_score:+.2f})") # Skor Pola Minor pattern_score = sum(weights.get(p.get('type'), 0) for p in patterns) if pattern_score != 0: bullish_patterns = [p.get('type') for p in patterns if weights.get(p.get('type'), 0) > 0] bearish_patterns = [p.get('type') for p in patterns if weights.get(p.get('type'), 0) < 0] bull_summary = ", ".join([f"{count}x {name}" for name, count in Counter(bullish_patterns).items()]) bear_summary = ", ".join([f"{count}x {name}" for name, count in Counter(bearish_patterns).items()]) summary_parts = [s for s in [f"Bullish: [{bull_summary}]" if bull_summary else "", f"Bearish: [{bear_summary}]" if bear_summary else ""] if s] logging.info(f"[Arshy | {tf}] Konfluensi Pola Minor: {' | '.join(summary_parts)} (Skor Total: {pattern_score:+.2f})") score_components['patterns'] = pattern_score score += pattern_score logging.info(f"[Arshy | {tf}] Kalkulasi Skor Awal: {score:.2f}") # --- Penerapan HTF Bias --- if htf_config.get('enabled', False) and htf_bias != 'NEUTRAL': direction_pre_bias = "BUY" if score > 0 else "SELL" bias_score = htf_config.get('bias_influence_score', 2.5) penalty_score = htf_config.get('penalty_score', -5.0) if (htf_bias == 'BULLISH' and direction_pre_bias == 'BUY') or (htf_bias == 'BEARISH' and direction_pre_bias == 'SELL'): score_components['htf_bias'] = bias_score if direction_pre_bias == 'BUY' else -bias_score score += bias_score if direction_pre_bias == 'BUY' else -bias_score logging.info(f"[Arshy | {tf}] Validasi Tren HTF: Sinyal {direction_pre_bias} didukung. Skor disesuaikan {bias_score:+.2f}") else: score_components['htf_bias'] = penalty_score score += penalty_score logging.warning(f"[Arshy | {tf}] Validasi Tren HTF: Sinyal {direction_pre_bias} berlawanan dengan bias {htf_bias}. Penalti {penalty_score:.2f} diterapkan.") logging.info(f"[Arshy | {tf}] Skor Akhir Terkalkulasi: {score:.2f}") # --- Penentuan Arah & Tipe Order Berdasarkan Setup --- direction = "WAIT" order_type = None entry_price_chosen = current_price if score >= confidence_threshold: direction = "BUY" # 1. BUY LIMIT: Gunakan OTE zone untuk entry yang lebih presisi bullish_poi = sorted([z for z in fvg_zones if 'BULLISH' in z['type'] and z['start'] < current_price] + [z for z in order_blocks if 'BULLISH' in z['type'] and z['high'] < current_price], key=lambda z: z['distance']) if bullish_poi: best_zone = bullish_poi[0] # Hitung zona OTE yang lebih detail swing_start = best_zone.get('end', best_zone.get('low')) swing_end = best_zone.get('start', best_zone.get('high')) ote_zones = calculate_optimal_trade_entry(swing_start, swing_end, direction) # Gunakan mid point dari OTE zone sebagai entry default ote_entry = ote_zones.get('mid') # Validasi apakah harga OTE masih valid if ote_entry and ote_entry < current_price: order_type = "BUY_LIMIT" entry_price_chosen = ote_entry # Gunakan upper OTE sebagai validasi stop loss sl_reference = ote_zones.get('lower', entry_price_chosen - (atr * 1.5)) info_list.append(f"BUY_LIMIT at OTE zone {ote_entry:.2f} based on {best_zone['type']}") logging.info(f"[Arshy | {tf}] Setup: Pullback ke {best_zone['type']}. BUY_LIMIT di zona OTE.") # 2. BUY STOP: Gunakan level struktur + buffer ATR if not order_type and "BULLISH_BOS" in structure_str and swing_points.get('last_high'): breakout_level = swing_points['last_high'] atr_buffer = atr * 0.1 # Buffer 10% ATR untuk menghindari false breakout entry_price_chosen = breakout_level + atr_buffer order_type = "BUY_STOP" info_list.append(f"BUY_STOP above structure {breakout_level:.2f} + buffer") logging.info(f"[Arshy | {tf}] Setup: Breakout BOS. BUY_STOP di atas struktur.") # 3. INSTANT BUY: Gunakan konfirmasi momentum if not order_type: # Cek konfirmasi momentum tambahan is_strong_momentum = any(p['type'] in ['ENGULFING_BULL', 'PINBAR_BULL'] for p in patterns[-3:]) if is_strong_momentum: order_type = "BUY" entry_price_chosen = current_price info_list.append("INSTANT BUY with strong momentum confirmation") logging.info(f"[Arshy | {tf}] Setup: Momentum Kuat dengan konfirmasi pola. MARKET BUY.") elif score <= -confidence_threshold: direction = "SELL" # 1. SELL LIMIT: Gunakan OTE zone untuk entry yang lebih presisi bearish_poi = sorted([z for z in fvg_zones if 'BEARISH' in z['type'] and z['start'] > current_price] + [z for z in order_blocks if 'BEARISH' in z['type'] and z['low'] > current_price], key=lambda z: z['distance']) if bearish_poi: best_zone = bearish_poi[0] swing_start = best_zone.get('start', best_zone.get('high')) swing_end = best_zone.get('end', best_zone.get('low')) ote_zones = calculate_optimal_trade_entry(swing_start, swing_end, direction) ote_entry = ote_zones.get('mid') if ote_entry and ote_entry > current_price: order_type = "SELL_LIMIT" entry_price_chosen = ote_entry sl_reference = ote_zones.get('upper', entry_price_chosen + (atr * 1.5)) info_list.append(f"SELL_LIMIT at OTE zone {ote_entry:.2f} based on {best_zone['type']}") logging.info(f"[Arshy | {tf}] Setup: Pullback ke {best_zone['type']}. SELL_LIMIT di zona OTE.") # 2. SELL STOP: Gunakan level struktur + buffer ATR if not order_type and "BEARISH_BOS" in structure_str and swing_points.get('last_low'): breakout_level = swing_points['last_low'] atr_buffer = atr * 0.1 entry_price_chosen = breakout_level - atr_buffer order_type = "SELL_STOP" info_list.append(f"SELL_STOP below structure {breakout_level:.2f} - buffer") logging.info(f"[Arshy | {tf}] Setup: Breakout BOS. SELL_STOP di bawah struktur.") # 3. INSTANT SELL: Gunakan konfirmasi momentum if not order_type: is_strong_momentum = any(p['type'] in ['ENGULFING_BEAR', 'PINBAR_BEAR'] for p in patterns[-3:]) if is_strong_momentum: order_type = "SELL" entry_price_chosen = current_price info_list.append("INSTANT SELL with strong momentum confirmation") logging.info(f"[Arshy | {tf}] Setup: Momentum Kuat dengan konfirmasi pola. MARKET SELL.") # --- Validasi Akhir & Penentuan SL/TP --- if direction == "WAIT": return None sl, tp = 0.0, 0.0 if direction == "BUY": sl = entry_price_chosen - (atr * 1.5) tp = entry_price_chosen + (atr * 3.0) elif direction == "SELL": sl = entry_price_chosen + (atr * 1.5) tp = entry_price_chosen - (atr * 3.0) # Analisis volatilitas dan penyesuaian parameter volatility = analyze_volatility(df) if atr_multiplier: sl_pips = atr * atr_multiplier['sl'] tp_pips = atr * atr_multiplier['tp'] sl_pips, tp_pips = adjust_trade_parameters(volatility, sl_pips, tp_pips) else: sl_pips = atr * 1.5 tp_pips = atr * 3.0 # Penyesuaian SL/TP berdasarkan jenis trading if tf in ["M1", "M5"]: # Scalping sl_pips *= 0.8 # Lebih ketat tp_pips *= 0.8 elif tf in ["H4", "D1"]: # Swing sl_pips *= 1.5 # Lebih longgar tp_pips *= 1.5 # Update perhitungan SL/TP if direction == "BUY": sl = entry_price_chosen - sl_pips tp = entry_price_chosen + tp_pips elif direction == "SELL": sl = entry_price_chosen + sl_pips tp = entry_price_chosen - tp_pips logging.info(f"[Arshy | {tf}] --- Analisis Selesai --- | Rekomendasi: {direction} | Tipe: {order_type} | Skor Keyakinan: {score:.2f}") return { "signal": direction, "order_type": order_type, "entry_price_chosen": entry_price_chosen, "sl": sl, "tp": tp, "score": score, "info": "; ".join(info_list), "score_components": score_components, "features": get_gng_input_features_full(df, gng_feature_stats, tf) if gng_model else None, "tf": tf, "symbol": symbol, } def validate_and_generate_signal( df: pd.DataFrame, analysis: Dict[str, Any], profile_settings: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """ Validasi dan generate signal dengan pengecekan lebih ketat untuk scalping """ if analysis['profile_type'] == "scalping": # Validasi khusus scalping is_valid, confidence_boost, validations = validate_scalping_opportunity(df, profile_settings) if not is_valid: logging.info(f"Scalping validation failed: {', '.join(validations)}") return None # Tambahkan confidence boost ke score analysis['score'] += confidence_boost analysis['validations'] = validations # Additional checks for scalping current_spread = get_current_spread(df) if current_spread > profile_settings['entry_rules']['max_spread_multiplier']: logging.info(f"Spread too high for scalping: {current_spread}") return None # Quick momentum check last_close = df['close'].iloc[-1] prev_close = df['close'].iloc[-2] momentum_aligned = ( (analysis['score'] > 0 and last_close > prev_close) or (analysis['score'] < 0 and last_close < prev_close) ) if not momentum_aligned: logging.info("Momentum not aligned with signal direction") return None # Generate entry points entry_points = calculate_entry_points(df, analysis, profile_settings) if not entry_points: return None # Calculate stop loss and take profit sl_tp = calculate_sl_tp(df, analysis, entry_points['entry'], profile_settings) if not sl_tp: return None signal = { 'symbol': analysis['symbol'], 'signal': 'BUY' if analysis['score'] > 0 else 'SELL', 'entry_price': entry_points['entry'], 'sl': sl_tp['sl'], 'tp': sl_tp['tp'], 'score': analysis['score'], 'timeframe': analysis['timeframe'], 'profile_type': analysis['profile_type'] } if analysis['profile_type'] == "scalping": signal['validations'] = analysis.get('validations', []) return signal def calculate_entry_points( df: pd.DataFrame, analysis: Dict[str, Any], profile_settings: Dict[str, Any] ) -> Optional[Dict[str, float]]: """Calculate optimal entry points with profile-specific logic""" current_price = df['close'].iloc[-1] atr = calculate_atr_dynamic(df) if analysis['profile_type'] == "scalping": # For scalping, we want very precise entries if analysis['score'] > 0: # Bullish entry = current_price + (atr * 0.1) # Tight entry above current price if entry > df['high'].iloc[-1]: # Don't enter above recent high entry = current_price else: # Bearish entry = current_price - (atr * 0.1) # Tight entry below current price if entry < df['low'].iloc[-1]: # Don't enter below recent low entry = current_price else: # For intraday and swing if analysis['score'] > 0: entry = current_price + (atr * 0.2) else: entry = current_price - (atr * 0.2) return {'entry': entry} def calculate_sl_tp( df: pd.DataFrame, analysis: Dict[str, Any], entry_price: float, profile_settings: Dict[str, Any] ) -> Optional[Dict[str, float]]: """Calculate SL/TP with profile-specific risk management""" atr = calculate_atr_dynamic(df) if analysis['profile_type'] == "scalping": # Tighter stops for scalping sl_distance = atr * profile_settings['atr_multiplier']['sl'] tp_distance = atr * profile_settings['atr_multiplier']['tp'] if analysis['score'] > 0: # Bullish sl = entry_price - sl_distance tp = entry_price + tp_distance # Validate against recent price action if sl > df['low'].iloc[-3:].min(): # SL must be below recent lows sl = df['low'].iloc[-3:].min() - (atr * 0.1) else: # Bearish sl = entry_price + sl_distance tp = entry_price - tp_distance # Validate against recent price action if sl < df['high'].iloc[-3:].max(): # SL must be above recent highs sl = df['high'].iloc[-3:].max() + (atr * 0.1) else: # For intraday and swing sl_distance = atr * profile_settings['atr_multiplier']['sl'] tp_distance = atr * profile_settings['atr_multiplier']['tp'] if analysis['score'] > 0: sl = entry_price - sl_distance tp = entry_price + tp_distance else: sl = entry_price + sl_distance tp = entry_price - tp_distance return {'sl': sl, 'tp': tp} def get_current_spread(df: pd.DataFrame) -> float: """Calculate current spread from OHLC data""" if 'spread' in df.columns: return df['spread'].iloc[-1] return abs(df['high'].iloc[-1] - df['low'].iloc[-1]) / calculate_atr_dynamic(df)