intelligent-trading-bot/scripts/merge_data.py
Alexandr Savinov 4ca349e40f formatting
2021-10-23 21:51:31 +02:00

196 lines
5.7 KiB
Python

import pandas as pd
import math
#import os.path
from pathlib import Path
import json
import time
from datetime import timedelta, datetime
from dateutil import parser
from tqdm import tqdm_notebook #(Optional, used for progress-bars)
import click
import numpy as np
from common.utils import *
from service.App import *
from common.feature_generation import *
"""
Create one output file from input files of different types: klines, futures, depth.
Depth data can be provided in several files.
Also, depth timestamps correspond to end of 1m interval and hence they will be changed to kline convention.
Futures and klines are in single files and their timestamp is start of 1m interval.
Future column names are same as in klines, and hence they will be prefixed in the output.
Output file has continuous index by removing possible gaps in input files.
"""
depth_file_names = [ # Leave empty to skip
#r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch1.csv",
#r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch2.csv",
#r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch3.csv",
#r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch4.csv",
#r"C:\DATA2\BITCOIN\GENERATED\depth-BTCUSDT-batch5.csv",
]
futur_column_prefix = "f_"
range_type = "kline" # Selector: kline, futur, depth, merged (common range)
#
# Historic data
#
def get_symbol_files(symbol):
"""
Get a list of file names with data for this symbol and frequency.
We find all files with this symbol in name in the directly recursively.
"""
file_pattern = f"*{symbol}*.txt"
paths = Path(in_path_name).rglob(file_pattern)
return list(paths)
def load_futur_files(futur_file_path):
"""Return a data frame with future features."""
df = pd.read_csv(futur_file_path, parse_dates=['timestamp'])
start = df["timestamp"].iloc[0]
end = df["timestamp"].iloc[-1]
df = df.set_index("timestamp")
print(f"Loaded futur file with {len(df)} records in total. Range: ({start}, {end})")
return df, start, end
def load_kline_files(kline_file_path):
"""Return a data frame with kline features."""
df = pd.read_csv(kline_file_path, parse_dates=['timestamp'])
start = df["timestamp"].iloc[0]
end = df["timestamp"].iloc[-1]
df = df.set_index("timestamp")
print(f"Loaded kline file with {len(df)} records in total. Range: ({start}, {end})")
return df, start, end
def load_depth_files():
"""Return a list of data frames with depth features."""
dfs = []
start = None
end = None
for depth_file_name in depth_file_names:
df = pd.read_csv(depth_file_name, parse_dates=['timestamp'])
# Start
if start is None:
start = df["timestamp"].iloc[0]
elif df["timestamp"].iloc[0] < start:
start = df["timestamp"].iloc[0]
# End
if end is None:
end = df["timestamp"].iloc[-1]
elif df["timestamp"].iloc[-1] > end:
end = df["timestamp"].iloc[-1]
df = df.set_index("timestamp")
dfs.append(df)
length = np.sum([len(df) for df in dfs])
print(f"Loaded {len(depth_file_names)} depth files with {length} records in total. Range: ({start}, {end})")
return dfs, start, end
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
load_config(config_file)
freq = "1m"
symbol = App.config["symbol"]
data_path = Path(App.config["data_folder"])
if not data_path.is_dir():
print(f"Data folder does not exist: {data_path}")
return
start_dt = datetime.now()
print(f"Start processing...")
kline_file_path = data_path / f"{symbol}-{freq}-klines.csv"
k_df, k_start, k_end = load_kline_files(kline_file_path)
futur_file_path = data_path / f"{symbol}-{freq}-futurs.csv"
f_df, f_start, f_end = load_futur_files(futur_file_path)
if depth_file_names:
d_dfs, d_start, d_end = load_depth_files()
else:
d_dfs = []
d_start = 0
d_end = 0
#
# Determine range
#
if range_type.startswith("kline"):
start = k_start
end = k_end
elif range_type.startswith("futur"):
start = f_start
end = f_end
elif range_type.startswith("depth"):
start = d_start
end = d_end
elif range_type.startswith("merge"):
start = np.max([d_start, f_start, k_start])
end = np.min([d_end, f_end, k_end])
else:
print(f"Unknown parameter value. Exit.")
exit()
#
# Create 1m common (main) index and empty data frame
#
index = pd.date_range(start, end, freq="T")
df_out = pd.DataFrame(index=index)
df_out.index.name = "timestamp"
#
# Attach all necessary columns to the common data frame
#
# Attach kline data frame
df_out = df_out.join(k_df)
# Attach futur data frame by also renaming columns
f_df = f_df.rename(lambda x: futur_column_prefix + x if x != "timestamp" else x, axis='columns')
df_out = df_out.join(f_df)
# Attach several depth data frames using the same columns
for i, df in enumerate(d_dfs):
if i == 0:
df_out = df_out.join(df)
else:
df_out.update(df)
#
# Store file with features
#
out_file_name = f"{symbol}-{freq}.csv"
out_path = (data_path / out_file_name).resolve()
df_out.to_csv(out_path, index=True) # float_format="%.6f"
print(f"Stored output merged file with {len(df_out)} records. Range: ({start}, {end})")
elapsed = datetime.now() - start_dt
print(f"Finished processing in {int(elapsed.total_seconds())} seconds.")
print(f"Output file location: {out_path}")
if __name__ == '__main__':
main()