2023-12-10 11:51:26 +01:00
from typing import Tuple
2025-09-07 11:01:45 +02:00
import asyncio
2023-12-10 11:51:26 +01:00
import numpy as np
import pandas as pd
2025-08-24 13:02:51 +02:00
import pandas . api . types as ptypes
2023-12-10 11:51:26 +01:00
2025-06-16 17:52:46 +02:00
from common . types import Venue
2025-09-07 11:01:45 +02:00
from common . utils import *
2024-03-24 13:22:52 +01:00
from common . model_store import *
2023-12-10 11:51:26 +01:00
from common . gen_features import *
from common . gen_labels_highlow import generate_labels_highlow , generate_labels_highlow2
from common . gen_labels_topbot import generate_labels_topbot , generate_labels_topbot2
from common . gen_signals import (
generate_smoothen_scores , generate_combine_scores ,
generate_threshold_rule , generate_threshold_rule2
)
2025-06-15 12:17:00 +02:00
def generate_feature_set(df: pd.DataFrame, fs: dict, config: dict, model_store: ModelStore, last_rows: int) -> Tuple[pd.DataFrame, list]:
    """
    Apply the feature generator described by a feature-set entry to the input data set.

    The generator operates on a working frame derived from ``df`` (optionally restricted
    to columns with a given prefix). The columns it reports as generated are then
    (optionally) prefixed and joined back to the main frame, replacing any columns
    of the same name which already exist there.

    :param df: Main data frame. It is not modified: a new frame is returned.
    :param fs: Feature-set definition. The keys read here are ``column_prefix``,
        ``generator``, ``config`` and ``feature_prefix``.
    :param config: Global configuration. It is only passed through to generators
        resolved by name via ``resolve_generator_name``.
    :param model_store: Model store. It is only passed through to generators
        resolved by name via ``resolve_generator_name``.
    :param last_rows: Passed as ``last_rows`` to the generators which accept it
        (``itblib``, ``tsfresh``, ``talib``, ``itbstats``).
    :return: Tuple of the resulting data frame and the list of names of the added columns.
    :raises ValueError: If the generator name is not built in and cannot be resolved.
    """
    #
    # Select columns from the data set to be processed by the feature generator
    #
    cp = fs.get("column_prefix")
    if cp:
        cp = cp + "_"
        f_cols = [col for col in df if col.startswith(cp)]
        f_df = df[f_cols]
        # Remove prefix because feature generators are generic
        # (a prefix will then be added to derived features before adding them back to the main frame)
        f_df = f_df.rename(columns=lambda x: x[len(cp):] if x.startswith(cp) else x)
    else:
        # We want a different data frame object to add derived features to
        # and then join them back to the main frame with prefix
        f_df = df[df.columns.to_list()]

    #
    # Resolve and apply feature generator functions from the configuration
    #
    generator = fs.get("generator")
    gen_config = fs.get('config', {})

    # NOTE(review): the generators below which return only `features` presumably add
    # their columns to f_df in place, because only f_df[features] is used afterwards - confirm
    if generator == "itblib":
        features = generate_features_itblib(f_df, gen_config, last_rows=last_rows)
    elif generator == "depth":
        features = generate_features_depth(f_df)
    elif generator == "tsfresh":
        features = generate_features_tsfresh(f_df, gen_config, last_rows=last_rows)
    elif generator == "talib":
        features = generate_features_talib(f_df, gen_config, last_rows=last_rows)
    elif generator == "itbstats":
        features = generate_features_itbstats(f_df, gen_config, last_rows=last_rows)

    # Labels
    elif generator == "highlow":
        horizon = gen_config.get("horizon")
        # Binary labels whether max has exceeded a threshold or not
        print(f"Generating 'highlow' labels with horizon {horizon}...")
        features = generate_labels_highlow(f_df, horizon=horizon)
        print(f"Finished generating 'highlow' labels. {len(features)} labels generated.")
    elif generator == "highlow2":
        print(f"Generating 'highlow2' labels...")
        f_df, features = generate_labels_highlow2(f_df, gen_config)
        print(f"Finished generating 'highlow2' labels. {len(features)} labels generated.")
    elif generator == "topbot":
        column_name = gen_config.get("columns", "close")
        top_level_fracs = [0.01, 0.02, 0.03, 0.04, 0.05]
        bot_level_fracs = [-x for x in top_level_fracs]
        f_df, features = generate_labels_topbot(f_df, column_name, top_level_fracs, bot_level_fracs)
    elif generator == "topbot2":
        f_df, features = generate_labels_topbot2(f_df, gen_config)

    # Signals
    elif generator == "smoothen":
        f_df, features = generate_smoothen_scores(f_df, gen_config)
    elif generator == "combine":
        f_df, features = generate_combine_scores(f_df, gen_config)
    elif generator == "threshold_rule":
        f_df, features = generate_threshold_rule(f_df, gen_config)
    elif generator == "threshold_rule2":
        f_df, features = generate_threshold_rule2(f_df, gen_config)

    else:
        # Resolve generator name to a function reference
        generator_fn = resolve_generator_name(generator)
        if generator_fn is None:
            raise ValueError(f"Unknown feature generator name or name cannot be resolved: {generator}")
        # Call this function
        f_df, features = generator_fn(f_df, gen_config, config, model_store)

    #
    # Add generated features to the main data frame with all other columns and features
    #
    f_df = f_df[features]
    fp = fs.get("feature_prefix")
    if fp:
        f_df = f_df.add_prefix(fp + "_")

    new_features = f_df.columns.to_list()

    # Delete new columns if they already exist.
    # The drop is not done in place so that the frame passed by the caller is left untouched
    existing = list(set(df.columns) & set(new_features))
    if existing:
        df = df.drop(columns=existing)

    df = df.join(f_df)  # Attach all derived features to the main frame

    return df, new_features
