intelligent-trading-bot/scripts/merge.py
2026-01-09 16:50:52 +01:00

111 lines
3.6 KiB
Python

from datetime import timedelta, datetime, time
from pathlib import Path
import pandas as pd
import numpy as np
import click
from common.utils import merge_data_sources
from service.App import *
"""
Create one output file from multiple input data files.
"""
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
load_config(config_file)
config = App.config
time_column = config["time_column"]
now = datetime.now()
symbol = config["symbol"]
data_path = Path(config["data_folder"])
# Determine desired data length depending on train/predict mode
is_train = config.get("train")
if is_train:
window_size = config.get("train_length")
else:
window_size = config.get("predict_length")
features_horizon = config.get("features_horizon")
if window_size:
window_size += features_horizon
#
# Load data from multiple sources and merge
#
data_sources = config.get("data_sources", [])
if not data_sources:
print(f"ERROR: Data sources are not defined. Nothing to merge.")
#data_sources = [{"folder": symbol, "file": "klines", "column_prefix": ""}]
# Read data from input files
for ds in data_sources:
# What is want is for each source, load file into df, determine its properties (columns, start, end etc.), and then merge all these dfs
quote = ds.get("folder")
if not quote:
print(f"ERROR. Folder is not specified.")
continue
# If file name is not specified then use symbol name as file name
file = ds.get("file", quote)
if not file:
file = quote
file_path = (data_path / quote / file)
if not file_path.suffix:
file_path = file_path.with_suffix(".csv") # CSV by default
print(f"Reading data file: {file_path}")
if file_path.suffix == ".parquet":
df = pd.read_parquet(file_path)
elif file_path.suffix == ".csv":
df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
else:
print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
return
print(f"Loaded file with {len(df)} records.")
# Select only the data necessary for analysis
if window_size:
df = df.tail(window_size)
df = df.reset_index(drop=True)
ds["df"] = df
# Merge in one df with prefixes and common regular time index
freq = App.config["freq"]
merge_interpolate = App.config.get("merge_interpolate", False)
df_out = merge_data_sources(data_sources, time_column, freq, merge_interpolate)
#
# Store file with features
#
out_path = data_path / symbol / config.get("merge_file_name")
print(f"Storing output file...")
df_out = df_out.reset_index(drop=(df_out.index.name in df_out.columns))
if out_path.suffix == ".parquet":
df_out.to_parquet(out_path, index=False)
elif out_path.suffix == ".csv":
df_out.to_csv(out_path, index=False) # float_format="%.6f"
else:
print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
return
range_start = df_out.index[0]
range_end = df_out.index[-1]
print(f"Stored output file {out_path} with {len(df_out)} records. Range: ({range_start}, {range_end})")
elapsed = datetime.now() - now
print(f"Finished merging data in {str(elapsed).split('.')[0]}")
if __name__ == '__main__':
main()