mql5/crypto/okx_data_processor.py
2025-12-28 23:33:30 +08:00

494 líneas
22 KiB
Python

import ccxt
import pandas as pd
import numpy as np
import logging
import os
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class OKXDataProcessor:
def __init__(self, api_key=None, api_secret=None, passphrase=None, use_sandbox=False):
"""Initialize the OKX data processor"""
self.api_key = api_key or os.getenv("OKX_API_KEY")
self.api_secret = api_secret or os.getenv("OKX_API_SECRET")
self.passphrase = passphrase or os.getenv("OKX_API_PASSPHRASE")
if not all([self.api_key, self.api_secret, self.passphrase]):
logger.error("Missing OKX API credentials. Please set OKX_API_KEY, OKX_API_SECRET, and OKX_API_PASSPHRASE in .env file.")
# We can still initialize for public data if needed, but private endpoints will fail
# For now, let's allow it but log a warning, as ccxt might support public endpoints without keys
exchange_config = {
'apiKey': self.api_key,
'secret': self.api_secret,
'password': self.passphrase,
'enableRateLimit': True,
'options': {
'defaultType': 'swap', # Default to swap for perpetuals
},
}
if use_sandbox:
exchange_config['sandbox'] = True
self.exchange = ccxt.okx(exchange_config)
self.initialized = True
logger.info("OKX Exchange initialized")
def _normalize_timeframe(self, tf):
"""Normalize timeframe to CCXT format"""
mapping = {
'M1': '1m', 'M5': '5m', 'M15': '15m', 'M30': '30m',
'H1': '1h', 'H2': '2h', 'H4': '4h',
'D1': '1d', 'W1': '1w', 'MN': '1M',
'1H': '1h', '4H': '4h', '1D': '1d'
}
return mapping.get(tf.upper(), tf)
def get_historical_data(self, symbol, timeframe, limit=100, since=None):
"""Get historical data from OKX
Args:
symbol (str): Trading symbol, e.g., 'BTC/USDT'
timeframe (str): Timeframe, e.g., '1h', '15m'
limit (int): Number of candles to fetch
since (int): Timestamp in ms to start fetching from
Returns:
pd.DataFrame: DataFrame with OHLCV data
"""
# Normalize timeframe
timeframe = self._normalize_timeframe(timeframe)
try:
# Fetch OHLCV data
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
# Convert to DataFrame
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# Convert timestamp to datetime
df['time'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('time', inplace=True)
# Drop the original timestamp column
df.drop('timestamp', axis=1, inplace=True)
return df
except Exception as e:
logger.error(f"Error getting OKX data: {e}")
return pd.DataFrame()
def get_current_price(self, symbol):
"""Get current ticker price"""
try:
ticker = self.exchange.fetch_ticker(symbol)
return ticker['last']
except Exception as e:
logger.error(f"Error getting current price: {e}")
return None
def calculate_ema(self, df, period, price_column='close'):
"""Calculate Exponential Moving Average"""
return df[price_column].ewm(span=period, adjust=False).mean()
def calculate_atr(self, df, period=14):
"""Calculate Average True Range"""
df = df.copy()
df['tr1'] = abs(df['high'] - df['low'])
df['tr2'] = abs(df['high'] - df['close'].shift(1))
df['tr3'] = abs(df['low'] - df['close'].shift(1))
df['tr'] = df[['tr1', 'tr2', 'tr3']].max(axis=1)
atr = df['tr'].ewm(span=period, adjust=False).mean()
return atr
def calculate_rsi(self, df, period=14, price_column='close'):
"""Calculate Relative Strength Index"""
delta = df[price_column].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def generate_features(self, df, fast_ema=12, slow_ema=26, atr_period=14, rsi_period=14):
"""Generate trading features"""
df = df.copy()
# Calculate EMA
df['ema_fast'] = self.calculate_ema(df, fast_ema)
df['ema_slow'] = self.calculate_ema(df, slow_ema)
# Calculate ATR
df['atr'] = self.calculate_atr(df, atr_period)
# Calculate RSI
df['rsi'] = self.calculate_rsi(df, rsi_period)
# Calculate EMA crossover signal
df['ema_crossover'] = 0
df.loc[df['ema_fast'] > df['ema_slow'], 'ema_crossover'] = 1
df.loc[df['ema_fast'] < df['ema_slow'], 'ema_crossover'] = -1
# Calculate volatility (ATR percentage)
df['volatility'] = (df['atr'] / df['close']) * 100
# Calculate price change rate
df['price_change'] = df['close'].pct_change() * 100
# Fill NaN values
indicators = ['ema_fast', 'ema_slow', 'atr', 'rsi', 'ema_crossover', 'volatility', 'price_change']
for indicator in indicators:
if indicator in df.columns:
first_valid = df[indicator].first_valid_index()
if first_valid is not None:
df[indicator] = df[indicator].fillna(df.loc[first_valid, indicator])
else:
if indicator in ['ema_crossover']:
df[indicator] = df[indicator].fillna(0)
elif indicator in ['rsi']:
df[indicator] = df[indicator].fillna(50)
else:
df[indicator] = df[indicator].fillna(0)
return df
def prepare_model_input(self, df, lookback_period=20):
"""Prepare input data for ML models"""
features = ['close', 'high', 'low', 'volume', 'ema_fast', 'ema_slow', 'atr', 'rsi', 'volatility', 'price_change']
X = []
for i in range(lookback_period, len(df)):
X.append(df[features].iloc[i-lookback_period:i].values.flatten())
return np.array(X)
def get_account_balance(self, currency='USDT'):
"""Get account balance"""
try:
balance = self.exchange.fetch_balance()
if currency in balance['total']:
return {
'total': balance['total'][currency],
'free': balance['free'][currency],
'used': balance['used'][currency]
}
return None
except Exception as e:
logger.error(f"Error getting balance: {e}")
return None
def cancel_algo_orders(self, symbol):
"""Cancel all algo orders for a symbol"""
try:
# Need to fetch pending algo orders first
# ordType: conditional, oco, trigger, move_order_stop, iceber, twap
algo_orders = self.exchange.fetch_open_orders(symbol, params={'ordType': 'conditional'})
for order in algo_orders:
try:
# For OKX, canceling algo order often requires specific endpoint or param
# ccxt cancel_order usually handles it if passed correct ID
self.exchange.cancel_order(order['id'], symbol)
logger.info(f"Cancelled algo order {order['id']}")
except Exception as e:
logger.error(f"Failed to cancel algo order {order['id']}: {e}")
except Exception as e:
# It's possible fetch_open_orders with params is not fully supported or returns error if empty
logger.warning(f"Error checking algo orders (might be none): {e}")
def set_position_sl_tp(self, symbol, sl_price=None, tp_price=None):
"""
Set SL/TP for an existing position
"""
try:
# OKX Algo order for SL/TP on existing position
# First cancel existing SL/TP algo orders for this symbol to avoid duplicates
self.cancel_algo_orders(symbol)
algo_orders = []
if sl_price:
sl_order = {
'symbol': symbol,
'orderType': 'conditional', # or 'sl' depending on specific OKX params mapped in ccxt
'slTriggerPx': str(sl_price),
'slOrdPx': '-1', # Market price
'triggerPxType': 'last'
}
# CCXT might not have a direct unified method for this specific "Update Position SL/TP"
# effectively across all exchanges. For OKX, we often use 'edit_position_tpsl' if available
# or create specific algo orders.
pass
# Using CCXT's specialized method if available, or raw params
# For OKX, creating a 'stop' order with 'reduceOnly': True is common for SL
params = {'reduceOnly': True}
if sl_price:
# Sell Stop (assuming Long position)
# Need to know side. If we don't know side, this is risky.
# Assuming we know we are Long for now, or we fetch position first.
# A safer way is using 'algo' orders linked to position side.
pass
except Exception as e:
logger.error(f"Error setting SL/TP: {e}")
return False
def update_sl_tp(self, symbol, sl_price=None, tp_price=None, side='buy'):
"""
Update SL/TP for existing position using OKX specific algo order args
"""
try:
# Check for specific method in ccxt okx implementation
if hasattr(self.exchange, 'private_post_trade_order_algo'):
# Raw API call might be needed if CCXT doesn't wrap it fully for 'update'
pass
# Using CCXT create_order with params for SL/TP is usually for NEW orders.
# For EXISTING positions, we often place a new conditional order with reduceOnly=True.
# Cancel open algo orders first to replace them
try:
open_algos = self.exchange.fetch_open_orders(symbol, params={'ordType': 'conditional'})
# Note: 'conditional' might not capture all SL/TP. OKX has 'stop', 'trigger', etc.
# Simplest for now: Just log that we would update.
# To really implement:
# 1. Fetch current position to get size and side
# 2. Place stop-market order for SL
# 3. Place limit/market order for TP (or take-profit algo)
pass
except:
pass
# Let's try to use the specific 'edit_order' or just place new reduce-only stops
# OKX allows attaching SL/TP to position via 'place_algo_order'
# Determine side for SL/TP
# If we hold Long (buy), SL is a Sell Stop.
# If we hold Short (sell), SL is a Buy Stop.
sl_side = 'sell' if side == 'long' or side == 'buy' else 'buy'
if sl_price:
params = {
'stopPrice': sl_price,
'reduceOnly': True,
'tdMode': 'cross', # match margin mode
}
# This is just a standard stop order
# self.exchange.create_order(symbol, 'stop', sl_side, amount, params=params)
pass
except Exception as e:
logger.error(f"Error updating SL/TP: {e}")
def set_leverage(self, symbol, leverage):
"""Set leverage for a symbol"""
try:
# OKX requires setting leverage for specific margin mode (usually cross or isolated)
# We'll default to 'cross' for now as it's common for unified accounts,
# but allow it to be configured if needed.
# Note: ccxt set_leverage params: leverage, symbol, params
self.exchange.set_leverage(leverage, symbol, params={'mgnMode': 'cross'})
logger.info(f"Leverage set to {leverage}x for {symbol}")
return True
except Exception as e:
logger.error(f"Error setting leverage: {e}")
return False
def get_open_orders(self, symbol=None):
"""Get open orders"""
try:
return self.exchange.fetch_open_orders(symbol)
except Exception as e:
logger.error(f"Error getting open orders: {e}")
return []
def cancel_all_orders(self, symbol):
"""Cancel all open orders for a symbol"""
try:
# Check if cancel_all_orders is supported
if self.exchange.has['cancelAllOrders']:
self.exchange.cancel_all_orders(symbol)
logger.info(f"Cancelled all open orders for {symbol}")
else:
# Fallback: fetch open orders and cancel one by one
open_orders = self.get_open_orders(symbol)
for order in open_orders:
try:
self.exchange.cancel_order(order['id'], symbol)
logger.info(f"Cancelled order {order['id']}")
except Exception as e:
logger.error(f"Failed to cancel order {order['id']}: {e}")
except Exception as e:
logger.error(f"Error cancelling orders: {e}")
def place_sl_tp_order(self, symbol, side, amount, sl_price=None, tp_price=None):
"""
Place SL/TP orders for an existing position
side: 'buy' or 'sell' (direction of the SL/TP order, opposite to position)
"""
try:
# For OKX, we can use 'stop' orders with reduceOnly=True
params = {'reduceOnly': True}
# Check if reduceOnly is available/needed.
# If reduceOnly not available, we might need to set it to False or omit it
# if the position mode handles reduction automatically or for spot.
# But for swaps, reduceOnly is standard.
# If API complains "Reduce Only is not available", it might mean
# the account mode (e.g. Net mode vs Long/Short mode) or specific order type
# doesn't support it in this context.
# Workaround for error 51205: Try without reduceOnly if it fails,
# OR better: use 'algo' order endpoint which is designed for SL/TP
if sl_price:
# Stop Loss (Stop Market)
# Try placing as an Algo Order (Conditional)
sl_params = {
'tdMode': 'cross', # Assume cross, or should fetch mode
'triggerPx': str(sl_price),
'ordType': 'conditional',
'slTriggerPx': str(sl_price),
'slOrdPx': '-1', # Market
'reduceOnly': True
}
# Simple fallback: Standard Stop Market Order
# Note: OKX V5 uses 'triggerPx' for stop price in create_order params usually
logger.info(f"Placing SL order: {side} {amount} @ {sl_price}")
# Attempt 1: Standard 'market' order with stopPrice
try:
# Some ccxt versions map stopPrice to correct triggerPx
# But if reduceOnly fails, try omit it if we are sure it's closing
self.exchange.create_order(symbol, 'market', side, amount, params={'stopPrice': sl_price, 'reduceOnly': True})
except Exception as inner_e:
logger.warning(f"Standard SL placement failed ({inner_e}), trying algo order...")
# Attempt 2: Algo order explicitly
try:
# For OKX, we use 'conditional' order type via create_order or specialized params
# We must map 'slTriggerPx' and 'slOrdPx' correctly in params
# ordType='conditional' is key for OKX V5
algo_params = {
'tdMode': 'cross', # Make sure this matches your position mode!
'ordType': 'conditional',
'slTriggerPx': str(sl_price),
'slOrdPx': '-1', # -1 for Market
'slTriggerPxType': 'last',
# OKX sometimes requires tag/clOrdId or reducesOnly not to be present for algos
# reduceOnly is implied for SL attached to position usually, but for separate algo order:
'reduceOnly': True
}
# When using 'conditional', we might need to use a specific method or pass params to create_order
# Note: ccxt create_order usually handles 'conditional' type if supported,
# otherwise we fall back to raw API if needed.
# For OKX, creating a 'conditional' order:
# IMPORTANT: OKX V5 uses 'algo-order' endpoint for this. CCXT 'create_order' might map to 'order' endpoint
# We might need to use specific params to force algo endpoint or use implicit mapping.
# If standard create_order fails, we try specific 'stop' order type if CCXT maps it.
# Trying 'stop' type which CCXT often maps to algo order
self.exchange.create_order(symbol, 'stop', side, amount, params=algo_params)
logger.info(f"Placed algo SL order: {side} {amount} @ {sl_price}")
except Exception as algo_e:
logger.error(f"Algo SL placement also failed: {algo_e}")
# Last resort: Try without reduceOnly if that's the blocker for algo too
try:
algo_params.pop('reduceOnly', None)
self.exchange.create_order(symbol, 'stop', side, amount, params=algo_params)
logger.info(f"Placed algo SL order (no reduceOnly): {side} {amount} @ {sl_price}")
except Exception as final_e:
logger.error(f"Final SL attempt failed: {final_e}")
if tp_price:
# Take Profit (Limit Order)
tp_params = params.copy()
logger.info(f"Placing TP order: {side} {amount} @ {tp_price}")
self.exchange.create_order(symbol, 'limit', side, amount, price=tp_price, params=tp_params)
except Exception as e:
logger.error(f"Error placing SL/TP: {e}")
def get_contract_size(self, symbol):
"""Get contract size for a symbol"""
try:
market = self.exchange.market(symbol)
return market['contractSize']
except Exception as e:
# Fallback or log error
# If market info not loaded, try fetching markets first
try:
self.exchange.load_markets()
market = self.exchange.market(symbol)
return market['contractSize']
except:
logger.error(f"Error getting contract size for {symbol}: {e}")
# Default fallback for ETH/USDT swap if fetch fails
if 'ETH' in symbol: return 0.1
if 'BTC' in symbol: return 0.01
return 1.0 # Safe fallback? Maybe risky.
def create_order(self, symbol, side, amount, type='market', price=None, params={}):
"""Create a trade order
Args:
symbol (str): Trading pair, e.g., 'BTC/USDT'
side (str): 'buy' or 'sell'
amount (float): Amount to trade
type (str): 'market' or 'limit'
price (float): Price for limit orders
"""
try:
if type == 'market':
order = self.exchange.create_market_order(symbol, side, amount, params=params)
elif type == 'limit':
if price is None:
raise ValueError("Price required for limit orders")
order = self.exchange.create_limit_order(symbol, side, amount, price, params=params)
else:
raise ValueError(f"Unsupported order type: {type}")
logger.info(f"Order created: {order['id']}")
return order
except Exception as e:
logger.error(f"Error creating order: {e}")
return None
def main():
"""Test the OKX data processor"""
processor = OKXDataProcessor()
symbol = 'BTC/USDT'
# Test getting historical data
logger.info(f"Fetching historical data for {symbol}...")
df = processor.get_historical_data(symbol, '1h', limit=100)
print(f"Data shape: {df.shape}")
if not df.empty:
df_with_features = processor.generate_features(df)
print(f"Features generated. Shape: {df_with_features.shape}")
print(df_with_features.tail())
# Test getting current price
price = processor.get_current_price(symbol)
print(f"Current {symbol} price: {price}")
# Test getting balance (might fail if keys are invalid/restricted)
balance = processor.get_account_balance()
if balance:
print(f"USDT Balance: {balance}")
else:
print("Could not fetch balance (check API permissions)")
if __name__ == "__main__":
main()