Mirror of https://github.com/asavinov/intelligent-trading-bot.git
synced 2026-05-04 08:26:19 +00:00
307 lines
12 KiB
Python
307 lines
12 KiB
Python
from typing import Tuple
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from pandas.api.types import is_float_dtype, is_numeric_dtype, is_integer_dtype, is_string_dtype
|
|
|
|
from common.classifiers import *
|
|
from common.model_store import *
|
|
from common.gen_features import *
|
|
from common.gen_labels_highlow import generate_labels_highlow, generate_labels_highlow2
|
|
from common.gen_labels_topbot import generate_labels_topbot, generate_labels_topbot2
|
|
from common.gen_signals import (
|
|
generate_smoothen_scores, generate_combine_scores,
|
|
generate_threshold_rule, generate_threshold_rule2
|
|
)
|
|
|
|
from outputs.notifier_scores import *
|
|
from outputs.notifier_diagram import *
|
|
from outputs.notifier_trades import *
|
|
from outputs.trader_binance import *
|
|
|
|
|
|
def generate_feature_set(df: pd.DataFrame, fs: dict, last_rows: int) -> Tuple[pd.DataFrame, list]:
    """
    Apply the specified resolved feature generator to the input data set.

    The generator receives a (possibly prefix-filtered) view of `df`, produces
    derived columns (features, labels or signals), and these columns are then
    joined back to the main frame, optionally under a feature prefix.

    Returns a tuple of the extended data frame and the list of new column names.
    """
    #
    # Select the columns of the data set this feature generator should work on
    #
    column_prefix = fs.get("column_prefix")
    if column_prefix:
        column_prefix = column_prefix + "_"
        prefixed_columns = [col for col in df if col.startswith(column_prefix)]
        f_df = df[prefixed_columns]
        # Strip the prefix: generators are generic, and the prefix is re-applied
        # to the derived features before they are joined back to the main frame
        f_df = f_df.rename(columns=lambda name: name[len(column_prefix):] if name.startswith(column_prefix) else name)
    else:
        # Take a distinct frame object so derived features can be added and later joined back with a prefix
        f_df = df[df.columns.to_list()]

    #
    # Resolve and apply the generator function from the configuration
    #
    generator = fs.get("generator")
    gen_config = fs.get('config', {})

    # Features
    if generator == "itblib":
        features = generate_features_itblib(f_df, gen_config, last_rows=last_rows)
    elif generator == "depth":
        features = generate_features_depth(f_df)
    elif generator == "tsfresh":
        features = generate_features_tsfresh(f_df, gen_config, last_rows=last_rows)
    elif generator == "talib":
        features = generate_features_talib(f_df, gen_config, last_rows=last_rows)
    elif generator == "itbstats":
        features = generate_features_itbstats(f_df, gen_config, last_rows=last_rows)

    # Labels
    elif generator == "highlow":
        horizon = gen_config.get("horizon")
        # Binary labels whether max has exceeded a threshold or not
        print(f"Generating 'highlow' labels with horizon {horizon}...")
        features = generate_labels_highlow(f_df, horizon=horizon)
        print(f"Finished generating 'highlow' labels. {len(features)} labels generated.")
    elif generator == "highlow2":
        print(f"Generating 'highlow2' labels...")
        f_df, features = generate_labels_highlow2(f_df, gen_config)
        print(f"Finished generating 'highlow2' labels. {len(features)} labels generated.")
    elif generator == "topbot":
        column_name = gen_config.get("columns", "close")
        top_level_fracs = [0.01, 0.02, 0.03, 0.04, 0.05]
        bot_level_fracs = [-frac for frac in top_level_fracs]
        f_df, features = generate_labels_topbot(f_df, column_name, top_level_fracs, bot_level_fracs)
    elif generator == "topbot2":
        f_df, features = generate_labels_topbot2(f_df, gen_config)

    # Signals
    elif generator == "smoothen":
        f_df, features = generate_smoothen_scores(f_df, gen_config)
    elif generator == "combine":
        f_df, features = generate_combine_scores(f_df, gen_config)
    elif generator == "threshold_rule":
        f_df, features = generate_threshold_rule(f_df, gen_config)
    elif generator == "threshold_rule2":
        f_df, features = generate_threshold_rule2(f_df, gen_config)

    else:
        # Fall back to resolving the generator name to a function reference
        generator_fn = resolve_generator_name(generator)
        if generator_fn is None:
            raise ValueError(f"Unknown feature generator name or name cannot be resolved: {generator}")
        # Call this function
        f_df, features = generator_fn(f_df, gen_config)

    #
    # Attach the generated features to the main frame with all other columns
    #
    f_df = f_df[features]
    feature_prefix = fs.get("feature_prefix")
    if feature_prefix:
        f_df = f_df.add_prefix(feature_prefix + "_")

    new_features = f_df.columns.to_list()

    # Drop columns that already exist so the join below cannot clash
    duplicate_columns = set(df.columns) & set(new_features)
    df.drop(list(duplicate_columns), axis=1, inplace=True)

    df = df.join(f_df)  # Attach all derived features to the main frame

    return df, new_features
