# intelligent-trading-bot/common/analyzer.py
import logging
from datetime import datetime, date, timedelta

import numpy as np
import pandas as pd

from common.utils import *
from common.model_store import *
from common.generators import generate_feature_set, predict_feature_set

log = logging.getLogger('analyzer')

class Analyzer:
    """
    In-memory database which represents the current state of the data context (all trading data) including its history.
    """
    def __init__(self, config: dict, model_store: ModelStore):
        """
        Create a new operation object using its definition.

        :param config: Initialization parameters defining what is in the database, including its persistent parameters and schema
        :param model_store: Model store object which provides access to (trainable) algorithm parameters, as opposed to the fixed by-value parameters in the configuration object
        """
        self.config = config
        self.model_store = model_store

        #
        # Data shape and parameters
        #

        # Minimum length of the data frame, determined by such criteria as the history necessary to compute new values and how much we want to output
        self.min_window_length = self.config["predict_length"] + self.config["features_horizon"]
        # After growing larger than the maximum, the array will be truncated (back to the minimum required size) by removing older records
        self.max_window_length = self.min_window_length + 15

        # How many tail records store empty/wrong values and must be recomputed.
        # Initially it covers the whole data set (all records have to be recomputed): -1 (or a value that is too large) means that all records will be re-computed in batch mode (rather than stream mode).
        # After each analysis it is set to 0, which means that all features were evaluated and the data is up-to-date.
        # After appending new records it is again set to a non-zero value, indicating that the data state needs to be re-evaluated.
        self.dirty_records = -1

        self.is_train = config.get("train")
        if self.is_train:
            log.warning("Train mode is specified although the server is intended to run in predict mode")

        #
        # Data frame with all the data (source and derived) where rows are appended and their (derived) columns are computed
        #

        # All derived feature columns explicitly declared in the config
        train_features = self.config.get("train_features", [])
        train_features_dtypes = {k: 'float64' for k in train_features}  # Same data type for all features

        # All label columns explicitly declared in the config (used only in train mode)
        labels = self.config.get("labels", [])
        labels_dtypes = {k: 'float64' for k in labels}  # Same data type for all labels

        # Combine all raw columns, derived features and (in train mode) label columns
        time_column = self.config["time_column"]
        freq = self.config["freq"]
        all_columns_dtypes = {time_column: 'datetime64[ns, UTC]'} | train_features_dtypes
        if self.is_train:
            all_columns_dtypes = all_columns_dtypes | labels_dtypes

        self.df = pd.DataFrame(columns=all_columns_dtypes).astype(all_columns_dtypes)
        self.df = self.df.set_index(time_column, inplace=False, drop=False)  # Timestamp column in the index and also as a column for convenience
        self.df = self.df.asfreq(freq)
        # Now the data frame is initialized; regular updates will append rows to it, followed by analysis (computation of derived features)
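        # Schema illustration (assumed example values, not from the source): with
        # time_column="timestamp" and train_features=["close_1", "close_2"], the
        # initial empty frame has a tz-aware "timestamp" column (also the index)
        # plus float64 columns "close_1" and "close_2", at the configured frequency.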

        self.previous_df = None  # For validation
    #
    # Data state operations
    #

    def get_size(self):
        return len(self.df)

    def get_last_kline(self):
        if len(self.df) > 0:
            return self.df.iloc[-1]
        else:
            return None

    def get_last_kline_dt(self):
        """Open time of the last kline. It simultaneously serves as the kline id. Add 1 minute if the end time is needed."""
        if len(self.df) > 0:
            return self.df.index[-1]
        else:
            # Compute it from the maximum history self.min_window_length
            freq = self.config["freq"]
            last_kline_dt = get_start_dt_for_interval_count(freq, self.min_window_length)
            return last_kline_dt

    def get_missing_klines_count(self):
        """
        The number of complete discrete intervals between the last available kline and the current timestamp.
        The interval length is determined by the frequency parameter.
        """
        last_kline_dt = self.get_last_kline_dt()
        if not last_kline_dt:
            return self.min_window_length

        freq = self.config["freq"]
        intervals_count = get_interval_count_from_start_dt(freq, last_kline_dt)
        return intervals_count
    def append_data(self, dfs: dict):
        """
        Merge individual data frames by creating a common index and append them to the main data frame of this class.
        The values in the overlapping range (if any) will be overwritten.
        """
        #
        # Merge multiple dfs into one df by adding column prefixes and creating a common regular time index
        #

        # The merge function works with the data_source structure, so we fill it with data before calling
        data_sources = self.config.get("data_sources", [])
        if len(dfs) != len(data_sources):
            log.warning(f"The number of symbols retrieved {len(dfs)} is not equal to the number of data sources {len(data_sources)}")
        for ds in data_sources:
            ds_symbol = ds.get("folder")
            ds["df"] = dfs.get(ds_symbol)

        # Really merge and get one data frame (a regular index will be created and column prefixes added)
        time_column = self.config["time_column"]
        freq = self.config["freq"]
        merge_interpolate = self.config.get("merge_interpolate", False)
        df = merge_data_sources(data_sources, time_column, freq, merge_interpolate)

        # Store part of the previous state for validation etc. before it is overwritten
        self.previous_df = self.df.tail(10).copy()

        #
        # Append new records by overwriting the overlap area
        #
        initial_df_len = len(self.df)
        appended_df_len = len(df)
        self.df = append_df_drop_concat(self.df, df)
        result_df_len = len(self.df)
        overwritten_rows_len = (initial_df_len + appended_df_len) - result_df_len  # 0 if exact concatenation, positive if overlap, negative if gap (error)

        #
        # Compute and set the new dirty count.
        # It is how many records must be (re-)evaluated.
        # The current (old) number might be more than 0, e.g., if we append several times without evaluation.
        #
        if self.dirty_records < 0:
            pass  # No change: still all records have to be re-computed
        elif initial_df_len == 0:
            self.dirty_records = -1  # All records are new, therefore re-compute everything
        else:
            # For example, we already had 10 dirty records and 4 of them were deleted (overwritten by new records).
            # All appended rows have to be re-computed, plus the non-overwritten rows among the old dirty rows (normally 0)
            self.dirty_records = appended_df_len + max(0, self.dirty_records - overwritten_rows_len)

        if self.dirty_records >= result_df_len:
            self.dirty_records = -1
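
        # Worked example (illustrative numbers, not from the source): initial_df_len=100,
        # appended_df_len=5 and result_df_len=103 give
        # overwritten_rows_len = (100 + 5) - 103 = 2, i.e. a 2-row overlap.
        # If dirty_records was 10 before the append, it becomes
        # 5 + max(0, 10 - 2) = 13 tail rows to re-evaluate.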
    def analyze(self):
        """
        Compute derived columns for dirty records and add them to the data frame:

        1. Evaluate derived features
        2. Evaluate trained ML features
        3. Evaluate signal features
        """
        symbol = self.config["symbol"]

        if self.dirty_records == 0:
            log.warning("Analysis function called with 0 dirty records. Exit because there is nothing to do. This normally should not happen.")
            return
        # This parameter is passed to the generators and indicates the exact (small) number of last rows to re-evaluate in order to avoid a full evaluation for performance reasons
        last_rows = self.dirty_records if self.dirty_records > 0 else 0

        last_kline_dt = self.get_last_kline_dt()
        last_kline_ts_str = str(pd.to_datetime(last_kline_dt, unit='ms', utc=True))
        log.info(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")
        #
        # 1. Generate all derived features (NaNs are possible due to limited history)
        #
        feature_sets = self.config.get("feature_sets", [])
        feature_columns = []
        for fs in feature_sets:
            df, feats = generate_feature_set(self.df, fs, self.config, self.model_store, last_rows=last_rows)
            self.df = df
            feature_columns.extend(feats)
        #
        # 2. Apply ML models and generate intelligent indicators
        #
        train_features = self.config["train_features"]

        # Shorten the data frame by selecting the last rows for which predictions will be done
        tail_rows = notnull_tail_rows(self.df[train_features])  # How many last rows have all non-null values
        predict_size = tail_rows if not last_rows else min(tail_rows, last_rows)

        predict_features_df = self.df.tail(predict_size)
        predict_features_df = predict_features_df[train_features]

        # Validation
        if predict_features_df.isnull().any().any():
            null_columns = {k: v for k, v in predict_features_df.isnull().any().to_dict().items() if v}
            log.error(f"Null in predict_df found. Columns with Null: {null_columns}")
            return

        train_feature_sets = self.config.get("train_feature_sets", [])
        predict_labels_df = pd.DataFrame(index=predict_features_df.index)
        predict_label_columns = []
        for fs in train_feature_sets:
            fs_df, feats = predict_feature_set(predict_features_df, fs, self.config, self.model_store)
            predict_labels_df = pd.concat([predict_labels_df, fs_df], axis=1)
            predict_label_columns.extend(feats)

        # Attach all predicted label columns (only for the last rows) to the main data frame
        self.df = self.df.combine_first(predict_labels_df)  # Attach new columns
        self.df.update(predict_labels_df)  # Overwrite older values with newly computed values
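
        # Note on the two-step attach above (standard pandas semantics): combine_first
        # adds columns and fills null slots without overwriting existing non-null
        # values, while update then overwrites existing values in place with the
        # freshly predicted ones. Together they both extend and refresh the frame.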
        #
        # 3. Signals
        #
        signal_sets = self.config.get("signal_sets", [])
        signal_columns = []
        for fs in signal_sets:
            df, feats = generate_feature_set(self.df, fs, self.config, self.model_store, last_rows=last_rows)
            self.df = df  # TODO: Signal features should be computed in the same way as normal (pre-ML) features
            signal_columns.extend(feats)

        #
        # Log signal values of the last row, which stores the latest computed values
        #
        row = self.get_last_kline()
        scores = ", ".join([f"{x}={row[x]:+.3f}" if isinstance(row[x], float) else f"{x}={str(row[x])}" for x in signal_columns])
        log.info(f"Analyze finished. Close: {int(row['close']):,} Signals: {scores}")
        #
        # Validation: newly retrieved and computed values should be (almost) equal to those computed previously in the overlap area
        #
        check_row_count = 3  # These last rows should be correctly computed (in particular, have enough history in case of aggregation)
        num_cols = self.previous_df.select_dtypes((float, int)).columns.tolist()
        # Loop over several of the last newly computed data rows.
        # Skip the last row because it did not exist previously, and the row before the last one because its kline is frequently updated after retrieval
        for r in range(2, min(check_row_count, len(self.df))):
            idx = self.df.index[-r-1]
            if idx not in self.previous_df.index:
                continue
            # Compare all numeric values of the previously retrieved and the newly retrieved rows for the same time
            old_row = self.previous_df[num_cols].loc[idx]
            new_row = self.df[num_cols].loc[idx]
            comp_idx = np.isclose(old_row, new_row)
            if not np.all(comp_idx):
                log.warning(f"Newly computed row is not equal to the previously computed row for '{idx}'. NEW: {new_row[~comp_idx].to_dict()}. OLD: {old_row[~comp_idx].to_dict()}")

        self.dirty_records = 0  # Everything is computed

        # Remove rows that are too old
        if len(self.df) > self.max_window_length:
            self.df = self.df.tail(self.max_window_length)


if __name__ == "__main__":
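    # A minimal usage sketch (illustrative only, not part of the original module).
    # The config keys mirror those accessed by Analyzer above; the concrete values,
    # the symbol name and the ModelStore construction are assumptions made for
    # demonstration purposes.
    config = {
        "symbol": "BTCUSDT",          # Hypothetical symbol
        "freq": "1min",               # Pandas frequency string (assumed format)
        "time_column": "timestamp",
        "predict_length": 1,
        "features_horizon": 300,
        "train": False,               # The server is intended for predict mode
        "train_features": [],
        "labels": [],
        "data_sources": [],           # e.g. [{"folder": "BTCUSDT", ...}]
        "feature_sets": [],
        "train_feature_sets": [],
        "signal_sets": [],
    }
    model_store = ModelStore(config)  # Assumed constructor signature; see common.model_store
    analyzer = Analyzer(config, model_store)

    # Typical cycle: append freshly retrieved klines, then re-compute derived data
    # analyzer.append_data({"BTCUSDT": new_klines_df})  # new_klines_df retrieved elsewhere
    # analyzer.analyze()
    print(f"Analyzer initialized with {analyzer.get_size()} rows")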