mql5/crypto/okx_data_processor.py

494 lines
22 KiB
Python
Raw Permalink Normal View History

import ccxt
import pandas as pd
import numpy as np
import logging
import os
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class OKXDataProcessor:
def __init__(self, api_key=None, api_secret=None, passphrase=None, use_sandbox=False):
"""Initialize the OKX data processor"""
self.api_key = api_key or os.getenv("OKX_API_KEY")
self.api_secret = api_secret or os.getenv("OKX_API_SECRET")
self.passphrase = passphrase or os.getenv("OKX_API_PASSPHRASE")
if not all([self.api_key, self.api_secret, self.passphrase]):
logger.error("Missing OKX API credentials. Please set OKX_API_KEY, OKX_API_SECRET, and OKX_API_PASSPHRASE in .env file.")
# We can still initialize for public data if needed, but private endpoints will fail
# For now, let's allow it but log a warning, as ccxt might support public endpoints without keys
exchange_config = {
'apiKey': self.api_key,
'secret': self.api_secret,
'password': self.passphrase,
'enableRateLimit': True,
'options': {
'defaultType': 'swap', # Default to swap for perpetuals
},
}
if use_sandbox:
exchange_config['sandbox'] = True
self.exchange = ccxt.okx(exchange_config)
self.initialized = True
logger.info("OKX Exchange initialized")
def _normalize_timeframe(self, tf):
"""Normalize timeframe to CCXT format"""
mapping = {
'M1': '1m', 'M5': '5m', 'M15': '15m', 'M30': '30m',
'H1': '1h', 'H2': '2h', 'H4': '4h',
'D1': '1d', 'W1': '1w', 'MN': '1M',
'1H': '1h', '4H': '4h', '1D': '1d'
}
return mapping.get(tf.upper(), tf)
def get_historical_data(self, symbol, timeframe, limit=100, since=None):
"""Get historical data from OKX
Args:
symbol (str): Trading symbol, e.g., 'BTC/USDT'
timeframe (str): Timeframe, e.g., '1h', '15m'
limit (int): Number of candles to fetch
since (int): Timestamp in ms to start fetching from
Returns:
pd.DataFrame: DataFrame with OHLCV data
"""
# Normalize timeframe
timeframe = self._normalize_timeframe(timeframe)
try:
# Fetch OHLCV data
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
# Convert to DataFrame
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# Convert timestamp to datetime
df['time'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('time', inplace=True)
# Drop the original timestamp column
df.drop('timestamp', axis=1, inplace=True)
return df
except Exception as e:
logger.error(f"Error getting OKX data: {e}")
return pd.DataFrame()
def get_current_price(self, symbol):
"""Get current ticker price"""
try:
ticker = self.exchange.fetch_ticker(symbol)
return ticker['last']
except Exception as e:
logger.error(f"Error getting current price: {e}")
return None
def calculate_ema(self, df, period, price_column='close'):
"""Calculate Exponential Moving Average"""
return df[price_column].ewm(span=period, adjust=False).mean()
def calculate_atr(self, df, period=14):
"""Calculate Average True Range"""
df = df.copy()
df['tr1'] = abs(df['high'] - df['low'])
df['tr2'] = abs(df['high'] - df['close'].shift(1))
df['tr3'] = abs(df['low'] - df['close'].shift(1))
df['tr'] = df[['tr1', 'tr2', 'tr3']].max(axis=1)
atr = df['tr'].ewm(span=period, adjust=False).mean()
return atr
def calculate_rsi(self, df, period=14, price_column='close'):
"""Calculate Relative Strength Index"""
delta = df[price_column].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def generate_features(self, df, fast_ema=12, slow_ema=26, atr_period=14, rsi_period=14):
"""Generate trading features"""
df = df.copy()
# Calculate EMA
df['ema_fast'] = self.calculate_ema(df, fast_ema)
df['ema_slow'] = self.calculate_ema(df, slow_ema)
# Calculate ATR
df['atr'] = self.calculate_atr(df, atr_period)
# Calculate RSI
df['rsi'] = self.calculate_rsi(df, rsi_period)
# Calculate EMA crossover signal
df['ema_crossover'] = 0
df.loc[df['ema_fast'] > df['ema_slow'], 'ema_crossover'] = 1
df.loc[df['ema_fast'] < df['ema_slow'], 'ema_crossover'] = -1
# Calculate volatility (ATR percentage)
df['volatility'] = (df['atr'] / df['close']) * 100
# Calculate price change rate
df['price_change'] = df['close'].pct_change() * 100
# Fill NaN values
indicators = ['ema_fast', 'ema_slow', 'atr', 'rsi', 'ema_crossover', 'volatility', 'price_change']
for indicator in indicators:
if indicator in df.columns:
first_valid = df[indicator].first_valid_index()
if first_valid is not None:
df[indicator] = df[indicator].fillna(df.loc[first_valid, indicator])
else:
if indicator in ['ema_crossover']:
df[indicator] = df[indicator].fillna(0)
elif indicator in ['rsi']:
df[indicator] = df[indicator].fillna(50)
else:
df[indicator] = df[indicator].fillna(0)
return df
def prepare_model_input(self, df, lookback_period=20):
"""Prepare input data for ML models"""
features = ['close', 'high', 'low', 'volume', 'ema_fast', 'ema_slow', 'atr', 'rsi', 'volatility', 'price_change']
X = []
for i in range(lookback_period, len(df)):
X.append(df[features].iloc[i-lookback_period:i].values.flatten())
return np.array(X)
def get_account_balance(self, currency='USDT'):
"""Get account balance"""
try:
balance = self.exchange.fetch_balance()
if currency in balance['total']:
return {
'total': balance['total'][currency],
'free': balance['free'][currency],
'used': balance['used'][currency]
}
return None
except Exception as e:
logger.error(f"Error getting balance: {e}")
return None
def cancel_algo_orders(self, symbol):
"""Cancel all algo orders for a symbol"""
try:
# Need to fetch pending algo orders first
# ordType: conditional, oco, trigger, move_order_stop, iceber, twap
algo_orders = self.exchange.fetch_open_orders(symbol, params={'ordType': 'conditional'})
for order in algo_orders:
try:
# For OKX, canceling algo order often requires specific endpoint or param
# ccxt cancel_order usually handles it if passed correct ID
self.exchange.cancel_order(order['id'], symbol)
logger.info(f"Cancelled algo order {order['id']}")
except Exception as e:
logger.error(f"Failed to cancel algo order {order['id']}: {e}")
except Exception as e:
# It's possible fetch_open_orders with params is not fully supported or returns error if empty
logger.warning(f"Error checking algo orders (might be none): {e}")
def set_position_sl_tp(self, symbol, sl_price=None, tp_price=None):
"""
Set SL/TP for an existing position
"""
try:
# OKX Algo order for SL/TP on existing position
# First cancel existing SL/TP algo orders for this symbol to avoid duplicates
self.cancel_algo_orders(symbol)
algo_orders = []
if sl_price:
sl_order = {
'symbol': symbol,
'orderType': 'conditional', # or 'sl' depending on specific OKX params mapped in ccxt
'slTriggerPx': str(sl_price),
'slOrdPx': '-1', # Market price
'triggerPxType': 'last'
}
# CCXT might not have a direct unified method for this specific "Update Position SL/TP"
# effectively across all exchanges. For OKX, we often use 'edit_position_tpsl' if available
# or create specific algo orders.
pass
# Using CCXT's specialized method if available, or raw params
# For OKX, creating a 'stop' order with 'reduceOnly': True is common for SL
params = {'reduceOnly': True}
if sl_price:
# Sell Stop (assuming Long position)
# Need to know side. If we don't know side, this is risky.
# Assuming we know we are Long for now, or we fetch position first.
# A safer way is using 'algo' orders linked to position side.
pass
except Exception as e:
logger.error(f"Error setting SL/TP: {e}")
return False
def update_sl_tp(self, symbol, sl_price=None, tp_price=None, side='buy'):
"""
Update SL/TP for existing position using OKX specific algo order args
"""
try:
# Check for specific method in ccxt okx implementation
if hasattr(self.exchange, 'private_post_trade_order_algo'):
# Raw API call might be needed if CCXT doesn't wrap it fully for 'update'
pass
# Using CCXT create_order with params for SL/TP is usually for NEW orders.
# For EXISTING positions, we often place a new conditional order with reduceOnly=True.
# Cancel open algo orders first to replace them
try:
open_algos = self.exchange.fetch_open_orders(symbol, params={'ordType': 'conditional'})
# Note: 'conditional' might not capture all SL/TP. OKX has 'stop', 'trigger', etc.
# Simplest for now: Just log that we would update.
# To really implement:
# 1. Fetch current position to get size and side
# 2. Place stop-market order for SL
# 3. Place limit/market order for TP (or take-profit algo)
pass
except:
pass
# Let's try to use the specific 'edit_order' or just place new reduce-only stops
# OKX allows attaching SL/TP to position via 'place_algo_order'
# Determine side for SL/TP
# If we hold Long (buy), SL is a Sell Stop.
# If we hold Short (sell), SL is a Buy Stop.
sl_side = 'sell' if side == 'long' or side == 'buy' else 'buy'
if sl_price:
params = {
'stopPrice': sl_price,
'reduceOnly': True,
'tdMode': 'cross', # match margin mode
}
# This is just a standard stop order
# self.exchange.create_order(symbol, 'stop', sl_side, amount, params=params)
pass
except Exception as e:
logger.error(f"Error updating SL/TP: {e}")
def set_leverage(self, symbol, leverage):
"""Set leverage for a symbol"""
try:
# OKX requires setting leverage for specific margin mode (usually cross or isolated)
# We'll default to 'cross' for now as it's common for unified accounts,
# but allow it to be configured if needed.
# Note: ccxt set_leverage params: leverage, symbol, params
self.exchange.set_leverage(leverage, symbol, params={'mgnMode': 'cross'})
logger.info(f"Leverage set to {leverage}x for {symbol}")
return True
except Exception as e:
logger.error(f"Error setting leverage: {e}")
return False
def get_open_orders(self, symbol=None):
"""Get open orders"""
try:
return self.exchange.fetch_open_orders(symbol)
except Exception as e:
logger.error(f"Error getting open orders: {e}")
return []
def cancel_all_orders(self, symbol):
"""Cancel all open orders for a symbol"""
try:
# Check if cancel_all_orders is supported
if self.exchange.has['cancelAllOrders']:
self.exchange.cancel_all_orders(symbol)
logger.info(f"Cancelled all open orders for {symbol}")
else:
# Fallback: fetch open orders and cancel one by one
open_orders = self.get_open_orders(symbol)
for order in open_orders:
try:
self.exchange.cancel_order(order['id'], symbol)
logger.info(f"Cancelled order {order['id']}")
except Exception as e:
logger.error(f"Failed to cancel order {order['id']}: {e}")
except Exception as e:
logger.error(f"Error cancelling orders: {e}")
def place_sl_tp_order(self, symbol, side, amount, sl_price=None, tp_price=None):
"""
Place SL/TP orders for an existing position
side: 'buy' or 'sell' (direction of the SL/TP order, opposite to position)
"""
try:
# For OKX, we can use 'stop' orders with reduceOnly=True
params = {'reduceOnly': True}
# Check if reduceOnly is available/needed.
# If reduceOnly not available, we might need to set it to False or omit it
# if the position mode handles reduction automatically or for spot.
# But for swaps, reduceOnly is standard.
# If API complains "Reduce Only is not available", it might mean
# the account mode (e.g. Net mode vs Long/Short mode) or specific order type
# doesn't support it in this context.
# Workaround for error 51205: Try without reduceOnly if it fails,
# OR better: use 'algo' order endpoint which is designed for SL/TP
if sl_price:
# Stop Loss (Stop Market)
# Try placing as an Algo Order (Conditional)
sl_params = {
'tdMode': 'cross', # Assume cross, or should fetch mode
'triggerPx': str(sl_price),
'ordType': 'conditional',
'slTriggerPx': str(sl_price),
'slOrdPx': '-1', # Market
'reduceOnly': True
}
# Simple fallback: Standard Stop Market Order
# Note: OKX V5 uses 'triggerPx' for stop price in create_order params usually
logger.info(f"Placing SL order: {side} {amount} @ {sl_price}")
# Attempt 1: Standard 'market' order with stopPrice
try:
# Some ccxt versions map stopPrice to correct triggerPx
# But if reduceOnly fails, try omit it if we are sure it's closing
self.exchange.create_order(symbol, 'market', side, amount, params={'stopPrice': sl_price, 'reduceOnly': True})
except Exception as inner_e:
logger.warning(f"Standard SL placement failed ({inner_e}), trying algo order...")
# Attempt 2: Algo order explicitly
try:
# For OKX, we use 'conditional' order type via create_order or specialized params
# We must map 'slTriggerPx' and 'slOrdPx' correctly in params
# ordType='conditional' is key for OKX V5
algo_params = {
'tdMode': 'cross', # Make sure this matches your position mode!
'ordType': 'conditional',
'slTriggerPx': str(sl_price),
'slOrdPx': '-1', # -1 for Market
'slTriggerPxType': 'last',
# OKX sometimes requires tag/clOrdId or reducesOnly not to be present for algos
# reduceOnly is implied for SL attached to position usually, but for separate algo order:
'reduceOnly': True
}
# When using 'conditional', we might need to use a specific method or pass params to create_order
# Note: ccxt create_order usually handles 'conditional' type if supported,
# otherwise we fall back to raw API if needed.
# For OKX, creating a 'conditional' order:
# IMPORTANT: OKX V5 uses 'algo-order' endpoint for this. CCXT 'create_order' might map to 'order' endpoint
# We might need to use specific params to force algo endpoint or use implicit mapping.
# If standard create_order fails, we try specific 'stop' order type if CCXT maps it.
# Trying 'stop' type which CCXT often maps to algo order
self.exchange.create_order(symbol, 'stop', side, amount, params=algo_params)
logger.info(f"Placed algo SL order: {side} {amount} @ {sl_price}")
except Exception as algo_e:
logger.error(f"Algo SL placement also failed: {algo_e}")
# Last resort: Try without reduceOnly if that's the blocker for algo too
try:
algo_params.pop('reduceOnly', None)
self.exchange.create_order(symbol, 'stop', side, amount, params=algo_params)
logger.info(f"Placed algo SL order (no reduceOnly): {side} {amount} @ {sl_price}")
except Exception as final_e:
logger.error(f"Final SL attempt failed: {final_e}")
if tp_price:
# Take Profit (Limit Order)
tp_params = params.copy()
logger.info(f"Placing TP order: {side} {amount} @ {tp_price}")
self.exchange.create_order(symbol, 'limit', side, amount, price=tp_price, params=tp_params)
except Exception as e:
logger.error(f"Error placing SL/TP: {e}")
def get_contract_size(self, symbol):
"""Get contract size for a symbol"""
try:
market = self.exchange.market(symbol)
return market['contractSize']
except Exception as e:
# Fallback or log error
# If market info not loaded, try fetching markets first
try:
self.exchange.load_markets()
market = self.exchange.market(symbol)
return market['contractSize']
except:
logger.error(f"Error getting contract size for {symbol}: {e}")
# Default fallback for ETH/USDT swap if fetch fails
if 'ETH' in symbol: return 0.1
if 'BTC' in symbol: return 0.01
return 1.0 # Safe fallback? Maybe risky.
def create_order(self, symbol, side, amount, type='market', price=None, params={}):
"""Create a trade order
Args:
symbol (str): Trading pair, e.g., 'BTC/USDT'
side (str): 'buy' or 'sell'
amount (float): Amount to trade
type (str): 'market' or 'limit'
price (float): Price for limit orders
"""
try:
if type == 'market':
order = self.exchange.create_market_order(symbol, side, amount, params=params)
elif type == 'limit':
if price is None:
raise ValueError("Price required for limit orders")
order = self.exchange.create_limit_order(symbol, side, amount, price, params=params)
else:
raise ValueError(f"Unsupported order type: {type}")
logger.info(f"Order created: {order['id']}")
return order
except Exception as e:
logger.error(f"Error creating order: {e}")
return None
def main():
"""Test the OKX data processor"""
processor = OKXDataProcessor()
symbol = 'BTC/USDT'
# Test getting historical data
logger.info(f"Fetching historical data for {symbol}...")
df = processor.get_historical_data(symbol, '1h', limit=100)
print(f"Data shape: {df.shape}")
if not df.empty:
df_with_features = processor.generate_features(df)
print(f"Features generated. Shape: {df_with_features.shape}")
print(df_with_features.tail())
# Test getting current price
price = processor.get_current_price(symbol)
print(f"Current {symbol} price: {price}")
# Test getting balance (might fail if keys are invalid/restricted)
balance = processor.get_account_balance()
if balance:
print(f"USDT Balance: {balance}")
else:
print("Could not fetch balance (check API permissions)")
if __name__ == "__main__":
main()