# intelligent-trading-bot/common/generators.py
# Snapshot: 2025-03-25 20:27:43 +01:00 (307 lines, 12 KiB, Python)
from typing import Tuple
import numpy as np
import pandas as pd
from pandas.api.types import is_float_dtype, is_numeric_dtype, is_integer_dtype, is_string_dtype
from common.classifiers import *
from common.model_store import *
from common.gen_features import *
from common.gen_labels_highlow import generate_labels_highlow, generate_labels_highlow2
from common.gen_labels_topbot import generate_labels_topbot, generate_labels_topbot2
from common.gen_signals import (
generate_smoothen_scores, generate_combine_scores,
generate_threshold_rule, generate_threshold_rule2
)
from outputs.notifier_scores import *
from outputs.notifier_diagram import *
from outputs.notifier_trades import *
from outputs.trader_binance import *
def generate_feature_set(df: pd.DataFrame, fs: dict, last_rows: int) -> Tuple[pd.DataFrame, list]:
    """
    Apply the specified resolved feature generator to the input data set.

    The feature-set definition `fs` selects input columns (via "column_prefix"),
    names the generator to run, and optionally prefixes the derived columns
    (via "feature_prefix") before they are joined back to the main frame.
    Returns the updated frame and the list of newly added column names.
    """
    #
    # Select the input columns this generator will work on
    #
    col_prefix = fs.get("column_prefix")
    if col_prefix:
        col_prefix = col_prefix + "_"
        chosen = [col for col in df if col.startswith(col_prefix)]
        work_df = df[chosen]
        # Strip the prefix: generators are generic and expect bare column names.
        # A feature prefix may be re-attached to derived columns further below.
        work_df = work_df.rename(columns=lambda c: c[len(col_prefix):] if c.startswith(col_prefix) else c)
    else:
        # Take a distinct frame object so derived columns can be added to it
        # and then joined back to the main frame
        work_df = df[df.columns.to_list()]

    #
    # Resolve and apply the configured feature generator function
    #
    gen_name = fs.get("generator")
    params = fs.get('config', {})

    if gen_name == "itblib":
        out_columns = generate_features_itblib(work_df, params, last_rows=last_rows)
    elif gen_name == "depth":
        out_columns = generate_features_depth(work_df)
    elif gen_name == "tsfresh":
        out_columns = generate_features_tsfresh(work_df, params, last_rows=last_rows)
    elif gen_name == "talib":
        out_columns = generate_features_talib(work_df, params, last_rows=last_rows)
    elif gen_name == "itbstats":
        out_columns = generate_features_itbstats(work_df, params, last_rows=last_rows)

    # Labels
    elif gen_name == "highlow":
        horizon = params.get("horizon")
        # Binary labels whether max has exceeded a threshold or not
        print(f"Generating 'highlow' labels with horizon {horizon}...")
        out_columns = generate_labels_highlow(work_df, horizon=horizon)
        print(f"Finished generating 'highlow' labels. {len(out_columns)} labels generated.")
    elif gen_name == "highlow2":
        print(f"Generating 'highlow2' labels...")
        work_df, out_columns = generate_labels_highlow2(work_df, params)
        print(f"Finished generating 'highlow2' labels. {len(out_columns)} labels generated.")
    elif gen_name == "topbot":
        source_column = params.get("columns", "close")
        top_level_fracs = [0.01, 0.02, 0.03, 0.04, 0.05]
        bot_level_fracs = [-x for x in top_level_fracs]
        work_df, out_columns = generate_labels_topbot(work_df, source_column, top_level_fracs, bot_level_fracs)
    elif gen_name == "topbot2":
        work_df, out_columns = generate_labels_topbot2(work_df, params)

    # Signals
    elif gen_name == "smoothen":
        work_df, out_columns = generate_smoothen_scores(work_df, params)
    elif gen_name == "combine":
        work_df, out_columns = generate_combine_scores(work_df, params)
    elif gen_name == "threshold_rule":
        work_df, out_columns = generate_threshold_rule(work_df, params)
    elif gen_name == "threshold_rule2":
        work_df, out_columns = generate_threshold_rule2(work_df, params)

    else:
        # Not a built-in name: resolve it to a function reference and call it
        generator_fn = resolve_generator_name(gen_name)
        if generator_fn is None:
            raise ValueError(f"Unknown feature generator name or name cannot be resolved: {gen_name}")
        work_df, out_columns = generator_fn(work_df, params)

    #
    # Join the derived columns back to the main frame
    #
    work_df = work_df[out_columns]

    out_prefix = fs.get("feature_prefix")
    if out_prefix:
        work_df = work_df.add_prefix(out_prefix + "_")

    new_features = work_df.columns.to_list()

    # Drop columns that would collide with the ones about to be attached
    df.drop(list(set(df.columns) & set(new_features)), axis=1, inplace=True)

    df = df.join(work_df)  # Attach all derived features to the main frame

    return df, new_features
