# intelligent-trading-bot/scripts/merge_data.py
import pandas as pd
import math
#import os.path
from pathlib import Path
import json
import time
from datetime import timedelta, datetime
from dateutil import parser
from tqdm import tqdm_notebook #(Optional, used for progress-bars)
import click
import numpy as np
from common.utils import *
from service.App import *
from common.feature_generation import *
"""
Create one output file from many input files:
- symbols like BTC or ETH
- different data types like klines, futures, depth (depth can be in several files)
Align the timestamps and create a uniform time axis without gaps.
Prefix column names with the corresponding origin modifiers so that each column name stores its origin.

IMPLEMENTATION.
What we need to know:
- the data folder with all files, the symbol list to find subfolders, the data types of each symbol to find its individual file, and the prefix to be used for this file
- once the files are loaded, create a raster index and merge all loaded data to this index, also applying the column prefixes
- what to do with empty (initial or trailing) records? Automatically detect the maximum common timestamp.
List of entries of the form:
<symbol>-<data source> - prefix
"""
# CSV files iterated by load_depth_files(). Each file is read with a
# 'timestamp' column. The commented-out entries are example paths.
depth_file_names = [ # Leave empty to skip
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch1.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch2.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch3.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch4.csv",
    #r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch5.csv",
]
# Column-name prefix for futures data.
# NOTE(review): not referenced by the code visible in this file - confirm whether it is still needed.
futur_column_prefix = "f_"
# NOTE(review): not referenced by the code visible in this file - confirm whether it is still needed.
range_type = "kline" # Selector: kline, futur, depth, merged (common range)
#
# Historic data
#
def load_futur_files(futur_file_path):
    """Load a futures CSV file and report the time range it covers.

    Args:
        futur_file_path: Path of a CSV file that has a ``timestamp`` column.

    Returns:
        A tuple ``(df, start, end)``. ``df`` is the loaded data frame indexed
        by ``timestamp``. ``start`` and ``end`` are the timestamps of the first
        and the last row in file order, or ``None`` if the file has no rows.
    """
    df = pd.read_csv(futur_file_path, parse_dates=['timestamp'])

    # A header-only file has no first/last row: report an empty range
    # instead of failing with IndexError on .iloc[0]
    if df.empty:
        start = end = None
    else:
        start = df["timestamp"].iloc[0]
        end = df["timestamp"].iloc[-1]

    df = df.set_index("timestamp")

    print(f"Loaded futur file with {len(df)} records in total. Range: ({start}, {end})")

    return df, start, end
def load_kline_files(kline_file_path):
    """Load a kline CSV file and report the time range it covers.

    Args:
        kline_file_path: Path of a CSV file that has a ``timestamp`` column.

    Returns:
        A tuple ``(df, start, end)``. ``df`` is the loaded data frame indexed
        by ``timestamp``. ``start`` and ``end`` are the timestamps of the first
        and the last row in file order, or ``None`` if the file has no rows.
    """
    df = pd.read_csv(kline_file_path, parse_dates=['timestamp'])

    # A header-only file has no first/last row: report an empty range
    # instead of failing with IndexError on .iloc[0]
    if df.empty:
        start = end = None
    else:
        start = df["timestamp"].iloc[0]
        end = df["timestamp"].iloc[-1]

    df = df.set_index("timestamp")

    print(f"Loaded kline file with {len(df)} records in total. Range: ({start}, {end})")

    return df, start, end
def load_depth_files():
    """Load all depth CSV files listed in the module-level ``depth_file_names``.

    Returns:
        A tuple ``(dfs, start, end)``. ``dfs`` is a list with one data frame
        per file, each indexed by ``timestamp``. ``start`` is the smallest
        first-row timestamp and ``end`` the largest last-row timestamp over
        all files that have rows. Both are ``None`` if no file is configured
        or none of the files has rows.
    """
    dfs = []
    start = None
    end = None
    for depth_file_name in depth_file_names:
        df = pd.read_csv(depth_file_name, parse_dates=['timestamp'])

        # A header-only file has no first/last row and cannot extend the
        # range; it is still returned so the result matches the file list
        if not df.empty:
            first = df["timestamp"].iloc[0]
            last = df["timestamp"].iloc[-1]
            if start is None or first < start:
                start = first
            if end is None or last > end:
                end = last

        df = df.set_index("timestamp")
        dfs.append(df)

    # Builtin sum keeps the count an int even for an empty list
    # (np.sum([]) would be the float 0.0 and print as "0.0 records")
    length = sum(len(df) for df in dfs)
    print(f"Loaded {len(depth_file_names)} depth files with {length} records in total. Range: ({start}, {end})")

    return dfs, start, end
