# Source: intelligent-trading-bot/outputs/notifier_diagram.py
# (web-viewer header: 325 lines, 12 KiB, Python)

import os
import sys
import io
from datetime import timedelta, datetime
import asyncio
import pandas as pd
# 2025-08-24 12:46:56 +02:00 (revision timestamp left over from the web blame view)
import pandas.api.types as ptypes
import requests
from service.App import *
from common.utils import *
from common.model_store import *
from outputs.notifier_trades import load_all_transactions
import logging
# Module-level logger used by the functions in this file
log = logging.getLogger('notifier')

# Restrict the loggers of the imaging/plotting libraries to WARNING and above
for _library_logger in ('PIL', 'matplotlib'):
    logging.getLogger(_library_logger).setLevel(logging.WARNING)
async def send_diagram(df, model: dict, config: dict, model_store: ModelStore):
    """
    Produce a line chart based on latest data and send it to the channel.

    The chart shows resampled OHLC prices, an optional score column (with optional
    moving averages) and buy/sell markers for the loaded transactions. It is rendered
    to PNG in memory and posted to the Telegram ``sendPhoto`` endpoint.

    :param df: Data with 'open', 'high', 'low', 'close' and (optionally) the score column.
        Time must be either in a datetime index or in the column named by
        ``config["time_column"]``. The frame itself is not modified.
    :param model: Diagram parameters read here: ``notification_freq``, ``score_column_names``,
        ``score_thresholds``, ``score_ma``, ``resampling_freq``, ``nrows``.
    :param config: Application parameters read here: ``freq``, ``time_column``, ``symbol``,
        ``description``, ``telegram_bot_token``, ``telegram_chat_id``.
    :param model_store: Not used by this function.
    :raises ValueError: If the index is not datetime and the time column is missing.
    """
    from matplotlib import pyplot as plt  # lazy import, same as in generate_chart()

    freq = config.get("freq")
    time_column = config["time_column"]

    # Ensure that timestamp is in index. It is needed for visualization
    if not ptypes.is_datetime64_any_dtype(df.index):  # Alternatively df.index.inferred_type == "datetime64"
        if time_column not in df.columns:
            raise ValueError(f"Neither index nor time columns '{time_column}' are of datetime type")
        # Use the configured time column rather than a hard-coded 'timestamp'
        df = df.set_index(time_column, inplace=False)

    notification_freq = model.get("notification_freq")
    if notification_freq:
        # Continue only if system interval start is equal to the (longer) diagram interval start
        if pandas_get_interval(notification_freq)[0] != pandas_get_interval(freq)[0]:
            return

    if len(df) == 0:
        log.warning("No data to visualize. Diagram is not sent.")
        return

    # Always build a fresh list: MA column names are appended below and must not
    # accumulate in the list object stored in the model configuration between calls.
    configured_scores = model.get("score_column_names")
    if isinstance(configured_scores, str):
        score_column_names = [configured_scores]
    elif configured_scores:
        score_column_names = list(configured_scores[:1])
        if len(configured_scores) > 1:
            log.warning("Parameter 'score_column_names' should be one column. Only the first column will be visualized.")
    else:
        score_column_names = []  # The chart is then drawn without the score axis

    score_thresholds = model.get("score_thresholds")

    resampling_freq = model.get("resampling_freq")  # Resampling (aggregation) frequency
    nrows = model.get("nrows")  # Time range (x axis) of the diagram, for example, 1 week 168 hours, 2 weeks 336 hours

    #
    # Prepare data to be visualized
    #
    vis_columns = ['open', 'high', 'low', 'close'] + score_column_names
    # Private copy: new columns are added below and the caller's frame must stay untouched.
    # The index is named 'timestamp' because the resampling helpers expect time in that column.
    df_vis = df[vis_columns].rename_axis("timestamp")

    # Generate MAs if necessary
    if score_column_names:
        score_col = score_column_names[0]
        score_mas = model.get("score_ma", [])
        if score_mas is None:
            score_mas = []
        elif not isinstance(score_mas, list):
            score_mas = [score_mas]
        for ma in score_mas:
            if not isinstance(ma, int):
                log.error(f"Parameter 'score_ma' {ma} have to be an integer or a list of integers. Ignore")
                continue
            ma_column_name = f"{score_col}_{ma}"
            df_vis[ma_column_name] = df_vis[score_col].rolling(window=ma).mean()
            score_column_names.append(ma_column_name)

    # Resample
    df_ohlc = resample_ohlc_data(
        df_vis.reset_index(), resampling_freq, nrows,
        score_columns=score_column_names, buy_signal_column=None, sell_signal_column=None
    )

    # Get transaction data
    df_t = load_all_transactions()  # timestamp,price,profit,status
    if df_t is not None and len(df_t) > 0:
        # assign() returns a new frame, so the loaded transactions are not modified
        df_t = df_t.assign(
            buy_long=df_t['status'] == 'BUY',
            sell_long=df_t['status'] == 'SELL',
        )
        df_t = df_t[df_t.timestamp >= df_ohlc.timestamp.min()]  # select only transactions for the last time
    transactions_exist = df_t is not None and len(df_t) > 0

    # Merge because we need signals along with close price in one df
    if transactions_exist:
        df_t = resample_transaction_data(df_t, resampling_freq, 0, 'buy_long', 'sell_long')
        df_chart = df_ohlc.merge(df_t, how='left', left_on='timestamp', right_on='timestamp')
    else:
        df_chart = df_ohlc

    symbol = config["symbol"]
    title = f"$\\bf{{{symbol}}}$"
    description = config.get("description", "")
    if description:
        title += ": " + description

    fig = generate_chart(
        df_chart, title,
        buy_signal_column="buy_long" if transactions_exist else None,
        sell_signal_column="sell_long" if transactions_exist else None,
        score_column=score_column_names if score_column_names else None,
        thresholds=score_thresholds
    )

    try:
        with io.BytesIO() as buf:
            fig.savefig(buf, format='png', bbox_inches='tight', pad_inches=0.1)  # Convert and save in buffer
            img_data = buf.getvalue()  # Get complete content (while read returns from current position)
    finally:
        # pyplot keeps a reference to every figure it created until it is closed explicitly
        plt.close(fig)

    #
    # Send image
    #
    bot_token = config["telegram_bot_token"]
    chat_id = config["telegram_chat_id"]

    url = 'https://api.telegram.org/bot' + bot_token + '/sendPhoto'
    files = {'photo': img_data}
    payload = {
        'chat_id': chat_id,
        'caption': "",  # Currently no text
        'parse_mode': 'markdown'
    }

    # NOTE(review): requests.post() is a blocking call inside a coroutine; the timeout at least
    # bounds how long the event loop can be stalled by an unresponsive server.
    try:
        req = requests.post(url=url, data=payload, files=files, timeout=30)
        response = req.json()
    except Exception as e:
        # Exceptions raised by requests may embed the request URL, which contains the bot token
        error_text = str(e)
        if bot_token:
            error_text = error_text.replace(bot_token, "<token>")
        log.error(f"Error sending notification: {error_text}")
        return

    if not (isinstance(response, dict) and response.get("ok")):
        log.error(f"Diagram was not accepted by Telegram: {response}")
