intelligent-trading-bot/inputs/collector_binance.py
import os
import sys
import argparse
import math, time
from datetime import datetime, timedelta
from dateutil import parser
from decimal import *
from typing import Any, Coroutine
from pathlib import Path
import pandas as pd
import asyncio
from binance import Client
from binance.exceptions import *
from binance.helpers import date_to_milliseconds, interval_to_milliseconds
from binance.enums import *
from service.App import *
from common.utils import *
from inputs.utils_binance import *
import logging
log = logging.getLogger('binance.base_client')
client = None

#
# Parameters
#

append_overlap_records = 5  # How many records to request in addition to the missing data (overlap length)

# Binance-specific column names corresponding to the values returned from the API
column_names = [
    'timestamp',
    'open', 'high', 'low', 'close', 'volume',
    'close_time',
    'quote_av', 'trades', 'tb_base_av', 'tb_quote_av',
    'ignore'
]

column_types = {
    'timestamp': 'datetime64[ns, UTC]',  # datetime64[ns, UTC] datetime64[ns]
    'open': 'float64', 'high': 'float64', 'low': 'float64', 'close': 'float64', 'volume': 'float64',
    'close_time': 'int64',
    'quote_av': 'float64', 'trades': 'int64', 'tb_base_av': 'float64', 'tb_quote_av': 'float64',
    'ignore': 'float64',
}

time_column = 'timestamp'
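
# Illustrative note (added): the Binance REST API returns each kline as a 12-element list whose
# positions correspond to 'column_names' above; prices and volumes arrive as strings, which is
# why 'column_types' casts them to numeric types. A fabricated sample record (not real data):
#
#   [1690000000000,                                # timestamp (open time, ms)
#    "100.0", "101.0", "99.5", "100.5", "10.0",    # open, high, low, close, volume
#    1690000059999,                                # close_time (ms)
#    "1005.0", 42, "5.0", "502.5",                 # quote_av, trades, tb_base_av, tb_quote_av
#    "0"]                                          # ignore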


def init_client(parameters, client_args):
    global client, append_overlap_records
    append_overlap_records = parameters.get("append_overlap_records", 5)
    client = Client(**client_args)


def get_client():
    return client


def close_client():
    client.close_connection()
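
# Illustrative sketch (added): typical initialization of this module. The 'client_args' keys
# follow the python-binance Client constructor; the parameter dict key is the one read by
# init_client() above. The credential values are placeholders.
def _example_init():
    init_client(
        parameters={"append_overlap_records": 5},
        client_args={"api_key": "YOUR_API_KEY", "api_secret": "YOUR_API_SECRET"},
    )
    return get_client()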


async def fetch_klines(config: dict, start_from_dt) -> dict[str, pd.DataFrame] | None:
    """
    Retrieve and return the latest data from the Binance client.
    Limitation: at most 999 latest klines can be retrieved. If more are needed then some other function has to be used.
    :return: For each symbol (key of the dict), a data frame with the data in Binance-specific columns
    """
    data_sources = config.get("data_sources", [])
    symbols = [x.get("folder") for x in data_sources]
    freq = config["freq"]
    binance_freq = binance_freq_from_pandas(freq)

    if not symbols:
        symbols = [config["symbol"]]

    # Compute how many records need to be fetched from the specified start timestamp
    intervals_count = get_interval_count_from_start_dt(freq, start_from_dt)
    request_count = intervals_count + append_overlap_records

    # Create a list of tasks for retrieving data
    missing_klines_counts = [request_count for sym in symbols]
    #coros = [request_symbol_klines(sym, "1m", 5) for sym in symbols]
    tasks = [asyncio.create_task(request_symbol_klines(s, freq, c)) for c, s in zip(missing_klines_counts, symbols)]

    results = {}
    timeout = 10  # Seconds to wait for the result

    # Process responses in the order of arrival
    for fut in asyncio.as_completed(tasks, timeout=timeout):
        # Get the result
        res = None
        try:
            res = await fut  # res is a dict for one symbol: a list of kline records, each a list of 12 fields
        except asyncio.TimeoutError:
            log.warning(f"Timeout {timeout} seconds when requesting kline data.")
            return None
        except Exception as e:
            log.warning(f"Exception when requesting kline data: {e}")
            return None

        # Add to the results (will overwrite existing klines if any)
        if res and res.keys():
            results.update(res)
        else:
            log.error("Received empty or wrong result from klines request.")
            return None

    for symbol, klines in results.items():
        df = klines_to_df(klines)
        df.name = symbol
        results[symbol] = df

    return results
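
# Illustrative usage sketch (added): driving fetch_klines() from an asyncio entry point.
# The config keys ('symbol', 'freq') and the one-hour lookback below are assumptions for
# demonstration; in the application the configuration normally comes from service.App.
async def _example_fetch_latest():
    config = {"symbol": "BTCUSDT", "freq": "1min", "data_sources": []}
    start_from_dt = datetime.now() - timedelta(hours=1)
    dfs = await fetch_klines(config, start_from_dt)
    if dfs:
        print(dfs["BTCUSDT"].tail())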


async def request_symbol_klines(symbol, freq, limit: int):
    """
    Request klines data from the service for one symbol.
    At most the specified number of klines will be returned.

    :param symbol:
    :param freq: pandas frequency like '1min' which is supported by the Binance API
    :param limit: desired and maximum number of klines
    :return: Dict with the symbol as a key and a list of klines as a value. One kline is also a list.
    """
    klines_per_request = 400  # Limitation of the API

    now_ts = now_timestamp()
    start_ts, end_ts = pandas_get_interval(freq)

    binance_freq = binance_freq_from_pandas(freq)
    interval_length_ms = pandas_interval_length_ms(freq)

    try:
        if limit <= klines_per_request:  # The server will return this number of klines in one request
            # INFO:
            # - startTime: include all intervals (ids) with the same or greater id: if within an interval then exclude this interval; if equal to the open time then include this interval
            # - endTime: include all intervals (ids) with the same or smaller id: if equal to the left border then return this interval; if within an interval then return this interval
            # - It will also return the incomplete current interval (in particular, we could collect approximate klines for higher frequencies by requesting incomplete intervals)
            klines = client.get_klines(symbol=symbol, interval=binance_freq, limit=limit, endTime=now_ts)
            # Return: list of lists, that is, one kline is a list (not a dict) with items ordered: timestamp, open, high, low, close etc.
        else:
            # https://sammchardy.github.io/binance/2018/01/08/historical-data-download-binance.html
            # get_historical_klines(symbol, interval, start_str, end_str=None, limit=500)
            # Find the start from the number of records and the frequency (interval length in milliseconds)
            request_start_ts = now_ts - interval_length_ms * (limit + 1)
            klines = client.get_historical_klines(symbol=symbol, interval=binance_freq, start_str=request_start_ts, end_str=now_ts)
    except BinanceRequestException as bre:
        # {"code": 1103, "msg": "An unknown parameter was sent"}
        log.error(f"BinanceRequestException while requesting klines: {bre}")
        return {}
    except BinanceAPIException as bae:
        # {"code": 1002, "msg": "Invalid API call"}
        log.error(f"BinanceAPIException while requesting klines: {bae}")
        return {}
    except Exception as e:
        log.error(f"Exception while requesting klines: {e}")
        return {}

    #
    # Post-process
    #

    # Find the last complete interval in the result list.
    # The problem is that the result also contains the current (still running) interval which we want to exclude.
    # Exclude the last kline if it corresponds to the current interval.
    klines_full = [kl for kl in klines if kl[0] < start_ts]
    if not klines_full:
        log.error("No complete klines were returned by the request.")
        return {}

    last_full_kline_ts = klines_full[-1][0]
    if last_full_kline_ts != start_ts - interval_length_ms:
        log.error(f"UNEXPECTED RESULT: Last full kline timestamp {last_full_kline_ts} is not equal to the previous full interval start {start_ts - interval_length_ms}. Maybe some results are missing and there are gaps.")

    # Return all received klines with the symbol as a key
    return {symbol: klines_full}
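
# Worked example (added, illustrative) of the start-timestamp arithmetic used above when
# 'limit' exceeds a single request: for freq='1min' the interval length is 60_000 ms, so
# requesting limit=700 klines means going (700 + 1) minutes back from 'now' and letting
# get_historical_klines() paginate over that range:
#
#   request_start_ts = now_ts - 60_000 * (700 + 1)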


async def health_check():
    """
    Request information about the data provider server state.
    """
    # Get server state (ping) and trade status (e.g., trade can be suspended on some symbol)
    system_status = client.get_system_status()
    #{
    #    "status": 0,  # 0: normal, 1: system maintenance
    #    "msg": "normal"  # normal or System maintenance
    #}
    if not system_status:
        log.error("Error connecting to Binance server. No status information.")
        return 1
    if system_status.get("status") != 0:
        log.error(f"Error connecting to Binance server. Bad status: {system_status.get('status')}")
        return 1

    # Check time synchronization (difference between server and local time)
    #server_time = client.get_server_time()
    #time_diff = int(time.time() * 1000) - server_time['serverTime']
    # TODO: Log large time differences (or better, trigger a time synchronization procedure)

    return 0
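
# Illustrative sketch (added) of the time-drift check outlined in the commented lines and the
# TODO above: compare local time with the Binance server time and warn when the drift is large.
# The 1000 ms threshold is an arbitrary assumption, not a value taken from the project.
def _example_check_time_drift(max_drift_ms: int = 1000) -> int:
    server_time = client.get_server_time()  # e.g. {"serverTime": 1690000000000}
    drift_ms = int(time.time() * 1000) - server_time["serverTime"]
    if abs(drift_ms) > max_drift_ms:
        log.warning(f"Local time differs from Binance server time by {drift_ms} ms.")
        return 1
    return 0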


def klines_to_df(klines: list):
    """
    Convert a list of klines (for one symbol) to a data frame by using the Binance-specific convention for (a sequence of) column names and their types.
    """
    df = pd.DataFrame(klines, columns=column_names)

    df[time_column] = pd.to_datetime(df[time_column], unit='ms', utc=True)
    df = df.astype(column_types)

    # Explicitly assigning or converting the time zone is not needed because we convert millis directly to UTC
    #if df[time_column].dt.tz is None:
    #    df[time_column] = df[time_column].dt.tz_localize('UTC')
    #else:
    #    df[time_column] = df[time_column].dt.tz_convert('UTC')

    #df['close_time'] = pd.to_datetime(df['close_time'], unit='ms', utc=True)

    #df["open"] = pd.to_numeric(df["open"])
    #df["high"] = pd.to_numeric(df["high"])
    #df["low"] = pd.to_numeric(df["low"])
    #df["close"] = pd.to_numeric(df["close"])
    #df["volume"] = pd.to_numeric(df["volume"])
    #df["quote_av"] = pd.to_numeric(df["quote_av"])
    #df["trades"] = pd.to_numeric(df["trades"])
    #df["tb_base_av"] = pd.to_numeric(df["tb_base_av"])
    #df["tb_quote_av"] = pd.to_numeric(df["tb_quote_av"])

    # Set the index while retaining the time column
    df.set_index(time_column, inplace=True, drop=False)

    # Validate
    if df.isnull().any().any():
        null_columns = {k: v for k, v in df.isnull().any().to_dict().items() if v}
        print(f"WARNING: Nulls found in raw data during conversion. Columns with nulls: {null_columns}")
    # TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
    # TODO: Check that timestamps in 'close_time' are strictly consecutive. It is a warning - not an error.

    return df
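
# Illustrative sketch (added): converting a couple of raw klines into a data frame. The records
# below are fabricated sample values that only follow the 12-field layout in 'column_names';
# they are not real market data.
def _example_klines_to_df() -> pd.DataFrame:
    sample_klines = [
        [1690000000000, "100.0", "101.0", "99.5", "100.5", "10.0",
         1690000059999, "1005.0", 42, "5.0", "502.5", "0"],
        [1690000060000, "100.5", "101.5", "100.0", "101.0", "12.0",
         1690000119999, "1212.0", 50, "6.0", "606.0", "0"],
    ]
    df = klines_to_df(sample_klines)
    return df  # indexed by the UTC 'timestamp' column, numeric columns cast according to column_types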


def download_klines(config, data_sources):
    """
    Retrieve historical klines from the Binance server.
    Uses Client.get_historical_klines.
    """
    time_column = config["time_column"]
    data_path = Path(config["data_folder"])
    download_max_rows = config.get("download_max_rows", 0)

    now = datetime.now()

    freq = config["freq"]  # Pandas frequency
    print(f"Pandas frequency: {freq}")
    freq = binance_freq_from_pandas(freq)
    print(f"Binance frequency: {freq}")

    client_args = config.get("client_args", {})
    if config.get("api_key"):
        client_args["api_key"] = config.get("api_key")
    if config.get("api_secret"):
        client_args["api_secret"] = config.get("api_secret")

    # Create a Binance client to be used for data retrieval
    client = Client(**client_args)

    futures = False
    if futures:
        client.API_URL = "https://fapi.binance.com/fapi"
        client.PRIVATE_API_VERSION = "v1"
        client.PUBLIC_API_VERSION = "v1"

    for ds in data_sources:
        # Assumption: the folder name is equal to the symbol name we want to download
        quote = ds.get("folder")
        if not quote:
            print("ERROR. Folder is not specified.")
            continue

        print(f"Start downloading '{quote}' ...")

        file_path = data_path / quote
        file_path.mkdir(parents=True, exist_ok=True)  # Ensure that the folder exists

        file_name = (file_path / ("futures" if futures else "klines")).with_suffix(".csv")

        # Get a few latest klines to determine the latest available timestamp
        latest_klines = client.get_klines(symbol=quote, interval=freq, limit=5)
        latest_ts = pd.to_datetime(latest_klines[-1][0], unit='ms', utc=True)

        if file_name.is_file():
            # Load the existing data in order to append newly downloaded data
            df = pd.read_csv(file_name)
            df[time_column] = pd.to_datetime(df[time_column], format='ISO8601', utc=True)
            df = df.astype(column_types)
            df = df.set_index('timestamp', inplace=False, drop=False)

            #oldest_point = parser.parse(data["timestamp"].iloc[-1])
            oldest_point = df["timestamp"].iloc[-5]  # Use an older point so that new data will overwrite old data

            print(f"File found. Downloaded data for {quote} and {freq} since {str(latest_ts)} will be appended to the existing file {file_name}")
        else:
            # No existing data so we will download all available data and store it as a new file
            df = None
            oldest_point = datetime(2017, 1, 1)

            print(f"File not found. All data will be downloaded and stored in a newly created file for {quote} and {freq}.")

        #delta_minutes = (latest_ts - oldest_point).total_seconds() / 60
        #binsizes = {"1m": 1, "5m": 5, "1h": 60, "1d": 1440}
        #delta_lines = math.ceil(delta_minutes / binsizes[freq])

        # === Download from the remote server using the Binance client
        klines = client.get_historical_klines(
            symbol=quote,
            interval=freq,
            start_str=oldest_point.isoformat(),
            #end_str=latest_ts.isoformat()  # fetch everything up to now
        )

        df_new = klines_to_df(klines)

        if df is None:
            df = df_new
        else:
            df = pd.concat([df, df_new])

        # Drop duplicates
        df = df.drop_duplicates(subset=["timestamp"], keep="last")
        #df = df[~df.index.duplicated(keep='last')]  # alternatively, drop duplicates in the index

        # Remove the last row because it represents an incomplete kline (the interval has not finished yet)
        df = df.iloc[:-1]

        # Limit the saved size to only the latest rows
        if download_max_rows:
            df = df.tail(download_max_rows)

        df.to_csv(file_name, index=False)

        print(f"Finished downloading '{quote}'. Stored {len(df)} rows in '{file_name}'")