intelligent-trading-bot/inputs/collector_mt5.py

563 lines
25 KiB
Python
Raw Permalink Normal View History

2025-04-17 14:21:17 +01:00
"""
This module handles the collection of market data from MetaTrader 5 (MT5).
It defines tasks for connecting to MT5, requesting historical kline data,
and storing it in the database. It also includes health checks for the MT5 connection.
"""
import os
2025-04-17 14:21:17 +01:00
import time
from datetime import datetime, timedelta
from typing import Optional
from pathlib import Path
import pandas as pd
2026-01-09 17:47:08 +01:00
import pytz
2025-04-17 14:21:17 +01:00
import asyncio
2026-01-09 17:47:08 +01:00
2025-04-17 14:21:17 +01:00
import MetaTrader5 as mt5
from inputs.utils_mt5 import *
2025-04-17 14:21:17 +01:00
import logging
log = logging.getLogger('mt5')
2025-04-17 14:21:17 +01:00
print("MetaTrader5 package author: ", mt5.__author__)
print("MetaTrader5 package version: ", mt5.__version__)
client = None
2025-04-17 14:21:17 +01:00
#
# Parameters
#
CHUNK_SIZE = 10000  # How many bars worth of duration to request in each chunk
TICK_CHUNK_SIZE = 5  # How many ticks worth of duration to request in each chunk
RATE_LIMIT_DELAY = 0.1  # Small delay between consecutive requests (seconds)
time_column = 'timestamp'  # Name of the datetime column in the produced frames
# Timezone used to interpret MT5 server timestamps (UTC).
# NOTE: must be defined BEFORE default_start_dt, which uses it as tzinfo —
# the previous ordering referenced `timezone` before assignment.
timezone = pytz.timezone("Etc/UTC")
default_start_dt = datetime(2014, 1, 1, tzinfo=timezone)  # Or get from config if needed
def init_client(parameters, client_args):
    """
    Initialize the module-level MT5 client.

    Args:
        parameters: Collector parameters (currently unused; kept for interface
            compatibility with other collector backends).
        client_args (dict): Keyword arguments forwarded to connect_mt5
            (account id, password, server).

    Raises:
        ConnectionError: If the MT5 terminal cannot be initialized or login fails.
    """
    global client
    # The original code referenced `collector_mt5.connect_mt5` / `collector_mt5.client`,
    # but `collector_mt5` is this very module and is not a defined name here —
    # call the local function and assign the declared global instead.
    authorized = connect_mt5(**client_args)
    if not authorized:
        log.error("Failed to connect to MT5. Check credentials and server details.")
        raise ConnectionError("Failed to connect to MT5. Check credentials and server details.")
    client = mt5
def get_client():
    """Return the module-level MT5 client (None until init_client has succeeded)."""
    return client
def close_client():
    """
    Shut down the MT5 terminal connection, if one was initialized.

    Safe to call when init_client was never run (the original version raised
    AttributeError on a None client). Resets the module-level client to None
    so a stale handle is not reused after shutdown.
    """
    global client
    if client is not None:
        client.shutdown()
        client = None