def predict_feature_set(df, fs, config, models: dict):
    """
    Compute predictions for every (label, algorithm) pair of this feature set
    using the pre-trained models from `models`.

    Returns a tuple (out_df, features, scores): a frame with one score column
    per pair, the list of those column names, and per-column quality metrics
    (computed only when the true label column is present in `df`).
    """
    # Labels: feature-set config first, global config as fallback
    labels = fs.get("config").get("labels", []) or config.get("labels", [])

    # Algorithms: either names (strings) or full definitions (dicts) - resolve names
    algorithms_all = config.get("algorithms")
    raw_algorithms = fs.get("config").get("functions", []) or fs.get("config").get("algorithms", [])
    algorithms = []
    for entry in raw_algorithms:
        if isinstance(entry, str):  # Look the name up in the global algorithm list
            entry = find_algorithm_by_name(algorithms_all, entry)
        elif not isinstance(entry, dict):
            raise ValueError(f"Algorithm has to be either dict or name")
        algorithms.append(entry)
    if not algorithms:
        algorithms = algorithms_all

    # Input columns: "columns", then "features", then the global train features
    train_features = (
        fs.get("config").get("columns", [])
        or fs.get("config").get("features", [])
        or config.get("train_features", [])
    )
    train_df = df[train_features]

    predictors = {"gb": predict_gb, "nn": predict_nn, "lc": predict_lc, "svc": predict_svc}

    features = []
    scores = dict()
    out_df = pd.DataFrame(index=train_df.index)  # Collect predictions

    for label in labels:
        for model_config in algorithms:
            algo_name = model_config.get("name")
            algo_type = model_config.get("algo")
            score_column_name = label + label_algo_separator + algo_name

            # Trained model entry from the loaded model registry
            model_pair = models.get(score_column_name)

            print(f"Predict '{score_column_name}'. Algorithm {algo_name}. Label: {label}. Train length {len(train_df)}. Train columns {len(train_df.columns)}")

            predict_fn = predictors.get(algo_type)
            if predict_fn is None:
                raise ValueError(f"Unknown algorithm type '{algo_type}'")
            df_y_hat = predict_fn(model_pair, train_df, model_config)

            out_df[score_column_name] = df_y_hat
            features.append(score_column_name)

            # If the true labels are available, score the new predictions
            if label in df:
                df_y = df[label]
                if is_float_dtype(df_y) and is_float_dtype(df_y_hat):
                    scores[score_column_name] = compute_scores_regression(df_y, df_y_hat)  # Regression scores
                else:
                    scores[score_column_name] = compute_scores(df_y, df_y_hat)  # Classification scores

    return out_df, features, scores
