mirror of
https://github.com/asavinov/intelligent-trading-bot.git
synced 2026-05-04 16:26:44 +00:00
271 lines
10 KiB
Python
271 lines
10 KiB
Python
"""
|
|
This module handles the collection of market data from MetaTrader 5 (MT5).
|
|
It defines tasks for connecting to MT5, requesting historical kline data,
|
|
and storing it in the database. It also includes health checks for the MT5 connection.
|
|
"""
|
|
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
import pandas as pd
|
|
import asyncio
|
|
import MetaTrader5 as mt5
|
|
import pytz
|
|
|
|
from common.utils import mt5_freq_from_pandas, get_timedelta_for_mt5_timeframe
|
|
from service.App import *
|
|
from common.utils import *
|
|
from service.analyzer import *
|
|
from service.mt5 import connect_mt5
|
|
|
|
import logging
|
|
|
|
log = logging.getLogger('collector')
|
|
|
|
|
|
async def main_collector_task() -> int:
|
|
"""
|
|
The main collector task that is executed periodically.
|
|
It checks the MT5 connection, synchronizes data, and handles any errors.
|
|
|
|
Returns:
|
|
int: 0 if the task completed successfully, 1 otherwise.
|
|
"""
|
|
|
|
symbol = App.config["symbol"]
|
|
pandas_freq = App.config["freq"]
|
|
start_ts, end_ts = pandas_get_interval(pandas_freq)
|
|
now_ts = now_timestamp()
|
|
|
|
log.info(f"===> Start collector task. Timestamp {now_ts}. Interval [{start_ts},{end_ts}].")
|
|
|
|
#
|
|
# 0. Check MT5 connection
|
|
#
|
|
if data_provider_problems_exist():
|
|
await data_provider_health_check()
|
|
if data_provider_problems_exist():
|
|
log.error(f"Problems with the data provider server found. No signaling, no trade. Will try next time.")
|
|
return 1
|
|
|
|
#
|
|
# 1. Ensure that we are up-to-date with klines
|
|
#
|
|
res = await sync_data_collector_task()
|
|
|
|
if res > 0:
|
|
log.error(f"Problem getting data from the server. No signaling, no trade. Will try next time.")
|
|
return 1
|
|
|
|
log.info(f"<=== End collector task.")
|
|
return 0
|
|
|
|
|
|
#
|
|
# Request/update market data
|
|
#
|
|
|
|
async def sync_data_collector_task() -> int:
|
|
"""
|
|
Synchronizes the local data state with the latest data from MT5.
|
|
This task retrieves the most recent kline data for specified symbols and
|
|
stores it in the database.
|
|
|
|
Returns:
|
|
int: 0 if the data was synchronized successfully, 1 otherwise.
|
|
|
|
Raises:
|
|
TimeoutError: If the data request times out.
|
|
Exception: If any other error occurs during data retrieval.
|
|
"""
|
|
|
|
CHUNK_SIZE = 10000 # How many bars worth of duration to request in each chunk
|
|
RATE_LIMIT_DELAY = 0.1 # Small delay between requests (seconds)
|
|
|
|
data_sources = App.config.get("data_sources", [])
|
|
symbols = [x.get("folder") for x in data_sources]
|
|
pandas_freq = App.config["freq"]
|
|
mt5_timeframe = mt5_freq_from_pandas(pandas_freq)
|
|
|
|
if not symbols:
|
|
symbols = [App.config["symbol"]]
|
|
|
|
# Connect to trading account (same as before)
|
|
mt5_account_id = App.config.get("mt5_account_id")
|
|
mt5_password = App.config.get("mt5_password")
|
|
mt5_server = App.config.get("mt5_server")
|
|
if mt5_account_id and mt5_password and mt5_server:
|
|
authorized = connect_mt5(int(mt5_account_id), password=str(mt5_password), server=str(mt5_server))
|
|
if not authorized:
|
|
log.error(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
|
|
return 1
|
|
|
|
|
|
# How many records are missing (and to be requested) for each symbol (not used here)
|
|
# missing_klines_counts = [App.analyzer.get_missing_klines_count(sym) for sym in symbols]
|
|
|
|
# Create a list of tasks for retrieving data
|
|
tasks = [asyncio.create_task(request_klines(s, pandas_freq, mt5_timeframe, CHUNK_SIZE, RATE_LIMIT_DELAY)) for s in symbols]
|
|
|
|
results = {}
|
|
timeout = 10 # Seconds to wait for the result
|
|
|
|
# Process responses in the order of arrival
|
|
for fut in asyncio.as_completed(tasks, timeout=timeout):
|
|
# Get the results
|
|
res = None
|
|
try:
|
|
res = await fut
|
|
except TimeoutError as te:
|
|
log.warning(f"Timeout {timeout} seconds when requesting kline data.")
|
|
return 1
|
|
except Exception as e:
|
|
log.warning(f"Exception when requesting kline data.")
|
|
return 1
|
|
|
|
# Add to the database (will overwrite existing klines if any)
|
|
if res and res.keys():
|
|
# res is dict for symbol, which is a list of record lists of 12 fields
|
|
# ==============================
|
|
# TODO: We need to check these fields for validity (presence, non-null)
|
|
# TODO: We can load maximum 999 latest klines, so if more 1600, then some other method
|
|
# TODO: Print somewhere diagnostics about how many lines are in history buffer of db, and if nans are found
|
|
results.update(res)
|
|
try:
|
|
added_count = App.analyzer.store_klines(res)
|
|
except Exception as e:
|
|
log.error(f"Error storing kline result in the database. Exception: {e}")
|
|
return 1
|
|
else:
|
|
log.error("Received empty or wrong result from klines request.")
|
|
return 1
|
|
|
|
# --- Shutdown MT5 (same as before) ---
|
|
log.info("\nShutting down MetaTrader 5 connection...")
|
|
mt5.shutdown()
|
|
|
|
return 0
|
|
|
|
|
|
async def request_klines(symbol: str, pandas_freq: str, mt5_timeframe: int, CHUNK_SIZE: int, RATE_LIMIT_DELAY: float) -> dict:
|
|
"""
|
|
Requests kline data from MT5 for a given symbol.
|
|
It fetches data in chunks to avoid overloading the server and handles
|
|
potential errors during the data retrieval process.
|
|
|
|
Args:
|
|
symbol (str): The trading symbol (e.g., "EURUSD").
|
|
pandas_freq (str): The pandas frequency string (e.g., "1min", "5min").
|
|
mt5_timeframe (int): The MT5 timeframe constant (e.g., mt5.TIMEFRAME_M1).
|
|
CHUNK_SIZE (int): The number of bars to request in each chunk.
|
|
RATE_LIMIT_DELAY (float): The delay in seconds between requests.
|
|
|
|
Returns:
|
|
dict: A dictionary with the symbol as the key and a list of klines as the value.
|
|
Each kline is a list of data points.
|
|
|
|
Raises:
|
|
ValueError: If an invalid MT5 timeframe is provided.
|
|
Exception: If any other error occurs during data retrieval.
|
|
"""
|
|
|
|
time_column = App.config["time_column"]
|
|
timezone = pytz.timezone("Etc/UTC")
|
|
|
|
# Define end point for download (now)
|
|
end_dt = datetime.now(timezone)
|
|
|
|
# Get the last kline from the database
|
|
last_kline_dt = App.analyzer.get_last_kline_time(symbol)
|
|
if last_kline_dt:
|
|
current_start_dt = last_kline_dt
|
|
log.info(f"Existing data found. Will download data starting from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}")
|
|
else:
|
|
# Define a default historical start if no file exists | 2017 | 2024
|
|
current_start_dt = datetime(2014, 1, 1, tzinfo=timezone) # Or get from config if needed
|
|
log.info(f"No existing data found. Starting download from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}.")
|
|
|
|
all_klines_list = []
|
|
|
|
try:
|
|
while current_start_dt < end_dt:
|
|
try:
|
|
# Calculate the duration for CHUNK_SIZE bars
|
|
chunk_duration = get_timedelta_for_mt5_timeframe(mt5_timeframe, CHUNK_SIZE)
|
|
except ValueError as e:
|
|
log.error(f"Error calculating duration: {e}. Stopping download for {symbol}.")
|
|
break
|
|
|
|
# Calculate the temporary end date for this chunk request
|
|
temp_end_dt = current_start_dt + chunk_duration
|
|
|
|
# Ensure the temporary end date doesn't go beyond the overall end date
|
|
temp_end_dt = min(temp_end_dt, end_dt)
|
|
|
|
# Add a small buffer (e.g., 1 second) to start_dt for the request
|
|
# to definitively exclude the current_start_dt bar itself in the range request.
|
|
request_start_dt = current_start_dt + timedelta(seconds=1)
|
|
|
|
# Avoid making a request if the adjusted start is already >= temp_end
|
|
if request_start_dt >= temp_end_dt:
|
|
log.info(f" Skipping request: Calculated range is empty or invalid ({request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}).")
|
|
break # Likely means we are caught up
|
|
|
|
log.info(f" Fetching range from {request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")
|
|
|
|
# Use copy_rates_range, since copy_rates_from seems to fetch data using backward lookback(present to past)
|
|
rates = mt5.copy_rates_range(symbol, mt5_timeframe, request_start_dt, temp_end_dt)
|
|
|
|
if rates is None:
|
|
log.error(f" mt5.copy_rates_range returned None. Error: {mt5.last_error()}. Stopping download for {symbol}.")
|
|
break
|
|
if len(rates) == 0:
|
|
log.info(" No data returned in this range. Download may be complete or data gap.")
|
|
# If no data, advance start time past this chunk's end to avoid getting stuck
|
|
current_start_dt = temp_end_dt
|
|
if current_start_dt >= end_dt:
|
|
log.info(" Reached end date after empty range.")
|
|
break
|
|
else:
|
|
log.info(f" Advancing start time to {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} and continuing.")
|
|
time.sleep(RATE_LIMIT_DELAY) # Still pause slightly
|
|
continue # Try the next chunk
|
|
|
|
chunk_df = pd.DataFrame(rates)
|
|
# Convert 'time' (Unix seconds) to datetime objects (UTC)
|
|
chunk_df[time_column] = pd.to_datetime(chunk_df['time'], unit='s', utc=True)
|
|
all_klines_list.append(chunk_df)
|
|
last_bar_time_in_chunk = chunk_df[time_column].iloc[-1]
|
|
log.info(f" Fetched {len(chunk_df)} bars. Last timestamp in chunk: {last_bar_time_in_chunk.strftime('%Y-%m-%d %H:%M:%S %Z')}")
|
|
# Update the start time for the next chunk request to be the end time of THIS chunk's last bar
|
|
current_start_dt = last_bar_time_in_chunk
|
|
# Small delay before next request
|
|
time.sleep(RATE_LIMIT_DELAY)
|
|
|
|
except Exception as e:
|
|
log.error(f"Exception while requesting klines: {e}")
|
|
return {}
|
|
|
|
# Return all received klines with the symbol as a key
|
|
return {symbol: all_klines_list}
|
|
|
|
#
|
|
# Server and account info
|
|
#
|
|
|
|
|
|
async def data_provider_health_check() -> int:
|
|
"""
|
|
Performs a health check on the MT5 connection.
|
|
It verifies if the MT5 terminal is accessible and returns the status.
|
|
|
|
Returns:
|
|
int: 0 if the MT5 connection is healthy, 1 otherwise.
|
|
|
|
"""
|
|
|
|
# Check MT5 connection
|
|
if not mt5.terminal_info():
|
|
log.error(f"MT5 terminal not found.")
|
|
return 1
|
|
return 0
|