@click.command()
@click.option("--config_file", "-c", type=click.Path(), default="", help="Configuration file name")
def main(config_file):
    """Merge the configured source files into one CSV file.

    Loads the configuration, reads one CSV file per data source, merges them
    with ``merge_data_frames`` and writes the result into the symbol folder
    below the data folder. If one of the source files does not exist, a
    message is printed and nothing is written.

    Args:
        config_file: Name of the configuration file passed to ``load_config``.
    """
    load_config(config_file)

    freq = "1m"  # not referenced further in this function

    symbol = App.config["symbol"]
    data_path = Path(App.config["data_folder"])

    # Without configured sources, fall back to the klines file of the symbol
    data_sources = App.config.get("data_sources", []) or [
        {"folder": symbol, "file": "klines", "column_prefix": ""}
    ]

    modifier = App.config.get("config_file_modifier")
    config_file_modifier = "-" + modifier if modifier else ""

    started_at = datetime.now()

    # Load every source file and attach its data frame to the source description
    for source in data_sources:
        source_file = data_path / source.get("folder") / source.get("file")
        source_file = source_file.with_suffix(".csv")

        if not source_file.is_file():
            print(f"Data file does not exist: {source_file}")
            return

        print(f"Reading data file: {source_file}")
        source_df = pd.read_csv(source_file, parse_dates=["timestamp"])
        print(f"Loaded file with {len(source_df)} records.")
        source["df"] = source_df

    # Merge in one df with prefixes and common regular time index
    merged = merge_data_frames(data_sources)
    first_ts, last_ts = merged.index[0], merged.index[-1]

    #
    # Store file with features
    #
    out_file_suffix = App.config.get("merge_file_modifier")
    out_path = data_path / symbol / f"{out_file_suffix}{config_file_modifier}.csv"

    print("Storing output file...")
    merged.to_csv(out_path, index=True)
    print(f"Stored output merged file with {len(merged)} records. Range: ({first_ts}, {last_ts})")

    duration = datetime.now() - started_at
    print(f"Finished processing in {int(duration.total_seconds())} seconds.")
    print(f"Output file location: {out_path}")
def merge_data_frames(data_sources: list):
    """Merge the data frames of all sources on one regular 1-minute time index.

    Each source data frame is indexed by its ``timestamp`` column (if it has
    one) and its columns are prefixed with ``<column_prefix>_`` if a non-empty
    prefix is given. The source dicts are updated in place: ``df`` is replaced
    by the indexed and prefixed frame, and ``start`` and ``end`` are set to
    its first and last index values.

    The output index starts at the earliest source start and ends at the
    earliest source end. Rows for which a source has no record are left empty
    (NaN) in the columns of that source.

    Args:
        data_sources: List of dicts. Each dict has a ``df`` entry with a data
            frame and optionally a ``column_prefix`` entry with a string.

    Returns:
        A data frame with a 1-minute index named ``timestamp`` and the
        columns of all sources.

    Raises:
        ValueError: If ``data_sources`` is empty.
    """
    if not data_sources:
        raise ValueError("No data sources to merge: 'data_sources' is empty.")

    for ds in data_sources:
        df = ds.get("df")

        if "timestamp" in df.columns:
            df = df.set_index("timestamp")

        # The prefix is optional: a missing key is treated like an empty prefix
        prefix = ds.get("column_prefix")
        if prefix:
            df = df.add_prefix(prefix + "_")

        ds["start"] = df.index[0]
        ds["end"] = df.index[-1]

        ds["df"] = df

    #
    # Create 1m common (main) index and empty data frame
    #
    range_start = min(ds["start"] for ds in data_sources)
    range_end = min(ds["end"] for ds in data_sources)

    # "min" is the minute alias; the older alias "T" is deprecated in pandas
    index = pd.date_range(range_start, range_end, freq="min")

    df_out = pd.DataFrame(index=index)
    df_out.index.name = "timestamp"

    for ds in data_sources:
        # Note that timestamps must have the same semantics, for example, start of kline (and not end of kline)
        # If different data sets have different semantics for timestamps, then data must be shifted accordingly
        df_out = df_out.join(ds["df"])

    return df_out
2022-03-20 10:09:33 +01:00
# Run the merge only when the file is executed as a script
if __name__ == "__main__":
    main()