2022-03-20 10:09:33 +01:00
|
|
|
import pandas as pd
|
|
|
|
|
import math
|
|
|
|
|
#import os.path
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import json
|
|
|
|
|
import time
|
|
|
|
|
from datetime import timedelta, datetime
|
|
|
|
|
from dateutil import parser
|
|
|
|
|
from tqdm import tqdm_notebook #(Optional, used for progress-bars)
|
|
|
|
|
import click
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
|
from common.utils import *
|
|
|
|
|
from service.App import *
|
|
|
|
|
from common.feature_generation import *
|
|
|
|
|
|
|
|
|
|
"""
|
2022-04-17 19:55:22 +02:00
|
|
|
Create one output file from many input files:
|
|
|
|
|
- symbols like BTC or ETH
|
|
|
|
|
- different data types like klines, futures, depth (depth can be in several files)
|
|
|
|
|
|
|
|
|
|
Align the timestamps and create a uniform time axis without gaps.
|
|
|
|
|
|
|
|
|
|
Prefix column names with the corresponding origin modifiers so that each column name stores its origin.
|
|
|
|
|
|
|
|
|
|
IMPLEMENTATION.
|
|
|
|
|
What we need to know:
|
|
|
|
|
- data folder with all files, symbol list to get subfolders, data types for each symbol to find its individual file, prefix to be used for this file
|
|
|
|
|
- once files are loaded, create raster index, and merge all loaded data to this index by also providing column prefixes.
|
|
|
|
|
- what to do with empty (initial or trailing)? Automatically detect maximum common timestamp.
|
|
|
|
|
|
|
|
|
|
list of:
|
|
|
|
|
<symbol>-<data source> - prefix
|
|
|
|
|
|
2022-03-20 10:09:33 +01:00
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Paths of generated depth-feature CSV batch files to be loaded by
# load_depth_files(). Depth data for one symbol may be split over
# several batch files, hence a list.
depth_file_names = [  # Leave empty to skip
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch1.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch2.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch3.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch4.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch5.csv",
]
|
|
|
|
|
|
|
|
|
|
futur_column_prefix = "f_"
|
|
|
|
|
range_type = "kline" # Selector: kline, futur, depth, merged (common range)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
# Historic data
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
def load_futur_files(futur_file_path):
    """Return a data frame with future features.

    The CSV is read with its ``timestamp`` column parsed to datetimes and
    used as the index. Returns (frame, first timestamp, last timestamp).
    """
    frame = pd.read_csv(futur_file_path, parse_dates=["timestamp"])

    # Remember the covered range before the column becomes the index.
    first_ts = frame["timestamp"].iloc[0]
    last_ts = frame["timestamp"].iloc[-1]

    frame = frame.set_index("timestamp")

    print(f"Loaded futur file with {len(frame)} records in total. Range: ({first_ts}, {last_ts})")

    return frame, first_ts, last_ts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_kline_files(kline_file_path):
    """Return a data frame with kline features.

    The CSV is read with its ``timestamp`` column parsed to datetimes and
    used as the index. Returns (frame, first timestamp, last timestamp).
    """
    frame = pd.read_csv(kline_file_path, parse_dates=["timestamp"])

    # Remember the covered range before the column becomes the index.
    first_ts = frame["timestamp"].iloc[0]
    last_ts = frame["timestamp"].iloc[-1]

    frame = frame.set_index("timestamp")

    print(f"Loaded kline file with {len(frame)} records in total. Range: ({first_ts}, {last_ts})")

    return frame, first_ts, last_ts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_depth_files(file_names=None):
    """Return a list of data frames with depth features.

    Parameters
    ----------
    file_names : list of str, optional
        CSV files to load. Defaults to the module-level ``depth_file_names``
        list, which preserves the original no-argument behavior.

    Returns
    -------
    (dfs, start, end)
        dfs: one timestamp-indexed frame per input file.
        start: the earliest first timestamp across all files (None when the
        file list is empty).
        end: the latest last timestamp across all files (None when the file
        list is empty).
    """
    if file_names is None:
        file_names = depth_file_names

    dfs = []
    start = None
    end = None
    for depth_file_name in file_names:
        df = pd.read_csv(depth_file_name, parse_dates=['timestamp'])

        # Track the earliest start across all loaded files
        first_ts = df["timestamp"].iloc[0]
        if start is None or first_ts < start:
            start = first_ts

        # Track the latest end across all loaded files
        last_ts = df["timestamp"].iloc[-1]
        if end is None or last_ts > end:
            end = last_ts

        dfs.append(df.set_index("timestamp"))

    length = np.sum([len(df) for df in dfs])
    print(f"Loaded {len(file_names)} depth files with {length} records in total. Range: ({start}, {end})")

    return dfs, start, end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
    """Merge the configured input data files into one output CSV.

    Each source listed in App.config["data_sources"] is loaded from
    <data_folder>/<folder>/<file>.csv, all sources are merged onto one
    common regular time index by merge_data_frames(), and the result is
    written to a single CSV file under the symbol folder.
    """
    load_config(config_file)

    # Fix: removed the unused local `freq = "1m"` (the raster frequency is
    # set inside merge_data_frames()).
    symbol = App.config["symbol"]
    data_path = Path(App.config["data_folder"])

    data_sources = App.config.get("data_sources", [])
    if not data_sources:
        # Fall back to a single default source: klines of the configured symbol.
        data_sources = [{"folder": symbol, "file": "klines", "column_prefix": ""}]

    # Optional modifier appended to the output file name, e.g. "-mymod".
    config_file_modifier = App.config.get("config_file_modifier")
    config_file_modifier = ("-" + config_file_modifier) if config_file_modifier else ""

    start_dt = datetime.now()

    #
    # Read data from files
    #
    for ds in data_sources:
        # What we want is, for each source, load file into df, determine its properties (columns, start, end etc.), and then merge all these dfs
        file_path = (data_path / ds.get("folder") / ds.get("file")).with_suffix(".csv")
        if not file_path.is_file():
            # Abort the whole merge: a missing source would leave holes in the output.
            print(f"Data file does not exist: {file_path}")
            return

        print(f"Reading data file: {file_path}")
        df = pd.read_csv(file_path, parse_dates=['timestamp'])
        print(f"Loaded file with {len(df)} records.")

        ds["df"] = df

    # Merge in one df with prefixes and common regular time index
    df_out = merge_data_frames(data_sources)

    range_start = df_out.index[0]
    range_end = df_out.index[-1]

    #
    # Store file with features
    #
    # NOTE(review): merge_file_modifier may be absent from the config, which
    # would produce a "None..." file name — confirm the config always sets it.
    out_file_suffix = App.config.get("merge_file_modifier")

    out_file_name = f"{out_file_suffix}{config_file_modifier}.csv"
    out_path = data_path / symbol / out_file_name

    print(f"Storing output file...")
    df_out.to_csv(out_path, index=True)  # float_format="%.6f"
    print(f"Stored output merged file with {len(df_out)} records. Range: ({range_start}, {range_end})")

    elapsed = datetime.now() - start_dt
    print(f"Finished processing in {int(elapsed.total_seconds())} seconds.")
    print(f"Output file location: {out_path}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def merge_data_frames(data_sources: list):
    """Merge the loaded source frames into one frame on a regular 1-minute time axis.

    Each entry of *data_sources* is a dict carrying its loaded frame under
    "df" and a "column_prefix". Entries are normalized in place (indexed by
    timestamp, columns prefixed, "start"/"end" recorded), then joined onto a
    gap-free minute raster. Returns the merged frame.
    """
    # Normalize every source frame: timestamp index, prefixed columns.
    for source in data_sources:
        frame = source.get("df")

        if "timestamp" in frame.columns:
            frame = frame.set_index("timestamp")

        prefix = source['column_prefix']
        if prefix:
            frame = frame.add_prefix(prefix + "_")

        source["start"] = frame.index[0]
        source["end"] = frame.index[-1]
        source["df"] = frame

    #
    # Create 1m common (main) index and empty data frame
    #
    # NOTE(review): the raster starts at the EARLIEST source start but stops
    # at the earliest source end, so late-starting sources get leading NaNs —
    # confirm whether max() of the starts was intended for a true common range.
    range_start = min(source["start"] for source in data_sources)
    range_end = min(source["end"] for source in data_sources)

    df_out = pd.DataFrame(index=pd.date_range(range_start, range_end, freq="T"))
    df_out.index.name = "timestamp"

    for source in data_sources:
        # Note that timestamps must have the same semantics, for example, start of
        # kline (and not end of kline). If different data sets have different
        # semantics for timestamps, then data must be shifted accordingly.
        df_out = df_out.join(source["df"])

    return df_out
|
2022-03-20 10:09:33 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the click command when executed directly.
if __name__ == '__main__':
    main()
|