# signal_generator.py (final, corrected version)
# Listing metadata: 523 lines, no trailing EOL, 24 KiB, Python.
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import json
|
|
from typing import Dict, List, Tuple, Optional, Any
|
|
from collections import Counter
|
|
|
|
import MetaTrader5 as mt5
|
|
|
|
from data_fetching import get_candlestick_data
|
|
from technical_indicators import (
|
|
detect_structure,
|
|
detect_order_blocks_multi,
|
|
detect_fvg_multi,
|
|
detect_eqh_eql,
|
|
detect_liquidity_sweep,
|
|
detect_liquidity_grab,
|
|
calculate_optimal_trade_entry,
|
|
detect_engulfing,
|
|
detect_pinbar,
|
|
detect_continuation_patterns,
|
|
analyze_volatility, # Tambahkan import ini
|
|
adjust_trade_parameters, # Tambahkan import ini
|
|
detect_smc_structure, # Tambahkan import ini
|
|
)
|
|
from gng_model import (
|
|
get_gng_input_features_full,
|
|
get_gng_context,
|
|
)
|
|
|
|
def get_open_positions_per_tf(symbol: str, tf: str, mt5_path: str) -> int:
    """Return the number of open MT5 positions for ``symbol``.

    Args:
        symbol: Instrument whose positions are counted.
        tf: Timeframe label. Accepted for interface compatibility but not
            used; the count is not filtered by timeframe.
        mt5_path: Terminal path passed to ``mt5.initialize``.

    Returns:
        The number of positions reported by ``mt5.positions_get``, ``0`` when
        it returns ``None``, or ``99`` when the terminal cannot be
        initialised.
    """
    if not mt5.initialize(path=mt5_path):
        # NOTE(review): 99 looks like a sentinel meant to block new trades
        # when the terminal is unreachable -- confirm with the callers.
        return 99
    try:
        positions = mt5.positions_get(symbol=symbol)
    finally:
        # Always release the terminal connection, even if the query raises.
        mt5.shutdown()
    return len(positions) if positions is not None else 0
|
|
|
|
def get_active_orders(symbol: str, mt5_path: str) -> List[float]:
    """Collect the open prices of every position and pending order on ``symbol``.

    Returns an empty list when the terminal cannot be initialised. Errors
    raised while querying are logged and whatever was gathered up to that
    point is returned; the terminal connection is shut down in every case.
    """
    collected: List[float] = []
    if not mt5.initialize(path=mt5_path):
        return collected

    try:
        open_positions = mt5.positions_get(symbol=symbol)
        # A falsy result (None or empty) contributes nothing.
        for position in open_positions or ():
            collected.append(position.price_open)

        pending_orders = mt5.orders_get(symbol=symbol)
        for pending in pending_orders or ():
            collected.append(pending.price_open)
    except Exception as e:
        logging.error(f"Error saat mengambil order/posisi aktif: {e}")
    finally:
        mt5.shutdown()

    return collected
|
|
|
|
def is_far_enough(entry_price: float, existing_prices: List[float], point_value: float, min_distance_pips: float) -> bool:
    """Tell whether ``entry_price`` keeps the minimum gap from all active prices.

    The required gap is ``min_distance_pips * point_value``. The first
    existing price closer than that gap is logged as a rejection and the
    function returns ``False``; otherwise it returns ``True``.
    """
    required_gap = min_distance_pips * point_value
    # First active price that violates the gap, or None when there is none.
    conflict = next(
        (active for active in existing_prices if abs(entry_price - active) < required_gap),
        None,
    )
    if conflict is None:
        return True

    price = conflict
    logging.warning(f"Sinyal DITOLAK: Entry {entry_price:.3f} terlalu dekat dengan order aktif di {price:.3f}.")
    return False
