intelligent-trading-bot/service/server.py

193 lines
5.7 KiB
Python
Raw Permalink Normal View History

2022-03-20 10:09:33 +01:00
from datetime import datetime
from decimal import *
import asyncio
2024-12-15 15:06:03 +01:00
import click
2022-03-20 10:09:33 +01:00
from apscheduler.schedulers.asyncio import AsyncIOScheduler
2024-12-15 15:06:03 +01:00
from binance import Client
2024-06-22 09:58:08 +02:00
2022-03-20 10:09:33 +01:00
from service.App import *
from common.utils import *
from common.generators import output_feature_set
2022-03-20 10:09:33 +01:00
from service.analyzer import *
from inputs.collector_binance import main_collector_task, data_provider_health_check, sync_data_collector_task
from outputs.notifier_trades import *
from outputs.notifier_scores import *
from outputs.notifier_diagram import *
from outputs.trader_binance import trader_binance, update_trade_status
2022-03-20 10:09:33 +01:00
import logging
# Configure the root logger: records of level DEBUG and above go to "server.log"
logging.basicConfig(
    level=logging.DEBUG,
    filename="server.log",
    format="%(asctime)s %(levelname)s %(message)s",
    # Alternative, more detailed layout (currently disabled):
    #format = "%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
    #datefmt = '%Y-%m-%d %H:%M:%S',
)

