mirror of
https://github.com/asavinov/intelligent-trading-bot.git
synced 2026-05-04 08:26:19 +00:00
440 lines
18 KiB
Python
440 lines
18 KiB
Python
import os
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Union, List, Tuple
|
|
import json
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
|
|
"""
|
|
Signal generation is based on processing a relatively small number of highly informative
|
|
(point-wise) scores generated by ML algorithms. The goal is to apply some rules to these
|
|
scores and generate the final signal buy, sell or do nothing. Such rules are described via
|
|
a number of parameters. These parameters are chosen to optimize the final trade performance
|
|
(and not precision in ML algorithms). Thus we have two sets of functions: 1) computing rules with
|
|
given parameters, and 2) finding optimal parameters of rules (currently via grid search).
|
|
"""
|
|
|
|
|
|
def generate_smoothen_scores(df, config: dict):
    """
    Average several score columns row-wise, then optionally binarize and smooth the result.

    The following operations are applied in this order:
    - find average of the specified input columns (row-wise, NaN values are skipped)
    - if "point_threshold" is specified, compare the average with it by producing a boolean column
    - if "window" is specified, smooth the column: an int window means a simple moving average,
      a float window means an exponential moving average with this span

    Notes:
    - Input point-wise scores in buy and sell columns are always positive

    :param df: data frame with the input columns; the output column is added to it in place
    :param config: dict with "columns" (one name or a list of names of input columns),
        "names" (name of the output column) and optional "point_threshold" and "window"
    :return: tuple with the (modified) data frame and a list with the single output column name
    :raises ValueError: if "columns" is empty or "names" is not a string
    """

    columns = config.get('columns')
    if not columns:
        raise ValueError(f"The 'columns' parameter must be a non-empty string. {type(columns)}")
    elif isinstance(columns, str):
        columns = [columns]

    # TODO: check that all columns exist (currently a missing column results in pandas KeyError)

    # Average all buy and sell columns
    out_column = df[columns].mean(skipna=True, axis=1)

    # Apply threshold (if specified) and binarize the score.
    # Compare with None explicitly because 0 is a valid threshold which must not be ignored
    point_threshold = config.get("point_threshold")
    if point_threshold is not None:
        out_column = out_column >= point_threshold

    # Moving average: the type of the window chooses the kind of smoothing
    window = config.get("window")
    if isinstance(window, int):
        out_column = out_column.rolling(window, min_periods=window // 2).mean()
    elif isinstance(window, float):
        out_column = out_column.ewm(span=window, min_periods=window // 2, adjust=False).mean()

    names = config.get('names')
    if not isinstance(names, str):
        raise ValueError(f"'names' parameter must be a non-empty string. {type(names)}")

    df[names] = out_column

    return df, [names]
|
|
|
|
|
|
def generate_combine_scores(df, config: dict):
    """
    Combine a pair of scores (price growth and price fall) into one trade score.

    ML algorithms predict score which is always positive and typically within [0,1].
    One score for price growth and one score for price fall. This function combines pairs
    of such scores and produces one score within [-1,+1]. Positive values mean growth
    and negative values mean fall of price.

    The "combine" parameter chooses the method: "relative", "difference" or (by default)
    the greater of the two scores where the fall score is taken with the negative sign.
    The result is then optionally multiplied by "coefficient" and shifted by "constant".

    :param df: data frame with the two input columns; the output column is added to it in place
    :param config: dict with "columns" (list with the growth and fall column names),
        "names" (output column name) and optional "combine", "coefficient" and "constant"
    :return: tuple with the (modified) data frame and a list with the single output column name
    :raises ValueError: if "columns" is not a list of exactly two column names
    """
    columns = config.get('columns')
    if not columns:
        raise ValueError(f"The 'columns' parameter must be a non-empty string. {type(columns)}")
    elif not isinstance(columns, list) or len(columns) != 2:
        raise ValueError(f"'columns' parameter must be a list with buy column name and sell column name. {type(columns)}")

    up_column, down_column = columns

    out_column = config.get('names')

    combine = config.get("combine")
    if combine == "relative":
        combine_scores_relative(df, up_column, down_column, out_column)
    elif combine == "difference":
        combine_scores_difference(df, up_column, down_column, out_column)
    else:
        # If buy score is greater than sell score then positive buy, otherwise negative sell.
        # Vectorized instead of a row-wise apply: same values (a NaN comparison is false and
        # selects the negated sell score) but no Python call per row and it works for empty frames
        up_scores = df[up_column]
        down_scores = df[down_column]
        df[out_column] = np.where(up_scores >= down_scores, up_scores, -down_scores)

    # Scale the score distribution to make it symmetric or normalize
    # Always apply the transformation to buy score. It might be in [0,1] or [-1,+1] depending on combine parameter
    coefficient = config.get("coefficient")
    if coefficient:
        df[out_column] = df[out_column] * coefficient
    constant = config.get("constant")
    if constant:
        df[out_column] = df[out_column] + constant

    return df, [out_column]
|
|
|
|
|
|
def combine_scores_relative(df, buy_column, sell_column, trade_column_out):
    """
    Combine buy and sell scores into one score describing the share of the buy score in their sum.

    The idea is that if both scores (buy and sell) are equally high then the output is 0.
    The output describes whether the buy score is higher relative to the sell score:
    it is in [-1, +1] for non-negative inputs where high values mean buy signal.

    :param df: data frame with the input columns; the output column is added to it in place
    :param buy_column: name of the column with buy scores
    :param sell_column: name of the column with sell scores
    :param trade_column_out: name of the output column
    :return: series with the combined score (same values as stored in the output column)
    """

    buy_scores = df[buy_column]
    sell_scores = df[sell_column]

    # compute proportion in the sum
    buy_plus_sell = buy_scores + sell_scores
    buy_sell_score = ((buy_scores / buy_plus_sell) * 2) - 1.0  # in [-1, +1]

    # If both scores are 0 then the proportion is 0/0 (NaN). The scores are equal in this case
    # so the output is defined to be 0. NaN inputs are not affected and still produce NaN
    both_zero = (buy_scores == 0) & (sell_scores == 0)
    buy_sell_score = buy_sell_score.mask(both_zero, 0.0)

    df[trade_column_out] = buy_sell_score  # High values mean buy signal

    return buy_sell_score
|
|
|
|
|
|
def combine_scores_difference(df, buy_column, sell_column, trade_column_out):
    """
    Combine buy and sell scores as their difference (buy minus sell).

    The result shows how much the buy score exceeds the sell score: it is 0 if they
    are equal, positive if the buy score is higher and negative otherwise.

    :param df: data frame with the input columns; the output column is added to it in place
    :param buy_column: name of the column with buy scores
    :param sell_column: name of the column with sell scores
    :param trade_column_out: name of the output column
    :return: series with the difference (same values as stored in the output column)
    """
    score_diff = df[buy_column] - df[sell_column]

    # High values mean buy signal
    df[trade_column_out] = score_diff

    return score_diff
|
|
|
|
|
|
def compute_score_slope(df, model, buy_score_columns_in, sell_score_columns_in):
    """
    Experimental. Currently not used.
    Compute slope of the numeric score over model.get("buy_window") and model.get("sell_window")

    In its current state the function only defines the regression helper: the code which
    applies it to rolling windows is disabled so nothing is computed and nothing is returned.
    """

    # Only scipy is really needed: the helper below relies on stats.linregress
    from scipy import stats

    def linear_regr_fn(X):
        """
        Given a Series, fit a linear regression model and return its slope interpreted as a trend.
        The sequence of values in X must correspond to increasing time in order for the trend to make sense.
        """
        X_array = np.arange(len(X))
        y_array = X
        if np.isnan(y_array).any():
            # Fit only the points with existing values
            not_nan = ~np.isnan(y_array)
            X_array = X_array[not_nan]
            y_array = y_array[not_nan]

        slope, intercept, r, p, se = stats.linregress(X_array, y_array)

        return slope

    # Disabled draft of how the helper is supposed to be applied:
    # if 'buy_score_slope' not in df.columns:
    #     w = 10  #model.get("buy_window")
    #     df['buy_score_slope'] = df['buy_score_column'].rolling(window=w, min_periods=max(1, w // 2)).apply(linear_regr_fn, raw=True)
    #     w = 10  #model.get("sell_window")
    #     df['sell_score_slope'] = df['sell_score_column'].rolling(window=w, min_periods=max(1, w // 2)).apply(linear_regr_fn, raw=True)
|
|
|
|
#
|
|
# Signal rules
|
|
#
|
|
|
|
def generate_threshold_rule(df, config):
    """
    Apply rules based on thresholds and generate trade signal buy, sell or do nothing.

    One score column is compared with two thresholds: the buy signal is true where the
    score is greater than or equal to "buy_signal_threshold" and the sell signal is true
    where the score is less than or equal to "sell_signal_threshold".

    :param df: data frame with the score column; two signal columns are added to it in place
    :param config: dict with "columns" (name of the score column or a list whose first
        element is this name), "names" (buy and sell signal column names) and
        "parameters" (dict with the two thresholds)
    :return: tuple with the (modified) data frame and a list with the two signal column names
    :raises ValueError: if "columns" is empty
    """
    parameters = config.get("parameters", {})

    columns = config.get("columns")
    if not columns:
        raise ValueError(f"The 'columns' parameter must be a non-empty string. {type(columns)}")
    elif isinstance(columns, (list, tuple)):
        # This rule uses exactly one score so a list is reduced to its first column name.
        # (Wrapping the list in one more list would make the data frame selection fail.)
        columns = columns[0]

    names = config.get("names")
    buy_signal_column = names[0]
    sell_signal_column = names[1]

    df[buy_signal_column] = \
        (df[columns] >= parameters.get("buy_signal_threshold"))
    df[sell_signal_column] = \
        (df[columns] <= parameters.get("sell_signal_threshold"))

    return df, [buy_signal_column, sell_signal_column]
|
|
|
|
|
|
# TODO: DEPRECATED TO BE REMOVED
|
|
def apply_rule_with_score_thresholds(df, score_column_names, model):
    """
    Derive buy and sell signals from one score column by comparing it with two thresholds.

    The first name in score_column_names is the score. The two output column names are
    taken from model "signal_columns" and the thresholds from model "parameters".
    The signal columns are written to the data frame in place and nothing is returned.
    """
    thresholds = model.get("parameters", {})

    signal_names = model.get("signal_columns")
    buy_target = signal_names[0]
    sell_target = signal_names[1]

    source = score_column_names[0]

    # Buy where the score is high enough
    buy_limit = thresholds.get("buy_signal_threshold")
    df[buy_target] = (df[source] >= buy_limit)

    # Sell where the score is low enough
    sell_limit = thresholds.get("sell_signal_threshold")
    df[sell_target] = (df[source] <= sell_limit)
|
|
|
|
|
|
def generate_threshold_rule2(df, config):
    """
    Generate buy and sell signals from two score columns each having its own thresholds.

    Assume using difference combination with negative sell scores.
    A buy signal requires both scores to reach their buy thresholds and a sell signal
    requires both scores to be at or below their sell thresholds.

    Returns the data frame (with two signal columns added) and the list of their names.
    """
    thresholds = config.get("parameters", {})

    columns = config.get("columns")
    if not columns:
        raise ValueError(f"The 'columns' parameter must be a non-empty string. {type(columns)}")
    if not isinstance(columns, list) or len(columns) != 2:
        raise ValueError(f"'columns' parameter must be a list with two column names. {type(columns)}")

    first_score = columns[0]
    second_score = columns[1]

    signal_names = config.get("names")
    buy_signal_column = signal_names[0]
    sell_signal_column = signal_names[1]

    # Buy: each score is not lower than its own threshold
    first_is_high = df[first_score] >= thresholds.get("buy_signal_threshold")
    second_is_high = df[second_score] >= thresholds.get("buy_signal_threshold_2")
    df[buy_signal_column] = first_is_high & second_is_high

    # Sell: each score is not higher than its own threshold
    first_is_low = df[first_score] <= thresholds.get("sell_signal_threshold")
    second_is_low = df[second_score] <= thresholds.get("sell_signal_threshold_2")
    df[sell_signal_column] = first_is_low & second_is_low

    return df, [buy_signal_column, sell_signal_column]
|
|
|
|
|
|
# TODO: DEPRECATED TO BE REMOVED
|
|
def apply_rule_with_score_thresholds_2(df, score_column_names, model):
    """
    Derive buy and sell signals from two score columns each having its own thresholds.

    Assume using difference combination with negative sell scores.
    The first two names in score_column_names are the scores. The two output column names
    are taken from model "signal_columns" and the thresholds from model "parameters".
    The signal columns are written to the data frame in place and nothing is returned.
    """
    # Note: earlier experiments with a two-dimensional distance threshold and with
    # thresholds for the score change (diff) are disabled and not part of this rule

    thresholds = model.get("parameters", {})

    first_score = score_column_names[0]
    second_score = score_column_names[1]

    signal_names = model.get("signal_columns")
    buy_target = signal_names[0]
    sell_target = signal_names[1]

    # Buy: each score is not lower than its own threshold
    first_is_high = df[first_score] >= thresholds.get("buy_signal_threshold")
    second_is_high = df[second_score] >= thresholds.get("buy_signal_threshold_2")
    df[buy_target] = first_is_high & second_is_high

    # Sell: each score is not higher than its own threshold
    first_is_low = df[first_score] <= thresholds.get("sell_signal_threshold")
    second_is_low = df[second_score] <= thresholds.get("sell_signal_threshold_2")
    df[sell_target] = first_is_low & second_is_low
|
|
|
|
|
|
def apply_rule_with_score_thresholds_one_row(row, score_column_names, model):
    """
    Threshold rule for one record instead of a whole data frame. It is used for online predictions.

    The score is read from the row by the first name in score_column_names and compared
    with the buy and sell thresholds found in model "parameters".

    Returns signals as a tuple with two values: buy_signal and sell_signal
    """
    thresholds = model.get("parameters", {})

    score = row[score_column_names[0]]

    is_buy = score >= thresholds.get("buy_signal_threshold")
    is_sell = score <= thresholds.get("sell_signal_threshold")

    return is_buy, is_sell
|
|
|
|
|
|
def apply_rule_with_slope_thresholds(df, model, buy_score_column, sell_score_column):
    """
    Experimental. Currently not used.
    This rule type evaluates the score itself and also its slope.

    The rule is disabled: the function does nothing and returns None.
    """
    # Disabled draft of the rule (a signal needs a high score and a small absolute slope):
    # df['buy_signal_column'] = (df['buy_score_column'] >= model.get("buy_signal_threshold")) & (df['buy_score_slope'].abs() <= model.get("buy_slope_threshold"))
    # df['sell_signal_column'] = (df['sell_score_column'] >= model.get("sell_signal_threshold")) & (df['sell_score_slope'].abs() <= model.get("sell_slope_threshold"))
    return None
|
|
|
|
|
|
#
|
|
# Helper and exploration functions
|
|
#
|
|
|
|
def find_interval_precision(df: pd.DataFrame, label_column: str, score_column: str, threshold: float) -> pd.DataFrame:
    """
    Convert point-wise score/label pairs to interval-wise score/label.

    We assume that for each point there is a score and a boolean label. The score can be a future
    prediction while boolean label is whether this forecast is true. Or the score can be a prediction
    that this is a top/bottom while the label is whether it is indeed so.
    Importantly, the labels are supposed to represent contiguous intervals because the algorithm
    will output results for them by aggregating scores within these intervals.

    The output is a data frame with one row per contiguous interval. The intervals are interleaving
    like true, false, true, false etc. Accordingly, there is one label column which takes these
    values true, false etc. The score column for each interval is computed by using these rules:
    - for true interval: true (positive) if there is at least one point with score higher than the threshold
    - for true interval: false (positive) if all points are lower than the threshold
    - for false interval: true (negative) if all points are lower than the threshold
    - for false interval: false (negative) if there is at least one (wrong) point with the score higher than the threshold
    Essentially, we need only one boolean "all lower" function

    The input point-wise score is typically aggregated by applying some kind of rolling aggregation
    but it is performed separately.

    The function is supposed to be used for scoring during hyper-parameter search.
    We can search in level, tolerance, threshold, aggregation hyper-parameters (no forecasting parameters).
    Or we can also search through various ML forecasting hyper-parameters like horizon etc.
    In any case, after we selected hyper-parameters, we apply interval selection, score aggregation,
    then apply this function, and finally compute the interval-wise score.

    Input data frame is supposed to be sorted (important for the algorithm of finding contiguous intervals).

    Side effect: two helper columns are added to the input data frame in place:
    'interval_no' and score_column + '_greater_than_threshold'.

    :param df: data frame with the label and score columns (one row per point)
    :param label_column: name of the column with labels (boolean or 0/1)
    :param score_column: name of the column with numeric scores
    :param threshold: a point is counted as detected if its score is greater than or equal to this value
    :return: data frame with one row per interval and the columns 'interval_no', label_column
        (maximum label within the interval) and score_column (true if at least one point
        of the interval reached the threshold)
    """

    #
    # Count all intervals by finding them as groups of points. Input is a column with interleaving true-false
    # Mark true intervals (extremum) and false intervals (non-extremum)
    #

    # A new interval starts at each row whose label differs from the label of the previous row.
    # Comparing with the shifted column works equally for boolean and 0/1 integer labels.
    # (Cumulating a signed difference would not: +1 and -1 cancel out and merge intervals.)
    labels = df[label_column]
    is_interval_start = labels.ne(labels.shift())

    # Find groups (intervals, starts-stops) and number them starting from 0.
    # The first row has no previous row and therefore always counts as a start, hence minus 1
    interval_no_column = 'interval_no'
    df[interval_no_column] = is_interval_start.cumsum() - 1

    #
    # For each group (with true-false label), compute their interval-wise score (using all or none principle)
    #

    # First, compute point-wise "score reached the threshold" (it will be used during interval-based aggregation)
    greater_column = score_column + '_greater_than_threshold'
    df[greater_column] = (df[score_column] >= threshold)

    # Interval objects
    by_interval = df.groupby(interval_no_column)

    # Find interval label
    # Either 0 (all false) or 1 (at least one true - but must be all true)
    interval_label = by_interval[label_column].max()

    # Apply "all lower" function to each interval scores.
    # Either 0 (all lower) or 1 (at least one higher)
    interval_score = by_interval[greater_column].max()
    interval_score.name = score_column

    # Combine into output with the interval number as an ordinary column
    interval_df = pd.concat([interval_label, interval_score], axis=1)
    interval_df = interval_df.reset_index(drop=False)

    return interval_df
|
|
|
|
|
|
# NOT USED
|
|
def generate_signals(df, models: dict):
    """
    Use predicted labels in the data frame to decide whether to buy or sell.
    Use rule-based approach by comparing the predicted scores with some thresholds.
    Each row is evaluated independently of the other rows.

    Two signal names are supported: "buy" is 1 for a row if all listed fields are greater
    than or equal to their thresholds, "sell" is 1 if all listed fields are less than or
    equal to their thresholds. Otherwise the signal is 0.

    TODO: In future, values could be functions which return signal 1 or 0 when applied to a row

    :param df: data frame with features which will be used to generate signals
    :param models: dict where key is a signal name which is also an output column name and value a dict of parameters of the model
    :return: the keys of models. A number of binary columns will be added to df each corresponding to one signal and having same name
    :raises ValueError: if a signal name is not "buy" or "sell" (the data frame is not modified in this case)
    """

    # Define one function for each signal type.
    # A function applies a predicate by using the provided parameters and qualifies this row as true or false
    # TODO: Access to model parameters and row has to be robust and use default values

    def all_higher_fn(row, model):
        return int(all(row.get(field) >= value for field, value in model.items()))

    def all_lower_fn(row, model):
        return int(all(row.get(field) <= value for field, value in model.items()))

    # Function which implements (knows how to generate) each signal
    signal_functions = {
        "buy": all_higher_fn,
        "sell": all_lower_fn,
    }

    # Reject unknown signals before any column is added. Previously an error was printed
    # and then the missing function was applied which failed with an obscure error
    unexpected = [signal for signal in models if signal not in signal_functions]
    if unexpected:
        raise ValueError(f"ERROR: Wrong use. Unexpected signal name. {unexpected}")

    for signal, model in models.items():
        # Model will be passed as the second argument (the first one is the row)
        df[signal] = df.apply(signal_functions[signal], axis=1, args=(model,))

    return models.keys()
|