intelligent-trading-bot/scripts/download_mt5.py

353 lines
16 KiB
Python
Raw Permalink Normal View History

2025-04-17 07:37:15 +01:00
import time
from datetime import datetime, timedelta
from pathlib import Path
2025-04-17 13:07:32 +01:00
import os
2025-04-17 07:37:15 +01:00
import pandas as pd
import click
import MetaTrader5 as mt5
import pytz
from common.utils import mt5_freq_from_pandas, get_timedelta_for_mt5_timeframe
from service.App import App, load_config
2025-04-17 13:07:32 +01:00
from service.mt5 import connect_mt5
2025-04-17 07:37:15 +01:00
print("MetaTrader5 package author: ", mt5.__author__)
print("MetaTrader5 package version: ", mt5.__version__)
# --- Configuration ---
2025-04-17 13:07:32 +01:00
DEFAULT_BAR_CHUNK_SIZE = 10000 # How many bars worth of duration to request in each chunk
DEFAULT_TICK_CHUNK_SIZE = 5 # How many ticks worth of duration to request in each chunk
2025-04-17 07:37:15 +01:00
RATE_LIMIT_DELAY = 0.1 # Small delay between requests (seconds)
# ---------------------
2025-04-17 13:07:32 +01:00
2025-04-17 07:37:15 +01:00
# -------------------------------------------------
@click.command()
@click.option('--config_file', '-c', type=click.Path(), default='', help='Configuration file name')
def main(config_file):
"""
Retrieving historic klines from MetaTrader5 server incrementally using copy_rates_range
with calculated chunk durations. Downloads data from the last record in the existing file
(or a historical start date) up to the current time, fetching in duration-based chunks.
"""
load_config(config_file)
2025-04-17 13:07:32 +01:00
data_sources = App.config["data_sources"]
2025-04-17 07:37:15 +01:00
time_column = App.config["time_column"]
data_path = Path(App.config["data_folder"])
download_max_rows = App.config.get("download_max_rows", 0)
mt5_account_id = App.config.get("mt5_account_id")
mt5_password = App.config.get("mt5_password")
mt5_server = App.config.get("mt5_server")
script_start_time = datetime.now()
pandas_freq = App.config["freq"]
print(f"Pandas frequency: {pandas_freq}")
mt5_timeframe = mt5_freq_from_pandas(pandas_freq)
# Use timeframe_description for clearer output if available
try:
tf_description = mt5.timeframe_description(mt5_timeframe)
print(f"MetaTrader5 frequency: {tf_description} ({mt5_timeframe})")
except AttributeError: # Handle older MT5 versions potentially lacking this func
print(f"MetaTrader5 frequency: {mt5_timeframe}")
# Define the timezone for MT5 (usually UTC)
timezone = pytz.timezone("Etc/UTC")
2025-04-17 13:07:32 +01:00
# Define a default historical start if no file exists => 2014 | 2017 | 2024
historical_start_date = datetime(2017, 1, 1, tzinfo=timezone) # Or get from config if needed
2025-04-17 07:37:15 +01:00
2025-04-17 13:07:32 +01:00
# Connect to trading account
2025-04-17 07:37:15 +01:00
if mt5_account_id and mt5_password and mt5_server:
2025-04-17 13:07:32 +01:00
authorized = connect_mt5(mt5_account_id, password=str(mt5_password), server=str(mt5_server))
2025-04-17 07:37:15 +01:00
if authorized:
print("MT5 Login successful.")
account_info = mt5.account_info()
if account_info:
print(f"Account Info: Login={account_info.login}, Server={account_info.server}, Balance={account_info.balance}")
else:
print(f"Could not retrieve account info. Error: {mt5.last_error()}")
else:
print(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
mt5.shutdown()
return
else:
print("MT5 credentials not fully provided in config. Proceeding without login (might affect available symbols/data).")
print(f"Terminal Info: {mt5.terminal_info()}")
# --- Loop through data sources ---
2025-04-17 13:07:32 +01:00
2025-04-17 07:37:15 +01:00
processed_symbols = []
for ds in data_sources:
2025-04-17 20:31:44 +01:00
quote = str(ds.get("folder"))
2025-04-17 13:07:32 +01:00
file_type = str(ds.get("file")).lower()
2025-04-17 07:37:15 +01:00
if not quote:
print("ERROR: Folder (symbol) is not specified in data_sources.")
continue
2025-04-17 13:07:32 +01:00
2025-04-17 07:37:15 +01:00
print(f"\n--- Processing symbol: {quote} ---")
file_path = data_path / quote
file_path.mkdir(parents=True, exist_ok=True)
file_name = (file_path / "klines").with_suffix(".csv")
2025-04-17 13:07:32 +01:00
chunk_size = int(ds.get("chunk_size", DEFAULT_BAR_CHUNK_SIZE))
if file_type == "ticks":
file_name = (file_path / "ticks").with_suffix(".csv")
chunk_size = int(ds.get("chunk_size", DEFAULT_TICK_CHUNK_SIZE))
2025-04-17 07:37:15 +01:00
existing_df = pd.DataFrame()
start_dt = historical_start_date
2025-04-17 13:07:32 +01:00
# Check if file exists and load data
2025-04-17 07:37:15 +01:00
if file_name.is_file():
try:
print(f"Loading existing data from: {file_name}")
# Specify date format for potentially faster parsing if consistent
existing_df = pd.read_csv(file_name, parse_dates=[time_column], date_format='ISO8601')
# Ensure timezone is set correctly after parsing
if pd.api.types.is_datetime64_any_dtype(existing_df[time_column]) and existing_df[time_column].dt.tz is None:
existing_df[time_column] = existing_df[time_column].dt.tz_localize('UTC')
elif pd.api.types.is_datetime64_any_dtype(existing_df[time_column]):
existing_df[time_column] = existing_df[time_column].dt.tz_convert('UTC')
else: # Fallback if parsing failed or column is not datetime
print(f"Warning: Column '{time_column}' not parsed as datetime. Attempting conversion.")
existing_df[time_column] = pd.to_datetime(existing_df[time_column], errors='coerce', utc=True)
existing_df = existing_df.dropna(subset=[time_column]) # Drop rows where conversion failed
if not existing_df.empty:
# Sort just in case file wasn't sorted
existing_df = existing_df.sort_values(by=time_column)
# Start downloading from the timestamp of the last record
start_dt = existing_df[time_column].iloc[-1]
print(f"Existing file found. Will download data starting from {start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}")
else:
print("Existing file was empty or had invalid dates after loading. Starting from historical date.")
existing_df = pd.DataFrame() # Reset to empty
except Exception as e:
print(f"Error loading existing file {file_name}: {e}. Starting from historical date.")
existing_df = pd.DataFrame() # Reset to empty
else:
print(f"File not found. Starting download from {historical_start_date.strftime('%Y-%m-%d %H:%M:%S %Z')}.")
start_dt = historical_start_date # Ensure start_dt is set
# Define end point for download (now)
end_dt = datetime.now(timezone)
2025-04-17 13:07:32 +01:00
# Check if symbol is available
2025-04-17 07:37:15 +01:00
symbol_info = mt5.symbol_info(quote)
if not symbol_info:
print(f"Symbol {quote} not found or not available in MT5 terminal. Skipping. Error: {mt5.last_error()}")
continue
2025-04-17 13:07:32 +01:00
if file_type == "ticks" and not symbol_info.trade_tick_size:
print(f"Ticks data is not available for {quote}. Skipping. Error: {mt5.last_error()}")
os.remove(file_name)
continue
2025-04-17 07:37:15 +01:00
print(f"Symbol {quote} found in MT5.")
all_klines_list = []
current_start_dt = start_dt
print(f"Starting download loop for {quote} from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")
2025-04-17 13:07:32 +01:00
# --- Download Loop using copy_rates_range or copy_ticks_range with calculated duration ---
2025-04-17 07:37:15 +01:00
while current_start_dt < end_dt:
try:
2025-04-17 13:07:32 +01:00
# Calculate the duration for chunk_size bars or ticks
chunk_duration = get_timedelta_for_mt5_timeframe(mt5_timeframe, chunk_size)
2025-04-17 07:37:15 +01:00
except ValueError as e:
print(f"Error calculating duration: {e}. Stopping download for {quote}.")
break
# Calculate the temporary end date for this chunk request
temp_end_dt = current_start_dt + chunk_duration
# Ensure the temporary end date doesn't go beyond the overall end date
temp_end_dt = min(temp_end_dt, end_dt)
# Add a small buffer (e.g., 1 second) to start_dt for the request
# to definitively exclude the current_start_dt bar itself in the range request.
request_start_dt = current_start_dt + timedelta(seconds=1)
# Avoid making a request if the adjusted start is already >= temp_end
if request_start_dt >= temp_end_dt:
print(f" Skipping request: Calculated range is empty or invalid ({request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}).")
break # Likely means we are caught up
print(f" Fetching range from {request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")
2025-04-17 13:07:32 +01:00
# Use copy_rates_range or copy_ticks_range
if file_type == "ticks":
rates = mt5.copy_ticks_range(quote, request_start_dt, temp_end_dt, mt5.COPY_TICKS_ALL)
if rates is None:
print(f" mt5.copy_ticks_range returned None. Error: {mt5.last_error()}. Stopping download for {quote}.")
break
else:
rates = mt5.copy_rates_range(quote, mt5_timeframe, request_start_dt, temp_end_dt)
if rates is None:
print(f" mt5.copy_rates_range returned None. Error: {mt5.last_error()}. Stopping download for {quote}.")
break
2025-04-17 07:37:15 +01:00
if len(rates) == 0:
print(" No data returned in this range. Download may be complete or data gap.")
# If no data, advance start time past this chunk's end to avoid getting stuck
current_start_dt = temp_end_dt
if current_start_dt >= end_dt:
print(" Reached end date after empty range.")
break
else:
print(f" Advancing start time to {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} and continuing.")
time.sleep(RATE_LIMIT_DELAY) # Still pause slightly
continue # Try the next chunk
chunk_df = pd.DataFrame(rates)
2025-04-17 13:07:32 +01:00
if file_type == "ticks":
# Convert 'time_msc' (Unix milliseconds) to datetime objects (UTC)
chunk_df[time_column] = pd.to_datetime(chunk_df['time_msc'], unit='ms', utc=True)
else:
# Convert 'time' (Unix seconds) to datetime objects (UTC)
chunk_df[time_column] = pd.to_datetime(chunk_df['time'], unit='s', utc=True)
2025-04-17 07:37:15 +01:00
# --- IMPORTANT: Filtering is no longer needed here ---
# Since we requested data *starting after* current_start_dt using request_start_dt,
# the check `chunk_df = chunk_df[chunk_df[time_column] > current_start_dt]`
# is redundant and can be removed.
# if chunk_df.empty: # This check is effectively handled by len(rates) == 0 now
# print(" No new bars found in the fetched chunk (after filtering). Stopping.")
# break
all_klines_list.append(chunk_df)
last_bar_time_in_chunk = chunk_df[time_column].iloc[-1]
print(f" Fetched {len(chunk_df)} bars. Last timestamp in chunk: {last_bar_time_in_chunk.strftime('%Y-%m-%d %H:%M:%S %Z')}")
# Update the start time for the next chunk request to be the end time of THIS chunk's last bar
current_start_dt = last_bar_time_in_chunk
# Check if we've downloaded past the intended end time (redundant check, loop condition handles it)
# if current_start_dt >= end_dt:
# print(" Reached or passed target end time. Download complete.")
# break
# Small delay before next request
time.sleep(RATE_LIMIT_DELAY)
2025-04-17 13:07:32 +01:00
# --- Combine and Process Data ---
2025-04-17 07:37:15 +01:00
if not all_klines_list:
print(f"No new data downloaded for {quote}.")
if existing_df.empty:
print(f"No existing or new data for {quote}. Skipping save.")
continue
else:
print("Saving existing data only (no updates).")
final_df = existing_df # Use existing data if no new data was fetched
else:
print("Combining downloaded data...")
new_df = pd.concat(all_klines_list, ignore_index=True)
# Combine existing and new data
if not existing_df.empty:
final_df = pd.concat([existing_df, new_df], ignore_index=True)
else:
final_df = new_df
print("Processing combined data (duplicates, sorting, columns)...")
2025-04-17 13:07:32 +01:00
if file_type == "ticks":
# Standardize columns (assuming MT5 names)
final_df.rename(columns={
'time_msc': 'time',
'flags': 'flags',
'bid': 'bid',
'ask': 'ask',
'last': 'last',
'volume': 'volume',
}, inplace=True, errors='ignore') # Added errors='ignore'
else:
final_df.rename(columns={
'tick_volume': 'volume', # Use tick_volume as 'volume'
}, inplace=True, errors='ignore') # Added errors='ignore'
# Ensure time column is the primary datetime column and drop time column if exist
2025-04-17 07:37:15 +01:00
if 'time' in final_df.columns and time_column != 'time':
final_df = final_df.drop('time', axis=1)
# Select desired columns (ensure time_column is first)
required_columns = [time_column, 'open', 'high', 'low', 'close', 'volume']
# Keep only columns that actually exist in the dataframe
final_df = final_df[[col for col in required_columns if col in final_df.columns]]
# Remove duplicates based on timestamp, keeping the latest entry
initial_rows = len(final_df)
final_df = final_df.drop_duplicates(subset=[time_column], keep='last')
if initial_rows > len(final_df):
print(f" Removed {initial_rows - len(final_df)} duplicate rows based on '{time_column}'.")
# Sort by timestamp
final_df = final_df.sort_values(by=time_column)
# Remove the last row *if* it represents the current, incomplete bar.
if not final_df.empty:
# Check if the last bar's time is too close to the script end time
# A simple heuristic: if the last bar's time is after the loop's end_dt minus one interval, it might be incomplete.
# Or just always drop the last row after bulk download.
print("Removing potentially incomplete last bar.")
final_df = final_df.iloc[:-1]
# Apply max rows limit if specified (same as before)
if download_max_rows and len(final_df) > download_max_rows:
print(f"Applying download_max_rows limit: {download_max_rows}")
final_df = final_df.tail(download_max_rows)
# Final check if DataFrame is valid before saving (same as before)
if final_df.empty:
print(f"Final dataframe for {quote} is empty after processing. Skipping save.")
continue
# Reset index before saving (same as before)
final_df = final_df.reset_index(drop=True)
# --- Save Data (same as before) ---
2025-04-17 13:07:32 +01:00
2025-04-17 07:37:15 +01:00
try:
2025-04-17 13:07:32 +01:00
if file_type == "ticks":
final_df = final_df.drop(['time'], axis=1)
2025-04-17 07:37:15 +01:00
print(f"Saving {len(final_df)} rows to {file_name}...")
final_df.to_csv(file_name, index=False, date_format='%Y-%m-%dT%H:%M:%SZ')
print(f"Finished saving '{quote}'.")
processed_symbols.append(quote)
except Exception as e:
print(f"Error saving file {file_name}: {e}")
# --- Shutdown MT5 (same as before) ---
print("\nShutting down MetaTrader 5 connection...")
mt5.shutdown()
elapsed = datetime.now() - script_start_time
print(f"\nFinished downloading data for symbols: {', '.join(processed_symbols) if processed_symbols else 'None'}")
print(f"Total time: {str(elapsed).split('.')[0]}")
if __name__ == '__main__':
main()