2024-12-15 15:06:03 +01:00
import click
2022-03-20 10:09:33 +01:00
from apscheduler . schedulers . asyncio import AsyncIOScheduler
2024-12-15 15:06:03 +01:00
from binance import Client
2024-06-22 09:58:08 +02:00
2025-04-17 14:21:17 +01:00
from common . types import Venue
2025-02-15 13:32:24 +01:00
from common . generators import output_feature_set
2026-01-09 17:47:08 +01:00
from common . analyzer import Analyzer
2025-02-15 13:32:24 +01:00
2025-04-17 14:21:17 +01:00
from inputs import get_collector_functions
2025-02-15 13:32:24 +01:00
2025-02-15 12:01:25 +01:00
from outputs . notifier_trades import *
from outputs . notifier_scores import *
from outputs . notifier_diagram import *
2025-04-17 14:21:17 +01:00
from outputs import get_trader_functions
2022-03-20 10:09:33 +01:00
import logging
2025-04-17 14:21:17 +01:00
2022-03-20 10:09:33 +01:00
# Module-wide logger for the server process.
log = logging.getLogger('server')

# Log everything (DEBUG and above) to a file next to the process.
logging.basicConfig(
    filename="server.log",
    level=logging.DEBUG,
    # More detailed alternative format, kept for reference:
    #format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
    format="%(asctime)s %(levelname)s %(message)s",
    #datefmt='%Y-%m-%d %H:%M:%S',
)
2022-03-20 10:09:33 +01:00
2025-04-17 14:21:17 +01:00
# Get the collector functions based on the collector type
2022-03-20 10:09:33 +01:00
#
# Main procedure
#
async def main_task():
    """Scheduled main task: collect new data, analyze it, and send the outputs.

    Executed regularly according to the schedule (once per data-frequency
    interval). On any error it logs and returns early so the next scheduled
    run can retry; a non-zero collector result code is propagated to the
    caller.
    """
    #
    # 1. Execute input adapters to receive new data from data source(s)
    #
    try:
        # Retrieve raw data, merge, convert to data frame and append
        res = await main_collector_task()
    except Exception as e:
        log.error(f"Error in main_collector_task function: {e}")
        return
    if res:
        # Non-zero result code: the collector failed, skip this cycle
        log.error(f"Error in main_collector_task function: {res}")
        return res

    # TODO: Validation
    #last_kline_ts = App.analyzer.get_last_kline_ts(symbol)
    #if last_kline_ts + 60_000 != startTime:
    #    log.error(f"Problem during analysis. Last kline end ts {last_kline_ts + 60_000} not equal to start of current interval {startTime}.")

    #
    # 2. Apply transformations and generate derived columns for the appended data
    #
    try:
        # Run the (potentially long) analysis in a thread pool so the event
        # loop stays responsive. The return value was never used, so it is
        # not bound to a variable (fix: removed unused `analyze_task` local).
        await App.loop.run_in_executor(None, App.analyzer.analyze)
    except Exception as e:
        log.error(f"Error in analyze function: {e}")
        return

    #
    # 3. Execute output adapters which send the results of analysis to consumers
    #
    output_sets = App.config.get("output_sets", [])
    # Loop variable renamed from `os` to avoid shadowing the stdlib module name.
    for out_set in output_sets:
        try:
            await output_feature_set(App.analyzer.df, out_set, App.config, App.model_store)
        except Exception as e:
            log.error(f"Error in output function: {e}")
            return

    return
2025-10-28 12:09:10 +01:00
async def main_collector_task():
    """Retrieve raw data from venue-specific data sources and append it to the main data frame.

    Returns:
        0 on success, 1 on any (recoverable) failure; the caller simply
        retries on the next scheduled run.
    """
    venue = Venue(App.config.get("venue"))
    fetch_klines_fn, health_check_fn = get_collector_functions(venue)

    symbol = App.config["symbol"]
    freq = App.config["freq"]

    start_ts, end_ts = pandas_get_interval(freq)
    now_ts = now_timestamp()
    log.info(f"===> Start collector task. Timestamp {now_ts}. Interval [{start_ts},{end_ts}].")

    #
    # 1. Check server state (if necessary)
    #
    if data_provider_problems_exist():
        # Give the provider one chance to recover before giving up this cycle
        await health_check_fn()
        if data_provider_problems_exist():
            log.error("Problems with the data provider server found. No signaling, no trade. Will try next time.")
            return 1

    #
    # 2. Get how much data is missing and request it
    #
    # Ask analyzer what is the timestamp of its last available row
    last_kline_dt = App.analyzer.get_last_kline_dt()
    # Request data starting from this time (with certain overlap)
    dfs = await fetch_klines_fn(App.config, last_kline_dt)
    if dfs is None:
        log.error("Problem getting data from the server. Will try next time.")
        return 1

    #
    # 3. Append data to the analyzer for further processing (maybe also creating a common index and merging)
    #
    try:
        App.analyzer.append_data(dfs)
    except Exception as e:
        log.error(f"Error appending data to the analyzer. Exception: {e}")
        return 1

    log.info("<=== End collector task.")
    return 0
