# Mirror of https://github.com/asavinov/intelligent-trading-bot.git
# Synced 2026-05-04 08:26:19 +00:00
# 343 lines, 13 KiB, Python
import os
|
|
import sys
|
|
import argparse
|
|
import math, time
|
|
from datetime import datetime, timedelta
|
|
from dateutil import parser
|
|
from decimal import *
|
|
from typing import Any, Coroutine
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
import asyncio
|
|
|
|
from binance import Client
|
|
from binance.exceptions import *
|
|
from binance.helpers import date_to_milliseconds, interval_to_milliseconds
|
|
from binance.enums import *
|
|
|
|
from service.App import *
|
|
from common.utils import *
|
|
from inputs.utils_binance import *
|
|
|
|
import logging
|
|
# Module-level logger
log = logging.getLogger('binance.base_client')

# Shared Binance client instance; stays None until init_client() assigns it
client = None

#
# Parameters
#

# How many records to request in addition to the missing data (overlap length)
append_overlap_records = 5

# Binance-specific column names corresponding to the values returned from API
# (the order matters because one kline arrives as a plain list of values)
column_names = [
    'timestamp',
    'open',
    'high',
    'low',
    'close',
    'volume',
    'close_time',
    'quote_av',
    'trades',
    'tb_base_av',
    'tb_quote_av',
    'ignore',
]

# Target dtype of each column after conversion to a data frame
column_types = {
    'timestamp': 'datetime64[ns, UTC]',
    'open': 'float64',
    'high': 'float64',
    'low': 'float64',
    'close': 'float64',
    'volume': 'float64',
    'close_time': 'int64',
    'quote_av': 'float64',
    'trades': 'int64',
    'tb_base_av': 'float64',
    'tb_quote_av': 'float64',
    'ignore': 'float64',
}

# Name of the column which stores the kline open time
time_column = 'timestamp'
|
|
|
|
|
|
def init_client(parameters, client_args):
    """
    Create the Binance client and store it in the module-level `client` variable.

    :param parameters: dict of settings; its "append_overlap_records" entry (default 5)
        replaces the module-level overlap length
    :param client_args: keyword arguments passed as is to the `Client` constructor
    """
    global client, append_overlap_records

    overlap_length = parameters.get("append_overlap_records", 5)
    append_overlap_records = overlap_length

    client = Client(**client_args)
|
|
|
|
def get_client():
    """
    Return the module-level client object (None if it has not been assigned yet).
    """
    return client
|
|
|
|
def close_client():
    """
    Close the connection of the module-level client.

    Does nothing if the client has not been created, so that it is safe to call
    this function during shutdown even if initialization did not happen.
    """
    if client is None:
        return
    client.close_connection()
|
|
|
|
async def fetch_klines(config: dict, start_from_dt) -> dict[str, pd.DataFrame] | None:
    """
    Retrieve and return latest data from binance client.

    The symbols are the "folder" entries of config["data_sources"]. If there are none,
    config["symbol"] is used. For each symbol, the number of requested klines is the
    number of config["freq"] intervals computed from `start_from_dt` plus the
    module-level `append_overlap_records`.

    NOTE(review): the previous docstring stated that at most 999 latest klines can be
    retrieved - confirm whether this limitation still applies.

    :param config: dict with "freq" and either "data_sources" or "symbol"
    :param start_from_dt: start timestamp used to compute how many intervals are missing
    :return: For each symbol (key of the dict), data frame with data and binance-specific columns.
        None if a request timed out, raised an exception or returned an empty result.
    """
    data_sources = config.get("data_sources", [])
    symbols = [x.get("folder") for x in data_sources]
    freq = config["freq"]

    # The converted value is not used here (request_symbol_klines converts again).
    # The call is kept in case it rejects unsupported frequencies early - TODO confirm
    binance_freq_from_pandas(freq)

    if not symbols:
        symbols = [config["symbol"]]

    # Compute how many records need to be fetched from the specified start timestamp
    intervals_count = get_interval_count_from_start_dt(freq, start_from_dt)
    request_count = intervals_count + append_overlap_records

    # Create a list of tasks for retrieving data (one task per symbol, same count for all)
    tasks = [asyncio.create_task(request_symbol_klines(sym, freq, request_count)) for sym in symbols]

    results = {}
    timeout = 10  # Seconds to wait for the result

    try:
        # Process responses in the order of arrival
        for fut in asyncio.as_completed(tasks, timeout=timeout):
            try:
                res = await fut  # res is dict for symbol, which is a list of record lists of 12 fields
            except asyncio.TimeoutError:
                log.warning(f"Timeout {timeout} seconds when requesting kline data.")
                return None
            except Exception as e:
                log.warning(f"Exception when requesting kline data: {e!r}")
                return None

            # An empty dict is what request_symbol_klines returns on failure
            if not res:
                log.error("Received empty or wrong result from klines request.")
                return None

            # Will overwrite existing klines of the same symbol if any
            results.update(res)
    finally:
        # After an early return some requests may still be running: do not leave them behind.
        # On the normal path all tasks are done and this loop does nothing.
        for task in tasks:
            if not task.done():
                task.cancel()

    # Convert raw kline lists to data frames (a new dict avoids rewriting `results` while iterating it)
    frames = {}
    for symbol, klines in results.items():
        df = klines_to_df(klines)
        df.name = symbol
        frames[symbol] = df

    return frames
