intelligent-trading-bot/service/analyzer.py

362 lines
14 KiB
Python
Raw Permalink Normal View History

2020-02-23 20:45:50 +01:00
from pathlib import Path
from typing import Union
import json
import pickle
2020-12-26 17:56:54 +01:00
from datetime import datetime, date, timedelta
2020-02-23 20:45:50 +01:00
import queue
import numpy as np
import pandas as pd
2021-09-04 18:19:04 +02:00
from service.App import *
2021-09-04 21:28:45 +02:00
from common.utils import *
from common.classifiers import *
2020-12-13 19:30:06 +01:00
from common.feature_generation import *
from common.signal_generation import *
2020-02-23 20:45:50 +01:00
import logging
2021-09-04 21:28:45 +02:00
log = logging.getLogger('analyzer')
2020-02-23 20:45:50 +01:00
2021-08-29 19:04:58 +02:00
class Analyzer:
2020-02-23 20:45:50 +01:00
"""
In-memory database which represents the current state of the (trading) environment including its history.
Properties of klines:
- "timestamp" is a left border of the interval like "2017-08-17 04:00:00"
- "close_time" is a right border of the interval in ms (last millisecond) like "1502942459999" equivalent to "2017-08-17 04:00::59.999"
"""
def __init__(self, config):
"""
Create a new operation object using its definition.
:param config: Initialization parameters defining what is in the database including its persistent parameters and schema
"""
self.config = config
#
# Data state
#
# Klines are stored as a dict of lists. Key is a symbol and the list is a list of latest kline records
# One kline record is a list of values (not dict) as returned by API: open time, open, high, low, close, volume etc.
self.klines = {}
2020-02-23 20:45:50 +01:00
self.queue = queue.Queue()
#
# Load models
#
model_path = App.config["model_folder"]
model_path = Path(model_path)
if not model_path.is_absolute():
model_path = PACKAGE_ROOT / model_path
model_path = model_path.resolve()
labels = App.config["labels"]
feature_sets = ["kline"]
algorithms = ["gb", "nn", "lc"]
self.models = load_models(model_path, labels, feature_sets, algorithms)
2020-12-26 17:56:54 +01:00
2020-02-23 20:45:50 +01:00
#
# Start a thread for storing data
#
#
# Data state operations
#
def get_klines_count(self, symbol):
return len(self.klines.get(symbol, []))
def get_last_kline(self, symbol):
if self.get_klines_count(symbol) > 0:
return self.klines.get(symbol)[-1]
2020-02-23 20:45:50 +01:00
else:
return None
def get_last_kline_ts(self, symbol):
2021-03-07 10:44:32 +01:00
"""Open time of the last kline. It is simultaneously kline id. Add 1m if the end is needed."""
last_kline = self.get_last_kline(symbol=symbol)
if not last_kline:
return 0
last_kline_ts = last_kline[0]
return last_kline_ts
2020-02-23 20:45:50 +01:00
def get_missing_klines_count(self, symbol):
now_ts = now_timestamp()
last_kline_ts = self.get_last_kline_ts(symbol)
if not last_kline_ts:
return App.config["signaler"]["analysis"]["features_horizon"]
end_of_last_kline = last_kline_ts + 60_000 # Plus 1m
minutes = (now_ts - end_of_last_kline) / 60_000
minutes += 2
return int(minutes)
2020-02-23 20:45:50 +01:00
def store_klines(self, data: dict):
"""
Store latest klines for the specified symbols.
2021-03-07 10:44:32 +01:00
Existing klines for the symbol and timestamp will be overwritten.
2020-02-23 20:45:50 +01:00
:param data: Dict of lists with symbol as a key, and list of klines for this symbol as a value.
Example: { 'BTCUSDT': [ [], [], [] ] }
:type dict:
"""
now_ts = now_timestamp()
2020-02-23 20:45:50 +01:00
for symbol, klines in data.items():
# If symbol does not exist then create
klines_data = self.klines.get(symbol)
if klines_data is None:
self.klines[symbol] = []
klines_data = self.klines.get(symbol)
ts = klines[0][0] # Very first timestamp of the new data
# Find kline with this or younger timestamp in the database
# same_kline = next((x for x in klines_data if x[0] == ts), None)
existing_indexes = [i for i, x in enumerate(klines_data) if x[0] >= ts]
#print(f"===>>> Existing tss: {[x[0] for x in klines_data]}")
#print(f"===>>> New tss: {[x[0] for x in klines]}")
#print(f"===>>> {symbol} Overlap {len(existing_indexes)}. Existing Indexes: {existing_indexes}")
if existing_indexes: # If there is overlap with new klines
start = min(existing_indexes)
num_deleted = len(klines_data) - start
del klines_data[start:] # Delete starting from the first kline in new data (which will be added below)
if len(klines) < num_deleted: # It is expected that we add same or more klines than deleted
log.error("More klines is deleted by new klines added, than we actually add. Something woring with timestamps and storage logic.")
# Append new klines
klines_data.extend(klines)
# Remove too old klines
kline_window = App.config["signaler"]["analysis"]["features_horizon"]
to_delete = len(klines_data) - kline_window
if to_delete > 0:
del klines_data[:to_delete]
# Check validity. It has to be an ordered time series with certain frequency
2020-02-23 20:45:50 +01:00
for i, kline in enumerate(self.klines.get(symbol)):
ts = kline[0]
if i > 0:
if ts - prev_ts != 60_000:
log.error("Wrong sequence of klines. They are expected to be a regular time series with 1m frequency.")
prev_ts = kline[0]
# Debug message about the last received kline end and current ts (which must be less than 1m - rather small delay)
log.debug(f"Stored klines. Total {len(klines_data)} in db. Last kline end: {self.get_last_kline_ts(symbol)+60_000}. Current time: {now_ts}")
2020-02-23 20:45:50 +01:00
def store_depth(self, depths: list, freq):
"""
Persistently store order books from the input list. Each entry is one response from order book request for one symbol.
Currently the order books are directly stored in a file (for this symbol) and not in this object.
:param depths: List of dicts where each dict is an order book with such fields as 'asks', 'bids' and 'symbol' (symbol is added after loading).
:type list:
"""
# File name like TRADE_HOME/COLLECT/DEPTH/depth-BTCUSDT-5s.txt
TRADE_DATA = "." # TODO: We need to read it from the environment. It could be data dir or docker volume.
# BASE_DIR = Path(__file__).resolve().parent.parent
# BASE_DIR = Path.cwd()
for depth in depths:
# TODO: The result might be an exception or some other object denoting bad return (timeout, cancelled etc.)
symbol = depth["symbol"]
2020-12-13 14:22:13 +01:00
path = Path(TRADE_DATA).joinpath(App.config["collector"]["folder"])
path = path.joinpath(App.config["collector"]["depth"]["folder"])
2020-02-23 20:45:50 +01:00
path.mkdir(parents=True, exist_ok=True) # Ensure that dir exists
file_name = f"depth-{symbol}-{freq}"
file = Path(path, file_name).with_suffix(".txt")
# Append to the file (create if it does not exist)
json_line = json.dumps(depth)
with open(file, 'a+') as f:
f.write(json_line + "\n")
def store_queue(self):
"""
Persistently store the queue data to one or more files corresponding to the stream (event) type, symbol (and frequency).
:return:
"""
#
# Get all the data from the queue
#
events = {}
item = None
while True:
try:
item = self.queue.get_nowait()
except queue.Empty as ee:
break
except:
break
if item is None:
break
c = item.get("e") # Channel
if not events.get(c): # Insert if does not exit
events[c] = {}
symbols = events[c]
s = item.get("s") # Symbol
if not symbols.get(s): # Insert if does not exit
symbols[s] = []
data = symbols[s]
data.append(item)
self.queue.task_done() # TODO: Do we really need this?
# File name like TRADE_HOME/COLLECT/DEPTH/depth-BTCUSDT-5s.txt
TRADE_DATA = "." # TODO: We need to read it from the environment. It could be data dir or docker volume.
# BASE_DIR = Path(__file__).resolve().parent.parent
# BASE_DIR = Path.cwd()
2020-12-13 14:22:13 +01:00
path = Path(TRADE_DATA).joinpath(App.config["collector"]["folder"])
path = path.joinpath(App.config["collector"]["stream"]["folder"])
2020-02-23 20:45:50 +01:00
path.mkdir(parents=True, exist_ok=True) # Ensure that dir exists
now = datetime.utcnow()
2020-02-25 21:26:05 +01:00
#rotate_suffix = f"{now:%Y}{now:%m}{now:%d}" # Daily files
rotate_suffix = f"{now:%Y}{now:%m}" # Monthly files
2020-02-23 20:45:50 +01:00
#
# Get all the data from the queue and store in file
#
for c, symbols in events.items():
for s, data in symbols.items():
file_name = f"{c}-{s}-{rotate_suffix}"
file = Path(path, file_name).with_suffix(".txt")
# Append to the file (create if it does not exist)
data = [json.dumps(event) for event in data]
data_str = "\n".join(data)
with open(file, 'a+') as f:
f.write(data_str + "\n")
#
# Analysis (features, predictions, signals etc.)
#
2021-08-29 19:04:58 +02:00
def analyze(self):
2020-02-23 20:45:50 +01:00
"""
1. Convert klines to df
2. Derive (compute) features (use same function as for model training)
3. Derive (predict) labels by applying models trained for each label
4. Generate buy/sell signals by applying rule models trained for best overall trade performance
2020-02-23 20:45:50 +01:00
"""
2021-08-29 19:04:58 +02:00
symbol = App.config["symbol"]
2020-02-23 20:45:50 +01:00
klines = self.klines.get(symbol)
last_kline_ts = self.get_last_kline_ts(symbol)
2020-02-23 20:45:50 +01:00
log.info(f"Analyze {symbol}. {len(klines)} klines in the database. Last kline timestamp: {last_kline_ts}")
2020-02-23 20:45:50 +01:00
#
# 1.
# Produce a data frame with înput data
2020-02-23 20:45:50 +01:00
#
try:
df = klines_to_df(klines)
except Exception as e:
print(f"Error in klines_to_df: {e}")
return
2020-02-23 20:45:50 +01:00
#
# 2.
# Generate all necessary derived features (NaNs are possible due to short history)
2020-02-23 20:45:50 +01:00
#
try:
features_out = generate_features(
df, use_differences=False,
base_window=App.config["base_window_kline"], windows=App.config["windows_kline"],
area_windows=App.config["area_windows_kline"]
)
except Exception as e:
print(f"Error in generate_features: {e}")
return
2020-02-23 20:45:50 +01:00
# Now we have as many additional columns as we have defined derived features
#
# 3.
2020-12-14 13:10:21 +01:00
# Generate scores using existing models (trained in advance using latest history by a separate script)
2020-02-23 20:45:50 +01:00
#
# kline feature set
features = App.config["features_kline"]
predict_df = df[features]
# Do not drop nans because they will be processed by predictor
# Do prediction by applying models to the data
score_df = pd.DataFrame(index=predict_df.index)
try:
for score_column_name, model_pair in self.models.items():
if score_column_name.endswith("_gb"):
df_y_hat = predict_gb(model_pair, predict_df)
elif score_column_name.endswith("_nn"):
df_y_hat = predict_nn(model_pair, predict_df)
elif score_column_name.endswith("_lc"):
df_y_hat = predict_lc(model_pair, predict_df)
else:
raise ValueError(f"Unknown column name algorithm suffix {score_column_name[-3:]}. Currently only '_gb', '_nn', '_lc' are supported.")
score_df[score_column_name] = df_y_hat
except Exception as e:
print(f"Error in predict: {e}")
return
# Now we have all predictions (score columns) needed to make a buy/sell decision - many predictions for each true label column
# We will need only the latest row for signal generation
2020-02-23 20:45:50 +01:00
#
# 4.
2020-12-14 13:10:21 +01:00
# Generate buy/sell signals using rules and thresholds
2020-02-23 20:45:50 +01:00
#
2020-12-26 17:56:54 +01:00
all_scores = self.models.keys()
high_scores = [col for col in all_scores if "high_" in col] # 3 algos x 3 thresholds x 1 k = 9
low_scores = [col for col in all_scores if "low_" in col] # 3 algos x 3 thresholds x 1 k = 9
# Compute final score column
score_df["high"] = score_df[high_scores].mean(axis=1)
score_df["low"] = score_df[low_scores].mean(axis=1)
high_and_low = score_df["high"] + score_df["low"]
score_df["score"] = ((score_df["high"] / high_and_low) * 2) - 1.0 # in [-1, +1]
score = score_df.iloc[-1].score
row = df.iloc[-1]
close_price = row.close
high_price = row.high
low_price = row.high
timestamp = row.name
2020-12-26 17:56:54 +01:00
close_time = row.name+timedelta(minutes=1) # row.close_time
# Thresholds
2021-01-01 13:38:07 +01:00
model = App.config["signaler"]["model"]
2020-12-26 17:56:54 +01:00
signal = dict(side=None, score=score, close_price=close_price, close_time=close_time)
if not score:
signal = dict()
elif score > model.get("buy_threshold"):
signal["side"] = "BUY"
elif score < model.get("sell_threshold"):
signal["side"] = "SELL"
else:
signal["side"] = ""
App.signal = signal
log.info(f"Analyze finished. Score: {score:+.2f}. Signal: {signal['side']}. Price: {int(close_price):,}")
2020-02-23 20:45:50 +01:00
if __name__ == "__main__":
pass