# Mirror of https://github.com/asavinov/intelligent-trading-bot.git (synced: 2026-05-04 08:26:19 +00:00)
# File: intelligent-trading-bot/scripts/predict_rolling.py (361 lines, 15 KiB, Python)

2022-03-20 10:09:33 +01:00
from pathlib import Path
from datetime import datetime, timezone, timedelta
import time
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

import click
import numpy as np
import pandas as pd
import pandas.api.types as ptypes
from joblib import Parallel, delayed

from service.App import *
from common.model_store import *
from common.gen_features import *
from common.utils import compute_scores_regression, compute_scores
from common.generators import train_feature_set, predict_feature_set
2022-03-20 10:09:33 +01:00
"""
Generate label predictions for the whole input feature matrix by iteratively training models using historic data and predicting labels for some future horizon.
The main parameter is the step of iteration, that is, the future horizon for prediction.
As usual, we can specify past history length used to train a model.
The output file will store predicted labels in addition to all input columns (generated features and true labels).
This file is intended for training signal models (by simulating trade process and computing overall performance for some long period).
The output predicted labels will cover shorter period of time because we need some relatively long history to train the very first model.
"""
#
# Main
#
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
load_config(config_file)
2025-10-12 13:49:11 +02:00
config = App.config
2022-03-20 10:09:33 +01:00
2025-10-12 13:49:11 +02:00
App.model_store = ModelStore(config)
App.model_store.load_models()
2025-10-12 13:49:11 +02:00
time_column = config["time_column"]
2022-03-26 19:25:37 +01:00
now = datetime.now()
2025-10-12 13:49:11 +02:00
symbol = config["symbol"]
data_path = Path(config["data_folder"]) / symbol
2022-04-15 21:45:46 +02:00
2022-03-20 10:09:33 +01:00
#
2025-10-12 13:49:11 +02:00
# Load matrix data with regular time series
2022-03-20 10:09:33 +01:00
#
2025-10-12 13:49:11 +02:00
file_path = data_path / config.get("matrix_file_name")
if not file_path.is_file():
print(f"ERROR: Input file does not exist: {file_path}")
2022-03-20 10:09:33 +01:00
return
print(f"Loading data from source data file {file_path}...")
if file_path.suffix == ".parquet":
df = pd.read_parquet(file_path)
elif file_path.suffix == ".csv":
2025-10-12 13:49:11 +02:00
df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
else:
print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
return
2025-10-12 13:49:11 +02:00
print(f"Finished loading {len(df)} records with {len(df.columns)} columns.")
2022-03-20 10:09:33 +01:00
#
# Limit the source data
#
2025-10-12 13:49:11 +02:00
rp_config = config["rolling_predict"]
2025-10-12 13:49:11 +02:00
data_start = rp_config.get("data_start", None)
data_end = rp_config.get("data_end", None)
2025-10-12 13:49:11 +02:00
if data_start:
if isinstance(data_start, str):
df = df[ df[time_column] >= data_start ]
elif isinstance(data_start, int):
df = df.iloc[data_start:]
if data_end:
if isinstance(data_end, str):
df = df[ df[time_column] < data_end ]
elif isinstance(data_end, int):
df = df.iloc[:-data_end]
df = df.reset_index(drop=True)
2023-09-02 11:42:12 +02:00
print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")
#
# Determine parameters of the rolling prediction loop
#
prediction_start = rp_config.get("prediction_start", None)
2023-08-19 16:40:59 +02:00
if isinstance(prediction_start, str):
prediction_start = find_index(df, prediction_start)
prediction_size = rp_config.get("prediction_size")
prediction_steps = rp_config.get("prediction_steps")
# Compute a missing parameter if any
if not prediction_start:
if not prediction_size or not prediction_steps:
raise ValueError(f"Only one of the three rolling prediction loop parameters can be empty.")
# Where we have to start in order to perform the specified number of steps each having the specified length
prediction_start = len(df) - prediction_size*prediction_steps
elif not prediction_size:
if not prediction_start or not prediction_steps:
raise ValueError(f"Only one of the three rolling prediction loop parameters can be empty.")
# Size of one prediction in order to get the specified number of steps with the specified length
prediction_size = (len(df) - prediction_start) // prediction_steps
elif not prediction_steps:
if not prediction_start or not prediction_size:
raise ValueError(f"Only one of the three rolling prediction loop parameters can be empty.")
# Number of steps with the specified length with the specified start
prediction_steps = (len(df) - prediction_start) // prediction_size
# Check consistency of the loop parameters
if len(df) - prediction_start < prediction_steps * prediction_size:
raise ValueError(f"Not enough data for {prediction_steps} steps each of size {prediction_size} starting from {prediction_start}. Available data for prediction: {len(df) - prediction_start}")
#
# Prepare data by selecting columns and rows
#
2025-10-12 13:49:11 +02:00
train_features_all = config.get("train_features")
labels_all = config.get("labels")
# Select necessary features and label
2022-08-05 21:27:46 +02:00
out_columns = [time_column, 'open', 'high', 'low', 'close', 'volume', 'close_time']
out_columns = [x for x in out_columns if x in df.columns]
2025-10-12 13:49:11 +02:00
labels_present = set(labels_all).issubset(df.columns)
if labels_present:
2025-10-12 13:49:11 +02:00
all_features = train_features_all + labels_all
else:
2025-10-12 13:49:11 +02:00
all_features = train_features_all
df = df[out_columns + [x for x in all_features if x not in out_columns]]
2025-10-12 13:49:11 +02:00
for label in labels_all:
if np.issubdtype(df[label].dtype, bool):
df[label] = df[label].astype(int) # For classification tasks we want to use integers
2024-03-16 12:01:49 +01:00
df.replace([np.inf, -np.inf], np.nan, inplace=True)
#in_df = in_df.dropna(subset=labels)
df = df.reset_index(drop=True) # We must reset index after removing rows to remove gaps
2022-03-20 10:09:33 +01:00
print(f"Start index: {prediction_start}. Number of steps: {prediction_steps}. Step size: {prediction_size}")
2025-10-12 13:49:11 +02:00
#
# Rolling/moving train-predict sequence
#
train_feature_sets = config.get("train_feature_sets", [])
if not train_feature_sets:
print(f"ERROR: no train feature sets defined. Nothing to process.")
return
# TODO: It will be removed because we do not use directly a list of algorithms (it can be empty)
label_horizon = config["label_horizon"] # Labels are generated from future data and hence we might want to explicitly remove some tail rows
train_length = config.get("train_length")
labels_hat_df = pd.DataFrame() # Result rows. Here store only rows for which we make predictions
print(f"Starting rolling predict loop...")
2025-10-12 13:49:11 +02:00
use_multiprocessing = rp_config.get("use_multiprocessing", False)
max_workers = rp_config.get("max_workers", None)
if use_multiprocessing:
parallel = Parallel(n_jobs=max_workers, backend="loky", verbose=13) # ['loky', 'multiprocessing', 'sequential', 'threading']
#parallel = mp.Pool(processes=max_workers)
#parallel = ProcessPoolExecutor(max_workers=max_workers)
else:
parallel = None
for step in range(prediction_steps):
2022-03-20 10:09:33 +01:00
# Predict data
predict_start = prediction_start + (step * prediction_size)
predict_end = predict_start + prediction_size
2022-03-20 10:09:33 +01:00
2025-10-12 13:49:11 +02:00
predict_df = df.iloc[predict_start:predict_end] # Assume iloc equal to index
df_X_test = predict_df[train_features_all]
# Train data
# We exclude recent objects from training, because they do not have labels yet - the labels are in future
# In real (stream) data, we will have null labels for recent objects. During simulation, labels are available and hence we need to ignore/exclude them manually
train_end = predict_start - label_horizon - 1
if train_length:
train_start = max(0, train_end - train_length)
else:
train_start = 0
train_df = df.iloc[train_start:train_end] # We assume that iloc is equal to index
2025-10-12 13:49:11 +02:00
train_df = train_df.dropna(subset=train_features_all)
2025-10-12 13:49:11 +02:00
print(f"\n===>>> Start step {step}/{prediction_steps}. Train range: [{train_start}, {train_end}]={train_end-train_start}. Prediction range: [{predict_start}, {predict_end}]={predict_end-predict_start}")
step_start_time = datetime.now()
2022-03-20 10:09:33 +01:00
#
2025-10-12 13:49:11 +02:00
# Real execution of one step
2022-03-20 10:09:33 +01:00
#
2025-10-12 13:49:11 +02:00
predict_labels_df = execute_train_predict_step(config, train_df, predict_df, parallel)
2022-08-05 21:27:46 +02:00
2025-10-12 13:49:11 +02:00
# Append predicted rows to the end of previous predicted rows
labels_hat_df = pd.concat([labels_hat_df, predict_labels_df])
2022-03-20 10:09:33 +01:00
elapsed = datetime.now() - step_start_time
print(f"End step {step}/{prediction_steps}. Scores predicted: {len(predict_labels_df.columns)}. Time elapsed: {str(elapsed).split('.')[0]}")
2022-03-20 10:09:33 +01:00
# End of loop over prediction steps
print("")
print(f"Finished all {prediction_steps} prediction steps each with {prediction_size} predicted rows (stride). ")
print(f"Size of predicted dataframe {len(labels_hat_df)}. Number of rows in all steps {prediction_steps*prediction_size} (steps * stride). ")
2022-03-20 10:09:33 +01:00
print(f"Number of predicted columns {len(labels_hat_df.columns)}")
#
# Store data
#
2022-03-26 19:25:37 +01:00
# We do not store features. Only selected original data, labels, and their predictions
2025-10-12 13:49:11 +02:00
out_df = labels_hat_df.join(df[out_columns + labels_all])
2022-03-20 10:09:33 +01:00
2025-10-12 13:49:11 +02:00
out_path = data_path / config.get("predict_file_name")
2022-04-15 21:45:46 +02:00
2024-03-16 13:46:08 +01:00
print(f"Storing predictions with {len(out_df)} records and {len(out_df.columns)} columns in output file {out_path}...")
if out_path.suffix == ".parquet":
out_df.to_parquet(out_path, index=False)
elif out_path.suffix == ".csv":
out_df.to_csv(out_path, index=False, float_format='%.6f')
else:
print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
2024-03-16 13:46:08 +01:00
return
2022-03-26 19:25:37 +01:00
print(f"Predictions stored in file: {out_path}. Length: {len(out_df)}. Columns: {len(out_df.columns)}")
2022-03-20 10:09:33 +01:00
#
2025-10-12 13:49:11 +02:00
# Compute and store scores
2022-03-20 10:09:33 +01:00
#
score_lines = []
2025-10-12 13:49:11 +02:00
# For each predicted column, find the corresponding true label column and then compare them
2022-03-20 10:09:33 +01:00
for score_column_name in labels_hat_df.columns:
label_column, _ = score_to_label_algo_pair(score_column_name)
2022-03-20 10:09:33 +01:00
# Drop nans from scores
df_scores = pd.DataFrame({"y_true": out_df[label_column], "y_predicted": out_df[score_column_name]})
df_scores = df_scores.dropna()
2025-10-12 13:49:11 +02:00
y_true = df_scores["y_true"]
2022-03-20 10:09:33 +01:00
y_predicted = df_scores["y_predicted"]
y_predicted_class = np.where(y_predicted.values > 0.5, 1, 0)
2025-10-12 13:49:11 +02:00
if ptypes.is_float_dtype(y_true) and ptypes.is_float_dtype(y_predicted):
score = compute_scores_regression(y_true, y_predicted) # Regression stores
else:
2025-10-12 13:49:11 +02:00
score = compute_scores(y_true.astype(int), y_predicted) # Classification stores
2022-03-20 10:09:33 +01:00
2025-10-12 13:49:11 +02:00
score_lines.append(f"{score_column_name}: {score}")
2022-03-20 10:09:33 +01:00
#
2025-10-12 13:49:11 +02:00
# Store scores
2022-03-20 10:09:33 +01:00
#
2025-10-12 13:49:11 +02:00
score_path = out_path.with_suffix('.txt')
with open(score_path, "a+") as f:
f.write("\n".join([str(x) for x in score_lines]) + "\n\n")
2022-03-20 10:09:33 +01:00
2025-10-12 13:49:11 +02:00
print(f"Prediction scores stored in path: {score_path.absolute()}")
#
# End
#
elapsed = datetime.now() - now
print(f"Finished rolling prediction in {str(elapsed).split('.')[0]}")
2022-03-20 10:09:33 +01:00
2025-10-12 13:49:11 +02:00
def execute_train_predict_step(config: dict, train_df: pd.DataFrame, predict_df: pd.DataFrame, parallel):
    """
    Execute one step of rolling prediction: train models on train_df, then predict for predict_df.

    It gets train data (which is supposed to move along a larger data set) and data to be
    used for predictions. The predict data set is supposed to be selected from future data
    relative to the train data.

    :param config: global config (uses "train_feature_sets")
    :param train_df: data to be used for training with all features and all true labels
    :param predict_df: data to be used for prediction with all features and true labels used for computing score
    :param parallel: None for sequential training, or a joblib Parallel, multiprocessing Pool,
        or ProcessPoolExecutor instance used to train the feature sets concurrently
    :return: predictions for all labels for objects in predict_df, or None if a worker failed
    """
    train_feature_sets = config.get("train_feature_sets", [])

    #
    # 1 Execute only training (copy from train script)
    #
    fs_now = datetime.now()
    # Single quotes inside the f-string keep this valid on Python < 3.12
    # (nested same-type quotes were a SyntaxError before PEP 701)
    print(f"Start train all models from {len(train_feature_sets)} feature sets {'sequentially' if not parallel else 'in parallel'}. Train set size: {len(train_df)} ")

    models = dict()
    if isinstance(parallel, Parallel):
        results = parallel(delayed(train_feature_set)(train_df, fs, config) for fs in train_feature_sets)
        for fs_models in results:
            models.update(fs_models)
    elif isinstance(parallel, mp.pool.Pool):
        #results = parallel.starmap(train_feature_set, [(train_df, fs, config) for fs in train_feature_sets])
        results = [parallel.apply(train_feature_set, args=(train_df, fs, config)) for fs in train_feature_sets]
        # Fix: previously these results were discarded, so no models were ever
        # collected (and hence none stored) when a Pool was used
        for fs_models in results:
            models.update(fs_models)
    elif isinstance(parallel, ProcessPoolExecutor):
        # Submit all in a loop
        execution_results = dict()  # Futures for each feature set
        for i, fs in enumerate(train_feature_sets):
            score_column_name = f"label_{i}"
            execution_results[score_column_name] = parallel.submit(train_feature_set, train_df, fs, config)
        for score_column_name, future in execution_results.items():
            # Fix: check exception() BEFORE result() - result() re-raises the worker
            # exception, which previously made this error branch unreachable
            if future.exception():
                print(f"Exception while train-predict {score_column_name}.")
                return
            # Fix: previously the trained models were never merged into the model dict
            models.update(future.result())
    else:  # No multiprocessing - sequential execution
        for fs in train_feature_sets:
            models.update(train_feature_set(train_df, fs, config))

    fs_elapsed = datetime.now() - fs_now
    print(f"Finished train all. Time: {str(fs_elapsed).split('.')[0]}")

    #
    # 2 Store all collected models in files
    #
    # NOTE: train generator does NOT store models in model store but only returns them
    # yet, predict generator, expects the models to be in model store - it constructs the necessary model name and extracts it from model store
    # so we need to put the trained models in the model store (they will be stored automatically and overwrite previously trained models which is ok)
    for score_column_name, model_pair in models.items():
        App.model_store.put_model_pair(score_column_name, model_pair)
    print(f"Models stored in path: {App.model_store.model_path.absolute()}")

    #
    # 3 Execute only predict where df for the generator is a different predict df
    # (Copy from predict script)
    #
    fs_now = datetime.now()
    print(f"Start predictions for {len(predict_df)} input records.")

    out_df = pd.DataFrame()  # Collect predictions of all feature sets column-wise
    features = []
    for i, fs in enumerate(train_feature_sets):
        fs_out_df, fs_features = predict_feature_set(predict_df, fs, config, App.model_store)
        out_df = pd.concat([out_df, fs_out_df], axis=1)
        features.extend(fs_features)

    fs_elapsed = datetime.now() - fs_now
    print(f"Finished predictions. Time: {str(fs_elapsed).split('.')[0]}")

    return out_df
2022-03-20 10:09:33 +01:00
# Script entry point: run the rolling train-predict loop using the given config file
if __name__ == '__main__':
    main()