from datetime import timedelta, datetime, time
from pathlib import Path

import pandas as pd
import numpy as np

import click

from service.App import *
"""
|
2024-12-15 14:57:35 +01:00
|
|
|
Create one output file from multiple input data files.
|
2022-03-20 10:09:33 +01:00
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
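# Configuration keys used by this script (read via App.config below): "data_folder",
# "symbol", "time_column", "freq", "merge_file_name", "merge_interpolate" and "data_sources".
# A minimal illustrative "data_sources" entry (hypothetical values) might look like:
#
#   "data_sources": [
#       {"folder": "BTCUSDT", "file": "klines", "column_prefix": ""}
#   ]
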
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    load_config(config_file)

    time_column = App.config["time_column"]

    data_sources = App.config.get("data_sources", [])
    if not data_sources:
        print(f"ERROR: Data sources are not defined. Nothing to merge.")
        #data_sources = [{"folder": symbol, "file": "klines", "column_prefix": ""}]
        return

    now = datetime.now()

    # Read data from input files
    data_path = Path(App.config["data_folder"])
    for ds in data_sources:
        # For each source, load its file into a df, determine its properties (columns, start, end etc.), and then merge all these dfs
        quote = ds.get("folder")
        if not quote:
            print(f"ERROR. Folder is not specified.")
            continue

        # If file name is not specified then use symbol name as file name
        file = ds.get("file", quote)
        if not file:
            file = quote

        file_path = (data_path / quote / file).with_suffix(".csv")
        if not file_path.is_file():
            print(f"Data file does not exist: {file_path}")
            return

        print(f"Reading data file: {file_path}")
        df = pd.read_csv(file_path, parse_dates=[time_column], date_format="ISO8601")
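        # Note: the date_format argument of read_csv requires pandas 2.0 or later.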
print(f"Loaded file with {len(df)} records.")
|
2022-07-16 10:09:56 +02:00
|
|
|
|
2022-04-23 09:18:45 +02:00
|
|
|
ds["df"] = df
|
|
|
|
|
|
|
|
|
|
    # Merge in one df with prefixes and common regular time index
    df_out = merge_data_sources(data_sources)

    #
    # Store file with features
    #
    out_path = data_path / App.config["symbol"] / App.config.get("merge_file_name")

print(f"Storing output file...")
|
2024-03-16 11:42:24 +01:00
|
|
|
df_out = df_out.reset_index()
|
|
|
|
|
if out_path.suffix == ".parquet":
|
|
|
|
|
df_out.to_parquet(out_path, index=False)
|
|
|
|
|
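        # Note: to_parquet needs a parquet engine (pyarrow or fastparquet) to be installed.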
    elif out_path.suffix == ".csv":
        df_out.to_csv(out_path, index=False)  # float_format="%.6f"
    else:
        print(f"ERROR: Unknown extension of the output file '{out_path.suffix}'. Only 'csv' and 'parquet' are supported")
        return

    # After reset_index the time range is in the time column (the index is now a plain RangeIndex)
    range_start = df_out[time_column].iloc[0]
    range_end = df_out[time_column].iloc[-1]
    print(f"Stored output file {out_path} with {len(df_out)} records. Range: ({range_start}, {range_end})")

    elapsed = datetime.now() - now
    print(f"Finished merging data in {str(elapsed).split('.')[0]}")


def merge_data_sources(data_sources: list):
    """Merge the source dataframes into one dataframe with column prefixes and a common regular time index."""
    time_column = App.config["time_column"]
    freq = App.config["freq"]

    for ds in data_sources:
        df = ds.get("df")

        if time_column in df.columns:
            df = df.set_index(time_column)
        elif df.index.name == time_column:
            pass
        else:
            print(f"ERROR: Timestamp column is absent.")
            return

        # Add prefix if not already there
        if ds['column_prefix']:
            #df = df.add_prefix(ds['column_prefix']+"_")
            df.columns = [
                ds['column_prefix']+"_"+col if not col.startswith(ds['column_prefix']+"_") else col
                for col in df.columns
            ]
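            # For example (hypothetical names): with column_prefix "btc", a source column
            # "close" becomes "btc_close", while an already prefixed "btc_close" is kept as is.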

        ds["start"] = df.first_valid_index()  # df.index[0]
        ds["end"] = df.last_valid_index()  # df.index[-1]

        ds["df"] = df

    #
    # Create common (main) index and empty data frame
    #
    range_start = min([ds["start"] for ds in data_sources])
    range_end = min([ds["end"] for ds in data_sources])

    # Generate a discrete time raster according to the (pandas) frequency parameter
    index = pd.date_range(range_start, range_end, freq=freq)
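    # For example (illustrative values): freq="1min" from 2024-01-01 00:00 to 2024-01-01 00:05
    # yields 6 evenly spaced timestamps (both endpoints included).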

    df_out = pd.DataFrame(index=index)
    df_out.index.name = time_column

    for ds in data_sources:
        # Note that timestamps must have the same semantics, for example, start of kline (and not end of kline)
        # If different data sets have different semantics for timestamps, then data must be shifted accordingly
        df_out = df_out.join(ds["df"])

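    # join performs a left join on the raster index, so raster timestamps that are missing
    # in a source appear as NaN cells, which the optional interpolation below can fill.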
    # Interpolate numeric columns
    merge_interpolate = App.config.get("merge_interpolate", False)
    if merge_interpolate:
        num_columns = df_out.select_dtypes((float, int)).columns.tolist()
        for col in num_columns:
            df_out[col] = df_out[col].interpolate()

    return df_out


if __name__ == '__main__':
    main()
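
# Example invocation (hypothetical script and config file names):
#   python merge.py -c config.json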