|
|
|
|
|
|
def predict_feature_set(df, fs, config, models: dict):
    """
    Apply the trained models of this feature set to the input data and collect
    their predictions.

    Returns a tuple (out_df, features, scores): a frame with one prediction
    column per (label, algorithm) pair, the list of these column names, and
    quality scores for each label that is also present in `df`.
    """
    fs_config = fs.get("config")

    # Labels: the feature set definition overrides the global configuration
    labels = fs_config.get("labels", []) or config.get("labels", [])

    algorithms_all = config.get("algorithms")
    # "functions" and "algorithms" are synonyms in the feature set config
    algorithms_str = fs_config.get("functions", []) or fs_config.get("algorithms", [])
    # The algorithms can be either strings (names) or dicts (definitions) so we resolve the names
    algorithms = []
    for entry in algorithms_str:
        if isinstance(entry, str):  # Find in the list of algorithms
            entry = find_algorithm_by_name(algorithms_all, entry)
        elif not isinstance(entry, dict):
            raise ValueError(f"Algorithm has to be either dict or name")
        algorithms.append(entry)
    if not algorithms:
        algorithms = algorithms_all

    # Train features: "columns"/"features" of the set override the global list
    train_features = (
        fs_config.get("columns", [])
        or fs_config.get("features", [])
        or config.get("train_features", [])
    )

    train_df = df[train_features]

    # Map of algorithm type to its prediction function
    predictors = {"gb": predict_gb, "nn": predict_nn, "lc": predict_lc, "svc": predict_svc}

    features = []
    scores = dict()
    out_df = pd.DataFrame(index=train_df.index)  # Collect predictions

    for label in labels:
        for model_config in algorithms:
            algo_name = model_config.get("name")
            algo_type = model_config.get("algo")
            score_column_name = label + label_algo_separator + algo_name

            # It is an entry from loaded model dict
            model_pair = models.get(score_column_name)  # Trained model from model registry

            print(f"Predict '{score_column_name}'. Algorithm {algo_name}. Label: {label}. Train length {len(train_df)}. Train columns {len(train_df.columns)}")

            predict_fn = predictors.get(algo_type)
            if predict_fn is None:
                raise ValueError(f"Unknown algorithm type '{algo_type}'")
            df_y_hat = predict_fn(model_pair, train_df, model_config)

            out_df[score_column_name] = df_y_hat
            features.append(score_column_name)

            # For each new score, compare it with the label true values
            if label in df:
                df_y = df[label]
                if is_float_dtype(df_y) and is_float_dtype(df_y_hat):
                    scores[score_column_name] = compute_scores_regression(df_y, df_y_hat)  # Regression scores
                else:
                    scores[score_column_name] = compute_scores(df_y, df_y_hat)  # Classification scores

    return out_df, features, scores
