115 lines
3.9 KiB
Python
115 lines
3.9 KiB
Python
import MetaTrader5 as mt5
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from datetime import datetime
|
|
import json
|
|
import os
|
|
|
|
# -------------------------------------------
|
|
# Load project settings (config.json)
|
|
# -------------------------------------------
|
|
def load_config(filepath="config.json"):
|
|
"""Load configuration settings from JSON."""
|
|
if not os.path.exists(filepath):
|
|
raise FileNotFoundError(f"Config file not found: {filepath}")
|
|
with open(filepath, "r") as f:
|
|
return json.load(f)
|
|
|
|
# -------------------------------------------
|
|
# Initialize MT5 connection
|
|
# -------------------------------------------
|
|
def initialize_mt5(config):
|
|
"""Connect to a specific MT5 terminal using credentials."""
|
|
mt5_path = config["mt5Pathway"]
|
|
login = int(config["username"])
|
|
password = config["password"]
|
|
server = config["server"]
|
|
|
|
if not mt5.initialize(path=mt5_path, login=login, password=password, server=server):
|
|
raise ConnectionError(f"MT5 initialization failed: {mt5.last_error()}")
|
|
|
|
account_info = mt5.account_info()
|
|
if account_info:
|
|
print(f"[MT5] Connected to account {account_info.login} on server {account_info.server}")
|
|
else:
|
|
raise ConnectionError(f"MT5 connected, but account info unavailable: {mt5.last_error()}")
|
|
|
|
# -------------------------------------------
|
|
# Fetch latest tick data
|
|
# -------------------------------------------
|
|
def get_latest_tick(symbol):
|
|
"""Fetch the latest market tick for a given symbol."""
|
|
tick = mt5.symbol_info_tick(symbol)
|
|
if tick is None:
|
|
raise ValueError(f"Failed to get tick for {symbol}: {mt5.last_error()}")
|
|
return tick
|
|
|
|
# -------------------------------------------
|
|
# Send email alert
|
|
# -------------------------------------------
|
|
def send_email_alert(symbol, tick, smtp_config):
|
|
"""Send an email with tick data for a given symbol."""
|
|
# Email message body
|
|
message_body = (
|
|
f"MT5 Market Alert\n\n"
|
|
f"Symbol: {symbol}\n"
|
|
f"Time: {datetime.fromtimestamp(tick.time)}\n"
|
|
f"Bid: {tick.bid}\n"
|
|
f"Ask: {tick.ask}\n"
|
|
)
|
|
|
|
msg = MIMEMultipart()
|
|
msg['From'] = smtp_config["sender_email"]
|
|
msg['To'] = smtp_config["recipient_email"]
|
|
msg['Subject'] = f"MT5 Alert - {symbol} Tick Update"
|
|
msg.attach(MIMEText(message_body, "plain"))
|
|
|
|
try:
|
|
with smtplib.SMTP(smtp_config["smtp_server"], smtp_config["smtp_port"]) as server:
|
|
server.starttls() # Secure the connection
|
|
server.login(smtp_config["sender_email"], smtp_config["sender_password"])
|
|
server.send_message(msg)
|
|
print(f"[EMAIL] Alert sent successfully to {smtp_config['recipient_email']}")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to send email: {e}")
|
|
|
|
# -------------------------------------------
|
|
# Main test function
|
|
# -------------------------------------------
|
|
def main():
|
|
"""Test email alert by fetching tick data and sending it."""
|
|
try:
|
|
# Load config
|
|
config = load_config("config.json")
|
|
|
|
# SMTP configuration (edit these)
|
|
smtp_config = {
|
|
"smtp_server": "smtp.gmail.com",
|
|
"smtp_port": 587,
|
|
"sender_email": "your_email@gmail.com", # replace
|
|
"sender_password": "your_app_password", # Gmail App Password
|
|
"recipient_email": "recipient_email@example.com" # replace
|
|
}
|
|
|
|
# Initialize MT5
|
|
initialize_mt5(config)
|
|
|
|
# Fetch latest tick
|
|
symbol = "EURUSD"
|
|
tick = get_latest_tick(symbol)
|
|
|
|
# Send alert
|
|
send_email_alert(symbol, tick, smtp_config)
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] {e}")
|
|
finally:
|
|
mt5.shutdown()
|
|
print("[MT5] Connection closed.")
|
|
|
|
# -------------------------------------------
|
|
# Run directly
|
|
# -------------------------------------------
|
|
if __name__ == "__main__":
|
|
main()
|