intelligent-trading-bot/scripts/predict.py
2025-04-26 11:13:57 +02:00

168 lines
5.5 KiB
Python

from pathlib import Path
from datetime import datetime, timezone, timedelta
import click
from tqdm import tqdm
import numpy as np
import pandas as pd
from service.App import *
from common.model_store import *
from common.generators import predict_feature_set
"""
Apply models to (previously generated) features and compute prediction scores.
"""
#
# Parameters
#
class P:
in_nrows = 100_000_000 # For debugging
tail_rows = 0 # How many last rows to select (for debugging)
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
load_config(config_file)
time_column = App.config["time_column"]
now = datetime.now()
#
# Load feature matrix
#
symbol = App.config["symbol"]
data_path = Path(App.config["data_folder"]) / symbol
file_path = data_path / App.config.get("matrix_file_name")
if not file_path.is_file():
print(f"ERROR: Input file does not exist: {file_path}")
return
print(f"Loading data from source data file {file_path}...")
if file_path.suffix == ".parquet":
df = pd.read_parquet(file_path)
elif file_path.suffix == ".csv":
df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601", nrows=P.in_nrows)
else:
print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
return
print(f"Finished loading {len(df)} records with {len(df.columns)} columns.")
df = df.iloc[-P.tail_rows:]
df = df.reset_index(drop=True)
print(f"Input data size {len(df)} records. Range: [{df.iloc[0][time_column]}, {df.iloc[-1][time_column]}]")
#
# Prepare data by selecting columns and rows
#
train_features = App.config.get("train_features")
labels = App.config["labels"]
algorithms = App.config.get("algorithms")
# Select necessary features and label
out_columns = [time_column, 'open', 'high', 'low', 'close', 'volume', 'close_time']
out_columns = [x for x in out_columns if x in df.columns]
labels_present = set(labels).issubset(df.columns)
if labels_present:
all_features = train_features + labels
else:
all_features = train_features
df = df[out_columns + [x for x in all_features if x not in out_columns]]
df.replace([np.inf, -np.inf], np.nan, inplace=True)
if len(df) == 0:
print(f"ERROR: Empty data set after removing NULLs in feature columns. Some features might all have NULL values.")
# print(train_df.isnull().sum().sort_values(ascending=False))
df = df.dropna(subset=train_features)
df = df.reset_index(drop=True) # We must reset index after removing rows to remove gaps
#
# Load models for all score columns
#
model_path = Path(App.config["model_folder"])
if not model_path.is_absolute():
model_path = data_path / model_path
model_path = model_path.resolve()
models = load_models_for_generators(App.config, model_path)
#
# Generate/predict train features
#
train_feature_sets = App.config.get("train_feature_sets", [])
if not train_feature_sets:
print(f"ERROR: no train feature sets defined. Nothing to process.")
return
print(f"Start generating trained features for {len(df)} input records.")
out_df = pd.DataFrame() # Collect predictions
features = []
scores = dict()
for i, fs in enumerate(train_feature_sets):
fs_now = datetime.now()
print(f"Start train feature set {i}/{len(train_feature_sets)}. Generator {fs.get('generator')}...")
fs_out_df, fs_features, fs_scores = predict_feature_set(df, fs, App.config, models)
out_df = pd.concat([out_df, fs_out_df], axis=1)
features.extend(fs_features)
scores.update(fs_scores)
fs_elapsed = datetime.now() - fs_now
print(f"Finished train feature set {i}/{len(train_feature_sets)}. Generator {fs.get('generator')}. Time: {str(fs_elapsed).split('.')[0]}")
print(f"Finished generating trained features.")
#
# Store scores
#
if labels_present:
lines = list()
for score_column_name, score in scores.items():
line = score_column_name + ", " + str(score)
lines.append(line)
metrics_file_name = f"prediction-metrics.txt"
metrics_path = (data_path / metrics_file_name).resolve()
with open(metrics_path, 'a+') as f:
f.write("\n".join(lines) + "\n\n")
print(f"Metrics stored in path: {metrics_path.absolute()}")
#
# Store predictions
#
# Store only selected original data, labels, and their predictions
out_df = out_df.join(df[out_columns + (labels if labels_present else [])])
out_path = data_path / App.config.get("predict_file_name")
print(f"Storing predictions with {len(out_df)} records and {len(out_df.columns)} columns in output file {out_path}...")
if out_path.suffix == ".parquet":
out_df.to_parquet(out_path, index=False)
elif out_path.suffix == ".csv":
out_df.to_csv(out_path, index=False, float_format='%.6f')
else:
print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
return
print(f"Predictions stored in file: {out_path}. Length: {len(out_df)}. Columns: {len(out_df.columns)}")
#
# End
#
elapsed = datetime.now() - now
print(f"Finished predicting in {str(elapsed).split('.')[0]}")
if __name__ == '__main__':
main()