|
|
|
|
def build_signal_format(symbol: str, entry_price: float, direction: str, sl: float, tp: float, order_type: str) -> dict:
    """Build the flat signal dictionary consumed by the MQL side.

    Every order field is present and defaults to ``""``; only the three
    fields (entry, SL, TP) belonging to ``order_type`` are filled, as strings.

    Args:
        symbol: Instrument name stored under ``"Symbol"``.
        entry_price: Entry price for the selected order type.
        direction: Accepted for interface compatibility; not used here.
        sl: Stop-loss price.
        tp: Take-profit price.
        order_type: One of ``BUY``, ``SELL``, ``BUY_LIMIT``, ``SELL_LIMIT``,
            ``BUY_STOP``, ``SELL_STOP`` (case-insensitive). ``None`` or an
            unknown value yields a signal with all order fields empty and a
            logged warning instead of raising.

    Returns:
        The populated signal dictionary.
    """
    # Key spellings (e.g. "Buylimit", "SellLimitTP") deliberately follow the
    # case-sensitive names expected by the MQL consumer; do not normalise them.
    order_keys = [
        "BuyEntry", "BuySL", "BuyTP", "SellEntry", "SellSL", "SellTP",
        "BuyStop", "BuyStopSL", "BuyStopTP", "SellStop", "SellStopSL", "SellStopTP",
        "Buylimit", "BuylimitSL", "BuylimitTP", "Selllimit", "SelllimitSL", "SellLimitTP",
        "DeleteLimit/Stop"
    ]
    signal = {"Symbol": symbol}
    signal.update(dict.fromkeys(order_keys, ""))

    # Order type -> (entry key, SL key, TP key).
    fields_by_type = {
        'BUY': ("BuyEntry", "BuySL", "BuyTP"),
        'SELL': ("SellEntry", "SellSL", "SellTP"),
        'BUY_LIMIT': ("Buylimit", "BuylimitSL", "BuylimitTP"),
        'SELL_LIMIT': ("Selllimit", "SelllimitSL", "SellLimitTP"),
        'BUY_STOP': ("BuyStop", "BuyStopSL", "BuyStopTP"),
        'SELL_STOP': ("SellStop", "SellStopSL", "SellStopTP"),
    }

    # The analysis step can produce a direction without an order type (None),
    # so guard before calling .upper().
    normalized = order_type.upper() if isinstance(order_type, str) else None
    fields = fields_by_type.get(normalized)
    if fields is None:
        logging.warning(f"build_signal_format: unsupported order_type {order_type!r} for {symbol}; signal left empty.")
        return signal

    entry_key, sl_key, tp_key = fields
    signal[entry_key] = str(entry_price)
    signal[sl_key] = str(sl)
    signal[tp_key] = str(tp)
    return signal
|
|
|
|
def make_signal_id(signal_json: Dict[str, str]) -> str:
    """Return a stable decimal ID for a signal dictionary.

    The dictionary is serialised with sorted keys, so key order does not
    affect the result. The ID is derived from a SHA-256 digest rather than
    the built-in ``hash()``, whose value for strings is salted per process
    and therefore differs between runs.

    Args:
        signal_json: JSON-serialisable signal mapping.

    Returns:
        A non-negative integer rendered as a decimal string.
    """
    import hashlib  # local import: keeps the block self-contained

    canonical = json.dumps(signal_json, sort_keys=True)
    digest = hashlib.sha256(canonical.encode("utf-8")).digest()
    # First 8 bytes give a 64-bit ID, comparable in size to the old hash().
    return str(int.from_bytes(digest[:8], "big"))
