2022-07-16 10:09:56 +02:00
|
|
|
from datetime import timedelta, datetime, time
|
2022-03-20 10:09:33 +01:00
|
|
|
from pathlib import Path
|
|
|
|
|
|
2022-07-16 10:09:56 +02:00
|
|
|
import pandas as pd
|
2022-03-20 10:09:33 +01:00
|
|
|
import numpy as np
|
|
|
|
|
|
2022-07-16 10:09:56 +02:00
|
|
|
import click
|
|
|
|
|
|
2026-01-09 16:50:52 +01:00
|
|
|
from common.utils import merge_data_sources
|
2022-03-20 10:09:33 +01:00
|
|
|
from service.App import *
|
|
|
|
|
|
|
|
|
|
"""
|
2024-12-15 14:57:35 +01:00
|
|
|
Create one output file from multiple input data files.
|
2022-03-20 10:09:33 +01:00
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    """Merge the configured input data files into one output file.

    Calls ``load_config(config_file)`` and then reads all settings from
    ``App.config``:

    - ``time_column``: name of the timestamp column (parsed as dates for CSV).
    - ``symbol``, ``data_folder``, ``merge_file_name``: the output is written
      to ``<data_folder>/<symbol>/<merge_file_name>``.
    - ``train``, ``train_length``, ``predict_length``, ``features_horizon``:
      determine how many trailing records of each input file are kept.
    - ``data_sources``: list of dicts with a ``folder`` and an optional
      ``file`` entry; the loaded frame is stored back in each dict under
      the ``df`` key.
    - ``freq``, ``merge_interpolate``: passed through to
      ``merge_data_sources``.

    Input and output formats are chosen by file extension; only ``.csv`` and
    ``.parquet`` are supported. Errors are reported with ``print`` and the
    command returns without writing anything.
    """
    load_config(config_file)
    config = App.config

    time_column = config["time_column"]
    symbol = config["symbol"]
    data_path = Path(config["data_folder"])

    started_at = datetime.now()

    # Determine desired data length depending on train/predict mode
    if config.get("train"):
        window_size = config.get("train_length")
    else:
        window_size = config.get("predict_length")
    if window_size:
        # A missing features_horizon means no extra history is requested
        window_size += config.get("features_horizon") or 0

    #
    # Load data from multiple sources and merge
    #
    data_sources = config.get("data_sources", [])
    if not data_sources:
        print("ERROR: Data sources are not defined. Nothing to merge.")
        return

    # For each source: load its file into a data frame, trim it to the
    # requested window and attach it to the source description
    for ds in data_sources:
        quote = ds.get("folder")
        if not quote:
            print("ERROR. Folder is not specified.")
            # NOTE(review): this source is left without a "df" entry but is
            # still passed to merge_data_sources below - confirm it copes.
            continue

        # If file name is not specified then use the folder name as file name
        file = ds.get("file") or quote

        file_path = data_path / quote / file
        if not file_path.suffix:
            file_path = file_path.with_suffix(".csv")  # CSV by default

        print(f"Reading data file: {file_path}")
        if file_path.suffix == ".parquet":
            df = pd.read_parquet(file_path)
        elif file_path.suffix == ".csv":
            df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
        else:
            print(f"ERROR: Unknown extension of the input file '{file_path.suffix}'. Only 'csv' and 'parquet' are supported")
            return

        print(f"Loaded file with {len(df)} records.")

        # Select only the data necessary for analysis
        if window_size:
            df = df.tail(window_size).reset_index(drop=True)

        ds["df"] = df

    # Merge in one df with prefixes and common regular time index
    freq = config["freq"]
    merge_interpolate = config.get("merge_interpolate", False)
    df_out = merge_data_sources(data_sources, time_column, freq, merge_interpolate)

    #
    # Store file with features
    #
    out_path = data_path / symbol / config.get("merge_file_name")

    print("Storing output file...")
    # Keep the index as a column unless a column of the same name exists
    df_out = df_out.reset_index(drop=(df_out.index.name in df_out.columns))
    if out_path.suffix == ".parquet":
        df_out.to_parquet(out_path, index=False)
    elif out_path.suffix == ".csv":
        df_out.to_csv(out_path, index=False)
    else:
        print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
        return

    # After reset_index the index holds only row positions, so report the
    # range from the time column when it is available
    if time_column in df_out.columns:
        range_values = df_out[time_column]
    else:
        range_values = df_out.index
    if len(range_values) > 0:
        range_start, range_end = range_values[0], range_values[len(range_values) - 1]
    else:
        range_start = range_end = None
    print(f"Stored output file {out_path} with {len(df_out)} records. Range: ({range_start}, {range_end})")

    elapsed = datetime.now() - started_at
    print(f"Finished merging data in {str(elapsed).split('.')[0]}")
|
2022-04-23 09:18:45 +02:00
|
|
|
|
|
|
|
|
|
2022-03-20 10:09:33 +01:00
|
|
|
if __name__ == "__main__":
    # Script entry point: main is a click command, so it takes its
    # arguments from the command line.
    main()
|