from __future__ import annotations  # Eliminates problem with type annotations like list[int]

import os
import json
from datetime import datetime, timezone, timedelta
from typing import Union

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn import neighbors
from sklearn.preprocessing import StandardScaler  # hoisted: was imported mid-function in generate_score

import statsmodels.api as sm

"""
Signals are binary features. However, they are not trained but rather found
using grid search by checking their overall performance during trading for
some period.
"""


def _mean_of_prediction_columns(df, side, suffix, levels=(10, 15, 20), algorithms=("gb", "nn", "lc")):
    """Mean of the per-level/per-algorithm prediction columns for one side.

    Columns are named like "<side>_<level>_<suffix>_<algorithm>",
    e.g. "high_10_k_gb".  Returns the element-wise mean over all
    level/algorithm combinations (a Series aligned with df).

    :param df: data frame containing the prediction columns
    :param side: "high" or "low"
    :param suffix: feature-set tag, "k" (kline) or "f" (futur)
    :param levels: price levels to aggregate over
    :param algorithms: algorithm tags to aggregate over
    """
    cols = [f"{side}_{level}_{suffix}_{algo}" for level in levels for algo in algorithms]
    return sum(df[c] for c in cols) / len(cols)


def generate_score(df, feature_sets):
    """
    Add a score column which aggregates different types of scores generated
    by various algorithms with different options. The score is added as a new
    column and is supposed to be used by the signal generator as the final feature.

    :param df: data frame with per-algorithm prediction columns
    :param feature_sets: list of "kline", "futur" etc.
    :return: the same df with aggregate columns and a "score" column in [-1, +1] added
    """
    if "kline" in feature_sets:
        # high/low kline: 3 algorithms for all 3 levels (9 columns each)
        df["high_k"] = _mean_of_prediction_columns(df, "high", "k")
        df["low_k"] = _mean_of_prediction_columns(df, "low", "k")

        # By algorithm type (NN only)
        df["high_k_nn"] = _mean_of_prediction_columns(df, "high", "k", algorithms=("nn",))
        df["low_k_nn"] = _mean_of_prediction_columns(df, "low", "k", algorithms=("nn",))

    if "futur" in feature_sets:
        # high/low futur: 3 algorithms for all 3 levels (9 columns each)
        df["high_f"] = _mean_of_prediction_columns(df, "high", "f")
        df["low_f"] = _mean_of_prediction_columns(df, "low", "f")

        # By algorithm type (NN only)
        df["high_f_nn"] = _mean_of_prediction_columns(df, "high", "f", algorithms=("nn",))
        df["low_f_nn"] = _mean_of_prediction_columns(df, "low", "f", algorithms=("nn",))

    # High and low
    # Both k and f
    #df["high"] = (df["high_k"] + df["high_f"]) / 2
    #df["low"] = (df["low_k"] + df["low_f"]) / 2

    # Only k and all algorithms
    df["high"] = df["high_k"]
    df["low"] = df["low_k"]

    # Using one NN algorithm only
    #df["high"] = df["high_k_nn"]
    #df["low"] = df["low_k_nn"]

    # Final score: proportion to the sum.
    # NOTE(review): where high + low == 0 this yields NaN/inf (pandas does not
    # raise on division by zero) — downstream consumers should handle that.
    high_and_low = df["high"] + df["low"]
    df["score"] = ((df["high"] / high_and_low) * 2) - 1.0  # in [-1, +1]

    # Final score: abs difference between high and low (scaled to [-1,+1] maybe)
    #df["score"] = df["high"] - df["low"]
    #df["score"] = StandardScaler().fit_transform(df["score"])
    #df["score"] = df["score"].rolling(window=10, min_periods=1).apply(np.nanmean)

    return df


def train_score_forecast_model(sr, order):
    """Fit a SARIMAX model of the given order to the score series and return the fit result."""
    model = sm.tsa.statespace.SARIMAX(
        sr,
        order=order,
        enforce_stationarity=True,
        enforce_invertibility=False,
    )
    model_fit = model.fit(disp=False)
    print(model_fit.summary())
    return model_fit