def resample_ohlc_data(df, freq, nrows, score_columns, buy_signal_column, sell_signal_column):
    """
    Resample ohlc data to lower frequency. Assumption: time in 'timestamp' column.

    :param df: Frame with 'timestamp', 'open', 'high', 'low', 'close' and the listed
        score/signal columns.
    :param freq: Target frequency passed to ``DataFrame.resample``.
    :param nrows: If truthy, only the last ``nrows`` resampled rows are returned.
    :param score_columns: Column name, list of column names, or None. Within an interval
        the maximum is taken if all values are positive, the minimum if all are negative,
        and the mean otherwise.
    :param buy_signal_column: Optional boolean column; an interval is True if at least
        one of its rows is not False.
    :param sell_signal_column: Same as ``buy_signal_column`` for sell signals.
    :return: Resampled frame with the interval start in the 'timestamp' column.
    """
    def aggregate_score(values):
        # Keep the extreme value while the sign is constant within the interval
        if len(values) > 0 and (values > 0.0).all():
            return values.max()
        if len(values) > 0 and (values < 0.0).all():
            return values.min()
        return values.mean()

    def has_signal(values):
        # Signal if at least one signal was during this time interval (empty interval -> False)
        # Alternatively, 0 if no signals, 1 if only 1 signal, 2 or -1 if more than 1 any signals (mixed interval)
        return not (values == False).all()

    if score_columns is None:
        score_columns = []
    elif isinstance(score_columns, str):
        score_columns = [score_columns]

    # Aggregation functions. 'timestamp' is the resampling key: it becomes the index of the
    # result and therefore does not need an aggregation of its own.
    aggregations = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
    }
    for col in score_columns:
        aggregations[col] = aggregate_score
    if buy_signal_column:
        aggregations[buy_signal_column] = has_signal
    if sell_signal_column:
        aggregations[sell_signal_column] = has_signal

    df_out = df.resample(freq, on='timestamp').agg(aggregations)
    df_out = df_out.reset_index()  # Interval start goes back to the 'timestamp' column

    if nrows:
        df_out = df_out.tail(nrows)

    return df_out
