镜像自地址
https://github.com/asavinov/intelligent-trading-bot.git
已同步 2026-05-04 08:26:19 +00:00
471 行
18 KiB
Python
471 行
18 KiB
Python
import os
|
|
import sys
|
|
from datetime import timedelta, datetime
|
|
|
|
import asyncio
|
|
|
|
import pandas as pd
|
|
import requests
|
|
|
|
from service.App import *
|
|
from common.utils import *
|
|
|
|
import logging
|
|
log = logging.getLogger('notifier')
|
|
|
|
logging.getLogger('PIL').setLevel(logging.WARNING)
|
|
logging.getLogger('matplotlib').setLevel(logging.WARNING)
|
|
|
|
transaction_file = Path("transactions.txt")
|
|
|
|
|
|
async def send_signal_message():
|
|
symbol = App.config["symbol"]
|
|
|
|
status = App.status
|
|
signal = App.signal
|
|
signal_side = signal.get("side")
|
|
close_price = signal.get('close_price')
|
|
trade_scores = signal.get('trade_score')
|
|
trade_score_primary = trade_scores[0]
|
|
trade_score_secondary = trade_scores[1] if len(trade_scores) > 1 else None
|
|
close_time = signal.get('close_time')
|
|
|
|
model = App.config["signal_model"]
|
|
|
|
buy_signal_threshold = model.get("parameters", {}).get("buy_signal_threshold", 0)
|
|
sell_signal_threshold = model.get("parameters", {}).get("sell_signal_threshold", 0)
|
|
|
|
buy_notify_threshold = model.get("notification", {}).get("buy_notify_threshold", 0)
|
|
sell_notify_threshold = model.get("notification", {}).get("sell_notify_threshold", 0)
|
|
trade_icon_step = model.get("notification", {}).get("trade_icon_step", 0.1)
|
|
notify_frequency_minutes = model.get("notification", {}).get("notify_frequency_minutes", 1)
|
|
|
|
# Crypto Currency Symbols: https://github.com/yonilevy/crypto-currency-symbols
|
|
if symbol == "BTCUSDT":
|
|
symbol_char = "₿"
|
|
elif symbol == "ETHUSDT":
|
|
symbol_char = "Ξ"
|
|
else:
|
|
symbol_char = symbol
|
|
|
|
# Message to be sent depends on the availability and direction of signal
|
|
# Icons:
|
|
# DOWN: 📉, ⬇ ⬇️↘️🔽 🔴 (red), 🟥, ▼ (red), ↘ (red)
|
|
# UP: 📈, ⬆, ⬆️ ↗️🔼 🟢 (green), 🟩, ▲ (green), ↗ (green)
|
|
# ✅ 🔹 (blue) 📌 🔸 (orange)
|
|
message = ""
|
|
primary_score_str = f"{trade_score_primary:+.2f}"
|
|
secondary_score_str = f"{trade_score_secondary:+.2f}" if trade_score_secondary is not None else ''
|
|
if signal_side == "BUY":
|
|
score_steps = (np.abs(trade_score_primary - buy_signal_threshold) // trade_icon_step) if trade_icon_step else 0
|
|
message = "🟢"*int(score_steps+1) + f" *BUY: {symbol_char} {int(close_price):,} Score: {primary_score_str}* {secondary_score_str}"
|
|
elif signal_side == "SELL":
|
|
score_steps = (np.abs(trade_score_primary - sell_signal_threshold) // trade_icon_step) if trade_icon_step else 0
|
|
message = "🔴"*int(score_steps+1) + f" *SELL: {symbol_char} {int(close_price):,} Score: {primary_score_str}* {secondary_score_str}"
|
|
elif trade_score_primary >= 0:
|
|
message = f"{symbol_char} {int(close_price):,} 📈{primary_score_str} {secondary_score_str}"
|
|
else:
|
|
message = f"{symbol_char} {int(close_price):,} 📉{primary_score_str} {secondary_score_str}"
|
|
message = message.replace("+", "%2B") # For Telegram to display plus sign
|
|
|
|
#
|
|
# To send or not to send (for example, to avoid to frequent insignificant messages)
|
|
#
|
|
if not message:
|
|
return
|
|
|
|
# Too small score according to notification ranges
|
|
if trade_score_primary < buy_notify_threshold and trade_score_primary > sell_notify_threshold:
|
|
return
|
|
|
|
# If corresponds to desired frequency then send. Otherwise, check other conditions
|
|
if (close_time.minute % notify_frequency_minutes) != 0:
|
|
# Within internal, send only signals and only one time
|
|
if signal_side not in ["BUY", "SELL"]:
|
|
return
|
|
|
|
# TODO: Check if it is first time
|
|
# Send if not already sent, that is, only if just crossed the buy/sell threshold
|
|
# We need to store somewhere the current band: lower, middle/None, high
|
|
pass
|
|
|
|
#
|
|
# Send notification
|
|
#
|
|
bot_token = App.config["telegram_bot_token"]
|
|
chat_id = App.config["telegram_chat_id"]
|
|
|
|
try:
|
|
url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage?chat_id=' + chat_id + '&parse_mode=markdown&text=' + message
|
|
response = requests.get(url)
|
|
response_json = response.json()
|
|
if not response_json.get('ok'):
|
|
log.error(f"Error sending notification.")
|
|
except Exception as e:
|
|
log.error(f"Error sending notification: {e}")
|
|
|
|
|
|
async def send_transaction_message(transaction):
|
|
|
|
profit, profit_percent, profit_descr, profit_percent_descr = await generate_transaction_stats()
|
|
|
|
if transaction.get("status") == "SELL":
|
|
message = "⚡💰 *SOLD: "
|
|
elif transaction.get("status") == "BUY":
|
|
message = "⚡💰 *BOUGHT: "
|
|
else:
|
|
log.error(f"ERROR: Should not happen")
|
|
|
|
message += f" Profit: {profit_percent:.2f}% {profit:.2f}₮*"
|
|
|
|
bot_token = App.config["telegram_bot_token"]
|
|
chat_id = App.config["telegram_chat_id"]
|
|
try:
|
|
url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage?chat_id=' + chat_id + '&parse_mode=markdown&text=' + message
|
|
response = requests.get(url)
|
|
response_json = response.json()
|
|
if not response_json.get('ok'):
|
|
log.error(f"Error sending notification.")
|
|
except Exception as e:
|
|
log.error(f"Error sending notification: {e}")
|
|
|
|
#
|
|
# Send stats about previous transactions (including this one)
|
|
#
|
|
if transaction.get("status") == "SELL":
|
|
message = "↗ *LONG transactions stats (4 weeks)*\n"
|
|
elif transaction.get("status") == "BUY":
|
|
message = "↘ *SHORT transactions stats (4 weeks)*\n"
|
|
else:
|
|
log.error(f"ERROR: Should not happen")
|
|
|
|
message += f"🔸sum={profit_percent_descr['count'] * profit_percent_descr['mean']:.2f}% 🔸count={int(profit_percent_descr['count'])}\n"
|
|
message += f"🔸mean={profit_percent_descr['mean']:.2f}% 🔸std={profit_percent_descr['std']:.2f}%\n"
|
|
message += f"🔸min={profit_percent_descr['min']:.2f}% 🔸median={profit_percent_descr['50%']:.2f}% 🔸max={profit_percent_descr['max']:.2f}%\n"
|
|
|
|
try:
|
|
url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage?chat_id=' + chat_id + '&parse_mode=markdown&text=' + message
|
|
response = requests.get(url)
|
|
response_json = response.json()
|
|
if not response_json.get('ok'):
|
|
log.error(f"Error sending notification.")
|
|
except Exception as e:
|
|
log.error(f"Error sending notification: {e}")
|
|
|
|
|
|
async def simulate_trade():
|
|
"""
|
|
Very simple trade strategy where we only buy and sell using the whole available amount
|
|
"""
|
|
symbol = App.config["symbol"]
|
|
|
|
status = App.status
|
|
signal = App.signal
|
|
signal_side = signal.get("side")
|
|
close_price = signal.get('close_price')
|
|
buy_score = signal.get('buy_score')
|
|
sell_score = signal.get('sell_score')
|
|
close_time = signal.get('close_time')
|
|
|
|
# Previous transaction: BUY (we are currently selling) or SELL (we are currently buying)
|
|
t_status = App.transaction.get("status")
|
|
t_price = App.transaction.get("price")
|
|
if signal_side == "BUY" and (not t_status or t_status == "SELL"):
|
|
profit = t_price - close_price if t_price else 0.0
|
|
t_dict = dict(timestamp=str(close_time), price=close_price, profit=profit, status="BUY")
|
|
elif signal_side == "SELL" and (not t_status or t_status == "BUY"):
|
|
profit = close_price - t_price if t_price else 0.0
|
|
t_dict = dict(timestamp=str(close_time), price=close_price, profit=profit, status="SELL")
|
|
else:
|
|
return None
|
|
|
|
# Save this transaction
|
|
App.transaction = t_dict
|
|
with open(transaction_file, 'a+') as f:
|
|
f.write(",".join([f"{v:.2f}" if isinstance(v, float) else str(v) for v in t_dict.values()]) + "\n")
|
|
|
|
return t_dict
|
|
|
|
|
|
async def generate_transaction_stats():
|
|
"""Here we assume that the latest transaction is saved in the file and this function computes various properties."""
|
|
|
|
df = pd.read_csv(transaction_file, parse_dates=[0], header=None, names=["timestamp", "close", "profit", "status"], date_format="ISO8601")
|
|
|
|
mask = (df['timestamp'] >= (datetime.now() - timedelta(weeks=4)))
|
|
df = df[max(mask.idxmax()-1, 0):] # We add one previous row to use the previous close
|
|
|
|
df["prev_close"] = df["close"].shift()
|
|
df["profit_percent"] = df.apply(lambda x: 100.0*x["profit"]/x["prev_close"], axis=1)
|
|
|
|
df = df.iloc[1:] # Remove the first row which was added to compute relative profit
|
|
|
|
long_df = df[df["status"] == "SELL"]
|
|
short_df = df[df["status"] == "BUY"]
|
|
|
|
#
|
|
# Determine properties of the latest transaction
|
|
#
|
|
|
|
# Sample output:
|
|
# BTC, LONG or SHORT
|
|
# sell price 24,000 (now), buy price (datetime) 23,000
|
|
# profit abs: 1,000.00,
|
|
# profit rel: 3.21%
|
|
|
|
last_transaction = df.iloc[-1]
|
|
transaction_dt = last_transaction["timestamp"]
|
|
transaction_type = last_transaction["status"]
|
|
profit = last_transaction["profit"]
|
|
profit_percent = last_transaction["profit_percent"]
|
|
|
|
#
|
|
# Properties of last period of trade
|
|
#
|
|
|
|
if transaction_type == "SELL":
|
|
df2 = long_df
|
|
elif transaction_type == "BUY":
|
|
df2 = short_df
|
|
|
|
# Sample output for abs profit
|
|
# sum 1,200.00, mean 400.00, median 450.00, std 250.00, min -300.0, max 1200.00
|
|
|
|
profit_sum = df2["profit"].sum()
|
|
profit_descr = df2["profit"].describe() # count, mean, std, min, 50% max
|
|
|
|
profit_percent_sum = df2["profit_percent"].sum()
|
|
profit_percent_descr = df2["profit_percent"].describe() # count, mean, std, min, 50% max
|
|
|
|
return profit, profit_percent, profit_descr, profit_percent_descr
|
|
|
|
|
|
async def send_diagram(freq, nrows):
|
|
"""
|
|
Produce a line chart based on latest data and send it to the channel.
|
|
|
|
:param freq: Aggregation interval 'H' - hour.
|
|
:param nrows: Time range (x axis) of the diagram, for example, 1 week 168 hours, 2 weeks 336 hours
|
|
"""
|
|
model = App.config["signal_model"]
|
|
|
|
buy_signal_threshold = model.get("parameters", {}).get("buy_signal_threshold", 0)
|
|
sell_signal_threshold = model.get("parameters", {}).get("sell_signal_threshold", 0)
|
|
|
|
#
|
|
# Prepare data to be visualized
|
|
#
|
|
# Get main df with high, low, close for the symbol.
|
|
df_ohlc = App.feature_df[['open', 'high', 'low', 'close']]
|
|
df_ohlc = resample_ohlc_data(df_ohlc.reset_index(), freq, nrows, buy_signal_column=None, sell_signal_column=None)
|
|
|
|
# Get transaction data.
|
|
df_t = load_all_transactions() # timestamp,price,profit,status
|
|
df_t['buy_long'] = df_t['status'].apply(lambda x: True if isinstance(x, str) and x == 'BUY' else False)
|
|
df_t['sell_long'] = df_t['status'].apply(lambda x: True if isinstance(x, str) and x == 'SELL' else False)
|
|
df_t = df_t[df_t.timestamp >= df_ohlc.timestamp.min()] # select only transactions for the last time
|
|
transactions_exist = len(df_t) > 0
|
|
|
|
if transactions_exist:
|
|
df_t = resample_transaction_data(df_t, freq, 0, 'buy_long', 'sell_long')
|
|
else:
|
|
df_t = None
|
|
|
|
# Merge because we need signals along with close price in one df
|
|
if transactions_exist:
|
|
df = df_ohlc.merge(df_t, how='left', left_on='timestamp', right_on='timestamp')
|
|
else:
|
|
df = df_ohlc
|
|
|
|
# Load score
|
|
score_exists = False
|
|
|
|
symbol = App.config["symbol"]
|
|
title = f"$\\bf{{{symbol}}}$"
|
|
|
|
description = App.config.get("description", "")
|
|
if description:
|
|
title += ": " + description
|
|
|
|
fig = generate_chart(
|
|
df, title,
|
|
buy_signal_column="buy_long" if transactions_exist else None,
|
|
sell_signal_column="sell_long" if transactions_exist else None,
|
|
score_column="score" if score_exists else None,
|
|
thresholds=[buy_signal_threshold, sell_signal_threshold]
|
|
)
|
|
|
|
import io
|
|
with io.BytesIO() as buf:
|
|
fig.savefig(buf, format='png') # Convert and save in buffer
|
|
im_bytes = buf.getvalue() # Get complete content (while read returns from current position)
|
|
img_data = im_bytes
|
|
|
|
#
|
|
# Send image
|
|
#
|
|
bot_token = App.config["telegram_bot_token"]
|
|
chat_id = App.config["telegram_chat_id"]
|
|
|
|
files = {'photo': img_data}
|
|
payload = {
|
|
'chat_id': chat_id,
|
|
'caption': f"", # Currently no text
|
|
'parse_mode': 'markdown'
|
|
}
|
|
|
|
try:
|
|
url = 'https://api.telegram.org/bot' + bot_token + '/sendPhoto'
|
|
req = requests.post(url=url, data=payload, files=files)
|
|
response = req.json()
|
|
except Exception as e:
|
|
log.error(f"Error sending notification: {e}")
|
|
|
|
|
|
def resample_ohlc_data(df, freq, nrows, buy_signal_column, sell_signal_column):
|
|
"""
|
|
Resample ohlc data to lower frequency. Assumption: time in 'timestamp' column.
|
|
"""
|
|
# Aggregation functions
|
|
ohlc = {
|
|
'timestamp': 'first', # It will be in index
|
|
'open': 'first',
|
|
'high': 'max',
|
|
'low': 'min',
|
|
'close': 'last',
|
|
}
|
|
|
|
# These score columns are optional
|
|
if buy_signal_column:
|
|
# Buy signal if at least one buy signal was during this time interval
|
|
ohlc[buy_signal_column]: lambda x: True if not all(x == False) else False
|
|
# Alternatively, 0 if no buy signals, 1 if only 1 buy signal, 2 or -1 if more than 1 any signals (mixed interval)
|
|
if sell_signal_column:
|
|
# Sell signal if at least one sell signal was during this time interval
|
|
ohlc[sell_signal_column]: lambda x: True if not all(x == False) else False
|
|
|
|
df_out = df.resample(freq, on='timestamp').apply(ohlc)
|
|
del df_out['timestamp']
|
|
df_out.reset_index(inplace=True)
|
|
|
|
if nrows:
|
|
df_out = df_out.tail(nrows)
|
|
|
|
return df_out
|
|
|
|
|
|
def resample_transaction_data(df, freq, nrows, buy_signal_column, sell_signal_column):
|
|
"""
|
|
Given a list of transactions with arbitrary timestamps,
|
|
return a regular time series with True or False for the rows with transactions
|
|
Assumption: time in 'timestamp' column
|
|
|
|
PROBLEM: only one transaction per interval (1 hour) is possible so if we buy and then sell within one hour then we cannot represent this
|
|
Solution 1: remove
|
|
Solution 2: introduce a special symbol (like dot instead of arrows) which denotes one or more transactions - essentially error or inability to visualize
|
|
1 week 7*1440=10080 points, 5 min - 2016 points, 10 mins - 1008 points
|
|
"""
|
|
# Aggregation functions
|
|
transactions = {
|
|
'timestamp': 'first', # It will be in index
|
|
buy_signal_column: lambda x: True if not all(x == False) else False,
|
|
sell_signal_column: lambda x: True if not all(x == False) else False,
|
|
}
|
|
|
|
df_out = df.resample(freq, on='timestamp').apply(transactions)
|
|
del df_out['timestamp']
|
|
df_out.reset_index(inplace=True)
|
|
|
|
if nrows:
|
|
df_out = df_out.tail(nrows)
|
|
|
|
return df_out
|
|
|
|
|
|
def generate_chart(df, title, buy_signal_column, sell_signal_column, score_column, thresholds: list):
|
|
"""
|
|
All columns in one input df with desired length and desired freq
|
|
Visualize columns 1 (pre-defined): high, low, close
|
|
Visualize columns 1 (via parameters): buy_signal_column, sell_signal_column
|
|
Visualize columns 2: score_column (optional) - in [-1, +1]
|
|
Visualize columns 2: Threshold lines (as many as there are values in the list)
|
|
"""
|
|
from matplotlib import pyplot as plt
|
|
import matplotlib.dates as mdates
|
|
import seaborn as sns
|
|
|
|
# List of colors: https://matplotlib.org/stable/gallery/color/named_colors.html
|
|
sns.set_style('white') # darkgrid, whitegrid, dark, white, ticks
|
|
#sns.color_palette("rocket")
|
|
#sns.set(rc={'axes.facecolor': 'gold', 'figure.facecolor': 'white'})
|
|
#sns.set(rc={'figure.facecolor': 'gold'})
|
|
|
|
fig, ax1 = plt.subplots(figsize=(12, 6))
|
|
# plt.tight_layout()
|
|
|
|
# === High, Low, Close
|
|
|
|
# Fill area between high and low
|
|
plt.fill_between(df.timestamp, df.low, df.high, step="mid", lw=0.0, facecolor='skyblue', alpha=.4) # edgecolor='red',
|
|
|
|
# Draw close price
|
|
sns.lineplot(data=df, x="timestamp", y="close", drawstyle='steps-mid', lw=.5, color='darkblue', ax=ax1)
|
|
|
|
# Buy/sell markters (list of timestamps)
|
|
# buy_df = df[df.buy_transaction]
|
|
# sell_df = df[df.sell_transaction]
|
|
|
|
# === Transactions
|
|
|
|
triangle_adj = 15
|
|
df["close_buy_adj"] = df["close"] - triangle_adj
|
|
df["close_sell_adj"] = df["close"] + triangle_adj
|
|
|
|
# markersize=6, markerfacecolor='blue'
|
|
sns.lineplot(data=df[df[buy_signal_column] == True], x="timestamp", y="close_buy_adj", lw=0, markerfacecolor="green", markersize=10, marker="^", alpha=0.6, ax=ax1)
|
|
sns.lineplot(data=df[df[sell_signal_column] == True], x="timestamp", y="close_sell_adj", lw=0, markerfacecolor="red", markersize=10, marker="v", alpha=0.6, ax=ax1)
|
|
|
|
# g2.set(yticklabels=[])
|
|
# g2.set(title='Penguins: Body Mass by Species for Gender')
|
|
ax1.set(xlabel=None) # remove the x-axis label
|
|
# g2.set(ylabel=None) # remove the y-axis label
|
|
ax1.set_ylabel('Close price', color='darkblue')
|
|
# g2.tick_params(left=False) # remove the ticks
|
|
min = df['low'].min()
|
|
max = df['high'].max()
|
|
ax1.set(ylim=(min - (max - min) * 0.05, max + (max - min) * 0.005))
|
|
|
|
ax1.xaxis.set_major_locator(mdates.DayLocator())
|
|
ax1.xaxis.set_major_formatter(mdates.DateFormatter("%d")) # "%H:%M:%S" "%d %b"
|
|
ax1.tick_params(axis="x", rotation=0)
|
|
#ax1.xaxis.grid(True)
|
|
|
|
# === Score
|
|
|
|
if score_column and score_column in df.columns:
|
|
ax2 = ax1.twinx()
|
|
# ax2.plot(x, y1, 'o-', color="red" )
|
|
sns.lineplot(data=df, x="timestamp", y=score_column, drawstyle='steps-mid', lw=.2, color="red", ax=ax2) # marker="v" "^" , markersize=12
|
|
ax2.set_ylabel('Score', color='r')
|
|
ax2.set_ylabel('Score', color='b')
|
|
ax2.set(ylim=(-0.5, +3.0))
|
|
|
|
ax2.axhline(0.0, lw=.1, color="black")
|
|
|
|
for threshold in thresholds:
|
|
ax2.axhline(threshold, lw=.1, color="red")
|
|
ax2.axhline(threshold, lw=.1, color="red")
|
|
|
|
# fig.suptitle("My figtitle", fontsize=14) # Positioned higher
|
|
# plt.title('Weekly: $\\bf{S&P 500}$', fontsize=16) # , weight='bold' or MathText
|
|
plt.title(title, fontsize=14)
|
|
# ax1.set_title('My Title')
|
|
|
|
# plt.show()
|
|
|
|
return fig
|
|
|
|
|
|
if __name__ == '__main__':
|
|
pass
|