|
|
|
|
def analyze_tf_opportunity(
    symbol: str,
    tf: str,
    mt5_path: str,
    gng_model,
    gng_feature_stats: Dict[str, Any],
    confidence_threshold: float,
    min_distance_pips_per_tf: Dict[str, float],
    weights: Dict[str, float],
    atr_multiplier: Optional[Dict[str, float]] = None,
    htf_bias: str = 'NEUTRAL',
    htf_config: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
    """Score one timeframe for a trade setup and propose an order.

    Fetches 200 candles, accumulates a signed confluence score (positive =
    bullish, negative = bearish) from SMC structure, market structure,
    FVG / order-block zones, liquidity events and candle patterns, optionally
    adjusts it by the higher-timeframe bias, then picks an order type and
    ATR-based SL/TP once the score reaches ``confidence_threshold`` in
    either direction.

    Args:
        symbol: Instrument to analyse.
        tf: Timeframe label (also used to scale SL/TP for M1/M5 and H4/D1).
        mt5_path: Terminal path forwarded to ``get_candlestick_data``.
        gng_model: Only its truthiness is used here: when truthy, GNG input
            features are attached to the result.
        gng_feature_stats: Forwarded to ``get_gng_input_features_full``.
        confidence_threshold: Minimum absolute score for a BUY/SELL.
        min_distance_pips_per_tf: Not used by this function.
        weights: Score weight per component/pattern name.
        atr_multiplier: Optional ``{'sl': ..., 'tp': ...}`` ATR multipliers;
            when omitted, 1.5 (SL) and 3.0 (TP) are used.
        htf_bias: ``'BULLISH'``, ``'BEARISH'`` or ``'NEUTRAL'``.
        htf_config: Optional HTF settings (``enabled``,
            ``bias_influence_score``, ``penalty_score``); disabled by default.

    Returns:
        ``None`` when data is insufficient or the score does not reach the
        threshold; otherwise a dict with the signal, order type (may be
        ``None`` when no concrete setup matched), entry, SL, TP, score,
        score components, info text, optional features, ``tf`` and ``symbol``.
    """
    if htf_config is None:
        htf_config = {
            'enabled': False,
            'bias_influence_score': 2.5,
            'penalty_score': -5.0
        }

    df = get_candlestick_data(symbol, tf, 200, mt5_path)
    if df is None or len(df) < 50:
        logging.warning(f"Data TF {tf} tidak cukup untuk analisis.")
        return None

    # --- Score initialisation (done exactly once so that every component
    # below, including the SMC ones, contributes to the final score) ---
    score = 0.0
    score_components: Dict[str, float] = {}
    info_list: List[str] = []
    logging.info(f"[Arshy | {tf}] --- Memulai Analisis Konfluensi ---")

    # --- Component analysis ---
    current_price = df['close'].iloc[-1]
    structure_str, swing_points = detect_structure(df)
    # Simple ATR proxy: 14-period mean of the high-low range.
    atr = df['high'].sub(df['low']).rolling(14).mean().iloc[-1]
    order_blocks = detect_order_blocks_multi(df, structure_filter=structure_str)
    fvg_zones = detect_fvg_multi(df)
    liquidity_sweep = detect_liquidity_sweep(df)
    liquidity_grabs = detect_liquidity_grab(df)
    patterns = detect_engulfing(df) + detect_pinbar(df) + detect_continuation_patterns(df)

    # --- SMC analysis ---
    smc_analysis = detect_smc_structure(df)

    # SMC internal structure (break of structure).
    if smc_analysis['internal_structure']['bullish_bos']:
        smc_internal = weights.get('SMC_INTERNAL_BULL_BOS', 2.0)
        score += smc_internal
        score_components['smc_internal'] = smc_internal
        info_list.append("SMC Internal Bullish BOS")
    elif smc_analysis['internal_structure']['bearish_bos']:
        smc_internal = weights.get('SMC_INTERNAL_BEAR_BOS', -2.0)
        score += smc_internal
        score_components['smc_internal'] = smc_internal
        info_list.append("SMC Internal Bearish BOS")

    # SMC swing structure (change of character).
    if smc_analysis['swing_structure']['bullish_choch']:
        smc_swing = weights.get('SMC_SWING_BULL_CHOCH', 3.0)
        score += smc_swing
        score_components['smc_swing'] = smc_swing
        info_list.append("SMC Swing Bullish CHoCH")
    elif smc_analysis['swing_structure']['bearish_choch']:
        smc_swing = weights.get('SMC_SWING_BEAR_CHOCH', -3.0)
        score += smc_swing
        score_components['smc_swing'] = smc_swing
        info_list.append("SMC Swing Bearish CHoCH")

    # SMC order blocks. The component is accumulated (not overwritten) so it
    # stays consistent with what is added to the score.
    for ob in smc_analysis['order_blocks']:
        if ob['type'] == 'bullish' and current_price < ob['high']:
            smc_ob = weights.get('SMC_BULL_OB', 1.5)
            score += smc_ob
            score_components['smc_ob'] = score_components.get('smc_ob', 0.0) + smc_ob
            info_list.append("Inside Bullish Order Block")
        elif ob['type'] == 'bearish' and current_price > ob['low']:
            smc_ob = weights.get('SMC_BEAR_OB', -1.5)
            score += smc_ob
            score_components['smc_ob'] = score_components.get('smc_ob', 0.0) + smc_ob
            info_list.append("Inside Bearish Order Block")

    # SMC fair value gaps: only the last three are checked.
    for fvg in smc_analysis['fair_value_gaps'][-3:]:
        if fvg['type'] == 'bullish' and current_price < fvg['top']:
            smc_fvg = weights.get('SMC_BULL_FVG', 2.0)
            score += smc_fvg
            score_components['smc_fvg'] = score_components.get('smc_fvg', 0.0) + smc_fvg
            info_list.append("Below Bullish FVG")
        elif fvg['type'] == 'bearish' and current_price > fvg['bottom']:
            smc_fvg = weights.get('SMC_BEAR_FVG', -2.0)
            score += smc_fvg
            score_components['smc_fvg'] = score_components.get('smc_fvg', 0.0) + smc_fvg
            info_list.append("Above Bearish FVG")

    # --- Market-structure score ---
    structure_score = 0
    if "BULLISH_BOS" in structure_str:
        structure_score += weights.get("BULLISH_BOS", 3.0)
    if "BEARISH_BOS" in structure_str:
        structure_score += weights.get("BEARISH_BOS", -3.0)
    if "HH" in structure_str:
        structure_score += weights.get("HH", 1.0)
    if "LL" in structure_str:
        structure_score += weights.get("LL", -1.0)
    if "HL" in structure_str:
        structure_score += weights.get("HL", 1.0)
    if "LH" in structure_str:
        structure_score += weights.get("LH", -1.0)
    score_components['structure'] = structure_score
    score += structure_score
    logging.info(f"[Arshy | {tf}] Analisis Struktur: Teridentifikasi '{structure_str}' (Skor: {structure_score:+.2f})")

    # --- Zone scores (FVG, OB) and liquidity events ---
    if fvg_zones:
        # NOTE(review): the first element is treated as the nearest zone --
        # confirm the ordering returned by detect_fvg_multi.
        nearest_fvg = fvg_zones[0]
        fvg_score = (weights.get('FVG_BULLISH', 3.0) if 'BULLISH' in nearest_fvg['type'] else weights.get('FVG_BEARISH', -3.0)) * nearest_fvg['strength']
        score_components['fvg'] = fvg_score
        score += fvg_score
        logging.info(f"[Arshy | {tf}] Zona Inefisiensi (FVG): {nearest_fvg['type']} @ {nearest_fvg['start']:.3f}-{nearest_fvg['end']:.3f} terdeteksi (Skor: {fvg_score:+.2f})")

    if liquidity_sweep:
        ls_event = liquidity_sweep[-1]
        ls_score = weights.get(ls_event.get('type'))
        if ls_score:
            score_components['liquidity_sweep'] = ls_score
            score += ls_score
            logging.info(f"[Arshy | {tf}] Perburuan Likuiditas (Sweep): {ls_event.get('type')} @ {ls_event.get('swept_level'):.3f} terdeteksi (Skor: {ls_score:+.2f})")

    if liquidity_grabs:
        grab = liquidity_grabs[-1]  # most recent grab
        grab_score = weights.get(grab.get('type'), 0.0) * grab.get('strength', 1.0)
        if grab_score != 0:
            score_components['liquidity_grab'] = grab_score
            score += grab_score
            logging.info(f"[Arshy | {tf}] Perburuan Likuiditas (Grab): {grab.get('type')} pada level {grab.get('swept_level'):.4f} terdeteksi (Skor: {grab_score:+.2f})")

    if order_blocks:
        nearest_ob = order_blocks[0]
        ob_score = (weights.get('BULLISH_OB', 1.0) if 'BULLISH' in nearest_ob['type'] else weights.get('BEARISH_OB', -1.0)) * nearest_ob['strength']
        score_components['order_block'] = ob_score
        score += ob_score
        logging.info(f"[Arshy | {tf}] Zona Order Block: {nearest_ob['type']} @ {nearest_ob['low']:.3f}-{nearest_ob['high']:.3f} terdeteksi (Skor: {ob_score:+.2f})")

    # --- Minor candle-pattern score ---
    pattern_score = sum(weights.get(p.get('type'), 0) for p in patterns)
    if pattern_score != 0:
        bullish_patterns = [p.get('type') for p in patterns if weights.get(p.get('type'), 0) > 0]
        bearish_patterns = [p.get('type') for p in patterns if weights.get(p.get('type'), 0) < 0]
        bull_summary = ", ".join([f"{count}x {name}" for name, count in Counter(bullish_patterns).items()])
        bear_summary = ", ".join([f"{count}x {name}" for name, count in Counter(bearish_patterns).items()])
        summary_parts = [s for s in [f"Bullish: [{bull_summary}]" if bull_summary else "", f"Bearish: [{bear_summary}]" if bear_summary else ""] if s]
        logging.info(f"[Arshy | {tf}] Konfluensi Pola Minor: {' | '.join(summary_parts)} (Skor Total: {pattern_score:+.2f})")
        score_components['patterns'] = pattern_score
        score += pattern_score
    logging.info(f"[Arshy | {tf}] Kalkulasi Skor Awal: {score:.2f}")

    # --- Higher-timeframe bias ---
    if htf_config.get('enabled', False) and htf_bias != 'NEUTRAL':
        direction_pre_bias = "BUY" if score > 0 else "SELL"
        # +1 for a bullish raw score, -1 for a bearish one, so adjustments
        # are applied along the signal's own direction.
        sign = 1.0 if direction_pre_bias == "BUY" else -1.0
        bias_score = htf_config.get('bias_influence_score', 2.5)
        penalty_score = htf_config.get('penalty_score', -5.0)
        if (htf_bias == 'BULLISH' and direction_pre_bias == 'BUY') or (htf_bias == 'BEARISH' and direction_pre_bias == 'SELL'):
            adjustment = sign * bias_score
            logging.info(f"[Arshy | {tf}] Validasi Tren HTF: Sinyal {direction_pre_bias} didukung. Skor disesuaikan {bias_score:+.2f}")
        else:
            # The (negative) penalty must weaken the signal: subtract from a
            # bullish score, add to a bearish one. Adding it unconditionally
            # would make a counter-trend SELL stronger instead of weaker.
            adjustment = sign * penalty_score
            logging.warning(f"[Arshy | {tf}] Validasi Tren HTF: Sinyal {direction_pre_bias} berlawanan dengan bias {htf_bias}. Penalti {penalty_score:.2f} diterapkan.")
        score_components['htf_bias'] = adjustment
        score += adjustment
    logging.info(f"[Arshy | {tf}] Skor Akhir Terkalkulasi: {score:.2f}")

    # --- Direction and order type from the detected setup ---
    direction = "WAIT"
    order_type = None
    entry_price_chosen = current_price

    if score >= confidence_threshold:
        direction = "BUY"

        # 1. BUY LIMIT: pull back into the closest bullish zone below price,
        #    entering at the midpoint of its OTE zone.
        bullish_poi = sorted(
            [z for z in fvg_zones if 'BULLISH' in z['type'] and z['start'] < current_price] +
            [z for z in order_blocks if 'BULLISH' in z['type'] and z['high'] < current_price],
            key=lambda z: z['distance'])
        if bullish_poi:
            best_zone = bullish_poi[0]
            # FVG zones expose start/end, order blocks expose low/high.
            swing_start = best_zone.get('end', best_zone.get('low'))
            swing_end = best_zone.get('start', best_zone.get('high'))
            ote_zones = calculate_optimal_trade_entry(swing_start, swing_end, direction)
            ote_entry = ote_zones.get('mid')

            # A limit buy only makes sense below the current price.
            if ote_entry and ote_entry < current_price:
                order_type = "BUY_LIMIT"
                entry_price_chosen = ote_entry
                info_list.append(f"BUY_LIMIT at OTE zone {ote_entry:.2f} based on {best_zone['type']}")
                logging.info(f"[Arshy | {tf}] Setup: Pullback ke {best_zone['type']}. BUY_LIMIT di zona OTE.")

        # 2. BUY STOP: break above the last swing high plus a 10% ATR buffer
        #    to avoid false breakouts.
        if not order_type and "BULLISH_BOS" in structure_str and swing_points.get('last_high'):
            breakout_level = swing_points['last_high']
            atr_buffer = atr * 0.1
            entry_price_chosen = breakout_level + atr_buffer
            order_type = "BUY_STOP"
            info_list.append(f"BUY_STOP above structure {breakout_level:.2f} + buffer")
            logging.info(f"[Arshy | {tf}] Setup: Breakout BOS. BUY_STOP di atas struktur.")

        # 3. Market BUY: requires a bullish engulfing/pinbar among the last
        #    three detected patterns.
        if not order_type:
            is_strong_momentum = any(p['type'] in ['ENGULFING_BULL', 'PINBAR_BULL'] for p in patterns[-3:])
            if is_strong_momentum:
                order_type = "BUY"
                entry_price_chosen = current_price
                info_list.append("INSTANT BUY with strong momentum confirmation")
                logging.info(f"[Arshy | {tf}] Setup: Momentum Kuat dengan konfirmasi pola. MARKET BUY.")

    elif score <= -confidence_threshold:
        direction = "SELL"

        # 1. SELL LIMIT: pull back into the closest bearish zone above price.
        bearish_poi = sorted(
            [z for z in fvg_zones if 'BEARISH' in z['type'] and z['start'] > current_price] +
            [z for z in order_blocks if 'BEARISH' in z['type'] and z['low'] > current_price],
            key=lambda z: z['distance'])
        if bearish_poi:
            best_zone = bearish_poi[0]
            swing_start = best_zone.get('start', best_zone.get('high'))
            swing_end = best_zone.get('end', best_zone.get('low'))
            ote_zones = calculate_optimal_trade_entry(swing_start, swing_end, direction)
            ote_entry = ote_zones.get('mid')

            # A limit sell only makes sense above the current price.
            if ote_entry and ote_entry > current_price:
                order_type = "SELL_LIMIT"
                entry_price_chosen = ote_entry
                info_list.append(f"SELL_LIMIT at OTE zone {ote_entry:.2f} based on {best_zone['type']}")
                logging.info(f"[Arshy | {tf}] Setup: Pullback ke {best_zone['type']}. SELL_LIMIT di zona OTE.")

        # 2. SELL STOP: break below the last swing low minus a 10% ATR buffer.
        if not order_type and "BEARISH_BOS" in structure_str and swing_points.get('last_low'):
            breakout_level = swing_points['last_low']
            atr_buffer = atr * 0.1
            entry_price_chosen = breakout_level - atr_buffer
            order_type = "SELL_STOP"
            info_list.append(f"SELL_STOP below structure {breakout_level:.2f} - buffer")
            logging.info(f"[Arshy | {tf}] Setup: Breakout BOS. SELL_STOP di bawah struktur.")

        # 3. Market SELL: requires a bearish engulfing/pinbar among the last
        #    three detected patterns.
        if not order_type:
            is_strong_momentum = any(p['type'] in ['ENGULFING_BEAR', 'PINBAR_BEAR'] for p in patterns[-3:])
            if is_strong_momentum:
                order_type = "SELL"
                entry_price_chosen = current_price
                info_list.append("INSTANT SELL with strong momentum confirmation")
                logging.info(f"[Arshy | {tf}] Setup: Momentum Kuat dengan konfirmasi pola. MARKET SELL.")

    # --- Final validation and SL/TP ---
    if direction == "WAIT":
        return None

    # Volatility analysis and SL/TP distance selection.
    volatility = analyze_volatility(df)
    if atr_multiplier:
        sl_pips = atr * atr_multiplier['sl']
        tp_pips = atr * atr_multiplier['tp']
        sl_pips, tp_pips = adjust_trade_parameters(volatility, sl_pips, tp_pips)
    else:
        sl_pips = atr * 1.5
        tp_pips = atr * 3.0

    # Scale the distances by trading style implied by the timeframe.
    if tf in ["M1", "M5"]:  # scalping: tighter
        sl_pips *= 0.8
        tp_pips *= 0.8
    elif tf in ["H4", "D1"]:  # swing: wider
        sl_pips *= 1.5
        tp_pips *= 1.5

    # direction is either BUY or SELL at this point.
    if direction == "BUY":
        sl = entry_price_chosen - sl_pips
        tp = entry_price_chosen + tp_pips
    else:
        sl = entry_price_chosen + sl_pips
        tp = entry_price_chosen - tp_pips

    logging.info(f"[Arshy | {tf}] --- Analisis Selesai --- | Rekomendasi: {direction} | Tipe: {order_type} | Skor Keyakinan: {score:.2f}")
    return {
        "signal": direction, "order_type": order_type, "entry_price_chosen": entry_price_chosen,
        "sl": sl, "tp": tp, "score": score, "info": "; ".join(info_list),
        "score_components": score_components,
        "features": get_gng_input_features_full(df, gng_feature_stats, tf) if gng_model else None,
        "tf": tf, "symbol": symbol,
    }
|
|
|
|
def validate_and_generate_signal(
    df: pd.DataFrame,
    analysis: Dict[str, Any],
    profile_settings: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Validate an analysis result and build a signal, with stricter checks for scalping.

    Reads ``profile_type``, ``score``, ``symbol`` and ``timeframe`` from
    ``analysis``. For the scalping profile the ``analysis`` dict is mutated
    in place: the confidence boost is added to ``analysis['score']`` and
    ``analysis['validations']`` is set.

    Returns ``None`` when any validation fails or when entry / SL-TP
    calculation yields a falsy result; otherwise the signal dictionary.

    NOTE(review): the original indentation was lost in this listing. The
    spread and momentum checks are assumed to belong to the scalping branch
    (their comment says "Additional checks for scalping") -- confirm against
    the real file.
    """
    if analysis['profile_type'] == "scalping":
        # Scalping-specific validation
        is_valid, confidence_boost, validations = validate_scalping_opportunity(df, profile_settings)

        if not is_valid:
            logging.info(f"Scalping validation failed: {', '.join(validations)}")
            return None

        # Add the confidence boost to the score (mutates the caller's dict)
        analysis['score'] += confidence_boost
        analysis['validations'] = validations

        # Additional checks for scalping
        current_spread = get_current_spread(df)
        if current_spread > profile_settings['entry_rules']['max_spread_multiplier']:
            logging.info(f"Spread too high for scalping: {current_spread}")
            return None

        # Quick momentum check: the last close must move in the direction of
        # the score's sign relative to the previous close. A score of exactly
        # 0 never aligns and is rejected.
        last_close = df['close'].iloc[-1]
        prev_close = df['close'].iloc[-2]
        momentum_aligned = (
            (analysis['score'] > 0 and last_close > prev_close) or
            (analysis['score'] < 0 and last_close < prev_close)
        )
        if not momentum_aligned:
            logging.info("Momentum not aligned with signal direction")
            return None

    # Generate entry points
    entry_points = calculate_entry_points(df, analysis, profile_settings)
    if not entry_points:
        return None

    # Calculate stop loss and take profit
    sl_tp = calculate_sl_tp(df, analysis, entry_points['entry'], profile_settings)
    if not sl_tp:
        return None

    signal = {
        'symbol': analysis['symbol'],
        # Any non-positive score (including 0) maps to SELL.
        'signal': 'BUY' if analysis['score'] > 0 else 'SELL',
        'entry_price': entry_points['entry'],
        'sl': sl_tp['sl'],
        'tp': sl_tp['tp'],
        'score': analysis['score'],
        'timeframe': analysis['timeframe'],
        'profile_type': analysis['profile_type']
    }

    if analysis['profile_type'] == "scalping":
        signal['validations'] = analysis.get('validations', [])

    return signal
|
|
|
|
def calculate_entry_points(
    df: pd.DataFrame,
    analysis: Dict[str, Any],
    profile_settings: Dict[str, Any]
) -> Optional[Dict[str, float]]:
    """Derive the entry price from the last close, offset by a fraction of ATR.

    The offset is 0.1 * ATR for the scalping profile and 0.2 * ATR for every
    other profile; it is added for a positive score and subtracted otherwise.
    For scalping, an entry that would fall beyond the last bar's high
    (bullish) or low (bearish) is reset to the last close.

    Returns a dict with the single key ``'entry'``.
    """
    last_close = df['close'].iloc[-1]
    atr_value = calculate_atr_dynamic(df)

    scalping = analysis['profile_type'] == "scalping"
    bullish = analysis['score'] > 0

    offset = atr_value * (0.1 if scalping else 0.2)
    candidate = last_close + offset if bullish else last_close - offset

    if scalping:
        # Do not chase price outside the most recent bar's range.
        if bullish:
            if candidate > df['high'].iloc[-1]:
                candidate = last_close
        elif candidate < df['low'].iloc[-1]:
            candidate = last_close

    return {'entry': candidate}
|
|
|
|
def calculate_sl_tp(
    df: pd.DataFrame,
    analysis: Dict[str, Any],
    entry_price: float,
    profile_settings: Dict[str, Any]
) -> Optional[Dict[str, float]]:
    """Compute stop-loss and take-profit around ``entry_price`` from ATR.

    Distances are ATR times ``profile_settings['atr_multiplier']['sl']`` and
    ``['tp']``. A positive score places SL below and TP above the entry; any
    other score does the opposite. For the scalping profile the SL is
    additionally pushed beyond the extreme of the last three bars (lowest low
    for longs, highest high for shorts) by 0.1 * ATR when it would otherwise
    sit inside that range.

    Returns a dict with keys ``'sl'`` and ``'tp'``.
    """
    atr_value = calculate_atr_dynamic(df)

    scalping = analysis['profile_type'] == "scalping"
    stop_gap = atr_value * profile_settings['atr_multiplier']['sl']
    target_gap = atr_value * profile_settings['atr_multiplier']['tp']

    if analysis['score'] > 0:
        stop = entry_price - stop_gap
        target = entry_price + target_gap
        if scalping:
            # The stop must sit below the recent lows.
            recent_floor = df['low'].iloc[-3:].min()
            if stop > recent_floor:
                stop = recent_floor - (atr_value * 0.1)
    else:
        stop = entry_price + stop_gap
        target = entry_price - target_gap
        if scalping:
            # The stop must sit above the recent highs.
            recent_ceiling = df['high'].iloc[-3:].max()
            if stop < recent_ceiling:
                stop = recent_ceiling + (atr_value * 0.1)

    return {'sl': stop, 'tp': target}
|
|
|
|
def get_current_spread(df: pd.DataFrame) -> float:
    """Return the latest spread figure for the data frame.

    When a ``'spread'`` column exists, its last value is returned. Otherwise
    the last bar's high-low range divided by ``calculate_atr_dynamic(df)`` is
    used as a stand-in.
    """
    if 'spread' not in df.columns:
        last_range = abs(df['high'].iloc[-1] - df['low'].iloc[-1])
        return last_range / calculate_atr_dynamic(df)
    return df['spread'].iloc[-1]