intelligent-trading-bot/common/classifiers.py

567 lines
16 KiB
Python

from typing import List
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.model_selection import ParameterGrid
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC, SVR
import lightgbm as lgbm
import tensorflow as tf
from tensorflow import keras
from keras.optimizers import *
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.regularizers import *
from keras.callbacks import *
#
# GB
#
def train_predict_gb(df_X, df_y, df_X_test, model_config: dict):
"""
Train model with the specified hyper-parameters and return its predictions for the test data.
"""
model_pair = train_gb(df_X, df_y, model_config)
y_test_hat = predict_gb(model_pair, df_X_test, model_config)
return y_test_hat
def train_gb(df_X, df_y, model_config: dict):
"""
Train model with the specified hyper-parameters and return this model (and scaler if any).
"""
params = model_config.get("params", {})
is_scale = params.get("is_scale", False)
is_regression = params.get("is_regression", False)
#
# Scale
#
if is_scale:
scaler = StandardScaler()
scaler.fit(df_X)
X_train = scaler.transform(df_X)
else:
scaler = None
X_train = df_X.values
y_train = df_y.values
#
# Create model
#
train_conf = model_config.get("train", {})
objective = train_conf.get("objective")
max_depth = train_conf.get("max_depth")
learning_rate = train_conf.get("learning_rate")
num_boost_round = train_conf.get("num_boost_round")
lambda_l1 = train_conf.get("lambda_l1")
lambda_l2 = train_conf.get("lambda_l2")
lgbm_params = {
'learning_rate': learning_rate,
'max_depth': max_depth, # Can be -1
#"n_estimators": 10000,
#"min_split_gain": params['min_split_gain'],
"min_data_in_leaf": int(0.01*len(df_X)), # Best: ~0.02 * len() - 2% of size
#'subsample': 0.8,
#'colsample_bytree': 0.8,
'num_leaves': 32, # or (2 * 2**max_depth)
#"bagging_freq": 5,
#"bagging_fraction": 0.4,
#"feature_fraction": 0.05,
# gamma=0.1 ???
"lambda_l1": lambda_l1,
"lambda_l2": lambda_l2,
'is_unbalance': 'true',
# 'scale_pos_weight': scale_pos_weight, # is_unbalance must be false
'boosting_type': 'gbdt', # dart (slow but best, worse than gbdt), goss, gbdt
'objective': objective, # binary cross_entropy cross_entropy_lambda
'metric': {'cross_entropy'}, # auc auc_mu map (mean_average_precision) cross_entropy binary_logloss cross_entropy_lambda binary_error
'verbose': 0,
}
model = lgbm.train(
lgbm_params,
train_set=lgbm.Dataset(X_train, y_train),
num_boost_round=num_boost_round,
#valid_sets=[lgbm.Dataset(X_validate, y_validate)],
#early_stopping_rounds=int(num_boost_round / 5),
#verbose_eval=100,
)
return (model, scaler)
def predict_gb(models: tuple, df_X_test, model_config: dict):
"""
Use the model(s) to make predictions for the test data.
The first model is a prediction model and the second model (optional) is a scaler.
"""
#
# Scale
#
scaler = models[1]
is_scale = scaler is not None
input_index = df_X_test.index
if is_scale:
df_X_test = scaler.transform(df_X_test)
df_X_test = pd.DataFrame(data=df_X_test, index=input_index)
else:
df_X_test = df_X_test
df_X_test_nonans = df_X_test.dropna() # Drop nans, possibly create gaps in index
nonans_index = df_X_test_nonans.index
y_test_hat_nonans = models[0].predict(df_X_test_nonans.values)
y_test_hat_nonans = pd.Series(data=y_test_hat_nonans, index=nonans_index) # Attach indexes with gaps
df_ret = pd.DataFrame(index=input_index) # Create empty dataframe with original index
df_ret["y_hat"] = y_test_hat_nonans # Join using indexes
sr_ret = df_ret["y_hat"] # This series has all original input indexes but NaNs where input is NaN
return sr_ret
#
# NN
#
def train_predict_nn(df_X, df_y, df_X_test, model_config: dict):
"""
Train model with the specified hyper-parameters and return its predictions for the test data.
"""
model_pair = train_nn(df_X, df_y, model_config)
y_test_hat = predict_nn(model_pair, df_X_test, model_config)
return y_test_hat
def train_nn(df_X, df_y, model_config: dict):
"""
Train model with the specified hyper-parameters and return this model (and scaler if any).
"""
params = model_config.get("params", {})
is_scale = params.get("is_scale", True)
is_regression = params.get("is_regression", False)
#
# Scale
#
if is_scale:
scaler = StandardScaler()
scaler.fit(df_X)
X_train = scaler.transform(df_X)
else:
scaler = None
X_train = df_X.values
y_train = df_y.values
#
# Create model
#
n_features = X_train.shape[1]
layers = params.get("layers") # List of ints
if not layers:
layers = [n_features // 4] # Default
if not isinstance(layers, list):
layers = [layers]
# Topology
model = Sequential()
# sigmoid, relu, tanh, selu, elu, exponential
# kernel_regularizer=l2(0.001)
reg_l2 = 0.001
train_conf = model_config.get("train", {})
learning_rate = train_conf.get("learning_rate")
n_epochs = train_conf.get("n_epochs")
batch_size = train_conf.get("bs")
for i, out_features in enumerate(layers):
in_features = n_features if i == 0 else layers[i-1]
model.add(Dense(out_features, activation='sigmoid', input_dim=in_features)) # , kernel_regularizer=l2(reg_l2)
#model.add(Dropout(rate=0.5))
if is_regression:
model.add(Dense(units=1))
model.compile(
loss='mean_squared_error',
optimizer=Adam(learning_rate=learning_rate),
metrics=[
tf.keras.metrics.MeanAbsoluteError(name="mean_absolute_error"),
tf.keras.metrics.MeanAbsolutePercentageError(name="mean_absolute_percentage_error"),
tf.keras.metrics.R2Score(name="r2_score"),
],
)
else:
model.add(Dense(units=1, activation='sigmoid'))
model.compile(
loss='binary_crossentropy',
optimizer=Adam(learning_rate=learning_rate),
metrics=[
tf.keras.metrics.AUC(name="auc"),
tf.keras.metrics.Precision(name="precision"),
tf.keras.metrics.Recall(name="recall"),
],
)
#model.summary()
# Default arguments for early stopping
es_args = dict(
monitor = "loss", # val_loss loss
min_delta = 0.00001, # Minimum change qualified as improvement
patience = 5, # Number of epochs with no improvements
verbose = 0,
mode = 'auto',
)
es_args.update(train_conf.get("es", {})) # Overwrite default values with those explicitly specified in config
es = EarlyStopping(**es_args)
#
# Train
#
model.fit(
X_train,
y_train,
batch_size=batch_size,
epochs=n_epochs,
#validation_split=0.05,
#validation_data=(X_validate, y_validate),
#class_weight={0: 1, 1: 20},
callbacks=[es],
verbose=1,
)
return (model, scaler)
def predict_nn(models: tuple, df_X_test, model_config: dict):
"""
Use the model(s) to make predictions for the test data.
The first model is a prediction model and the second model (optional) is a scaler.
"""
#
# Scale
#
scaler = models[1]
is_scale = scaler is not None
input_index = df_X_test.index
if is_scale:
df_X_test = scaler.transform(df_X_test)
df_X_test = pd.DataFrame(data=df_X_test, index=input_index)
else:
df_X_test = df_X_test
df_X_test_nonans = df_X_test.dropna() # Drop nans, possibly create gaps in index
nonans_index = df_X_test_nonans.index
# Resets all (global) state generated by Keras
# Important if prediction is executed in a loop to avoid memory leak
tf.keras.backend.clear_session()
y_test_hat_nonans = models[0].predict_on_batch(df_X_test_nonans.values) # NN returns matrix with one column as prediction
y_test_hat_nonans = y_test_hat_nonans[:, 0] # Or y_test_hat.flatten()
y_test_hat_nonans = pd.Series(data=y_test_hat_nonans, index=nonans_index) # Attach indexes with gaps
df_ret = pd.DataFrame(index=input_index) # Create empty dataframe with original index
df_ret["y_hat"] = y_test_hat_nonans # Join using indexes
sr_ret = df_ret["y_hat"] # This series has all original input indexes but NaNs where input is NaN
return sr_ret
#
# LC - Linear Classifier
#
def train_predict_lc(df_X, df_y, df_X_test, model_config: dict):
"""
Train model with the specified hyper-parameters and return its predictions for the test data.
"""
model_pair = train_lc(df_X, df_y, model_config)
y_test_hat = predict_lc(model_pair, df_X_test, model_config)
return y_test_hat
def train_lc(df_X, df_y, model_config: dict):
"""
Train model with the specified hyper-parameters and return this model (and scaler if any).
"""
params = model_config.get("params", {})
is_scale = params.get("is_scale", True)
is_regression = params.get("is_regression", False)
#
# Scale
#
if is_scale:
scaler = StandardScaler()
scaler.fit(df_X)
X_train = scaler.transform(df_X)
else:
scaler = None
X_train = df_X.values
y_train = df_y.values
#
# Create model
#
train_conf = model_config.get("train", {})
args = train_conf.copy()
args["n_jobs"] = -1
args["verbose"] = 0
model = LogisticRegression(**args)
#
# Train
#
model.fit(X_train, y_train)
return (model, scaler)
def predict_lc(models: tuple, df_X_test, model_config: dict):
"""
Use the model(s) to make predictions for the test data.
The first model is a prediction model and the second model (optional) is a scaler.
"""
#
# Scale
#
scaler = models[1]
is_scale = scaler is not None
input_index = df_X_test.index
if is_scale:
df_X_test = scaler.transform(df_X_test)
df_X_test = pd.DataFrame(data=df_X_test, index=input_index)
else:
df_X_test = df_X_test
df_X_test_nonans = df_X_test.dropna() # Drop nans, possibly create gaps in index
nonans_index = df_X_test_nonans.index
y_test_hat_nonans = models[0].predict_proba(df_X_test_nonans.values) # It returns pairs or probas for 0 and 1
y_test_hat_nonans = y_test_hat_nonans[:, 1] # Or y_test_hat.flatten()
y_test_hat_nonans = pd.Series(data=y_test_hat_nonans, index=nonans_index) # Attach indexes with gaps
df_ret = pd.DataFrame(index=input_index) # Create empty dataframe with original index
df_ret["y_hat"] = y_test_hat_nonans # Join using indexes
sr_ret = df_ret["y_hat"] # This series has all original input indexes but NaNs where input is NaN
return sr_ret
#
# SVC - SVN Classifier
#
def train_predict_svc(df_X, df_y, df_X_test, model_config: dict):
"""
Train model with the specified hyper-parameters and return its predictions for the test data.
"""
model_pair = train_svc(df_X, df_y, model_config)
y_test_hat = predict_svc(model_pair, df_X_test, model_config)
return y_test_hat
def train_svc(df_X, df_y, model_config: dict):
"""
Train model with the specified hyper-parameters and return this model (and scaler if any).
"""
params = model_config.get("params", {})
is_scale = params.get("is_scale", True)
is_regression = params.get("is_regression", False)
#
# Prepare data
#
if is_scale:
scaler = StandardScaler()
scaler.fit(df_X)
X_train = scaler.transform(df_X)
else:
scaler = None
X_train = df_X.values
y_train = df_y.values
#
# Create model
#
train_conf = model_config.get("train", {})
args = train_conf.copy()
if is_regression:
model = SVR(**args)
else:
args['probability'] = True # Required if we are going to use predict_proba()
model = SVC(**args)
#
# Train
#
model.fit(X_train, y_train)
return (model, scaler)
def predict_svc(models: tuple, df_X_test, model_config: dict):
"""
Use the model(s) to make predictions for the test data.
The first model is a prediction model and the second model (optional) is a scaler.
"""
is_regression = model_config.get("params", {}).get("is_regression", False)
#
# Scale
#
scaler = models[1]
is_scale = scaler is not None
input_index = df_X_test.index
if is_scale:
df_X_test = scaler.transform(df_X_test)
df_X_test = pd.DataFrame(data=df_X_test, index=input_index)
else:
df_X_test = df_X_test
df_X_test_nonans = df_X_test.dropna() # Drop nans, possibly create gaps in index
nonans_index = df_X_test_nonans.index
if is_regression:
y_test_hat_nonans = models[0].predict(df_X_test_nonans.values)
else:
y_test_hat_nonans = models[0].predict_proba(df_X_test_nonans.values) # It returns pairs or probas for 0 and 1
y_test_hat_nonans = y_test_hat_nonans[:, 1] # Or y_test_hat.flatten()
y_test_hat_nonans = pd.Series(data=y_test_hat_nonans, index=nonans_index) # Attach indexes with gaps
df_ret = pd.DataFrame(index=input_index) # Create empty dataframe with original index
df_ret["y_hat"] = y_test_hat_nonans # Join using indexes
sr_ret = df_ret["y_hat"] # This series has all original input indexes but NaNs where input is NaN
return sr_ret
#
# Utils
#
def compute_scores(y_true, y_hat):
"""Compute several scores and return them as dict."""
y_true = y_true.astype(int)
y_hat_class = np.where(y_hat.values > 0.5, 1, 0)
try:
auc = metrics.roc_auc_score(y_true, y_hat.fillna(value=0))
except ValueError:
auc = 0.0 # Only one class is present (if dataset is too small, e.g,. when debugging) or Nulls in predictions
try:
ap = metrics.average_precision_score(y_true, y_hat.fillna(value=0))
except ValueError:
ap = 0.0 # Only one class is present (if dataset is too small, e.g,. when debugging) or Nulls in predictions
f1 = metrics.f1_score(y_true, y_hat_class)
precision = metrics.precision_score(y_true, y_hat_class)
recall = metrics.recall_score(y_true, y_hat_class)
scores = dict(
auc=round(auc, 3),
ap=round(ap, 3),
f1=round(f1, 3),
precision=round(precision, 3),
recall=round(recall, 3),
)
return scores
def compute_scores_regression(y_true, y_hat):
"""Compute regression scores. Input columns must have numeric data type."""
try:
mae = metrics.mean_absolute_error(y_true, y_hat)
except ValueError:
mae = np.nan
try:
mape = metrics.mean_absolute_percentage_error(y_true, y_hat)
except ValueError:
mape = np.nan
try:
r2 = metrics.r2_score(y_true, y_hat)
except ValueError:
r2 = np.nan
#
# How good it is in predicting the sign (increase of decrease)
#
y_true_class = np.where(y_true.values > 0.0, +1, -1)
y_hat_class = np.where(y_hat.values > 0.0, +1, -1)
try:
auc = metrics.roc_auc_score(y_true_class, y_hat_class)
except ValueError:
auc = 0.0 # Only one class is present (if dataset is too small, e.g,. when debugging) or Nulls in predictions
try:
ap = metrics.average_precision_score(y_true_class, y_hat_class)
except ValueError:
ap = 0.0 # Only one class is present (if dataset is too small, e.g,. when debugging) or Nulls in predictions
f1 = metrics.f1_score(y_true_class, y_hat_class)
precision = metrics.precision_score(y_true_class, y_hat_class)
recall = metrics.recall_score(y_true_class, y_hat_class)
scores = dict(
mae=round(mae, 3),
mape=round(mape, 3),
r2=round(r2, 3),
auc=round(auc, 3),
ap=round(ap, 3),
f1=round(f1, 3),
precision=round(precision, 3),
recall=round(recall, 3),
)
return scores
if __name__ == '__main__':
pass