# intelligent-trading-bot/common/signal_generation.py
from __future__ import annotations # Eliminates problem with type annotations like list[int]
import os
from datetime import datetime, timezone, timedelta
from typing import Union
import json
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn import neighbors
import statsmodels.api as sm
"""
Signals are binary features.
However, they are not trained but rather found using grid search by checking their overall performance during trading for some period
"""
def generate_score(df, feature_sets):
    """
    Add a "score" column in [-1, +1] aggregating the individual label predictions.

    For each requested feature set, the per-level (10, 15, 20) and per-algorithm
    (gb, nn, lc) prediction columns (e.g. "high_10_k_gb") are averaged into
    aggregate "high_*"/"low_*" columns. The final score is the share of "high"
    in "high" + "low", rescaled from [0, 1] to [-1, +1].

    Fix: removed a dead function-local ``from sklearn.preprocessing import
    StandardScaler`` that was executed on every call but referenced only by
    commented-out code.

    :param df: data frame containing the individual prediction columns
    :param feature_sets: list of feature set names, e.g. ["kline", "futur"]
    :return: the same data frame with the aggregate columns and "score" added
    """
    levels = (10, 15, 20)
    algos = ("gb", "nn", "lc")  # gradient boosting, neural network, linear classifier

    if "kline" in feature_sets:
        # Mean over 3 algorithms for all 3 levels (9 columns each).
        # Addition order (gb, nn, lc per level) matches the original explicit sums.
        df["high_k"] = sum(df[f"high_{lv}_k_{a}"] for lv in levels for a in algos) / 9
        df["low_k"] = sum(df[f"low_{lv}_k_{a}"] for lv in levels for a in algos) / 9
        # Mean over levels for the NN algorithm only
        df["high_k_nn"] = sum(df[f"high_{lv}_k_nn"] for lv in levels) / 3
        df["low_k_nn"] = sum(df[f"low_{lv}_k_nn"] for lv in levels) / 3

    if "futur" in feature_sets:
        # Same aggregation for the "futur" feature set
        df["high_f"] = sum(df[f"high_{lv}_f_{a}"] for lv in levels for a in algos) / 9
        df["low_f"] = sum(df[f"low_{lv}_f_{a}"] for lv in levels for a in algos) / 9
        df["high_f_nn"] = sum(df[f"high_{lv}_f_nn"] for lv in levels) / 3
        df["low_f_nn"] = sum(df[f"low_{lv}_f_nn"] for lv in levels) / 3

    # Currently only the kline aggregates feed the final score (the k/f mix and
    # NN-only variants were tried and left commented out in the original).
    # NOTE(review): this assumes "kline" is in feature_sets; otherwise KeyError.
    df["high"] = df["high_k"]
    df["low"] = df["low_k"]

    # Final score: proportion of "high" among high+low, rescaled to [-1, +1].
    # NOTE(review): when high + low == 0 the division yields NaN (pandas
    # semantics), same as the original behavior.
    high_and_low = df["high"] + df["low"]
    df["score"] = ((df["high"] / high_and_low) * 2) - 1.0

    return df
def train_score_forecast_model(sr, order):
    """
    Fit a SARIMAX model with the given (p, d, q) order to the score series.

    Prints the fit summary and returns the fitted results object.
    """
    # Stationarity is enforced, invertibility is not — matching the settings
    # used by the order search below.
    sarimax = sm.tsa.statespace.SARIMAX(
        sr,
        order=order,
        enforce_stationarity=True,
        enforce_invertibility=False,
    )
    fit_result = sarimax.fit(disp=False)
    print(fit_result.summary())
    return fit_result
