intelligent-trading-bot/common/utils.py

482 lines
18 KiB
Python
Raw Permalink Normal View History

2025-04-17 07:38:16 +01:00
import logging
2022-03-20 10:09:33 +01:00
import dateparser
import pytz
from datetime import datetime, timezone, timedelta
from typing import Union, List
2022-03-20 10:09:33 +01:00
import json
from decimal import *
import numpy as np
import pandas as pd
from sklearn import metrics
from apscheduler.triggers.cron import CronTrigger
from common.gen_features import *
2022-03-20 10:09:33 +01:00
#
# Decimals
#
def to_decimal(value):
    """
    Convert a value to a Decimal truncated (rounded down) to 8 fractional digits.

    The value is first converted to its string form, so strings, floats
    (including scientific notation like 4.1e-7) and Decimals are all accepted.

    :param value: string, float or Decimal
    :return: Decimal with exponent -8
    """
    # NOTE(review): the precision is hard-coded; presumably it mirrors
    # App.config["trade"]["symbol_info"]["baseAssetPrecision"] - confirm.
    precision = 8
    quantum = Decimal(1).scaleb(-precision)  # 0.00000001
    as_decimal = Decimal(str(value))
    return as_decimal.quantize(quantum, rounding=ROUND_DOWN)
def round_str(value, digits):
    """
    Round a value half-up to the given number of fractional digits and format it as a string.

    :param value: anything whose ``str()`` form is accepted by ``Decimal``
    :param digits: number of digits after the decimal point
    :return: fixed-point string with exactly ``digits`` fractional digits
    """
    step = Decimal(1) / Decimal(10) ** digits  # For 8 digits: 0.00000001
    rounded = Decimal(str(value)).quantize(step, rounding=ROUND_HALF_UP)
    return format(rounded, f".{digits}f")
def round_down_str(value, digits):
    """
    Truncate a value (round towards zero) to the given number of fractional digits and format it as a string.

    :param value: anything whose ``str()`` form is accepted by ``Decimal``
    :param digits: number of digits after the decimal point
    :return: fixed-point string with exactly ``digits`` fractional digits
    """
    step = Decimal(1) / Decimal(10) ** digits  # For 8 digits: 0.00000001
    truncated = Decimal(str(value)).quantize(step, rounding=ROUND_DOWN)
    return format(truncated, f".{digits}f")
