mirror of
https://github.com/asavinov/intelligent-trading-bot.git
synced 2026-05-04 08:26:19 +00:00
361 lines
15 KiB
Python
361 lines
15 KiB
Python
from pathlib import Path
|
|
from datetime import datetime, timezone, timedelta
|
|
import time
|
|
import click
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
from joblib import Parallel, delayed
|
|
import multiprocessing as mp
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import pandas.api.types as ptypes
|
|
|
|
from service.App import *
|
|
from common.model_store import *
|
|
from common.gen_features import *
|
|
from common.utils import compute_scores_regression, compute_scores
|
|
from common.generators import train_feature_set, predict_feature_set
|
|
|
|
"""
|
|
Generate label predictions for the whole input feature matrix by iteratively training models using historic data and predicting labels for some future horizon.
|
|
The main parameter is the step of iteration, that is, the future horizon for prediction.
|
|
As usual, we can specify past history length used to train a model.
|
|
The output file will store predicted labels in addition to all input columns (generated features and true labels).
|
|
This file is intended for training signal models (by simulating trade process and computing overall performance for some long period).
|
|
The output predicted labels will cover shorter period of time because we need some relatively long history to train the very first model.
|
|
"""
|
|
|
|
#
|
|
# Main
|
|
#
|
|
|
|
@click.command()
|
|
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
|
|
def main(config_file):
|
|
load_config(config_file)
|
|
config = App.config
|
|
|
|
App.model_store = ModelStore(config)
|
|
App.model_store.load_models()
|
|
|
|
time_column = config["time_column"]
|
|
|
|
now = datetime.now()
|
|
|
|
symbol = config["symbol"]
|
|
data_path = Path(config["data_folder"]) / symbol
|
|
|
|
#
|
|
# Load matrix data with regular time series
|
|
#
|
|
file_path = data_path / config.get("matrix_file_name")
|
|
if not file_path.is_file():
|
|
print(f"ERROR: Input file does not exist: {file_path}")
|
|
return
|
|
|
|
print(f"Loading data from source data file {file_path}...")
|
|
if file_path.suffix == ".parquet":
|
|
df = pd.read_parquet(file_path)
|
|
elif file_path.suffix == ".csv":
|
|
df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
|
|
else:
|
|
print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
|
|
return
|
|
|
|
print(f"Finished loading {len(df)} records with {len(df.columns)} columns.")
|
|
|
|
#
|
|
# Limit the source data
|
|
#
|
|
rp_config = config["rolling_predict"]
|
|
|
|
data_start = rp_config.get("data_start", None)
|
|
data_end = rp_config.get("data_end", None)
|
|
|
|
if data_start:
|
|
if isinstance(data_start, str):
|
|
df = df[ df[time_column] >= data_start ]
|
|
elif isinstance(data_start, int):
|
|
df = df.iloc[data_start:]
|
|
|
|
if data_end:
|
|
if isinstance(data_end, str):
|
|
df = df[ df[time_column] < data_end ]
|
|
elif isinstance(data_end, int):
|
|
df = df.iloc[:-data_end]
|
|
|
|
df = df.reset_index(drop=True)
|
|
|
|
print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")
|
|
|
|
#
|
|
# Determine parameters of the rolling prediction loop
|
|
#
|
|
|
|
prediction_start = rp_config.get("prediction_start", None)
|
|
if isinstance(prediction_start, str):
|
|
prediction_start = find_index(df, prediction_start)
|
|
prediction_size = rp_config.get("prediction_size")
|
|
prediction_steps = rp_config.get("prediction_steps")
|
|
|
|
# Compute a missing parameter if any
|
|
if not prediction_start:
|
|
if not prediction_size or not prediction_steps:
|
|
raise ValueError(f"Only one of the three rolling prediction loop parameters can be empty.")
|
|
# Where we have to start in order to perform the specified number of steps each having the specified length
|
|
prediction_start = len(df) - prediction_size*prediction_steps
|
|
elif not prediction_size:
|
|
if not prediction_start or not prediction_steps:
|
|
raise ValueError(f"Only one of the three rolling prediction loop parameters can be empty.")
|
|
# Size of one prediction in order to get the specified number of steps with the specified length
|
|
prediction_size = (len(df) - prediction_start) // prediction_steps
|
|
elif not prediction_steps:
|
|
if not prediction_start or not prediction_size:
|
|
raise ValueError(f"Only one of the three rolling prediction loop parameters can be empty.")
|
|
# Number of steps with the specified length with the specified start
|
|
prediction_steps = (len(df) - prediction_start) // prediction_size
|
|
|
|
# Check consistency of the loop parameters
|
|
if len(df) - prediction_start < prediction_steps * prediction_size:
|
|
raise ValueError(f"Not enough data for {prediction_steps} steps each of size {prediction_size} starting from {prediction_start}. Available data for prediction: {len(df) - prediction_start}")
|
|
|
|
#
|
|
# Prepare data by selecting columns and rows
|
|
#
|
|
train_features_all = config.get("train_features")
|
|
labels_all = config.get("labels")
|
|
|
|
# Select necessary features and label
|
|
out_columns = [time_column, 'open', 'high', 'low', 'close', 'volume', 'close_time']
|
|
out_columns = [x for x in out_columns if x in df.columns]
|
|
labels_present = set(labels_all).issubset(df.columns)
|
|
if labels_present:
|
|
all_features = train_features_all + labels_all
|
|
else:
|
|
all_features = train_features_all
|
|
df = df[out_columns + [x for x in all_features if x not in out_columns]]
|
|
|
|
for label in labels_all:
|
|
if np.issubdtype(df[label].dtype, bool):
|
|
df[label] = df[label].astype(int) # For classification tasks we want to use integers
|
|
|
|
df.replace([np.inf, -np.inf], np.nan, inplace=True)
|
|
#in_df = in_df.dropna(subset=labels)
|
|
df = df.reset_index(drop=True) # We must reset index after removing rows to remove gaps
|
|
|
|
print(f"Start index: {prediction_start}. Number of steps: {prediction_steps}. Step size: {prediction_size}")
|
|
|
|
#
|
|
# Rolling/moving train-predict sequence
|
|
#
|
|
|
|
train_feature_sets = config.get("train_feature_sets", [])
|
|
if not train_feature_sets:
|
|
print(f"ERROR: no train feature sets defined. Nothing to process.")
|
|
return
|
|
|
|
# TODO: It will be removed because we do not use directly a list of algorithms (it can be empty)
|
|
label_horizon = config["label_horizon"] # Labels are generated from future data and hence we might want to explicitly remove some tail rows
|
|
train_length = config.get("train_length")
|
|
|
|
labels_hat_df = pd.DataFrame() # Result rows. Here store only rows for which we make predictions
|
|
|
|
print(f"Starting rolling predict loop...")
|
|
|
|
use_multiprocessing = rp_config.get("use_multiprocessing", False)
|
|
max_workers = rp_config.get("max_workers", None)
|
|
if use_multiprocessing:
|
|
parallel = Parallel(n_jobs=max_workers, backend="loky", verbose=13) # ['loky', 'multiprocessing', 'sequential', 'threading']
|
|
#parallel = mp.Pool(processes=max_workers)
|
|
#parallel = ProcessPoolExecutor(max_workers=max_workers)
|
|
else:
|
|
parallel = None
|
|
|
|
for step in range(prediction_steps):
|
|
|
|
# Predict data
|
|
|
|
predict_start = prediction_start + (step * prediction_size)
|
|
predict_end = predict_start + prediction_size
|
|
|
|
predict_df = df.iloc[predict_start:predict_end] # Assume iloc equal to index
|
|
df_X_test = predict_df[train_features_all]
|
|
|
|
# Train data
|
|
|
|
# We exclude recent objects from training, because they do not have labels yet - the labels are in future
|
|
# In real (stream) data, we will have null labels for recent objects. During simulation, labels are available and hence we need to ignore/exclude them manually
|
|
train_end = predict_start - label_horizon - 1
|
|
if train_length:
|
|
train_start = max(0, train_end - train_length)
|
|
else:
|
|
train_start = 0
|
|
|
|
train_df = df.iloc[train_start:train_end] # We assume that iloc is equal to index
|
|
train_df = train_df.dropna(subset=train_features_all)
|
|
|
|
print(f"\n===>>> Start step {step}/{prediction_steps}. Train range: [{train_start}, {train_end}]={train_end-train_start}. Prediction range: [{predict_start}, {predict_end}]={predict_end-predict_start}")
|
|
|
|
step_start_time = datetime.now()
|
|
|
|
#
|
|
# Real execution of one step
|
|
#
|
|
predict_labels_df = execute_train_predict_step(config, train_df, predict_df, parallel)
|
|
|
|
# Append predicted rows to the end of previous predicted rows
|
|
labels_hat_df = pd.concat([labels_hat_df, predict_labels_df])
|
|
|
|
elapsed = datetime.now() - step_start_time
|
|
print(f"End step {step}/{prediction_steps}. Scores predicted: {len(predict_labels_df.columns)}. Time elapsed: {str(elapsed).split('.')[0]}")
|
|
|
|
# End of loop over prediction steps
|
|
print("")
|
|
print(f"Finished all {prediction_steps} prediction steps each with {prediction_size} predicted rows (stride). ")
|
|
print(f"Size of predicted dataframe {len(labels_hat_df)}. Number of rows in all steps {prediction_steps*prediction_size} (steps * stride). ")
|
|
print(f"Number of predicted columns {len(labels_hat_df.columns)}")
|
|
|
|
#
|
|
# Store data
|
|
#
|
|
# We do not store features. Only selected original data, labels, and their predictions
|
|
out_df = labels_hat_df.join(df[out_columns + labels_all])
|
|
|
|
out_path = data_path / config.get("predict_file_name")
|
|
|
|
print(f"Storing predictions with {len(out_df)} records and {len(out_df.columns)} columns in output file {out_path}...")
|
|
if out_path.suffix == ".parquet":
|
|
out_df.to_parquet(out_path, index=False)
|
|
elif out_path.suffix == ".csv":
|
|
out_df.to_csv(out_path, index=False, float_format='%.6f')
|
|
else:
|
|
print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
|
|
return
|
|
|
|
print(f"Predictions stored in file: {out_path}. Length: {len(out_df)}. Columns: {len(out_df.columns)}")
|
|
|
|
#
|
|
# Compute and store scores
|
|
#
|
|
score_lines = []
|
|
# For each predicted column, find the corresponding true label column and then compare them
|
|
for score_column_name in labels_hat_df.columns:
|
|
label_column, _ = score_to_label_algo_pair(score_column_name)
|
|
|
|
# Drop nans from scores
|
|
df_scores = pd.DataFrame({"y_true": out_df[label_column], "y_predicted": out_df[score_column_name]})
|
|
df_scores = df_scores.dropna()
|
|
|
|
y_true = df_scores["y_true"]
|
|
y_predicted = df_scores["y_predicted"]
|
|
y_predicted_class = np.where(y_predicted.values > 0.5, 1, 0)
|
|
|
|
if ptypes.is_float_dtype(y_true) and ptypes.is_float_dtype(y_predicted):
|
|
score = compute_scores_regression(y_true, y_predicted) # Regression stores
|
|
else:
|
|
score = compute_scores(y_true.astype(int), y_predicted) # Classification stores
|
|
|
|
score_lines.append(f"{score_column_name}: {score}")
|
|
|
|
#
|
|
# Store scores
|
|
#
|
|
score_path = out_path.with_suffix('.txt')
|
|
with open(score_path, "a+") as f:
|
|
f.write("\n".join([str(x) for x in score_lines]) + "\n\n")
|
|
|
|
print(f"Prediction scores stored in path: {score_path.absolute()}")
|
|
|
|
#
|
|
# End
|
|
#
|
|
elapsed = datetime.now() - now
|
|
print(f"Finished rolling prediction in {str(elapsed).split('.')[0]}")
|
|
|
|
|
|
def execute_train_predict_step(config: dict, train_df: pd.DataFrame, predict_df: pd.DataFrame, parallel):
|
|
"""
|
|
This function is supposed to be used in one step of rolling prediction.
|
|
It gets train data (which is supposed to move along larger data set) and data to be used for predictions.
|
|
The predict data set is supposed to be selected from future data relative to the train data.
|
|
|
|
:param config: global config
|
|
:param train_df: data to be used for training with all features and all true labels
|
|
:param predict_df: data to be used for prediction with all features and true labels used for computing score
|
|
:return: predictions for all labels for objects in predict_df
|
|
"""
|
|
rp_config = config["rolling_predict"]
|
|
|
|
train_feature_sets = config.get("train_feature_sets", [])
|
|
|
|
#
|
|
# 1 Execute only training (copy from train script)
|
|
#
|
|
|
|
fs_now = datetime.now()
|
|
print(f"Start train all models from {len(train_feature_sets)} feature sets {"sequentially" if not parallel else "in parallel"}. Train set size: {len(train_df)} ")
|
|
|
|
models = dict()
|
|
if isinstance(parallel, Parallel):
|
|
results = parallel(delayed(train_feature_set)(train_df, fs, config) for fs in train_feature_sets)
|
|
for fs_models in results:
|
|
models.update(fs_models)
|
|
elif isinstance(parallel, mp.pool.Pool):
|
|
#results = parallel.starmap(train_feature_set, [(train_df, fs, config) for fs in train_feature_sets])
|
|
results = [parallel.apply(train_feature_set, args=(train_df, fs, config)) for fs in train_feature_sets]
|
|
elif isinstance(parallel, ProcessPoolExecutor):
|
|
# Submit all in a loop
|
|
execution_results = dict() # Futures for each label
|
|
for i, fs in enumerate(train_feature_sets):
|
|
score_column_name = f"label_{i}"
|
|
execution_results[score_column_name] = parallel.submit(train_feature_set, train_df, fs, config)
|
|
|
|
results = dict()
|
|
for score_column_name, future in execution_results.items():
|
|
results[score_column_name] = future.result()
|
|
if future.exception():
|
|
print(f"Exception while train-predict {score_column_name}.")
|
|
return
|
|
|
|
else: # No multiprocessing - sequential execution
|
|
for i, fs in enumerate(train_feature_sets): # Execute sequentially
|
|
fs_models = train_feature_set(train_df, fs, config)
|
|
models.update(fs_models)
|
|
|
|
fs_elapsed = datetime.now() - fs_now
|
|
print(f"Finished train all. Time: {str(fs_elapsed).split('.')[0]}")
|
|
|
|
#
|
|
# 2 Store all collected models in files
|
|
#
|
|
|
|
# NOTE: train generator does NOT store models in model store but only returns them
|
|
# yet, predict generator, expects the models to be in model store - it constructs the necessary model name and extracts it from model store
|
|
# so we need to put the trained models in the model store (they will be stored automatically and overwrite previously trained models which is ok)
|
|
for score_column_name, model_pair in models.items():
|
|
App.model_store.put_model_pair(score_column_name, model_pair)
|
|
|
|
print(f"Models stored in path: {App.model_store.model_path.absolute()}")
|
|
|
|
# 3 Execute only predict where df for the generator is a different predict df
|
|
# (Copy from predict script)
|
|
|
|
fs_now = datetime.now()
|
|
print(f"Start predictions for {len(predict_df)} input records.")
|
|
|
|
out_df = pd.DataFrame() # Collect predictions
|
|
features = []
|
|
|
|
for i, fs in enumerate(train_feature_sets):
|
|
fs_out_df, fs_features = predict_feature_set(predict_df, fs, config, App.model_store)
|
|
out_df = pd.concat([out_df, fs_out_df], axis=1)
|
|
features.extend(fs_features)
|
|
|
|
fs_elapsed = datetime.now() - fs_now
|
|
print(f"Finished predictions. Time: {str(fs_elapsed).split('.')[0]}")
|
|
|
|
return out_df
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|