def rolling_forecast(history, sr, model_order, result_params):
    """
    Make one-step-ahead predictions for all values of sr by successively
    appending the observed values to the history.

    The returned series has the same length and index as sr. Each index holds
    the forecast made for that time, i.e. after all earlier values have been
    appended. No forecast is made for the time following the last appended value.
    """
    base_model = sm.tsa.SARIMAX(history, order=model_order)
    # Apply the pre-estimated parameters to the history without re-fitting.
    state = base_model.filter(result_params)

    predictions = pd.Series(index=sr.index, dtype=float)
    # The first forecast is based on the history alone.
    predictions.iloc[0] = state.forecast().iloc[0]

    for pos in range(len(sr) - 1):
        # extend() re-filters only the newly appended observation (~20 s for
        # 1_000 forecasts with 20_000 history), whereas append() re-filters the
        # whole sample (>3 minutes for the same workload).
        state = state.extend(sr.iloc[pos:pos + 1])
        # TODO: We might also return the confidence interval
        # forecast() yields the value for the step after the appended data.
        predictions.iloc[pos + 1] = state.forecast().iloc[0]

    return predictions
def fitted_forecast(history, model_order, result_params):
    """
    Return in-sample one-step-ahead forecasts for all values of history,
    using a SARIMAX model with the given order and pre-estimated parameters.
    """
    sarimax = sm.tsa.SARIMAX(history, order=model_order)
    # filter() applies the parameters without re-estimating them.
    filtered = sarimax.filter(result_params)
    return filtered.fittedvalues
def search_forecast_model_order(sr):
    """
    Exhaustively search non-seasonal ARIMA orders for the score series.

    Prints the summary of the best model and returns the fitted pmdarima model
    (its order can be read from the result).
    """
    # Local import: pmdarima is needed only for this offline search step.
    import pmdarima as pm

    best_model = pm.auto_arima(
        sr,
        error_action='ignore',
        trace=True,
        suppress_warnings=True,
        stepwise=False,   # exhaustive grid search instead of stepwise heuristic
        n_jobs=-1,        # parallelize over all cores
        seasonal=False,
        start_p=1, max_p=2,
        d=0, stationary=True,  # series is treated as already stationary
        start_q=1, max_q=5,
        P=0, D=0, Q=0,    # no seasonal terms
        max_order=None,
    )
    print(best_model.summary())
    return best_model
# NOT USED
def generate_signals(df, models: dict):
    """
    Use predicted scores in the data frame to decide whether to buy or sell.

    Rule-based approach: a signal fires for a row when every threshold predicate
    of its model holds. One binary (0/1) column is added to df per signal,
    named after the signal.

    Fixes:
    - an unknown signal name previously printed an error but left ``fn = None``
      and then crashed inside ``df.apply`` with a TypeError; it is now skipped;
    - removed the unused ``keys = model.keys()`` locals.

    TODO: In future, values could be functions which return signal 1 or 0 when
    applied to a row.

    :param df: data frame with the feature columns referenced by the models
    :param models: dict mapping signal name ("buy" or "sell") to a dict of
        {column_name: threshold}
    :return: the signal names (models.keys()), one added column per name
    """
    # One predicate function per signal type; the row qualifies (1) only when
    # every referenced field passes its threshold.
    # NOTE(review): row.get(field) returns None for a missing column and the
    # comparison then raises TypeError — access should become robust with
    # defaults, per the original TODO.
    def all_higher_fn(row, model):
        # 1 when every field is >= its threshold, else 0
        return 1 if all(row.get(field) >= value for field, value in model.items()) else 0

    def all_lower_fn(row, model):
        # 1 when every field is <= its threshold, else 0
        return 1 if all(row.get(field) <= value for field, value in model.items()) else 0

    signal_fns = {"buy": all_higher_fn, "sell": all_lower_fn}

    for signal, model in models.items():
        # Choose the function which implements (knows how to generate) this signal
        fn = signal_fns.get(signal)
        if fn is None:
            print(f"ERROR: Wrong use. Unexpected signal name '{signal}'. Skipped.")
            continue
        # Model is passed as the second argument (the first one is the row)
        df[signal] = df.apply(fn, axis=1, args=[model])

    return models.keys()