from typing import Tuple
from pathlib import Path
from datetime import datetime

import click

import numpy as np
import pandas as pd

from service.App import *
from common.feature_generation import *
from common.label_generation_highlow import generate_labels_highlow
from common.label_generation_highlow import generate_labels_highlow2
from common.label_generation_topbot import generate_labels_topbot


#
# Parameters
#
class P:
    in_nrows = 50_000_000  # Load at most this number of records
    tail_rows = int(3.0 * 525_600)  # Process only this number of last rows
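    # Note: 525_600 = 365 * 24 * 60 is the number of minutes in a 365-day year, so with
    # 1-minute klines tail_rows corresponds to roughly the last three years of rows
    # (the 1-minute granularity is an assumption, not something enforced here).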
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    load_config(config_file)

    time_column = App.config["time_column"]

    now = datetime.now()

    #
    # Load merged data with regular time series
    #
    symbol = App.config["symbol"]
    data_path = Path(App.config["data_folder"]) / symbol

    file_path = (data_path / App.config.get("merge_file_name")).with_suffix(".csv")
    if not file_path.is_file():
        print(f"Data file does not exist: {file_path}")
        return

    print(f"Loading data from source data file {file_path}...")
    df = pd.read_csv(file_path, parse_dates=[time_column], nrows=P.in_nrows)
    print(f"Finished loading {len(df)} records with {len(df.columns)} columns.")

    # Keep only the most recent rows and renumber them starting from 0
    df = df.iloc[-P.tail_rows:]
    df = df.reset_index(drop=True)

    #
    # Generate derived features
    #
    feature_sets = App.config.get("feature_sets", [])
    if not feature_sets:
        print("ERROR: no feature sets defined. Nothing to process.")
        return
    # By default, standard kline features could be generated, for example:
    #feature_sets = [{"column_prefix": "", "generator": "klines", "feature_prefix": ""}]
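    # For illustration only (hypothetical values): each configured feature set is a dictionary
    # whose "generator" should be one of the names handled in generate_feature_set() below,
    # e.g. "binance_main", "binance_secondary", "futures", "depth", "yahoo_main",
    # "yahoo_secondary", "area_features", "tsfresh", "highlow", "highlow2", "topbot", "area_labels":
    #feature_sets = [{"column_prefix": "", "generator": "binance_main", "feature_prefix": ""}]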

    # Apply all feature generators to the data frame; each generator adds its new derived columns.
    # The feature parameters are taken from App.config (depending on the generator)
    print(f"Start generating features for {len(df)} input records.")

    all_features = []
    for fs in feature_sets:
        fs_now = datetime.now()
        print(f"Start generator {fs.get('generator')}...")
        df, new_features = generate_feature_set(df, fs, last_rows=0)
        all_features.extend(new_features)
        fs_elapsed = datetime.now() - fs_now
        print(f"Finished generator {fs.get('generator')}. Features: {len(new_features)}. Time: {str(fs_elapsed).split('.')[0]}")

    print("Finished generating features.")

    print("Number of NULL values:")
    print(df[all_features].isnull().sum().sort_values(ascending=False))

    #
    # Store feature matrix in output file
    #
    out_file_name = App.config.get("feature_file_name")
    # Note: with_suffix replaces any existing extension of the configured file name with .csv
    out_path = (data_path / out_file_name).with_suffix(".csv").resolve()

    print(f"Storing feature matrix with {len(df)} records and {len(df.columns)} columns in output file...")
    df.to_csv(out_path, index=False, float_format="%.4f")
    #df.to_parquet(out_path.with_suffix('.parquet'), engine='auto', compression=None, index=None, partition_cols=None)
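    # Note: with float_format="%.4f" the CSV stores values rounded to 4 decimal places;
    # the Parquet export above (kept commented out as an optional alternative) would
    # preserve full float precision and column dtypes.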

    #
    # Store list of all generated features
    #
    with open(out_path.with_suffix('.txt'), "a+") as out:
        out.write(", ".join([f'"{col}"' for col in all_features]) + "\n\n")

    print(f"Stored {len(all_features)} features in output file {out_path}")

    elapsed = datetime.now() - now
    print(f"Finished generating {len(all_features)} features in {str(elapsed).split('.')[0]}. Time per feature: {str(elapsed/len(all_features)).split('.')[0]}")

    print(f"Output file location: {out_path}")


def generate_feature_set(df: pd.DataFrame, fs: dict, last_rows: int) -> Tuple[pd.DataFrame, list]:
    """
    Apply the feature generator specified in the feature set definition `fs` to the input data frame.

    Return the data frame with the derived columns attached and the list of their names.
    """

    #
    # Select columns from the data set to be processed by the feature generator
    #
    cp = fs.get("column_prefix")
    if cp:
        cp = cp + "_"
        f_cols = [col for col in df if col.startswith(cp)]
        f_df = df[f_cols]  # Alternatively: f_df = df.loc[:, df.columns.str.startswith(cp)]
        # Remove the prefix because feature generators are generic
        # (a prefix will be added to the derived features before they are joined back to the main frame)
        f_df = f_df.rename(columns=lambda x: x[len(cp):] if x.startswith(cp) else x)  # Alternatively: f_df.columns = f_df.columns.str.replace(cp, "")
    else:
        f_df = df[df.columns.to_list()]  # Use a separate data frame object so that derived features can be added and then joined back to the main frame with a prefix

    #
    # Resolve and apply feature generator functions from the configuration
    #
    generator = fs.get("generator")
    if generator == "binance_main":
        features = generate_features_binance_main(
            f_df, use_differences=False,
            base_window=App.config["base_window"], windows=App.config["averaging_windows"],
            area_windows=App.config["area_windows"], last_rows=last_rows
        )
    elif generator == "binance_secondary":
        features = generate_features_binance_secondary(
            f_df, use_differences=False,
            base_window=App.config["base_window"], windows=App.config["averaging_windows"],
            area_windows=App.config["area_windows"], last_rows=last_rows
        )
    elif generator == "futures":
        features = generate_features_futures(f_df)
    elif generator == "depth":
        features = generate_features_depth(f_df)
    elif generator == "yahoo_main":
        features = generate_features_yahoo_main(
            f_df, use_differences=False,
            base_window=App.config["base_window"], windows=App.config["averaging_windows"],
            area_windows=App.config["area_windows"], last_rows=last_rows
        )
    elif generator == "yahoo_secondary":
        features = generate_features_yahoo_secondary(
            f_df, use_differences=False,
            base_window=App.config["base_window"], windows=App.config["averaging_windows"],
            area_windows=App.config["area_windows"], last_rows=last_rows
        )
    elif generator == "area_features":
        area_windows = App.config["area_windows"]
        if not area_windows:
            area_windows = [60, 120, 180, 300]
        # Numeric features: each is a ratio between the areas above and below the latest price
        features = add_area_ratio(f_df, is_future=False, column_name="close", windows=area_windows, suffix="_area_past")
    elif generator == "tsfresh":
        tsfresh_windows = App.config["tsfresh_windows"]
        features = generate_features_tsfresh(f_df, column_name="close", windows=tsfresh_windows, last_rows=last_rows)

    # Labels
    elif generator == "highlow":
        horizon = App.config["highlow_horizon"]

        # Binary labels: whether the maximum has exceeded a threshold or not
        print(f"Generating 'highlow' labels with horizon {horizon}...")
        features = generate_labels_highlow(f_df, horizon=horizon)

        print(f"Finished generating 'highlow' labels. {len(features)} labels generated.")
    elif generator == "highlow2":
        horizon = App.config["highlow_horizon"]

        # Binary labels: whether high or low crosses the threshold, whichever happens first
        print(f"Generating 'highlow2' labels with horizon {horizon}...")
        features = generate_labels_highlow2(f_df, horizon=horizon)

        print(f"Finished generating 'highlow2' labels. {len(features)} labels generated.")
    elif generator == "topbot":
        column_name = App.config.get("topbot_column_name", "close")

        top_level_fracs = [0.02, 0.03, 0.04, 0.05, 0.06]
        bot_level_fracs = [-x for x in top_level_fracs]

        f_df, features = generate_labels_topbot(f_df, column_name, top_level_fracs, bot_level_fracs)
    elif generator == "area_labels":
        area_windows = App.config["area_windows_labels"]
        if not area_windows:
            area_windows = [60, 120, 180, 300]
        # Numeric label: a ratio between the areas above and below the latest price
        features = add_area_ratio(f_df, is_future=True, column_name="close", windows=area_windows, suffix="_area_future")
    else:
        print(f"Unknown feature generator {generator}")
        return df, []  # Nothing generated; return the frame unchanged so the caller can still unpack the result

    #
    # Add the generated features to the main data frame with all other columns and features
    #
    f_df = f_df[features]
    fp = fs.get("feature_prefix")
    if fp:
        f_df = f_df.add_prefix(fp + "_")

    new_features = f_df.columns.to_list()

    df = df.join(f_df)  # Attach all derived features to the main frame

    return df, new_features


if __name__ == '__main__':
    main()
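# Typical invocation (illustrative only; the script path and configuration file name
# depend on the actual repository layout and your setup):
#   python features.py -c config.json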