intelligent-trading-bot/scripts/signals.py

134 lines
4.3 KiB
Python
Raw Permalink Normal View History

2022-08-06 18:26:54 +02:00
from pathlib import Path
import click
from tqdm import tqdm
import numpy as np
import pandas as pd
from service.App import *
from common.model_store import *
from common.generators import generate_feature_set
2022-08-06 18:26:54 +02:00
"""
Generate new derived columns according to the signal definitions.
The transformations are applied to the results of ML predictions.
2022-08-06 18:26:54 +02:00
"""
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
"""
"""
load_config(config_file)
config = App.config
2022-08-06 18:26:54 +02:00
App.model_store = ModelStore(config)
App.model_store.load_models()
time_column = config["time_column"]
2022-08-06 18:26:54 +02:00
now = datetime.now()
symbol = config["symbol"]
data_path = Path(config["data_folder"]) / symbol
# Determine desired data length depending on train/predict mode
is_train = config.get("train")
if is_train:
window_size = config.get("train_length")
else:
window_size = config.get("predict_length")
features_horizon = config.get("features_horizon")
if window_size:
window_size += features_horizon
2022-08-06 18:26:54 +02:00
#
# Load data
2022-08-06 18:26:54 +02:00
#
file_path = data_path / config.get("predict_file_name")
2022-08-06 18:26:54 +02:00
if not file_path.exists():
print(f"ERROR: Input file does not exist: {file_path}")
return
print(f"Loading data from source data file {file_path}...")
2024-03-16 13:46:08 +01:00
if file_path.suffix == ".parquet":
df = pd.read_parquet(file_path)
elif file_path.suffix == ".csv":
df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
2024-03-16 13:46:08 +01:00
else:
print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
2024-03-16 13:46:08 +01:00
return
2022-08-06 18:26:54 +02:00
print(f"Finished loading {len(df)} records with {len(df.columns)} columns from the source file {file_path}")
# Select only the data necessary for analysis
if window_size:
df = df.tail(window_size)
df = df.reset_index(drop=True)
2022-08-06 18:26:54 +02:00
2023-09-02 11:42:12 +02:00
print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")
2022-08-06 18:26:54 +02:00
#
# Apply signal generators
2022-08-06 18:26:54 +02:00
#
feature_sets = config.get("signal_sets", [])
if not feature_sets:
print(f"ERROR: no signal sets defined. Nothing to process.")
return
print(f"Start generating features for {len(df)} input records.")
all_features = []
for i, fs in enumerate(feature_sets):
fs_now = datetime.now()
print(f"Start feature set {i}/{len(feature_sets)}. Generator {fs.get('generator')}...")
2025-01-11 18:44:28 +01:00
df, new_features = generate_feature_set(df, fs, config, App.model_store, last_rows=0)
2025-01-11 18:44:28 +01:00
all_features.extend(new_features)
2025-01-11 18:44:28 +01:00
fs_elapsed = datetime.now() - fs_now
print(f"Finished feature set {i}/{len(feature_sets)}. Generator {fs.get('generator')}. Features: {len(new_features)}. Time: {str(fs_elapsed).split('.')[0]}")
print(f"Finished generating features.")
# Handle NULLs
df.replace([np.inf, -np.inf], np.nan, inplace=True)
na_df = df[ df[all_features].isna().any(axis=1) ]
if len(na_df) > 0:
print(f"WARNING: There exist {len(na_df)} rows with NULLs in some columns")
print(f"Number of NULL values:")
print(df[all_features].isnull().sum().sort_values(ascending=False))
#
# Choose columns to stored
2022-08-06 18:26:54 +02:00
#
2025-03-25 20:04:36 +01:00
out_columns = [time_column, "open", "high", "low", "close"] # Source data
out_columns.extend(config.get('labels')) # True labels
2025-08-24 12:28:22 +02:00
out_columns = [x for x in out_columns if x in df.columns]
out_columns.extend(all_features)
2022-08-06 18:26:54 +02:00
out_df = df[out_columns]
#
# Store data
#
out_path = data_path / config.get("signal_file_name")
2022-08-06 18:26:54 +02:00
2024-03-16 14:06:50 +01:00
print(f"Storing signals with {len(out_df)} records and {len(out_df.columns)} columns in output file {out_path}...")
if out_path.suffix == ".parquet":
out_df.to_parquet(out_path, index=False)
elif out_path.suffix == ".csv":
out_df.to_csv(out_path, index=False, float_format='%.6f')
else:
print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
2024-03-16 14:06:50 +01:00
return
2022-08-06 18:26:54 +02:00
print(f"Signals stored in file: {out_path}. Length: {len(out_df)}. Columns: {len(out_df.columns)}")
elapsed = datetime.now() - now
print(f"Finished signal generation in {str(elapsed).split('.')[0]}")
if __name__ == '__main__':
main()