intelligent-trading-bot/common/generators.py

278 lines
12 KiB
Python
Raw Permalink Normal View History

2023-12-10 11:51:26 +01:00
from typing import Tuple
import asyncio
2023-12-10 11:51:26 +01:00
import numpy as np
import pandas as pd
import pandas.api.types as ptypes
2023-12-10 11:51:26 +01:00
from common.types import Venue
from common.utils import *
from common.model_store import *
2023-12-10 11:51:26 +01:00
from common.gen_features import *
from common.gen_labels_highlow import generate_labels_highlow, generate_labels_highlow2
from common.gen_labels_topbot import generate_labels_topbot, generate_labels_topbot2
from common.gen_signals import (
generate_smoothen_scores, generate_combine_scores,
generate_threshold_rule, generate_threshold_rule2
)
def generate_feature_set(df: pd.DataFrame, fs: dict, config: dict, model_store: ModelStore, last_rows: int) -> Tuple[pd.DataFrame, list]:
2023-12-10 11:51:26 +01:00
"""
Apply the specified resolved feature generator to the input data set.
"""
#
# Select columns from the data set to be processed by the feature generator
#
cp = fs.get("column_prefix")
if cp:
cp = cp + "_"
f_cols = [col for col in df if col.startswith(cp)]
f_df = df[f_cols] # Alternatively: f_df = df.loc[:, df.columns.str.startswith(cf)]
# Remove prefix because feature generators are generic (a prefix will be then added to derived features before adding them back to the main frame)
f_df = f_df.rename(columns=lambda x: x[len(cp):] if x.startswith(cp) else x) # Alternatively: f_df.columns = f_df.columns.str.replace(cp, "")
else:
f_df = df[df.columns.to_list()] # We want to have a different data frame object to add derived features and then join them back to the main frame with prefix
2023-12-10 11:51:26 +01:00
#
# Resolve and apply feature generator functions from the configuration
#
generator = fs.get("generator")
gen_config = fs.get('config', {})
if generator == "itblib":
features = generate_features_itblib(f_df, gen_config, last_rows=last_rows)
elif generator == "depth":
features = generate_features_depth(f_df)
elif generator == "tsfresh":
features = generate_features_tsfresh(f_df, gen_config, last_rows=last_rows)
elif generator == "talib":
features = generate_features_talib(f_df, gen_config, last_rows=last_rows)
elif generator == "itbstats":
features = generate_features_itbstats(f_df, gen_config, last_rows=last_rows)
# Labels
elif generator == "highlow":
horizon = gen_config.get("horizon")
# Binary labels whether max has exceeded a threshold or not
print(f"Generating 'highlow' labels with horizon {horizon}...")
features = generate_labels_highlow(f_df, horizon=horizon)
print(f"Finished generating 'highlow' labels. {len(features)} labels generated.")
elif generator == "highlow2":
print(f"Generating 'highlow2' labels...")
f_df, features = generate_labels_highlow2(f_df, gen_config)
print(f"Finished generating 'highlow2' labels. {len(features)} labels generated.")
elif generator == "topbot":
column_name = gen_config.get("columns", "close")
top_level_fracs = [0.01, 0.02, 0.03, 0.04, 0.05]
bot_level_fracs = [-x for x in top_level_fracs]
f_df, features = generate_labels_topbot(f_df, column_name, top_level_fracs, bot_level_fracs)
elif generator == "topbot2":
f_df, features = generate_labels_topbot2(f_df, gen_config)
# Signals
elif generator == "smoothen":
f_df, features = generate_smoothen_scores(f_df, gen_config)
elif generator == "combine":
f_df, features = generate_combine_scores(f_df, gen_config)
elif generator == "threshold_rule":
f_df, features = generate_threshold_rule(f_df, gen_config)
elif generator == "threshold_rule2":
f_df, features = generate_threshold_rule2(f_df, gen_config)
else:
# Resolve generator name to a function reference
generator_fn = resolve_generator_name(generator)
if generator_fn is None:
raise ValueError(f"Unknown feature generator name or name cannot be resolved: {generator}")
# Call this function
f_df, features = generator_fn(f_df, gen_config, config, model_store)
2023-12-10 11:51:26 +01:00
#
# Add generated features to the main data frame with all other columns and features
#
f_df = f_df[features]
fp = fs.get("feature_prefix")
if fp:
f_df = f_df.add_prefix(fp + "_")
new_features = f_df.columns.to_list()
2023-12-22 16:36:28 +01:00
# Delete new columns if they already exist
df.drop(list(set(df.columns) & set(new_features)), axis=1, inplace=True)
2023-12-10 11:51:26 +01:00
df = df.join(f_df) # Attach all derived features to the main frame
return df, new_features
def predict_feature_set(df, fs, config, model_store: ModelStore) -> Tuple[pd.DataFrame, list]:
train_features, labels, algorithms = get_features_labels_algorithms(fs, config)
train_df = df[train_features]
features = []
out_df = pd.DataFrame(index=train_df.index) # Collect predictions
for label in labels:
for model_config in algorithms:
algo_name = model_config.get("name")
algo_type = model_config.get("algo")
score_column_name = label + label_algo_separator + algo_name
# It is an entry from loaded model dict
model_pair = model_store.get_model_pair(score_column_name) # Trained model from model registry
print(f"Predict '{score_column_name}'. Algorithm {algo_name}. Label: {label}. Train length {len(train_df)}. Train columns {len(train_df.columns)}")
if algo_type == "gb":
from common.classifier_gb import predict_gb
df_y_hat = predict_gb(model_pair, train_df, model_config)
elif algo_type == "nn":
2026-01-08 18:01:38 +01:00
from common.classifier_nn import predict_nn
df_y_hat = predict_nn(model_pair, train_df, model_config)
elif algo_type == "lc":
2026-01-08 18:01:38 +01:00
from common.classifier_lc import predict_lc
df_y_hat = predict_lc(model_pair, train_df, model_config)
elif algo_type == "svc":
2026-01-08 18:01:38 +01:00
from common.classifier_svc import predict_svc
df_y_hat = predict_svc(model_pair, train_df, model_config)
else:
raise ValueError(f"Unknown algorithm type {algo_type}. Check algorithm list.")
out_df[score_column_name] = df_y_hat
features.append(score_column_name)
return out_df, features
def train_feature_set(df, fs, config) -> dict:
train_features, labels, algorithms = get_features_labels_algorithms(fs, config)
2025-01-11 18:44:28 +01:00
# Only for train mode
2025-01-11 18:44:28 +01:00
df = df.dropna(subset=train_features).reset_index(drop=True)
df = df.dropna(subset=labels).reset_index(drop=True)
models = dict() # Here collect the resulted trained models
for label in labels:
for model_config in algorithms:
algo_name = model_config.get("name")
algo_type = model_config.get("algo")
score_column_name = label + label_algo_separator + algo_name
# Limit length according to the algorith train parameters
algo_every_nth_row = model_config.get("params", {}).get("every_nth_row")
if algo_every_nth_row:
train_df = df.iloc[::algo_every_nth_row, :]
else:
train_df = df
algo_train_length = model_config.get("params", {}).get("length")
if algo_train_length:
train_df = train_df.tail(algo_train_length)
df_X = train_df[train_features]
df_y = train_df[label]
print(f"Train '{score_column_name}'. Algorithm {algo_name}. Label: {label}. Train length {len(df_X)}. Train columns {len(df_X.columns)}")
if algo_type == "gb":
from common.classifier_gb import train_gb
model_pair = train_gb(df_X, df_y, model_config)
models[score_column_name] = model_pair
elif algo_type == "nn":
2026-01-08 18:01:38 +01:00
from common.classifier_nn import train_nn
model_pair = train_nn(df_X, df_y, model_config)
models[score_column_name] = model_pair
elif algo_type == "lc":
2026-01-08 18:01:38 +01:00
from common.classifier_lc import train_lc
model_pair = train_lc(df_X, df_y, model_config)
models[score_column_name] = model_pair
elif algo_type == "svc":
2026-01-08 18:01:38 +01:00
from common.classifier_svc import train_svc
model_pair = train_svc(df_X, df_y, model_config)
models[score_column_name] = model_pair
else:
raise ValueError(f"Unknown algorithm type {algo_type}. Check algorithm list.")
return models
def get_features_labels_algorithms(fs, config) -> Tuple[list, list, list]:
"""
Get three lists by combining the entries from default lists in the config file
and lists in the generator config. The function will return a list from the specific
generator config if it is available and the default list otherwise.
For the algorithm list, it will resolve the algorithm names into their definitions if necessary.
"""
train_features_all = config.get("train_features", [])
train_features = fs.get("config").get("columns", [])
if not train_features:
train_features = fs.get("config").get("features", [])
if not train_features:
train_features = train_features_all
labels_all = config.get("labels", [])
labels = fs.get("config").get("labels", [])
if not labels:
labels = labels_all
algorithms_all = config.get("algorithms")
algorithms_str = fs.get("config").get("functions", [])
if not algorithms_str:
algorithms_str = fs.get("config").get("algorithms", [])
# The algorithms can be either strings (names) or dicts (definitions) so we resolve the names
algorithms = []
for alg in algorithms_str:
if isinstance(alg, str): # Find in the list of algorithms
alg = find_algorithm_by_name(algorithms_all, alg)
elif not isinstance(alg, dict):
raise ValueError(f"Algorithm has to be either dict or name")
algorithms.append(alg)
if not algorithms:
algorithms = algorithms_all
return train_features, labels, algorithms
async def output_feature_set(df, fs: dict, config: dict, model_store: ModelStore) -> None:
from outputs.notifier_scores import send_score_notification
from outputs.notifier_diagram import send_diagram
from outputs.notifier_trades import trader_simulation
from outputs import get_trader_functions
#
# Resolve and apply feature generator functions from the configuration
#
generator = fs.get("generator")
gen_config = fs.get('config', {})
if generator == "score_notification_model":
generator_fn = send_score_notification
elif generator == "diagram_notification_model":
generator_fn = send_diagram
elif generator == "trader_simulation":
generator_fn = trader_simulation
elif generator == "trader_binance":
generator_fn = get_trader_functions(Venue.BINANCE)["trader"]
elif generator == "trader_mt5":
generator_fn = get_trader_functions(Venue.MT5)["trader"]
else:
# Resolve generator name to a function reference
generator_fn = resolve_generator_name(generator)
if generator_fn is None:
raise ValueError(f"Unknown feature generator name or name cannot be resolved: {generator}")
# Call the resolved function
if asyncio.iscoroutinefunction(generator_fn):
if asyncio.get_running_loop():
await generator_fn(df, gen_config, config, model_store)
else:
asyncio.run(generator_fn(df, gen_config, config, model_store))
else:
generator_fn(df, gen_config, config, model_store)