2024-03-17 11:12:55 +01:00
2025-08-24 13:02:51 +02:00
def predict_feature_set(df, fs, config, model_store: ModelStore) -> Tuple[pd.DataFrame, list]:
    """
    Compute one score column for every (label, algorithm) combination of the feature set.

    The input columns, labels and algorithm definitions are obtained from
    ``get_features_labels_algorithms``. The model for each combination is requested
    from ``model_store`` under the name ``label + label_algo_separator + algorithm name``.

    :param df: Data frame which contains the input feature columns.
    :param fs: Feature-set definition.
    :param config: Global configuration.
    :param model_store: Store queried with ``get_model_pair`` for each score column name.
    :return: Tuple of a data frame with the score columns (indexed like the input)
        and the list of the score column names.
    :raises ValueError: If an algorithm type is not one of ``gb``, ``nn``, ``lc``, ``svc``.
    """

    def _load_predictor(kind):
        # Classifier modules are imported only when their algorithm type is actually requested
        if kind == "gb":
            from common.classifier_gb import predict_gb
            return predict_gb
        if kind == "nn":
            from common.classifier_nn import predict_nn
            return predict_nn
        if kind == "lc":
            from common.classifier_lc import predict_lc
            return predict_lc
        if kind == "svc":
            from common.classifier_svc import predict_svc
            return predict_svc
        raise ValueError(f"Unknown algorithm type {kind}. Check algorithm list.")

    input_columns, labels, algorithms = get_features_labels_algorithms(fs, config)

    inputs = df[input_columns]

    score_columns = []
    scores = pd.DataFrame(index=inputs.index)  # Predictions are accumulated here

    for label in labels:
        for algo in algorithms:
            name = algo.get("name")
            kind = algo.get("algo")
            column = label + label_algo_separator + name

            # Trained model registered under the score column name
            model_pair = model_store.get_model_pair(column)

            print(f"Predict '{column}'. Algorithm {name}. Label: {label}. Train length {len(inputs)}. Train columns {len(inputs.columns)}")

            predict = _load_predictor(kind)
            scores[column] = predict(model_pair, inputs, algo)
            score_columns.append(column)

    return scores, score_columns
2024-03-24 13:22:52 +01:00
2025-08-24 13:02:51 +02:00
def train_feature_set(df, fs, config) -> dict:
    """
    Train one model for every (label, algorithm) combination of the feature set.

    The input columns, labels and algorithm definitions are obtained from
    ``get_features_labels_algorithms``. Rows with missing values in the input
    columns or in the labels are removed before training.

    :param df: Data frame which contains the input feature columns and the label columns.
    :param fs: Feature-set definition.
    :param config: Global configuration.
    :return: Dict which maps ``label + label_algo_separator + algorithm name``
        to the value returned by the corresponding train function.
    :raises ValueError: If an algorithm type is not one of ``gb``, ``nn``, ``lc``, ``svc``.
    """

    def _load_trainer(kind):
        # Classifier modules are imported only when their algorithm type is actually requested
        if kind == "gb":
            from common.classifier_gb import train_gb
            return train_gb
        if kind == "nn":
            from common.classifier_nn import train_nn
            return train_nn
        if kind == "lc":
            from common.classifier_lc import train_lc
            return train_lc
        if kind == "svc":
            from common.classifier_svc import train_svc
            return train_svc
        raise ValueError(f"Unknown algorithm type {kind}. Check algorithm list.")

    input_columns, labels, algorithms = get_features_labels_algorithms(fs, config)

    # Training needs complete rows: first drop rows with missing inputs, then with missing labels
    df = (
        df.dropna(subset=input_columns).reset_index(drop=True)
        .dropna(subset=labels).reset_index(drop=True)
    )

    trained = {}  # Resulting models by score column name

    for label in labels:
        for algo in algorithms:
            name = algo.get("name")
            kind = algo.get("algo")
            column = label + label_algo_separator + name

            # Restrict the training data according to the algorithm train parameters:
            # optionally take every n-th row and then optionally only the last rows
            params = algo.get("params", {})
            step = params.get("every_nth_row")
            sample = df.iloc[::step, :] if step else df
            max_length = params.get("length")
            if max_length:
                sample = sample.tail(max_length)

            X = sample[input_columns]
            y = sample[label]

            print(f"Train '{column}'. Algorithm {name}. Label: {label}. Train length {len(X)}. Train columns {len(X.columns)}")

            train = _load_trainer(kind)
            trained[column] = train(X, y, algo)

    return trained
