170 lines
No EOL
5.9 KiB
Python
170 lines
No EOL
5.9 KiB
Python
# === UTILITY FUNCTIONS FOR ANALYZE_MARKET ===
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
def calculate_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
|
|
"""Hitung Average True Range (ATR)"""
|
|
high_low = df['high'] - df['low']
|
|
high_close = abs(df['high'] - df['close'].shift())
|
|
low_close = abs(df['low'] - df['close'].shift())
|
|
ranges = pd.concat([high_low, high_close, low_close], axis=1)
|
|
true_range = ranges.max(axis=1)
|
|
return true_range.rolling(period).mean()
|
|
|
|
def find_swing_points(series: pd.Series, point_type: str, window: int = 5) -> List[float]:
|
|
"""Helper untuk menemukan swing points"""
|
|
points = []
|
|
for i in range(window, len(series) - window):
|
|
if point_type == 'high':
|
|
if all(series.iloc[i] > series.iloc[i-window:i]) and all(series.iloc[i] > series.iloc[i+1:i+window+1]):
|
|
points.append(series.iloc[i])
|
|
else: # low
|
|
if all(series.iloc[i] < series.iloc[i-window:i]) and all(series.iloc[i] < series.iloc[i+1:i+window+1]):
|
|
points.append(series.iloc[i])
|
|
return points[-3:] if points else [] # Return 3 terakhir saja
|
|
|
|
def prepare_dataframe(df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Persiapkan dataframe dengan indikator yang diperlukan"""
|
|
df = df.copy()
|
|
|
|
# Tambahkan ATR jika belum ada
|
|
if 'atr' not in df.columns:
|
|
df['atr'] = calculate_atr(df)
|
|
|
|
# Tambahkan volume jika belum ada (untuk testing)
|
|
if 'volume' not in df.columns:
|
|
df['volume'] = 1000 # Default volume
|
|
|
|
return df
|
|
|
|
def calculate_technical_indicators(df: pd.DataFrame) -> Dict[str, Any]:
|
|
"""Hitung indikator teknikal utama"""
|
|
indicators = {}
|
|
|
|
# RSI
|
|
if len(df) >= 14:
|
|
delta = df['close'].diff()
|
|
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
|
|
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
|
|
rs = gain / loss
|
|
rsi = 100 - (100 / (1 + rs))
|
|
indicators['rsi'] = float(rsi.iloc[-1])
|
|
else:
|
|
indicators['rsi'] = 50.0
|
|
|
|
# MACD
|
|
if len(df) >= 26:
|
|
ema12 = df['close'].ewm(span=12).mean()
|
|
ema26 = df['close'].ewm(span=26).mean()
|
|
macd = ema12 - ema26
|
|
signal = macd.ewm(span=9).mean()
|
|
histogram = macd - signal
|
|
|
|
indicators['macd'] = {
|
|
'macd': float(macd.iloc[-1]),
|
|
'signal': float(signal.iloc[-1]),
|
|
'histogram': float(histogram.iloc[-1])
|
|
}
|
|
else:
|
|
indicators['macd'] = {'macd': 0.0, 'signal': 0.0, 'histogram': 0.0}
|
|
|
|
# Bollinger Bands
|
|
if len(df) >= 20:
|
|
sma20 = df['close'].rolling(20).mean()
|
|
std20 = df['close'].rolling(20).std()
|
|
upper_band = sma20 + (std20 * 2)
|
|
lower_band = sma20 - (std20 * 2)
|
|
|
|
indicators['bollinger_bands'] = {
|
|
'upper': float(upper_band.iloc[-1]),
|
|
'middle': float(sma20.iloc[-1]),
|
|
'lower': float(lower_band.iloc[-1]),
|
|
'position': 'upper' if df['close'].iloc[-1] > upper_band.iloc[-1] else
|
|
'lower' if df['close'].iloc[-1] < lower_band.iloc[-1] else 'middle'
|
|
}
|
|
else:
|
|
current_price = df['close'].iloc[-1]
|
|
indicators['bollinger_bands'] = {
|
|
'upper': current_price * 1.02,
|
|
'middle': current_price,
|
|
'lower': current_price * 0.98,
|
|
'position': 'middle'
|
|
}
|
|
|
|
return indicators
|
|
|
|
def analyze_market_sentiment(df: pd.DataFrame) -> Dict[str, Any]:
|
|
"""Analisis sentimen market"""
|
|
sentiment = {}
|
|
|
|
# Volume trend
|
|
if 'volume' in df.columns and len(df) >= 10:
|
|
recent_volume = df['volume'].iloc[-5:].mean()
|
|
avg_volume = df['volume'].iloc[-20:].mean() if len(df) >= 20 else df['volume'].mean()
|
|
|
|
if recent_volume > avg_volume * 1.2:
|
|
volume_sentiment = 'high'
|
|
elif recent_volume < avg_volume * 0.8:
|
|
volume_sentiment = 'low'
|
|
else:
|
|
volume_sentiment = 'normal'
|
|
|
|
sentiment['volume_sentiment'] = volume_sentiment
|
|
else:
|
|
sentiment['volume_sentiment'] = 'normal'
|
|
|
|
# Price momentum
|
|
if len(df) >= 5:
|
|
price_change = (df['close'].iloc[-1] - df['close'].iloc[-5]) / df['close'].iloc[-5] * 100
|
|
|
|
if price_change > 2:
|
|
price_sentiment = 'bullish'
|
|
elif price_change < -2:
|
|
price_sentiment = 'bearish'
|
|
else:
|
|
price_sentiment = 'neutral'
|
|
|
|
sentiment['price_sentiment'] = price_sentiment
|
|
sentiment['price_change_5d'] = float(price_change)
|
|
else:
|
|
sentiment['price_sentiment'] = 'neutral'
|
|
sentiment['price_change_5d'] = 0.0
|
|
|
|
return sentiment
|
|
|
|
def calculate_risk_metrics(df: pd.DataFrame) -> Dict[str, Any]:
|
|
"""Hitung metrik risiko"""
|
|
risk_metrics = {}
|
|
|
|
# Volatilitas
|
|
if len(df) >= 20:
|
|
returns = np.log(df['close'] / df['close'].shift(1))
|
|
volatility = returns.rolling(20).std() * np.sqrt(252) # Annualized
|
|
risk_metrics['volatility'] = float(volatility.iloc[-1])
|
|
else:
|
|
risk_metrics['volatility'] = 0.2 # Default 20%
|
|
|
|
# Maximum Drawdown
|
|
if len(df) >= 10:
|
|
cumulative = (1 + np.log(df['close'] / df['close'].shift(1)).fillna(0)).cumprod()
|
|
running_max = cumulative.expanding().max()
|
|
drawdown = (cumulative - running_max) / running_max
|
|
max_drawdown = drawdown.min()
|
|
risk_metrics['max_drawdown'] = float(max_drawdown)
|
|
else:
|
|
risk_metrics['max_drawdown'] = 0.0
|
|
|
|
# Sharpe Ratio (simplified)
|
|
if len(df) >= 30:
|
|
returns = np.log(df['close'] / df['close'].shift(1)).dropna()
|
|
if returns.std() != 0:
|
|
sharpe = returns.mean() / returns.std() * np.sqrt(252)
|
|
risk_metrics['sharpe_ratio'] = float(sharpe)
|
|
else:
|
|
risk_metrics['sharpe_ratio'] = 0.0
|
|
else:
|
|
risk_metrics['sharpe_ratio'] = 0.0
|
|
|
|
return risk_metrics |