|
|
|
|
async def request_symbol_klines(symbol, freq, limit: int):
    """
    Request klines data from the service for one symbol.
    Maximum the specified number of klines will be returned.

    NOTE: the client methods are called without await, that is, the request itself
    is executed synchronously inside this coroutine.

    :param symbol: symbol name passed to the client
    :param freq: pandas frequency like '1min' which is supported by Binance API
    :param limit: desired and maximum number of klines
    :return: Dict with the symbol as a key and a list of klines as a value. One kline is also a list.
        Empty dict if the request failed or no complete kline was received.
    """
    # Up to this number of klines is requested by one get_klines call,
    # otherwise get_historical_klines is used (original comment: limitation of API)
    klines_per_request = 400

    now_ts = now_timestamp()
    # Only the start of the interval is needed: it separates complete klines from the running one
    start_ts, _ = pandas_get_interval(freq)

    binance_freq = binance_freq_from_pandas(freq)
    interval_length_ms = pandas_interval_length_ms(freq)

    try:
        if limit <= klines_per_request:  # Server will return these number of klines in one request
            # INFO:
            # - startTime: include all intervals (ids) with same or greater id: if within interval then excluding this interval; if is equal to open time then include this interval
            # - endTime: include all intervals (ids) with same or smaller id: if equal to left border then return this interval, if within interval then return this interval
            # - It will return also incomplete current interval (in particular, we could collect approximate klines for higher frequencies by requesting incomplete intervals)
            klines = client.get_klines(symbol=symbol, interval=binance_freq, limit=limit, endTime=now_ts)
            # Return: list of lists, that is, one kline is a list (not dict) with items ordered: timestamp, open, high, low, close etc.
        else:
            # https://sammchardy.github.io/binance/2018/01/08/historical-data-download-binance.html
            # get_historical_klines(symbol, interval, start_str, end_str=None, limit=500)
            # Find start from the number of records and frequency (interval length in milliseconds)
            request_start_ts = now_ts - interval_length_ms * (limit + 1)
            klines = client.get_historical_klines(symbol=symbol, interval=binance_freq, start_str=request_start_ts, end_str=now_ts)
    except BinanceRequestException as bre:
        # {"code": 1103, "msg": "An unknown parameter was sent"}
        log.error(f"BinanceRequestException while requesting klines: {bre}")
        return {}
    except BinanceAPIException as bae:
        # {"code": 1002, "msg": "Invalid API call"}
        log.error(f"BinanceAPIException while requesting klines: {bae}")
        return {}
    except Exception as e:
        log.error(f"Exception while requesting klines: {e}")
        return {}

    #
    # Post-process
    #

    # The result also contains the current (still running) interval which we want to exclude.
    # Retain only klines which start before the interval start returned by pandas_get_interval
    klines_full = [kl for kl in klines if kl[0] < start_ts]

    # Without this check the access to the last element below fails with IndexError
    if not klines_full:
        log.error(f"No complete klines received for symbol {symbol}.")
        return {}

    last_full_kline_ts = klines_full[-1][0]

    # Only a diagnostic message: the data is returned even if the last kline is not the expected one
    if last_full_kline_ts != start_ts - interval_length_ms:
        log.error(f"UNEXPECTED RESULT: Last full kline timestamp {last_full_kline_ts} is not equal to previous full interval start {start_ts - interval_length_ms}. Maybe some results are missing and there are gaps.")

    # Return all received complete klines with the symbol as a key
    return {symbol: klines_full}
|
|
|
|
async def health_check():
    """
    Request information about the data provider server state.

    :return: 0 if the server reports status 0, and 1 if no status information
        was received or the status is different from 0
    """
    # Get server state (ping) and trade status (e.g., trade can be suspended on some symbol)
    system_status = client.get_system_status()
    # Expected response according to the original comment:
    # {
    #     "status": 0,  # 0: normal, 1: system maintenance
    #     "msg": "normal"  # normal or System maintenance.
    # }
    if not system_status:
        log.error("Error connecting to Binance server. No status information.")
        return 1

    # A separate variable avoids nesting double quotes inside the f-string,
    # which is a syntax error in Python versions before 3.12
    status = system_status.get("status")
    if status != 0:
        log.error(f"Error connecting to Binance server. Bad status: {status}")
        return 1

    # Check time synchronization (difference between server and local time)
    #server_time = client.get_server_time()
    #time_diff = int(time.time() * 1000) - server_time['serverTime']
    # TODO: Log large time differences (or better trigger time synchronization procedure)

    return 0
