intelligent-trading-bot/inputs/collector_mt5.py
tradershubspace aa35c304b8 Add Venue type
2025-05-12 18:15:16 +02:00

271 lines
10 KiB
Python

"""
This module handles the collection of market data from MetaTrader 5 (MT5).
It defines tasks for connecting to MT5, requesting historical kline data,
and storing it in the database. It also includes health checks for the MT5 connection.
"""
import time
from datetime import datetime, timedelta
import pandas as pd
import asyncio
import MetaTrader5 as mt5
import pytz
from common.utils import mt5_freq_from_pandas, get_timedelta_for_mt5_timeframe
from service.App import *
from common.utils import *
from service.analyzer import *
from service.mt5 import connect_mt5
import logging
log = logging.getLogger('collector')
async def main_collector_task() -> int:
"""
The main collector task that is executed periodically.
It checks the MT5 connection, synchronizes data, and handles any errors.
Returns:
int: 0 if the task completed successfully, 1 otherwise.
"""
symbol = App.config["symbol"]
pandas_freq = App.config["freq"]
start_ts, end_ts = pandas_get_interval(pandas_freq)
now_ts = now_timestamp()
log.info(f"===> Start collector task. Timestamp {now_ts}. Interval [{start_ts},{end_ts}].")
#
# 0. Check MT5 connection
#
if data_provider_problems_exist():
await data_provider_health_check()
if data_provider_problems_exist():
log.error(f"Problems with the data provider server found. No signaling, no trade. Will try next time.")
return 1
#
# 1. Ensure that we are up-to-date with klines
#
res = await sync_data_collector_task()
if res > 0:
log.error(f"Problem getting data from the server. No signaling, no trade. Will try next time.")
return 1
log.info(f"<=== End collector task.")
return 0
#
# Request/update market data
#
async def sync_data_collector_task() -> int:
"""
Synchronizes the local data state with the latest data from MT5.
This task retrieves the most recent kline data for specified symbols and
stores it in the database.
Returns:
int: 0 if the data was synchronized successfully, 1 otherwise.
Raises:
TimeoutError: If the data request times out.
Exception: If any other error occurs during data retrieval.
"""
CHUNK_SIZE = 10000 # How many bars worth of duration to request in each chunk
RATE_LIMIT_DELAY = 0.1 # Small delay between requests (seconds)
data_sources = App.config.get("data_sources", [])
symbols = [x.get("folder") for x in data_sources]
pandas_freq = App.config["freq"]
mt5_timeframe = mt5_freq_from_pandas(pandas_freq)
if not symbols:
symbols = [App.config["symbol"]]
# Connect to trading account (same as before)
mt5_account_id = App.config.get("mt5_account_id")
mt5_password = App.config.get("mt5_password")
mt5_server = App.config.get("mt5_server")
if mt5_account_id and mt5_password and mt5_server:
authorized = connect_mt5(int(mt5_account_id), password=str(mt5_password), server=str(mt5_server))
if not authorized:
log.error(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
return 1
# How many records are missing (and to be requested) for each symbol (not used here)
# missing_klines_counts = [App.analyzer.get_missing_klines_count(sym) for sym in symbols]
# Create a list of tasks for retrieving data
tasks = [asyncio.create_task(request_klines(s, pandas_freq, mt5_timeframe, CHUNK_SIZE, RATE_LIMIT_DELAY)) for s in symbols]
results = {}
timeout = 10 # Seconds to wait for the result
# Process responses in the order of arrival
for fut in asyncio.as_completed(tasks, timeout=timeout):
# Get the results
res = None
try:
res = await fut
except TimeoutError as te:
log.warning(f"Timeout {timeout} seconds when requesting kline data.")
return 1
except Exception as e:
log.warning(f"Exception when requesting kline data.")
return 1
# Add to the database (will overwrite existing klines if any)
if res and res.keys():
# res is dict for symbol, which is a list of record lists of 12 fields
# ==============================
# TODO: We need to check these fields for validity (presence, non-null)
# TODO: We can load maximum 999 latest klines, so if more 1600, then some other method
# TODO: Print somewhere diagnostics about how many lines are in history buffer of db, and if nans are found
results.update(res)
try:
added_count = App.analyzer.store_klines(res)
except Exception as e:
log.error(f"Error storing kline result in the database. Exception: {e}")
return 1
else:
log.error("Received empty or wrong result from klines request.")
return 1
# --- Shutdown MT5 (same as before) ---
log.info("\nShutting down MetaTrader 5 connection...")
mt5.shutdown()
return 0
async def request_klines(symbol: str, pandas_freq: str, mt5_timeframe: int, CHUNK_SIZE: int, RATE_LIMIT_DELAY: float) -> dict:
"""
Requests kline data from MT5 for a given symbol.
It fetches data in chunks to avoid overloading the server and handles
potential errors during the data retrieval process.
Args:
symbol (str): The trading symbol (e.g., "EURUSD").
pandas_freq (str): The pandas frequency string (e.g., "1min", "5min").
mt5_timeframe (int): The MT5 timeframe constant (e.g., mt5.TIMEFRAME_M1).
CHUNK_SIZE (int): The number of bars to request in each chunk.
RATE_LIMIT_DELAY (float): The delay in seconds between requests.
Returns:
dict: A dictionary with the symbol as the key and a list of klines as the value.
Each kline is a list of data points.
Raises:
ValueError: If an invalid MT5 timeframe is provided.
Exception: If any other error occurs during data retrieval.
"""
time_column = App.config["time_column"]
timezone = pytz.timezone("Etc/UTC")
# Define end point for download (now)
end_dt = datetime.now(timezone)
# Get the last kline from the database
last_kline_dt = App.analyzer.get_last_kline_time(symbol)
if last_kline_dt:
current_start_dt = last_kline_dt
log.info(f"Existing data found. Will download data starting from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
# Define a default historical start if no file exists | 2017 | 2024
current_start_dt = datetime(2014, 1, 1, tzinfo=timezone) # Or get from config if needed
log.info(f"No existing data found. Starting download from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}.")
all_klines_list = []
try:
while current_start_dt < end_dt:
try:
# Calculate the duration for CHUNK_SIZE bars
chunk_duration = get_timedelta_for_mt5_timeframe(mt5_timeframe, CHUNK_SIZE)
except ValueError as e:
log.error(f"Error calculating duration: {e}. Stopping download for {symbol}.")
break
# Calculate the temporary end date for this chunk request
temp_end_dt = current_start_dt + chunk_duration
# Ensure the temporary end date doesn't go beyond the overall end date
temp_end_dt = min(temp_end_dt, end_dt)
# Add a small buffer (e.g., 1 second) to start_dt for the request
# to definitively exclude the current_start_dt bar itself in the range request.
request_start_dt = current_start_dt + timedelta(seconds=1)
# Avoid making a request if the adjusted start is already >= temp_end
if request_start_dt >= temp_end_dt:
log.info(f" Skipping request: Calculated range is empty or invalid ({request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}).")
break # Likely means we are caught up
log.info(f" Fetching range from {request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")
# Use copy_rates_range, since copy_rates_from seems to fetch data using backward lookback(present to past)
rates = mt5.copy_rates_range(symbol, mt5_timeframe, request_start_dt, temp_end_dt)
if rates is None:
log.error(f" mt5.copy_rates_range returned None. Error: {mt5.last_error()}. Stopping download for {symbol}.")
break
if len(rates) == 0:
log.info(" No data returned in this range. Download may be complete or data gap.")
# If no data, advance start time past this chunk's end to avoid getting stuck
current_start_dt = temp_end_dt
if current_start_dt >= end_dt:
log.info(" Reached end date after empty range.")
break
else:
log.info(f" Advancing start time to {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} and continuing.")
time.sleep(RATE_LIMIT_DELAY) # Still pause slightly
continue # Try the next chunk
chunk_df = pd.DataFrame(rates)
# Convert 'time' (Unix seconds) to datetime objects (UTC)
chunk_df[time_column] = pd.to_datetime(chunk_df['time'], unit='s', utc=True)
all_klines_list.append(chunk_df)
last_bar_time_in_chunk = chunk_df[time_column].iloc[-1]
log.info(f" Fetched {len(chunk_df)} bars. Last timestamp in chunk: {last_bar_time_in_chunk.strftime('%Y-%m-%d %H:%M:%S %Z')}")
# Update the start time for the next chunk request to be the end time of THIS chunk's last bar
current_start_dt = last_bar_time_in_chunk
# Small delay before next request
time.sleep(RATE_LIMIT_DELAY)
except Exception as e:
log.error(f"Exception while requesting klines: {e}")
return {}
# Return all received klines with the symbol as a key
return {symbol: all_klines_list}
#
# Server and account info
#
async def data_provider_health_check() -> int:
"""
Performs a health check on the MT5 connection.
It verifies if the MT5 terminal is accessible and returns the status.
Returns:
int: 0 if the MT5 connection is healthy, 1 otherwise.
"""
# Check MT5 connection
if not mt5.terminal_info():
log.error(f"MT5 terminal not found.")
return 1
return 0