mirror of
https://github.com/asavinov/intelligent-trading-bot.git
synced 2026-05-04 08:26:19 +00:00
272 lines
12 KiB
Python
272 lines
12 KiB
Python
from datetime import datetime, date, timedelta
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from common.utils import *
|
|
from common.model_store import *
|
|
from common.generators import generate_feature_set
|
|
from common.generators import predict_feature_set
|
|
|
|
import logging
|
|
log = logging.getLogger('analyzer')
|
|
|
|
|
|
class Analyzer:
    """
    In-memory database which represents the current state of the data context (all trading data) including its history.

    New rows are added with ``append_data`` and the derived columns (features, ML predictions, signals)
    are (re-)computed with ``analyze``. The number of tail rows which still need evaluation is tracked
    in ``dirty_records``.
    """

    def __init__(self, config: dict, model_store: "ModelStore"):
        """
        Create a new operation object using its definition.

        :param config: Initialization parameters defining what is in the database including its persistent parameters and schema
        :param model_store: Model store object which provides access to (trainable) algorithm parameters as opposed to fixed by-value parameters in the configuration object
        """
        self.config = config
        self.model_store = model_store

        #
        # Data shape and parameters
        #

        # Minimum length of the data frame determined by such criteria as the necessary history to compute new values or how much we want to output
        self.min_window_length = self.config["predict_length"] + self.config["features_horizon"]
        # Upper bound of the window: once the frame grows beyond it, analyze() keeps only this many most recent rows
        self.max_window_length = self.min_window_length + 15

        # How many tail records store empty/wrong values and must be recomputed.
        # Initially all records have to be recomputed. -1 (or too large) means that all records will be re-computed in batch mode (rather than stream mode).
        # After each analysis it is set to 0 which means that all features were evaluated and data is up-to-date.
        # After appending new records it is again set to a non-0 value indicating that the data state needs to be re-evaluated.
        self.dirty_records = -1

        self.is_train = config.get("train")
        if self.is_train:
            print("WARNING: Train mode is specified although the server is intended for running in predict mode")

        #
        # Data frame with all the data (source and derived) where rows are appended and their (derived) columns are computed
        #

        time_column = self.config["time_column"]
        freq = self.config["freq"]

        # Time column first, then all explicitly declared derived feature columns (all float64)
        all_columns_dtypes = {time_column: 'datetime64[ns, UTC]'}
        all_columns_dtypes.update({k: 'float64' for k in self.config.get("train_features", [])})
        # Label columns are added only in train mode
        if self.is_train:
            all_columns_dtypes.update({k: 'float64' for k in self.config.get("labels", [])})

        self.df = pd.DataFrame(columns=list(all_columns_dtypes)).astype(all_columns_dtypes)
        # Timestamp column in the index and also as a column for convenience
        self.df = self.df.set_index(time_column, inplace=False, drop=False)
        self.df = self.df.asfreq(freq)
        # Now the data frame is initialized and regular updates will append rows to it followed by analysis (computation of derived features)

        # Tail of the frame as it was before the latest append (used for validation); None until the first append
        self.previous_df = None

    #
    # Data state operations
    #

    def get_size(self) -> int:
        """Return the number of rows currently stored in the data frame."""
        return len(self.df)

    def get_last_kline(self):
        """Return the last row of the data frame, or None if the data frame is empty."""
        if len(self.df) == 0:
            return None
        return self.df.iloc[-1]

    def get_last_kline_dt(self):
        """Open time of the last kline. It is simultaneously kline id. Add 1m if the end is needed."""
        if len(self.df) > 0:
            return self.df.index[-1]

        # No data yet: derive the value from the required history length self.min_window_length
        return get_start_dt_for_interval_count(self.config["freq"], self.min_window_length)

    def get_missing_klines_count(self):
        """
        The number of complete discrete intervals between the last available kline and current timestamp.
        The interval length is determined by the frequency parameter.
        """
        last_kline_dt = self.get_last_kline_dt()
        if not last_kline_dt:
            return self.min_window_length

        return get_interval_count_from_start_dt(self.config["freq"], last_kline_dt)

    @staticmethod
    def _next_dirty_count(dirty_records: int, initial_len: int, appended_len: int, result_len: int) -> int:
        """
        Compute the new number of dirty (to be re-evaluated) tail records after an append.

        :param dirty_records: Dirty count before the append (negative means "re-compute everything")
        :param initial_len: Number of rows before the append
        :param appended_len: Number of rows in the appended frame
        :param result_len: Number of rows after the append
        :return: New dirty count, or -1 if all records have to be re-computed
        """
        if dirty_records < 0:
            return dirty_records  # No change: still all records have to be re-computed
        if initial_len == 0:
            return -1  # All records are new therefore all re-compute

        # 0 if exact concatenation, positive if overlap, negative if gap (error)
        overwritten_len = (initial_len + appended_len) - result_len

        # All appended rows have to be re-computed, plus the old dirty rows which were not overwritten (normally 0).
        # For example, we already had 10 records dirty and 4 of them were overwritten by new records.
        dirty = appended_len + max(0, dirty_records - overwritten_len)
        return -1 if dirty >= result_len else dirty

    def append_data(self, dfs: dict):
        """
        Merge individual data frames by creating a common index and append to the main data frame in this class.
        The values in the overlapped range (if any) will be overwritten.

        :param dfs: Data frames to append, looked up by the "folder" value of each configured data source
        """
        #
        # Merge multiple dfs in one df by adding columns prefixes and creating a common regular time index
        #

        # The merge function works with data_source structure so we fill it with data before calling
        data_sources = self.config.get("data_sources", [])
        if len(dfs) != len(data_sources):
            log.warning(f"The number of symbols retrieved {len(dfs)} is not equal to the number of data sources {len(data_sources)}")
        for ds in data_sources:
            ds["df"] = dfs.get(ds.get("folder"))

        # Really merge and get one data frame (regular index will be created and column prefixes added)
        time_column = self.config["time_column"]
        freq = self.config["freq"]
        merge_interpolate = self.config.get("merge_interpolate", False)
        df = merge_data_sources(data_sources, time_column, freq, merge_interpolate)

        # Store part of the previous state for validation etc. purposes before it is overwritten
        self.previous_df = self.df.tail(10).copy()

        #
        # Append new records by overwriting the overlap area
        #
        initial_df_len = len(self.df)
        appended_df_len = len(df)
        self.df = append_df_drop_concat(self.df, df)
        result_df_len = len(self.df)

        #
        # Compute and set new dirty count: how many records must be (re-)evaluated.
        # The current (old) number might be more than 0, e.g., if we append several times without evaluation
        #
        self.dirty_records = self._next_dirty_count(self.dirty_records, initial_df_len, appended_df_len, result_df_len)

    def analyze(self):
        """
        Compute derived columns for dirty records and add them to the data frame:
        1. Evaluate derived features
        2. Evaluate trained ML features
        3. Evaluate signal features

        Afterwards the result is logged, validated against the previously stored rows,
        the dirty counter is reset to 0 and too old rows are removed.
        Returns early (without resetting the dirty counter) if there is nothing to do
        or if the rows selected for prediction contain nulls.
        """
        symbol = self.config["symbol"]

        if self.dirty_records == 0:
            log.warning(f"Analysis function called with 0 dirty records. Exit because there is nothing to do. Normally should not happen.")
            return

        # It is a parameter passed to generators which indicates the exact (small) number of last rows to re-evaluate
        # to avoid full-evaluation for performance reasons. 0 means full evaluation.
        last_rows = self.dirty_records if self.dirty_records > 0 else 0

        last_kline_dt = self.get_last_kline_dt()
        last_kline_ts_str = str(pd.to_datetime(last_kline_dt, unit='ms', utc=True))

        log.info(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")

        #
        # 1. Generate all derived features (NaNs are possible due to limited history)
        #
        for fs in self.config.get("feature_sets", []):
            self.df, _ = generate_feature_set(self.df, fs, self.config, self.model_store, last_rows=last_rows)

        #
        # 2. Apply ML models and generate intelligent indicators
        #
        train_features = self.config["train_features"]

        # Shorten the data frame by selecting last rows for which to do predictions
        tail_rows = notnull_tail_rows(self.df[train_features])  # How many last rows have all non-null values
        predict_size = tail_rows if not last_rows else min(tail_rows, last_rows)
        predict_features_df = self.df.tail(predict_size)[train_features]

        # Validation
        null_flags = predict_features_df.isnull().any()
        if null_flags.any():
            null_columns = {k: v for k, v in null_flags.to_dict().items() if v}
            log.error(f"Null in predict_df found. Columns with Null: {null_columns}")
            return

        predict_labels_df = pd.DataFrame(index=predict_features_df.index)
        for fs in self.config.get("train_feature_sets", []):
            fs_df, _ = predict_feature_set(predict_features_df, fs, self.config, self.model_store)
            predict_labels_df = pd.concat([predict_labels_df, fs_df], axis=1)

        # Attach all predicted label columns (only for last rows) to the main data frame
        self.df = self.df.combine_first(predict_labels_df)  # Attach new columns
        self.df.update(predict_labels_df)  # Overwrite older values with newly computed values

        #
        # 3. Signals
        #
        signal_columns = []
        for fs in self.config.get("signal_sets", []):
            # TODO: Signal features should be computed in the same way as normal (pre-ML) features
            self.df, feats = generate_feature_set(self.df, fs, self.config, self.model_store, last_rows=last_rows)
            signal_columns.extend(feats)

        # Log signal values
        self._log_result(signal_columns)

        # Validation: newly retrieved and computed values should be (almost) equal to those computed previously in the overlap area
        self._validate_overlap()

        self.dirty_records = 0  # Everything is computed

        # Remove too old rows
        if len(self.df) > self.max_window_length:
            self.df = self.df.tail(self.max_window_length)

    def _log_result(self, signal_columns: list) -> None:
        """Log the close value and the signal values stored in the last row without ever failing the analysis."""
        row = self.get_last_kline()  # Last row stores the latest values we need
        if row is None:
            log.info("Analyze finished. No rows in the data frame.")
            return

        scores = ", ".join(
            f"{x}={row[x]:+.3f}" if isinstance(row[x], float) else f"{x}={str(row[x])}"
            for x in signal_columns
        )

        # The close column may be missing or hold NaN/inf: logging must not break the analysis in this case
        close = row.get("close")
        try:
            close_str = f"{int(close):,}"
        except (TypeError, ValueError, OverflowError):
            close_str = str(close)

        log.info(f"Analyze finished. Close: {close_str} Signals: {scores}")

    @staticmethod
    def _mismatch_mask(old_row, new_row) -> np.ndarray:
        """
        Return a boolean array which is True where the two rows are not (almost) equal.
        NaN in both rows at the same position is treated as equal.
        """
        return ~np.asarray(np.isclose(old_row, new_row, equal_nan=True))

    def _validate_overlap(self, check_row_count: int = 3) -> list:
        """
        Compare numeric values of the rows stored before the last append with the newly computed rows
        for the same timestamps and log a warning for each row which differs.

        :param check_row_count: These last rows should be correctly computed (particularly, have enough history in case of aggregation)
        :return: List of index values (timestamps) of the rows where a difference was found
        """
        # Nothing to compare with if no data was appended yet
        if self.previous_df is None or self.previous_df.empty:
            return []

        # Numeric columns of the previous state which are still present in the current data frame
        num_cols = [c for c in self.previous_df.select_dtypes((float, int)).columns if c in self.df.columns]

        mismatched = []
        # Loop over several last newly computed data rows
        # Skip last row because it should not exist, and before the last row because its kline is frequently updated after retrieval
        for r in range(2, min(check_row_count, len(self.df))):
            idx = self.df.index[-r-1]

            if idx not in self.previous_df.index:
                continue

            # Compare all numeric values of the previously retrieved and newly retrieved rows for the same time
            old_row = self.previous_df[num_cols].loc[idx]
            new_row = self.df[num_cols].loc[idx]
            diff = self._mismatch_mask(old_row, new_row)
            if diff.any():
                log.warning(f"Newly computed row is not equal to the previously computed row for '{idx}'. NEW: {new_row[diff].to_dict()}. OLD: {old_row[diff].to_dict()}")
                mismatched.append(idx)

        return mismatched
|
|
|
|
# Script entry point: running this module directly intentionally performs no work.
if __name__ == "__main__":
    pass
|