|
|
|
|
|
|
def train_feature_set(df, fs, config):
    """
    Train one model per (label, algorithm) pair of this feature set and collect
    the trained models, their in-sample predictions and quality scores.

    Args:
        df: Input data frame containing both the train features and the label columns.
        fs: Feature set definition; its "config" entry may override "labels",
            "functions"/"algorithms" and "columns"/"features" from the global config.
        config: Global configuration with defaults for "labels", "algorithms"
            and "train_features".

    Returns:
        Tuple (out_df, models, scores): a frame with one in-sample prediction
        column per trained model, a dict mapping score column name to the
        trained model pair, and a dict mapping score column name to its metrics.

    Raises:
        ValueError: If an algorithm entry is neither a dict nor a resolvable
            name, or if an algorithm has an unknown type.
    """
    fs_config = fs.get("config")

    # Labels: the feature set definition overrides the global configuration
    labels = fs_config.get("labels", [])
    if not labels:
        labels = config.get("labels", [])

    algorithms_all = config.get("algorithms")
    # "functions" and "algorithms" are synonyms in the feature set config
    algorithms_str = fs_config.get("functions", [])
    if not algorithms_str:
        algorithms_str = fs_config.get("algorithms", [])
    # The algorithms can be either strings (names) or dicts (definitions) so we resolve the names
    algorithms = []
    for alg in algorithms_str:
        if isinstance(alg, str):  # Find in the list of algorithms
            alg = find_algorithm_by_name(algorithms_all, alg)
        elif not isinstance(alg, dict):
            raise ValueError(f"Algorithm has to be either dict or name")
        algorithms.append(alg)
    if not algorithms:
        algorithms = algorithms_all

    # Train features: "columns"/"features" of the set override the global list
    train_features = fs_config.get("columns", [])
    if not train_features:
        train_features = fs_config.get("features", [])
    if not train_features:
        train_features = config.get("train_features", [])

    # Only for train mode: rows with NaN in features or labels cannot be trained on
    df = df.dropna(subset=train_features).reset_index(drop=True)
    df = df.dropna(subset=labels).reset_index(drop=True)

    # Map of algorithm type to its (train, predict) function pair
    algo_functions = {
        "gb": (train_gb, predict_gb),
        "nn": (train_nn, predict_nn),
        "lc": (train_lc, predict_lc),
        "svc": (train_svc, predict_svc),
    }

    models = dict()
    scores = dict()
    out_df = pd.DataFrame()  # Collect predictions

    for label in labels:
        for model_config in algorithms:

            algo_name = model_config.get("name")
            algo_type = model_config.get("algo")
            score_column_name = label + label_algo_separator + algo_name

            # Limit the train length according to the algorithm train parameters
            algo_every_nth_row = model_config.get("params", {}).get("every_nth_row")
            if algo_every_nth_row:
                train_df = df.iloc[::algo_every_nth_row, :]
            else:
                train_df = df
            algo_train_length = model_config.get("params", {}).get("length")
            if algo_train_length:
                train_df = train_df.tail(algo_train_length)

            df_X = train_df[train_features]
            df_y = train_df[label]

            print(f"Train '{score_column_name}'. Algorithm {algo_name}. Label: {label}. Train length {len(df_X)}. Train columns {len(df_X.columns)}")

            try:
                train_fn, predict_fn = algo_functions[algo_type]
            except KeyError:
                # Fail fast with an exception (consistent with predict_feature_set)
                # instead of printing an error and returning bare None, which
                # made tuple-unpacking callers crash with an unrelated TypeError.
                raise ValueError(f"Unknown algorithm type '{algo_type}'") from None

            model_pair = train_fn(df_X, df_y, model_config)
            models[score_column_name] = model_pair
            df_y_hat = predict_fn(model_pair, df_X, model_config)

            out_df[score_column_name] = df_y_hat

            # In-sample quality: regression metrics for float scores, classification otherwise
            if is_float_dtype(df_y) and is_float_dtype(df_y_hat):
                scores[score_column_name] = compute_scores_regression(df_y, df_y_hat)  # Regression scores
            else:
                scores[score_column_name] = compute_scores(df_y, df_y_hat)  # Classification scores

    return out_df, models, scores
|
|
|
|
|
|
async def output_feature_set(df, fs: dict, config: dict):
    """
    Apply the specified output generator (notification or trade action) to the data.

    Args:
        df: Input data frame passed through to the output function.
        fs: Output set definition with "generator" (function name) and an
            optional "config" with generator-specific parameters.
        config: Global configuration passed through to the output function.

    Raises:
        ValueError: If the generator name is unknown and cannot be resolved.
    """
    #
    # Resolve the output generator function from the configuration
    #
    generator = fs.get("generator")
    gen_config = fs.get('config', {})

    if generator == "score_notification_model":
        generator_fn = send_score_notification
    elif generator == "diagram_notification_model":
        generator_fn = send_diagram
    elif generator == "trader_simulation":
        generator_fn = trader_simulation
    elif generator == "trader_binance":
        generator_fn = trader_binance
    else:
        # Resolve generator name to a function reference
        generator_fn = resolve_generator_name(generator)
        if generator_fn is None:
            raise ValueError(f"Unknown feature generator name or name cannot be resolved: {generator}")

    # Call the resolved function. This coroutine always executes inside a running
    # event loop, so an async generator can simply be awaited.
    # NOTE: the previous `if asyncio.get_running_loop():` guard was dead code —
    # get_running_loop() raises RuntimeError rather than returning a falsy value,
    # so the asyncio.run() fallback branch could never be reached.
    if asyncio.iscoroutinefunction(generator_fn):
        await generator_fn(df, gen_config, config)
    else:
        generator_fn(df, gen_config, config)