2022-07-16 11:08:53 +02:00
from typing import Tuple
2022-03-20 10:09:33 +01:00
from pathlib import Path
import click
import numpy as np
import pandas as pd
from service . App import *
from common . feature_generation import *
2022-07-16 11:24:04 +02:00
from common . label_generation_highlow import generate_labels_highlow
2023-02-19 20:50:25 +01:00
from common . label_generation_highlow import generate_labels_highlow2
2022-07-16 11:24:04 +02:00
from common . label_generation_topbot import generate_labels_topbot
2023-07-29 15:14:56 +02:00
from common . label_generation_topbot import generate_labels_topbot2
2022-03-20 10:09:33 +01:00
#
# Parameters
#
class P:
    """Script-level parameters limiting how much data is loaded and processed."""

    # Upper bound on the number of records read from the source file
    in_nrows = 50_000_000

    # Only this number of last (most recent) rows is processed.
    # 525_600 = 365 * 24 * 60, so this is presumably 10 years of 1-minute rows - TODO confirm
    tail_rows = int(10.0 * 525_600)
2022-03-20 10:09:33 +01:00
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    """
    Generate derived features for the merged data file and store the result.

    The configuration is loaded from `config_file` with `load_config` and then read
    from `App.config`. The function:

    1. reads `<data_folder>/<symbol>/<merge_file_name>.csv` (at most `P.in_nrows` records),
    2. keeps only the last `P.tail_rows` rows,
    3. applies every entry of the `feature_sets` list using `generate_feature_set`,
    4. writes the resulting frame to `<feature_file_name>.csv` and appends the names
       of the generated features to the `.txt` file with the same base name.

    It prints a message and returns early if the source file does not exist, if the
    loaded data has no rows, or if no feature sets are configured.

    :param config_file: configuration file name passed to `load_config`
    """
    # Explicit local import: the visible top-of-file imports do not import datetime,
    # so without it the name is only available if one of the star-imports exports it
    from datetime import datetime

    load_config(config_file)

    time_column = App.config["time_column"]

    now = datetime.now()

    #
    # Load merged data with regular time series
    #
    symbol = App.config["symbol"]
    data_path = Path(App.config["data_folder"]) / symbol

    file_path = (data_path / App.config.get("merge_file_name")).with_suffix(".csv")
    if not file_path.is_file():
        print(f"Data file does not exist: {file_path}")
        return

    print(f"Loading data from source data file {file_path}...")
    df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601", nrows=P.in_nrows)
    print(f"Finished loading {len(df)} records with {len(df.columns)} columns.")

    # Process only the tail of the data
    df = df.iloc[-P.tail_rows:]
    df = df.reset_index(drop=True)

    # Without rows there is no first/last record to report and nothing to derive features from
    if df.empty:
        print(f"ERROR: no records in the source data file {file_path}. Nothing to process.")
        return

    print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")

    #
    # Generate derived features
    #
    feature_sets = App.config.get("feature_sets", [])
    if not feature_sets:
        print(f"ERROR: no feature sets defined. Nothing to process.")
        return
        # A default with standard kline features could be used here instead:
        #feature_sets = [{"column_prefix": "", "generator": "klines", "feature_prefix": ""}]

    # Apply all feature generators to the data frame which gets accordingly new derived columns
    # The feature parameters will be taken from App.config (depending on generator)
    print(f"Start generating features for {len(df)} input records.")

    all_features = []
    for i, fs in enumerate(feature_sets):
        fs_now = datetime.now()
        print(f"Start feature set {i}/{len(feature_sets)}. Generator {fs.get('generator')}...")

        df, new_features = generate_feature_set(df, fs, last_rows=0)
        all_features.extend(new_features)

        fs_elapsed = datetime.now() - fs_now
        print(f"Finished feature set {i}/{len(feature_sets)}. Generator {fs.get('generator')}. Features: {len(new_features)}. Time: {str(fs_elapsed).split('.')[0]}")

    print(f"Finished generating features.")

    print(f"Number of NULL values:")
    print(df[all_features].isnull().sum().sort_values(ascending=False))

    #
    # Store feature matrix in output file
    #
    out_file_name = App.config.get("feature_file_name")
    out_path = (data_path / out_file_name).with_suffix(".csv").resolve()

    print(f"Storing feature matrix with {len(df)} records and {len(df.columns)} columns in output file...")
    df.to_csv(out_path, index=False, float_format="%.4f")
    #df.to_parquet(out_path.with_suffix('.parquet'), engine='auto', compression=None, index=None, partition_cols=None)

    #
    # Store features
    #
    # The file is opened in append mode, so every run adds one more line with feature names
    with open(out_path.with_suffix('.txt'), "a+") as names_file:
        names_file.write(", ".join([f'"{name}"' for name in all_features]) + "\n\n")

    print(f"Stored {len(all_features)} features in output file {out_path}")

    elapsed = datetime.now() - now
    # Feature sets may produce no features at all, so avoid dividing by zero
    if all_features:
        time_per_feature = str(elapsed / len(all_features)).split('.')[0]
    else:
        time_per_feature = "n/a"
    print(f"Finished generating {len(all_features)} features in {str(elapsed).split('.')[0]}. Time per feature: {time_per_feature}")

    print(f"Output file location: {out_path}")