def resample_transaction_data(df, freq, nrows, buy_signal_column, sell_signal_column):
    """
    Turn a list of transactions with arbitrary timestamps into a regular time series
    which stores True or False depending on whether the interval has transactions.

    Assumption: time in 'timestamp' column

    PROBLEM: only one transaction per interval (1 hour) is possible so if we buy and then sell within one hour then we cannot represent this
      Solution 1: remove
      Solution 2: introduce a special symbol (like dot instead of arrows) which denotes one or more transactions - essentially error or inability to visualize

    1 week 7*1440=10080 points, 5 min - 2016 points, 10 mins - 1008 points

    :param df: Transactions with 'timestamp' and the two signal columns.
    :param freq: Target frequency passed to ``DataFrame.resample``.
    :param nrows: If truthy, only the last ``nrows`` intervals are returned.
    :param buy_signal_column: Name of the boolean buy column.
    :param sell_signal_column: Name of the boolean sell column.
    :return: One row per interval with 'timestamp' and the two signal columns.
    """
    def interval_has_signal(values):
        # True as soon as one row of the interval is not False; an empty interval gives False
        return any(values != False)

    # Aggregation functions
    rules = {'timestamp': 'first'}  # It will be in index
    rules[buy_signal_column] = interval_has_signal
    rules[sell_signal_column] = interval_has_signal

    resampled = df.resample(freq, on='timestamp').agg(rules)
    # The aggregated 'timestamp' column would clash with the index of the same name on reset
    resampled = resampled.drop(columns='timestamp').reset_index()

    return resampled.tail(nrows) if nrows else resampled
