from datetime import timedelta, datetime, time
from pathlib import Path

import pandas as pd
import numpy as np

import click

from service.App import *

"""
|
2022-07-16 10:09:56 +02:00
|
|
|
This script is intended for creating one output file from multiple input data files.
|
|
|
|
|
It is needed when we want to use additional data source in order to predict the main parameter.
|
|
|
|
|
For example, in order to predict BTC price, we might want to add ETH prices.
|
|
|
|
|
This script solves the following problems:
|
|
|
|
|
- Input files might have the same column names (e.g., open, high, low, close) and therefore it adds prefixes to the columns of the output file
|
|
|
|
|
- Input data may have gaps and therefore the script generates a regular time raster for the output file. The granularity of the time raster is determined by the parameter
|
2022-03-20 10:09:33 +01:00
|
|
|
"""
depth_file_names = [  # Leave empty to skip
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch1.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch2.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch3.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch4.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch5.csv",
]

#
# Readers from input files (DEPRECATED)
#

def load_futur_files(futur_file_path):
    """Return a data frame with future features."""

    df = pd.read_csv(futur_file_path, parse_dates=['timestamp'])
    start = df["timestamp"].iloc[0]
    end = df["timestamp"].iloc[-1]

    df = df.set_index("timestamp")

    print(f"Loaded futur file with {len(df)} records in total. Range: ({start}, {end})")

    return df, start, end


def load_kline_files(kline_file_path):
    """Return a data frame with kline features."""

    df = pd.read_csv(kline_file_path, parse_dates=['timestamp'])
    start = df["timestamp"].iloc[0]
    end = df["timestamp"].iloc[-1]

    df = df.set_index("timestamp")

    print(f"Loaded kline file with {len(df)} records in total. Range: ({start}, {end})")

    return df, start, end


def load_depth_files():
    """Return a list of data frames with depth features."""

    dfs = []
    start = None
    end = None
    for depth_file_name in depth_file_names:
        df = pd.read_csv(depth_file_name, parse_dates=['timestamp'])
        # Start
        if start is None:
            start = df["timestamp"].iloc[0]
        elif df["timestamp"].iloc[0] < start:
            start = df["timestamp"].iloc[0]
        # End
        if end is None:
            end = df["timestamp"].iloc[-1]
        elif df["timestamp"].iloc[-1] > end:
            end = df["timestamp"].iloc[-1]

        df = df.set_index("timestamp")

        dfs.append(df)

    length = np.sum([len(df) for df in dfs])
    print(f"Loaded {len(depth_file_names)} depth files with {length} records in total. Range: ({start}, {end})")

    return dfs, start, end


#
# Merger
#

@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    load_config(config_file)

    time_column = App.config["time_column"]

    data_sources = App.config.get("data_sources", [])
    if not data_sources:
        print("ERROR: Data sources are not defined. Nothing to merge.")
        #data_sources = [{"folder": symbol, "file": "klines", "column_prefix": ""}]
        return

    now = datetime.now()

    # Read data from input files
    data_path = Path(App.config["data_folder"])

    for ds in data_sources:
        # What we want is, for each source, to load the file into a df, determine its properties (columns, start, end etc.), and then merge all these dfs
        quote = ds.get("folder")
        if not quote:
            print("ERROR: Folder is not specified.")
            continue

        # If file name is not specified then use symbol name as file name
        file = ds.get("file", quote)
        if not file:
            file = quote

        file_path = (data_path / quote / file).with_suffix(".csv")
        if not file_path.is_file():
            print(f"Data file does not exist: {file_path}")
            return

        print(f"Reading data file: {file_path}")
        df = pd.read_csv(file_path, parse_dates=[time_column])
        print(f"Loaded file with {len(df)} records.")

        ds["df"] = df

    # Merge in one df with prefixes and common regular time index
    df_out = merge_data_sources(data_sources)

    #
    # Store file with features
    #
    out_path = data_path / App.config["symbol"] / App.config.get("merge_file_name")

    print("Storing output file...")
    df_out.to_csv(out_path.with_suffix(".csv"), index=True)  # float_format="%.6f"

    range_start = df_out.index[0]
    range_end = df_out.index[-1]
    print(f"Stored output merged file with {len(df_out)} records. Range: ({range_start}, {range_end})")

    elapsed = datetime.now() - now
    print(f"Finished merging data in {str(elapsed).split('.')[0]}")
    print(f"Output file location: {out_path.with_suffix('.csv')}")


def merge_data_sources(data_sources: list):

    time_column = App.config["time_column"]
    freq = App.config["freq"]

    for ds in data_sources:
        df = ds.get("df")

        if time_column in df.columns:
            df = df.set_index(time_column)
        elif df.index.name == time_column:
            pass
        else:
            print("ERROR: Timestamp column is absent.")
            return

        # Add prefix if not already there
        if ds['column_prefix']:
            #df = df.add_prefix(ds['column_prefix']+"_")
            df.columns = [
                ds['column_prefix']+"_"+col if not col.startswith(ds['column_prefix']+"_") else col
                for col in df.columns
            ]

        ds["start"] = df.first_valid_index()  # df.index[0]
        ds["end"] = df.last_valid_index()  # df.index[-1]

        ds["df"] = df

    #
    # Create 1m common (main) index and empty data frame
    #
    range_start = min([ds["start"] for ds in data_sources])
    range_end = min([ds["end"] for ds in data_sources])

    # Regular time raster according to the parameter
    if freq == "1m":
        index = pd.date_range(range_start, range_end, freq="T")
    elif freq == "1d":
        index = pd.date_range(range_start, range_end, freq="B")  # D - daily, B - business days (no weekends)
        #index = pd.bdate_range(start=range_start, end=range_end)  # tz='UTC'
    else:
        print(f"ERROR: Frequency parameter 'freq' is unknown or not specified: {freq}")
        return

    df_out = pd.DataFrame(index=index)
    df_out.index.name = time_column

    for ds in data_sources:
        # Note that timestamps must have the same semantics, for example, start of kline (and not end of kline)
        # If different data sets have different semantics for timestamps, then data must be shifted accordingly
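        # A sketch of such a shift (illustrative only, not part of the current pipeline):
        # if some hypothetical source stamped the *end* of each 1m candle instead of its
        # start, its index could be moved back by one bar before joining, e.g.:
        #
        #   ds["df"].index = ds["df"].index - pd.Timedelta(minutes=1)
        #
        # Any rows of a source that fall outside the regular raster, or gaps within it,
        # simply become NaN cells after the join below.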
        df_out = df_out.join(ds["df"])

    return df_out
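

# Example invocation (illustrative only; the actual script file name and the
# configuration file depend on how the project is laid out):
#
#   python merge.py -c config.json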
if __name__ == '__main__':
    main()