mirror of
https://github.com/asavinov/intelligent-trading-bot.git
synced 2026-05-04 16:26:44 +00:00
63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
import os
|
|
import sys
|
|
import asyncio
|
|
import requests
|
|
|
|
from service.App import *
|
|
from common.utils import *
|
|
|
|
|
|
async def notify_telegram():
|
|
status = App.status
|
|
signal = App.signal
|
|
notification_threshold = App.config["signaler"]["notification_threshold"]
|
|
symbol = App.config["symbol"]
|
|
base_asset = App.config["base_asset"]
|
|
quote_asset = App.config["quote_asset"]
|
|
close_price = signal.get('close_price')
|
|
|
|
signal_side = signal.get("side")
|
|
score = signal.get('score')
|
|
|
|
# How many steps of the score
|
|
score_step_length = 0.05
|
|
score_steps = np.abs(score) // score_step_length
|
|
|
|
if score_steps < notification_threshold:
|
|
return
|
|
|
|
if score > 0:
|
|
sign = "📈" * int(score_steps - notification_threshold + 1) # 📈 >
|
|
elif score < 0:
|
|
sign = "📉" * int(score_steps - notification_threshold + 1) # 📉 <
|
|
else:
|
|
sign = ""
|
|
|
|
# Crypto Currency Symbols: https://github.com/yonilevy/crypto-currency-symbols
|
|
if base_asset == "BTC":
|
|
symbol_sign = "₿"
|
|
elif base_asset == "ETH":
|
|
symbol_sign = "Ξ"
|
|
else:
|
|
symbol_sign = base_asset
|
|
|
|
message = f"{symbol_sign} {int(close_price):,} {sign} Score: {score:+.2f}"
|
|
message = message.replace("+", "%2B") # For Telegram to display plus sign
|
|
|
|
#if signal_side in ["BUY", "SELL"]:
|
|
# message = f"*{message}. SIGNAL: {signal_side}*"
|
|
|
|
bot_token = App.config["telegram_bot_token"]
|
|
chat_id = App.config["telegram_chat_id"]
|
|
|
|
url = 'https://api.telegram.org/bot' + bot_token + '/sendMessage?chat_id=' + chat_id + '&parse_mode=markdown&text=' + message
|
|
|
|
try:
|
|
response = requests.get(url)
|
|
#response_json = response.json()
|
|
except Exception as e:
|
|
print(f"Error sending notification: {e}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
pass
|