intelligent-trading-bot/scripts/train.py

190 lines
6.4 KiB
Python
Raw Permalink Normal View History

2022-03-20 10:09:33 +01:00
from pathlib import Path
from datetime import datetime, timezone, timedelta
import click
2022-07-24 11:06:33 +02:00
from tqdm import tqdm
2022-03-20 10:09:33 +01:00
import numpy as np
import pandas as pd
from service.App import *
from common.gen_features import *
2022-03-20 10:09:33 +01:00
from common.classifiers import *
from common.model_store import *
from common.generators import train_feature_set
2022-03-20 10:09:33 +01:00
"""
Train models for all target labels and all algorithms declared in the configuration using the specified features.
"""
#
# Parameters
#
class P:
    """Hard-coded script parameters, mainly switches used for debugging."""

    # Maximum number of rows to read when the input file is a CSV (for debugging)
    in_nrows: int = 100_000_000
    # How many last rows of the loaded data to select (for debugging); 0 keeps all rows
    tail_rows: int = 0

    # Whether to store file with predictions
    store_predictions: bool = True
2022-03-20 10:09:33 +01:00
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    """
    Train models for all configured train feature sets and store the results.

    Loads the feature matrix of the configured symbol, selects the necessary
    columns and rows, trains every entry of "train_feature_sets", and then
    stores the models, their scores and (optionally) the predictions.
    """
    load_config(config_file)

    time_column = App.config["time_column"]

    now = datetime.now()

    #
    # Load feature matrix
    #
    symbol = App.config["symbol"]
    data_path = Path(App.config["data_folder"]) / symbol

    file_path = data_path / App.config.get("matrix_file_name")
    if not file_path.is_file():
        print(f"ERROR: Input file does not exist: {file_path}")
        return

    print(f"Loading data from source data file {file_path}...")
    if file_path.suffix == ".parquet":
        df = pd.read_parquet(file_path)
    elif file_path.suffix == ".csv":
        df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601", nrows=P.in_nrows)
    else:
        print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
        return

    print(f"Finished loading {len(df)} records with {len(df.columns)} columns.")

    # A zero value means "no limit" (all rows are kept)
    if P.tail_rows:
        df = df.tail(P.tail_rows)
    df = df.reset_index(drop=True)

    # Stop here on empty input: the first/last row accessed below would not exist
    if len(df) == 0:
        print("ERROR: Input data set is empty. Nothing to process.")
        return

    print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")

    #
    # Prepare data by selecting columns and rows
    #

    # Default (common) values for all trained features
    label_horizon = App.config["label_horizon"]  # Labels are generated from future data and hence we might want to explicitly remove some tail rows
    train_length = App.config.get("train_length")
    train_features = App.config.get("train_features") or []  # A missing entry is treated as an empty list
    labels = App.config["labels"]

    # Select necessary features and labels
    out_columns = [time_column, 'open', 'high', 'low', 'close', 'volume', 'close_time']
    out_columns = [x for x in out_columns if x in df.columns]
    # dict.fromkeys removes repeated names while preserving the order, so that no column is selected twice
    selected_columns = list(dict.fromkeys([*out_columns, *train_features, *labels]))
    df = df[selected_columns]

    # For classification tasks we want to use integers.
    # The conversion returns a new frame instead of assigning into the column subset selected above
    bool_labels = [label for label in labels if np.issubdtype(df[label].dtype, bool)]
    if bool_labels:
        df = df.astype({label: int for label in bool_labels})

    # Remove the tail data for which no (correct) labels are available
    # The reason is that these labels are computed from future values which are not available and hence labels might be wrong
    if label_horizon:
        df = df.head(-label_horizon)

    # Not in place, because the frame may be a slice of another frame at this point
    df = df.replace([np.inf, -np.inf], np.nan)

    if len(df) == 0:
        # The only rows removed since the previous emptiness check are the tail rows without labels
        print("ERROR: Empty data set after removing tail rows without labels. Check 'label_horizon' against the input data size.")
        return

    # Limit maximum length for all algorithms (algorithms can further limit their train size)
    if train_length:
        df = df.tail(train_length)

    df = df.reset_index(drop=True)  # To remove gaps in index before use

    #
    # Train feature models
    #
    train_feature_sets = App.config.get("train_feature_sets", [])
    if not train_feature_sets:
        print("ERROR: no train feature sets defined. Nothing to process.")
        return

    print(f"Start training models for {len(df)} input records.")

    fs_out_dfs = []  # Collect predictions of each feature set and combine them once after the loop
    models = dict()
    scores = dict()
    fs_count = len(train_feature_sets)
    for i, fs in enumerate(train_feature_sets, start=1):
        fs_now = datetime.now()
        print(f"Start train feature set {i}/{fs_count}. Generator {fs.get('generator')}...")

        fs_out_df, fs_models, fs_scores = train_feature_set(df, fs, App.config)

        fs_out_dfs.append(fs_out_df)
        models.update(fs_models)
        scores.update(fs_scores)

        fs_elapsed = datetime.now() - fs_now
        print(f"Finished train feature set {i}/{fs_count}. Generator {fs.get('generator')}. Time: {str(fs_elapsed).split('.')[0]}")

    out_df = pd.concat(fs_out_dfs, axis=1)

    print("Finished training models.")

    #
    # Store all collected models in files
    #
    model_path = Path(App.config["model_folder"])
    if not model_path.is_absolute():
        # A relative model folder is located within the data folder of the symbol
        model_path = data_path / model_path
    model_path = model_path.resolve()

    model_path.mkdir(parents=True, exist_ok=True)  # Ensure that folder exists

    for score_column_name, model_pair in models.items():
        save_model_pair(model_path, score_column_name, model_pair)

    print(f"Models stored in path: {model_path.absolute()}")

    #
    # Store scores
    #
    lines = [f"{score_column_name}, {score}" for score_column_name, score in scores.items()]

    metrics_file_name = "prediction-metrics.txt"
    metrics_path = (data_path / metrics_file_name).resolve()
    # The file is opened for appending, so scores of previous runs are retained
    with open(metrics_path, 'a+') as f:
        f.write("\n".join(lines) + "\n\n")

    print(f"Metrics stored in path: {metrics_path.absolute()}")

    #
    # Store predictions if necessary
    #
    if P.store_predictions:
        # Store only selected original data, labels, and their predictions
        out_df = out_df.join(df[list(dict.fromkeys([*out_columns, *labels]))])

        out_path = data_path / App.config.get("predict_file_name")

        print(f"Storing predictions with {len(out_df)} records and {len(out_df.columns)} columns in output file {out_path}...")
        if out_path.suffix == ".parquet":
            out_df.to_parquet(out_path, index=False)
        elif out_path.suffix == ".csv":
            out_df.to_csv(out_path, index=False, float_format='%.6f')
        else:
            print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
            return

        print(f"Predictions stored in file: {out_path}. Length: {len(out_df)}. Columns: {len(out_df.columns)}")

    #
    # End
    #
    elapsed = datetime.now() - now
    print(f"Finished training models in {str(elapsed).split('.')[0]}")
2022-03-20 10:09:33 +01:00
# Run the command-line entry point only when the file is executed as a script
if __name__ == '__main__':
    main()