markaz_arshy/signal_generator.py
2025-08-12 14:36:24 +00:00

523 lines
No EOL
24 KiB
Python

# signal_generator.py (final, corrected version)
from __future__ import annotations
import logging
import json
from typing import Dict, List, Tuple, Optional, Any
from collections import Counter
import MetaTrader5 as mt5
from data_fetching import get_candlestick_data
from technical_indicators import (
detect_structure,
detect_order_blocks_multi,
detect_fvg_multi,
detect_eqh_eql,
detect_liquidity_sweep,
detect_liquidity_grab,
calculate_optimal_trade_entry,
detect_engulfing,
detect_pinbar,
detect_continuation_patterns,
analyze_volatility, # Tambahkan import ini
adjust_trade_parameters, # Tambahkan import ini
detect_smc_structure, # Tambahkan import ini
)
from gng_model import (
get_gng_input_features_full,
get_gng_context,
)
def get_open_positions_per_tf(symbol: str, tf: str, mt5_path: str) -> int:
    """Return the number of open MT5 positions for ``symbol``.

    Args:
        symbol: Instrument passed to ``mt5.positions_get``.
        tf: Timeframe label. Accepted for interface compatibility; the
            position query is not filtered by it.
        mt5_path: Terminal path passed to ``mt5.initialize``.

    Returns:
        The position count, ``0`` when ``positions_get`` returns ``None``,
        or ``99`` when the terminal cannot be initialised.
        NOTE(review): 99 looks like a sentinel meant to make callers treat
        the position limit as reached -- confirm against the caller.
    """
    if not mt5.initialize(path=mt5_path):
        return 99
    try:
        positions = mt5.positions_get(symbol=symbol)
    finally:
        # Always release the terminal connection, even if the query raises,
        # mirroring the cleanup done in get_active_orders.
        mt5.shutdown()
    return len(positions) if positions is not None else 0
def get_active_orders(symbol: str, mt5_path: str) -> List[float]:
    """Collect the open prices of every position and pending order on ``symbol``.

    Args:
        symbol: Instrument passed to ``mt5.positions_get`` / ``mt5.orders_get``.
        mt5_path: Terminal path passed to ``mt5.initialize``.

    Returns:
        Position open prices followed by pending-order prices. The list is
        empty when the terminal cannot be initialised, and may be partial
        when a query fails (the error is logged, not raised).
    """
    collected: List[float] = []
    if not mt5.initialize(path=mt5_path):
        return collected
    try:
        open_positions = mt5.positions_get(symbol=symbol)
        if open_positions:
            collected.extend(position.price_open for position in open_positions)
        pending_orders = mt5.orders_get(symbol=symbol)
        if pending_orders:
            collected.extend(pending.price_open for pending in pending_orders)
    except Exception as e:
        # Best effort: report the failure and return whatever was gathered.
        logging.error(f"Error saat mengambil order/posisi aktif: {e}")
    finally:
        mt5.shutdown()
    return collected
def is_far_enough(entry_price: float, existing_prices: List[float], point_value: float, min_distance_pips: float) -> bool:
    """Tell whether ``entry_price`` keeps the minimum distance from active prices.

    The minimum distance is ``min_distance_pips * point_value``. The first
    existing price that lies strictly closer than that is logged as a
    warning and the function returns ``False``; otherwise it returns ``True``.
    """
    threshold = min_distance_pips * point_value
    conflicting = next(
        (level for level in existing_prices if abs(entry_price - level) < threshold),
        None,
    )
    if conflicting is None:
        return True
    logging.warning(f"Sinyal DITOLAK: Entry {entry_price:.3f} terlalu dekat dengan order aktif di {conflicting:.3f}.")
    return False
