import os
import sys
import argparse
import math
import time
from datetime import datetime, timedelta
from dateutil import parser
from decimal import *
from typing import Any, Coroutine
from pathlib import Path

import pandas as pd
import asyncio

from binance import Client
from binance.exceptions import *
from binance.helpers import date_to_milliseconds, interval_to_milliseconds
from binance.enums import *

from service.App import *
from common.utils import *
from inputs.utils_binance import *

import logging

log = logging.getLogger('binance.base_client')

client = None

#
# Parameters
#

append_overlap_records = 5  # How many records to request in addition to the missing data (overlap length)

# Binance-specific column names corresponding to the values returned from the API
column_names = [
    'timestamp',
    'open', 'high', 'low', 'close', 'volume',
    'close_time',
    'quote_av', 'trades', 'tb_base_av', 'tb_quote_av',
    'ignore'
]

column_types = {
    'timestamp': 'datetime64[ns, UTC]',
    'open': 'float64', 'high': 'float64', 'low': 'float64', 'close': 'float64', 'volume': 'float64',
    'close_time': 'int64',
    'quote_av': 'float64', 'trades': 'int64', 'tb_base_av': 'float64', 'tb_quote_av': 'float64',
    'ignore': 'float64',
}

time_column = 'timestamp'


def init_client(parameters, client_args):
    global client, append_overlap_records
    append_overlap_records = parameters.get("append_overlap_records", 5)
    client = Client(**client_args)


def get_client():
    return client


def close_client():
    client.close_connection()
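
# Illustrative usage sketch (not part of the module): how the client lifecycle
# above could be driven from a service. The keys "api_key"/"api_secret" follow
# python-binance's Client constructor; the surrounding service code is assumed.
#
#   init_client(
#       {"append_overlap_records": 5},
#       {"api_key": "...", "api_secret": "..."},
#   )
#   try:
#       ...  # issue requests via get_client()
#   finally:
#       close_client()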


async def fetch_klines(config: dict, start_from_dt) -> dict[str, pd.DataFrame] | None:
    """
    Retrieve and return the latest data from the Binance client.
    Limitation: at most 999 latest klines can be retrieved. If more are needed, another function has to be used.

    :return: For each symbol (key of the dict), a data frame with the data and Binance-specific columns
    """
    data_sources = config.get("data_sources", [])
    symbols = [x.get("folder") for x in data_sources]
    freq = config["freq"]
    binance_freq = binance_freq_from_pandas(freq)

    if not symbols:
        symbols = [config["symbol"]]

    # Compute how many records need to be fetched from the specified start timestamp
    intervals_count = get_interval_count_from_start_dt(freq, start_from_dt)
    request_count = intervals_count + append_overlap_records

    # Create a list of tasks for retrieving data
    missing_klines_counts = [request_count for sym in symbols]
    #coros = [request_symbol_klines(sym, "1m", 5) for sym in symbols]
    tasks = [asyncio.create_task(request_symbol_klines(s, freq, c)) for c, s in zip(missing_klines_counts, symbols)]

    results = {}
    timeout = 10  # Seconds to wait for the result

    # Process responses in the order of arrival
    for fut in asyncio.as_completed(tasks, timeout=timeout):
        # Get the result
        res = None
        try:
            res = await fut  # res is a dict for one symbol: a list of kline records, each a list of 12 fields
        except asyncio.TimeoutError as te:
            log.warning(f"Timeout of {timeout} seconds when requesting kline data.")
            return None
        except Exception as e:
            log.warning(f"Exception when requesting kline data: {e}")
            return None

        # Add to the results (will overwrite existing klines if any)
        if res and res.keys():
            results.update(res)
        else:
            log.error("Received an empty or wrong result from the klines request.")
            return None

    for symbol, klines in results.items():
        df = klines_to_df(klines)
        df.name = symbol
        results[symbol] = df

    return results
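
# Illustrative usage sketch (not part of the module): awaiting fetch_klines()
# from an asyncio entry point. The config keys shown ("symbol", "freq",
# "data_sources") follow the accesses made above; the concrete values and the
# start timestamp accepted by get_interval_count_from_start_dt() are assumptions.
#
#   async def main():
#       init_client({"append_overlap_records": 5}, {})
#       config = {"symbol": "BTCUSDT", "freq": "1min", "data_sources": []}
#       start_dt = datetime.now() - timedelta(hours=1)
#       dfs = await fetch_klines(config, start_dt)
#       if dfs:
#           for symbol, df in dfs.items():
#               print(symbol, len(df))
#
#   asyncio.run(main())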


async def request_symbol_klines(symbol, freq, limit: int):
    """
    Request klines data from the service for one symbol.
    At most the specified number of klines will be returned.

    :param symbol:
    :param freq: pandas frequency like '1min' which is supported by the Binance API
    :param limit: desired and maximum number of klines
    :return: Dict with the symbol as a key and a list of klines as a value. One kline is also a list.
    """
    klines_per_request = 400  # Limitation of the API

    now_ts = now_timestamp()
    start_ts, end_ts = pandas_get_interval(freq)

    binance_freq = binance_freq_from_pandas(freq)
    interval_length_ms = pandas_interval_length_ms(freq)

    try:
        if limit <= klines_per_request:  # The server will return this number of klines in one request
            # INFO:
            # - startTime: include all intervals (ids) with the same or greater id: if within an interval then exclude this interval; if equal to the open time then include this interval
            # - endTime: include all intervals (ids) with the same or smaller id: if equal to the left border then return this interval; if within an interval then return this interval
            # - It will also return the incomplete current interval (in particular, we could collect approximate klines for higher frequencies by requesting incomplete intervals)
            klines = client.get_klines(symbol=symbol, interval=binance_freq, limit=limit, endTime=now_ts)
            # Return: list of lists, that is, one kline is a list (not dict) with items ordered: timestamp, open, high, low, close etc.
        else:
            # https://sammchardy.github.io/binance/2018/01/08/historical-data-download-binance.html
            # get_historical_klines(symbol, interval, start_str, end_str=None, limit=500)
            # Find the start from the number of records and the frequency (interval length in milliseconds)
            request_start_ts = now_ts - interval_length_ms * (limit + 1)
            klines = client.get_historical_klines(symbol=symbol, interval=binance_freq, start_str=request_start_ts, end_str=now_ts)
    except BinanceRequestException as bre:
        # {"code": 1103, "msg": "An unknown parameter was sent"}
        log.error(f"BinanceRequestException while requesting klines: {bre}")
        return {}
    except BinanceAPIException as bae:
        # {"code": 1002, "msg": "Invalid API call"}
        log.error(f"BinanceAPIException while requesting klines: {bae}")
        return {}
    except Exception as e:
        log.error(f"Exception while requesting klines: {e}")
        return {}

    #
    # Post-process
    #

    # Find the last complete interval in the result list
    # The problem is that the result also contains the current (still running) interval which we want to exclude
    # Exclude the last kline if it corresponds to the current interval
    klines_full = [kl for kl in klines if kl[0] < start_ts]
    if not klines_full:
        # Guard against an empty result (e.g., only the incomplete current interval was returned)
        log.error(f"No complete klines were returned for symbol {symbol}.")
        return {}
    last_full_kline_ts = klines_full[-1][0]

    if last_full_kline_ts != start_ts - interval_length_ms:
        log.error(f"UNEXPECTED RESULT: Last full kline timestamp {last_full_kline_ts} is not equal to the previous full interval start {start_ts - interval_length_ms}. Maybe some results are missing and there are gaps.")

    # Return all received klines with the symbol as a key
    return {symbol: klines_full}
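
# For reference, a single raw kline record returned by the API is a list of
# 12 fields in the order of `column_names` above (numeric fields arrive as
# strings); the concrete values below are only illustrative:
#
#   [1499040000000,        # timestamp (open time, ms)
#    "0.01634790",         # open
#    "0.80000000",         # high
#    "0.01575800",         # low
#    "0.01577100",         # close
#    "148976.11427815",    # volume
#    1499644799999,        # close_time (ms)
#    "2434.19055334",      # quote_av
#    308,                  # trades
#    "1756.87402397",      # tb_base_av
#    "28.46694368",        # tb_quote_av
#    "17928899.62484339"]  # ignore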


async def health_check():
    """
    Request information about the data provider server state.
    """
    # Get the server state (ping) and trade status (e.g., trade can be suspended on some symbol)
    system_status = client.get_system_status()
    #{
    #    "status": 0,     # 0: normal, 1: system maintenance
    #    "msg": "normal"  # "normal" or "system maintenance"
    #}
    if not system_status:
        log.error("Error connecting to the Binance server. No status information.")
        return 1
    if system_status.get("status") != 0:
        log.error(f"Error connecting to the Binance server. Bad status: {system_status.get('status')}")
        return 1

    # Check time synchronization (difference between server and local time)
    #server_time = client.get_server_time()
    #time_diff = int(time.time() * 1000) - server_time['serverTime']
    # TODO: Log large time differences (or better, trigger a time synchronization procedure)

    return 0
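
# A minimal sketch of the time-synchronization check from the TODO above,
# assuming the commented-out get_server_time() call is enabled; the 1000 ms
# threshold is an arbitrary illustrative value:
#
#   server_time = client.get_server_time()
#   time_diff = int(time.time() * 1000) - server_time['serverTime']
#   if abs(time_diff) > 1000:
#       log.warning(f"Large difference between server and local time: {time_diff} ms.")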


def klines_to_df(klines: list):
    """
    Convert a list of klines (for one symbol) to a data frame by using the Binance-specific convention for (a sequence of) column names and their types.
    """
    df = pd.DataFrame(klines, columns=column_names)

    df[time_column] = pd.to_datetime(df[time_column], unit='ms', utc=True)
    df = df.astype(column_types)

    # Explicitly assigning or converting the time zone is not needed because we convert millis directly to UTC
    #if df[time_column].dt.tz is None:
    #    df[time_column] = df[time_column].dt.tz_localize('UTC')
    #else:
    #    df[time_column] = df[time_column].dt.tz_convert('UTC')

    #df['close_time'] = pd.to_datetime(df['close_time'], unit='ms', utc=True)

    #df["open"] = pd.to_numeric(df["open"])
    #df["high"] = pd.to_numeric(df["high"])
    #df["low"] = pd.to_numeric(df["low"])
    #df["close"] = pd.to_numeric(df["close"])
    #df["volume"] = pd.to_numeric(df["volume"])
    #df["quote_av"] = pd.to_numeric(df["quote_av"])
    #df["trades"] = pd.to_numeric(df["trades"])
    #df["tb_base_av"] = pd.to_numeric(df["tb_base_av"])
    #df["tb_quote_av"] = pd.to_numeric(df["tb_quote_av"])

    # Set the index while retaining the time column
    df.set_index(time_column, inplace=True, drop=False)

    # Validate
    if df.isnull().any().any():
        null_columns = {k: v for k, v in df.isnull().any().to_dict().items() if v}
        print(f"WARNING: Nulls found in raw data during conversion. Columns with nulls: {null_columns}")

    # TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
    # TODO: Check that timestamps in 'close_time' are strictly consecutive. It is a warning, not an error.

    return df
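
# Illustrative usage sketch (not part of the module): converting a raw kline
# list and inspecting the result; the record below reuses the field order of
# `column_names` with made-up values:
#
#   raw = [[1499040000000, "0.016", "0.018", "0.015", "0.017", "100.0",
#           1499040059999, "1.7", 10, "50.0", "0.85", "0"]]
#   df = klines_to_df(raw)
#   print(df.dtypes)      # typed according to `column_types`
#   print(df.index.name)  # 'timestamp' (also kept as a regular column)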


def download_klines(config, data_sources):
    """
    Retrieve historical klines from the Binance server.
    Client.get_historical_klines
    """
    time_column = config["time_column"]
    data_path = Path(config["data_folder"])
    download_max_rows = config.get("download_max_rows", 0)

    now = datetime.now()

    freq = config["freq"]  # Pandas frequency
    print(f"Pandas frequency: {freq}")
    freq = binance_freq_from_pandas(freq)
    print(f"Binance frequency: {freq}")

    client_args = config.get("client_args", {})
    if config.get("api_key"):
        client_args["api_key"] = config.get("api_key")
    if config.get("api_secret"):
        client_args["api_secret"] = config.get("api_secret")

    # Create a Binance client to be used for data retrieval
    client = Client(**client_args)

    futures = False
    if futures:
        client.API_URL = "https://fapi.binance.com/fapi"
        client.PRIVATE_API_VERSION = "v1"
        client.PUBLIC_API_VERSION = "v1"

    for ds in data_sources:
        # Assumption: the folder name is equal to the symbol name we want to download
        quote = ds.get("folder")
        if not quote:
            print("ERROR. Folder is not specified.")
            continue

        print(f"Start downloading '{quote}' ...")

        file_path = data_path / quote
        file_path.mkdir(parents=True, exist_ok=True)  # Ensure that the folder exists
        file_name = (file_path / ("futures" if futures else "klines")).with_suffix(".csv")

        # Get a few latest klines to determine the latest available timestamp
        latest_klines = client.get_klines(symbol=quote, interval=freq, limit=5)
        latest_ts = pd.to_datetime(latest_klines[-1][0], unit='ms', utc=True)

        if file_name.is_file():
            # Load the existing data in order to append the newly downloaded data
            df = pd.read_csv(file_name)
            df[time_column] = pd.to_datetime(df[time_column], format='ISO8601', utc=True)
            df = df.astype(column_types)
            df = df.set_index('timestamp', inplace=False, drop=False)

            # oldest_point = parser.parse(data["timestamp"].iloc[-1])
            oldest_point = df["timestamp"].iloc[-5]  # Use an older point so that new data will overwrite old data

            print(f"File found. Downloaded data for {quote} and {freq} since {str(oldest_point)} will be appended to the existing file {file_name}")
        else:
            # No existing data, so we will download all available data and store it as a new file
            df = None
            oldest_point = datetime(2017, 1, 1)

            print(f"File not found. All data will be downloaded and stored in a newly created file for {quote} and {freq}.")

        #delta_minutes = (latest_ts - oldest_point).total_seconds() / 60
        #binsizes = {"1m": 1, "5m": 5, "1h": 60, "1d": 1440}
        #delta_lines = math.ceil(delta_minutes / binsizes[freq])

        # === Download from the remote server using the Binance client
        klines = client.get_historical_klines(
            symbol=quote,
            interval=freq,
            start_str=oldest_point.isoformat(),
            #end_str=latest_ts.isoformat()  # fetch everything up to now
        )

        df_new = klines_to_df(klines)

        if df is None:
            df = df_new
        else:
            df = pd.concat([df, df_new])

        # Drop duplicates
        df = df.drop_duplicates(subset=["timestamp"], keep="last")
        # df = df[~df.index.duplicated(keep='last')]  # alternatively, drop duplicates in the index

        # Remove the last row because it represents a non-complete kline (the interval is not finished yet)
        df = df.iloc[:-1]

        # Limit the stored size to only the latest rows
        if download_max_rows:
            df = df.tail(download_max_rows)

        df.to_csv(file_name, index=False)

        print(f"Finished downloading '{quote}'. Stored {len(df)} rows in '{file_name}'")