2024-03-24 13:22:52 +01:00
2025-08-24 13:02:51 +02:00
def get_features_labels_algorithms(fs, config) -> Tuple[list, list, list]:
    """
    Get three lists by combining the entries from default lists in the config file
    and lists in the generator config. The function will return a list from the specific
    generator config if it is available (non-empty) and the default list otherwise.

    For the algorithm list, it will resolve the algorithm names into their definitions if necessary.

    :param fs: Feature-set definition. Its ``config`` entry (may be missing or ``None``)
        is read for ``columns`` (or ``features``), ``labels`` and ``functions`` (or ``algorithms``).
    :param config: Global configuration with the defaults ``train_features``,
        ``labels`` and ``algorithms``.
    :return: Tuple ``(train_features, labels, algorithms)``. If neither the generator
        config nor the global config specifies algorithms then the last element is
        whatever ``config.get("algorithms")`` returns (``None`` if the key is absent).
    :raises ValueError: If an entry of the generator algorithm list is neither a name nor a dict.
    """
    # A feature set without its own config simply falls back to the global defaults
    gen_config = fs.get("config") or {}

    # Features: "columns" has priority over "features", then the global default
    train_features = (
        gen_config.get("columns")
        or gen_config.get("features")
        or config.get("train_features", [])
    )

    labels = gen_config.get("labels") or config.get("labels", [])

    # Algorithms: "functions" has priority over "algorithms"
    algorithms_all = config.get("algorithms")
    algorithms_spec = gen_config.get("functions") or gen_config.get("algorithms") or []

    # The algorithms can be either strings (names) or dicts (definitions) so we resolve the names
    algorithms = []
    for alg in algorithms_spec:
        if isinstance(alg, str):  # Find in the list of algorithms
            alg = find_algorithm_by_name(algorithms_all, alg)
        elif not isinstance(alg, dict):
            raise ValueError("Algorithm has to be either dict or name")
        algorithms.append(alg)
    if not algorithms:
        algorithms = algorithms_all

    return train_features, labels, algorithms
2025-02-15 13:32:24 +01:00
2025-06-15 12:17:00 +02:00
async def output_feature_set(df, fs: dict, config: dict, model_store: ModelStore) -> None:
    """
    Resolve the output generator described by a feature-set entry and call it.

    The generator is called with ``(df, gen_config, config, model_store)``. If it is
    a coroutine function then it is awaited, otherwise it is called synchronously.

    :param df: Data frame passed to the output function.
    :param fs: Feature-set definition. The keys read here are ``generator`` and ``config``.
    :param config: Global configuration, passed through to the output function.
    :param model_store: Model store, passed through to the output function.
    :raises ValueError: If the generator name is not built in and cannot be resolved.
    """
    # asyncio.iscoroutinefunction is deprecated in favour of the inspect version
    import inspect

    from outputs.notifier_scores import send_score_notification
    from outputs.notifier_diagram import send_diagram
    from outputs.notifier_trades import trader_simulation
    from outputs import get_trader_functions

    #
    # Resolve and apply feature generator functions from the configuration
    #
    generator = fs.get("generator")
    gen_config = fs.get('config', {})

    if generator == "score_notification_model":
        generator_fn = send_score_notification
    elif generator == "diagram_notification_model":
        generator_fn = send_diagram
    elif generator == "trader_simulation":
        generator_fn = trader_simulation
    elif generator == "trader_binance":
        generator_fn = get_trader_functions(Venue.BINANCE)["trader"]
    elif generator == "trader_mt5":
        generator_fn = get_trader_functions(Venue.MT5)["trader"]
    else:
        # Resolve generator name to a function reference
        generator_fn = resolve_generator_name(generator)
        if generator_fn is None:
            raise ValueError(f"Unknown feature generator name or name cannot be resolved: {generator}")

    # Call the resolved function.
    # This code runs inside a coroutine, so an event loop is always running and
    # a coroutine function can be awaited directly (asyncio.run() would fail here)
    if inspect.iscoroutinefunction(generator_fn):
        await generator_fn(df, gen_config, config, model_store)
    else:
        generator_fn(df, gen_config, config, model_store)