"""
|
|
gng_model.py (ADVANCED, FULL VERSION)
|
|
=====================================
|
|
|
|
Implementasi Growing Neural Gas (GNG) trading dengan dukungan fitur multi-dimensi,
|
|
integrasi technical_indicators.py refactor, retrain, dan statistik min/max.
|
|
Tetap support pipeline lama & baru (backward compatible).
|
|
|
|
Copyright (c) 2024.
|
|
"""

from __future__ import annotations

import logging
import os
import pickle
from typing import Dict, Tuple, Optional, List, Any

import numpy as np
import pandas as pd

from technical_indicators import (
    extract_features_full,
    detect_structure,
    detect_order_blocks_multi,
    detect_fvg_multi,
    detect_engulfing,
    detect_pinbar,
    get_daily_high_low,
    get_pivot_points,
)

# ==================== GNG MODEL ====================

class GrowingNeuralGas:
    """
    Growing Neural Gas (GNG) with node/edge updates, training, and feature scoring.
    Supports the multi-dimensional (AI/ML-ready) pipeline.
    """

    def __init__(self, max_nodes: int = 100, input_dim: int = 1):
        self.max_nodes = max_nodes
        self.input_dim = input_dim
        self.nodes: List[Dict[str, Any]] = []
        self.edges: List[List[int]] = []
        self.input_count = 0

    def initialize_nodes(self, data: np.ndarray) -> bool:
        if len(data) < 2:
            logging.warning("GNG: Not enough data for the initial nodes.")
            return False
        if data.shape[1] != self.input_dim:
            logging.error(f"GNG: Data dimension ({data.shape[1]}) does not match input_dim ({self.input_dim})")
            return False
        idx = np.random.choice(len(data), 2, replace=False)
        self.nodes = [
            {'w': data[idx[0]].astype(float), 'error': 0.0, 'edges': [], 'win_count': 0, 'loss_count': 0, 'age_of_node': 0},
            {'w': data[idx[1]].astype(float), 'error': 0.0, 'edges': [], 'win_count': 0, 'loss_count': 0, 'age_of_node': 0},
        ]
        self.edges = []
        return True

    def fit(self, data: np.ndarray, num_iterations: int = 5) -> bool:
        if not self.nodes:
            if not self.initialize_nodes(data):
                logging.warning("GNG: Node initialization failed.")
                return False
        alpha_bmu = 0.5
        alpha_neighbor = 0.01
        age_increment = 1
        max_edge_age = 50
        error_decay_rate = 0.95
        for iteration in range(num_iterations):
            np.random.shuffle(data)  # note: shuffles the caller's array in place
            for x_input in data:
                self.input_count += 1
                if not self.nodes:
                    break
                # Best-matching unit (BMU): the node closest to the input
                distances = np.array([np.linalg.norm(x_input - node['w']) for node in self.nodes])
                s1_idx = np.argmin(distances)
                s1 = self.nodes[s1_idx]
                s1['error'] += np.linalg.norm(x_input - s1['w'])
                s1['w'] += alpha_bmu * (x_input - s1['w'])
                s1['age_of_node'] += 1
                # Pull the BMU's topological neighbors toward the input
                for neighbor_idx in s1['edges']:
                    s_n = self.nodes[neighbor_idx]
                    s_n['w'] += alpha_neighbor * (x_input - s_n['w'])
                # Age every edge incident to the BMU
                for i in range(len(self.edges)):
                    edge = self.edges[i]
                    if (edge[0] == s1_idx and edge[1] in s1['edges']) or \
                       (edge[1] == s1_idx and edge[0] in s1['edges']):
                        self.edges[i][2] += age_increment
                # Drop edges past the maximum age, keeping the per-node
                # adjacency lists in sync (the original removed the edge but
                # left stale neighbor indices behind)
                surviving_edges = []
                for edge in self.edges:
                    if len(edge) < 3 or edge[2] <= max_edge_age:
                        surviving_edges.append(edge)
                    else:
                        a, b = edge[0], edge[1]
                        if b in self.nodes[a]['edges']:
                            self.nodes[a]['edges'].remove(b)
                        if a in self.nodes[b]['edges']:
                            self.nodes[b]['edges'].remove(a)
                self.edges = surviving_edges
                # Connect s1 with s2 (second closest); reset age if the edge already exists
                if len(self.nodes) > 1:
                    s2_idx = np.argsort(distances)[1]
                    edge_exists = False
                    for edge in self.edges:
                        if (edge[0] == s1_idx and edge[1] == s2_idx) or (edge[1] == s1_idx and edge[0] == s2_idx):
                            edge_exists = True
                            if len(edge) > 2:
                                edge[2] = 0
                            break
                    if not edge_exists:
                        self.edges.append([s1_idx, s2_idx, 0])
                        if s2_idx not in self.nodes[s1_idx]['edges']:
                            self.nodes[s1_idx]['edges'].append(s2_idx)
                        if s1_idx not in self.nodes[s2_idx]['edges']:
                            self.nodes[s2_idx]['edges'].append(s1_idx)
                # Remove isolated nodes and reindex edges/adjacency lists
                nodes_to_remove: List[int] = []
                for i, node in enumerate(self.nodes):
                    if not node['edges'] and node['age_of_node'] > 10:
                        nodes_to_remove.append(i)
                for idx_to_remove in sorted(nodes_to_remove, reverse=True):
                    del self.nodes[idx_to_remove]
                    for edge in self.edges:
                        if edge[0] > idx_to_remove:
                            edge[0] -= 1
                        if edge[1] > idx_to_remove:
                            edge[1] -= 1
                    for node in self.nodes:
                        node['edges'] = [e_idx - 1 if e_idx > idx_to_remove else e_idx for e_idx in node['edges']]
                # Decay the accumulated errors
                for node in self.nodes:
                    node['error'] *= error_decay_rate
        return True
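

# Minimal usage sketch for GrowingNeuralGas (illustrative only; the synthetic
# `toy_data` below is a hypothetical stand-in for normalized feature vectors):
#
#     rng = np.random.default_rng(42)
#     toy_data = rng.random((200, 4))        # 200 samples, 4 features each
#     gng = GrowingNeuralGas(max_nodes=100, input_dim=4)
#     gng.fit(toy_data, num_iterations=5)    # fit() shuffles its input in place
#     print(len(gng.nodes), "nodes,", len(gng.edges), "edges")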


# =============== FEATURES & NORMALIZATION ===============

def _normalize_features(features: np.ndarray, min_vals: np.ndarray, max_vals: np.ndarray) -> np.ndarray:
    normalized_features = np.zeros_like(features, dtype=float)
    for i in range(len(features)):
        val_range = max_vals[i] - min_vals[i]
        if val_range != 0:
            normalized_features[i] = (features[i] - min_vals[i]) / val_range
        else:
            normalized_features[i] = 0.5
    return normalized_features
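

# Worked example (made-up numbers): for features = [2, 5, 10] with
# min_vals = [0, 0, 10] and max_vals = [4, 10, 10], the first two components
# scale to (2-0)/4 = 0.5 and (5-0)/10 = 0.5, while the third has zero range
# (min == max) and falls back to the midpoint 0.5:
#
#     _normalize_features(np.array([2.0, 5.0, 10.0]),
#                         np.array([0.0, 0.0, 10.0]),
#                         np.array([4.0, 10.0, 10.0]))
#     # -> array([0.5, 0.5, 0.5])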


def prepare_features_from_df(df: pd.DataFrame) -> Tuple[np.ndarray, str, List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, Any], Dict[str, float]]:
    structure, swing_points = detect_structure(df)
    ob_list = detect_order_blocks_multi(df, structure_filter=structure)
    fvg_list = detect_fvg_multi(df)
    patterns = detect_engulfing(df) + detect_pinbar(df)
    boundary = get_daily_high_low(df)
    pivot = get_pivot_points(df)
    features = extract_features_full(df, structure, ob_list, fvg_list, patterns, boundary, pivot)
    return features, structure, ob_list, fvg_list, patterns, boundary, pivot

def get_gng_input_features(
    df: pd.DataFrame,
    order_blocks: List[Dict[str, Any]],
    fvg_zones: List[Dict[str, Any]],
    tf_name: str,
    gng_feature_stats: Dict[str, Dict[str, Optional[np.ndarray]]]
) -> np.ndarray:
    # Deprecated, kept for backward compatibility.
    # Use get_gng_input_features_full() for the new pipeline.
    structure, swing_points = detect_structure(df)
    patterns = detect_engulfing(df) + detect_pinbar(df)
    boundary = get_daily_high_low(df)
    pivot = get_pivot_points(df)
    features = extract_features_full(df, structure, order_blocks, fvg_zones, patterns, boundary, pivot)
    if tf_name in gng_feature_stats and gng_feature_stats[tf_name]['min'] is not None:
        min_vals = gng_feature_stats[tf_name]['min']
        max_vals = gng_feature_stats[tf_name]['max']
        if len(features) == len(min_vals) and len(features) == len(max_vals):
            normalized_features = _normalize_features(features, min_vals, max_vals)
        else:
            logging.error(f"Feature/stat length mismatch. Features: {len(features)}, Stats: {len(min_vals)}")
            normalized_features = features
    else:
        normalized_features = features
    return normalized_features


def get_gng_input_features_full(
    df: pd.DataFrame,
    gng_feature_stats: Dict[str, Dict[str, Optional[np.ndarray]]],
    tf_name: str
) -> np.ndarray:
    features, *_ = prepare_features_from_df(df)
    if tf_name in gng_feature_stats and gng_feature_stats[tf_name]['min'] is not None:
        min_vals = gng_feature_stats[tf_name]['min']
        max_vals = gng_feature_stats[tf_name]['max']
        if len(features) == len(min_vals) and len(features) == len(max_vals):
            normalized_features = _normalize_features(features, min_vals, max_vals)
        else:
            logging.error(f"Feature/stat length mismatch. Features: {len(features)}, Stats: {len(min_vals)}")
            normalized_features = features
    else:
        normalized_features = features
    return normalized_features


# ================= GNG ZONE CONTEXT =================

def get_gng_context(gng_input_features: np.ndarray, gng_model: GrowingNeuralGas) -> Tuple[int, str]:
    if not gng_model or not hasattr(gng_model, 'nodes') or not gng_model.nodes:
        return 0, "No GNG Model"
    nodes_w = np.array([node['w'] for node in gng_model.nodes])
    if len(nodes_w) == 0:
        return 0, "No GNG Node"
    if gng_input_features.shape[0] != gng_model.input_dim:
        return 0, "Dimension Mismatch"
    distances = np.array([np.linalg.norm(gng_input_features - node_w) for node_w in nodes_w])
    nearest_dist = np.min(distances)
    nearest_node_idx = np.argmin(distances)
    distance_threshold_gng = 0.15
    if nearest_dist < distance_threshold_gng:
        return 1, f"Near GNG zone (node#{nearest_node_idx}, dist:{nearest_dist:.3f})"
    return 0, "Far from GNG zone"


# ============= SAVE / LOAD MODEL & STATS =============

def save_gng_model(tf: str, model: GrowingNeuralGas, model_dir: str) -> None:
    os.makedirs(model_dir, exist_ok=True)
    path = os.path.join(model_dir, f"gng_{tf}.pkl")
    try:
        with open(path, 'wb') as f:
            pickle.dump(model, f)
        logging.info(f"GNG model {tf} saved to {path}")
    except Exception as e:
        logging.error(f"Failed to save GNG model {tf}: {e}")


def load_gng_model(tf: str, model_dir: str) -> Optional[GrowingNeuralGas]:
    path = os.path.join(model_dir, f"gng_{tf}.pkl")
    if os.path.exists(path):
        try:
            with open(path, 'rb') as f:
                model = pickle.load(f)
            logging.info(f"GNG model {tf} loaded from {path}")
            return model
        except Exception as e:
            logging.warning(f"Failed to load GNG model {tf} from {path}: {e}")
    return None


def _calculate_feature_stats(df_list: List[pd.DataFrame]) -> Tuple[np.ndarray, np.ndarray]:
    features_list = []
    for df in df_list:
        feats, *_ = prepare_features_from_df(df)
        features_list.append(feats)
    features_array = np.array(features_list)
    min_vals = np.min(features_array, axis=0)
    max_vals = np.max(features_array, axis=0)
    return min_vals, max_vals


def initialize_gng_models(
    symbol: str,
    timeframes: List[str],
    model_dir: str,
    mt5_path: str,
    get_data_func,
) -> Tuple[Dict[str, GrowingNeuralGas], Dict[str, Dict[str, Optional[np.ndarray]]]]:
    gng_models: Dict[str, GrowingNeuralGas] = {}
    gng_feature_stats: Dict[str, Dict[str, Optional[np.ndarray]]] = {}
    os.makedirs(model_dir, exist_ok=True)
    # Determine the input dimension from the refactored feature extractor
    sample_df_for_dim = get_data_func(symbol, timeframes[0], 100, mt5_path)
    if sample_df_for_dim is None or len(sample_df_for_dim) < 20:
        logging.critical("Not enough initial data for GNG features.")
        return {}, {}
    sample_features, *_ = prepare_features_from_df(sample_df_for_dim)
    input_dim = len(sample_features)
    for tf in timeframes:
        loaded_model = load_gng_model(tf, model_dir)
        stats_path = os.path.join(model_dir, f"gng_{tf}_stats.pkl")
        loaded_stats: Optional[Dict[str, np.ndarray]] = None
        if os.path.exists(stats_path):
            try:
                with open(stats_path, 'rb') as f:
                    loaded_stats = pickle.load(f)
                logging.info(f"GNG feature stats for {tf} loaded from {stats_path}")
            except Exception as e:
                logging.warning(f"Failed to load GNG stats for {tf} from {stats_path}: {e}")
        rebuild_model = False
        if loaded_model is None or not hasattr(loaded_model, 'input_dim') or loaded_model.input_dim != input_dim:
            rebuild_model = True
        if loaded_stats is None or loaded_stats.get('min') is None or loaded_stats['min'].shape[0] != input_dim:
            rebuild_model = True
        if rebuild_model:
            logging.info(f"Rebuilding GNG model/feature stats for TF {tf}.")
            df_hist = get_data_func(symbol, tf, 1500, mt5_path)
            if df_hist is None or len(df_hist) < 100:
                gng_models[tf] = GrowingNeuralGas(max_nodes=100, input_dim=input_dim)
                gng_feature_stats[tf] = {'min': None, 'max': None}
                continue
            # Split into rolling-window batches for the feature statistics
            window = 50
            df_batches = [df_hist.iloc[i - window:i + 1] for i in range(window, len(df_hist) - 1)]
            min_vals, max_vals = _calculate_feature_stats(df_batches)
            gng_feature_stats[tf] = {'min': min_vals, 'max': max_vals}
            try:
                with open(stats_path, 'wb') as f:
                    pickle.dump(gng_feature_stats[tf], f)
            except Exception as e:
                logging.error(f"Failed to save GNG stats for {tf}: {e}")
            # Build the training data
            gng_data_for_fit: List[np.ndarray] = []
            for df_sub in df_batches:
                feats, *_ = prepare_features_from_df(df_sub)
                normalized_feats = _normalize_features(feats, min_vals, max_vals)
                if normalized_feats is not None and len(normalized_feats) == input_dim:
                    gng_data_for_fit.append(normalized_feats)
            model = GrowingNeuralGas(max_nodes=100, input_dim=input_dim)
            if len(gng_data_for_fit) > 1:
                model.fit(np.array(gng_data_for_fit))
            save_gng_model(tf, model, model_dir)
            gng_models[tf] = model
        else:
            gng_models[tf] = loaded_model
            gng_feature_stats[tf] = loaded_stats
    return gng_models, gng_feature_stats
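

# End-to-end sketch (illustrative; `fetch_rates`, "XAUUSD", and `df_live` are
# hypothetical — any callable with the signature
# (symbol, timeframe, bar_count, mt5_path) -> Optional[pd.DataFrame] works):
#
#     def fetch_rates(symbol, tf, count, mt5_path):
#         ...  # e.g. pull OHLC bars from MetaTrader 5 into a DataFrame
#
#     models, stats = initialize_gng_models(
#         symbol="XAUUSD", timeframes=["M15", "H1"],
#         model_dir="models", mt5_path="", get_data_func=fetch_rates,
#     )
#     feats = get_gng_input_features_full(df_live, stats, "M15")
#     print(get_gng_context(feats, models["M15"]))
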
# ===================== END OF MODULE =====================
|