2022-07-16 11:08:53 +02:00
def generate_feature_set(df: pd.DataFrame, fs: dict, last_rows: int) -> Tuple[pd.DataFrame, list]:
    """
    Apply the feature generator described by `fs` to the input data set.

    :param df: main data frame with the source columns
    :param fs: feature set description. The following keys are read:
        `column_prefix` - if non-empty, only columns starting with `<column_prefix>_`
        are passed to the generator (with this prefix removed),
        `generator` - name of the generator to apply,
        `config` - generator-specific parameters (empty dict if absent),
        `feature_prefix` - if non-empty, `<feature_prefix>_` is prepended to the names
        of the generated columns
    :param last_rows: passed through to the generators which accept it
    :return: tuple of the main frame joined with the new columns and the list of
        the new column names (with the feature prefix applied)
    :raises ValueError: if the generator name is not known
    """

    #
    # Select columns from the data set to be processed by the feature generator
    #
    cp = fs.get("column_prefix")
    if cp:
        cp = cp + "_"
        f_cols = [col for col in df if col.startswith(cp)]
        f_df = df[f_cols]  # Alternatively: f_df = df.loc[:, df.columns.str.startswith(cp)]
        # Remove prefix because feature generators are generic
        # (a prefix will then be added to derived features before adding them back to the main frame)
        f_df = f_df.rename(columns=lambda x: x[len(cp):] if x.startswith(cp) else x)  # Alternatively: f_df.columns = f_df.columns.str.replace(cp, "")
    else:
        # We want to have a different data frame object to add derived features
        # and then join them back to the main frame with prefix
        f_df = df[df.columns.to_list()]

    #
    # Resolve and apply feature generator functions from the configuration
    #
    # NOTE(review): generators returning only `features` presumably add the new columns
    # to f_df in place, because f_df is later indexed by these names - confirm
    generator = fs.get("generator")
    gen_config = fs.get('config', {})
    if generator == "itblib":
        features = generate_features_itblib(f_df, gen_config, last_rows=last_rows)
    elif generator == "depth":
        features = generate_features_depth(f_df)
    elif generator == "tsfresh":
        features = generate_features_tsfresh(f_df, gen_config, last_rows=last_rows)
    elif generator == "talib":
        features = generate_features_talib(f_df, gen_config, last_rows=last_rows)
    elif generator == "itbstats":
        features = generate_features_itbstats(f_df, gen_config, last_rows=last_rows)

    # Labels
    elif generator == "highlow":
        horizon = gen_config.get("horizon")

        # Binary labels whether max has exceeded a threshold or not
        print(f"Generating 'highlow' labels with horizon {horizon}...")
        features = generate_labels_highlow(f_df, horizon=horizon)

        print(f"Finished generating 'highlow' labels. {len(features)} labels generated.")
    elif generator == "highlow2":
        print(f"Generating 'highlow2' labels...")
        f_df, features = generate_labels_highlow2(f_df, gen_config)
        print(f"Finished generating 'highlow2' labels. {len(features)} labels generated.")
    elif generator == "topbot":
        column_name = gen_config.get("columns", "close")

        top_level_fracs = [0.01, 0.02, 0.03, 0.04, 0.05]
        bot_level_fracs = [-x for x in top_level_fracs]

        f_df, features = generate_labels_topbot(f_df, column_name, top_level_fracs, bot_level_fracs)
    elif generator == "topbot2":
        f_df, features = generate_labels_topbot2(f_df, gen_config)

    else:
        # Returning None here would break the declared tuple result
        # and fail in the caller while unpacking, so report the problem explicitly
        raise ValueError(f"Unknown feature generator {generator}")

    #
    # Add generated features to the main data frame with all other columns and features
    #
    f_df = f_df[features]
    fp = fs.get("feature_prefix")
    if fp:
        f_df = f_df.add_prefix(fp + "_")

    new_features = f_df.columns.to_list()

    df = df.join(f_df)  # Attach all derived features to the main frame

    return df, new_features
2022-03-20 10:09:33 +01:00
# Run the command line entry point only when the file is executed as a script
if __name__ == '__main__':
    main()