intelligent-trading-bot/service/notifier.py
2023-09-30 12:58:45 +02:00

471 行
18 KiB
Python

import os
import sys
from datetime import timedelta, datetime
import asyncio
import pandas as pd
import requests
from service.App import *
from common.utils import *
import logging
# Module-level logger shared by all notification functions in this file
log = logging.getLogger('notifier')
# Report only warnings and errors from the imaging and plotting libraries
logging.getLogger('PIL').setLevel(logging.WARNING)
logging.getLogger('matplotlib').setLevel(logging.WARNING)
# File (relative to the current working directory) where simulated transactions are appended, one per line
# NOTE(review): Path is not imported explicitly - presumably it is provided by one of the star imports above; confirm
transaction_file = Path("transactions.txt")
async def send_signal_message():
    """
    Build a text notification for the latest signal and send it to the Telegram chat.

    The signal is read from ``App.signal`` (keys ``side``, ``close_price``, ``trade_score``
    and ``close_time``). Thresholds and notification settings are read from
    ``App.config["signal_model"]`` (sections ``parameters`` and ``notification``).

    A message is sent only if the primary score is NOT strictly between
    ``sell_notify_threshold`` and ``buy_notify_threshold``, and either the minute of
    ``close_time`` is a multiple of ``notify_frequency_minutes`` or the signal side is
    BUY/SELL.

    :return: None. Errors while sending are logged and never raised.
    """
    symbol = App.config["symbol"]
    signal = App.signal
    signal_side = signal.get("side")
    close_price = signal.get('close_price')
    trade_scores = signal.get('trade_score')
    close_time = signal.get('close_time')

    # Without these values no message can be formatted, so report and stop instead of crashing
    if trade_scores is None or len(trade_scores) == 0 or close_price is None or close_time is None:
        log.error("Error sending notification: signal has no trade score, close price or close time.")
        return

    trade_score_primary = trade_scores[0]
    trade_score_secondary = trade_scores[1] if len(trade_scores) > 1 else None

    model = App.config["signal_model"]
    parameters = model.get("parameters", {})
    notification = model.get("notification", {})

    buy_signal_threshold = parameters.get("buy_signal_threshold", 0)
    sell_signal_threshold = parameters.get("sell_signal_threshold", 0)
    buy_notify_threshold = notification.get("buy_notify_threshold", 0)
    sell_notify_threshold = notification.get("sell_notify_threshold", 0)
    trade_icon_step = notification.get("trade_icon_step", 0.1)
    notify_frequency_minutes = notification.get("notify_frequency_minutes", 1)

    # Crypto Currency Symbols: https://github.com/yonilevy/crypto-currency-symbols
    if symbol == "BTCUSDT":
        # The literal was empty, so BTC messages carried no symbol at all
        symbol_char = "₿"
    elif symbol == "ETHUSDT":
        symbol_char = "Ξ"
    else:
        symbol_char = symbol

    # Message to be sent depends on the availability and direction of signal
    # Icons:
    # DOWN: 📉, ⬇ ⬇️↘️🔽 🔴 (red), 🟥, ▼ (red), ↘ (red)
    # UP: 📈, ⬆, ⬆️ ↗️🔼 🟢 (green), 🟩, ▲ (green), ↗ (green)
    # ✅ 🔹 (blue) 📌 🔸 (orange)
    primary_score_str = f"{trade_score_primary:+.2f}"
    secondary_score_str = f"{trade_score_secondary:+.2f}" if trade_score_secondary is not None else ''

    if signal_side == "BUY":
        # One icon plus one more for every full step the score is away from the threshold
        score_steps = (abs(trade_score_primary - buy_signal_threshold) // trade_icon_step) if trade_icon_step else 0
        message = "🟢"*int(score_steps+1) + f" *BUY: {symbol_char} {int(close_price):,} Score: {primary_score_str}* {secondary_score_str}"
    elif signal_side == "SELL":
        score_steps = (abs(trade_score_primary - sell_signal_threshold) // trade_icon_step) if trade_icon_step else 0
        message = "🔴"*int(score_steps+1) + f" *SELL: {symbol_char} {int(close_price):,} Score: {primary_score_str}* {secondary_score_str}"
    elif trade_score_primary >= 0:
        message = f"{symbol_char} {int(close_price):,} 📈{primary_score_str} {secondary_score_str}"
    else:
        message = f"{symbol_char} {int(close_price):,} 📉{primary_score_str} {secondary_score_str}"

    #
    # To send or not to send (for example, to avoid too frequent insignificant messages)
    #

    # Too small score according to notification ranges
    if sell_notify_threshold < trade_score_primary < buy_notify_threshold:
        return

    # If corresponds to desired frequency then send. Otherwise, check other conditions.
    # A frequency of 0 (or None) is treated as "every minute" instead of dividing by zero.
    if notify_frequency_minutes and (close_time.minute % notify_frequency_minutes) != 0:
        # Within interval, send only signals and only one time
        if signal_side not in ["BUY", "SELL"]:
            return
        # TODO: Check if it is first time
        # Send if not already sent, that is, only if just crossed the buy/sell threshold
        # We need to store somewhere the current band: lower, middle/None, high

    #
    # Send notification
    #
    bot_token = App.config["telegram_bot_token"]
    chat_id = App.config["telegram_chat_id"]

    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    # Let requests build and percent-encode the query string: this covers the plus sign
    # (previously replaced by hand), spaces, '&', '#', '%' and non-ASCII icons, and it also
    # works when chat_id is configured as an integer
    params = {"chat_id": chat_id, "parse_mode": "markdown", "text": message}
    try:
        # requests is synchronous, so the timeout bounds how long this coroutine can block
        response = requests.get(url, params=params, timeout=10)
        response_json = response.json()
        if not response_json.get('ok'):
            log.error("Error sending notification.")
    except Exception as e:
        log.error(f"Error sending notification: {e}")
def _send_telegram_text(text):
    """Send one markdown text message to the configured Telegram chat; failures are logged, never raised."""
    bot_token = App.config["telegram_bot_token"]
    chat_id = App.config["telegram_chat_id"]

    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    # requests percent-encodes the parameters, so '%', line breaks and icons in the text are safe
    params = {"chat_id": chat_id, "parse_mode": "markdown", "text": text}
    try:
        # requests is synchronous, so the timeout bounds how long the calling coroutine can block
        response = requests.get(url, params=params, timeout=10)
        response_json = response.json()
        if not response_json.get('ok'):
            log.error("Error sending notification.")
    except Exception as e:
        log.error(f"Error sending notification: {e}")


async def send_transaction_message(transaction):
    """
    Send two Telegram messages about a transaction: its profit and the statistics of the period.

    The numbers are computed by ``generate_transaction_stats()``, so the transaction is
    expected to be already stored in the transaction file.

    :param transaction: dict with the key ``status`` equal to "SELL" or "BUY".
        For any other status an error is logged and nothing is sent.
    :return: None
    """
    status = transaction.get("status")
    if status == "SELL":
        message = "⚡💰 *SOLD: "
        stats_message = "↗ *LONG transactions stats (4 weeks)*\n"
    elif status == "BUY":
        message = "⚡💰 *BOUGHT: "
        stats_message = "↘ *SHORT transactions stats (4 weeks)*\n"
    else:
        # Previously execution continued after this log line and failed on an unbound 'message'
        log.error(f"ERROR: Should not happen")
        return

    profit, profit_percent, profit_descr, profit_percent_descr = await generate_transaction_stats()

    #
    # Send the result of this transaction
    #
    message += f" Profit: {profit_percent:.2f}% {profit:.2f}₮*"
    _send_telegram_text(message)

    #
    # Send stats about previous transactions (including this one)
    #
    stats_message += f"🔸sum={profit_percent_descr['count'] * profit_percent_descr['mean']:.2f}% 🔸count={int(profit_percent_descr['count'])}\n"
    stats_message += f"🔸mean={profit_percent_descr['mean']:.2f}% 🔸std={profit_percent_descr['std']:.2f}%\n"
    stats_message += f"🔸min={profit_percent_descr['min']:.2f}% 🔸median={profit_percent_descr['50%']:.2f}% 🔸max={profit_percent_descr['max']:.2f}%\n"
    _send_telegram_text(stats_message)
async def simulate_trade():
    """
    Very simple trade strategy where we only buy and sell using the whole available amount.

    The latest signal is read from ``App.signal`` and the previous transaction from
    ``App.transaction``. A new transaction is created only if the signal side is BUY or SELL
    and the previous transaction is absent or has the opposite status. The new transaction
    is stored in ``App.transaction`` and appended as one comma-separated line
    (timestamp, price, profit, status) to the transaction file.

    :return: dict with keys ``timestamp``, ``price``, ``profit`` and ``status``,
        or None if no transaction was made.
    """
    signal = App.signal
    signal_side = signal.get("side")
    close_price = signal.get('close_price')
    close_time = signal.get('close_time')

    # Previous transaction: BUY (we are currently selling) or SELL (we are currently buying)
    t_status = App.transaction.get("status")
    t_price = App.transaction.get("price")

    # Profit is 0.0 if there is no previous price to compare with
    if signal_side == "BUY" and (not t_status or t_status == "SELL"):
        profit = t_price - close_price if t_price else 0.0
    elif signal_side == "SELL" and (not t_status or t_status == "BUY"):
        profit = close_price - t_price if t_price else 0.0
    else:
        return None

    t_dict = dict(timestamp=str(close_time), price=close_price, profit=profit, status=signal_side)

    # Save this transaction
    App.transaction = t_dict

    # Floats are written with two decimals, all other values as is
    line = ",".join([f"{v:.2f}" if isinstance(v, float) else str(v) for v in t_dict.values()])
    with open(transaction_file, 'a') as f:
        f.write(line + "\n")

    return t_dict
async def generate_transaction_stats():
    """
    Compute the profit of the latest transaction and statistics of the last 4 weeks.

    Here we assume that the latest transaction is already saved in the transaction file.
    The file is read as CSV without header with the columns timestamp, close, profit, status.
    Relative profit of a transaction is its profit in percent of the close price of the
    previous transaction. Statistics are computed only over transactions with the same
    status as the latest one (SELL or BUY).

    :return: tuple ``(profit, profit_percent, profit_descr, profit_percent_descr)`` where
        the first two elements describe the latest transaction and the last two are the
        results of ``Series.describe()`` (count, mean, std, min, 50%, max, ...) for absolute
        and relative profit.
    :raises ValueError: if the status of the latest transaction is neither SELL nor BUY.
    """
    df = pd.read_csv(transaction_file, parse_dates=[0], header=None, names=["timestamp", "close", "profit", "status"], date_format="ISO8601")

    # NOTE(review): the comparison uses naive local time - presumably timestamps in the file are naive too; confirm
    mask = (df['timestamp'] >= (datetime.now() - timedelta(weeks=4)))

    # Position of the first transaction within the period. If there is none, start from the
    # latest row (idxmax on an all-False mask would silently select the whole file).
    first_pos = int(mask.to_numpy().argmax()) if mask.any() else len(df) - 1

    # We add one previous row to use the previous close. Copy to avoid writing into a slice.
    df = df.iloc[max(first_pos - 1, 0):].copy()

    df["prev_close"] = df["close"].shift()
    df["profit_percent"] = 100.0 * df["profit"] / df["prev_close"]

    if len(df) > 1:
        # Remove the first row which was added to compute relative profit
        df = df.iloc[1:]
    else:
        # Very first transaction in the file: there is no previous close, so keep the row
        # (instead of failing on an empty frame) and report zero relative profit
        df["profit_percent"] = df["profit_percent"].fillna(0.0)

    #
    # Determine properties of the latest transaction
    #
    # Sample output:
    # BTC, LONG or SHORT
    # sell price 24,000 (now), buy price (datetime) 23,000
    # profit abs: 1,000.00,
    # profit rel: 3.21%
    last_transaction = df.iloc[-1]
    transaction_type = last_transaction["status"]
    profit = last_transaction["profit"]
    profit_percent = last_transaction["profit_percent"]

    if transaction_type not in ("SELL", "BUY"):
        raise ValueError(f"Unknown status of the latest transaction: {transaction_type!r}")

    #
    # Properties of last period of trade
    #
    # SELL rows close long trades and BUY rows close short trades, so the statistics
    # are computed over the rows with the same status as the latest transaction
    df2 = df[df["status"] == transaction_type]

    # Sample output for abs profit
    # sum 1,200.00, mean 400.00, median 450.00, std 250.00, min -300.0, max 1200.00
    profit_descr = df2["profit"].describe()  # count, mean, std, min, 50% max
    profit_percent_descr = df2["profit_percent"].describe()  # count, mean, std, min, 50% max

    return profit, profit_percent, profit_descr, profit_percent_descr
async def send_diagram(freq, nrows):
    """
    Produce a line chart based on latest data and send it to the channel.

    Prices are taken from ``App.feature_df`` and transactions from ``load_all_transactions()``.
    Both are resampled to the same frequency, drawn by ``generate_chart()`` and the resulting
    PNG image is uploaded to the Telegram chat. Errors while sending are logged, not raised.

    :param freq: Aggregation interval 'H' - hour.
    :param nrows: Time range (x axis) of the diagram, for example, 1 week 168 hours, 2 weeks 336 hours
    :return: None
    """
    import io
    from matplotlib import pyplot as plt

    model = App.config["signal_model"]
    parameters = model.get("parameters", {})
    buy_signal_threshold = parameters.get("buy_signal_threshold", 0)
    sell_signal_threshold = parameters.get("sell_signal_threshold", 0)

    #
    # Prepare data to be visualized
    #

    # Get main df with high, low, close for the symbol.
    df_ohlc = App.feature_df[['open', 'high', 'low', 'close']]
    df_ohlc = resample_ohlc_data(df_ohlc.reset_index(), freq, nrows, buy_signal_column=None, sell_signal_column=None)

    # Get transaction data.
    df_t = load_all_transactions()  # timestamp,price,profit,status
    df_t = df_t[df_t.timestamp >= df_ohlc.timestamp.min()]  # select only transactions for the last time

    transactions_exist = len(df_t) > 0
    if transactions_exist:
        # assign() returns a new frame, so the loaded transactions are not modified
        df_t = df_t.assign(buy_long=df_t['status'] == 'BUY', sell_long=df_t['status'] == 'SELL')
        df_t = resample_transaction_data(df_t, freq, 0, 'buy_long', 'sell_long')
        # Merge because we need signals along with close price in one df
        df = df_ohlc.merge(df_t, how='left', left_on='timestamp', right_on='timestamp')
    else:
        df = df_ohlc

    # Load score (currently not available, so the score panel is never drawn)
    score_exists = False

    symbol = App.config["symbol"]
    title = f"$\\bf{{{symbol}}}$"

    description = App.config.get("description", "")
    if description:
        title += ": " + description

    fig = generate_chart(
        df, title,
        buy_signal_column="buy_long" if transactions_exist else None,
        sell_signal_column="sell_long" if transactions_exist else None,
        score_column="score" if score_exists else None,
        thresholds=[buy_signal_threshold, sell_signal_threshold]
    )

    try:
        with io.BytesIO() as buf:
            fig.savefig(buf, format='png')  # Convert and save in buffer
            img_data = buf.getvalue()  # Get complete content (while read returns from current position)
    finally:
        # pyplot keeps a reference to every figure it creates, so close it explicitly.
        # Otherwise figures accumulate with every call.
        plt.close(fig)

    #
    # Send image
    #
    bot_token = App.config["telegram_bot_token"]
    chat_id = App.config["telegram_chat_id"]

    files = {'photo': img_data}
    payload = {
        'chat_id': chat_id,
        'caption': "",  # Currently no text
        'parse_mode': 'markdown'
    }

    try:
        url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
        # requests is synchronous, so the timeout bounds how long this coroutine can block
        response = requests.post(url=url, data=payload, files=files, timeout=30)
        response_json = response.json()
        if not response_json.get('ok'):
            log.error("Error sending notification.")
    except Exception as e:
        log.error(f"Error sending notification: {e}")
def resample_ohlc_data(df, freq, nrows, buy_signal_column, sell_signal_column):
    """
    Resample ohlc data to lower frequency. Assumption: time in 'timestamp' column.

    :param df: data frame with the columns 'timestamp', 'open', 'high', 'low', 'close'
        and optionally the two signal columns
    :param freq: frequency passed to ``DataFrame.resample``
    :param nrows: number of last rows to return; all rows are returned if 0 or None
    :param buy_signal_column: name of the boolean column with buy signals or None
    :param sell_signal_column: name of the boolean column with sell signals or None
    :return: new data frame with 'timestamp' as the first column followed by the aggregated columns
    """
    def _has_signal(values):
        # True if at least one value within the interval is not equal to False
        return not (values == False).all()

    # Aggregation functions. The 'timestamp' column is the resampling key: it becomes
    # the index of the result and is restored as a column by reset_index below.
    aggregations = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
    }

    # These signal columns are optional.
    # They must be ASSIGNED to the dict: the form "d[key]: value" is only an annotation
    # which stores nothing, so the signal columns would be silently dropped.
    if buy_signal_column:
        # Buy signal if at least one buy signal was during this time interval
        aggregations[buy_signal_column] = _has_signal
        # Alternatively, 0 if no buy signals, 1 if only 1 buy signal, 2 or -1 if more than 1 any signals (mixed interval)
    if sell_signal_column:
        # Sell signal if at least one sell signal was during this time interval
        aggregations[sell_signal_column] = _has_signal

    df_out = df.resample(freq, on='timestamp').agg(aggregations)
    df_out = df_out.reset_index()

    if nrows:
        df_out = df_out.tail(nrows)

    return df_out
def resample_transaction_data(df, freq, nrows, buy_signal_column, sell_signal_column):
    """
    Convert a list of transactions with arbitrary timestamps to a regular time series
    which has True or False in the two signal columns for each interval.

    Assumption: time in 'timestamp' column

    PROBLEM: an interval (for example, 1 hour) is represented by one row, so if we buy and
    then sell within one interval then both columns are True and the order is lost
    Solution 1: remove
    Solution 2: introduce a special symbol (like dot instead of arrows) which denotes one or
    more transactions - essentially error or inability to visualize

    1 week 7*1440=10080 points, 5 min - 2016 points, 10 mins - 1008 points

    :param df: data frame with the 'timestamp' column and the two signal columns
    :param freq: frequency passed to ``DataFrame.resample``
    :param nrows: number of last rows to return; all rows are returned if 0 or None
    :param buy_signal_column: name of the column with buy flags
    :param sell_signal_column: name of the column with sell flags
    :return: data frame with the columns 'timestamp' and the two signal columns
    """
    def _any_transaction(flags):
        # An interval is marked if at least one of its values is not equal to False
        return not all(flags == False)

    # Aggregation function for each column
    spec = {'timestamp': 'first'}  # It will be in index
    spec[buy_signal_column] = _any_transaction
    spec[sell_signal_column] = _any_transaction

    resampled = df.resample(freq, on='timestamp').apply(spec)

    # The aggregated 'timestamp' column duplicates the index, so drop it and restore the index as a column
    resampled = resampled.drop(columns='timestamp').reset_index()

    return resampled.tail(nrows) if nrows else resampled
def generate_chart(df, title, buy_signal_column, sell_signal_column, score_column, thresholds: list):
    """
    Draw a price chart with optional transaction markers and an optional score line.

    All columns are expected in one input df with desired length and desired freq.

    Visualize columns 1 (pre-defined): high, low, close
    Visualize columns 1 (via parameters): buy_signal_column, sell_signal_column
    Visualize columns 2: score_column (optional) - in [-1, +1]
    Visualize columns 2: Threshold lines (as many as there are values in the list)

    :param df: data frame with the columns 'timestamp', 'high', 'low', 'close' and
        the optional signal and score columns. It is not modified.
    :param title: title of the chart
    :param buy_signal_column: column with True for intervals with a buy transaction, or None
    :param sell_signal_column: column with True for intervals with a sell transaction, or None
    :param score_column: column with the score, or None. It is drawn only if present in df.
    :param thresholds: values of horizontal lines drawn on the score axis
    :return: matplotlib figure. It is created via pyplot, so the caller should close it
        (``pyplot.close(fig)``) when it is not needed anymore.
    """
    from matplotlib import pyplot as plt
    import matplotlib.dates as mdates
    import seaborn as sns

    # List of colors: https://matplotlib.org/stable/gallery/color/named_colors.html

    sns.set_style('white')  # darkgrid, whitegrid, dark, white, ticks

    fig, ax1 = plt.subplots(figsize=(12, 6))

    # === High, Low, Close

    # Fill area between high and low
    ax1.fill_between(df.timestamp, df.low, df.high, step="mid", lw=0.0, facecolor='skyblue', alpha=.4)

    # Draw close price
    sns.lineplot(data=df, x="timestamp", y="close", drawstyle='steps-mid', lw=.5, color='darkblue', ax=ax1)

    # === Transactions

    # Work on a copy so that the helper columns do not appear in the caller's frame
    df = df.copy()

    # Markers are shifted from the close price (in price units) in order not to cover the line
    triangle_adj = 15
    df["close_buy_adj"] = df["close"] - triangle_adj
    df["close_sell_adj"] = df["close"] + triangle_adj

    markers = (
        (buy_signal_column, "close_buy_adj", "green", "^"),
        (sell_signal_column, "close_sell_adj", "red", "v"),
    )
    for signal_column, y_column, color, marker in markers:
        # Signal columns are optional: None is passed when there are no transactions
        if not signal_column or signal_column not in df.columns:
            continue
        # Comparison with True also excludes NaN values
        points = df[df[signal_column] == True]
        if points.empty:
            continue
        sns.lineplot(data=points, x="timestamp", y=y_column, lw=0, markerfacecolor=color, markersize=10, marker=marker, alpha=0.6, ax=ax1)

    ax1.set(xlabel=None)  # remove the x-axis label
    ax1.set_ylabel('Close price', color='darkblue')

    # Vertical range is the price range with a margin of 5% below and 0.5% above
    low_min = df['low'].min()
    high_max = df['high'].max()
    price_range = high_max - low_min
    ax1.set(ylim=(low_min - price_range * 0.05, high_max + price_range * 0.005))

    ax1.xaxis.set_major_locator(mdates.DayLocator())
    ax1.xaxis.set_major_formatter(mdates.DateFormatter("%d"))  # "%H:%M:%S" "%d %b"
    ax1.tick_params(axis="x", rotation=0)

    # === Score

    if score_column and score_column in df.columns:
        ax2 = ax1.twinx()
        sns.lineplot(data=df, x="timestamp", y=score_column, drawstyle='steps-mid', lw=.2, color="red", ax=ax2)
        # The label used to be set twice ('r' and then 'b'); only the last call had effect
        ax2.set_ylabel('Score', color='b')
        ax2.set(ylim=(-0.5, +3.0))

        ax2.axhline(0.0, lw=.1, color="black")
        for threshold in thresholds or []:
            ax2.axhline(threshold, lw=.1, color="red")

    plt.title(title, fontsize=14)

    return fig
# Nothing is executed when this module is run directly: it only provides functions used elsewhere
if __name__ == '__main__':
    pass