#
# Interval arithmetic (pandas)
2022-03-20 10:09:33 +01:00
#
def pandas_get_interval(freq: str, timestamp: int=None):
    """
    Find the discrete interval of the given frequency which contains the timestamp.

    Intervals are aligned to the Unix epoch, i.e., interval borders are multiples
    of the interval length counted from 1970-01-01T00:00:00Z.

    :param freq: pandas frequency string accepted by ``pd.Timedelta`` (for example, ``1min`` or ``1h``)
    :param timestamp: point in time given either as epoch *seconds* (int or float, 10 digits)
        or as a datetime. A naive datetime is interpreted as UTC, an aware datetime is
        converted using its own time zone. If omitted (or 0), the current UTC time is used.
    :return: pair (start, end) of the interval in epoch milliseconds (13 digits),
        where end is the start of the next interval
    :raises ValueError: if the timestamp has an unsupported type
    """
    if not timestamp:
        timestamp = int(datetime.now(timezone.utc).timestamp())  # seconds (10 digits)
    elif isinstance(timestamp, datetime):
        if timestamp.utcoffset() is None:
            # Naive datetime: assume it already expresses UTC wall-clock time
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        timestamp = int(timestamp.timestamp())
    elif isinstance(timestamp, (int, float, np.integer, np.floating)):
        pass  # Already epoch seconds
    else:
        raise ValueError(f"Error converting timestamp {timestamp} to millis. Unknown type {type(timestamp)} ")

    # Work in integer milliseconds to avoid float rounding for sub-second frequencies
    interval_length_ms = pandas_interval_length_ms(freq)
    timestamp_ms = int(timestamp * 1000)

    start = (timestamp_ms // interval_length_ms) * interval_length_ms
    end = start + interval_length_ms

    return int(start), int(end)


def pandas_interval_length_ms(freq: str):
    """
    Length of one interval of the given pandas frequency in whole milliseconds.

    :param freq: pandas frequency string accepted by ``pd.Timedelta``
    :return: interval length in milliseconds (int)
    """
    # Integer floor division of timedeltas is exact, unlike seconds (float) * 1000
    return int(pd.Timedelta(freq) // pd.Timedelta(milliseconds=1))
def get_interval_count_from_start_dt(freq: str, start_dt):
    """
    Count whole intervals between the given start datetime and the current UTC time, plus a margin of 2.

    :param freq: pandas frequency string accepted by ``pd.Timedelta``
    :param start_dt: timezone-aware start datetime (it is subtracted from an aware "now")
    :return: number of whole intervals elapsed since ``start_dt`` increased by 2
    """
    step = pd.Timedelta(freq).to_pytimedelta()
    elapsed = datetime.now(timezone.utc) - start_dt
    whole_intervals = elapsed // step
    return whole_intervals + 2
def get_start_dt_for_interval_count(freq: str, interval_count: int):
    """
    Compute the datetime lying ``interval_count + 1`` interval lengths before the current UTC time.

    The result is simply "now" minus the period length, so it is not aligned with the interval raster.

    :param freq: pandas frequency string accepted by ``pd.Timedelta``
    :param interval_count: number of whole intervals to go back
    :return: timezone-aware (UTC) datetime
    """
    step = pd.Timedelta(freq).to_pytimedelta()
    lookback = step * (interval_count + 1)
    return datetime.now(timezone.utc) - lookback
#
# Date and time
#
def freq_to_CronTrigger(freq: str):
    """
    Convert a pandas frequency string to an APScheduler ``CronTrigger`` (UTC).

    Supported suffixes are ``min``, ``h``, ``D``, ``W`` and ``MS`` optionally preceded
    by a positive integer multiplier (a missing multiplier means 1). The trigger fires
    a few seconds after the interval border so that the server has time to complete
    the interval.

    :param freq: pandas frequency like ``1min``, ``5min``, ``1h``, ``1D``, ``1W``, ``1MS``
    :return: CronTrigger firing once per interval
    :raises ValueError: if the frequency has an unknown suffix or a non-integer multiplier
    """
    for suffix in ("min", "h", "D", "W", "MS"):
        if freq.endswith(suffix):
            count = freq[:-len(suffix)]
            break
    else:
        raise ValueError(f"Cannot convert frequency '{freq}' to cron.")

    if count and not count.isdigit():
        raise ValueError(f"Cannot convert frequency '{freq}' to cron.")

    # Cron expression for the field which corresponds to the frequency unit
    step = "*" if count in ("", "1") else "*/" + count

    # APScheduler defaults omitted fields which are *more* significant than the least
    # significant explicit field to "*". Therefore hour and minute (and day where
    # relevant) must be pinned explicitly, otherwise a daily/weekly/monthly trigger
    # with only `second` specified would fire every minute.
    if suffix == "min":
        trigger = CronTrigger(minute=step, second="1", timezone="UTC")
    elif suffix == "h":
        trigger = CronTrigger(hour=step, minute="0", second="2", timezone="UTC")
    elif suffix == "D":
        trigger = CronTrigger(day=step, hour="0", minute="0", second="5", timezone="UTC")
    elif suffix == "W":
        # NOTE(review): assumes weekly intervals start on Monday (ISO week as used by
        # the APScheduler `week` field) - confirm against the data provider
        trigger = CronTrigger(week=step, day_of_week="mon", hour="0", minute="0", second="10", timezone="UTC")
    else:  # "MS" - month start
        trigger = CronTrigger(month=step, day="1", hour="0", minute="0", second="30", timezone="UTC")

    return trigger
2022-03-20 10:09:33 +01:00
def now_timestamp():
    """
    Return the current UTC time as a Unix timestamp in milliseconds (int).
    """
    utc_now = datetime.now(timezone.utc)
    epoch_seconds = utc_now.timestamp()
    return int(epoch_seconds * 1000)
2022-03-20 10:09:33 +01:00
def find_index(df: pd.DataFrame, date_str: str, column_name: str = "timestamp"):
    """
    Find the first record whose value in the given column equals the specified datetime string.

    The string is parsed with ``dateparser``. If the comparison fails because one side
    is tz-naive and the other tz-aware, the parsed datetime is switched (UTC is attached
    to a naive value, or the time zone is removed from an aware value) and the
    comparison is repeated once.

    :param df: data frame to search
    :param date_str: datetime in a format understood by ``dateparser.parse``
    :param column_name: column with datetimes to compare with
    :return: index label of the first matching row (it can be used as a position in
        ``iloc`` only if the frame has a default positional index)
    :raises ValueError: if no row matches
    """
    moment = dateparser.parse(date_str)

    try:
        matches = df[df[column_name] == moment]
    except TypeError:  # "Cannot compare tz-naive and tz-aware datetime-like objects"
        # Flip the time zone awareness of the parsed value and try again
        is_naive = moment.tzinfo is None or moment.tzinfo.utcoffset(moment) is None
        moment = moment.replace(tzinfo=pytz.utc if is_naive else None)
        matches = df[df[column_name] == moment]

    if matches is None or len(matches) == 0:
        raise ValueError(f"Cannot find date '{date_str}' in the column '{column_name}'. Either it does not exist or wrong format")

    return matches.index[0]
2023-12-25 17:14:37 +01:00
def notnull_tail_rows(df):
    """
    Return the maximum number of rows at the end of the frame which contain no nulls in any column.

    :param df: data frame to analyze
    :return: length of the null-free tail; ``len(df)`` if the frame has no nulls at all
    """
    null_mask = df.isnull()
    has_null = null_mask.any()  # Per column: is there at least one null
    null_columns = has_null[has_null].index.to_list()
    if not null_columns:
        return len(df)

    # In the reversed frame, argmax gives for each column the distance of its last null
    # from the end. The smallest distance over all columns is the clean tail length.
    reversed_flags = null_mask[null_columns].values[::-1]
    distances = reversed_flags.argmax(axis=0)
    return distances.min()
#
# System etc.
#
def resolve_generator_name(gen_name: str):
    """
    Resolve the specified name to a function reference.

    Fully qualified name consists of module name and function name separated by a colon,
    for example: 'mod1.mod2.mod3:my_func'.

    Example: fn = resolve_generator_name("common.gen_features_topbot:generate_labels_topbot3")

    :param gen_name: name in the form ``module.path:function``
    :return: the attribute found in the module, or None if the name has no module part,
        the module cannot be imported, or the module has no such attribute
    """
    # Imported locally so that the function does not depend on the name being provided
    # by some other (star) import. A missing name would otherwise be silently turned
    # into a None result by the broad exception handler below.
    import importlib

    mod_name, separator, func_name = gen_name.partition(':')
    if not separator or not mod_name:
        return None  # No module part, so the name cannot be resolved

    try:
        mod = importlib.import_module(mod_name)
    except Exception:
        # Deliberately best-effort: any failure while importing the module
        # (not only ImportError) means that the name cannot be resolved
        return None

    return getattr(mod, func_name, None)
#
# Data processing
#
def double_columns(df, shifts: List[int]):
    """
    Append shifted copies of all columns to the frame so that previous rows become features of the current row.

    For every offset in ``shifts`` a copy of the whole frame shifted by this offset is
    concatenated column-wise after the original columns. All columns are duplicated
    (with unchanged names) and only the explicitly listed offsets are used.

    :param df: source data frame
    :param shifts: list of row offsets passed to ``DataFrame.shift``
    :return: the same ``df`` object if ``shifts`` is empty, otherwise a new wide data frame
    """
    if not shifts:
        return df

    # Original frame first, then one shifted copy per requested offset
    frames = [df, *(df.shift(offset) for offset in shifts)]
    return pd.concat(frames, axis=1)
2025-11-02 12:51:54 +01:00
def append_rows(df, new_df):
    """
    Write all rows of the new data frame into the old data frame in place.

    A row whose index label already exists in the old frame overwrites the existing row,
    a row with an unknown label is appended.

    Notes:
    - Rows are processed one by one in a loop, so the operation may be slow for a large new frame.
    - Nothing is validated: neither column overlaps nor gaps in the resulting index.
    - If a continuous index is required, it has to be checked separately before or after the call.

    :param df: old data frame which is modified
    :param new_df: data frame with the rows to write
    :return: the modified ``df`` object
    """
    for label, record in new_df.iterrows():
        df.loc[label] = record

    return df
def append_df_drop_concat(df, new_df):
    """
    Append the new data frame to the old one by first dropping the overlapping tail of the old frame.

    All rows of the old frame with index labels greater than or equal to the first label
    of the new frame are discarded and then the new frame is concatenated. The rows of
    the new frame are assumed to be ordered by index.

    :param df: old data frame (not modified)
    :param new_df: new data frame whose rows replace the overlap and extend the old frame
    :return: new data frame with the old rows before the overlap followed by all new rows
    """
    if len(new_df) == 0:
        return df.copy()  # Nothing to append and no overlap to remove

    # Keep only old rows strictly before the first new label. A label comparison (instead
    # of slicing and cutting the last row) also works if the first new label is absent
    # from the old frame and if the index consists of integers.
    first_new_label = new_df.index[0]
    df_wo_overlap = df.loc[df.index < first_new_label]

    return pd.concat([df_wo_overlap, new_df])
def append_df_combine_update(df, new_df):
    """
    Merge the new data frame into the old one so that the new values have priority.

    :param df: old data frame (not modified)
    :param new_df: new data frame with rows to append and values to overwrite
    :return: new data frame with the union of rows of both frames
    """
    # Step 1: union of both frames where the old frame wins. A new value is taken
    # only where the old value is null or the row does not exist in the old frame.
    merged = df.combine_first(new_df)

    # Step 2: enforce overwriting (in place) of old non-null values with the new values.
    # Nulls of the new frame are not used for overwriting.
    merged.update(new_df)

    return merged
def merge_data_sources(data_sources: list, time_column: str, freq: str, merge_interpolate: bool):
    """
    Merge several source data frames into one frame on a common regular time raster.

    Each data source dict is updated in place: its ``df`` entry is replaced by the frame
    indexed by the time column (with prefixed column names), and ``start`` and ``end``
    entries with the first and last valid index of the frame are added.

    :param data_sources: list of dicts where each dict describes one data source. The
        entries read here are ``df`` (the data) and ``column_prefix``
    :param time_column: name of the column (or of the index) with timestamps
    :param freq: pandas frequency for the common time raster like 1min, 1h etc.
    :param merge_interpolate: if True then missing values in numeric columns are interpolated
    :return: data frame with all data joined on timestamps, or None (after printing an
        error) if some source has neither a column nor an index with the time column name
    """
    #
    # Normalize every source: time index, prefixed columns, valid range
    #
    for source in data_sources:
        frame = source.get("df")

        if time_column in frame.columns:
            frame = frame.set_index(time_column)
        elif frame.index.name != time_column:
            print(f"ERROR: Timestamp column is absent.")
            return

        # Add prefix to those columns which do not have it yet
        prefix = source['column_prefix']
        if prefix:
            marker = prefix + "_"
            frame.columns = [
                name if name.startswith(marker) else marker + name
                for name in frame.columns
            ]

        source["start"] = frame.first_valid_index()
        source["end"] = frame.last_valid_index()
        source["df"] = frame

    #
    # Common (main) index: from the earliest start to the earliest end over all sources
    #
    range_start = min(source["start"] for source in data_sources)
    range_end = min(source["end"] for source in data_sources)

    raster = pd.date_range(range_start, range_end, freq=freq)

    merged = pd.DataFrame(index=raster)
    merged.index.name = time_column
    merged.insert(0, time_column, merged.index)  # Repeat index as the first column

    # Timestamps of all sources must have the same semantics, for example, start of kline
    # (and not end of kline). Otherwise the data has to be shifted before merging.
    for source in data_sources:
        merged = merged.join(source["df"])

    # Interpolate numeric columns
    if merge_interpolate:
        for name in merged.select_dtypes((float, int)).columns.tolist():
            merged[name] = merged[name].interpolate()

    return merged
#
# Analysis
#
def compute_scores(y_true, y_hat):
    """
    Compute binary classification scores and return them as a dict.

    :param y_true: true labels (converted to int)
    :param y_hat: predicted scores as a pandas series; values above 0.5 are treated as class 1
    :return: dict with keys auc, ap, f1, precision, recall rounded to 3 digits
    """
    labels = y_true.astype(int)
    predicted_class = np.where(y_hat.values > 0.5, 1, 0)

    scores = {}

    # Ranking scores use the raw predictions with nulls replaced by 0. They fall back
    # to 0.0 if only one class is present (e.g., too small dataset when debugging).
    ranking_scorers = (
        ("auc", metrics.roc_auc_score),
        ("ap", metrics.average_precision_score),
    )
    for key, scorer in ranking_scorers:
        try:
            scores[key] = scorer(labels, y_hat.fillna(value=0))
        except ValueError:
            scores[key] = 0.0

    # Class scores use the thresholded predictions
    scores["f1"] = metrics.f1_score(labels, predicted_class)
    scores["precision"] = metrics.precision_score(labels, predicted_class)
    scores["recall"] = metrics.recall_score(labels, predicted_class)

    return {key: round(float(value), 3) for key, value in scores.items()}
def compute_scores_regression(y_true, y_hat):
    """
    Compute regression scores as well as scores of predicting the sign of the value.

    Input columns must have numeric data type.

    :param y_true: true values (object with ``values`` attribute like a pandas series)
    :param y_hat: predicted values (object with ``values`` attribute like a pandas series)
    :return: dict with keys mae, mape, r2, auc, ap, f1, precision, recall rounded to 3 digits
    """
    scores = {}

    # Regression scores: NaN if a score cannot be computed
    regression_scorers = (
        ("mae", metrics.mean_absolute_error),
        ("mape", metrics.mean_absolute_percentage_error),
        ("r2", metrics.r2_score),
    )
    for key, scorer in regression_scorers:
        try:
            scores[key] = scorer(y_true, y_hat)
        except ValueError:
            scores[key] = np.nan

    #
    # How good it is in predicting the sign (increase or decrease)
    #
    true_sign = np.where(y_true.values > 0.0, +1, -1)
    predicted_sign = np.where(y_hat.values > 0.0, +1, -1)

    # Fall back to 0.0 if only one class is present (e.g., too small dataset when debugging)
    sign_scorers = (
        ("auc", metrics.roc_auc_score),
        ("ap", metrics.average_precision_score),
    )
    for key, scorer in sign_scorers:
        try:
            scores[key] = scorer(true_sign, predicted_sign)
        except ValueError:
            scores[key] = 0.0

    scores["f1"] = metrics.f1_score(true_sign, predicted_sign)
    scores["precision"] = metrics.precision_score(true_sign, predicted_sign)
    scores["recall"] = metrics.recall_score(true_sign, predicted_sign)

    return {key: round(float(value), 3) for key, value in scores.items()}
def first_location_of_crossing_threshold(df, horizon, threshold, close_column_name, price_column_name):
    """
    For each point, take its close price as a reference, and then find the distance
    (relative offset, index) to the _first_ future point with the price higher (or lower)
    than the reference price by the specified level.

    If the location (relative index) is 0 then it is the next point. If the location is NaN,
    then the price does not cross the specified threshold within the horizon
    (or there is not enough data, e.g., at the end of the series). Therefore, this
    function can be used to find whether the price will cross the threshold at all
    during the specified horizon.

    The function is somewhat similar to the tsfresh function first_location_of_maximum
    or minimum. The difference is that this function does not search for a local maximum
    (which can vary) but rather finds the first point with the fixed specified increase
    (or decrease).

    Horizon specifies how many future points are considered (excluding the current point).
    If the specified threshold is positive then the function searches for an increase.
    Otherwise, if it is negative, the function searches for a decrease.
    The threshold is specified as a percentage. For example, 2 means that the function
    will find the relative offset (distance from this point) where the price increases by 2%
    relative to the current price. This is done for all points.

    :param df: dataframe with the specified columns
    :param horizon: how many future data rows to consider when searching for the specified price increase or decrease
    :param threshold: percentage of increase (if positive) or decrease (if negative)
    :param close_column_name: column for reference (initial) prices
    :param price_column_name: column to search for the specified increase or decrease.
        Frequently, it is the high column for price increase and the low column for price decrease.
        But it can also be the reference column (for example, close price) for both cases if high and low peaks are ignored.
    :return: A series where each value is the offset from the current row to the first row
        where the price increases (or decreases) by the specified percentage. NaN means that
        the price does not reach the specified level within the horizon.
    :raises ValueError: if the threshold is zero
    """

    # Both window functions receive a 2D array with one row per window record:
    # column 0 is the reference (close) column and column 1 is the searched price column

    def fn_high(x):
        if len(x) < 2:
            return np.nan
        p = x[0, 0]  # Reference price (first, that is, oldest row of the window)
        p_threshold = p*(1+(threshold/100.0))  # Cross line
        idx = np.argmax(x[1:, 1] > p_threshold)  # First index where price crosses the threshold

        # If all False, then index is 0 (first element of constant series) and we are not able to distinguish it from first element being True
        # If index is 0 and first element False (not above threshold) then NaN (not exceeds)
        if idx == 0 and x[1, 1] <= p_threshold:
            return np.nan
        return idx

    def fn_low(x):
        if len(x) < 2:
            return np.nan
        p = x[0, 0]  # Reference price (first, that is, oldest row of the window)
        p_threshold = p*(1+(threshold/100.0))  # Cross line (threshold is negative here, so the line is below the reference)
        idx = np.argmax(x[1:, 1] < p_threshold)  # First index where price crosses the threshold

        # If all False, then index is 0 (first element of constant series) and we are not able to distinguish it from first element being True
        # If index is 0 and first element False (not below threshold) then NaN (not exceeds)
        if idx == 0 and x[1, 1] >= p_threshold:
            return np.nan
        return idx

    # Window df will include the current row as well as horizon of past rows with 0 index starting from the oldest row and last index with the current row
    rl = df[[close_column_name, price_column_name]].rolling(horizon + 1, min_periods=(horizon // 2), method='table')

    if threshold > 0:
        df_out = rl.apply(fn_high, raw=True, engine='numba')
    elif threshold < 0:
        df_out = rl.apply(fn_low, raw=True, engine='numba')
    else:
        raise ValueError(f"Threshold cannot be zero.")

    # Because rolling apply processes past records while we need future records,
    # the result computed for the last row of a window is moved to the first row of this window
    df_out = df_out.shift(-horizon)

    # For some unknown reason (bug?), rolling apply (with table and numba) returns several columns rather than one column
    out_column = df_out.iloc[:, 0]

    return out_column