from pathlib import Path
from typing import Union
import json
import pickle
from datetime import datetime, date, timedelta
import queue

import numpy as np
import pandas as pd

from service.App import *
from common.utils import *
from common.classifiers import *
from common.feature_generation import *
from common.signal_generation import *
from common.model_store import *

from scripts.merge import *
from scripts.features import *

import logging

log = logging.getLogger('analyzer')

class Analyzer:
    """
    In-memory database which represents the current state of the (trading) environment including its history.

    Properties of klines:
    - "timestamp" is the left border of the interval, like "2017-08-17 04:00:00"
    - "close_time" is the right border of the interval in ms (last millisecond), like "1502942459999", equivalent to "2017-08-17 04:00:59.999"
    """

    def __init__(self, config):
        """
        Create a new operation object using its definition.

        :param config: Initialization parameters defining what is in the database, including its persistent parameters and schema
        """
        self.config = config

        #
        # Data state
        #

        # Klines are stored as a dict of lists. The key is a symbol and the value is a list of the latest kline records.
        # One kline record is a list of values (not a dict) as returned by the API: open time, open, high, low, close, volume etc.
        self.klines = {}
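
        # A minimal sketch of one raw kline record in such a list. The field order below assumes
        # Binance-style klines (it matches the source columns validated in analyze()); the values
        # themselves are illustrative only:
        #   [1502942400000,                                          # open time (ms), left border
        #    "4261.48", "4313.62", "4261.32", "4308.83", "795.15",   # open, high, low, close, volume
        #    1502942459999,                                          # close time (ms), right border
        #    "3429421.11", 308, "415.34", "1793570.55", "0"]         # quote volume, trades, taker volumes, ignore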

        self.queue = queue.Queue()

        #
        # Load models
        #
        symbol = App.config["symbol"]
        data_path = Path(App.config["data_folder"]) / symbol

        model_path = Path(App.config["model_folder"])
        if not model_path.is_absolute():
            model_path = data_path / model_path
        model_path = model_path.resolve()

        sa_sets = ['score_aggregation', 'score_aggregation_2']
        all_labels = []
        for i, score_aggregation_set in enumerate(sa_sets):
            score_aggregation = App.config.get(score_aggregation_set)
            if not score_aggregation:
                continue
            all_labels.extend(score_aggregation.get("buy_labels"))
            all_labels.extend(score_aggregation.get("sell_labels"))

        self.models = {label: load_model_pair(model_path, label) for label in all_labels}
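
        # A sketch of the configuration shape these aggregation sets are read from, shown only for
        # orientation. Only keys actually used in this module are listed and the label names are
        # hypothetical examples:
        #   "score_aggregation": {
        #       "buy_labels": ["high_10_lc", "high_15_lc"],
        #       "sell_labels": ["low_10_lc", "low_15_lc"],
        #       "window": 8,
        #       "trade_score": "trade_score"
        #   }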

        #
        # Load latest transaction and (simulated) trade state
        #
        transaction_file = Path("transactions.txt")
        t_dict = dict(timestamp=str(datetime.now()), price=0.0, profit=0.0, status="")
        if transaction_file.is_file():
            with open(transaction_file, "r") as f:
                line = ""
                for line in f:
                    pass
            if line:
                t_dict = dict(zip("timestamp,price,profit,status".split(","), line.strip().split(",")))
                t_dict["price"] = float(t_dict["price"])
                t_dict["profit"] = float(t_dict["profit"])
                #t_dict = json.loads(line)
        else:  # Create file with header
            pass
            #with open(transaction_file, 'a+') as f:
            #    f.write("timestamp,price,profit,status\n")
        App.transaction = t_dict
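
        # An illustrative last line of transactions.txt in the "timestamp,price,profit,status"
        # format parsed above (all values, including the status string, are made up):
        #   2022-03-20 10:09:33.123456,41000.0,1.25,sold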

        #
        # Start a thread for storing data
        #

    #
    # Data state operations
    #

    def get_klines_count(self, symbol):
        return len(self.klines.get(symbol, []))

    def get_last_kline(self, symbol):
        if self.get_klines_count(symbol) > 0:
            return self.klines.get(symbol)[-1]
        else:
            return None

    def get_last_kline_ts(self, symbol):
        """Open time of the last kline. It is simultaneously the kline id. Add 1m if the end is needed."""
        last_kline = self.get_last_kline(symbol=symbol)
        if not last_kline:
            return 0
        last_kline_ts = last_kline[0]
        return last_kline_ts

    def get_missing_klines_count(self, symbol):
        now_ts = now_timestamp()
        last_kline_ts = self.get_last_kline_ts(symbol)
        if not last_kline_ts:
            return App.config["features_horizon"]

        end_of_last_kline = last_kline_ts + 60_000  # Plus 1m because the kline timestamp is the open time (left border) of its interval

        minutes = (now_ts - end_of_last_kline) / 60_000
        minutes += 2
        return int(minutes)
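
    # For illustration (numbers made up): if the last stored kline opened 10 minutes ago, its end
    # was 9 minutes ago, so (now_ts - end_of_last_kline) / 60_000 == 9 and the method returns
    # 9 + 2 = 11; the extra 2 minutes look like a small safety margin so the request also covers
    # the currently forming kline.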

    def store_klines(self, data: dict):
        """
        Store the latest klines for the specified symbols.
        Existing klines for the same symbol and timestamp will be overwritten.

        :param data: Dict of lists with a symbol as a key, and a list of klines for this symbol as a value.
            Example: { 'BTCUSDT': [ [], [], [] ] }
        :type data: dict
        """
        now_ts = now_timestamp()

        for symbol, klines in data.items():
            # If the symbol does not exist then create it
            klines_data = self.klines.get(symbol)
            if klines_data is None:
                self.klines[symbol] = []
                klines_data = self.klines.get(symbol)

            ts = klines[0][0]  # Very first timestamp of the new data
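
            # Illustrative merge behaviour (timestamps abbreviated): with stored klines
            # [t1, t2, t3] and new klines [t3', t4], the overlapping tail starting at t3 is
            # deleted below and the new data appended, giving [t1, t2, t3', t4].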

            # Find the kline with this or a younger timestamp in the database
            # same_kline = next((x for x in klines_data if x[0] == ts), None)
            existing_indexes = [i for i, x in enumerate(klines_data) if x[0] >= ts]
            #print(f"===>>> Existing tss: {[x[0] for x in klines_data]}")
            #print(f"===>>> New tss: {[x[0] for x in klines]}")
            #print(f"===>>> {symbol} Overlap {len(existing_indexes)}. Existing Indexes: {existing_indexes}")
            if existing_indexes:  # If there is an overlap with the new klines
                start = min(existing_indexes)
                num_deleted = len(klines_data) - start
                del klines_data[start:]  # Delete starting from the first kline in the new data (which will be added below)
                if len(klines) < num_deleted:  # It is expected that we add the same number of klines or more than we delete
                    log.error("More klines were deleted than are being added. Something is wrong with the timestamps or the storage logic.")

            # Append new klines
            klines_data.extend(klines)

            # Remove too old klines
            kline_window = App.config["features_horizon"]
            to_delete = len(klines_data) - kline_window
            if to_delete > 0:
                del klines_data[:to_delete]

            # Check validity. It has to be an ordered time series with a certain frequency
            for i, kline in enumerate(self.klines.get(symbol)):
                ts = kline[0]
                if i > 0:
                    if ts - prev_ts != 60_000:
                        log.error("Wrong sequence of klines. They are expected to be a regular time series with 1m frequency.")
                prev_ts = kline[0]

            # Debug message about the last received kline end and the current ts (the difference must be less than 1m - a rather small delay)
            log.debug(f"Stored klines. Total {len(klines_data)} in db. Last kline end: {self.get_last_kline_ts(symbol)+60_000}. Current time: {now_ts}")

    def store_depth(self, depths: list, freq):
        """
        Persistently store the order books from the input list. Each entry is one response from an order book request for one symbol.
        Currently the order books are stored directly in a file (for this symbol) and not in this object.

        :param depths: List of dicts where each dict is an order book with such fields as 'asks', 'bids' and 'symbol' (symbol is added after loading).
        :type depths: list
        """

        # File name like TRADE_HOME/COLLECT/DEPTH/depth-BTCUSDT-5s.txt
        TRADE_DATA = "."  # TODO: We need to read it from the environment. It could be a data dir or a docker volume.
        # BASE_DIR = Path(__file__).resolve().parent.parent
        # BASE_DIR = Path.cwd()

        for depth in depths:
            # TODO: The result might be an exception or some other object denoting a bad return (timeout, cancelled etc.)

            symbol = depth["symbol"]

            path = Path(TRADE_DATA).joinpath(App.config["collector"]["folder"])
            path = path.joinpath(App.config["collector"]["depth"]["folder"])
            path.mkdir(parents=True, exist_ok=True)  # Ensure that the dir exists

            file_name = f"depth-{symbol}-{freq}"
            file = Path(path, file_name).with_suffix(".txt")

            # Append to the file (create it if it does not exist)
            json_line = json.dumps(depth)
            with open(file, 'a+') as f:
                f.write(json_line + "\n")

    def store_queue(self):
        """
        Persistently store the queue data to one or more files corresponding to the stream (event) type, symbol (and frequency).

        :return:
        """
        #
        # Get all the data from the queue
        #
        events = {}
        item = None
        while True:
            try:
                item = self.queue.get_nowait()
            except queue.Empty:
                break
            except:
                break

            if item is None:
                break

            c = item.get("e")  # Channel
            if not events.get(c):  # Insert if it does not exist
                events[c] = {}
            symbols = events[c]

            s = item.get("s")  # Symbol
            if not symbols.get(s):  # Insert if it does not exist
                symbols[s] = []
            data = symbols[s]

            data.append(item)

            self.queue.task_done()  # TODO: Do we really need this?

        # File name like TRADE_HOME/COLLECT/DEPTH/depth-BTCUSDT-5s.txt
        TRADE_DATA = "."  # TODO: We need to read it from the environment. It could be a data dir or a docker volume.
        # BASE_DIR = Path(__file__).resolve().parent.parent
        # BASE_DIR = Path.cwd()

        path = Path(TRADE_DATA).joinpath(App.config["collector"]["folder"])
        path = path.joinpath(App.config["collector"]["stream"]["folder"])
        path.mkdir(parents=True, exist_ok=True)  # Ensure that the dir exists

        now = datetime.utcnow()
        #rotate_suffix = f"{now:%Y}{now:%m}{now:%d}"  # Daily files
        rotate_suffix = f"{now:%Y}{now:%m}"  # Monthly files
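
        # For illustration: a kline stream event ("e" == "kline") for symbol BTCUSDT received in
        # August 2022 would be appended to <collector folder>/<stream folder>/kline-BTCUSDT-202208.txt;
        # the actual channel and symbol names depend on what the exchange sends in its events.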

        #
        # Store all the data collected from the queue in files
        #
        for c, symbols in events.items():
            for s, data in symbols.items():
                file_name = f"{c}-{s}-{rotate_suffix}"
                file = Path(path, file_name).with_suffix(".txt")

                # Append to the file (create it if it does not exist)
                data = [json.dumps(event) for event in data]
                data_str = "\n".join(data)
                with open(file, 'a+') as f:
                    f.write(data_str + "\n")

    #
    # Analysis (features, predictions, signals etc.)
    #

    def analyze(self):
        """
        1. Convert klines to df
        2. Derive (compute) features (use the same function as for model training)
        3. Derive (predict) labels by applying models trained for each label
        4. Generate buy/sell signals by applying rule models trained for best overall trade performance
        """
        symbol = App.config["symbol"]

        last_kline_ts = self.get_last_kline_ts(symbol)
        last_kline_ts_str = str(pd.to_datetime(last_kline_ts, unit='ms'))

        log.info(f"Analyze {symbol}. Last kline timestamp: {last_kline_ts_str}")

        #
        # 1.
        # MERGE: Produce a single data frame with input data from all sources
        #
        data_sources = App.config.get("data_sources", [])
        if not data_sources:
            data_sources = [{"folder": App.config["symbol"], "file": "klines", "column_prefix": ""}]

        # Read data from online sources into data frames
        for ds in data_sources:
            if ds.get("file") == "klines":
                try:
                    klines = self.klines.get(ds.get("folder"))
                    df = klines_to_df(klines)

                    # Validate
                    source_columns = ['open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av']
                    if df.isnull().any().any():
                        null_columns = {k: v for k, v in df.isnull().any().to_dict().items() if v}
                        log.warning(f"Null in source data found. Columns with Null: {null_columns}")
                    # TODO: We might receive empty strings or 0s in numeric data - how can we detect them?
                    # TODO: Check that timestamps in 'close_time' are strictly consecutive
                except Exception as e:
                    log.error(f"Error in the klines_to_df method: {e}. Length of klines: {len(klines)}")
                    return
            else:
                log.error("Unknown data source. Currently only 'klines' is supported. Check the 'file' key of 'data_sources' in the config.")
                return
            ds["df"] = df

        # Merge into one df with prefixes and a common regular time index
        df = merge_data_sources(data_sources)

        #
        # 2.
        # Generate all necessary derived features (NaNs are possible due to short history)
        #

        # We want to generate features only for the last rows (for performance reasons)
        # Therefore, determine how many last rows we will actually need
        window_1 = App.config.get("score_aggregation", {}).get("window", 0)
        window_2 = App.config.get("score_aggregation_2", {}).get("window", 0)
        last_rows = max(window_1, window_2) + 1
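
        # For example (numbers made up): with a window of 8 in one aggregation set and 5 in the
        # other, max(8, 5) + 1 == 9, so features are generated only for the 9 most recent rows,
        # which should be enough history for aggregating the scores of the latest row.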

        feature_sets = App.config.get("feature_sets", [])
        if not feature_sets:
            log.error("ERROR: no feature sets defined. Nothing to process.")
            return
            # By default, we could generate standard kline features:
            #feature_sets = [{"column_prefix": "", "generator": "binance_main", "feature_prefix": ""}]

        # Apply all feature generators to the data frame, which gets new derived columns accordingly.
        # The feature parameters will be taken from App.config (depending on the generator)
        for fs in feature_sets:
            df, _ = generate_feature_set(df, fs, last_rows=last_rows)

        df = df.iloc[-last_rows:]  # For signal generation, we will need only several last rows

        #
        # 3.
        # Apply ML models and generate score columns
        #

        # kline feature set
        features = App.config["train_features"]
        predict_df = df[features]
        if predict_df.isnull().any().any():
            null_columns = {k: v for k, v in predict_df.isnull().any().to_dict().items() if v}
            log.error(f"Null in predict_df found. Columns with Null: {null_columns}")
            return

        # Do the prediction by applying all models (for the score columns declared in the config) to the data
        score_df = pd.DataFrame(index=predict_df.index)
        try:
            for score_column_name, model_pair in self.models.items():

                label, algo_name = score_to_label_algo_pair(score_column_name)
                model_config = get_model(algo_name)  # Get the algorithm description from the algo store
                algo_type = model_config.get("algo")

                if algo_type == "gb":
                    df_y_hat = predict_gb(model_pair, predict_df, get_model("gb"))
                elif algo_type == "nn":
                    df_y_hat = predict_nn(model_pair, predict_df, get_model("nn"))
                elif algo_type == "lc":
                    df_y_hat = predict_lc(model_pair, predict_df, get_model("lc"))
                elif algo_type == "svc":
                    df_y_hat = predict_svc(model_pair, predict_df, get_model("svc"))
                else:
                    raise ValueError(f"Unknown algorithm type '{algo_type}'")

                score_df[score_column_name] = df_y_hat

        except Exception as e:
            log.error(f"Error in predict: {e}: '{score_column_name=}', '{algo_name=}'")
            return

        # Attach the predicted score columns to the (trimmed) data frame
        df = df.join(score_df)
        #df = pd.concat([predict_df, score_df], axis=1)

        #
        # 4.
        # Aggregate and post-process
        #
        trade_score_column_names = []
        sa_sets = ['score_aggregation', 'score_aggregation_2']
        for i, score_aggregation_set in enumerate(sa_sets):
            score_aggregation = App.config.get(score_aggregation_set)
            if not score_aggregation:
                continue

            buy_labels = score_aggregation.get("buy_labels")
            sell_labels = score_aggregation.get("sell_labels")

            # Output (post-processed) columns for each aggregation set
            buy_column = 'buy_score_column'
            sell_column = 'sell_score_column'

            # Aggregate scores between each other and in time
            aggregate_scores(df, score_aggregation, buy_column, buy_labels)
            aggregate_scores(df, score_aggregation, sell_column, sell_labels)

            # Mutually adjust two independent scores with opposite semantics
            combine_scores(df, score_aggregation, buy_column, sell_column)

            trade_score_column = score_aggregation.get("trade_score")
            trade_score_column_names.append(trade_score_column)

        #
        # 5.
        # Apply the rule to the last row
        #
        signal_model = App.config['signal_model']
        if signal_model.get('rule_type') == 'two_dim_rule':
            apply_rule_with_score_thresholds_2(df, signal_model, trade_score_column_names)
        else:  # Default one-dim rule
            apply_rule_with_score_thresholds(df, signal_model, trade_score_column_names)

        #
        # 6.
        # Collect the results and create the signal object
        #
        row = df.iloc[-1]  # The last row stores the latest values we need

        close_price = row["close"]
        close_time = row.name + timedelta(minutes=1)  # Add 1 minute because the timestamp is the start of the interval

        trade_score = row[trade_score_column_names[0]]

        buy_signal = row["buy_signal_column"]
        sell_signal = row["sell_signal_column"]

        signal = dict(
            side="",
            trade_score=trade_score,
            buy_signal=buy_signal, sell_signal=sell_signal,
            close_price=close_price, close_time=close_time
        )
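
        # For illustration, the resulting signal object might look like this (values are made up;
        # "side" is still empty here and is assigned below):
        #   {"side": "", "trade_score": 0.12, "buy_signal": True, "sell_signal": False,
        #    "close_price": 41000.0, "close_time": Timestamp("2022-03-20 10:10:00")}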

        if pd.isnull(trade_score):
            pass  # Something is wrong with the computation results
        elif buy_signal and sell_signal:  # Both signals are true - should not happen
            pass
        elif buy_signal:
            signal["side"] = "BUY"
        elif sell_signal:
            signal["side"] = "SELL"
        else:
            signal["side"] = ""

        App.signal = signal

        log.info(f"Analyze finished. Signal: {signal['side']}. Trade score: {trade_score:+.3f}. Price: {int(close_price):,}")


if __name__ == "__main__":
    pass