2022-03-20 10:09:33 +01:00
from datetime import datetime
from decimal import *
import click
import asyncio
from apscheduler . schedulers . asyncio import AsyncIOScheduler
from service . App import *
from common . utils import *
from service . collector import *
from service . analyzer import *
from service . notifier import *
from service . trader import *
import logging
log = logging . getLogger ( ' server ' )
#
# Main procedure
#
async def main_task ( ) :
""" This task will be executed regularly according to the schedule """
res = await main_collector_task ( )
if res :
return res
# TODO: Validation
#last_kline_ts = App.analyzer.get_last_kline_ts(symbol)
#if last_kline_ts + 60_000 != startTime:
# log.error(f"Problem during analysis. Last kline end ts {last_kline_ts + 60_000} not equal to start of current interval {startTime}.")
# Generate signals (derived features, predictions)
try :
analyze_task = await App . loop . run_in_executor ( None , App . analyzer . analyze )
except Exception as e :
print ( f " Error while analyzing data: { e } " )
return
# Signal is stored in App.signal
# Now we have a list of signals and can make trade decisions using trading logic and trade
2023-09-24 15:20:24 +02:00
if " trade " in App . config . get ( " actions " , { } ) :
2022-03-20 10:09:33 +01:00
trade_task = App . loop . create_task ( main_trader_task ( ) )
2023-09-30 12:58:45 +02:00
notification_model = App . config . get ( " signal_model " , { } ) . get ( " notification " , { } )
if notification_model . get ( " notify_signal " ) :
await send_signal_message ( )
if notification_model . get ( " notify_transaction " ) :
transaction = await simulate_trade ( )
if transaction :
await send_transaction_message ( transaction )
if notification_model . get ( " notify_diagram " ) :
close_time = App . signal . get ( ' close_time ' )
if close_time . hour == 0 and close_time . minute == 0 : # Every day
await send_diagram ( freq = ' H ' , nrows = 2 * 7 * 24 ) # 2 previous weeks with hourly aggregation
elif close_time . minute == 0 : # Every hour
pass
2023-09-26 21:21:45 +02:00
2022-03-20 10:09:33 +01:00
return
@click.command ( )
@click.option ( ' --config_file ' , ' -c ' , type = click . Path ( ) , default = ' ' , help = ' Configuration file name ' )
def start_server ( config_file ) :
load_config ( config_file )
symbol = App . config [ " symbol " ]
print ( f " Initializing server. Trade pair: { symbol } . " )
#getcontext().prec = 8
#
# Validation
#
#
# Connect to the server and update/initialize the system state
#
App . client = Client ( api_key = App . config [ " api_key " ] , api_secret = App . config [ " api_secret " ] )
App . analyzer = Analyzer ( App . config )
App . loop = asyncio . get_event_loop ( )
# Do one time server check and state update
try :
App . loop . run_until_complete ( data_provider_health_check ( ) )
except Exception as e :
print ( f " Problems during health check (connectivity, server etc.) { e } " )
if data_provider_problems_exist ( ) :
print ( f " Problems during health check (connectivity, server etc.) " )
return
print ( f " Finished health check (connection, server status etc.) " )
# Do one time data load (cold start)
try :
App . loop . run_until_complete ( sync_data_collector_task ( ) )
2022-03-25 22:49:33 +01:00
# First call may take a while because of big batch and hence we make second call into to get the (possible) newest klines not received by the first call
App . loop . run_until_complete ( sync_data_collector_task ( ) )
2022-03-20 10:09:33 +01:00
except Exception as e :
print ( f " Problems during initial data collection. { e } " )
if data_provider_problems_exist ( ) :
print ( f " Problems during initial data collection. " )
return
print ( f " Finished initial data collection. " )
# Initialize trade status (account, balances, orders etc.)
if " trade " in App . config [ " actions " ] :
try :
App . loop . run_until_complete ( update_trade_status ( ) )
except Exception as e :
print ( f " Problems trade status sync. { e } " )
if data_provider_problems_exist ( ) :
print ( f " Problems trade status sync. " )
return
print ( f " Finished trade status sync (account, balances etc.) " )
print ( f " Balance: { App . config [ ' base_asset ' ] } = { str ( App . base_quantity ) } " )
print ( f " Balance: { App . config [ ' quote_asset ' ] } = { str ( App . quote_quantity ) } " )
#
# Register scheduler
#
App . sched = AsyncIOScheduler ( )
# logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
logging . getLogger ( ' apscheduler ' ) . setLevel ( logging . WARNING )
App . sched . add_job (
main_task ,
trigger = ' cron ' ,
# second='*/30',
minute = ' * ' ,
id = ' main_task '
)
App . sched . start ( ) # Start scheduler (essentially, start the thread)
print ( f " Scheduler started. " )
#
# Start event loop
#
try :
App . loop . run_forever ( ) # Blocking. Run until stop() is called
except KeyboardInterrupt :
print ( f " KeyboardInterrupt. " )
finally :
App . loop . close ( )
print ( f " Event loop closed. " )
App . sched . shutdown ( )
print ( f " Scheduler shutdown. " )
return 0
if __name__ == " __main__ " :
start_server ( )