markaz_arshy/main.py
2025-08-12 14:36:24 +00:00

389 lines
15 KiB
Python

# main.py (Refactored for Profiles & Learning) — patched
#
# Description:
# This version supports "Strategy Profiles" read from config.json,
# a per-profile cooldown, a confidence prefilter applied before an
# opportunity is handled, minimum-distance validation against active
# orders/positions, and propagation of the HTF bias/configuration
# to the analyzer.
import logging
import os
import time
import numpy as np
import xgboost as xgb
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
import json
from data_fetching import get_candlestick_data, DataCache
from gng_model import initialize_gng_models # (opsional, tidak dipakai di loop ini)
from log_handler import WebServerHandler
from learning import analyze_and_adapt_profiles
from signal_generator import (
analyze_tf_opportunity,
build_signal_format,
make_signal_id,
get_open_positions_per_tf,
get_active_orders,
is_far_enough,
)
from server_comm import send_signal_to_server
# --- Global States ---
# Module-level DataCache instance (not referenced by the functions visible in
# this file).
DATA_CACHE = DataCache()
# Signals that were sent successfully, keyed by symbol and then by signal id.
# Each record holds the signal payload, timeframe, send timestamp and profile.
active_signals: Dict[str, Dict[str, Dict[str, Any]]] = {}
# Time of the last successfully sent signal per (symbol, profile_name);
# process_symbol() reads it to enforce the per-profile cooldown.
signal_cooldown: Dict[Tuple[str, str], datetime] = {}
def load_config(filepath: str = "config.json") -> Dict[str, Any]:
    """Load the JSON configuration file and set up logging.

    The file is read as UTF-8 JSON. The root logger is configured from the
    optional ``logging`` section (``format`` and ``level`` keys). When
    ``global_settings.server_url`` is set, a ``WebServerHandler`` is attached
    to the root logger, pointing at that URL with ``/submit_signal`` removed.

    Args:
        filepath: Path to the JSON configuration file.

    Returns:
        The parsed configuration dictionary.

    Raises:
        SystemExit: With exit code 1 when the file is missing, is not valid
            JSON, or any other error occurs while loading or applying it.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the platform's
        # locale encoding (e.g. cp1252 on Windows), which would mis-decode
        # or reject non-ASCII text in the configuration.
        with open(filepath, 'r', encoding='utf-8') as f:
            config = json.load(f)

        # Logging setup
        log_config = config.get('logging', {})
        log_format = log_config.get('format', '%(asctime)s - %(levelname)s - %(message)s')
        # Unknown level names fall back to INFO.
        log_level = getattr(logging, log_config.get('level', 'INFO').upper(), logging.INFO)
        logging.basicConfig(level=log_level, format=log_format)

        # Web server logging (optional)
        if config.get('global_settings', {}).get('server_url'):
            root_logger = logging.getLogger()
            log_server_url = config['global_settings']['server_url'].replace('/submit_signal', '')
            web_handler = WebServerHandler(url=log_server_url)
            web_handler.setFormatter(logging.Formatter(log_format))
            root_logger.addHandler(web_handler)

        logging.info("Konfigurasi berhasil dimuat dari %s.", filepath)
        return config
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logging.critical("Error file konfigurasi: %s. Bot tidak bisa berjalan.", e)
        raise SystemExit(1)
    except Exception as e:
        # Anything else (bad encoding, wrong value types, handler setup
        # failure) is equally fatal for the bot.
        logging.critical("Error saat memuat konfigurasi: %s", e)
        raise SystemExit(1)
def initialize_models(config: Dict[str, Any]) -> tuple[dict, dict]:
    """Load the XGBoost model of every symbol section found in ``config``.

    A top-level key counts as a symbol when it is not one of the known
    settings sections and its value is a dictionary. ``risk_management`` is
    treated as a settings section because ``run_trading_bot`` reads it as
    one; non-dict entries cannot be symbol configurations. For each symbol
    the model is loaded from ``xgboost_model_<symbol>.json``.

    Args:
        config: The full parsed configuration.

    Returns:
        A ``(gng_models, xgb_models)`` pair. ``gng_models`` is always empty
        here. ``xgb_models`` maps each symbol to its loaded classifier, or to
        ``None`` when loading failed (the failure is logged, not raised).
    """
    gng_models, xgb_models = {}, {}
    non_symbol_keys = {
        "global_settings",
        "logging",
        "learning",
        "trial_user_settings",
        "risk_management",
    }
    symbols = [
        key
        for key, section in config.items()
        if key not in non_symbol_keys and isinstance(section, dict)
    ]
    logging.info("--- Inisialisasi Model AI (XGBoost) ---")
    for symbol in symbols:
        try:
            model_path = f"xgboost_model_{symbol}.json"
            model = xgb.XGBClassifier()
            model.load_model(model_path)
            xgb_models[symbol] = model
            logging.info("Model AI untuk %s berhasil dimuat.", symbol)
        except Exception as e:
            # Best effort: a symbol without a usable model still trades,
            # only without the XGBoost validation step.
            logging.error("GAGAL memuat model AI untuk %s: %s.", symbol, e)
            xgb_models[symbol] = None
    return gng_models, xgb_models
def handle_opportunity(
    opp: Dict[str, Any],
    symbol: str,
    tf: str,
    symbol_config: Dict[str, Any],
    global_config: Dict[str, Any],
    xgb_model: xgb.XGBClassifier,
    profile_name: str,
) -> bool:
    """Validate one detected opportunity and send it to the server as a signal.

    The opportunity passes through these gates in order; the first failing
    gate ends processing with ``False``:

    1. ``abs(opp['score'])`` must reach the profile's ``confidence_threshold``.
    2. When a model is given and ``opp['features']`` is non-empty, the model's
       class-1 probability must reach ``xgboost_confidence_threshold``
       (default 0.75). An error raised during this check is logged and the
       check is skipped.
    3. ``api_key``, ``server_url``, ``secret_key`` and ``mt5_terminal_path``
       must all be set in ``global_config``.
    4. ``is_far_enough`` must accept the entry price against the currently
       active orders. If fetching them fails, an empty list is used.
    5. ``build_signal_format`` must return a truthy signal.
    6. ``send_signal_to_server`` must return ``'SUCCESS'``.

    On success the signal is recorded in ``active_signals`` and the cooldown
    timestamp for ``(symbol, profile_name)`` is set to the current time.

    Args:
        opp: Opportunity produced by the analyzer; read keys are ``score``,
            ``signal``, ``features``, ``entry_price_chosen``, ``sl``, ``tp``,
            ``order_type`` and ``info``.
        symbol: Symbol the opportunity belongs to.
        tf: Timeframe the opportunity was found on.
        symbol_config: Configuration section of the symbol.
        global_config: The ``global_settings`` configuration section.
        xgb_model: Classifier used for the optional probability check, or a
            falsy value to skip it.
        profile_name: Name of the strategy profile being processed.

    Returns:
        ``True`` only when the signal was sent successfully.
    """
    global active_signals, signal_cooldown

    tag = (profile_name, symbol, tf)
    profile = symbol_config['strategy_profiles'][profile_name]
    min_score = profile['confidence_threshold']

    # Gate 1: score strength, regardless of direction.
    if abs(opp['score']) < min_score:
        logging.info("⏳ [%s|%s|%s] SINYAL WAIT. Skor (%.2f) di bawah threshold (%.1f).",
                     *tag, opp['score'], min_score)
        return False
    logging.info("✅ [%s|%s|%s] SINYAL DITEMUKAN! Peluang %s memenuhi syarat. Skor: %.2f (Min: %.1f).",
                 *tag, opp['signal'], opp['score'], min_score)

    # Gate 2: optional quantitative validation with XGBoost.
    if xgb_model and opp.get('features') is not None:
        try:
            vector = np.array(opp['features'])
            if vector.size > 0:
                logging.info("[Catelya | %s|%s|%s] Memvalidasi probabilitas...", *tag)
                success_prob = xgb_model.predict_proba(vector.reshape(1, -1))[0][1]
                required_prob = float(profile.get('xgboost_confidence_threshold', 0.75))
                if success_prob < required_prob:
                    logging.warning("[Catelya | %s|%s|%s] Prob. rendah (%.2f%% < %.2f%%). BATAL.",
                                    *tag, success_prob * 100, required_prob * 100)
                    return False
                logging.info("[Catelya | %s|%s|%s] Prob. sukses: %.2f%%. LANJUT.",
                             *tag, success_prob * 100)
        except Exception as err:
            # A broken validator must not block the signal; log and move on.
            logging.error("[Catelya] Gagal validasi XGB: %s", err, exc_info=True)

    # Gate 3: mandatory global settings.
    required_keys = ("api_key", "server_url", "secret_key", "mt5_terminal_path")
    missing_key = next((key for key in required_keys if not global_config.get(key)), None)
    if missing_key is not None:
        logging.error("Global config '%s' kosong. Batalkan pengiriman.", missing_key)
        return False

    # Gate 4: minimum distance to the active orders/positions.
    try:
        open_orders = get_active_orders(symbol, global_config['mt5_terminal_path'])
    except Exception as err:
        logging.error("Gagal ambil order aktif: %s", err)
        open_orders = []

    # Pip size comes from the config per symbol, falling back to 0.0001.
    pip_table = global_config.get('pip_size_by_symbol', {})
    pip_size = pip_table.get(symbol.upper()) or 0.0001
    min_pips = profile['min_distance_pips_per_tf'].get(tf, 10)
    entry_price = float(opp['entry_price_chosen'])
    if not is_far_enough(entry_price, open_orders, pip_size, min_pips):
        logging.info("[%s|%s|%s] Entry terlalu dekat dgn order aktif. Skip.", *tag)
        return False

    # Gate 5: build the signal; a missing order type falls back to BUY/SELL.
    order_type_to_use = opp.get('order_type') or opp['signal']
    signal_json = build_signal_format(
        symbol=symbol,
        entry_price=entry_price,
        direction=opp['signal'],
        sl=float(opp['sl']),
        tp=float(opp['tp']),
        order_type=order_type_to_use,
    )
    if not signal_json:
        logging.warning("[%s|%s|%s] build_signal_format gagal (order_type invalid). Skip.", *tag)
        return False

    # Gate 6: deliver to the server.
    send_status = send_signal_to_server(
        symbol=symbol,
        signal_json=signal_json,
        api_key=global_config['api_key'],
        server_url=global_config['server_url'],
        secret_key=global_config['secret_key'],
        order_type=order_type_to_use,
        score=opp.get('score'),
        info=opp.get('info'),
        profile_name=profile_name,
    )
    if send_status != 'SUCCESS':
        logging.error("[%s|%s|%s] Pengiriman sinyal GAGAL. Status: %s", *tag, send_status)
        return False

    sig_id = make_signal_id(signal_json)
    symbol_signals = active_signals.setdefault(symbol, {})
    symbol_signals[sig_id] = {
        'signal_json': signal_json,
        'tf': tf,
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'profile': profile_name,
    }
    logging.info("[%s|%s|%s] Sinyal berhasil dikirim!", *tag)
    # Start the cooldown for this (symbol, profile) pair.
    signal_cooldown[(symbol, profile_name)] = datetime.now()
    return True
def process_symbol(
    symbol: str,
    symbol_config: Dict[str, Any],
    global_config: Dict[str, Any],
    models: Dict[str, Any],
    adapted_weights_per_profile: Dict[str, Dict[str, float]],
):
    """Run one full analysis cycle for a single symbol.

    Every enabled strategy profile that is not in cooldown is analysed
    timeframe by timeframe. The first timeframe whose opportunity is sent
    successfully ends the work for that profile. Errors raised while
    analysing or handling a timeframe are logged and the next timeframe is
    tried.

    Args:
        symbol: Symbol to process.
        symbol_config: Configuration section of the symbol.
        global_config: The ``global_settings`` configuration section.
        models: Model registry; ``models['xgb']`` maps symbols to classifiers.
        adapted_weights_per_profile: Adapted weights looked up by symbol and
            then by profile name; the symbol's ``base_weights`` are used when
            no entry exists.
    """
    global signal_cooldown
    logging.info("--- Memproses Simbol: '%s' ---", symbol)

    profiles = symbol_config.get("strategy_profiles", {})
    for profile_name, profile_config in profiles.items():
        if not profile_config.get("enabled", False):
            continue

        # Per-profile cooldown.
        cd_key = (symbol, profile_name)
        if cd_key in signal_cooldown:
            cooldown_minutes = profile_config.get('signal_cooldown_minutes', 1)
            minutes_since_signal = (datetime.now() - signal_cooldown[cd_key]).total_seconds() / 60
            minutes_left = cooldown_minutes - minutes_since_signal
            if minutes_left > 0:
                logging.info("[%s|%s] Cooldown aktif untuk profil '%s' (sisa: %.1f mnt).",
                             symbol, profile_name, profile_name, minutes_left)
                continue

        logging.info("--- Memproses Profil: '%s' untuk Simbol '%s' ---", profile_name, symbol)
        symbol_weights = adapted_weights_per_profile.get(symbol, {})
        adapted_weights = symbol_weights.get(profile_name, symbol_config.get("base_weights", {}))

        # Prefilter confidence keeps the analyzer from emitting too many
        # weak candidates.
        prefilter = profile_config.get('prefilter_confidence', 0.5)

        for tf in profile_config['timeframes']:
            logging.info("[%s|%s|%s] Menganalisis...", profile_name, symbol, tf)
            try:
                opp = analyze_tf_opportunity(
                    symbol=symbol,
                    tf=tf,
                    mt5_path=global_config['mt5_terminal_path'],
                    gng_model=None,
                    gng_feature_stats={},
                    confidence_threshold=prefilter,
                    min_distance_pips_per_tf=profile_config['min_distance_pips_per_tf'],
                    weights=adapted_weights if isinstance(adapted_weights, dict) else {},
                    htf_bias=profile_config.get('htf_bias', 'NEUTRAL'),
                    htf_config=profile_config.get(
                        'htf_config',
                        {"enabled": True, "bias_influence_score": 2.0, "penalty_score": -3.0},
                    ),
                )
                if not opp or opp.get('signal') == "WAIT":
                    continue
                if handle_opportunity(
                    opp, symbol, tf, symbol_config, global_config,
                    models['xgb'].get(symbol), profile_name,
                ):
                    # A signal went out for this profile; skip its remaining
                    # timeframes.
                    break
            except Exception as err:
                logging.error("Error saat menganalisis %s|%s|%s: %s",
                              profile_name, symbol, tf, err, exc_info=True)
def run_trading_bot(
    symbol: str,
    timeframe: str,
    mt5_path: str,
    profile_type: str = "scalping",  # ["scalping", "intraday", "swing"]
    config_path: str = "config.json"
):
    """Analyse one symbol/timeframe with a single, explicitly chosen profile.

    Args:
        symbol: Symbol whose configuration section is used.
        timeframe: Timeframe passed to the analyzer.
        mt5_path: MT5 terminal path passed to the analyzer.
        profile_type: One of ``"scalping"``, ``"intraday"`` or ``"swing"``.
        config_path: Path to the JSON configuration file (read as UTF-8).

    Returns:
        The generated signal, or ``None`` when the analyzer returns nothing,
        the score is below the profile threshold, or no signal is generated.

    Raises:
        ValueError: If ``profile_type`` is not a known profile, or the
            profile is missing from or disabled in the symbol configuration.
    """
    # Load config. JSON is UTF-8 by specification, so do not rely on the
    # platform's locale encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        config = json.load(f)
    symbol_config = config.get(symbol, {})

    # Validate profile type
    if profile_type not in ["scalping", "intraday", "swing"]:
        raise ValueError("profile_type harus salah satu dari: scalping, intraday, atau swing")

    # Get profile specific settings
    profile_settings = symbol_config.get("strategy_profiles", {}).get(profile_type, {})
    if not profile_settings:
        raise ValueError(f"Profile {profile_type} tidak ditemukan dalam konfigurasi untuk {symbol}")
    if not profile_settings.get("enabled", False):
        raise ValueError(f"Profile {profile_type} tidak diaktifkan untuk {symbol}")

    # Initialize analysis with profile settings
    analysis = analyze_tf_opportunity(
        symbol=symbol,
        tf=timeframe,
        mt5_path=mt5_path,
        gng_model=None,
        gng_feature_stats=None,
        confidence_threshold=profile_settings.get("confidence_threshold", 0.5),
        min_distance_pips_per_tf=profile_settings.get("min_distance_pips_per_tf", {}),
        weights=symbol_config.get("base_weights", {}),
        htf_bias=profile_settings.get("htf_bias", "NEUTRAL"),
        htf_config=profile_settings.get(
            "htf_config",
            {"enabled": True, "bias_influence_score": 2.0, "penalty_score": -3.0},
        )
    )
    if analysis:
        logging.info("Analysis result for %s (%s profile):", symbol, profile_type)
        logging.info("Score: %s", analysis['score'])
        # Logging must not fail on a missing 'info' key or non-string items.
        logging.info("Info: %s", ', '.join(map(str, analysis.get('info') or [])))

        # Apply profile specific risk management
        # NOTE(review): the analyzer call above defaults confidence_threshold
        # to 0.5 while this gate defaults it to 5.0 — confirm which default
        # is intended.
        if analysis['score'] >= profile_settings.get("confidence_threshold", 5.0):
            # NOTE(review): calculate_position_size and generate_trading_signal
            # are neither defined nor imported in the visible part of this
            # file; confirm they are provided elsewhere, otherwise this branch
            # raises NameError.
            position_size = calculate_position_size(
                symbol=symbol,
                risk_percent=config["risk_management"]["position_sizing"]["default_risk_percent"],
                profile_type=profile_type
            )
            # Generate signal with profile specific parameters
            signal = generate_trading_signal(
                symbol=symbol,
                analysis=analysis,
                position_size=position_size,
                profile_settings=profile_settings
            )
            if signal:
                logging.info("Generated signal for %s: %s", symbol, signal)
                return signal
    return None
def main() -> None:
    """Run the bot: load config and models, then analyse all symbols forever.

    Each cycle recomputes the adapted weights with
    ``analyze_and_adapt_profiles``, processes every symbol section of the
    configuration, and sleeps ``global_settings.main_loop_sleep_seconds``
    (default 20) seconds. The loop ends on ``KeyboardInterrupt``.

    A top-level configuration key counts as a symbol when it is not one of
    the known settings sections and its value is a dictionary.
    ``risk_management`` is a settings section (``run_trading_bot`` reads it
    as one), and a non-dict entry cannot be processed as a symbol.
    """
    global active_signals, signal_cooldown
    config = load_config()
    global_config = config.get('global_settings', {})
    # Start from a clean signal/cooldown state on every run.
    active_signals = {}
    signal_cooldown = {}

    # Initialise models (XGB)
    _, xgb_models = initialize_models(config)
    all_models = {'xgb': xgb_models}

    non_symbol_keys = {
        "global_settings",
        "logging",
        "learning",
        "trial_user_settings",
        "risk_management",
    }

    logging.info("==================================================")
    logging.info("Bot Trading AI v5.0 (Per-Symbol Profiles) Siap Beraksi!")
    logging.info("==================================================")
    try:
        while True:
            # Adapt the weights per profile (learning loop)
            adapted_weights = analyze_and_adapt_profiles(config)
            logging.info("--- Memulai Siklus Analisis Baru ---")
            # Recomputed every cycle, as before, in case the configuration
            # object is changed while the bot runs.
            symbols_to_process = [
                key
                for key, section in config.items()
                if key not in non_symbol_keys and isinstance(section, dict)
            ]
            for symbol in symbols_to_process:
                symbol_config = config[symbol]
                process_symbol(symbol, symbol_config, global_config, all_models, adapted_weights)
            sleep_duration = int(global_config.get('main_loop_sleep_seconds', 20))
            logging.info("Semua simbol telah dianalisis. Istirahat %d detik...", sleep_duration)
            time.sleep(sleep_duration)
    except KeyboardInterrupt:
        logging.info("Perintah berhenti diterima. Bot akan dimatikan.")
    finally:
        logging.info("Aplikasi Selesai.")
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()