# Named logger used by the functions of this module
log = logging.getLogger("server")
2022-03-20 10:09:33 +01:00
#
# Main procedure
#
async def main_task():
    """Run one scheduled cycle: collect new data, analyze it, send the outputs.

    This task is executed regularly according to the schedule. The stages run
    strictly in order and the cycle stops at the first stage that fails:

    1. ``main_collector_task()`` is awaited to receive new data.
    2. ``App.analyzer.analyze`` is run via ``App.loop.run_in_executor`` using
       the loop's default executor.
    3. ``output_feature_set`` is awaited once for every entry of the
       ``output_sets`` list of ``App.config``.

    Exceptions raised by any stage are logged (with traceback) and not
    propagated.

    Returns:
        The truthy value returned by ``main_collector_task()`` when it reports
        a problem; ``None`` in every other case (success or a logged failure).
    """
    #
    # 1. Execute input adapters to receive new data from data source(s)
    #
    try:
        res = await main_collector_task()
    except Exception as e:
        # log.exception keeps the traceback, which the plain message would lose
        log.exception(f"Error in main_collector_task function: {e}")
        return

    if res:
        # A truthy result is treated as an error indicator and passed back to the caller
        log.error(f"Error in main_collector_task function: {res}")
        return res

    # TODO: Validation
    #last_kline_ts = App.analyzer.get_last_kline_ts(symbol)
    #if last_kline_ts + 60_000 != startTime:
    #    log.error(f"Problem during analysis. Last kline end ts {last_kline_ts + 60_000} not equal to start of current interval {startTime}.")

    #
    # 2. Apply transformations (merge, features, prediction scores, signals) and generate new data columns
    #
    try:
        # The result of analyze() is not used here, so it is not stored
        await App.loop.run_in_executor(None, App.analyzer.analyze)
    except Exception as e:
        log.exception(f"Error in analyze function: {e}")
        return

    #
    # 3. Execute output adapters which send the results of analysis to consumers
    #
    # Execute all output set entries; the first failing entry aborts the remaining ones
    for output_set in App.config.get("output_sets", []):
        try:
            await output_feature_set(App.df, output_set, App.config)
        except Exception as e:
            log.exception(f"Error in output function: {e}")
            return
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def start_server(config_file):
    """Initialize the server state, schedule ``main_task`` and run the event loop.

    Start-up sequence:

    1. Load the configuration and create ``App.client``, ``App.analyzer`` and
       ``App.loop``.
    2. Run ``data_provider_health_check()`` once.
    3. Cold start: run ``sync_data_collector_task()`` twice and analyze all
       received data.
    4. If ``trade_model.trader_binance`` is set in the configuration, run
       ``update_trade_status()`` and log the balances.
    5. Register ``main_task`` in an ``AsyncIOScheduler`` with a trigger derived
       from the configured ``freq`` and run the event loop until interrupted.

    After each of the steps 2-4 the start-up is aborted if
    ``data_provider_problems_exist()`` returns a truthy value. Exceptions
    raised by these steps are logged and do not abort the start-up by themselves.

    Args:
        config_file: Configuration file name passed to ``load_config``.

    Returns:
        ``0`` after the event loop has finished and resources were released,
        ``None`` if the start-up was aborted.
    """
    load_config(config_file)
    symbol = App.config["symbol"]
    freq = App.config["freq"]

    log.info(f"Initializing server. Trade pair: {symbol}. ")

    #getcontext().prec = 8

    #
    # Validation
    #

    #
    # Connect to the server and update/initialize the system state
    #
    App.client = Client(api_key=App.config["api_key"], api_secret=App.config["api_secret"])
    App.analyzer = Analyzer(App.config)

    try:
        App.loop = asyncio.get_event_loop()
    except RuntimeError:
        # Python 3.14+ no longer creates a loop implicitly: create and register one explicitly
        App.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(App.loop)

    # Do one time server check and state update
    try:
        App.loop.run_until_complete(data_provider_health_check())
    except Exception as e:
        log.exception(f"Problems during health check (connectivity, server etc.) {e}")

    if data_provider_problems_exist():
        log.error("Problems during health check (connectivity, server etc.)")
        return

    log.info("Finished health check (connection, server status etc.)")

    # Cold start: load initial data, do complete analysis
    try:
        App.loop.run_until_complete(sync_data_collector_task())
        # First call may take some time because of big initial size and hence we make the second call to get the (possible) newest klines
        App.loop.run_until_complete(sync_data_collector_task())
        # Analyze all received data (and not only last few rows) so that we have full history
        App.analyzer.analyze(ignore_last_rows=True)
    except Exception as e:
        log.exception(f"Problems during initial data collection. {e}")

    if data_provider_problems_exist():
        log.error("Problems during initial data collection.")
        return

    log.info("Finished initial data collection.")

    # TODO: Only for binance output and if it has been defined
    # Initialize trade status (account, balances, orders etc.) in case we are going to really execute orders
    if App.config.get("trade_model", {}).get("trader_binance"):
        try:
            App.loop.run_until_complete(update_trade_status())
        except Exception as e:
            log.exception(f"Problems trade status sync. {e}")

        if data_provider_problems_exist():
            log.error("Problems trade status sync.")
            return

        log.info("Finished trade status sync (account, balances etc.)")
        log.info(f"Balance: {App.config['base_asset']} = {str(App.base_quantity)}")
        log.info(f"Balance: {App.config['quote_asset']} = {str(App.quote_quantity)}")

    #
    # Register scheduler
    #
    App.sched = AsyncIOScheduler()
    # logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
    logging.getLogger('apscheduler').setLevel(logging.WARNING)

    trigger = freq_to_CronTrigger(freq)

    App.sched.add_job(
        main_task,
        trigger=trigger,
        id='main_task'
    )

    App.sched.start()  # Start scheduler (essentially, start the thread)
    log.info("Scheduler started.")

    #
    # Start event loop
    #
    try:
        App.loop.run_forever()  # Blocking. Run until stop() is called
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt.")
    finally:
        # Shut the scheduler down BEFORE closing the loop it is bound to:
        # a closed event loop must not be used any more
        App.sched.shutdown()
        log.info("Scheduler shutdown.")
        App.loop.close()
        log.info("Event loop closed.")

    return 0
# Script entry point: start_server is a click command, so it reads its
# options (e.g. --config_file / -c) from the command line
if __name__ == "__main__":
    start_server()