import os import sys import argparse import math, time from datetime import datetime from decimal import * import pandas as pd import asyncio from binance.client import Client from binance.exceptions import * from binance.helpers import date_to_milliseconds, interval_to_milliseconds from binance.enums import * from service.App import * from common.utils import * from service.analyzer import * import logging log = logging.getLogger('collector') logging.basicConfig( filename="collector.log", # parameter in App level=logging.DEBUG, #format = "%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", format = "%(asctime)s %(levelname)s %(message)s", #datefmt = '%Y-%m-%d %H:%M:%S', ) async def main_collector_task(): """ It is a highest level task which is added to the event loop and executed normally every 1 minute and then it calls other tasks. """ symbol = App.config["symbol"] startTime, endTime = get_interval("1m") now_ts = now_timestamp() log.info(f"===> Start collector task. Timestamp {now_ts}. Interval [{startTime},{endTime}].") # # 0. Check server state (if necessary) # if data_provider_problems_exist(): await data_provider_health_check() if data_provider_problems_exist(): log.error(f"Problems with the data provider server found. No signaling, no trade. Will try next time.") return 1 # # 1. Ensure that we are up-to-date with klines # res = await sync_data_collector_task() if res > 0: log.error(f"Problem getting data from the server. No signaling, no trade. Will try next time.") return 1 log.info(f"<=== End collector task.") return 0 # # Request/update market data # async def sync_data_collector_task(): """ Collect latest data. After executing this task our local (in-memory) data state is up-to-date. Hence, we can do something useful like data analysis and trading. Limitations and notes: - Currently, we can work only with one symbol - We update only local state by loading latest data. If it is necessary to initialize the db then another function should be used. """ symbol = App.config["symbol"] symbols = [symbol] # In future, we might want to collect other data, say, from other cryptocurrencies # Request newest data # We do this in any case in order to update our state (data, orders etc.) missing_klines_count = App.analyzer.get_missing_klines_count(symbol) #coros = [request_klines(sym, "1m", 5) for sym in symbols] tasks = [asyncio.create_task(request_klines(sym, "1m", missing_klines_count+1)) for sym in symbols] results = {} timeout = 5 # Seconds to wait for the result # Process responses in the order of arrival for fut in asyncio.as_completed(tasks, timeout=timeout): # Get the results res = None try: res = await fut except TimeoutError as te: log.warning(f"Timeout {timeout} seconds when requesting kline data.") return 1 except Exception as e: log.warning(f"Exception when requesting kline data.") return 1 # Add to the database (will overwrite existing klines if any) if res and res.keys(): # res is dict for symbol, which is a list of record lists of 12 fields # ============================== # TODO: We need to check these fields for validity (presence, non-null) # TODO: We can load maximum 999 latest klines, so if more 1600, then some other method # TODO: Print somewhere diagnostics about how many lines are in history buffer of db, and if nans are found results.update(res) try: added_count = App.analyzer.store_klines(res) except Exception as e: log.error(f"Error storing kline result in the database. Exception: {e}") return 1 else: log.error("Received empty or wrong result from klines request.") return 1 return 0 async def request_klines(symbol, freq, limit): """ Request klines data from the service for one symbol. Maximum the specified number of klines will be returned. :return: Dict with the symbol as a key and a list of klines as a value. One kline is also a list. """ klines_per_request = 400 now_ts = now_timestamp() startTime, endTime = get_interval(freq) klines = [] try: if limit <= klines_per_request: # Server will return these number of klines in one request # INFO: # - startTime: include all intervals (ids) with same or greater id: if within interval then excluding this interval; if is equal to open time then include this interval # - endTime: include all intervals (ids) with same or smaller id: if equal to left border then return this interval, if within interval then return this interval # - It will return also incomplete current interval (in particular, we could collect approximate klines for higher frequencies by requesting incomplete intervals) klines = App.client.get_klines(symbol=symbol, interval=freq, limit=limit, endTime=now_ts) # Return: list of lists, that is, one kline is a list (not dict) with items ordered: timestamp, open, high, low, close etc. else: # https://sammchardy.github.io/binance/2018/01/08/historical-data-download-binance.html # get_historical_klines(symbol, interval, start_str, end_str=None, limit=500) start_ts = now_ts - (limit+1) * 60_000 # Subtract the number of minutes from now ts klines = App.client.get_historical_klines(symbol=symbol, interval=freq, start_str=start_ts, end_str=now_ts) except BinanceRequestException as bre: # {"code": 1103, "msg": "An unknown parameter was sent"} log.error(f"BinanceRequestException while requesting klines: {bre}") return {} except BinanceAPIException as bae: # {"code": 1002, "msg": "Invalid API call"} log.error(f"BinanceAPIException while requesting klines: {bae}") return {} except Exception as e: log.error(f"Exception while requesting klines: {e}") return {} # # Post-process # # Find latest *full* (completed) interval in the result list. # The problem is that the result also contains the current (still running) interval which we want to exclude klines_full = [kl for kl in klines if kl[0] < startTime] last_full_kline = klines_full[-1] last_full_kline_ts = last_full_kline[0] if last_full_kline_ts != startTime - 60_000: log.error(f"UNEXPECTED RESULT: Last full kline timestamp {last_full_kline_ts} is not equal to previous full interval start {startTime - 60_000}. Maybe some results are missing and there are gaps.") # Return all received klines with the symbol as a key return {symbol: klines_full} # # Server and account info # async def data_provider_health_check(): """ Request information about the data provider server state. """ symbol = App.config["symbol"] # Get server state (ping) and trade status (e.g., trade can be suspended on some symbol) system_status = App.client.get_system_status() #{ # "status": 0, # 0: normal,1:system maintenance # "msg": "normal" # normal or System maintenance. #} if not system_status or system_status.get("status") != 0: App.server_status = 1 return 1 App.server_status = 0 # Ping the server # Check time synchronization #server_time = App.client.get_server_time() #time_diff = int(time.time() * 1000) - server_time['serverTime'] # TODO: Log large time differences (or better trigger time synchronization procedure) return 0