|
|
|
|
def klines_to_df(klines: list):
    """
    Build a data frame from a list of klines (for one symbol).

    The columns are named according to the module-level `column_names`, cast to
    `column_types`, and the frame is indexed by the time column which is also
    retained as a normal column.

    :param klines: list of klines where one kline is a list of values
    :return: data frame with binance-specific columns
    """
    frame = pd.DataFrame(klines, columns=column_names)

    # Milliseconds are converted directly to UTC timestamps,
    # therefore no explicit time zone assignment or conversion is needed afterwards
    frame[time_column] = pd.to_datetime(frame[time_column], unit='ms', utc=True)
    frame = frame.astype(column_types)

    # Set index by retaining the time column
    frame = frame.set_index(time_column, drop=False)

    # Validate: only report columns with nulls, the data is returned in any case
    null_flags = frame.isnull().any()
    if null_flags.any():
        null_columns = {k: v for k, v in null_flags.to_dict().items() if v}
        print(f"WARNING: Null in raw data found during conversion. Columns with Nulls: {null_columns}")
    # TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
    # TODO: Check that timestamps in 'close_time' are strictly consecutive. It is warning - not error

    return frame
|
|
|
|
def download_klines(config, data_sources):
    """
    Retrieving historic klines from binance server (Client.get_historical_klines).

    For each data source, the klines are stored in a CSV file in the folder
    config["data_folder"]/<folder>. If the file already exists then only the data
    starting from one of its last rows is downloaded and merged into the existing data.
    Otherwise all data since 2017-01-01 is requested.

    :param config: dict with "time_column", "data_folder", "freq" and optional
        "download_max_rows", "client_args", "api_key", "api_secret"
    :param data_sources: list of dicts where "folder" is used as the symbol name
    """
    # NOTE(review): statements below also use the literal 'timestamp' column name,
    # so presumably config["time_column"] is expected to be 'timestamp' - confirm
    time_column = config["time_column"]
    data_path = Path(config["data_folder"])
    download_max_rows = config.get("download_max_rows", 0)

    freq = config["freq"]  # Pandas frequency
    print(f"Pandas frequency: {freq}")

    freq = binance_freq_from_pandas(freq)
    print(f"Binance frequency: {freq}")

    # Copy the arguments so that adding credentials does not modify the dict stored in config
    client_args = dict(config.get("client_args") or {})
    if config.get("api_key"):
        client_args["api_key"] = config.get("api_key")
    if config.get("api_secret"):
        client_args["api_secret"] = config.get("api_secret")

    # Create binance client to be used for data retrieval
    client = Client(**client_args)

    futures = False
    if futures:
        client.API_URL = "https://fapi.binance.com/fapi"
        client.PRIVATE_API_VERSION = "v1"
        client.PUBLIC_API_VERSION = "v1"

    for ds in data_sources:
        # Assumption: folder name is equal to the symbol name we want to download
        quote = ds.get("folder")
        if not quote:
            print(f"ERROR. Folder is not specified.")
            continue

        print(f"Start downloading '{quote}' ...")

        file_path = data_path / quote
        file_path.mkdir(parents=True, exist_ok=True)  # Ensure that folder exists

        file_name = (file_path / ("futures" if futures else "klines")).with_suffix(".csv")

        # Get a few latest klines to determine the latest available timestamp
        latest_klines = client.get_klines(symbol=quote, interval=freq, limit=5)
        latest_ts = pd.to_datetime(latest_klines[-1][0], unit='ms', utc=True)

        df = None
        if file_name.is_file():
            # Load the existing data in order to append newly downloaded data
            df = pd.read_csv(file_name)
            df[time_column] = pd.to_datetime(df[time_column], format='ISO8601', utc=True)
            df = df.astype(column_types)
            df = df.set_index('timestamp', inplace=False, drop=False)

            if len(df) == 0:
                # Nothing to append to: handle like a missing file
                print(f"File {file_name} has no rows and will be overwritten.")
                df = None

        if df is not None:
            # Use an older point so that new data will overwrite old data.
            # The offset is limited by the number of rows because short files have less than 5 rows
            overlap = min(5, len(df))
            oldest_point = df["timestamp"].iloc[-overlap]

            print(f"File found. Downloaded data for {quote} and {freq} since {str(latest_ts)} will be appended to the existing file {file_name}")
        else:
            # No existing data so we will download all available data and store as a new file
            oldest_point = datetime(2017, 1, 1)

            print(f"File not found. All data will be downloaded and stored in newly created file for {quote} and {freq}.")

        # === Download from the remote server using binance client
        # No end is specified in order to fetch everything up to now
        klines = client.get_historical_klines(
            symbol=quote,
            interval=freq,
            start_str=oldest_point.isoformat(),
        )

        df_new = klines_to_df(klines)

        if df is None:
            df = df_new
        else:
            df = pd.concat([df, df_new])

        # Drop duplicates: for the overlapping rows the newly downloaded versions are retained
        df = df.drop_duplicates(subset=["timestamp"], keep="last")

        # Remove last row because it represents a non-complete kline (the interval not finished yet)
        df = df.iloc[:-1]

        # Limit the saved size by only the latest rows
        if download_max_rows:
            df = df.tail(download_max_rows)

        df.to_csv(file_name, index=False)

        print(f"Finished downloading '{quote}'. Stored {len(df)} rows in '{file_name}'")
|