def rolling_forecast(history, sr, model_order, result_params):
    """
    Make one-step-ahead predictions for all values by appending them to the history.
    The return series has the same size. Indexes correspond to the time for which
    forecast is made, that is, next time after appended value.
    No forecast for the last appended value.

    :param history: series the model state is initialized (filtered) on
    :param sr: series of new observations to forecast one step ahead of
    :param model_order: SARIMAX (p, d, q) order
    :param result_params: previously fitted model parameters
    :return: Series aligned with sr.index holding the one-step-ahead forecasts
    """
    model = sm.tsa.SARIMAX(history, order=model_order)
    result = model.filter(result_params)

    forecast = pd.Series(index=sr.index, dtype=float)
    if sr.empty:  # Fix: original raised IndexError on an empty input series
        return forecast

    # First forecast is made from the history alone
    vals = result.forecast()
    forecast.iloc[0] = vals.iloc[0]

    for i in range(len(sr) - 1):
        # Can be slow because re-filtering is done for all data.
        # >3 minutes for 1_000 forecasts with 20_000 history:
        #result = result.append(sr.iloc[i:i+1])

        # Re-filtering is done only for new data.
        # 20 seconds for 1_000 forecasts with 20_000 history:
        result = result.extend(sr.iloc[i:i + 1])

        vals = result.forecast()  # TODO: We might also return confidence interval
        # Series with incremented index - next after last element of appended data
        forecast.iloc[i + 1] = vals.iloc[0]

    return forecast


def fitted_forecast(history, model_order, result_params):
    """
    Given forecast model and data, find in-sample forecasts for all these values.
    """
    model = sm.tsa.SARIMAX(history, order=model_order)
    result = model.filter(result_params)
    return result.fittedvalues


def search_forecast_model_order(sr):
    """Grid-search a (non-seasonal) ARIMA order for the series and return the fitted pmdarima model."""
    import pmdarima as pm
    arima = pm.auto_arima(
        sr,
        error_action='ignore',
        trace=True,
        suppress_warnings=True,
        #maxiter=100,
        stepwise=False,
        n_jobs=-1,
        seasonal=False,
        #p=1,
        start_p=1,
        max_p=2,
        d=0,
        stationary=True,
        #q=1,
        start_q=1,
        max_q=5,
        P=0, D=0, Q=0,
        max_order=None,
        #out_of_sample_size=1_000,
    )
    print(arima.summary())
    return arima


# NOT USED
def generate_signals(df, models: dict):
    """
    Use predicted labels in the data frame to decide whether to buy or sell.
    Use rule-based approach by comparing the predicted scores with some thresholds.
    The decision is made for the last row only but we can use also previous data.

    TODO: In future, values could be functions which return signal 1 or 0 when applied to a row

    :param df: data frame with features which will be used to generate signals
    :param models: dict where key is a signal name which is also an output column name
        and value a dict of parameters of the model
    :return: A number of binary columns will be added each corresponding to one signal
        and having same name
    """
    # Define one function for each signal type.
    # A function applies predicates by using the provided parameters and
    # qualifies this row as true (1) or false (0).

    def all_higher_fn(row, model):
        # 1 only if every referenced field is present and >= its threshold.
        # Fix: missing fields used to raise TypeError (None >= value); per the
        # robustness TODO they now simply disqualify the row.
        for field, threshold in model.items():
            value = row.get(field)
            if value is None or value < threshold:
                return 0
        return 1

    def all_lower_fn(row, model):
        # 1 only if every referenced field is present and <= its threshold.
        for field, threshold in model.items():
            value = row.get(field)
            if value is None or value > threshold:
                return 0
        return 1

    # Choose function which implements (knows how to generate) each signal
    signal_fns = {"buy": all_higher_fn, "sell": all_lower_fn}

    for signal, model in models.items():
        fn = signal_fns.get(signal)
        if fn is None:
            print("ERROR: Wrong use. Unexpected signal name.")
            continue  # Fix: original fell through and crashed in df.apply(None, ...)
        # Model will be passed as the second argument (the first one is the row)
        df[signal] = df.apply(fn, axis=1, args=[model])

    return models.keys()