def generate_chart(df, title, buy_signal_column, sell_signal_column, score_column, thresholds: list):
    """
    Build a price chart with optional transaction markers and an optional score curve.

    All columns in one input df with desired length and desired freq

    Visualize columns 1 (pre-defined): high, low, close
    Visualize columns 1 (via parameters): buy_signal_column, sell_signal_column
    Visualize columns 2: score_column (optional) - in [-1, +1]
    Visualize columns 2: Threshold lines (as many as there are values in the list)

    :param df: Frame with 'timestamp', 'high', 'low', 'close' and the columns named by the
        other parameters. It is not modified.
    :param title: Chart title.
    :param buy_signal_column: Optional column; rows where it is True get a green up-triangle.
    :param sell_signal_column: Optional column; rows where it is True get a red down-triangle.
    :param score_column: Column name, list of names, or None. The first name is the primary
        score (red steps), the remaining ones are drawn as secondary lines (violet).
    :param thresholds: List of horizontal lines on the score axis; None or empty for no lines.
    :return: The matplotlib figure. It is created via pyplot, so the caller should close it
        (``plt.close(fig)``) once it is no longer needed.
    """
    import math

    from matplotlib import pyplot as plt
    import matplotlib.dates as mdates
    import seaborn as sns

    # Accept None and a single number in addition to a list
    if thresholds is None:
        thresholds = []
    elif isinstance(thresholds, (int, float)):
        thresholds = [thresholds]

    # List of colors: https://matplotlib.org/stable/gallery/color/named_colors.html
    sns.set_style('white', {'axes.grid': False, 'axes.spines.top': True, 'axes.edgecolor': 'lightgrey'})  # white, darkgrid, whitegrid, dark, ticks
    sns.set_context("notebook")  # notebook (default), talk, paper, poster

    fig, ax1 = plt.subplots(figsize=(12, 6))

    #
    # Price: High, Low, Close
    #

    # Fill area between high and low
    ax1.fill_between(df.timestamp, df.low, df.high, step="mid", lw=0.0, facecolor='skyblue', alpha=.4)

    # Close price
    sns.lineplot(data=df, x="timestamp", y="close", drawstyle='steps-mid', lw=.5, color='blue', ax=ax1)

    #
    # Transactions (optional): buy or sell triangles over price curve
    #

    # Slightly move the buy/sell triangles so that they exactly point to the price by their corner
    # Yet, it depends on the price scale (varies for different assets), so set to 0 before a better solution is found
    triangle_adj = 0
    # assign() returns a new frame: the helper columns are not written into the caller's data
    df = df.assign(
        close_buy_adj=df["close"] - triangle_adj,
        close_sell_adj=df["close"] + triangle_adj,
    )

    markers = (
        (buy_signal_column, "close_buy_adj", "green", "^"),
        (sell_signal_column, "close_sell_adj", "red", "v"),
    )
    for signal_column, y_column, color, marker in markers:
        if not signal_column or signal_column not in df.columns:
            continue
        # Explicit comparison: after a left merge the column may hold NaN, which is truthy
        df_signal = df[df[signal_column] == True]
        if df_signal.empty:
            continue
        sns.lineplot(data=df_signal, x="timestamp", y=y_column, lw=0, markerfacecolor=color, markersize=10, marker=marker, alpha=0.6, ax=ax1)

    ax1.set(xlabel=None)  # remove the x-axis label
    ax1.set_ylabel('Close price', color='blue', fontsize=16)

    # NOTE(review): the margin is 5% below but only 0.5% above the price range - confirm it is intended
    ymin = df['low'].min()
    ymax = df['high'].max()
    ax1.set(ylim=(ymin - (ymax - ymin) * 0.05, ymax + (ymax - ymin) * 0.005))

    ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))  # "%H:%M:%S" "%d %b"
    ax1.tick_params(axis="x", rotation=90)
    ax1.xaxis.grid(True)

    #
    # Indicator line
    #
    score_columns = score_column if isinstance(score_column, list) else [score_column]
    main_score_column = score_columns[0] if score_columns else None

    if main_score_column and main_score_column in df.columns:
        ax2 = ax1.twinx()

        # The score axis is symmetric around zero, so its half-range has to cover the largest
        # absolute score as well as the largest absolute threshold (thresholds may be negative)
        score_abs_max = df[main_score_column].abs().max()
        if not math.isfinite(score_abs_max):
            score_abs_max = 0.0  # All scores are NaN
        score_limit = max([score_abs_max] + [abs(t) for t in thresholds])
        if score_limit <= 0:
            score_limit = 1.0  # Nothing to scale by; avoid a zero-height axis
        ax2.set(ylim=(-score_limit * 1.2, +score_limit * 1.2))

        ax2.xaxis.grid(True)

        # Draw horizontal threshold lines
        for threshold in thresholds:
            ax2.axhline(threshold, lw=3.0, color="lightgray")

        # Draw secondary score columns if any
        for sec_score_col in score_columns[1:]:
            if sec_score_col not in df.columns:
                continue
            sns.lineplot(data=df, x="timestamp", y=sec_score_col, drawstyle='default', lw=1.0, color="violet", ax=ax2)

        # Primary score
        sns.lineplot(data=df, x="timestamp", y=main_score_column, drawstyle='steps-mid', lw=1.0, color="red", ax=ax2)

        ax2.set_ylabel('Intelligent Indicator', color='r', fontsize=16)

    ax1.set_title(title, fontsize=14)

    return fig