markaz_arshy/utility_functions.py
2025-08-12 14:36:24 +00:00

170 lines
No EOL
5.9 KiB
Python

# === UTILITY FUNCTIONS FOR ANALYZE_MARKET ===
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional
def calculate_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
"""Hitung Average True Range (ATR)"""
high_low = df['high'] - df['low']
high_close = abs(df['high'] - df['close'].shift())
low_close = abs(df['low'] - df['close'].shift())
ranges = pd.concat([high_low, high_close, low_close], axis=1)
true_range = ranges.max(axis=1)
return true_range.rolling(period).mean()
def find_swing_points(series: pd.Series, point_type: str, window: int = 5) -> List[float]:
"""Helper untuk menemukan swing points"""
points = []
for i in range(window, len(series) - window):
if point_type == 'high':
if all(series.iloc[i] > series.iloc[i-window:i]) and all(series.iloc[i] > series.iloc[i+1:i+window+1]):
points.append(series.iloc[i])
else: # low
if all(series.iloc[i] < series.iloc[i-window:i]) and all(series.iloc[i] < series.iloc[i+1:i+window+1]):
points.append(series.iloc[i])
return points[-3:] if points else [] # Return 3 terakhir saja
def prepare_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""Persiapkan dataframe dengan indikator yang diperlukan"""
df = df.copy()
# Tambahkan ATR jika belum ada
if 'atr' not in df.columns:
df['atr'] = calculate_atr(df)
# Tambahkan volume jika belum ada (untuk testing)
if 'volume' not in df.columns:
df['volume'] = 1000 # Default volume
return df
def calculate_technical_indicators(df: pd.DataFrame) -> Dict[str, Any]:
"""Hitung indikator teknikal utama"""
indicators = {}
# RSI
if len(df) >= 14:
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
indicators['rsi'] = float(rsi.iloc[-1])
else:
indicators['rsi'] = 50.0
# MACD
if len(df) >= 26:
ema12 = df['close'].ewm(span=12).mean()
ema26 = df['close'].ewm(span=26).mean()
macd = ema12 - ema26
signal = macd.ewm(span=9).mean()
histogram = macd - signal
indicators['macd'] = {
'macd': float(macd.iloc[-1]),
'signal': float(signal.iloc[-1]),
'histogram': float(histogram.iloc[-1])
}
else:
indicators['macd'] = {'macd': 0.0, 'signal': 0.0, 'histogram': 0.0}
# Bollinger Bands
if len(df) >= 20:
sma20 = df['close'].rolling(20).mean()
std20 = df['close'].rolling(20).std()
upper_band = sma20 + (std20 * 2)
lower_band = sma20 - (std20 * 2)
indicators['bollinger_bands'] = {
'upper': float(upper_band.iloc[-1]),
'middle': float(sma20.iloc[-1]),
'lower': float(lower_band.iloc[-1]),
'position': 'upper' if df['close'].iloc[-1] > upper_band.iloc[-1] else
'lower' if df['close'].iloc[-1] < lower_band.iloc[-1] else 'middle'
}
else:
current_price = df['close'].iloc[-1]
indicators['bollinger_bands'] = {
'upper': current_price * 1.02,
'middle': current_price,
'lower': current_price * 0.98,
'position': 'middle'
}
return indicators
def analyze_market_sentiment(df: pd.DataFrame) -> Dict[str, Any]:
"""Analisis sentimen market"""
sentiment = {}
# Volume trend
if 'volume' in df.columns and len(df) >= 10:
recent_volume = df['volume'].iloc[-5:].mean()
avg_volume = df['volume'].iloc[-20:].mean() if len(df) >= 20 else df['volume'].mean()
if recent_volume > avg_volume * 1.2:
volume_sentiment = 'high'
elif recent_volume < avg_volume * 0.8:
volume_sentiment = 'low'
else:
volume_sentiment = 'normal'
sentiment['volume_sentiment'] = volume_sentiment
else:
sentiment['volume_sentiment'] = 'normal'
# Price momentum
if len(df) >= 5:
price_change = (df['close'].iloc[-1] - df['close'].iloc[-5]) / df['close'].iloc[-5] * 100
if price_change > 2:
price_sentiment = 'bullish'
elif price_change < -2:
price_sentiment = 'bearish'
else:
price_sentiment = 'neutral'
sentiment['price_sentiment'] = price_sentiment
sentiment['price_change_5d'] = float(price_change)
else:
sentiment['price_sentiment'] = 'neutral'
sentiment['price_change_5d'] = 0.0
return sentiment
def calculate_risk_metrics(df: pd.DataFrame) -> Dict[str, Any]:
"""Hitung metrik risiko"""
risk_metrics = {}
# Volatilitas
if len(df) >= 20:
returns = np.log(df['close'] / df['close'].shift(1))
volatility = returns.rolling(20).std() * np.sqrt(252) # Annualized
risk_metrics['volatility'] = float(volatility.iloc[-1])
else:
risk_metrics['volatility'] = 0.2 # Default 20%
# Maximum Drawdown
if len(df) >= 10:
cumulative = (1 + np.log(df['close'] / df['close'].shift(1)).fillna(0)).cumprod()
running_max = cumulative.expanding().max()
drawdown = (cumulative - running_max) / running_max
max_drawdown = drawdown.min()
risk_metrics['max_drawdown'] = float(max_drawdown)
else:
risk_metrics['max_drawdown'] = 0.0
# Sharpe Ratio (simplified)
if len(df) >= 30:
returns = np.log(df['close'] / df['close'].shift(1)).dropna()
if returns.std() != 0:
sharpe = returns.mean() / returns.std() * np.sqrt(252)
risk_metrics['sharpe_ratio'] = float(sharpe)
else:
risk_metrics['sharpe_ratio'] = 0.0
else:
risk_metrics['sharpe_ratio'] = 0.0
return risk_metrics