def build_signal_format(symbol: str, entry_price: float, direction: str, sl: float, tp: float, order_type: str) -> dict:
    """Build the flat signal dictionary, filling only the slots of ``order_type``.

    Every order key is present in the result and defaults to an empty
    string. For a recognised ``order_type`` (case-insensitive) the matching
    entry / SL / TP keys receive ``str(entry_price)``, ``str(sl)`` and
    ``str(tp)``. An unrecognised type leaves all order keys empty.
    ``direction`` is accepted but not used.
    """
    # Key spellings, including the mixed-case limit keys, are kept exactly:
    # the consumer on the MQL side is case-sensitive.
    slots_by_order_type = {
        'BUY': ("BuyEntry", "BuySL", "BuyTP"),
        'SELL': ("SellEntry", "SellSL", "SellTP"),
        'BUY_STOP': ("BuyStop", "BuyStopSL", "BuyStopTP"),
        'SELL_STOP': ("SellStop", "SellStopSL", "SellStopTP"),
        'BUY_LIMIT': ("Buylimit", "BuylimitSL", "BuylimitTP"),
        'SELL_LIMIT': ("Selllimit", "SelllimitSL", "SellLimitTP"),
    }

    payload = {"Symbol": symbol}
    for slot_names in slots_by_order_type.values():
        for slot in slot_names:
            payload[slot] = ""
    payload["DeleteLimit/Stop"] = ""

    chosen = slots_by_order_type.get(order_type.upper())
    if chosen is not None:
        entry_key, sl_key, tp_key = chosen
        payload[entry_key] = str(entry_price)
        payload[sl_key] = str(sl)
        payload[tp_key] = str(tp)
    return payload
def make_signal_id(signal_json: Dict[str, str]) -> str:
    """Return a stable decimal-string identifier for a signal payload.

    The payload is serialised with sorted keys, so key order does not affect
    the result. The ID is derived from a SHA-256 digest rather than the
    built-in ``hash()``: string hashes are salted per interpreter process,
    which would give the same signal a different ID after every restart.

    Args:
        signal_json: JSON-serialisable signal dictionary.

    Returns:
        A non-negative integer rendered as a string (first 64 bits of the
        digest), keeping the all-digits shape of the previous IDs.
    """
    import hashlib  # local import: hashlib is not among the module-level imports

    canonical = json.dumps(signal_json, sort_keys=True)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return str(int(digest[:16], 16))