async def fetch_klines(config: dict, start_from_dt) -> dict[str, pd.DataFrame] | None:
    """
    Synchronize the local data state with the latest data from MT5.

    Retrieves the most recent kline data for all configured symbols
    concurrently and returns the new data keyed by symbol.

    Args:
        config (dict): Collector configuration. Uses "data_sources" (or "symbol"),
            "freq", and optionally MT5 credentials ("mt5_account_id",
            "mt5_password", "mt5_server").
        start_from_dt: Datetime to start downloading from, or a falsy value to
            use the default historical start.

    Returns:
        dict: For each symbol (key of the dict), a data frame with the new data.
        None: On login failure, timeout, request exception, or an empty result.
    """
    data_sources = config.get("data_sources", [])
    symbols = [x.get("folder") for x in data_sources]
    pandas_freq = config["freq"]
    mt5_timeframe = mt5_freq_from_pandas(pandas_freq)
    if not symbols:
        symbols = [config["symbol"]]

    # Connect to trading account if full credentials are provided
    mt5_account_id = config.get("mt5_account_id")
    mt5_password = config.get("mt5_password")
    mt5_server = config.get("mt5_server")
    if mt5_account_id and mt5_password and mt5_server:
        authorized = connect_mt5(int(mt5_account_id), password=str(mt5_password), server=str(mt5_server))
        if not authorized:
            log.error(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
            return None

    # One concurrent retrieval task per symbol
    tasks = [asyncio.create_task(request_symbol_klines(s, mt5_timeframe, start_from_dt)) for s in symbols]

    results = {}
    timeout = 10  # Seconds to wait for the results
    try:
        # Process responses in the order of arrival
        for fut in asyncio.as_completed(tasks, timeout=timeout):
            try:
                res = await fut
            # asyncio.TimeoutError: pre-3.11 this is NOT the builtin TimeoutError
            # that the original caught, so the timeout went unhandled there.
            except asyncio.TimeoutError:
                log.warning(f"Timeout {timeout} seconds when requesting kline data.")
                return None
            except Exception as e:
                log.warning(f"Exception when requesting kline data: {e}")
                return None

            # Add to the result set (a non-empty dict {symbol: df} is expected)
            if res:
                results.update(res)
            else:
                log.error("Received empty or wrong result from klines request.")
                return None
    finally:
        # Don't leak still-running request tasks when returning early
        for t in tasks:
            if not t.done():
                t.cancel()

    log.info("\nShutting down MetaTrader 5 connection...")
    mt5.shutdown()
    return results
2025-04-17 14:21:17 +01:00
async def request_symbol_klines(symbol: str, mt5_timeframe: int, start_from_dt) -> dict:
    """
    Request kline data from MT5 for a given symbol.

    Fetches data in duration-based chunks (CHUNK_SIZE bars per request) to
    avoid overloading the server, from `start_from_dt` (or the module default)
    up to the current time.

    Args:
        symbol (str): The trading symbol (e.g., "EURUSD").
        mt5_timeframe (int): The MT5 timeframe constant (e.g., mt5.TIMEFRAME_M1).
        start_from_dt: Timestamp of the last locally available bar, or a falsy
            value to start from `default_start_dt`.

    Returns:
        dict: {symbol: DataFrame} with the received klines indexed by timestamp,
            or {} when an error occurred or no data was received.
    """
    # Define end point for download (now)
    end_dt = datetime.now(timezone)
    if start_from_dt:
        current_start_dt = start_from_dt
        log.info(f"Existing data found. Will download data starting from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    else:
        # Default historical start if no local data exists
        current_start_dt = default_start_dt
        log.info(f"No existing data found. Starting download from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}.")

    all_klines_list = []
    try:
        while current_start_dt < end_dt:
            try:
                # Calculate the duration covered by CHUNK_SIZE bars
                chunk_duration = get_timedelta_for_mt5_timeframe(mt5_timeframe, CHUNK_SIZE)
            except ValueError as e:
                log.error(f"Error calculating duration: {e}. Stopping download for {symbol}.")
                break

            # End of this chunk, clipped so it never passes the overall end date
            temp_end_dt = min(current_start_dt + chunk_duration, end_dt)

            # Shift the start by 1 second so the bar at current_start_dt itself
            # (already stored locally) is excluded from the range request.
            request_start_dt = current_start_dt + timedelta(seconds=1)
            if request_start_dt >= temp_end_dt:
                # Likely means we are caught up
                log.info(f" Skipping request: Calculated range is empty or invalid ({request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}).")
                break

            log.info(f" Fetching range from {request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")
            # Use copy_rates_range, since copy_rates_from seems to fetch data
            # using backward lookback (present to past)
            rates = mt5.copy_rates_range(symbol, mt5_timeframe, request_start_dt, temp_end_dt)
            if rates is None:
                log.error(f" mt5.copy_rates_range returned None. Error: {mt5.last_error()}. Stopping download for {symbol}.")
                break

            if len(rates) == 0:
                log.info(" No data returned in this range. Download may be complete or data gap.")
                # Advance start time past this chunk's end to avoid getting stuck
                current_start_dt = temp_end_dt
                if current_start_dt >= end_dt:
                    log.info(" Reached end date after empty range.")
                    break
                log.info(f" Advancing start time to {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} and continuing.")
                # Non-blocking pause: time.sleep() here would stall the whole
                # event loop and every other symbol's task.
                await asyncio.sleep(RATE_LIMIT_DELAY)
                continue  # Try the next chunk

            chunk_df = pd.DataFrame(rates)
            # Convert 'time' (Unix seconds) to timezone-aware datetimes (UTC)
            chunk_df[time_column] = pd.to_datetime(chunk_df['time'], unit='s', utc=True)
            all_klines_list.append(chunk_df)

            last_bar_time_in_chunk = chunk_df[time_column].iloc[-1]
            log.info(f" Fetched {len(chunk_df)} bars. Last timestamp in chunk: {last_bar_time_in_chunk.strftime('%Y-%m-%d %H:%M:%S %Z')}")
            # Next chunk starts at the end time of THIS chunk's last bar
            current_start_dt = last_bar_time_in_chunk

            # Small non-blocking delay before the next request
            await asyncio.sleep(RATE_LIMIT_DELAY)
    except Exception as e:
        log.error(f"Exception while requesting klines: {e}")
        return {}

    if not all_klines_list:
        # pd.concat raises ValueError on an empty list — treat as "no data"
        log.warning(f"No kline data received for {symbol}.")
        return {}

    df = pd.concat(all_klines_list, axis=0, ignore_index=False)
    df = df.drop_duplicates(subset=time_column)
    df = df.set_index(time_column, inplace=False, drop=False)
    df.name = symbol

    # Return all received klines with the symbol as a key
    return {symbol: df}
2025-04-17 14:21:17 +01:00
async def health_check() -> int:
    """
    Check whether the MT5 terminal connection is healthy.

    Returns:
        int: 0 if the MT5 terminal is reachable, 1 otherwise.
    """
    if mt5.terminal_info():
        return 0
    log.error(f"MT5 terminal not found.")
    return 1
def connect_mt5(mt5_account_id: Optional[int] = None, mt5_password: Optional[str] = None, mt5_server: Optional[str] = None, **kwargs):
    """
    Initialize the MetaTrader 5 terminal connection and, when full credentials
    are supplied, attempt to log in to the trading account.

    Args:
        mt5_account_id: Trading account number (login skipped when missing).
        mt5_password: Account password.
        mt5_server: Broker server name.
        **kwargs: Extra keyword arguments forwarded to mt5.login().

    Returns:
        bool: True when initialization (and login, if attempted) succeeded.
    """
    # Terminal must be initialized before any other MT5 call
    if not mt5.initialize():
        log.error(f"initialize() failed, error code = {mt5.last_error()}")
        return False
    log.info(f"MT5 Initialized. Version: {mt5.version()}")

    have_credentials = mt5_account_id and mt5_password and mt5_server
    if not have_credentials:
        # No credentials: initialized terminal is enough
        return True

    if mt5.login(int(mt5_account_id), password=str(mt5_password), server=str(mt5_server), **kwargs):
        return True

    # Login failed: tear the terminal connection down again
    log.error(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
    mt5.shutdown()
    return False
def download_klines(config, data_sources):
    """
    Retrieving historic klines from MetaTrader5 server incrementally using copy_rates_range
    with calculated chunk durations. Downloads data from the last record in the existing file
    (or a historical start date) up to the current time, fetching in duration-based chunks.

    Args:
        config: Collector configuration. Keys used: "time_column", "data_folder",
            "download_max_rows", "mt5_account_id", "mt5_password", "mt5_server", "freq".
        data_sources: List of dicts, each with "folder" (symbol), optional
            "file" ("ticks" selects tick download) and optional "chunk_size".

    Side effects: writes one CSV per symbol under the data folder and shuts
    down the MT5 connection before returning.
    """
    time_column = config["time_column"]
    data_path = Path(config["data_folder"])
    download_max_rows = config.get("download_max_rows", 0)
    mt5_account_id = config.get("mt5_account_id")
    mt5_password = config.get("mt5_password")
    mt5_server = config.get("mt5_server")
    pandas_freq = config["freq"]
    print(f"Pandas frequency: {pandas_freq}")
    mt5_timeframe = mt5_freq_from_pandas(pandas_freq)
    # Use timeframe_description for clearer output if available
    try:
        tf_description = mt5.timeframe_description(mt5_timeframe)
        print(f"MetaTrader5 frequency: {tf_description} ({mt5_timeframe})")
    except AttributeError:  # Handle older MT5 versions potentially lacking this func
        print(f"MetaTrader5 frequency: {mt5_timeframe}")
    # Define the timezone for MT5 (usually UTC)
    timezone = pytz.timezone("Etc/UTC")
    # Define a default historical start if no file exists => 2014 | 2017 | 2024
    historical_start_date = datetime(2017, 1, 1, tzinfo=timezone)  # Or get from config if needed
    # Connect to trading account
    if mt5_account_id and mt5_password and mt5_server:
        authorized = connect_mt5(mt5_account_id, password=str(mt5_password), server=str(mt5_server))
        if authorized:
            print("MT5 Login successful.")
            account_info = mt5.account_info()
            if account_info:
                print(f"Account Info: Login={account_info.login}, Server={account_info.server}, Balance={account_info.balance}")
            else:
                print(f"Could not retrieve account info. Error: {mt5.last_error()}")
        else:
            print(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
            mt5.shutdown()
            return
    else:
        print("MT5 credentials not fully provided in config. Proceeding without login (might affect available symbols/data).")
    print(f"Terminal Info: {mt5.terminal_info()}")
    # --- Loop through data sources ---
    processed_symbols = []
    for ds in data_sources:
        quote = str(ds.get("folder"))
        file_type = str(ds.get("file")).lower()
        # NOTE(review): str(None) == "None" is truthy, so this guard only fires
        # for an explicit empty string — confirm "folder" is always present.
        if not quote:
            print("ERROR: Folder (symbol) is not specified in data_sources.")
            continue
        print(f"\n--- Processing symbol: {quote} ---")
        file_path = data_path / quote
        file_path.mkdir(parents=True, exist_ok=True)
        file_name = (file_path / "klines").with_suffix(".csv")
        chunk_size = int(ds.get("chunk_size", CHUNK_SIZE))
        if file_type == "ticks":
            # Ticks get their own file and a much smaller default chunk duration
            file_name = (file_path / "ticks").with_suffix(".csv")
            chunk_size = int(ds.get("chunk_size", TICK_CHUNK_SIZE))
        existing_df = pd.DataFrame()
        start_dt = historical_start_date
        # Check if file exists and load data
        if file_name.is_file():
            try:
                print(f"Loading existing data from: {file_name}")
                # Specify date format for potentially faster parsing if consistent
                existing_df = pd.read_csv(file_name, parse_dates=[time_column], date_format='ISO8601')
                # Ensure timezone is set correctly after parsing
                if pd.api.types.is_datetime64_any_dtype(existing_df[time_column]) and existing_df[time_column].dt.tz is None:
                    existing_df[time_column] = existing_df[time_column].dt.tz_localize('UTC')
                elif pd.api.types.is_datetime64_any_dtype(existing_df[time_column]):
                    existing_df[time_column] = existing_df[time_column].dt.tz_convert('UTC')
                else:  # Fallback if parsing failed or column is not datetime
                    print(f"Warning: Column '{time_column}' not parsed as datetime. Attempting conversion.")
                    existing_df[time_column] = pd.to_datetime(existing_df[time_column], errors='coerce', utc=True)
                    existing_df = existing_df.dropna(subset=[time_column])  # Drop rows where conversion failed
                if not existing_df.empty:
                    # Sort just in case file wasn't sorted
                    existing_df = existing_df.sort_values(by=time_column)
                    # Start downloading from the timestamp of the last record
                    start_dt = existing_df[time_column].iloc[-1]
                    print(f"Existing file found. Will download data starting from {start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}")
                else:
                    print("Existing file was empty or had invalid dates after loading. Starting from historical date.")
                    existing_df = pd.DataFrame()  # Reset to empty
            except Exception as e:
                print(f"Error loading existing file {file_name}: {e}. Starting from historical date.")
                existing_df = pd.DataFrame()  # Reset to empty
        else:
            print(f"File not found. Starting download from {historical_start_date.strftime('%Y-%m-%d %H:%M:%S %Z')}.")
            start_dt = historical_start_date  # Ensure start_dt is set
        # Define end point for download (now)
        end_dt = datetime.now(timezone)
        # Check if symbol is available
        symbol_info = mt5.symbol_info(quote)
        if not symbol_info:
            print(f"Symbol {quote} not found or not available in MT5 terminal. Skipping. Error: {mt5.last_error()}")
            continue
        if file_type == "ticks" and not symbol_info.trade_tick_size:
            print(f"Ticks data is not available for {quote}. Skipping. Error: {mt5.last_error()}")
            # NOTE(review): os.remove raises FileNotFoundError when no tick file
            # exists yet — confirm this path is only reached with an existing file.
            os.remove(file_name)
            continue
        print(f"Symbol {quote} found in MT5.")
        all_klines_list = []
        current_start_dt = start_dt
        print(f"Starting download loop for {quote} from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")
        # --- Download Loop using copy_rates_range or copy_ticks_range with calculated duration ---
        while current_start_dt < end_dt:
            try:
                # Calculate the duration for chunk_size bars or ticks
                chunk_duration = get_timedelta_for_mt5_timeframe(mt5_timeframe, chunk_size)
            except ValueError as e:
                print(f"Error calculating duration: {e}. Stopping download for {quote}.")
                break
            # Calculate the temporary end date for this chunk request
            temp_end_dt = current_start_dt + chunk_duration
            # Ensure the temporary end date doesn't go beyond the overall end date
            temp_end_dt = min(temp_end_dt, end_dt)
            # Add a small buffer (e.g., 1 second) to start_dt for the request
            # to definitively exclude the current_start_dt bar itself in the range request.
            request_start_dt = current_start_dt + timedelta(seconds=1)
            # Avoid making a request if the adjusted start is already >= temp_end
            if request_start_dt >= temp_end_dt:
                print(f" Skipping request: Calculated range is empty or invalid ({request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}).")
                break  # Likely means we are caught up
            print(f" Fetching range from {request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")
            # Use copy_rates_range or copy_ticks_range
            if file_type == "ticks":
                rates = mt5.copy_ticks_range(quote, request_start_dt, temp_end_dt, mt5.COPY_TICKS_ALL)
                if rates is None:
                    print(f" mt5.copy_ticks_range returned None. Error: {mt5.last_error()}. Stopping download for {quote}.")
                    break
            else:
                rates = mt5.copy_rates_range(quote, mt5_timeframe, request_start_dt, temp_end_dt)
                if rates is None:
                    print(f" mt5.copy_rates_range returned None. Error: {mt5.last_error()}. Stopping download for {quote}.")
                    break
            if len(rates) == 0:
                print(" No data returned in this range. Download may be complete or data gap.")
                # If no data, advance start time past this chunk's end to avoid getting stuck
                current_start_dt = temp_end_dt
                if current_start_dt >= end_dt:
                    print(" Reached end date after empty range.")
                    break
                else:
                    print(f" Advancing start time to {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} and continuing.")
                    time.sleep(RATE_LIMIT_DELAY)  # Still pause slightly
                    continue  # Try the next chunk
            chunk_df = pd.DataFrame(rates)
            if file_type == "ticks":
                # Convert 'time_msc' (Unix milliseconds) to datetime objects (UTC)
                chunk_df[time_column] = pd.to_datetime(chunk_df['time_msc'], unit='ms', utc=True)
            else:
                # Convert 'time' (Unix seconds) to datetime objects (UTC)
                chunk_df[time_column] = pd.to_datetime(chunk_df['time'], unit='s', utc=True)
            # --- IMPORTANT: Filtering is no longer needed here ---
            # Since we requested data *starting after* current_start_dt using request_start_dt,
            # the check `chunk_df = chunk_df[chunk_df[time_column] > current_start_dt]`
            # is redundant and can be removed.
            # if chunk_df.empty: # This check is effectively handled by len(rates) == 0 now
            #     print(" No new bars found in the fetched chunk (after filtering). Stopping.")
            #     break
            all_klines_list.append(chunk_df)
            last_bar_time_in_chunk = chunk_df[time_column].iloc[-1]
            print(f" Fetched {len(chunk_df)} bars. Last timestamp in chunk: {last_bar_time_in_chunk.strftime('%Y-%m-%d %H:%M:%S %Z')}")
            # Update the start time for the next chunk request to be the end time of THIS chunk's last bar
            current_start_dt = last_bar_time_in_chunk
            # Check if we've downloaded past the intended end time (redundant check, loop condition handles it)
            # if current_start_dt >= end_dt:
            #     print(" Reached or passed target end time. Download complete.")
            #     break
            # Small delay before next request
            time.sleep(RATE_LIMIT_DELAY)
        # --- Combine and Process Data ---
        if not all_klines_list:
            print(f"No new data downloaded for {quote}.")
            if existing_df.empty:
                print(f"No existing or new data for {quote}. Skipping save.")
                continue
            else:
                print("Saving existing data only (no updates).")
                final_df = existing_df  # Use existing data if no new data was fetched
        else:
            print("Combining downloaded data...")
            new_df = pd.concat(all_klines_list, ignore_index=True)
            # Combine existing and new data
            if not existing_df.empty:
                final_df = pd.concat([existing_df, new_df], ignore_index=True)
            else:
                final_df = new_df
        print("Processing combined data (duplicates, sorting, columns)...")
        if file_type == "ticks":
            # Standardize columns (assuming MT5 names)
            final_df.rename(columns={
                'time_msc': 'time',
                'flags': 'flags',
                'bid': 'bid',
                'ask': 'ask',
                'last': 'last',
                'volume': 'volume',
            }, inplace=True, errors='ignore')  # Added errors='ignore'
        else:
            final_df.rename(columns={
                'tick_volume': 'volume',  # Use tick_volume as 'volume'
            }, inplace=True, errors='ignore')  # Added errors='ignore'
        # Ensure time column is the primary datetime column and drop time column if exist
        if 'time' in final_df.columns and time_column != 'time':
            final_df = final_df.drop('time', axis=1)
        # Select desired columns (ensure time_column is first)
        required_columns = [time_column, 'open', 'high', 'low', 'close', 'volume']
        # NOTE(review): for ticks this selection discards bid/ask/last columns —
        # confirm that timestamp/volume only is the intended tick output schema.
        # Keep only columns that actually exist in the dataframe
        final_df = final_df[[col for col in required_columns if col in final_df.columns]]
        # Remove duplicates based on timestamp, keeping the latest entry
        initial_rows = len(final_df)
        final_df = final_df.drop_duplicates(subset=[time_column], keep='last')
        if initial_rows > len(final_df):
            print(f" Removed {initial_rows - len(final_df)} duplicate rows based on '{time_column}'.")
        # Sort by timestamp
        final_df = final_df.sort_values(by=time_column)
        # Remove the last row *if* it represents the current, incomplete bar.
        if not final_df.empty:
            # Check if the last bar's time is too close to the script end time
            # A simple heuristic: if the last bar's time is after the loop's end_dt minus one interval, it might be incomplete.
            # Or just always drop the last row after bulk download.
            print("Removing potentially incomplete last bar.")
            final_df = final_df.iloc[:-1]
        # Apply max rows limit if specified (same as before)
        if download_max_rows and len(final_df) > download_max_rows:
            print(f"Applying download_max_rows limit: {download_max_rows}")
            final_df = final_df.tail(download_max_rows)
        # Final check if DataFrame is valid before saving (same as before)
        if final_df.empty:
            print(f"Final dataframe for {quote} is empty after processing. Skipping save.")
            continue
        # Reset index before saving (same as before)
        final_df = final_df.reset_index(drop=True)
        # --- Save Data (same as before) ---
        try:
            if file_type == "ticks":
                # NOTE(review): 'time' was already removed by the required-columns
                # selection above, so this drop likely raises KeyError for ticks
                # (caught below and reported as a save error) — verify with a ticks run.
                final_df = final_df.drop(['time'], axis=1)
            print(f"Saving {len(final_df)} rows to {file_name}...")
            final_df.to_csv(file_name, index=False, date_format='%Y-%m-%dT%H:%M:%SZ')
            print(f"Finished saving '{quote}'.")
            processed_symbols.append(quote)
        except Exception as e:
            print(f"Error saving file {file_name}: {e}")
    # --- Shutdown MT5 (same as before) ---
    print("\nShutting down MetaTrader 5 connection...")
    mt5.shutdown()
    print(f"\nFinished downloading data for symbols: {', '.join(processed_symbols) if processed_symbols else 'None'}")