2022-08-07 14:04:47 +02:00
|
|
|
from pathlib import Path
|
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
|
import click
|
|
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
import pandas as pd
|
2025-08-24 13:02:51 +02:00
|
|
|
import pandas.api.types as ptypes
|
2022-08-07 14:04:47 +02:00
|
|
|
|
|
|
|
|
from service.App import *
|
|
|
|
|
from common.model_store import *
|
2026-01-08 16:19:13 +01:00
|
|
|
from common.utils import compute_scores_regression, compute_scores
|
2024-03-24 13:22:52 +01:00
|
|
|
from common.generators import predict_feature_set
|
2022-08-07 14:04:47 +02:00
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
Apply models to (previously generated) features and compute prediction scores.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
@click.command()
|
|
|
|
|
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
|
|
|
|
|
def main(config_file):
|
|
|
|
|
load_config(config_file)
|
2025-07-26 19:40:49 +02:00
|
|
|
config = App.config
|
2025-06-14 20:17:19 +02:00
|
|
|
|
2025-07-26 19:40:49 +02:00
|
|
|
App.model_store = ModelStore(config)
|
2025-06-16 17:51:33 +02:00
|
|
|
App.model_store.load_models()
|
2022-08-07 14:04:47 +02:00
|
|
|
|
2025-07-26 19:40:49 +02:00
|
|
|
time_column = config["time_column"]
|
2022-08-07 14:04:47 +02:00
|
|
|
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
|
2025-07-26 19:40:49 +02:00
|
|
|
symbol = config["symbol"]
|
|
|
|
|
data_path = Path(config["data_folder"]) / symbol
|
|
|
|
|
|
|
|
|
|
# Determine desired data length depending on train/predict mode
|
|
|
|
|
is_train = config.get("train")
|
|
|
|
|
if is_train:
|
|
|
|
|
window_size = config.get("train_length")
|
|
|
|
|
print(f"WARNING: Train mode is specified although this script is intended for prediction and will not train models.")
|
|
|
|
|
else:
|
|
|
|
|
window_size = config.get("predict_length")
|
|
|
|
|
features_horizon = config.get("features_horizon")
|
|
|
|
|
if window_size:
|
|
|
|
|
window_size += features_horizon
|
|
|
|
|
|
2022-08-07 14:04:47 +02:00
|
|
|
#
|
2025-07-26 19:40:49 +02:00
|
|
|
# Load data
|
2022-08-07 14:04:47 +02:00
|
|
|
#
|
2025-07-26 19:40:49 +02:00
|
|
|
file_path = data_path / config.get("matrix_file_name")
|
2022-08-07 14:04:47 +02:00
|
|
|
if not file_path.is_file():
|
|
|
|
|
print(f"ERROR: Input file does not exist: {file_path}")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
print(f"Loading data from source data file {file_path}...")
|
2024-03-16 13:08:25 +01:00
|
|
|
if file_path.suffix == ".parquet":
|
|
|
|
|
df = pd.read_parquet(file_path)
|
|
|
|
|
elif file_path.suffix == ".csv":
|
2025-07-26 19:40:49 +02:00
|
|
|
df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
|
2024-03-16 13:08:25 +01:00
|
|
|
else:
|
2025-04-17 17:22:55 +02:00
|
|
|
print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
|
2024-03-16 13:08:25 +01:00
|
|
|
return
|
2022-08-07 14:04:47 +02:00
|
|
|
|
2025-07-26 19:40:49 +02:00
|
|
|
print(f"Finished loading {len(df)} records with {len(df.columns)} columns from the source file {file_path}")
|
|
|
|
|
|
|
|
|
|
# Select only the data necessary for analysis
|
|
|
|
|
if window_size:
|
|
|
|
|
df = df.tail(window_size)
|
|
|
|
|
df = df.reset_index(drop=True)
|
2022-08-07 14:04:47 +02:00
|
|
|
|
2023-09-02 11:42:12 +02:00
|
|
|
print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")
|
|
|
|
|
|
2022-08-07 14:04:47 +02:00
|
|
|
#
|
2025-07-26 19:40:49 +02:00
|
|
|
# Apply ML algorithm predictors
|
2022-08-07 14:04:47 +02:00
|
|
|
#
|
2025-08-24 13:02:51 +02:00
|
|
|
train_features_all = config.get("train_features")
|
|
|
|
|
labels_all = config["labels"]
|
2022-08-07 14:04:47 +02:00
|
|
|
|
|
|
|
|
# Select necessary features and label
|
2025-03-25 20:04:36 +01:00
|
|
|
out_columns = [time_column, 'open', 'high', 'low', 'close', 'volume', 'close_time']
|
2022-08-07 14:04:47 +02:00
|
|
|
out_columns = [x for x in out_columns if x in df.columns]
|
2025-08-24 13:02:51 +02:00
|
|
|
labels_present = set(labels_all).issubset(df.columns)
|
2022-08-07 14:04:47 +02:00
|
|
|
if labels_present:
|
2025-08-24 13:02:51 +02:00
|
|
|
all_features = train_features_all + labels_all
|
2022-08-07 14:04:47 +02:00
|
|
|
else:
|
2025-08-24 13:02:51 +02:00
|
|
|
all_features = train_features_all
|
2023-03-11 17:31:16 +01:00
|
|
|
df = df[out_columns + [x for x in all_features if x not in out_columns]]
|
2022-08-07 14:04:47 +02:00
|
|
|
|
2025-07-26 19:40:49 +02:00
|
|
|
# Handle NULLs
|
2024-03-16 12:01:49 +01:00
|
|
|
df.replace([np.inf, -np.inf], np.nan, inplace=True)
|
2025-08-24 13:02:51 +02:00
|
|
|
na_df = df[ df[train_features_all].isna().any(axis=1) ]
|
2025-07-26 19:40:49 +02:00
|
|
|
if len(na_df) > 0:
|
|
|
|
|
print(f"WARNING: There exist {len(na_df)} rows with NULLs in some feature columns. These rows will be removed.")
|
2025-08-24 13:02:51 +02:00
|
|
|
df = df.dropna(subset=train_features_all)
|
2025-07-26 19:40:49 +02:00
|
|
|
df = df.reset_index(drop=True) # We must reset index after removing rows to remove gaps
|
2025-01-11 18:44:28 +01:00
|
|
|
|
2022-08-07 14:04:47 +02:00
|
|
|
#
|
2024-03-23 18:13:57 +01:00
|
|
|
# Generate/predict train features
|
2022-08-07 14:04:47 +02:00
|
|
|
#
|
2025-07-26 19:40:49 +02:00
|
|
|
train_feature_sets = config.get("train_feature_sets", [])
|
2024-03-23 18:13:57 +01:00
|
|
|
if not train_feature_sets:
|
|
|
|
|
print(f"ERROR: no train feature sets defined. Nothing to process.")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
print(f"Start generating trained features for {len(df)} input records.")
|
|
|
|
|
|
2025-08-24 13:02:51 +02:00
|
|
|
labels_hat_df = pd.DataFrame() # Collect predictions
|
2024-03-23 18:13:57 +01:00
|
|
|
features = []
|
|
|
|
|
|
|
|
|
|
for i, fs in enumerate(train_feature_sets):
|
|
|
|
|
fs_now = datetime.now()
|
|
|
|
|
print(f"Start train feature set {i}/{len(train_feature_sets)}. Generator {fs.get('generator')}...")
|
|
|
|
|
|
2025-08-24 13:02:51 +02:00
|
|
|
fs_out_df, fs_features = predict_feature_set(df, fs, config, App.model_store)
|
2024-03-23 18:13:57 +01:00
|
|
|
|
2025-08-24 13:02:51 +02:00
|
|
|
labels_hat_df = pd.concat([labels_hat_df, fs_out_df], axis=1)
|
2024-03-23 18:13:57 +01:00
|
|
|
features.extend(fs_features)
|
|
|
|
|
|
|
|
|
|
fs_elapsed = datetime.now() - fs_now
|
|
|
|
|
print(f"Finished train feature set {i}/{len(train_feature_sets)}. Generator {fs.get('generator')}. Time: {str(fs_elapsed).split('.')[0]}")
|
|
|
|
|
|
|
|
|
|
print(f"Finished generating trained features.")
|
2022-08-07 14:04:47 +02:00
|
|
|
|
|
|
|
|
#
|
|
|
|
|
# Store predictions
|
|
|
|
|
#
|
|
|
|
|
# Store only selected original data, labels, and their predictions
|
2025-08-24 13:02:51 +02:00
|
|
|
out_df = labels_hat_df.join(df[out_columns + (labels_all if labels_present else [])])
|
2022-08-07 14:04:47 +02:00
|
|
|
|
2025-07-26 19:40:49 +02:00
|
|
|
out_path = data_path / config.get("predict_file_name")
|
2022-08-07 14:04:47 +02:00
|
|
|
|
2024-03-16 13:46:08 +01:00
|
|
|
print(f"Storing predictions with {len(out_df)} records and {len(out_df.columns)} columns in output file {out_path}...")
|
|
|
|
|
if out_path.suffix == ".parquet":
|
|
|
|
|
out_df.to_parquet(out_path, index=False)
|
|
|
|
|
elif out_path.suffix == ".csv":
|
|
|
|
|
out_df.to_csv(out_path, index=False, float_format='%.6f')
|
|
|
|
|
else:
|
2025-04-17 17:22:55 +02:00
|
|
|
print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
|
2024-03-16 13:46:08 +01:00
|
|
|
return
|
|
|
|
|
|
2022-08-07 14:04:47 +02:00
|
|
|
print(f"Predictions stored in file: {out_path}. Length: {len(out_df)}. Columns: {len(out_df.columns)}")
|
|
|
|
|
|
2025-08-24 13:02:51 +02:00
|
|
|
#
|
|
|
|
|
# Compute and store scores
|
|
|
|
|
#
|
|
|
|
|
score_lines = []
|
|
|
|
|
# For each predicted column, find the corresponding true label column and then compare them
|
|
|
|
|
for score_column_name in labels_hat_df.columns:
|
|
|
|
|
label_column, _ = score_to_label_algo_pair(score_column_name)
|
|
|
|
|
|
|
|
|
|
# Drop nans from scores
|
|
|
|
|
df_scores = pd.DataFrame({"y_true": out_df[label_column], "y_predicted": out_df[score_column_name]})
|
|
|
|
|
df_scores = df_scores.dropna()
|
|
|
|
|
|
|
|
|
|
y_true = df_scores["y_true"]
|
|
|
|
|
y_predicted = df_scores["y_predicted"]
|
|
|
|
|
y_predicted_class = np.where(y_predicted.values > 0.5, 1, 0)
|
|
|
|
|
|
|
|
|
|
if ptypes.is_float_dtype(y_true) and ptypes.is_float_dtype(y_predicted):
|
|
|
|
|
score = compute_scores_regression(y_true, y_predicted) # Regression stores
|
|
|
|
|
else:
|
|
|
|
|
score = compute_scores(y_true.astype(int), y_predicted) # Classification stores
|
|
|
|
|
|
|
|
|
|
score_lines.append(f"{score_column_name}: {score}")
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
# Store scores
|
|
|
|
|
#
|
|
|
|
|
score_path = out_path.with_suffix('.txt')
|
|
|
|
|
with open(score_path, "a+") as f:
|
|
|
|
|
f.write("\n".join([str(x) for x in score_lines]) + "\n\n")
|
|
|
|
|
|
|
|
|
|
print(f"Prediction scores stored in path: {score_path.absolute()}")
|
|
|
|
|
|
2022-08-07 14:04:47 +02:00
|
|
|
#
|
|
|
|
|
# End
|
|
|
|
|
#
|
|
|
|
|
elapsed = datetime.now() - now
|
2025-04-26 11:13:57 +02:00
|
|
|
print(f"Finished predicting in {str(elapsed).split('.')[0]}")
|
2022-08-07 14:04:47 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
main()
|