def train_feature_set(df, fs, config):
    """
    Train one model per (label, algorithm) pair of this feature set.

    Rows with NaN in any train feature or label are dropped before fitting.
    Returns a tuple (out_df, models, scores): a frame with in-sample
    predictions (one score column per pair), a dict mapping score column name
    to the trained model pair, and a dict of per-column quality metrics.

    Raises:
        ValueError: if an algorithm entry is neither a dict nor a resolvable
            name, or if an algorithm has an unknown type.
    """
    labels_all = config.get("labels", [])
    labels = fs.get("config").get("labels", [])
    if not labels:
        labels = labels_all

    algorithms_all = config.get("algorithms")
    # Algorithm list of this feature set: under "functions" or (legacy) "algorithms"
    algorithms_str = fs.get("config").get("functions", [])
    if not algorithms_str:
        algorithms_str = fs.get("config").get("algorithms", [])

    # Entries can be strings (names) or dicts (definitions), so resolve the names
    algorithms = []
    for alg in algorithms_str:
        if isinstance(alg, str):  # Find in the list of algorithms
            alg = find_algorithm_by_name(algorithms_all, alg)
        elif not isinstance(alg, dict):
            raise ValueError(f"Algorithm has to be either dict or name")
        algorithms.append(alg)
    if not algorithms:
        algorithms = algorithms_all

    train_features_all = config.get("train_features", [])
    train_features = fs.get("config").get("columns", [])
    if not train_features:
        train_features = fs.get("config").get("features", [])
    if not train_features:
        train_features = train_features_all

    # Only for train mode: rows with NaN features or labels cannot be fitted on
    df = df.dropna(subset=train_features).reset_index(drop=True)
    df = df.dropna(subset=labels).reset_index(drop=True)

    # Map algorithm type to its (train, predict) function pair
    algo_functions = {
        "gb": (train_gb, predict_gb),
        "nn": (train_nn, predict_nn),
        "lc": (train_lc, predict_lc),
        "svc": (train_svc, predict_svc),
    }

    models = dict()
    scores = dict()
    out_df = pd.DataFrame()  # Collect in-sample predictions

    for label in labels:
        for model_config in algorithms:
            algo_name = model_config.get("name")
            algo_type = model_config.get("algo")
            score_column_name = label + label_algo_separator + algo_name

            # Limit the train set according to the algorithm train parameters
            algo_every_nth_row = model_config.get("params", {}).get("every_nth_row")
            if algo_every_nth_row:
                train_df = df.iloc[::algo_every_nth_row, :]
            else:
                train_df = df
            algo_train_length = model_config.get("params", {}).get("length")
            if algo_train_length:
                train_df = train_df.tail(algo_train_length)

            df_X = train_df[train_features]
            df_y = train_df[label]

            print(f"Train '{score_column_name}'. Algorithm {algo_name}. Label: {label}. Train length {len(df_X)}. Train columns {len(df_X.columns)}")

            if algo_type not in algo_functions:
                # Raise instead of returning None: callers unpack a 3-tuple and
                # would otherwise crash with an opaque TypeError; this is also
                # consistent with predict_feature_set
                raise ValueError(f"Unknown algorithm type '{algo_type}'")
            train_fn, predict_fn = algo_functions[algo_type]
            model_pair = train_fn(df_X, df_y, model_config)
            models[score_column_name] = model_pair
            df_y_hat = predict_fn(model_pair, df_X, model_config)

            out_df[score_column_name] = df_y_hat

            # Compare in-sample predictions with the true label values
            if is_float_dtype(df_y) and is_float_dtype(df_y_hat):
                scores[score_column_name] = compute_scores_regression(df_y, df_y_hat)  # Regression scores
            else:
                scores[score_column_name] = compute_scores(df_y, df_y_hat)  # Classification scores

    return out_df, models, scores
async def output_feature_set(df, fs: dict, config: dict):
    """
    Resolve the output generator of this feature set and apply it to the data.

    Built-in generator names map to notifier/trader functions; any other name
    is resolved dynamically. The resolved function is awaited when it is a
    coroutine function and called directly otherwise.

    Raises:
        ValueError: if the generator name cannot be resolved to a function.
    """
    #
    # Resolve the output generator function from the configuration
    #
    generator = fs.get("generator")
    gen_config = fs.get('config', {})

    if generator == "score_notification_model":
        generator_fn = send_score_notification
    elif generator == "diagram_notification_model":
        generator_fn = send_diagram
    elif generator == "trader_simulation":
        generator_fn = trader_simulation
    elif generator == "trader_binance":
        generator_fn = trader_binance
    else:
        # Resolve generator name to a function reference
        generator_fn = resolve_generator_name(generator)
        if generator_fn is None:
            raise ValueError(f"Unknown feature generator name or name cannot be resolved: {generator}")

    # This function is itself a coroutine, so an event loop is guaranteed to be
    # running here and we can await directly. The previous guard via
    # asyncio.get_running_loop() was broken: it raises RuntimeError (rather
    # than returning a falsy value) when no loop runs, so the asyncio.run()
    # fallback was unreachable dead code.
    if asyncio.iscoroutinefunction(generator_fn):
        await generator_fn(df, gen_config, config)
    else:
        generator_fn(df, gen_config, config)