intelligent-trading-bot/outputs/notifier_diagram.py
2026-04-26 17:19:56 +02:00

325 lines
12 KiB
Python

import os
import sys
import io
from datetime import timedelta, datetime
import asyncio
import pandas as pd
import pandas.api.types as ptypes
import requests
from service.App import *
from common.utils import *
from common.model_store import *
from outputs.notifier_trades import load_all_transactions
import logging
log = logging.getLogger('notifier')
logging.getLogger('PIL').setLevel(logging.WARNING)
logging.getLogger('matplotlib').setLevel(logging.WARNING)
async def send_diagram(df, model: dict, config: dict, model_store: ModelStore):
"""
Produce a line chart based on latest data and send it to the channel.
"""
freq = config.get("freq")
time_column = config["time_column"]
# Ensure that timestamp is in index. It is needed for visualization
if not ptypes.is_datetime64_any_dtype(df.index): # Alternatively df.index.inferred_type == "datetime64"
if time_column in df.columns:
df = df.set_index('timestamp', inplace=False)
else:
raise ValueError(f"Neither index nor time columns '{time_column}' are of datetime type")
notification_freq = model.get("notification_freq")
if notification_freq:
# Continue only if system interval start is equal to the (longer) diagram interval start
if pandas_get_interval(notification_freq)[0] != pandas_get_interval(freq)[0]:
return
score_column_names = model.get("score_column_names")
if isinstance(score_column_names, str):
score_column_names = [score_column_names]
elif isinstance(score_column_names, list) and len(score_column_names) > 1:
score_column_names = [score_column_names[0]]
log.warning(f"Parameter 'score_column_names' should be one column. Only the first column will be visualized.")
score_thresholds = model.get("score_thresholds")
resampling_freq = model.get("resampling_freq") # Resampling (aggregation) frequency
nrows = model.get("nrows") # Time range (x axis) of the diagram, for example, 1 week 168 hours, 2 weeks 336 hours
row = df.iloc[-1] # Last row stores the latest values we need
#
# Prepare data to be visualized
#
vis_columns = ['open', 'high', 'low', 'close']
score_col = score_column_names[0]
vis_columns.append(score_col)
# Generate MAs if necessary
score_mas = model.get("score_ma", [])
if not isinstance(score_mas, list):
score_mas = [score_mas]
for ma in score_mas:
if not isinstance(ma, int):
log.error(f"Parameter 'score_ma' {ma} have to be an integer or a list of integers. Ignore")
continue
# TODO: Compute moving average column and add it to the list of columns to be visualized
ma_column_name = f"{score_col}_{ma}"
df[ma_column_name] = df[score_col].rolling(window=ma).mean()
vis_columns.append(ma_column_name)
score_column_names.append(ma_column_name)
# Resample
df_ohlc = df[vis_columns]
df_ohlc = resample_ohlc_data(df_ohlc.reset_index(), resampling_freq, nrows, score_columns=score_column_names, buy_signal_column=None, sell_signal_column=None)
# Get transaction data
df_t = load_all_transactions() # timestamp,price,profit,status
df_t['buy_long'] = df_t['status'].apply(lambda x: True if isinstance(x, str) and x == 'BUY' else False)
df_t['sell_long'] = df_t['status'].apply(lambda x: True if isinstance(x, str) and x == 'SELL' else False)
df_t = df_t[df_t.timestamp >= df_ohlc.timestamp.min()] # select only transactions for the last time
transactions_exist = len(df_t) > 0
if transactions_exist:
df_t = resample_transaction_data(df_t, resampling_freq, 0, 'buy_long', 'sell_long')
else:
df_t = None
# Merge because we need signals along with close price in one df
if transactions_exist:
df = df_ohlc.merge(df_t, how='left', left_on='timestamp', right_on='timestamp')
else:
df = df_ohlc
symbol = config["symbol"]
title = f"$\\bf{{{symbol}}}$"
description = config.get("description", "")
if description:
title += ": " + description
fig = generate_chart(
df, title,
buy_signal_column="buy_long" if transactions_exist else None,
sell_signal_column="sell_long" if transactions_exist else None,
score_column=score_column_names if score_column_names else None,
thresholds=score_thresholds
)
with io.BytesIO() as buf:
fig.savefig(buf, format='png', bbox_inches='tight', pad_inches=0.1) # Convert and save in buffer
im_bytes = buf.getvalue() # Get complete content (while read returns from current position)
img_data = im_bytes
#
# Send image
#
bot_token = config["telegram_bot_token"]
chat_id = config["telegram_chat_id"]
files = {'photo': img_data}
payload = {
'chat_id': chat_id,
'caption': f"", # Currently no text
'parse_mode': 'markdown'
}
try:
url = 'https://api.telegram.org/bot' + bot_token + '/sendPhoto'
req = requests.post(url=url, data=payload, files=files)
response = req.json()
except Exception as e:
log.error(f"Error sending notification: {e}")
def resample_ohlc_data(df, freq, nrows, score_columns, buy_signal_column, sell_signal_column):
"""
Resample ohlc data to lower frequency. Assumption: time in 'timestamp' column.
"""
# Aggregation functions
ohlc = {
'timestamp': 'first', # It will be in index
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
}
if isinstance(score_columns, str):
score_columns = [score_columns]
for col in score_columns:
# Add to aggregations
ohlc[col] = lambda x: max(x) if len(x) > 0 and all(x > 0.0) else min(x) if len(x) > 0 and all(x < 0.0) else np.mean(x)
if buy_signal_column:
# Buy signal if at least one buy signal was during this time interval
ohlc[buy_signal_column] = lambda x: True if not all(x == False) else False
# Alternatively, 0 if no buy signals, 1 if only 1 buy signal, 2 or -1 if more than 1 any signals (mixed interval)
if sell_signal_column:
# Sell signal if at least one sell signal was during this time interval
ohlc[sell_signal_column] = lambda x: True if not all(x == False) else False
df_out = df.resample(freq, on='timestamp').apply(ohlc)
del df_out['timestamp']
df_out.reset_index(inplace=True)
if nrows:
df_out = df_out.tail(nrows)
return df_out
def resample_transaction_data(df, freq, nrows, buy_signal_column, sell_signal_column):
"""
Given a list of transactions with arbitrary timestamps,
return a regular time series with True or False for the rows with transactions
Assumption: time in 'timestamp' column
PROBLEM: only one transaction per interval (1 hour) is possible so if we buy and then sell within one hour then we cannot represent this
Solution 1: remove
Solution 2: introduce a special symbol (like dot instead of arrows) which denotes one or more transactions - essentially error or inability to visualize
1 week 7*1440=10080 points, 5 min - 2016 points, 10 mins - 1008 points
"""
# Aggregation functions
transactions = {
'timestamp': 'first', # It will be in index
buy_signal_column: lambda x: True if not all(x == False) else False,
sell_signal_column: lambda x: True if not all(x == False) else False,
}
df_out = df.resample(freq, on='timestamp').apply(transactions)
del df_out['timestamp']
df_out.reset_index(inplace=True)
if nrows:
df_out = df_out.tail(nrows)
return df_out
def generate_chart(df, title, buy_signal_column, sell_signal_column, score_column, thresholds: list):
"""
All columns in one input df with desired length and desired freq
Visualize columns 1 (pre-defined): high, low, close
Visualize columns 1 (via parameters): buy_signal_column, sell_signal_column
Visualize columns 2: score_column (optional) - in [-1, +1]
Visualize columns 2: Threshold lines (as many as there are values in the list)
"""
from matplotlib import pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
# List of colors: https://matplotlib.org/stable/gallery/color/named_colors.html
sns.set_style('white', {'axes.grid': False, 'axes.spines.top': True, 'axes.edgecolor': 'lightgrey'}) # white, darkgrid, whitegrid, dark, ticks
# sns.set_style('white', {'axes.facecolor':'yellow', 'grid.color': '.8', 'font.family':'Times New Roman'})
sns.set_context("notebook") # notebook (default), talk, paper, poster
# sns.despine(left=True, offset=10, trim=True)
# sns.despine(fig=None, ax=None, top=False, right=False, left=False, bottom=False, offset=None, trim=False)
# sns.despine(bottom = True, left = False)
# sns.color_palette("rocket")
# sns.set(rc={'axes.facecolor': 'gold', 'figure.facecolor': 'white'})
# sns.set(rc={'figure.facecolor': 'gold'})
fig, ax1 = plt.subplots(figsize=(12, 6))
# plt.tight_layout()
# plt.grid(axis="x")
#
# Price: High, Low, Close
#
# Fill area between high and low
plt.fill_between(df.timestamp, df.low, df.high, step="mid", lw=0.0, facecolor='skyblue', alpha=.4) # edgecolor='red',
# Close price
sns.lineplot(data=df, x="timestamp", y="close", drawstyle='steps-mid', lw=.5, color='blue', ax=ax1)
# sns.pointplot(data=df, x="timestamp", y="close", color='darkblue', ax=ax1)
#
# Transactions (optional): buy or sell triangles over price curve
#
# Slightly move the buy/sell triangles so that they exactly point to the price by their corner
# Yet, it depends on the price scale (varies for different assets), so set to 0 before a better solution is found
triangle_adj = 0
df["close_buy_adj"] = df["close"] - triangle_adj
df["close_sell_adj"] = df["close"] + triangle_adj
if buy_signal_column:
sns.lineplot(data=df[df[buy_signal_column] == True], x="timestamp", y="close_buy_adj", lw=0, markerfacecolor="green", markersize=10, marker="^", alpha=0.6, ax=ax1)
if sell_signal_column:
sns.lineplot(data=df[df[sell_signal_column] == True], x="timestamp", y="close_sell_adj", lw=0, markerfacecolor="red", markersize=10, marker="v", alpha=0.6, ax=ax1)
# g2.set(yticklabels=[])
# g2.set(title='Penguins: Body Mass by Species for Gender')
ax1.set(xlabel=None) # remove the x-axis label
# g2.set(ylabel=None) # remove the y-axis label
ax1.set_ylabel('Close price', color='blue', fontsize=16)
# g2.tick_params(left=False) # remove the ticks
ymin = df['low'].min()
ymax = df['high'].max()
ax1.set(ylim=(ymin - (ymax - ymin) * 0.05, ymax + (ymax - ymin) * 0.005))
# ax1.set_frame_on(False)
# ax1.xaxis.set_major_locator(mdates.DayLocator())
ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M")) # "%H:%M:%S" "%d %b"
ax1.tick_params(axis="x", rotation=90)
ax1.xaxis.grid(True)
#
# Indicator line
#
if not isinstance(score_column, list):
score_column = [score_column]
main_score_column = score_column[0] if len(score_column) > 0 else None
if main_score_column and main_score_column in df.columns:
ax2 = ax1.twinx()
ymax = max(df[main_score_column].abs().max(), max(thresholds) if thresholds else 0.0)
ax2.set(ylim=(-ymax * 1.2, +ymax * 1.2))
# ax2.set_frame_on(False)
ax2.xaxis.grid(True)
# ax2.axhline(0.0, lw=.1, color="black")
# Draw horizontal threshold lines
for threshold in thresholds:
ax2.axhline(threshold, lw=3.0, color="lightgray")
# Draw secondary score columns if any
for i, sec_score_col in enumerate(score_column):
if i == 0:
continue
sns.lineplot(data=df, x="timestamp", y=sec_score_col, drawstyle='default', lw=1.0, color="violet", ax=ax2) # marker="v" "^" , markersize=12
# Primary score
# ax2.plot(x, y1, 'o-', color="red" )
sns.lineplot(data=df, x="timestamp", y=main_score_column, drawstyle='steps-mid', lw=1.0, color="red", ax=ax2) # marker="v" "^" , markersize=12
ax2.set_ylabel('Intelligent Indicator', color='r', fontsize=16)
# ax2.set_ylabel('Score', color='b')
# fig.suptitle("My figtitle", fontsize=14) # Positioned higher
# plt.title('Weekly: $\\bf{S&P 500}$', fontsize=16) # , weight='bold' or MathText
plt.title(title, fontsize=14)
# ax1.set_title('My Title')
# plt.show()
return fig