"""
Apply feature generators to the merged input data and store the feature matrix.
"""
from datetime import datetime
from pathlib import Path
from typing import Tuple

import numpy as np
import pandas as pd

import click

from service.App import *
from common.model_store import *
from common.generators import generate_feature_set
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    """Generate derived features for the merged dataset and store the feature matrix.

    Steps:
    1. Load configuration and pre-trained models (some generators need them).
    2. Load the merged source data (csv or parquet), optionally limited to the
       last ``train_length``/``predict_length`` (+ ``features_horizon``) rows.
    3. Apply all configured feature generators, collecting the new column names.
    4. Report NULLs in feature columns, store the feature matrix and the
       feature list.
    """
    load_config(config_file)
    config = App.config

    App.model_store = ModelStore(config)
    App.model_store.load_models()

    time_column = config["time_column"]

    now = datetime.now()

    symbol = config["symbol"]
    data_path = Path(config["data_folder"]) / symbol

    # Determine desired data length depending on train/predict mode
    is_train = config.get("train")
    if is_train:
        window_size = config.get("train_length")
    else:
        window_size = config.get("predict_length")
    # FIX: "features_horizon" may be missing from config, in which case
    # config.get() returns None and the += below would raise TypeError.
    # Default to 0 (no extra history needed).
    features_horizon = config.get("features_horizon") or 0
    if window_size:
        # Extra rows so that rolling/lagged features at the window start are valid
        window_size += features_horizon

    #
    # Load merged data with regular time series
    #
    file_path = data_path / config.get("merge_file_name")
    if not file_path.is_file():
        print(f"Data file does not exist: {file_path}")
        return

    print(f"Loading data from source data file {file_path}...")
    if file_path.suffix == ".parquet":
        df = pd.read_parquet(file_path)
    elif file_path.suffix == ".csv":
        df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
    else:
        print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
        return

    print(f"Finished loading {len(df)} records with {len(df.columns)} columns from the source file {file_path}")

    # FIX: an empty frame would crash on df.iloc[0] below — fail early instead
    if df.empty:
        print(f"ERROR: Input file contains no records: {file_path}")
        return

    # Select only the data necessary for analysis
    if window_size:
        df = df.tail(window_size)
        df = df.reset_index(drop=True)

    print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")

    #
    # Generate derived features
    #
    feature_sets = config.get("feature_sets", [])
    if not feature_sets:
        print(f"ERROR: no feature sets defined. Nothing to process.")
        return

    # Apply all feature generators to the data frame which get accordingly new derived columns
    # The feature parameters will be taken from config (depending on generator)
    print(f"Start generating features for {len(df)} input records.")

    all_features = []
    for i, fs in enumerate(feature_sets):
        fs_now = datetime.now()
        print(f"Start feature set {i}/{len(feature_sets)}. Generator {fs.get('generator')}...")
        # last_rows=0 means all rows are (re)computed (batch mode, not incremental)
        df, new_features = generate_feature_set(df, fs, config, App.model_store, last_rows=0)
        all_features.extend(new_features)
        fs_elapsed = datetime.now() - fs_now
        print(f"Finished feature set {i}/{len(feature_sets)}. Generator {fs.get('generator')}. Features: {len(new_features)}. Time: {str(fs_elapsed).split('.')[0]}")

    print(f"Finished generating features.")

    # Handle NULLs: treat infinities produced by generators as missing values
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    na_df = df[df[all_features].isna().any(axis=1)]
    if len(na_df) > 0:
        print(f"WARNING: There exist {len(na_df)} rows with NULLs in some feature columns")
        print(f"Number of NULL values:")
        print(df[all_features].isnull().sum().sort_values(ascending=False))

    #
    # Store feature matrix in output file
    #
    out_file_name = config.get("feature_file_name")
    out_path = (data_path / out_file_name).resolve()

    print(f"Storing features with {len(df)} records and {len(df.columns)} columns in output file {out_path}...")
    if out_path.suffix == ".parquet":
        df.to_parquet(out_path, index=False)
    elif out_path.suffix == ".csv":
        df.to_csv(out_path, index=False, float_format="%.6f")
    else:
        print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
        return
    print(f"Stored output file {out_path} with {len(df)} records")

    #
    # Store feature list
    #
    # NOTE: append mode is intentional — the feature list accumulates across runs,
    # each run separated by a blank line
    with open(out_path.with_suffix('.txt'), "a+") as out_file:
        out_file.write(", ".join(f'"{name}"' for name in all_features) + "\n\n")

    print(f"Stored {len(all_features)} features in output file {out_path.with_suffix('.txt')}")

    elapsed = datetime.now() - now
    # FIX: guard against ZeroDivisionError when no features were produced
    if all_features:
        print(f"Finished generating {len(all_features)} features in {str(elapsed).split('.')[0]}. Time per feature: {str(elapsed / len(all_features)).split('.')[0]}")
    else:
        print(f"Finished in {str(elapsed).split('.')[0]}. No features were generated.")


if __name__ == '__main__':
    main()