markaz_arshy/analyze_market.py
2025-08-12 14:36:24 +00:00

691 lines
26 KiB
Python

import pandas as pd
import numpy as np
from typing import Dict, Any, Tuple, List, Optional
from datetime import datetime
from enum import Enum
import talib as ta
class TradingProfile(Enum):
    """Trading styles that select which analysis ``ProfileAnalyzer`` runs.

    Each member's value is the lowercase string accepted when building the
    enum from text, e.g. ``TradingProfile("swing")``.
    """

    SCALPER = "scalper"
    INTRADAY = "intraday"
    SWING = "swing"
    POSITION = "position"
class ProfileAnalyzer:
    """Analyse a price DataFrame according to a trading profile.

    ``analyze`` dispatches to one of four profile-specific aggregators, each
    of which returns a dict of named sub-analyses. The helpers read the
    ``open``/``high``/``low``/``close`` columns (and ``spread`` when present).

    Some sub-analyses referenced by the aggregators (``_analyze_volume_profile``,
    ``_analyze_market_structure``, ``_find_divergences``,
    ``_analyze_volatility_regime``, ``_determine_market_phase``) are not
    defined on this class. They are looked up at call time: if a subclass or
    later patch provides them they are used, otherwise the corresponding
    result entry is ``None`` instead of raising ``AttributeError``.
    """

    def __init__(self, profile: "TradingProfile"):
        self.profile = profile

    def analyze(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Run the analysis that matches ``self.profile``.

        Any profile other than SCALPER, INTRADAY or SWING falls through to
        the position-trading analysis.
        """
        if self.profile == TradingProfile.SCALPER:
            return self._analyze_scalper(df)
        elif self.profile == TradingProfile.INTRADAY:
            return self._analyze_intraday(df)
        elif self.profile == TradingProfile.SWING:
            return self._analyze_swing(df)
        else:  # POSITION
            return self._analyze_position(df)

    def _run_optional(self, method_name: str, df: pd.DataFrame) -> Any:
        """Return ``self.<method_name>(df)``, or None if no such method exists."""
        method = getattr(self, method_name, None)
        if not callable(method):
            return None
        return method(df)

    def _analyze_scalper(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Scalper analysis: short-horizon volatility, momentum, spread, candles."""
        return {
            'volatility': self._calculate_micro_volatility(df),
            'momentum': self._calculate_short_momentum(df),
            'spread_impact': self._analyze_spread_impact(df),
            'micro_structure': self._analyze_micro_structure(df),
        }

    def _analyze_intraday(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Intraday analysis: day momentum, session, swing levels, volume profile."""
        return {
            'day_momentum': self._calculate_day_momentum(df),
            'session_analysis': self._analyze_trading_session(df),
            'key_levels': self._find_intraday_levels(df),
            # Not implemented on this class; None unless provided elsewhere.
            'volume_profile': self._run_optional('_analyze_volume_profile', df),
        }

    def _analyze_swing(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Swing analysis: trend strength, key levels, structure, divergences."""
        return {
            'trend_strength': self._calculate_trend_strength(df),
            'key_support_resistance': self._find_key_levels(df),
            # Not implemented on this class; None unless provided elsewhere.
            'market_structure': self._run_optional('_analyze_market_structure', df),
            'momentum_divergence': self._run_optional('_find_divergences', df),
        }

    def _analyze_position(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Position analysis: major trend, MA crossovers, volatility regime, phase."""
        return {
            'major_trend': self._analyze_major_trend(df),
            'structural_changes': self._detect_structural_changes(df),
            # Not implemented on this class; None unless provided elsewhere.
            'volatility_regime': self._run_optional('_analyze_volatility_regime', df),
            'market_phase': self._run_optional('_determine_market_phase', df),
        }

    # --- Scalper helpers ---

    def _calculate_micro_volatility(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the latest and 20-bar average bar range as a percent of the low.

        Returns zeros when fewer than 20 rows are available. The input frame
        is not modified.
        """
        if len(df) < 20:
            return {'current': 0.0, 'average': 0.0}
        # Keep the series local: writing a column into df would mutate the
        # caller's DataFrame.
        micro_vol = (df['high'] - df['low']) / df['low'] * 100
        current = float(micro_vol.iloc[-1])
        average = float(micro_vol.rolling(20).mean().iloc[-1])
        return {'current': current, 'average': average}

    def _calculate_short_momentum(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return a 14-bar simple-average RSI and the close change over 4 bars.

        Returns ``{'rsi': 50.0, 'momentum': 0.0}`` when fewer than 14 rows
        are available.
        """
        if len(df) < 14:
            return {'rsi': 50.0, 'momentum': 0.0}
        # RSI from rolling means of gains and losses.
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        # Momentum: last close minus the close four bars earlier.
        momentum = df['close'].iloc[-1] - df['close'].iloc[-5]
        return {
            'rsi': float(rsi.iloc[-1]),
            'momentum': float(momentum)
        }

    def _analyze_spread_impact(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the latest spread divided by the 20-bar average bar range.

        Returns a ratio of 1.0 when there is no ``spread`` column or fewer
        than 20 rows.
        """
        if 'spread' not in df.columns or len(df) < 20:
            return {'spread_ratio': 1.0}
        avg_range = (df['high'] - df['low']).rolling(20).mean()
        spread_ratio = df['spread'] / avg_range
        return {'spread_ratio': float(spread_ratio.iloc[-1])}

    def _analyze_micro_structure(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Classify the last three candles and score the latest candle's body."""
        if len(df) < 20:
            return {'pattern': 'unknown', 'strength': 0.0}
        last_candles = df.tail(3)
        pattern = self._detect_candlestick_pattern(last_candles)
        strength = self._calculate_pattern_strength(last_candles)
        return {'pattern': pattern, 'strength': strength}

    # --- Intraday helpers ---

    def _calculate_day_momentum(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the percent change from the first open to the last close.

        Requires at least 24 rows; otherwise momentum is 0.0.
        """
        if len(df) < 24:
            return {'momentum': 0.0}
        day_open = df['open'].iloc[0]
        current_price = df['close'].iloc[-1]
        momentum = (current_price - day_open) / day_open * 100
        return {'momentum': float(momentum)}

    def _analyze_trading_session(self, df: pd.DataFrame) -> Dict[str, str]:
        """Name the trading session for the current wall-clock hour.

        NOTE(review): ``df`` is not consulted; the hour comes from
        ``pd.Timestamp.now()``, i.e. the machine's local time - confirm
        this is the intended clock.
        """
        current_hour = pd.Timestamp.now().hour
        if 0 <= current_hour < 8:
            session = "ASIAN"
        elif 8 <= current_hour < 16:
            session = "LONDON"
        else:
            session = "NEW_YORK"
        return {'current_session': session}

    def _find_intraday_levels(self, df: pd.DataFrame) -> Dict[str, List[float]]:
        """Return recent swing lows as support and swing highs as resistance.

        Requires at least 100 rows; otherwise both lists are empty.
        """
        if len(df) < 100:
            return {'support': [], 'resistance': []}
        highs = self._find_swing_points(df['high'], 'high')
        lows = self._find_swing_points(df['low'], 'low')
        return {
            'support': [float(x) for x in lows],
            'resistance': [float(x) for x in highs]
        }

    # --- Swing-trading helpers ---

    def _calculate_trend_strength(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return a trend-strength figure derived from the average true range.

        NOTE(review): the original comment calls this ADX, but the code
        computes the 14-bar mean of the absolute percent change of a 14-bar
        average true range, which is not Wilder's ADX.
        """
        if len(df) < 50:
            return {'strength': 0.0}
        tr1 = df['high'] - df['low']
        tr2 = abs(df['high'] - df['close'].shift())
        tr3 = abs(df['low'] - df['close'].shift())
        tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
        atr = tr.rolling(14).mean()
        dx = atr.diff() / atr * 100
        adx = abs(dx).rolling(14).mean()
        return {'strength': float(adx.iloc[-1])}

    def _find_key_levels(self, df: pd.DataFrame) -> Dict[str, List[float]]:
        """Return price levels (closes rounded to 10) hit more often than average.

        Requires at least 100 rows; otherwise the list is empty.
        """
        if len(df) < 100:
            return {'major_levels': []}
        price_rounds = np.round(df['close'] / 10) * 10
        level_counts = price_rounds.value_counts()
        major_levels = level_counts[level_counts > level_counts.mean()].index.tolist()
        return {'major_levels': [float(x) for x in major_levels]}

    # --- Position-trading helpers ---

    def _analyze_major_trend(self, df: pd.DataFrame) -> Dict[str, str]:
        """Return BULLISH if the last close is above its 200-bar SMA, else BEARISH.

        Returns UNDEFINED when fewer than 200 rows are available.
        """
        if len(df) < 200:
            return {'trend': 'UNDEFINED'}
        sma200 = df['close'].rolling(200).mean()
        current_price = df['close'].iloc[-1]
        if current_price > sma200.iloc[-1]:
            trend = 'BULLISH'
        else:
            trend = 'BEARISH'
        return {'trend': trend}

    def _detect_structural_changes(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Detect a 50/200-bar moving-average crossover on the latest bar.

        With fewer than 201 rows the 200-bar average is NaN on one of the
        two bars compared, so no crossover is reported.
        """
        if len(df) < 100:
            return {'change_detected': False}
        ma50 = df['close'].rolling(50).mean()
        ma200 = df['close'].rolling(200).mean()
        cross_over = (ma50.iloc[-2] <= ma200.iloc[-2]) and (ma50.iloc[-1] > ma200.iloc[-1])
        cross_under = (ma50.iloc[-2] >= ma200.iloc[-2]) and (ma50.iloc[-1] < ma200.iloc[-1])
        return {
            'change_detected': cross_over or cross_under,
            'type': 'BULLISH' if cross_over else 'BEARISH' if cross_under else 'NONE'
        }

    def _find_swing_points(self, series: pd.Series, point_type: str, window: int = 5) -> List[float]:
        """Return the last three swing highs or lows of ``series``.

        A swing high is a value strictly greater than the ``window`` values
        on each side; a swing low is strictly smaller. Any ``point_type``
        other than ``'high'`` is treated as low.
        """
        # Work on positional values so the result does not depend on the
        # Series index (datetime or offset integer labels).
        values = np.asarray(series)
        points: List[float] = []
        for i in range(window, len(values) - window):
            center = values[i]
            before = values[i - window:i]
            after = values[i + 1:i + window + 1]
            if point_type == 'high':
                is_swing = bool((center > before).all() and (center > after).all())
            else:  # low
                is_swing = bool((center < before).all() and (center < after).all())
            if is_swing:
                points.append(float(center))
        return points[-3:]

    def _detect_candlestick_pattern(self, candles: pd.DataFrame) -> str:
        """Return ``"bullish_engulfing"`` if the last two candles form one.

        Returns ``"unknown"`` for fewer than three candles and
        ``"no_pattern"`` otherwise.
        """
        if len(candles) < 3:
            return "unknown"
        prev_candle = candles.iloc[-2]
        curr_candle = candles.iloc[-1]
        if (prev_candle['close'] < prev_candle['open'] and      # previous bearish
                curr_candle['close'] > curr_candle['open'] and  # current bullish
                curr_candle['open'] < prev_candle['close'] and  # opens below prev close
                curr_candle['close'] > prev_candle['open']):    # closes above prev open
            return "bullish_engulfing"
        return "no_pattern"

    def _calculate_pattern_strength(self, candles: pd.DataFrame) -> float:
        """Return the last candle's body relative to the mean body, capped at 1.0."""
        if len(candles) < 3:
            return 0.0
        curr_candle = candles.iloc[-1]
        body_size = abs(curr_candle['close'] - curr_candle['open'])
        avg_body = abs(candles['close'] - candles['open']).mean()
        strength = body_size / avg_body if avg_body != 0 else 1.0
        return min(strength, 1.0)
def calculate_slope(df: pd.DataFrame, length: int = 14, method: str = 'atr', multiplier: float = 1.0) -> float:
    """
    Compute the per-bar slope used to project trendlines.

    Args:
        df: OHLCV DataFrame; reads ``high``, ``low`` and ``close``.
        length: Lookback period.
        method: ``'atr'`` (rolling mean true range / length), ``'stdev'``
            (rolling close standard deviation / length) or ``'linreg'``
            (absolute slope of a least-squares line through the last
            ``length`` closes).
        multiplier: Scale factor applied to the result.

    Returns:
        The slope value; 0.0 for an unrecognised ``method``.
    """
    if method == 'linreg':
        closes = df['close'].iloc[-length:].values
        positions = np.arange(len(closes))
        gradient, _intercept = np.polyfit(positions, closes, 1)
        return abs(gradient) * multiplier

    if method == 'stdev':
        latest_std = df['close'].rolling(length).std().iloc[-1]
        return (latest_std / length) * multiplier

    if method == 'atr':
        prev_close = df['close'].shift()
        candidates = pd.concat(
            [
                df['high'] - df['low'],
                (df['high'] - prev_close).abs(),
                (df['low'] - prev_close).abs(),
            ],
            axis=1,
        )
        # True range is the largest of the three candidate ranges per bar.
        latest_atr = candidates.max(axis=1).rolling(length).mean().iloc[-1]
        return (latest_atr / length) * multiplier

    return 0.0
def detect_pivot_points(df: pd.DataFrame, length: int = 14) -> Dict[str, Any]:
    """
    Find pivot highs and pivot lows.

    A bar is a pivot high when its high equals the maximum of the
    ``2 * length + 1`` bars centred on it (ties count), and a pivot low when
    its low equals the minimum of that window. Bars closer than ``length``
    to either end have no full window and are never pivots.

    Args:
        df: OHLCV DataFrame; reads ``high`` and ``low``.
        length: Number of bars required on each side of a pivot.

    Returns:
        Dict with ``pivot_high`` and ``pivot_low`` Series holding only the
        pivot bars, indexed as in ``df``.
    """
    span = 2 * length + 1

    def _centre_is_max(window):
        return 1 if window.iloc[length] == max(window) else 0

    def _centre_is_min(window):
        return 1 if window.iloc[length] == min(window) else 0

    high_flags = df['high'].rolling(span, center=True).apply(_centre_is_max)
    low_flags = df['low'].rolling(span, center=True).apply(_centre_is_min)

    result = {}
    result['pivot_high'] = df['high'].where(high_flags == 1).dropna()
    result['pivot_low'] = df['low'].where(low_flags == 1).dropna()
    return result
def analyze_trendlines(df: pd.DataFrame, length: int = 14, slope_mult: float = 1.0,
                       calc_method: str = 'atr') -> Dict[str, Any]:
    """
    Project trendlines from the latest pivots and flag breakouts.

    The upper line starts at the most recent pivot high and falls by
    ``slope`` per bar; the lower line starts at the most recent pivot low
    and rises by ``slope`` per bar. A breakout is flagged when the current
    bar's high is above the upper line or its low is below the lower line.

    Args:
        df: OHLCV DataFrame; reads ``high`` and ``low`` here, more via helpers.
        length: Pivot detection period, also used as the slope lookback.
        slope_mult: Multiplier passed to ``calculate_slope``.
        calc_method: Slope method (``'atr'``, ``'stdev'`` or ``'linreg'``).

    Returns:
        Dict with ``upper_trendline``, ``lower_trendline`` (None when no
        pivot of that kind exists), ``upward_break``, ``downward_break``,
        ``slope``, ``last_pivot_high`` and ``last_pivot_low``.
    """
    pivots = detect_pivot_points(df, length)
    slope = calculate_slope(df, length, calc_method, slope_mult)

    def _bars_since_last(pivot_series: pd.Series) -> int:
        # Convert the pivot's index label into a row position so this works
        # for datetime or offset integer indexes, then count the bars between
        # the pivot and the current (last) bar.
        label = pivot_series.index[-1]
        position = int(np.flatnonzero(df.index == label)[-1])
        return (len(df) - 1) - position

    upper_trendline = None
    lower_trendline = None
    last_pivot_high = None
    last_pivot_low = None

    if len(pivots['pivot_high']) > 0:
        last_pivot_high = pivots['pivot_high'].iloc[-1]
        upper_trendline = last_pivot_high - slope * _bars_since_last(pivots['pivot_high'])
    if len(pivots['pivot_low']) > 0:
        last_pivot_low = pivots['pivot_low'].iloc[-1]
        lower_trendline = last_pivot_low + slope * _bars_since_last(pivots['pivot_low'])

    current_high = df['high'].iloc[-1]
    current_low = df['low'].iloc[-1]
    upward_break = bool(upper_trendline is not None and current_high > upper_trendline)
    downward_break = bool(lower_trendline is not None and current_low < lower_trendline)

    return {
        'upper_trendline': upper_trendline,
        'lower_trendline': lower_trendline,
        'upward_break': upward_break,
        'downward_break': downward_break,
        'slope': slope,
        'last_pivot_high': last_pivot_high,
        'last_pivot_low': last_pivot_low
    }
# --- Market context aggregation ---
def get_market_context(df: pd.DataFrame, profile: Optional[str] = None) -> Dict[str, Any]:
    """Build the full market context, optionally with profile-specific analysis.

    The result holds ``zones``, ``structure`` (extended with a
    ``trendline_break`` entry), ``equilibrium``, ``trendlines``,
    ``smc_analysis``, ``profile_analysis`` (only when ``profile`` is given;
    None if it raises ``ValueError``, e.g. for an unknown profile name) and
    ``market_bias``.

    Bias priority: break of structure, then trendline break, then
    equilibrium (NEUTRAL), then the close relative to the premium and
    discount zones.

    NOTE(review): ``calculate_smc_zones`` and ``detect_zone_transition`` are
    not defined in the visible part of this module - confirm where they
    come from.
    """
    context: Dict[str, Any] = {}
    context['zones'] = detect_premium_discount_zones(df)
    context['structure'] = analyze_break_of_structure(df)
    context['equilibrium'] = detect_market_equilibrium(df)
    structure = context['structure']

    # Trendline analysis and the break direction it implies, if any.
    trendlines = analyze_trendlines(df, length=14, slope_mult=1.0, calc_method='atr')
    context['trendlines'] = trendlines
    if trendlines['upward_break']:
        break_label = 'BULLISH'
    elif trendlines['downward_break']:
        break_label = 'BEARISH'
    else:
        break_label = None
    structure['trendline_break'] = break_label

    # SMC zones and the current zone state.
    smc_zones = calculate_smc_zones(df)
    zone_state = detect_zone_transition(df, smc_zones)
    context['smc_analysis'] = {'zones': smc_zones, 'current_state': zone_state}

    if profile:
        try:
            selected = TradingProfile(profile.lower())
            context['profile_analysis'] = ProfileAnalyzer(selected).analyze(df)
        except ValueError:
            context['profile_analysis'] = None

    def _resolve_bias() -> Any:
        if structure['bos_detected']:
            return structure['bos_direction']
        if structure['trendline_break']:
            return structure['trendline_break']
        if context['equilibrium']['equilibrium_detected']:
            return 'NEUTRAL'
        last_close = df['close'].iloc[-1]
        if last_close > context['zones']['premium']:
            return 'BULLISH'
        if last_close < context['zones']['discount']:
            return 'BEARISH'
        return 'NEUTRAL'

    context['market_bias'] = _resolve_bias()
    return context
def detect_premium_discount_zones(df: pd.DataFrame) -> Dict[str, float]:
    """Return the premium (upper) and discount (lower) price zones.

    With fewer than 20 rows the zones are simply the highest high and the
    lowest low. Otherwise the volume profile is consulted: among the price
    bins whose volume exceeds the mean bin volume, the highest becomes the
    premium and the lowest the discount. If fewer than two such bins exist,
    the zones fall back to 1% above and below the 20-bar SMA of the close.
    """
    if len(df) < 20:
        return {'premium': float(df['high'].max()), 'discount': float(df['low'].min())}

    volume_profile = analyze_volume_profile(df)
    high_vol_zones = []
    if volume_profile:
        # analyze_volume_profile returns a plain dict, so the mean has to be
        # computed from its values (a dict has no .mean()).
        mean_volume = sum(volume_profile.values()) / len(volume_profile)
        high_vol_zones = sorted(
            (zone for zone, vol in volume_profile.items() if vol > mean_volume),
            reverse=True,
        )

    if len(high_vol_zones) >= 2:
        premium = high_vol_zones[0]
        discount = high_vol_zones[-1]
    else:
        # Fallback: a 1% band around the 20-bar simple moving average.
        sma20 = df['close'].rolling(20).mean().iloc[-1]
        premium = sma20 * 1.01
        discount = sma20 * 0.99
    return {'premium': float(premium), 'discount': float(discount)}
def analyze_break_of_structure(df: pd.DataFrame) -> Dict[str, Any]:
    """Detect a Break of Structure (BOS) on the latest close.

    A bullish BOS is reported when the last close is above the highest
    swing high, a bearish one when it is below the lowest swing low.
    ``strength`` is the size of the break divided by the latest ``atr``
    value. Fewer than 50 rows yields "no break".

    NOTE(review): assumes ``df`` carries a precomputed ``atr`` column -
    confirm against callers. The module-level ``find_swing_points`` is not
    defined in the visible part of this module;
    ``ProfileAnalyzer._find_swing_points`` has a matching signature - verify
    which one is intended.
    """
    if len(df) < 50:
        return {'bos_detected': False, 'bos_direction': None, 'strength': 0.0}

    swing_highs = find_swing_points(df['high'], 'high', window=10)
    swing_lows = find_swing_points(df['low'], 'low', window=10)
    last_close = df['close'].iloc[-1]

    if swing_highs and last_close > max(swing_highs):
        direction = 'BULLISH'
        break_size = last_close - max(swing_highs)
    elif swing_lows and last_close < min(swing_lows):
        direction = 'BEARISH'
        break_size = min(swing_lows) - last_close
    else:
        return {'bos_detected': False, 'bos_direction': None, 'strength': 0.0}

    return {
        'bos_detected': True,
        'bos_direction': direction,
        'strength': break_size / df['atr'].iloc[-1],
    }
def detect_market_equilibrium(df: pd.DataFrame) -> Dict[str, Any]:
    """Detect a quiet, low-volume market (equilibrium).

    Equilibrium is reported when the high-low range of the last five bars
    is below half the latest ``atr`` value and the latest volume is below
    70% of its 20-bar mean. ``confidence`` averages how far each measure is
    below its threshold, and is 0.0 when no equilibrium is detected or
    fewer than 20 rows are available.

    NOTE(review): assumes ``df`` carries precomputed ``atr`` and ``volume``
    columns - confirm against callers.
    """
    if len(df) < 20:
        return {'equilibrium_detected': False, 'confidence': 0.0}

    latest_atr = df['atr'].iloc[-1]
    recent_range = df['high'].tail(5).max() - df['low'].tail(5).min()
    volume_mean = df['volume'].rolling(20).mean().iloc[-1]
    latest_volume = df['volume'].iloc[-1]

    # Thresholds below which the market counts as quiet and thinly traded.
    range_cap = latest_atr * 0.5
    volume_cap = volume_mean * 0.7
    is_quiet = recent_range < range_cap
    is_thin = latest_volume < volume_cap
    equilibrium_detected = is_quiet and is_thin

    confidence = 0.0
    if equilibrium_detected:
        range_score = 1 - (recent_range / range_cap)
        volume_score = 1 - (latest_volume / volume_cap)
        confidence = (range_score + volume_score) / 2

    return {
        'equilibrium_detected': equilibrium_detected,
        'confidence': float(confidence)
    }
def analyze_volume_profile(df: pd.DataFrame, bins: int = 20) -> Dict[float, float]:
    """Sum traded volume per price bin to find areas of interest.

    The range from the lowest low to the highest high is split into
    ``bins`` equal-width bins. Each bar is assigned to the bin containing
    its mid price ``(high + low) / 2`` and its volume is added to that bin.
    Bins are half-open ``[lower, upper)``, so a bar whose mid price equals
    the highest high is not counted.

    Args:
        df: DataFrame; reads ``high``, ``low`` and ``volume``.
        bins: Number of price bins (default 20).

    Returns:
        Mapping of each bin's lower edge to its total volume. Empty when
        fewer than 100 rows are available or the price range is not a
        finite, non-negative number. When every bar has the same price the
        result is a single entry holding the total volume.

    Raises:
        ValueError: If ``bins`` is smaller than 1.
    """
    if bins < 1:
        raise ValueError("bins must be at least 1")
    if len(df) < 100:
        return {}

    lowest = df['low'].min()
    highest = df['high'].max()
    price_range = highest - lowest
    if not np.isfinite(price_range) or price_range < 0:
        return {}
    if price_range == 0:
        # A zero-width range cannot be binned (the step would be 0).
        return {float(lowest): float(df['volume'].sum())}

    # linspace yields exactly bins + 1 edges; a float arange step can
    # produce one bin too many through rounding.
    edges = np.linspace(lowest, highest, bins + 1)
    mid_prices = (df['high'] + df['low']) / 2
    volume_profile = {}
    for lower, upper in zip(edges[:-1], edges[1:]):
        in_bin = (mid_prices >= lower) & (mid_prices < upper)
        volume_profile[float(lower)] = float(df.loc[in_bin, 'volume'].sum())
    return volume_profile
def detect_smc_structure(df: pd.DataFrame) -> Dict[str, Any]:
    """Return an empty SMC structure skeleton (placeholder).

    ``df`` is not inspected. The real detection is meant to be implemented
    here or imported from technical_indicators_smc.py.
    """
    structure: Dict[str, Any] = {}
    structure['internal_structure'] = {}
    structure['swing_structure'] = {}
    structure['order_blocks'] = []
    structure['fair_value_gaps'] = []
    structure['equal_levels'] = {'highs': [], 'lows': []}
    return structure
def _bias_from_prediction(prediction: Any) -> str:
    """Map a predictor result to BULLISH, BEARISH or NEUTRAL.

    Needs ``confidence``, ``predicted_price`` and ``current_price``; when
    any is missing (as with a result that only carries buy/sell signals)
    the bias is NEUTRAL.
    """
    if not isinstance(prediction, dict):
        return 'NEUTRAL'
    confidence = prediction.get('confidence')
    predicted_price = prediction.get('predicted_price')
    current_price = prediction.get('current_price')
    if confidence is None or predicted_price is None or current_price is None:
        return 'NEUTRAL'
    if confidence > 0.6:
        return 'BULLISH' if predicted_price > current_price else 'BEARISH'
    return 'NEUTRAL'


def analyze_market_structure(df: pd.DataFrame, predictor=None, history_analyzer=None) -> Dict[str, Any]:
    """Market structure analysis enriched with prediction and historical insight.

    Args:
        df: Price DataFrame; ``high`` is always read when a history analyzer
            is used, ``volume`` and ``time`` only when present.
        predictor: Optional object with ``predict(df)``. Its result is stored
            under ``future_prediction`` and turned into ``trading_bias``.
        history_analyzer: Optional object. It is used only if it has a
            ``profitable_patterns`` attribute; ``market_conditions`` is read
            if present and treated as empty otherwise.

    Returns:
        The dict from ``detect_smc_structure`` plus, when applicable,
        ``future_prediction``, ``trading_bias``, ``historical_insights`` and
        ``pattern_match_score``.
    """
    current_analysis = detect_smc_structure(df)

    if predictor:
        prediction = predictor.predict(df)
        current_analysis['future_prediction'] = prediction
        # Tolerates results without confidence/price fields instead of
        # raising KeyError.
        current_analysis['trading_bias'] = _bias_from_prediction(prediction)

    if history_analyzer and hasattr(history_analyzer, 'profitable_patterns'):
        optimal_conditions = getattr(history_analyzer, 'market_conditions', None) or {}
        historical_insights = {
            'profitable_patterns': history_analyzer.profitable_patterns,
            'optimal_conditions': optimal_conditions
        }
        current_analysis['historical_insights'] = historical_insights

        # Hour of the latest bar: from the 'time' column when present, else
        # from a datetime index, else unknown.
        if 'time' in df.columns:
            hour = pd.to_datetime(df['time'].iloc[-1]).hour
        elif isinstance(df.index, pd.DatetimeIndex) and len(df.index) > 0:
            hour = df.index[-1].hour
        else:
            hour = None

        current_conditions = {
            'volatility': df['high'].rolling(20).std().iloc[-1],
            'volume': df['volume'].iloc[-1] if 'volume' in df.columns else None,
            'hour': hour
        }
        current_analysis['pattern_match_score'] = _calculate_pattern_match(
            current_conditions,
            optimal_conditions
        )

    return current_analysis
def _calculate_pattern_match(current: Dict[str, Any], optimal: Dict[str, Any]) -> float:
"""Calculate how well current conditions match historical optimal conditions"""
scores = []
# Check volatility match
if 'volatility' in current and 'volatility_range' in optimal:
vol_range = optimal['volatility_range']
if vol_range['min'] <= current['volatility'] <= vol_range['max']:
scores.append(1.0)
else:
distance = min(abs(current['volatility'] - vol_range['min']),
abs(current['volatility'] - vol_range['max']))
scores.append(max(0, 1 - distance/vol_range['optimal']))
# Check volume match
if (current['volume'] is not None and 'volume_range' in optimal):
vol_range = optimal['volume_range']
if vol_range['min'] <= current['volume'] <= vol_range['max']:
scores.append(1.0)
else:
distance = min(abs(current['volume'] - vol_range['min']),
abs(current['volume'] - vol_range['max']))
scores.append(max(0, 1 - distance/vol_range['optimal']))
# Check trading hour match
if current['hour'] in optimal.get('best_hours', []):
scores.append(1.0)
else:
scores.append(0.0)
return sum(scores) / len(scores) if scores else 0.0
class MarketPredictor:
    """Thin wrapper around an optional machine-learning model.

    ``model`` starts as None; until something assigns a model, ``predict``
    returns zero signals.
    """

    def __init__(self):
        # Placeholder for the machine-learning model.
        self.model = None

    def train(self, df: pd.DataFrame):
        """Train the prediction model on historical data (not implemented)."""
        return None

    def predict(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return buy and sell signals for the given market data.

        Without a model both signals are 0.0. Otherwise the first two
        entries of ``self.model.predict(features)`` are returned as
        ``buy_signal`` and ``sell_signal``.
        """
        if self.model is None:
            return {'buy_signal': 0.0, 'sell_signal': 0.0}

        outputs = self.model.predict(self._prepare_features(df))
        buy, sell = float(outputs[0]), float(outputs[1])
        return {'buy_signal': buy, 'sell_signal': sell}

    def _prepare_features(self, df: pd.DataFrame) -> np.ndarray:
        """Return the closing prices as a single-column 2-D feature array."""
        closes = df['close'].values
        return closes.reshape(-1, 1)
class HistoryAnalyzer:
    """Evaluate the performance of past trades."""

    def __init__(self):
        """Create an analyzer; no state is initialised."""

    def analyze_historical_trades(self, df: pd.DataFrame, past_trades: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compute the share of profit and loss across past trades.

        Each trade must provide ``profit`` and ``loss`` entries. ``df`` is
        not used. ``profit_ratio`` is total profit divided by total profit
        plus total loss (0 when that sum is zero) and ``loss_ratio`` is its
        complement. With no trades both ratios are 0.0.
        """
        if not past_trades:
            return {'profit_ratio': 0.0, 'loss_ratio': 0.0}

        gains = sum([trade['profit'] for trade in past_trades])
        losses = sum([trade['loss'] for trade in past_trades])
        turnover = gains + losses

        if turnover != 0:
            profit_ratio = gains / turnover
        else:
            profit_ratio = 0

        return {'profit_ratio': profit_ratio, 'loss_ratio': 1 - profit_ratio}
# Module-level default instances, created once when this module is imported.
predictor = MarketPredictor()
analyzer = HistoryAnalyzer()