from datetime import datetime, date, timedelta

import numpy as np
import pandas as pd

from common.utils import *
from common.model_store import *
from common.generators import generate_feature_set
from common.generators import predict_feature_set

import logging
log = logging.getLogger('analyzer')


class Analyzer:
"""
2026-01-25 17:11:28 +01:00
In - memory database which represents the current state of the data context ( all trading data ) including its history .
2022-03-20 10:09:33 +01:00
"""
2025-06-15 12:17:00 +02:00
def __init__ ( self , config : dict , model_store : ModelStore ) :
2022-03-20 10:09:33 +01:00
"""
Create a new operation object using its definition .
: param config : Initialization parameters defining what is in the database including its persistent parameters and schema
2026-01-25 20:22:12 +01:00
: param model_store : Model store object which provides access to ( trainable ) algorithm parameters as opposed to fixed by - value parameters in the configuration object
2022-03-20 10:09:33 +01:00
"""
        self.config = config
        self.model_store = model_store

        #
        # Data shape and parameters
        #

        # Minimum length of the data frame determined by such criteria as the history necessary to compute new values and how much we want to output
        self.min_window_length = self.config["predict_length"] + self.config["features_horizon"]
        # After growing beyond the maximum, the data frame will be truncated (back to the maximum window size) by removing older records
        self.max_window_length = self.min_window_length + 15
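        # For example (illustrative values, not defaults): predict_length=1 and features_horizon=300
        # give min_window_length=301 and max_window_length=316; once the frame exceeds 316 rows,
        # it is truncated back to 316 by dropping the oldest records.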

        # How many tail records store empty/wrong values and must be recomputed.
        # Initially it is equal to the data size (all records have to be recomputed). -1 (or a too large value) means that all records will be re-computed in batch mode (rather than stream mode).
        # After each analysis it is set to 0, which means that all features were evaluated and the data is up-to-date.
        # After appending new records it is again set to a non-zero value, indicating that the data state needs to be re-evaluated.
        self.dirty_records = -1

        self.is_train = config.get("train")
        if self.is_train:
            print("WARNING: Train mode is specified although the server is intended for running in predict mode")

        #
        # Data frame with all the data (source and derived) where rows are appended and their (derived) columns are computed
        #

        # All derived feature columns explicitly declared in the config
        train_features = self.config.get("train_features", [])
        train_features_dtypes = {k: 'float64' for k in train_features}  # Same data type for all of them
        # All label columns explicitly declared in the config (used only in train mode)
        labels = self.config.get("labels", [])
        labels_dtypes = {k: 'float64' for k in labels}  # Same data type for all of them
        # Combine all raw columns, derived features and (if in train mode) label columns
        time_column = self.config["time_column"]
        freq = self.config["freq"]
        all_columns_dtypes = {time_column: 'datetime64[ns, UTC]'} | train_features_dtypes
        if self.is_train:
            all_columns_dtypes = all_columns_dtypes | labels_dtypes

        self.df = pd.DataFrame(columns=all_columns_dtypes).astype(all_columns_dtypes)
        self.df = self.df.set_index(time_column, inplace=False, drop=False)  # Timestamp column in the index and also as a column for convenience
        self.df = self.df.asfreq(freq)

        # Now the data frame is initialized and regular updates will append rows to it, followed by analysis (computation of derived features)
        self.previous_df = None  # For validation

    #
    # Data state operations
    #

    def get_size(self):
        return len(self.df)

    def get_last_kline(self):
        if len(self.df) > 0:
            return self.df.iloc[-1]
        else:
            return None

    def get_last_kline_dt(self):
        """Open time of the last kline. It is simultaneously the kline id. Add one interval (e.g., 1m) if the end time is needed."""
        if len(self.df) > 0:
            return self.df.index[-1]
        else:
            # Compute it from the maximum history length self.min_window_length
            freq = self.config["freq"]
            last_kline_dt = get_start_dt_for_interval_count(freq, self.min_window_length)
            return last_kline_dt

    def get_missing_klines_count(self):
        """
        The number of complete discrete intervals between the last available kline and the current timestamp.
        The interval length is determined by the frequency parameter.
        """
        last_kline_dt = self.get_last_kline_dt()
        if not last_kline_dt:
            return self.min_window_length

        freq = self.config["freq"]
        intervals_count = get_interval_count_from_start_dt(freq, last_kline_dt)
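        # Worked example (illustrative; assumes get_interval_count_from_start_dt counts
        # complete intervals from the given start up to the current time): with freq="1min",
        # the last kline opened at 10:00 and the current time 10:05:30, there are 5 complete
        # 1-minute intervals after the open, so the last 5 klines should be requested.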

        return intervals_count

    def append_data(self, dfs: dict):
        """
        Merge individual data frames by creating a common index and append the result to the main data frame of this class.
        The values in the overlapping range (if any) will be overwritten.
        """
        #
        # Merge multiple dfs into one df by adding column prefixes and creating a common regular time index
        #

        # The merge function works with the data_sources structure, so we fill it with data before calling it
        data_sources = self.config.get("data_sources", [])
        if len(dfs) != len(data_sources):
            log.warning(f"The number of symbols retrieved {len(dfs)} is not equal to the number of data sources {len(data_sources)}")
        for ds in data_sources:
            ds_symbol = ds.get("folder")
            ds["df"] = dfs.get(ds_symbol)

        # Merge into one data frame (a regular index will be created and column prefixes added)
        time_column = self.config["time_column"]
        freq = self.config["freq"]
        merge_interpolate = self.config.get("merge_interpolate", False)
        df = merge_data_sources(data_sources, time_column, freq, merge_interpolate)

        # Store part of the previous state for validation purposes before it is overwritten
        self.previous_df = self.df.tail(10).copy()

        #
        # Append new records, overwriting the overlap area
        #
        initial_df_len = len(self.df)
        appended_df_len = len(df)
        self.df = append_df_drop_concat(self.df, df)
        result_df_len = len(self.df)
        overwritten_rows_len = (initial_df_len + appended_df_len) - result_df_len  # 0 if exact concatenation, positive if overlap, negative if gap (error)
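        # Worked example (illustrative numbers): 100 initial rows plus 10 appended rows
        # yielding 106 result rows means 4 old rows were overwritten by the new data.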

        #
        # Compute and set the new dirty count
        # It is how many records must be (re-)evaluated
        # The current (old) number might be more than 0, e.g., if we append several times without evaluation
        #
        if self.dirty_records < 0:
            pass  # No change: still all records have to be re-computed
        elif initial_df_len == 0:
            self.dirty_records = -1  # All records are new, therefore re-compute all of them
        else:
            # For example, we already had 10 dirty records and 4 of them were deleted (overwritten by new records)
            # All appended rows have to be re-computed, plus the non-overwritten rows among the old dirty rows (normally 0)
            self.dirty_records = appended_df_len + max(0, self.dirty_records - overwritten_rows_len)
            if self.dirty_records >= result_df_len:
                self.dirty_records = -1

    def analyze(self):
        """
        Compute derived columns for dirty records and add them to the data frame:
        1. Evaluate derived features
        2. Evaluate trained ML features
        3. Evaluate signal features
        """
        symbol = self.config["symbol"]

        if self.dirty_records == 0:
            log.warning("Analysis function called with 0 dirty records. Exit because there is nothing to do. Normally this should not happen.")
            return

        # This parameter is passed to generators and indicates the exact (small) number of last rows to re-evaluate, avoiding full re-evaluation for performance reasons
        last_rows = self.dirty_records if self.dirty_records > 0 else 0

        last_kline_dt = self.get_last_kline_dt()
        last_kline_ts_str = str(pd.to_datetime(last_kline_dt, unit='ms', utc=True))
        log.info(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")

        #
        # 1. Generate all derived features (NaNs are possible due to limited history)
        #
        feature_sets = self.config.get("feature_sets", [])
        feature_columns = []
        for fs in feature_sets:
            df, feats = generate_feature_set(self.df, fs, self.config, self.model_store, last_rows=last_rows)
            self.df = df
            feature_columns.extend(feats)

        #
        # 2. Apply ML models and generate intelligent indicators
        #
        train_features = self.config["train_features"]

        # Shorten the data frame by selecting the last rows for which to do predictions
        tail_rows = notnull_tail_rows(self.df[train_features])  # How many last rows have all non-null values
        predict_size = tail_rows if not last_rows else min(tail_rows, last_rows)
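        # Worked example (illustrative numbers): if the last 50 rows have complete features
        # (tail_rows=50) but only 4 rows are dirty (last_rows=4), predictions are computed
        # for just the last 4 rows.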

        predict_features_df = self.df.tail(predict_size)
        predict_features_df = predict_features_df[train_features]

        # Validation
        if predict_features_df.isnull().any().any():
            null_columns = {k: v for k, v in predict_features_df.isnull().any().to_dict().items() if v}
            log.error(f"Null in predict_df found. Columns with Null: {null_columns}")
            return

        train_feature_sets = self.config.get("train_feature_sets", [])
        predict_labels_df = pd.DataFrame(index=predict_features_df.index)
        predict_label_columns = []
        for fs in train_feature_sets:
            fs_df, feats = predict_feature_set(predict_features_df, fs, self.config, self.model_store)
            predict_labels_df = pd.concat([predict_labels_df, fs_df], axis=1)
            predict_label_columns.extend(feats)

        # Attach all predicted label columns (only for the last rows) to the main data frame
        self.df = self.df.combine_first(predict_labels_df)  # Attach new columns
        self.df.update(predict_labels_df)  # Overwrite older values with the newly computed values
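        # Note: combine_first adds columns missing in self.df and fills its null cells, while
        # update overwrites existing cells with non-NA values from predict_labels_df; together
        # they attach fresh predictions without disturbing rows outside the predicted tail.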

        #
        # 3. Signals
        #
        signal_sets = self.config.get("signal_sets", [])
        signal_columns = []
        for fs in signal_sets:
            df, feats = generate_feature_set(self.df, fs, self.config, self.model_store, last_rows=last_rows)
            self.df = df  # TODO: Signal features should be computed in the same way as normal (pre-ML) features
            signal_columns.extend(feats)

        #
        # Log the signal values of the last row
        #
        row = self.get_last_kline()  # The last row stores the latest values we need
        scores = ", ".join([f"{x}={row[x]:+.3f}" if isinstance(row[x], float) else f"{x}={str(row[x])}" for x in signal_columns])
        log.info(f"Analyze finished. Close: {int(row['close']):,} Signals: {scores}")

        #
        # Validation: newly retrieved and computed values should be (almost) equal to those computed previously in the overlap area
        #
        if self.previous_df is not None:
            check_row_count = 3  # These last rows should be correctly computed (in particular, have enough history in the case of aggregation)
            num_cols = self.previous_df.select_dtypes((float, int)).columns.tolist()

            # Loop over several of the last newly computed data rows
            # Skip the last row because it should not exist yet, and the row before it because its kline is frequently updated after retrieval
            for r in range(2, min(check_row_count, len(self.df))):
                idx = self.df.index[-r - 1]

                if idx not in self.previous_df.index:
                    continue

                # Compare all numeric values of the previously retrieved and newly retrieved rows for the same time
                old_row = self.previous_df[num_cols].loc[idx]
                new_row = self.df[num_cols].loc[idx]

                comp_idx = np.isclose(old_row, new_row)
                if not np.all(comp_idx):
                    log.warning(f"Newly computed row is not equal to the previously computed row for '{idx}'. NEW: {new_row[~comp_idx].to_dict()}. OLD: {old_row[~comp_idx].to_dict()}")

        self.dirty_records = 0  # Everything is computed

        # Remove rows which are too old
        if len(self.df) > self.max_window_length:
            self.df = self.df.tail(self.max_window_length)


if __name__ == "__main__":
    pass
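
    # Minimal usage sketch (hypothetical; the real service drives Analyzer from its server
    # loop, and the example values below are invented, limited to keys referenced in this file):
    #
    #   config = {
    #       "symbol": "BTCUSDT", "freq": "1min", "time_column": "timestamp",
    #       "predict_length": 1, "features_horizon": 300,
    #       "train_features": [], "labels": [], "data_sources": [],
    #       "feature_sets": [], "train_feature_sets": [], "signal_sets": [],
    #   }
    #   analyzer = Analyzer(config, model_store)   # model_store: a ModelStore instance
    #   analyzer.append_data(dfs)  # dfs: {symbol: DataFrame} with newly retrieved klines
    #   analyzer.analyze()         # compute features, predictions and signals for dirty rows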