# mirror of https://github.com/asavinov/intelligent-trading-bot.git
# synced 2026-05-04 08:26:19 +00:00
"""
|
|
This module handles the collection of market data from MetaTrader 5 (MT5).
|
|
It defines tasks for connecting to MT5, requesting historical kline data,
|
|
and storing it in the database. It also includes health checks for the MT5 connection.
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
import pytz
|
|
|
|
import asyncio
|
|
|
|
import MetaTrader5 as mt5
|
|
|
|
from inputs.utils_mt5 import *
|
|
|
|
import logging
|
|
log = logging.getLogger('mt5')
|
|
|
|
print("MetaTrader5 package author: ", mt5.__author__)
|
|
print("MetaTrader5 package version: ", mt5.__version__)
|
|
|
|
# Module-level MT5 client handle; set by init_client(), read by get_client().
client = None

#
# Parameters
#
CHUNK_SIZE = 10000  # (int): The number of bars to request in each chunk. How many bars worth of duration to request in each chunk
TICK_CHUNK_SIZE = 5  # How many ticks worth of duration to request in each chunk
RATE_LIMIT_DELAY = 0.1  # (float): The delay in seconds between requests. Small delay between requests (seconds)

# Name of the datetime column used across downloaded frames
time_column = 'timestamp'
# MT5 server times are expressed in UTC
timezone = pytz.timezone("Etc/UTC")

# BUG FIX: this assignment previously appeared *before* `timezone` was defined,
# which raised NameError at import time. It must follow the `timezone` binding.
default_start_dt = datetime(2014, 1, 1, tzinfo=timezone)  # Or get from config if needed
|
|
|
|
def init_client(parameters, client_args):
    """
    Initialize the module-level MT5 client.

    Args:
        parameters: Unused; kept for interface compatibility with callers.
        client_args (dict): Keyword arguments forwarded to connect_mt5()
            (account id, password, server).

    Raises:
        ConnectionError: If the MT5 connection/login fails.
    """
    global client
    # BUG FIX: the original called `collector_mt5.connect_mt5(...)` and assigned
    # `collector_mt5.client`, but `collector_mt5` is not defined anywhere in this
    # module. The intended call is this module's own connect_mt5(), and the
    # intended assignment is the module-level `client` declared global above.
    authorized = connect_mt5(**client_args)
    if not authorized:
        log.error("Failed to connect to MT5. Check credentials and server details.")
        raise ConnectionError("Failed to connect to MT5. Check credentials and server details.")
    client = mt5
|
|
|
|
def get_client():
    """Return the module-level MT5 client set by init_client() (None if not initialized)."""
    return client
|
|
|
|
def close_client():
    """
    Shut down the MT5 terminal connection, if one was initialized.

    BUG FIX: the original unconditionally called `client.shutdown()`, which
    raised AttributeError when init_client() had never been called (client is
    None). The guard also makes repeated calls safe (idempotent).
    """
    global client
    if client is not None:
        client.shutdown()
        client = None
|
|
|
|
async def fetch_klines(config: dict, start_from_dt) -> dict[str, pd.DataFrame] | None:
    """
    Synchronizes the local data state with the latest data from MT5.

    This task retrieves the most recent kline data for the configured symbols
    concurrently (one task per symbol) and collects the results.

    Args:
        config (dict): Configuration with "data_sources" (or "symbol"), "freq",
            and optional MT5 credentials ("mt5_account_id", "mt5_password",
            "mt5_server").
        start_from_dt: Datetime to resume downloading from, or a falsy value to
            use the default historical start.

    Returns:
        dict | None: For each symbol (key of the dict), a data frame with the
            new data; None on timeout, error, or an empty per-symbol result.
    """
    data_sources = config.get("data_sources", [])
    symbols = [x.get("folder") for x in data_sources]
    pandas_freq = config["freq"]
    mt5_timeframe = mt5_freq_from_pandas(pandas_freq)

    if not symbols:
        symbols = [config["symbol"]]

    # Connect to trading account (credentials are optional)
    mt5_account_id = config.get("mt5_account_id")
    mt5_password = config.get("mt5_password")
    mt5_server = config.get("mt5_server")
    if mt5_account_id and mt5_password and mt5_server:
        authorized = connect_mt5(int(mt5_account_id), password=str(mt5_password), server=str(mt5_server))
        if not authorized:
            log.error(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
            return None

    # Create a list of tasks for retrieving data, one per symbol
    tasks = [asyncio.create_task(request_symbol_klines(s, mt5_timeframe, start_from_dt)) for s in symbols]

    results = {}
    timeout = 10  # Seconds to wait for the result

    try:
        # Process responses in the order of arrival
        for fut in asyncio.as_completed(tasks, timeout=timeout):
            try:
                res = await fut
            # BUG FIX: asyncio.as_completed raises asyncio.TimeoutError, which is
            # NOT the builtin TimeoutError before Python 3.11 (they are aliases
            # from 3.11 on) - the original `except TimeoutError` missed it there.
            except asyncio.TimeoutError:
                log.warning(f"Timeout {timeout} seconds when requesting kline data.")
                return None
            except Exception as e:
                # Include the exception so failures are diagnosable from the log
                log.warning(f"Exception when requesting kline data: {e}")
                return None

            # Add to the results (will overwrite existing klines if any).
            # A dict with keys is simply truthy - `res.keys()` was redundant.
            if res:
                results.update(res)
            else:
                log.error("Received empty or wrong result from klines request.")
                return None
    finally:
        # Do not leak still-pending tasks when bailing out early
        for t in tasks:
            if not t.done():
                t.cancel()

    # --- Shutdown MT5 (same as before) ---
    log.info("\nShutting down MetaTrader 5 connection...")
    mt5.shutdown()

    return results
|
|
|
|
async def request_symbol_klines(symbol: str, mt5_timeframe: int, start_from_dt) -> dict:
    """
    Requests kline data from MT5 for a given symbol.

    It fetches data in duration-based chunks to avoid overloading the server
    and handles potential errors during the data retrieval process.

    Args:
        symbol (str): The trading symbol (e.g., "EURUSD").
        mt5_timeframe (int): The MT5 timeframe constant (e.g., mt5.TIMEFRAME_M1).
        start_from_dt: Datetime to resume downloading from; falls back to the
            module-level `default_start_dt` when falsy.

    Returns:
        dict: A dictionary with the symbol as the key and a data frame with
            klines; an empty dict on error or when no data was received.
    """
    # Define end point for download (now)
    end_dt = datetime.now(timezone)

    if start_from_dt:
        current_start_dt = start_from_dt
        log.info(f"Existing data found. Will download data starting from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    else:
        # Default historical start if no prior data exists
        current_start_dt = default_start_dt
        log.info(f"No existing data found. Starting download from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}.")

    all_klines_list = []

    try:
        while current_start_dt < end_dt:
            try:
                # Calculate the duration for CHUNK_SIZE bars
                chunk_duration = get_timedelta_for_mt5_timeframe(mt5_timeframe, CHUNK_SIZE)
            except ValueError as e:
                log.error(f"Error calculating duration: {e}. Stopping download for {symbol}.")
                break

            # Temporary end date for this chunk request, clipped to the overall end
            temp_end_dt = min(current_start_dt + chunk_duration, end_dt)

            # Add a small buffer (1 second) to the start so the range request
            # definitively excludes the current_start_dt bar itself.
            request_start_dt = current_start_dt + timedelta(seconds=1)

            # Avoid making a request if the adjusted start is already >= temp_end
            if request_start_dt >= temp_end_dt:
                log.info(f" Skipping request: Calculated range is empty or invalid ({request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}).")
                break  # Likely means we are caught up

            log.info(f" Fetching range from {request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")

            # Use copy_rates_range, since copy_rates_from seems to fetch data using backward lookback (present to past)
            rates = mt5.copy_rates_range(symbol, mt5_timeframe, request_start_dt, temp_end_dt)

            if rates is None:
                log.error(f" mt5.copy_rates_range returned None. Error: {mt5.last_error()}. Stopping download for {symbol}.")
                break
            if len(rates) == 0:
                log.info(" No data returned in this range. Download may be complete or data gap.")
                # If no data, advance start time past this chunk's end to avoid getting stuck
                current_start_dt = temp_end_dt
                if current_start_dt >= end_dt:
                    log.info(" Reached end date after empty range.")
                    break
                log.info(f" Advancing start time to {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} and continuing.")
                # BUG FIX: blocking time.sleep() inside an async coroutine stalls
                # the whole event loop and serializes the per-symbol tasks;
                # await asyncio.sleep() yields control instead.
                await asyncio.sleep(RATE_LIMIT_DELAY)
                continue  # Try the next chunk

            chunk_df = pd.DataFrame(rates)
            # Convert 'time' (Unix seconds) to datetime objects (UTC)
            chunk_df[time_column] = pd.to_datetime(chunk_df['time'], unit='s', utc=True)
            all_klines_list.append(chunk_df)

            last_bar_time_in_chunk = chunk_df[time_column].iloc[-1]
            log.info(f" Fetched {len(chunk_df)} bars. Last timestamp in chunk: {last_bar_time_in_chunk.strftime('%Y-%m-%d %H:%M:%S %Z')}")
            # Update the start time for the next chunk request to be the end time of THIS chunk's last bar
            current_start_dt = last_bar_time_in_chunk
            # Small delay before next request (non-blocking, see note above)
            await asyncio.sleep(RATE_LIMIT_DELAY)

    except Exception as e:
        log.error(f"Exception while requesting klines: {e}")
        return {}

    # BUG FIX: pd.concat() raises ValueError on an empty list; if nothing was
    # downloaded, return an empty result instead of crashing.
    if not all_klines_list:
        log.warning(f"No kline data received for {symbol}.")
        return {}

    df = pd.concat(all_klines_list, axis=0, ignore_index=False)
    df = df.drop_duplicates(subset=time_column)
    df = df.set_index(time_column, inplace=False, drop=False)
    df.name = symbol

    # Return all received klines with the symbol as a key
    return {symbol: df}
|
|
|
|
async def health_check() -> int:
    """
    Performs a health check on the MT5 connection.

    Verifies that the MT5 terminal is reachable via terminal_info().

    Returns:
        int: 0 if the MT5 connection is healthy, 1 otherwise.
    """
    # terminal_info() returns a falsy value when the terminal is unreachable
    terminal = mt5.terminal_info()
    if terminal:
        return 0
    log.error("MT5 terminal not found.")
    return 1
|
|
|
|
def connect_mt5(mt5_account_id: Optional[int] = None, mt5_password: Optional[str] = None, mt5_server: Optional[str] = None, **kwargs):
    """
    Initializes the MetaTrader 5 connection and attempts to log in with the provided credentials.

    Login is attempted only when account id, password AND server are all given;
    otherwise the terminal is initialized without logging in.

    Returns:
        bool: True on success, False if initialization or login failed.
    """
    # Terminal must be initialized before any other MT5 call
    if not mt5.initialize():
        log.error(f"initialize() failed, error code = {mt5.last_error()}")
        return False
    log.info(f"MT5 Initialized. Version: {mt5.version()}")

    # No (complete) credentials: initialized-only connection is still a success
    have_credentials = mt5_account_id and mt5_password and mt5_server
    if not have_credentials:
        return True

    authorized = mt5.login(int(mt5_account_id), password=str(mt5_password), server=str(mt5_server), **kwargs)
    if authorized:
        return True

    # Failed login: tear the terminal connection back down before reporting failure
    log.error(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
    mt5.shutdown()
    return False
|
|
|
|
def download_klines(config, data_sources):
    """
    Retrieving historic klines from MetaTrader5 server incrementally using copy_rates_range
    with calculated chunk durations. Downloads data from the last record in the existing file
    (or a historical start date) up to the current time, fetching in duration-based chunks.

    Args:
        config (dict): Must contain "time_column", "data_folder", "freq";
            optionally "download_max_rows" and MT5 credentials
            ("mt5_account_id", "mt5_password", "mt5_server").
        data_sources (list[dict]): One entry per symbol with keys "folder"
            (symbol name), optional "file" ("ticks" selects tick download),
            and optional "chunk_size".

    Side effects: writes/updates one CSV per symbol under
    <data_folder>/<symbol>/klines.csv (or ticks.csv), prints progress to
    stdout, and shuts the MT5 terminal connection down at the end.
    """
    time_column = config["time_column"]
    data_path = Path(config["data_folder"])
    download_max_rows = config.get("download_max_rows", 0)

    mt5_account_id = config.get("mt5_account_id")
    mt5_password = config.get("mt5_password")
    mt5_server = config.get("mt5_server")

    pandas_freq = config["freq"]
    print(f"Pandas frequency: {pandas_freq}")

    mt5_timeframe = mt5_freq_from_pandas(pandas_freq)
    # Use timeframe_description for clearer output if available
    try:
        tf_description = mt5.timeframe_description(mt5_timeframe)
        print(f"MetaTrader5 frequency: {tf_description} ({mt5_timeframe})")
    except AttributeError: # Handle older MT5 versions potentially lacking this func
        print(f"MetaTrader5 frequency: {mt5_timeframe}")

    # Define the timezone for MT5 (usually UTC).
    # NOTE(review): this local shadows the module-level `timezone` constant.
    timezone = pytz.timezone("Etc/UTC")
    # Define a default historical start if no file exists => 2014 | 2017 | 2024
    historical_start_date = datetime(2017, 1, 1, tzinfo=timezone) # Or get from config if needed

    # Connect to trading account (login is optional; without it, available
    # symbols/data may be limited)
    if mt5_account_id and mt5_password and mt5_server:
        authorized = connect_mt5(mt5_account_id, password=str(mt5_password), server=str(mt5_server))
        if authorized:
            print("MT5 Login successful.")
            account_info = mt5.account_info()
            if account_info:
                print(f"Account Info: Login={account_info.login}, Server={account_info.server}, Balance={account_info.balance}")
            else:
                print(f"Could not retrieve account info. Error: {mt5.last_error()}")
        else:
            # Login failed: abort the whole download
            print(f"MT5 Login failed for account #{mt5_account_id}, error code: {mt5.last_error()}")
            mt5.shutdown()
            return
    else:
        print("MT5 credentials not fully provided in config. Proceeding without login (might affect available symbols/data).")

    print(f"Terminal Info: {mt5.terminal_info()}")

    # --- Loop through data sources ---

    processed_symbols = []  # Symbols whose CSV was successfully written

    for ds in data_sources:
        quote = str(ds.get("folder"))
        file_type = str(ds.get("file")).lower()

        # NOTE(review): str(None) == "None" is truthy, so a missing "folder"
        # key does not actually trigger this branch - confirm intent.
        if not quote:
            print("ERROR: Folder (symbol) is not specified in data_sources.")
            continue

        print(f"\n--- Processing symbol: {quote} ---")

        # One folder per symbol; klines.csv by default, ticks.csv for tick data
        file_path = data_path / quote
        file_path.mkdir(parents=True, exist_ok=True)
        file_name = (file_path / "klines").with_suffix(".csv")
        chunk_size = int(ds.get("chunk_size", CHUNK_SIZE))

        if file_type == "ticks":
            file_name = (file_path / "ticks").with_suffix(".csv")
            chunk_size = int(ds.get("chunk_size", TICK_CHUNK_SIZE))

        existing_df = pd.DataFrame()
        start_dt = historical_start_date

        # Check if file exists and load data (resume download from its last record)
        if file_name.is_file():
            try:
                print(f"Loading existing data from: {file_name}")
                # Specify date format for potentially faster parsing if consistent
                existing_df = pd.read_csv(file_name, parse_dates=[time_column], date_format='ISO8601')
                # Ensure timezone is set correctly after parsing:
                # naive datetimes -> localize to UTC; aware -> convert to UTC
                if pd.api.types.is_datetime64_any_dtype(existing_df[time_column]) and existing_df[time_column].dt.tz is None:
                    existing_df[time_column] = existing_df[time_column].dt.tz_localize('UTC')
                elif pd.api.types.is_datetime64_any_dtype(existing_df[time_column]):
                    existing_df[time_column] = existing_df[time_column].dt.tz_convert('UTC')
                else: # Fallback if parsing failed or column is not datetime
                    print(f"Warning: Column '{time_column}' not parsed as datetime. Attempting conversion.")
                    existing_df[time_column] = pd.to_datetime(existing_df[time_column], errors='coerce', utc=True)

                existing_df = existing_df.dropna(subset=[time_column]) # Drop rows where conversion failed

                if not existing_df.empty:
                    # Sort just in case file wasn't sorted
                    existing_df = existing_df.sort_values(by=time_column)
                    # Start downloading from the timestamp of the last record
                    start_dt = existing_df[time_column].iloc[-1]
                    print(f"Existing file found. Will download data starting from {start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}")
                else:
                    print("Existing file was empty or had invalid dates after loading. Starting from historical date.")
                    existing_df = pd.DataFrame() # Reset to empty
            except Exception as e:
                print(f"Error loading existing file {file_name}: {e}. Starting from historical date.")
                existing_df = pd.DataFrame() # Reset to empty
        else:
            print(f"File not found. Starting download from {historical_start_date.strftime('%Y-%m-%d %H:%M:%S %Z')}.")
            start_dt = historical_start_date # Ensure start_dt is set

        # Define end point for download (now)
        end_dt = datetime.now(timezone)

        # Check if symbol is available
        symbol_info = mt5.symbol_info(quote)
        if not symbol_info:
            print(f"Symbol {quote} not found or not available in MT5 terminal. Skipping. Error: {mt5.last_error()}")
            continue
        if file_type == "ticks" and not symbol_info.trade_tick_size:
            print(f"Ticks data is not available for {quote}. Skipping. Error: {mt5.last_error()}")
            # NOTE(review): raises FileNotFoundError if the file doesn't exist -
            # consider guarding with file_name.is_file(); confirm intent.
            os.remove(file_name)
            continue
        print(f"Symbol {quote} found in MT5.")

        all_klines_list = []
        current_start_dt = start_dt

        print(f"Starting download loop for {quote} from {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")

        # --- Download Loop using copy_rates_range or copy_ticks_range with calculated duration ---
        while current_start_dt < end_dt:
            try:
                # Calculate the duration for chunk_size bars or ticks
                chunk_duration = get_timedelta_for_mt5_timeframe(mt5_timeframe, chunk_size)
            except ValueError as e:
                print(f"Error calculating duration: {e}. Stopping download for {quote}.")
                break

            # Calculate the temporary end date for this chunk request
            temp_end_dt = current_start_dt + chunk_duration

            # Ensure the temporary end date doesn't go beyond the overall end date
            temp_end_dt = min(temp_end_dt, end_dt)

            # Add a small buffer (e.g., 1 second) to start_dt for the request
            # to definitively exclude the current_start_dt bar itself in the range request.
            request_start_dt = current_start_dt + timedelta(seconds=1)

            # Avoid making a request if the adjusted start is already >= temp_end
            if request_start_dt >= temp_end_dt:
                print(f" Skipping request: Calculated range is empty or invalid ({request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}).")
                break # Likely means we are caught up

            print(f" Fetching range from {request_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} to {temp_end_dt.strftime('%Y-%m-%d %H:%M:%S %Z')}...")

            # Use copy_rates_range or copy_ticks_range depending on the data type
            if file_type == "ticks":
                rates = mt5.copy_ticks_range(quote, request_start_dt, temp_end_dt, mt5.COPY_TICKS_ALL)
                if rates is None:
                    print(f" mt5.copy_ticks_range returned None. Error: {mt5.last_error()}. Stopping download for {quote}.")
                    break
            else:
                rates = mt5.copy_rates_range(quote, mt5_timeframe, request_start_dt, temp_end_dt)
                if rates is None:
                    print(f" mt5.copy_rates_range returned None. Error: {mt5.last_error()}. Stopping download for {quote}.")
                    break

            if len(rates) == 0:
                print(" No data returned in this range. Download may be complete or data gap.")
                # If no data, advance start time past this chunk's end to avoid getting stuck
                current_start_dt = temp_end_dt
                if current_start_dt >= end_dt:
                    print(" Reached end date after empty range.")
                    break
                else:
                    print(f" Advancing start time to {current_start_dt.strftime('%Y-%m-%d %H:%M:%S %Z')} and continuing.")
                    time.sleep(RATE_LIMIT_DELAY) # Still pause slightly
                    continue # Try the next chunk

            chunk_df = pd.DataFrame(rates)
            if file_type == "ticks":
                # Convert 'time_msc' (Unix milliseconds) to datetime objects (UTC)
                chunk_df[time_column] = pd.to_datetime(chunk_df['time_msc'], unit='ms', utc=True)
            else:
                # Convert 'time' (Unix seconds) to datetime objects (UTC)
                chunk_df[time_column] = pd.to_datetime(chunk_df['time'], unit='s', utc=True)

            # --- IMPORTANT: Filtering is no longer needed here ---
            # Since we requested data *starting after* current_start_dt using request_start_dt,
            # the check `chunk_df = chunk_df[chunk_df[time_column] > current_start_dt]`
            # is redundant and can be removed.

            # if chunk_df.empty: # This check is effectively handled by len(rates) == 0 now
            #     print(" No new bars found in the fetched chunk (after filtering). Stopping.")
            #     break

            all_klines_list.append(chunk_df)
            last_bar_time_in_chunk = chunk_df[time_column].iloc[-1]
            print(f" Fetched {len(chunk_df)} bars. Last timestamp in chunk: {last_bar_time_in_chunk.strftime('%Y-%m-%d %H:%M:%S %Z')}")

            # Update the start time for the next chunk request to be the end time of THIS chunk's last bar
            current_start_dt = last_bar_time_in_chunk

            # Check if we've downloaded past the intended end time (redundant check, loop condition handles it)
            # if current_start_dt >= end_dt:
            #     print(" Reached or passed target end time. Download complete.")
            #     break

            # Small delay before next request
            time.sleep(RATE_LIMIT_DELAY)

        # --- Combine and Process Data ---
        if not all_klines_list:
            print(f"No new data downloaded for {quote}.")
            if existing_df.empty:
                print(f"No existing or new data for {quote}. Skipping save.")
                continue
            else:
                print("Saving existing data only (no updates).")
                final_df = existing_df # Use existing data if no new data was fetched
        else:
            print("Combining downloaded data...")
            new_df = pd.concat(all_klines_list, ignore_index=True)

            # Combine existing and new data
            if not existing_df.empty:
                final_df = pd.concat([existing_df, new_df], ignore_index=True)
            else:
                final_df = new_df

        print("Processing combined data (duplicates, sorting, columns)...")
        if file_type == "ticks":
            # Standardize columns (assuming MT5 names); only 'time_msc' -> 'time'
            # actually renames anything, the rest map to themselves
            final_df.rename(columns={
                'time_msc': 'time',
                'flags': 'flags',
                'bid': 'bid',
                'ask': 'ask',
                'last': 'last',
                'volume': 'volume',
            }, inplace=True, errors='ignore') # Added errors='ignore'
        else:
            final_df.rename(columns={
                'tick_volume': 'volume', # Use tick_volume as 'volume'
            }, inplace=True, errors='ignore') # Added errors='ignore'

        # Ensure time column is the primary datetime column and drop time column if exist
        if 'time' in final_df.columns and time_column != 'time':
            final_df = final_df.drop('time', axis=1)

        # Select desired columns (ensure time_column is first)
        required_columns = [time_column, 'open', 'high', 'low', 'close', 'volume']
        # Keep only columns that actually exist in the dataframe
        final_df = final_df[[col for col in required_columns if col in final_df.columns]]

        # Remove duplicates based on timestamp, keeping the latest entry
        initial_rows = len(final_df)
        final_df = final_df.drop_duplicates(subset=[time_column], keep='last')
        if initial_rows > len(final_df):
            print(f" Removed {initial_rows - len(final_df)} duplicate rows based on '{time_column}'.")

        # Sort by timestamp
        final_df = final_df.sort_values(by=time_column)

        # Remove the last row *if* it represents the current, incomplete bar.
        if not final_df.empty:
            # Check if the last bar's time is too close to the script end time
            # A simple heuristic: if the last bar's time is after the loop's end_dt minus one interval, it might be incomplete.
            # Or just always drop the last row after bulk download.
            print("Removing potentially incomplete last bar.")
            final_df = final_df.iloc[:-1]

        # Apply max rows limit if specified (same as before)
        if download_max_rows and len(final_df) > download_max_rows:
            print(f"Applying download_max_rows limit: {download_max_rows}")
            final_df = final_df.tail(download_max_rows)

        # Final check if DataFrame is valid before saving (same as before)
        if final_df.empty:
            print(f"Final dataframe for {quote} is empty after processing. Skipping save.")
            continue

        # Reset index before saving (same as before)
        final_df = final_df.reset_index(drop=True)

        # --- Save Data (same as before) ---
        try:
            if file_type == "ticks":
                # NOTE(review): 'time' was already dropped above (and filtered out
                # by required_columns), so this drop likely raises KeyError for
                # ticks, landing in the except below - confirm and guard.
                final_df = final_df.drop(['time'], axis=1)

            print(f"Saving {len(final_df)} rows to {file_name}...")
            final_df.to_csv(file_name, index=False, date_format='%Y-%m-%dT%H:%M:%SZ')
            print(f"Finished saving '{quote}'.")
            processed_symbols.append(quote)
        except Exception as e:
            print(f"Error saving file {file_name}: {e}")

    # --- Shutdown MT5 (same as before) ---
    print("\nShutting down MetaTrader 5 connection...")
    mt5.shutdown()
    print(f"\nFinished downloading data for symbols: {', '.join(processed_symbols) if processed_symbols else 'None'}")
|