def analyze_tf_opportunity(
    symbol: str,
    tf: str,
    mt5_path: str,
    gng_model,
    gng_feature_stats: Dict[str, Any],
    confidence_threshold: float,
    min_distance_pips_per_tf: Dict[str, float],
    weights: Dict[str, float],
    atr_multiplier: Optional[Dict[str, float]] = None,
    htf_bias: str = 'NEUTRAL',
    htf_config: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
    """Score one timeframe for a trade setup and derive entry, SL and TP.

    The function fetches 200 candles, sums weighted confluence components
    (SMC structure, market structure, FVG, liquidity sweep/grab, order
    blocks, candle patterns), optionally adjusts the score with the
    higher-timeframe bias, and, when the score clears
    ``confidence_threshold`` in either direction, picks an order type and
    ATR-based stop-loss / take-profit levels.

    Args:
        symbol: Instrument to analyse.
        tf: Timeframe label; also selects the SL/TP scaling
            ("M1"/"M5" tighter, "H4"/"D1" wider).
        mt5_path: Terminal path forwarded to ``get_candlestick_data``.
        gng_model: When truthy, GNG input features are attached to the result.
        gng_feature_stats: Forwarded to ``get_gng_input_features_full``.
        confidence_threshold: Minimum absolute score for a BUY/SELL decision.
        min_distance_pips_per_tf: Accepted for interface compatibility; not
            used inside this function.
        weights: Score weight per component name; missing keys fall back to
            the defaults written at each lookup.
        atr_multiplier: Optional ``{'sl': ..., 'tp': ...}`` ATR multipliers.
            When given, the distances are also passed through
            ``adjust_trade_parameters``; otherwise 1.5 / 3.0 x ATR is used.
        htf_bias: 'BULLISH', 'BEARISH' or 'NEUTRAL'.
        htf_config: ``{'enabled', 'bias_influence_score', 'penalty_score'}``;
            defaults to a disabled configuration.

    Returns:
        ``None`` when data is insufficient or the score does not reach the
        threshold; otherwise a dict with the signal, order type, entry, SL,
        TP, score, score components, info text and optional GNG features.
    """
    if htf_config is None:
        htf_config = {
            'enabled': False,
            'bias_influence_score': 2.5,
            'penalty_score': -5.0
        }

    df = get_candlestick_data(symbol, tf, 200, mt5_path)
    if df is None or len(df) < 50:
        logging.warning(f"Data TF {tf} tidak cukup untuk analisis.")
        return None

    # --- Score state: initialised exactly once so that every component
    # below (including the SMC ones) contributes to the final score. ---
    score = 0.0
    score_components: Dict[str, float] = {}
    info_list: List[str] = []
    logging.info(f"[Arshy | {tf}] --- Memulai Analisis Konfluensi ---")

    # --- Component analysis ---
    current_price = df['close'].iloc[-1]
    structure_str, swing_points = detect_structure(df)
    # Simple 14-bar mean of the high-low range, used as the ATR proxy.
    atr = df['high'].sub(df['low']).rolling(14).mean().iloc[-1]
    order_blocks = detect_order_blocks_multi(df, structure_filter=structure_str)
    fvg_zones = detect_fvg_multi(df)
    liquidity_sweep = detect_liquidity_sweep(df)
    liquidity_grabs = detect_liquidity_grab(df)
    patterns = detect_engulfing(df) + detect_pinbar(df) + detect_continuation_patterns(df)
    smc_analysis = detect_smc_structure(df)

    # --- SMC structure scoring ---
    if smc_analysis['internal_structure']['bullish_bos']:
        smc_weight = weights.get('SMC_INTERNAL_BULL_BOS', 2.0)
        score += smc_weight
        score_components['smc_internal'] = smc_weight
        info_list.append("SMC Internal Bullish BOS")
    elif smc_analysis['internal_structure']['bearish_bos']:
        smc_weight = weights.get('SMC_INTERNAL_BEAR_BOS', -2.0)
        score += smc_weight
        score_components['smc_internal'] = smc_weight
        info_list.append("SMC Internal Bearish BOS")

    if smc_analysis['swing_structure']['bullish_choch']:
        smc_weight = weights.get('SMC_SWING_BULL_CHOCH', 3.0)
        score += smc_weight
        score_components['smc_swing'] = smc_weight
        info_list.append("SMC Swing Bullish CHoCH")
    elif smc_analysis['swing_structure']['bearish_choch']:
        smc_weight = weights.get('SMC_SWING_BEAR_CHOCH', -3.0)
        score += smc_weight
        score_components['smc_swing'] = smc_weight
        info_list.append("SMC Swing Bearish CHoCH")

    # SMC order blocks: every qualifying block contributes, and the component
    # entry accumulates so that the components still sum to the score.
    for ob in smc_analysis['order_blocks']:
        if ob['type'] == 'bullish' and current_price < ob['high']:
            smc_weight = weights.get('SMC_BULL_OB', 1.5)
            label = "Inside Bullish Order Block"
        elif ob['type'] == 'bearish' and current_price > ob['low']:
            smc_weight = weights.get('SMC_BEAR_OB', -1.5)
            label = "Inside Bearish Order Block"
        else:
            continue
        score += smc_weight
        score_components['smc_ob'] = score_components.get('smc_ob', 0.0) + smc_weight
        info_list.append(label)

    # SMC fair value gaps: only the three most recent gaps are considered.
    for fvg in smc_analysis['fair_value_gaps'][-3:]:
        if fvg['type'] == 'bullish' and current_price < fvg['top']:
            smc_weight = weights.get('SMC_BULL_FVG', 2.0)
            label = "Below Bullish FVG"
        elif fvg['type'] == 'bearish' and current_price > fvg['bottom']:
            smc_weight = weights.get('SMC_BEAR_FVG', -2.0)
            label = "Above Bearish FVG"
        else:
            continue
        score += smc_weight
        score_components['smc_fvg'] = score_components.get('smc_fvg', 0.0) + smc_weight
        info_list.append(label)

    # --- Market-structure score ---
    structure_score = 0
    if "BULLISH_BOS" in structure_str: structure_score += weights.get("BULLISH_BOS", 3.0)
    if "BEARISH_BOS" in structure_str: structure_score += weights.get("BEARISH_BOS", -3.0)
    if "HH" in structure_str: structure_score += weights.get("HH", 1.0)
    if "LL" in structure_str: structure_score += weights.get("LL", -1.0)
    if "HL" in structure_str: structure_score += weights.get("HL", 1.0)
    if "LH" in structure_str: structure_score += weights.get("LH", -1.0)
    score_components['structure'] = structure_score
    score += structure_score
    logging.info(f"[Arshy | {tf}] Analisis Struktur: Teridentifikasi '{structure_str}' (Skor: {structure_score:+.2f})")

    # --- Zone (FVG, OB) and event (liquidity) scores ---
    if fvg_zones:
        nearest_fvg = fvg_zones[0]
        fvg_score = (weights.get('FVG_BULLISH', 3.0) if 'BULLISH' in nearest_fvg['type'] else weights.get('FVG_BEARISH', -3.0)) * nearest_fvg['strength']
        score_components['fvg'] = fvg_score
        score += fvg_score
        logging.info(f"[Arshy | {tf}] Zona Inefisiensi (FVG): {nearest_fvg['type']} @ {nearest_fvg['start']:.3f}-{nearest_fvg['end']:.3f} terdeteksi (Skor: {fvg_score:+.2f})")

    if liquidity_sweep:
        ls_event = liquidity_sweep[-1]
        ls_score = weights.get(ls_event.get('type'))
        if ls_score:
            score_components['liquidity_sweep'] = ls_score
            score += ls_score
            logging.info(f"[Arshy | {tf}] Perburuan Likuiditas (Sweep): {ls_event.get('type')} @ {ls_event.get('swept_level'):.3f} terdeteksi (Skor: {ls_score:+.2f})")

    if liquidity_grabs:
        grab = liquidity_grabs[-1]  # most recent grab
        grab_score = weights.get(grab.get('type'), 0.0) * grab.get('strength', 1.0)
        if grab_score != 0:
            score_components['liquidity_grab'] = grab_score
            score += grab_score
            logging.info(f"[Arshy | {tf}] Perburuan Likuiditas (Grab): {grab.get('type')} pada level {grab.get('swept_level'):.4f} terdeteksi (Skor: {grab_score:+.2f})")

    if order_blocks:
        nearest_ob = order_blocks[0]
        ob_score = (weights.get('BULLISH_OB', 1.0) if 'BULLISH' in nearest_ob['type'] else weights.get('BEARISH_OB', -1.0)) * nearest_ob['strength']
        score_components['order_block'] = ob_score
        score += ob_score
        logging.info(f"[Arshy | {tf}] Zona Order Block: {nearest_ob['type']} @ {nearest_ob['low']:.3f}-{nearest_ob['high']:.3f} terdeteksi (Skor: {ob_score:+.2f})")

    # --- Minor candle-pattern score ---
    pattern_score = sum(weights.get(p.get('type'), 0) for p in patterns)
    if pattern_score != 0:
        bullish_patterns = [p.get('type') for p in patterns if weights.get(p.get('type'), 0) > 0]
        bearish_patterns = [p.get('type') for p in patterns if weights.get(p.get('type'), 0) < 0]
        bull_summary = ", ".join([f"{count}x {name}" for name, count in Counter(bullish_patterns).items()])
        bear_summary = ", ".join([f"{count}x {name}" for name, count in Counter(bearish_patterns).items()])
        summary_parts = [s for s in [f"Bullish: [{bull_summary}]" if bull_summary else "", f"Bearish: [{bear_summary}]" if bear_summary else ""] if s]
        logging.info(f"[Arshy | {tf}] Konfluensi Pola Minor: {' | '.join(summary_parts)} (Skor Total: {pattern_score:+.2f})")
        score_components['patterns'] = pattern_score
        score += pattern_score

    logging.info(f"[Arshy | {tf}] Kalkulasi Skor Awal: {score:.2f}")

    # --- Higher-timeframe bias ---
    if htf_config.get('enabled', False) and htf_bias != 'NEUTRAL':
        direction_pre_bias = "BUY" if score > 0 else "SELL"
        bias_score = htf_config.get('bias_influence_score', 2.5)
        penalty_score = htf_config.get('penalty_score', -5.0)
        # Adjustments are expressed relative to the signal direction: a
        # bonus pushes the score further from zero, a (negative) penalty
        # pulls it back toward zero for BUY and SELL alike.
        direction_sign = 1.0 if direction_pre_bias == 'BUY' else -1.0
        if (htf_bias == 'BULLISH' and direction_pre_bias == 'BUY') or (htf_bias == 'BEARISH' and direction_pre_bias == 'SELL'):
            htf_adjustment = direction_sign * bias_score
            score_components['htf_bias'] = htf_adjustment
            score += htf_adjustment
            logging.info(f"[Arshy | {tf}] Validasi Tren HTF: Sinyal {direction_pre_bias} didukung. Skor disesuaikan {bias_score:+.2f}")
        else:
            htf_adjustment = direction_sign * penalty_score
            score_components['htf_bias'] = htf_adjustment
            score += htf_adjustment
            logging.warning(f"[Arshy | {tf}] Validasi Tren HTF: Sinyal {direction_pre_bias} berlawanan dengan bias {htf_bias}. Penalti {penalty_score:.2f} diterapkan.")

    logging.info(f"[Arshy | {tf}] Skor Akhir Terkalkulasi: {score:.2f}")

    # --- Direction and order type ---
    direction = "WAIT"
    order_type = None
    entry_price_chosen = current_price

    if score >= confidence_threshold:
        direction = "BUY"

        # 1. BUY LIMIT: pull back into the nearest bullish zone below price.
        bullish_poi = sorted(
            [z for z in fvg_zones if 'BULLISH' in z['type'] and z['start'] < current_price] +
            [z for z in order_blocks if 'BULLISH' in z['type'] and z['high'] < current_price],
            key=lambda z: z['distance'])
        if bullish_poi:
            best_zone = bullish_poi[0]
            # FVG zones expose start/end, order blocks expose high/low.
            swing_start = best_zone.get('end', best_zone.get('low'))
            swing_end = best_zone.get('start', best_zone.get('high'))
            ote_zones = calculate_optimal_trade_entry(swing_start, swing_end, direction)
            ote_entry = ote_zones.get('mid')
            # A limit buy only makes sense below the current price.
            if ote_entry and ote_entry < current_price:
                order_type = "BUY_LIMIT"
                entry_price_chosen = ote_entry
                info_list.append(f"BUY_LIMIT at OTE zone {ote_entry:.2f} based on {best_zone['type']}")
                logging.info(f"[Arshy | {tf}] Setup: Pullback ke {best_zone['type']}. BUY_LIMIT di zona OTE.")

        # 2. BUY STOP: break of the last swing high plus a 10% ATR buffer
        # to reduce false breakouts.
        if not order_type and "BULLISH_BOS" in structure_str and swing_points.get('last_high'):
            breakout_level = swing_points['last_high']
            atr_buffer = atr * 0.1
            entry_price_chosen = breakout_level + atr_buffer
            order_type = "BUY_STOP"
            info_list.append(f"BUY_STOP above structure {breakout_level:.2f} + buffer")
            logging.info(f"[Arshy | {tf}] Setup: Breakout BOS. BUY_STOP di atas struktur.")

        # 3. Market BUY: requires a bullish pattern among the last three.
        if not order_type:
            is_strong_momentum = any(p['type'] in ['ENGULFING_BULL', 'PINBAR_BULL'] for p in patterns[-3:])
            if is_strong_momentum:
                order_type = "BUY"
                entry_price_chosen = current_price
                info_list.append("INSTANT BUY with strong momentum confirmation")
                logging.info(f"[Arshy | {tf}] Setup: Momentum Kuat dengan konfirmasi pola. MARKET BUY.")

    elif score <= -confidence_threshold:
        direction = "SELL"

        # 1. SELL LIMIT: pull back into the nearest bearish zone above price.
        bearish_poi = sorted(
            [z for z in fvg_zones if 'BEARISH' in z['type'] and z['start'] > current_price] +
            [z for z in order_blocks if 'BEARISH' in z['type'] and z['low'] > current_price],
            key=lambda z: z['distance'])
        if bearish_poi:
            best_zone = bearish_poi[0]
            swing_start = best_zone.get('start', best_zone.get('high'))
            swing_end = best_zone.get('end', best_zone.get('low'))
            ote_zones = calculate_optimal_trade_entry(swing_start, swing_end, direction)
            ote_entry = ote_zones.get('mid')
            # A limit sell only makes sense above the current price.
            if ote_entry and ote_entry > current_price:
                order_type = "SELL_LIMIT"
                entry_price_chosen = ote_entry
                info_list.append(f"SELL_LIMIT at OTE zone {ote_entry:.2f} based on {best_zone['type']}")
                logging.info(f"[Arshy | {tf}] Setup: Pullback ke {best_zone['type']}. SELL_LIMIT di zona OTE.")

        # 2. SELL STOP: break of the last swing low minus a 10% ATR buffer.
        if not order_type and "BEARISH_BOS" in structure_str and swing_points.get('last_low'):
            breakout_level = swing_points['last_low']
            atr_buffer = atr * 0.1
            entry_price_chosen = breakout_level - atr_buffer
            order_type = "SELL_STOP"
            info_list.append(f"SELL_STOP below structure {breakout_level:.2f} - buffer")
            logging.info(f"[Arshy | {tf}] Setup: Breakout BOS. SELL_STOP di bawah struktur.")

        # 3. Market SELL: requires a bearish pattern among the last three.
        if not order_type:
            is_strong_momentum = any(p['type'] in ['ENGULFING_BEAR', 'PINBAR_BEAR'] for p in patterns[-3:])
            if is_strong_momentum:
                order_type = "SELL"
                entry_price_chosen = current_price
                info_list.append("INSTANT SELL with strong momentum confirmation")
                logging.info(f"[Arshy | {tf}] Setup: Momentum Kuat dengan konfirmasi pola. MARKET SELL.")

    # --- Final validation and SL/TP ---
    if direction == "WAIT":
        return None
    # NOTE(review): order_type can still be None here (score cleared the
    # threshold but no setup matched); the result is returned regardless --
    # confirm that callers handle a missing order type.

    volatility = analyze_volatility(df)
    if atr_multiplier:
        sl_distance = atr * atr_multiplier['sl']
        tp_distance = atr * atr_multiplier['tp']
        sl_distance, tp_distance = adjust_trade_parameters(volatility, sl_distance, tp_distance)
    else:
        sl_distance = atr * 1.5
        tp_distance = atr * 3.0

    # Scale the distances by trading style implied by the timeframe.
    # NOTE(review): the source indentation was lost; this scaling is assumed
    # to apply whether or not atr_multiplier was supplied -- confirm.
    if tf in ["M1", "M5"]:  # scalping: tighter
        sl_distance *= 0.8
        tp_distance *= 0.8
    elif tf in ["H4", "D1"]:  # swing: wider
        sl_distance *= 1.5
        tp_distance *= 1.5

    if direction == "BUY":
        sl = entry_price_chosen - sl_distance
        tp = entry_price_chosen + tp_distance
    else:  # direction == "SELL"
        sl = entry_price_chosen + sl_distance
        tp = entry_price_chosen - tp_distance

    logging.info(f"[Arshy | {tf}] --- Analisis Selesai --- | Rekomendasi: {direction} | Tipe: {order_type} | Skor Keyakinan: {score:.2f}")
    return {
        "signal": direction, "order_type": order_type, "entry_price_chosen": entry_price_chosen,
        "sl": sl, "tp": tp, "score": score, "info": "; ".join(info_list),
        "score_components": score_components,
        "features": get_gng_input_features_full(df, gng_feature_stats, tf) if gng_model else None,
        "tf": tf, "symbol": symbol,
    }
def validate_and_generate_signal(
    df: pd.DataFrame,
    analysis: Dict[str, Any],
    profile_settings: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Validate an analysis result and turn it into a signal dict, applying
    stricter checks when the profile is scalping.

    For the scalping profile the analysis dict is updated in place (score
    boosted, ``validations`` attached), and the signal is rejected when the
    scalping validation fails, the spread exceeds
    ``profile_settings['entry_rules']['max_spread_multiplier']``, or the last
    close-to-close move disagrees with the sign of the score.

    Returns ``None`` on any rejection or when entry / SL-TP calculation
    yields nothing; otherwise the signal dictionary.
    """
    scalping = analysis['profile_type'] == "scalping"

    if scalping:
        # NOTE(review): the source indentation was lost; the spread and
        # momentum checks are assumed to be scalping-only -- confirm.
        is_valid, confidence_boost, validations = validate_scalping_opportunity(df, profile_settings)
        if not is_valid:
            logging.info(f"Scalping validation failed: {', '.join(validations)}")
            return None

        # The boost and the validation notes are written back to the caller's dict.
        analysis['score'] += confidence_boost
        analysis['validations'] = validations

        current_spread = get_current_spread(df)
        spread_limit = profile_settings['entry_rules']['max_spread_multiplier']
        if current_spread > spread_limit:
            logging.info(f"Spread too high for scalping: {current_spread}")
            return None

        # The last close must move in the direction indicated by the score.
        last_close = df['close'].iloc[-1]
        prev_close = df['close'].iloc[-2]
        bullish_ok = analysis['score'] > 0 and last_close > prev_close
        bearish_ok = analysis['score'] < 0 and last_close < prev_close
        if not (bullish_ok or bearish_ok):
            logging.info("Momentum not aligned with signal direction")
            return None

    entry_points = calculate_entry_points(df, analysis, profile_settings)
    if not entry_points:
        return None

    sl_tp = calculate_sl_tp(df, analysis, entry_points['entry'], profile_settings)
    if not sl_tp:
        return None

    side = 'BUY' if analysis['score'] > 0 else 'SELL'
    signal = {
        'symbol': analysis['symbol'],
        'signal': side,
        'entry_price': entry_points['entry'],
        'sl': sl_tp['sl'],
        'tp': sl_tp['tp'],
        'score': analysis['score'],
        'timeframe': analysis['timeframe'],
        'profile_type': analysis['profile_type']
    }
    if scalping:
        signal['validations'] = analysis.get('validations', [])
    return signal
def calculate_entry_points(
    df: pd.DataFrame,
    analysis: Dict[str, Any],
    profile_settings: Dict[str, Any]
) -> Optional[Dict[str, float]]:
    """Derive the entry price from the last close and an ATR offset.

    The offset is 0.1 x ATR for the scalping profile and 0.2 x ATR for any
    other profile. It is added for a positive score and subtracted
    otherwise. For scalping, an entry that would fall beyond the last bar's
    high (bullish) or low (bearish) is reset to the last close.
    ``profile_settings`` is accepted but not used.

    Returns:
        ``{'entry': <price>}``.
    """
    last_close = df['close'].iloc[-1]
    atr_value = calculate_atr_dynamic(df)

    scalping = analysis['profile_type'] == "scalping"
    bullish = analysis['score'] > 0

    offset = atr_value * 0.1 if scalping else atr_value * 0.2
    entry = last_close + offset if bullish else last_close - offset

    if scalping:
        # Scalping entries must stay inside the range of the last bar.
        if bullish:
            if entry > df['high'].iloc[-1]:
                entry = last_close
        elif entry < df['low'].iloc[-1]:
            entry = last_close

    return {'entry': entry}
def calculate_sl_tp(
    df: pd.DataFrame,
    analysis: Dict[str, Any],
    entry_price: float,
    profile_settings: Dict[str, Any]
) -> Optional[Dict[str, float]]:
    """Compute stop-loss and take-profit around ``entry_price``.

    Distances are ATR times ``profile_settings['atr_multiplier']['sl']`` and
    ``['tp']``. A positive score places SL below and TP above the entry; any
    other score mirrors that. For the scalping profile the SL is pushed
    beyond the extreme of the last three bars (plus 0.1 x ATR) whenever it
    would otherwise sit inside that range.

    Returns:
        ``{'sl': <price>, 'tp': <price>}``.
    """
    atr_value = calculate_atr_dynamic(df)
    scalping = analysis['profile_type'] == "scalping"

    # Both profiles use the same multiplier lookup; only scalping adds the
    # recent-extreme check below.
    stop_gap = atr_value * profile_settings['atr_multiplier']['sl']
    target_gap = atr_value * profile_settings['atr_multiplier']['tp']

    if analysis['score'] > 0:
        sl = entry_price - stop_gap
        tp = entry_price + target_gap
        if scalping:
            recent_low = df['low'].iloc[-3:].min()
            if sl > recent_low:  # stop must sit below the recent lows
                sl = recent_low - (atr_value * 0.1)
    else:
        sl = entry_price + stop_gap
        tp = entry_price - target_gap
        if scalping:
            recent_high = df['high'].iloc[-3:].max()
            if sl < recent_high:  # stop must sit above the recent highs
                sl = recent_high + (atr_value * 0.1)

    return {'sl': sl, 'tp': tp}
def get_current_spread(df: pd.DataFrame) -> float:
    """Return the spread measure for the most recent bar.

    When ``df`` has a ``spread`` column its last value is returned as is.
    Otherwise the result is the last bar's high-low range divided by the
    value of ``calculate_atr_dynamic(df)``, i.e. a ratio rather than a raw
    spread.
    """
    if 'spread' not in df.columns:
        last_range = abs(df['high'].iloc[-1] - df['low'].iloc[-1])
        return last_range / calculate_atr_dynamic(df)
    return df['spread'].iloc[-1]