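# File: markaz_arshy/analyze_market.py
#
# Repository-viewer metadata (not part of the program):
#   692 lines, 26 KiB, Python
#   Viewer links: Raw / Permalink / Normal View / History
#   Last commit: 2025-08-12 14:36:24 +00:00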
import pandas as pd
import numpy as np
from typing import Dict, Any, Tuple, List, Optional
from datetime import datetime
from enum import Enum
import talib as ta
class TradingProfile(Enum):
    """Trading styles that the profile-specific market analysis understands.

    Every member's value is the lowercase name of the profile, so a member can
    be looked up from a plain string, e.g. ``TradingProfile("swing")``.
    """

    # Very short holding periods.
    SCALPER = "scalper"
    # Positions opened and closed within one day.
    INTRADAY = "intraday"
    # Multi-bar swings.
    SWING = "swing"
    # Long-term positions.
    POSITION = "position"
class ProfileAnalyzer:
    """Analyze a market according to a specific trading profile.

    ``analyze`` dispatches to one of four profile-specific routines.  Each
    routine returns a dict of named sub-analyses computed from a price
    ``DataFrame`` with ``open``/``high``/``low``/``close`` columns.  A
    ``spread`` column is optional and is only read by the scalper analysis.

    None of the helpers modify the ``DataFrame`` they are given.
    """

    def __init__(self, profile: "TradingProfile"):
        self.profile = profile

    def analyze(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Run the analysis that matches ``self.profile``.

        Any profile other than SCALPER, INTRADAY or SWING is analyzed as a
        POSITION profile.
        """
        if self.profile == TradingProfile.SCALPER:
            return self._analyze_scalper(df)
        if self.profile == TradingProfile.INTRADAY:
            return self._analyze_intraday(df)
        if self.profile == TradingProfile.SWING:
            return self._analyze_swing(df)
        return self._analyze_position(df)

    def _analyze_scalper(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Scalper analysis: bar-level volatility, momentum, spread and candles."""
        return {
            'volatility': self._calculate_micro_volatility(df),
            'momentum': self._calculate_short_momentum(df),
            'spread_impact': self._analyze_spread_impact(df),
            'micro_structure': self._analyze_micro_structure(df),
        }

    def _analyze_intraday(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Intraday analysis: move since the first bar, session, levels, volume."""
        return {
            'day_momentum': self._calculate_day_momentum(df),
            'session_analysis': self._analyze_trading_session(df),
            'key_levels': self._find_intraday_levels(df),
            'volume_profile': self._analyze_volume_profile(df),
        }

    def _analyze_swing(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Swing analysis: trend strength, key levels, structure, divergences."""
        return {
            'trend_strength': self._calculate_trend_strength(df),
            'key_support_resistance': self._find_key_levels(df),
            'market_structure': self._analyze_market_structure(df),
            'momentum_divergence': self._find_divergences(df),
        }

    def _analyze_position(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Position analysis: long-term trend, MA crossovers, volatility, phase."""
        return {
            'major_trend': self._analyze_major_trend(df),
            'structural_changes': self._detect_structural_changes(df),
            'volatility_regime': self._analyze_volatility_regime(df),
            'market_phase': self._determine_market_phase(df),
        }

    # --- Shared helper ---

    @staticmethod
    def _rsi_series(close: pd.Series, period: int = 14) -> pd.Series:
        """Return a simple-moving-average RSI of ``close``.

        A window with no price movement at all (average gain and average loss
        both zero) has an undefined RSI; it is reported as a neutral 50.
        """
        delta = close.diff()
        # ``where`` also turns the leading NaN of ``diff`` into 0.
        gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rsi = 100 - (100 / (1 + gain / loss))
        return rsi.mask((gain == 0) & (loss == 0), 50.0)

    # --- Helper methods for the scalper profile ---

    def _calculate_micro_volatility(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the last bar's range as a percentage of its low, and the
        20-bar average of that percentage.

        Needs at least 20 bars; otherwise both values are 0.0.
        """
        if len(df) < 20:
            return {'current': 0.0, 'average': 0.0}
        # Kept in a local Series: writing a column would mutate the caller's frame.
        micro_vol = (df['high'] - df['low']) / df['low'] * 100
        return {
            'current': float(micro_vol.iloc[-1]),
            'average': float(micro_vol.rolling(20).mean().iloc[-1]),
        }

    def _calculate_short_momentum(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return the latest 14-bar RSI and the close-to-close change over the
        last four bars (``close[-1] - close[-5]``).

        Needs at least 14 bars; otherwise returns a neutral RSI of 50 and zero
        momentum.
        """
        if len(df) < 14:
            return {'rsi': 50.0, 'momentum': 0.0}
        rsi = self._rsi_series(df['close'])
        momentum = df['close'].iloc[-1] - df['close'].iloc[-5]
        return {'rsi': float(rsi.iloc[-1]), 'momentum': float(momentum)}

    def _analyze_spread_impact(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the latest spread divided by the 20-bar average bar range.

        Returns a ratio of 1.0 when there is no ``spread`` column or fewer
        than 20 bars.
        """
        if 'spread' not in df.columns or len(df) < 20:
            return {'spread_ratio': 1.0}
        avg_range = (df['high'] - df['low']).rolling(20).mean()
        spread_ratio = df['spread'] / avg_range
        return {'spread_ratio': float(spread_ratio.iloc[-1])}

    def _analyze_micro_structure(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Classify the last three candles and score the latest candle's body."""
        if len(df) < 20:
            return {'pattern': 'unknown', 'strength': 0.0}
        last_candles = df.tail(3)
        return {
            'pattern': self._detect_candlestick_pattern(last_candles),
            'strength': self._calculate_pattern_strength(last_candles),
        }

    # --- Helper methods for the intraday profile ---

    def _calculate_day_momentum(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return the percentage change from the first bar's open to the last
        bar's close.

        Needs at least 24 bars; otherwise momentum is 0.0.
        """
        if len(df) < 24:
            return {'momentum': 0.0}
        day_open = df['open'].iloc[0]
        current_price = df['close'].iloc[-1]
        momentum = (current_price - day_open) / day_open * 100
        return {'momentum': float(momentum)}

    def _analyze_trading_session(self, df: pd.DataFrame) -> Dict[str, str]:
        """Name the trading session for the current wall-clock hour.

        The hour comes from ``pd.Timestamp.now()`` (local time of the running
        process); ``df`` is not read.  Hours 0-7 map to ASIAN, 8-15 to LONDON
        and 16-23 to NEW_YORK.
        """
        current_hour = pd.Timestamp.now().hour
        if 0 <= current_hour < 8:
            session = "ASIAN"
        elif 8 <= current_hour < 16:
            session = "LONDON"
        else:
            session = "NEW_YORK"
        return {'current_session': session}

    def _find_intraday_levels(self, df: pd.DataFrame) -> Dict[str, List[float]]:
        """Return up to three recent swing lows (support) and swing highs
        (resistance).  Needs at least 100 bars."""
        if len(df) < 100:
            return {'support': [], 'resistance': []}
        highs = self._find_swing_points(df['high'], 'high')
        lows = self._find_swing_points(df['low'], 'low')
        return {
            'support': [float(x) for x in lows],
            'resistance': [float(x) for x in highs],
        }

    def _analyze_volume_profile(self, df: pd.DataFrame) -> Dict[float, float]:
        """Return the volume-by-price profile for ``df``.

        Delegates to the module-level ``analyze_volume_profile`` so both entry
        points share one implementation.
        """
        return analyze_volume_profile(df)

    # --- Helper methods for the swing profile ---

    def _calculate_trend_strength(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return a trend-strength proxy derived from the 14-bar ATR.

        The value is the 14-bar mean of the absolute bar-to-bar percentage
        change of the ATR.  It is not Wilder's ADX (no directional movement is
        computed).  Needs at least 50 bars; otherwise strength is 0.0.
        """
        if len(df) < 50:
            return {'strength': 0.0}
        prev_close = df['close'].shift()
        tr = pd.concat(
            [df['high'] - df['low'], abs(df['high'] - prev_close), abs(df['low'] - prev_close)],
            axis=1,
        ).max(axis=1)
        atr = tr.rolling(14).mean()
        dx = atr.diff() / atr * 100
        adx = abs(dx).rolling(14).mean()
        return {'strength': float(adx.iloc[-1])}

    def _find_key_levels(self, df: pd.DataFrame) -> Dict[str, List[float]]:
        """Return the price levels (closes rounded to the nearest 10) that
        occur more often than the average level.  Needs at least 100 bars."""
        if len(df) < 100:
            return {'major_levels': []}
        price_rounds = np.round(df['close'] / 10) * 10
        level_counts = price_rounds.value_counts()
        major_levels = level_counts[level_counts > level_counts.mean()].index.tolist()
        return {'major_levels': [float(x) for x in major_levels]}

    def _analyze_market_structure(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return the market-structure analysis for ``df``.

        Delegates to the module-level ``analyze_market_structure`` without a
        predictor or history analyzer.
        """
        return analyze_market_structure(df)

    def _find_divergences(self, df: pd.DataFrame, lookback: int = 14) -> Dict[str, Any]:
        """Detect a simple price/RSI divergence between the last two windows.

        Compares the most recent ``lookback`` bars with the ``lookback`` bars
        before them:

        * BEARISH - price made a higher high while RSI made a lower high.
        * BULLISH - price made a lower low while RSI made a higher low.
        * NONE    - neither, or fewer than ``3 * lookback`` bars are available.
        """
        if len(df) < 3 * lookback:
            return {'divergence': 'NONE'}
        close = df['close']
        rsi = self._rsi_series(close)
        recent_price, prior_price = close.iloc[-lookback:], close.iloc[-2 * lookback:-lookback]
        recent_rsi, prior_rsi = rsi.iloc[-lookback:], rsi.iloc[-2 * lookback:-lookback]
        if recent_price.max() > prior_price.max() and recent_rsi.max() < prior_rsi.max():
            kind = 'BEARISH'
        elif recent_price.min() < prior_price.min() and recent_rsi.min() > prior_rsi.min():
            kind = 'BULLISH'
        else:
            kind = 'NONE'
        return {'divergence': kind}

    # --- Helper methods for the position profile ---

    def _analyze_major_trend(self, df: pd.DataFrame) -> Dict[str, str]:
        """Return BULLISH when the last close is above the 200-bar SMA,
        BEARISH otherwise, and UNDEFINED with fewer than 200 bars."""
        if len(df) < 200:
            return {'trend': 'UNDEFINED'}
        sma200 = df['close'].rolling(200).mean()
        current_price = df['close'].iloc[-1]
        trend = 'BULLISH' if current_price > sma200.iloc[-1] else 'BEARISH'
        return {'trend': trend}

    def _detect_structural_changes(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Detect a 50/200-bar moving-average crossover on the latest bar.

        Returns ``change_detected`` (a plain ``bool``) and ``type``: BULLISH
        when the 50-bar SMA crossed above the 200-bar SMA, BEARISH when it
        crossed below, NONE otherwise.
        """
        # Two consecutive valid 200-bar averages are required to see a cross.
        if len(df) < 201:
            return {'change_detected': False, 'type': 'NONE'}
        ma50 = df['close'].rolling(50).mean()
        ma200 = df['close'].rolling(200).mean()
        cross_over = bool(ma50.iloc[-2] <= ma200.iloc[-2] and ma50.iloc[-1] > ma200.iloc[-1])
        cross_under = bool(ma50.iloc[-2] >= ma200.iloc[-2] and ma50.iloc[-1] < ma200.iloc[-1])
        return {
            'change_detected': cross_over or cross_under,
            'type': 'BULLISH' if cross_over else 'BEARISH' if cross_under else 'NONE',
        }

    def _analyze_volatility_regime(self, df: pd.DataFrame, short: int = 14,
                                   long: int = 100) -> Dict[str, Any]:
        """Compare recent bar ranges with their longer-term average.

        ``ratio`` is the mean high-low range of the last ``short`` bars
        divided by the mean of the last ``long`` bars.  The regime is HIGH
        above 1.5, LOW below 0.67 and NORMAL in between (heuristic cut-offs).
        With fewer than ``long`` bars, or an unusable average, the regime is
        UNDEFINED and the ratio is 1.0.
        """
        undefined = {'regime': 'UNDEFINED', 'ratio': 1.0}
        if len(df) < long:
            return undefined
        bar_range = df['high'] - df['low']
        short_avg = float(bar_range.iloc[-short:].mean())
        long_avg = float(bar_range.iloc[-long:].mean())
        if not np.isfinite(short_avg) or not np.isfinite(long_avg) or long_avg <= 0:
            return undefined
        ratio = short_avg / long_avg
        if ratio > 1.5:
            regime = 'HIGH'
        elif ratio < 0.67:
            regime = 'LOW'
        else:
            regime = 'NORMAL'
        return {'regime': regime, 'ratio': ratio}

    def _determine_market_phase(self, df: pd.DataFrame) -> Dict[str, str]:
        """Classify the market phase from the 50- and 200-bar averages.

        * MARKUP        - last close > SMA50 > SMA200
        * MARKDOWN      - last close < SMA50 < SMA200
        * CONSOLIDATION - any other ordering
        * UNDEFINED     - fewer than 200 bars
        """
        if len(df) < 200:
            return {'phase': 'UNDEFINED'}
        close = df['close']
        price = close.iloc[-1]
        sma50 = close.iloc[-50:].mean()
        sma200 = close.iloc[-200:].mean()
        if price > sma50 > sma200:
            phase = 'MARKUP'
        elif price < sma50 < sma200:
            phase = 'MARKDOWN'
        else:
            phase = 'CONSOLIDATION'
        return {'phase': phase}

    # --- Generic helpers ---

    def _find_swing_points(self, series: pd.Series, point_type: str, window: int = 5) -> List[float]:
        """Return the last three swing highs or swing lows of ``series``.

        A bar is a swing high (low) when its value is strictly greater (less)
        than each of the ``window`` bars before it and after it.  Any
        ``point_type`` other than ``'high'`` is treated as ``'low'``.
        """
        # Work on a plain array: ``series[i]`` is label-based and breaks for
        # any index that is not 0..n-1 (e.g. a DatetimeIndex or a sliced frame).
        values = series.to_numpy()
        points = []
        for i in range(window, len(values) - window):
            before = values[i - window:i]
            after = values[i + 1:i + window + 1]
            if point_type == 'high':
                is_swing = (values[i] > before).all() and (values[i] > after).all()
            else:
                is_swing = (values[i] < before).all() and (values[i] < after).all()
            if is_swing:
                points.append(float(values[i]))
        return points[-3:]

    def _detect_candlestick_pattern(self, candles: pd.DataFrame) -> str:
        """Return ``"bullish_engulfing"`` when the last two candles form one,
        ``"no_pattern"`` otherwise, and ``"unknown"`` with fewer than three
        candles."""
        if len(candles) < 3:
            return "unknown"
        prev_candle = candles.iloc[-2]
        curr_candle = candles.iloc[-1]
        if (prev_candle['close'] < prev_candle['open'] and      # previous candle bearish
                curr_candle['close'] > curr_candle['open'] and  # current candle bullish
                curr_candle['open'] < prev_candle['close'] and  # opens below previous close
                curr_candle['close'] > prev_candle['open']):    # closes above previous open
            return "bullish_engulfing"
        return "no_pattern"

    def _calculate_pattern_strength(self, candles: pd.DataFrame) -> float:
        """Return the last candle's body size relative to the average body of
        ``candles``, capped at 1.0 (0.0 with fewer than three candles)."""
        if len(candles) < 3:
            return 0.0
        curr_candle = candles.iloc[-1]
        body_size = abs(curr_candle['close'] - curr_candle['open'])
        avg_body = abs(candles['close'] - candles['open']).mean()
        strength = body_size / avg_body if avg_body != 0 else 1.0
        return float(min(strength, 1.0))
def calculate_slope(df: pd.DataFrame, length: int = 14, method: str = 'atr', multiplier: float = 1.0) -> float:
    """
    Compute the per-bar slope used to project a trendline.

    Args:
        df: DataFrame with OHLCV data
        length: Lookback period
        method: How the slope is derived:
            'atr'    - latest ``length``-bar average true range / ``length``
            'stdev'  - latest ``length``-bar standard deviation of close / ``length``
            'linreg' - absolute slope of a straight-line fit through the last
                       ``length`` closes
        multiplier: Factor applied to the slope

    Returns:
        float: The slope, or 0.0 for an unrecognised ``method``
    """
    if method == 'linreg':
        closes = df['close'].iloc[-length:].values
        bar_numbers = np.arange(len(df[-length:]))
        gradient, _intercept = np.polyfit(bar_numbers, closes, 1)
        return abs(gradient) * multiplier

    if method == 'stdev':
        rolling_std = df['close'].rolling(length).std()
        return (rolling_std.iloc[-1] / length) * multiplier

    if method == 'atr':
        prev_close = df['close'].shift()
        # True range is the largest of these three distances for each bar.
        candidates = pd.concat(
            [
                df['high'] - df['low'],
                abs(df['high'] - prev_close),
                abs(df['low'] - prev_close),
            ],
            axis=1,
        )
        atr = candidates.max(axis=1).rolling(length).mean()
        return (atr.iloc[-1] / length) * multiplier

    return 0.0
def detect_pivot_points(df: pd.DataFrame, length: int = 14) -> Dict[str, Any]:
    """
    Detect pivot highs and pivot lows.

    A bar is a pivot high (low) when its high (low) equals the maximum
    (minimum) of the centred window made of ``length`` bars on each side plus
    the bar itself. Bars without a complete window are never pivots.

    Args:
        df: DataFrame with OHLCV data
        length: Number of bars required on each side of a pivot

    Returns:
        Dict with 'pivot_high' and 'pivot_low' Series holding only the pivot
        bars (indexed as in ``df``)
    """
    span = 2 * length + 1

    def _centre_is_max(window) -> int:
        return 1 if window.iloc[length] == max(window) else 0

    def _centre_is_min(window) -> int:
        return 1 if window.iloc[length] == min(window) else 0

    high_flags = df['high'].rolling(span, center=True).apply(_centre_is_max)
    low_flags = df['low'].rolling(span, center=True).apply(_centre_is_min)

    pivots: Dict[str, Any] = {}
    pivots['pivot_high'] = df['high'].where(high_flags == 1).dropna()
    pivots['pivot_low'] = df['low'].where(low_flags == 1).dropna()
    return pivots
def analyze_trendlines(df: pd.DataFrame, length: int = 14, slope_mult: float = 1.0,
                       calc_method: str = 'atr') -> Dict[str, Any]:
    """
    Project trendlines from the latest pivots and detect breakouts.

    The upper trendline starts at the most recent pivot high and falls by
    ``slope`` per bar; the lower trendline starts at the most recent pivot low
    and rises by ``slope`` per bar. A breakout is flagged when the last bar's
    high is above the upper line or its low is below the lower line.

    Args:
        df: DataFrame with OHLCV data (any index type)
        length: Period used for swing/pivot detection
        slope_mult: Multiplier for the slope calculation
        calc_method: Slope method ('atr', 'stdev', 'linreg')

    Returns:
        Dict with 'upper_trendline', 'lower_trendline', 'upward_break',
        'downward_break', 'slope', 'last_pivot_high' and 'last_pivot_low'.
        Trendline and pivot entries are None when no pivot of that kind exists.
    """
    # Detect pivots on a 0..n-1 index so that each pivot's label IS its bar
    # position. Subtracting the caller's own labels from len(df) fails for a
    # DatetimeIndex and gives wrong distances for an offset integer index.
    pivots = detect_pivot_points(df.reset_index(drop=True), length)
    slope = calculate_slope(df, length, calc_method, slope_mult)

    upper_trendline = None
    lower_trendline = None
    last_pivot_high = None
    last_pivot_low = None

    pivot_highs = pivots['pivot_high']
    pivot_lows = pivots['pivot_low']

    if len(pivot_highs) > 0:
        last_pivot_high = pivot_highs.iloc[-1]
        # Distance is len(df) - position, exactly as for a default RangeIndex.
        bars_since_high = len(df) - int(pivot_highs.index[-1])
        upper_trendline = last_pivot_high - (slope * bars_since_high)

    if len(pivot_lows) > 0:
        last_pivot_low = pivot_lows.iloc[-1]
        bars_since_low = len(df) - int(pivot_lows.index[-1])
        lower_trendline = last_pivot_low + (slope * bars_since_low)

    current_high = df['high'].iloc[-1]
    current_low = df['low'].iloc[-1]

    upward_break = bool(upper_trendline is not None and current_high > upper_trendline)
    downward_break = bool(lower_trendline is not None and current_low < lower_trendline)

    return {
        'upper_trendline': upper_trendline,
        'lower_trendline': lower_trendline,
        'upward_break': upward_break,
        'downward_break': downward_break,
        'slope': slope,
        'last_pivot_high': last_pivot_high,
        'last_pivot_low': last_pivot_low
    }
# --- Update existing get_market_context function ---
def get_market_context(df: pd.DataFrame, profile: Optional[str] = None) -> Dict[str, Any]:
    """Build the full market context, optionally with profile-specific analysis.

    The result contains 'zones', 'structure', 'equilibrium', 'trendlines',
    'smc_analysis' and 'market_bias'. When ``profile`` is given it also
    contains 'profile_analysis', which is None if a ValueError is raised while
    resolving or running the profile (e.g. an unknown profile name).

    The market bias is decided in this order: a detected break of structure,
    then a trendline break, then equilibrium (NEUTRAL), and finally the
    position of the last close relative to the premium/discount zones.
    """
    # NOTE(review): calculate_smc_zones and detect_zone_transition are not
    # defined or imported in the visible part of this file - confirm where
    # they come from.
    context: Dict[str, Any] = {}
    context['zones'] = detect_premium_discount_zones(df)
    context['structure'] = analyze_break_of_structure(df)
    context['equilibrium'] = detect_market_equilibrium(df)

    # Trendline analysis with the default settings.
    trendlines = analyze_trendlines(df, length=14, slope_mult=1.0, calc_method='atr')
    context['trendlines'] = trendlines

    structure = context['structure']
    if trendlines['upward_break']:
        break_direction = 'BULLISH'
    elif trendlines['downward_break']:
        break_direction = 'BEARISH'
    else:
        break_direction = None
    structure['trendline_break'] = break_direction

    # SMC zones and the current state relative to them.
    smc_zones = calculate_smc_zones(df)
    context['smc_analysis'] = {
        'zones': smc_zones,
        'current_state': detect_zone_transition(df, smc_zones),
    }

    if profile:
        try:
            context['profile_analysis'] = ProfileAnalyzer(TradingProfile(profile.lower())).analyze(df)
        except ValueError:
            context['profile_analysis'] = None

    def _zone_bias() -> str:
        # Last resort: compare the latest close with the premium/discount zones.
        last_close = df['close'].iloc[-1]
        if last_close > context['zones']['premium']:
            return 'BULLISH'
        if last_close < context['zones']['discount']:
            return 'BEARISH'
        return 'NEUTRAL'

    if structure['bos_detected']:
        context['market_bias'] = structure['bos_direction']
    elif structure['trendline_break']:
        context['market_bias'] = structure['trendline_break']
    elif context['equilibrium']['equilibrium_detected']:
        context['market_bias'] = 'NEUTRAL'
    else:
        context['market_bias'] = _zone_bias()
    return context
def detect_premium_discount_zones(df: pd.DataFrame) -> Dict[str, float]:
    """Detect the premium and discount price zones.

    * Fewer than 20 bars: premium is the highest high, discount the lowest low.
    * Otherwise the volume profile is consulted: among the price bins whose
      volume is above the average bin volume, the highest bin is the premium
      and the lowest is the discount.
    * If fewer than two such bins exist, the zones fall back to 1% above and
      below the 20-bar simple moving average of the close.

    Returns:
        Dict with 'premium' and 'discount' as plain floats.
    """
    if len(df) < 20:
        return {'premium': float(df['high'].max()), 'discount': float(df['low'].min())}

    volume_profile = analyze_volume_profile(df)
    high_vol_zones: List[float] = []
    if volume_profile:
        # The profile is a plain dict (price -> volume), so the mean has to be
        # computed from its values; a dict has no .mean() method.
        mean_volume = sum(volume_profile.values()) / len(volume_profile)
        high_vol_zones = sorted(
            (zone for zone, vol in volume_profile.items() if vol > mean_volume),
            reverse=True,
        )

    if len(high_vol_zones) >= 2:
        premium = high_vol_zones[0]
        discount = high_vol_zones[-1]
    else:
        # Fallback: a band around the simple moving average.
        sma20 = df['close'].rolling(20).mean().iloc[-1]
        premium = sma20 * 1.01   # 1% above the SMA
        discount = sma20 * 0.99  # 1% below the SMA
    return {'premium': float(premium), 'discount': float(discount)}
def _find_swing_points(series: pd.Series, point_type: str, window: int = 5) -> List[float]:
    """Return the last three swing highs or lows of ``series``.

    A bar is a swing high (low) when it is strictly greater (less) than each
    of the ``window`` bars on both sides. Works positionally, so the index of
    ``series`` does not matter. Mirrors ``ProfileAnalyzer._find_swing_points``.
    """
    values = series.to_numpy()
    points = []
    for i in range(window, len(values) - window):
        before = values[i - window:i]
        after = values[i + 1:i + window + 1]
        if point_type == 'high':
            is_swing = (values[i] > before).all() and (values[i] > after).all()
        else:
            is_swing = (values[i] < before).all() and (values[i] < after).all()
        if is_swing:
            points.append(float(values[i]))
    return points[-3:]


def analyze_break_of_structure(df: pd.DataFrame) -> Dict[str, Any]:
    """Analyze Break of Structure (BOS).

    A bullish BOS is reported when the last close is above the highest of the
    recent swing highs; a bearish BOS when it is below the lowest of the
    recent swing lows (swings use a 10-bar window on each side).

    ``strength`` is the distance beyond the broken level divided by the ATR.
    The ``atr`` column is used when present; otherwise a 14-bar ATR is
    computed from high/low/close. A missing, zero or non-finite ATR gives a
    strength of 0.0.

    Returns:
        Dict with 'bos_detected', 'bos_direction' ('BULLISH', 'BEARISH' or
        None) and 'strength'. With fewer than 50 bars no BOS is reported.
    """
    if len(df) < 50:
        return {'bos_detected': False, 'bos_direction': None, 'strength': 0.0}

    swing_highs = _find_swing_points(df['high'], 'high', window=10)
    swing_lows = _find_swing_points(df['low'], 'low', window=10)
    current_price = float(df['close'].iloc[-1])

    if 'atr' in df.columns:
        atr = float(df['atr'].iloc[-1])
    else:
        prev_close = df['close'].shift()
        true_range = pd.concat(
            [df['high'] - df['low'], (df['high'] - prev_close).abs(), (df['low'] - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        atr = float(true_range.rolling(14).mean().iloc[-1])

    def _strength(distance: float) -> float:
        # Avoid inf/NaN when the ATR is unusable.
        if not np.isfinite(atr) or atr <= 0:
            return 0.0
        return float(distance / atr)

    if swing_highs and current_price > max(swing_highs):
        return {
            'bos_detected': True,
            'bos_direction': 'BULLISH',
            'strength': _strength(current_price - max(swing_highs))
        }
    if swing_lows and current_price < min(swing_lows):
        return {
            'bos_detected': True,
            'bos_direction': 'BEARISH',
            'strength': _strength(min(swing_lows) - current_price)
        }
    return {'bos_detected': False, 'bos_direction': None, 'strength': 0.0}
def detect_market_equilibrium(df: pd.DataFrame) -> Dict[str, Any]:
    """Detect market equilibrium (quiet price action on fading volume).

    Equilibrium is reported when both hold:

    * the high-low range of the last 5 bars is below half the ATR, and
    * the last bar's volume is below 70% of the 20-bar average volume.

    The ``atr`` column is used when present; otherwise a 14-bar ATR is
    computed from high/low/close. ``confidence`` is the mean of how far each
    measure sits below its threshold (0.0 when no equilibrium is detected).

    Returns:
        Dict with 'equilibrium_detected' (plain bool) and 'confidence' (float).
        With fewer than 20 bars no equilibrium is reported.
    """
    if len(df) < 20:
        return {'equilibrium_detected': False, 'confidence': 0.0}

    # Relative volatility.
    if 'atr' in df.columns:
        atr = df['atr'].iloc[-1]
    else:
        prev_close = df['close'].shift()
        true_range = pd.concat(
            [df['high'] - df['low'], (df['high'] - prev_close).abs(), (df['low'] - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        atr = true_range.rolling(14).mean().iloc[-1]
    recent_range = df['high'].iloc[-5:].max() - df['low'].iloc[-5:].min()

    # Relative volume.
    avg_volume = df['volume'].rolling(20).mean().iloc[-1]
    current_volume = df['volume'].iloc[-1]

    # bool() keeps numpy.bool_ out of the result.
    low_volatility = bool(recent_range < atr * 0.5)
    declining_volume = bool(current_volume < avg_volume * 0.7)
    equilibrium_detected = low_volatility and declining_volume

    if equilibrium_detected:
        volatility_ratio = 1 - (recent_range / (atr * 0.5))
        volume_ratio = 1 - (current_volume / (avg_volume * 0.7))
        confidence = (volatility_ratio + volume_ratio) / 2
    else:
        confidence = 0.0

    return {
        'equilibrium_detected': equilibrium_detected,
        'confidence': float(confidence)
    }
def analyze_volume_profile(df: pd.DataFrame) -> Dict[float, float]:
    """Build a volume-by-price profile to locate areas of interest.

    The price span from the lowest low to the highest high is split into 20
    equal bins. Each bar's volume is assigned to the bin containing the bar's
    mid price ``(high + low) / 2``.

    Returns:
        Dict mapping each bin's lower price edge to the total volume in that
        bin. Empty with fewer than 100 bars or non-finite prices. When all
        prices are identical (zero span) a single entry holds all the volume.
    """
    if len(df) < 100:
        return {}

    lowest = float(df['low'].min())
    highest = float(df['high'].max())
    price_range = highest - lowest
    if not np.isfinite(price_range):
        return {}

    volumes = df['volume']
    if price_range == 0:
        # A zero bin width cannot be stepped over; everything is at one price.
        return {lowest: float(volumes.sum())}

    bin_count = 20
    bin_size = price_range / bin_count
    # Shared edges guarantee exactly ``bin_count`` bins with no gaps/overlaps.
    edges = [lowest + i * bin_size for i in range(bin_count + 1)]
    mid_prices = (df['high'] + df['low']) / 2

    volume_profile = {}
    for i in range(bin_count):
        mask = mid_prices >= edges[i]
        if i < bin_count - 1:
            mask &= mid_prices < edges[i + 1]
        # The last bin is closed on the right so a mid price equal to the
        # highest high is still counted.
        volume_profile[float(edges[i])] = float(volumes[mask].sum())
    return volume_profile
def detect_smc_structure(df: pd.DataFrame) -> Dict[str, Any]:
    """Placeholder for SMC structure detection.

    Always returns a fresh, empty result skeleton and does not read ``df``.
    The real detection should be implemented here or imported from
    technical_indicators_smc.py.
    """
    return dict(
        internal_structure={},
        swing_structure={},
        order_blocks=[],
        fair_value_gaps=[],
        equal_levels=dict(highs=[], lows=[]),
    )
def analyze_market_structure(df: pd.DataFrame, predictor=None, history_analyzer=None) -> Dict[str, Any]:
    """Market-structure analysis, optionally enriched with a prediction and
    with historical insights.

    Args:
        df: Price DataFrame. For historical matching, the hour of the last bar
            is taken from a 'time' column if present, otherwise from a
            DatetimeIndex; if neither exists the hour is None.
        predictor: Optional object with ``predict(df)`` returning a mapping.
            A trading bias is derived only when the mapping holds
            'confidence', 'predicted_price' and 'current_price'; otherwise
            the bias is 'NEUTRAL'.
        history_analyzer: Optional object exposing ``profitable_patterns``
            (required for this step) and ``market_conditions`` (optional,
            treated as empty when missing).

    Returns:
        The dict from ``detect_smc_structure`` plus, when applicable,
        'future_prediction', 'trading_bias', 'historical_insights' and
        'pattern_match_score'.
    """
    current_analysis = detect_smc_structure(df)

    if predictor:
        prediction = predictor.predict(df)
        current_analysis['future_prediction'] = prediction

        # Not every predictor reports these keys (MarketPredictor in this
        # module returns buy/sell signals only), so read them defensively.
        confidence = prediction.get('confidence')
        predicted_price = prediction.get('predicted_price')
        reference_price = prediction.get('current_price')
        usable = None not in (confidence, predicted_price, reference_price)

        if usable and confidence > 0.6:
            if predicted_price > reference_price:
                current_analysis['trading_bias'] = 'BULLISH'
            else:
                current_analysis['trading_bias'] = 'BEARISH'
        else:
            current_analysis['trading_bias'] = 'NEUTRAL'

    if history_analyzer and hasattr(history_analyzer, 'profitable_patterns'):
        historical_insights = {
            'profitable_patterns': history_analyzer.profitable_patterns,
            'optimal_conditions': getattr(history_analyzer, 'market_conditions', {})
        }
        current_analysis['historical_insights'] = historical_insights

        # Hour of the latest bar, if the frame carries any timestamp at all.
        if 'time' in df.columns:
            hour = pd.to_datetime(df['time'].iloc[-1]).hour
        elif isinstance(df.index, pd.DatetimeIndex) and len(df.index) > 0:
            hour = df.index[-1].hour
        else:
            hour = None

        current_conditions = {
            'volatility': df['high'].rolling(20).std().iloc[-1],
            'volume': df['volume'].iloc[-1] if 'volume' in df.columns else None,
            'hour': hour
        }
        current_analysis['pattern_match_score'] = _calculate_pattern_match(
            current_conditions,
            historical_insights['optimal_conditions'] or {}
        )
    return current_analysis
def _calculate_pattern_match(current: Dict[str, Any], optimal: Dict[str, Any]) -> float:
"""Calculate how well current conditions match historical optimal conditions"""
scores = []
# Check volatility match
if 'volatility' in current and 'volatility_range' in optimal:
vol_range = optimal['volatility_range']
if vol_range['min'] <= current['volatility'] <= vol_range['max']:
scores.append(1.0)
else:
distance = min(abs(current['volatility'] - vol_range['min']),
abs(current['volatility'] - vol_range['max']))
scores.append(max(0, 1 - distance/vol_range['optimal']))
# Check volume match
if (current['volume'] is not None and 'volume_range' in optimal):
vol_range = optimal['volume_range']
if vol_range['min'] <= current['volume'] <= vol_range['max']:
scores.append(1.0)
else:
distance = min(abs(current['volume'] - vol_range['min']),
abs(current['volume'] - vol_range['max']))
scores.append(max(0, 1 - distance/vol_range['optimal']))
# Check trading hour match
if current['hour'] in optimal.get('best_hours', []):
scores.append(1.0)
else:
scores.append(0.0)
return sum(scores) / len(scores) if scores else 0.0
class MarketPredictor:
    """Thin wrapper around a prediction model that emits buy/sell signals.

    ``model`` starts as None; until something assigns a model, ``predict``
    returns zero signals.
    """

    def __init__(self):
        # Placeholder for a machine-learning model exposing ``predict``.
        self.model = None

    def train(self, df: pd.DataFrame):
        """Train the prediction model on historical data (not implemented)."""
        return None

    def predict(self, df: pd.DataFrame) -> Dict[str, float]:
        """Return buy and sell signals for the current market data.

        Without a model both signals are 0.0. With a model, the first two
        entries of its output become 'buy_signal' and 'sell_signal'.
        """
        if self.model is None:
            return {'buy_signal': 0.0, 'sell_signal': 0.0}

        outputs = self.model.predict(self._prepare_features(df))
        signals: Dict[str, float] = {}
        signals['buy_signal'] = float(outputs[0])
        signals['sell_signal'] = float(outputs[1])
        return signals

    def _prepare_features(self, df: pd.DataFrame) -> np.ndarray:
        """Build the model's feature matrix: the closing prices as one column."""
        closes = df['close'].values
        return closes.reshape(-1, 1)
class HistoryAnalyzer:
    """Evaluate past trades to summarise overall performance."""

    def __init__(self):
        pass

    def analyze_historical_trades(self, df: pd.DataFrame, past_trades: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarise past trades as a profit ratio and a loss ratio.

        Each trade is a dict with 'profit' and 'loss' entries. The profit
        ratio is total profit divided by total profit plus total loss (0 when
        that sum is zero); the loss ratio is its complement. With no trades
        both ratios are 0.0. ``df`` is not read.
        """
        if not past_trades:
            return {'profit_ratio': 0.0, 'loss_ratio': 0.0}

        profits = [trade['profit'] for trade in past_trades]
        losses = [trade['loss'] for trade in past_trades]
        total_profit = sum(profits)
        total_loss = sum(losses)

        turnover = total_profit + total_loss
        if turnover != 0:
            profit_ratio = total_profit / turnover
        else:
            profit_ratio = 0

        summary: Dict[str, Any] = {}
        summary['profit_ratio'] = profit_ratio
        summary['loss_ratio'] = 1 - profit_ratio
        return summary
# Module-level instances, created once when this module is imported.
predictor = MarketPredictor()
analyzer = HistoryAnalyzer()