2022-03-20 10:09:33 +01:00
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def start_server(config_file):
    """Server entry point.

    Loads the configuration, initializes the venue-specific client, performs
    the initial (cold-start) data collection and analysis, then registers the
    periodic main task with the scheduler and runs the event loop until
    interrupted.
    """
    load_config(config_file)

    App.config["train"] = False  # Server does not train - it only predicts therefore explicitly disable train mode

    symbol = App.config["symbol"]
    freq = App.config["freq"]

    venue = App.config.get("venue")
    try:
        if venue is not None:
            venue = Venue(venue)
    except ValueError as e:
        log.error(f"Invalid venue specified in config: {venue}. Error: {e}. Currently these values are supported: {[e.value for e in Venue]}")
        return

    fetch_klines_fn, health_check_fn = get_collector_functions(venue)
    trader_funcs = get_trader_functions(venue)

    log.info(f"Initializing server. Venue: {venue.value}. Trade pair: {symbol}. Frequency: {freq}")

    #getcontext().prec = 8

    #
    # Validation
    #

    #
    # Connect to the server and update/initialize the system state
    #
    if venue == Venue.BINANCE:
        # Prepare binance-specific parameters
        client_params = {}
        # Use .get() so a missing (optional) key does not raise KeyError
        # (fix: previously App.config["append_overlap_records"] crashed when absent).
        if App.config.get("append_overlap_records"):
            client_params["append_overlap_records"] = App.config["append_overlap_records"]
        # Prepare binance-specific client arguments; explicit config entries
        # can be extended/overridden by a free-form "client_args" dict.
        client_args = dict(
            api_key=App.config.get("api_key"),
            api_secret=App.config.get("api_secret")
        )
        client_args = client_args | App.config.get("client_args", {})
        # Initialize client (imported lazily so the binance SDK is only needed for this venue)
        from inputs.collector_binance import init_client
        init_client(client_params, client_args)

    if venue == Venue.MT5:
        # Prepare mt5-specific parameters
        client_params = {}
        # Prepare mt5-specific client arguments
        client_args = dict(
            mt5_account_id=int(App.config.get("mt5_account_id")),
            mt5_password=str(App.config.get("mt5_password")),
            mt5_server=str(App.config.get("mt5_server"))
        )
        client_args = client_args | App.config.get("client_args", {})
        # Initialize client (imported lazily so the MT5 SDK is only needed for this venue)
        from inputs.collector_mt5 import init_client
        init_client(client_params, client_args)

    App.model_store = ModelStore(App.config)
    App.model_store.load_models()
    App.analyzer = Analyzer(App.config, App.model_store)

    # Load latest transaction and (simulated) trade state
    App.transaction = load_last_transaction()

    #App.loop = asyncio.get_event_loop()  # In Python 3.12: DeprecationWarning: There is no current event loop
    App.loop = asyncio.new_event_loop()

    # Cold start: load initial data, do complete analysis
    try:
        App.loop.run_until_complete(main_collector_task())

        # The very first call (cold start) may take some time because of big initial size and hence we make the second call to get the (possible) newest klines
        App.loop.run_until_complete(main_collector_task())

        # Analyze all received data (not only last few rows) so that we have full history
        App.analyzer.analyze()
    except Exception as e:
        log.error(f"Problems during initial data collection. {e}")

    if data_provider_problems_exist():
        log.error("Problems during initial data collection.")
        return

    log.info("Finished initial data collection.")

    # TODO: Only for binance output and if it has been defined
    # Initialize trade status (account, balances, orders etc.) in case we are going to really execute orders
    if App.config.get("trade_model", {}).get("trader_binance"):
        try:
            App.loop.run_until_complete(trader_funcs['update_trade_status']())
        except Exception as e:
            log.error(f"Problems trade status sync. {e}")

        if data_provider_problems_exist():
            log.error("Problems trade status sync.")
            return

        log.info("Finished trade status sync (account, balances etc.)")
        log.info(f"Balance: {App.config['base_asset']} = {str(App.account_info.base_quantity)}")
        log.info(f"Balance: {App.config['quote_asset']} = {str(App.account_info.quote_quantity)}")

    #
    # Register scheduler
    #
    App.sched = AsyncIOScheduler()
    # logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
    logging.getLogger('apscheduler').setLevel(logging.WARNING)

    trigger = freq_to_CronTrigger(freq)

    App.sched.add_job(
        main_task,
        trigger=trigger,
        id='main_task'
    )

    App.sched._eventloop = App.loop
    App.sched.start()  # Start scheduler (essentially, start the thread)

    log.info("Scheduler started.")

    #
    # Start event loop and scheduler
    #
    try:
        App.loop.run_forever()  # Blocking. Run until stop() is called
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt.")
    finally:
        log.info("Shutting down...")
        # Graceful shutdown
        if App.sched and App.sched.running:
            App.sched.shutdown()
            log.info("Scheduler shutdown.")

        # Stop the loop if it's still running (e.g., if shutdown initiated by signal other than KeyboardInterrupt)
        if App.loop.is_running():
            App.loop.stop()
            log.info("Event loop stop requested.")

        # Close the loop
        # Allow pending tasks to complete before closing (optional but good practice)
        # You might need to run loop.run_until_complete(asyncio.sleep(0.1)) or similar
        # if loop.stop() doesn't immediately halt everything.
        App.loop.close()
        log.info("Event loop closed.")

        # Fix: compare against the Venue class, not the instance.
        # `venue.BINANCE` (member access via an instance) is removed in
        # Python 3.12 and crashes if venue is None.
        if venue == Venue.BINANCE:
            from inputs.collector_binance import close_client
            close_client()
        if venue == Venue.MT5:
            from inputs.collector_mt5 import close_client
            close_client()
        log.info("Connection closed.")

    return 0
# Script entry point: run the click command when executed directly.
if __name__ == "__main__":
    start_server()