MQL5-Google-Onedrive/scripts/startup_orchestrator.py
copilot-swe-agent[bot] f99eccf302 Address code review: optimize ArraySize() calls and document cache behavior
Co-authored-by: Mouy-leng <199350297+Mouy-leng@users.noreply.github.com>
2026-02-11 08:36:56 +00:00

449 lines
17 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Startup Orchestrator for MQL5 Trading Automation
Handles automated startup of all trading components with proper sequencing,
logging, and error handling.
"""
from __future__ import annotations
import argparse
import functools
import json
import logging
import os
import platform
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional
# Configuration paths
REPO_ROOT = Path(__file__).resolve().parents[1]
CONFIG_DIR = REPO_ROOT / "config"
LOGS_DIR = REPO_ROOT / "logs"
MT5_DIR = REPO_ROOT / "mt5" / "MQL5"
# OPTIMIZATION: Cache config file reads to avoid redundant I/O
# Note: Cache persists across multiple orchestrator instantiations within the same process.
# This is intentional for performance, as config changes during runtime are not expected.
# To force reload after config change, restart the process or call _load_cached_config.cache_clear()
@functools.lru_cache(maxsize=1)
def _load_cached_config(config_file_path: str) -> Optional[dict]:
    """Load and cache configuration from a JSON file.

    Args:
        config_file_path: Path of the JSON file, as a string so that it can
            serve as the cache key.

    Returns:
        The parsed JSON document, or ``None`` when the file does not exist.
        The result (including ``None``) is memoised by ``lru_cache``, so
        repeated calls with the same path return the *same* object; callers
        should treat it as read-only.  Call
        ``_load_cached_config.cache_clear()`` to force a re-read.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        OSError: If the file exists but cannot be read.
    """
    try:
        # "utf-8-sig" decodes plain UTF-8 and also strips the BOM that some
        # Windows editors prepend; relying on the locale default encoding
        # would mis-decode non-ASCII values on Windows.
        with open(config_file_path, 'r', encoding='utf-8-sig') as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # Missing file (or missing parent directory) means "no config".
        return None
@dataclass
class ComponentConfig:
    """Description of one process the orchestrator can launch.

    Instances are built either from the entries of the ``components`` list
    in the JSON config (keys map one-to-one onto the fields below) or from
    the built-in defaults.
    """

    # Human-readable label used in log messages.
    name: str
    # Program to run; becomes the first element of the command line.
    executable: str
    # Extra command-line arguments appended after the executable.
    args: list[str]
    # Directory the process is started in; None lets the launcher decide.
    working_dir: Optional[str] = None
    # Seconds to wait after launch before checking on the process.
    wait_seconds: int = 0
    # Whether a failure of this component should be treated as fatal.
    required: bool = True
    # "windows", "linux", or None for all platforms.
    platform_specific: Optional[str] = None
class StartupOrchestrator:
    """Orchestrates the startup of all trading components.

    Components are described by ``ComponentConfig`` entries that come either
    from a JSON config file or from built-in defaults.  They are launched as
    child processes in list order; handles of children that are still
    running after their wait window are kept in ``self.processes``.
    """

    # Seconds to pause before re-trying a component that failed to start.
    _RETRY_DELAY_SECONDS = 2
    # Seconds between status checks in monitor_processes().
    _MONITOR_POLL_SECONDS = 10
    # Trailing bytes of a component's output log echoed when it fails.
    _LOG_TAIL_BYTES = 4096

    def __init__(self, config_file: Optional[Path] = None, dry_run: bool = False):
        """Initialize the orchestrator.

        Args:
            config_file: JSON configuration to load; defaults to
                ``CONFIG_DIR / "startup_config.json"``.
            dry_run: When true, components are only logged, never launched.
        """
        self.config_file = config_file or CONFIG_DIR / "startup_config.json"
        self.dry_run = dry_run
        self.processes: list[subprocess.Popen] = []
        self.setup_logging()
        self.load_config()

    def setup_logging(self) -> None:
        """Setup logging to a timestamped file in LOGS_DIR and to stdout."""
        LOGS_DIR.mkdir(parents=True, exist_ok=True)
        log_file = LOGS_DIR / f"startup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler(sys.stdout),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info(f"Startup orchestrator initialized. Log file: {log_file}")

    def load_config(self) -> None:
        """Load configuration from JSON file, falling back to the defaults.

        Sets ``self.config_data`` (parsed JSON or ``None``) and
        ``self.components``.
        """
        # OPTIMIZATION: Use cached config loader
        self.config_data = _load_cached_config(str(self.config_file))
        if self.config_data is None:
            self.logger.warning(f"Config file not found: {self.config_file}")
            self.logger.info("Using default configuration")
            self.components = self.get_default_components()
            return

        self.components = [
            ComponentConfig(**entry) for entry in self.config_data.get('components', [])
        ]
        self.logger.info(f"Loaded configuration from {self.config_file}")
        max_attempts = self._max_startup_attempts()
        if max_attempts > 1:
            self.logger.info(
                f"Retry enabled: up to {max_attempts} start attempts per component"
            )

    def _max_startup_attempts(self) -> int:
        """Return how many times each component may be started (always >= 1).

        Reads ``settings.max_startup_retries`` from the loaded config.  The
        value is interpreted as the total number of attempts, so the default
        of 1 means "no retry".  Missing, non-numeric or non-positive values
        fall back to a single attempt.
        """
        config = getattr(self, 'config_data', None) or {}
        settings = config.get('settings') or {}
        raw_value = settings.get('max_startup_retries', 1)
        try:
            attempts = int(raw_value)
        except (TypeError, ValueError):
            self.logger.warning(
                f"Invalid max_startup_retries value {raw_value!r}; using 1 attempt"
            )
            return 1
        return max(1, attempts)

    def get_default_components(self) -> list[ComponentConfig]:
        """Return default component configuration."""
        system = platform.system().lower()
        return [
            ComponentConfig(
                name="MT5 Terminal",
                executable="terminal64.exe" if "windows" in system else "wine terminal64.exe",
                args=[],
                wait_seconds=15,
                required=True,
                platform_specific="windows",
            ),
            ComponentConfig(
                name="Repository Validator",
                executable=sys.executable,
                args=[str(REPO_ROOT / "scripts" / "ci_validate_repo.py")],
                working_dir=str(REPO_ROOT),
                wait_seconds=2,
                required=False,
                platform_specific=None,
            ),
        ]

    def check_system_requirements(self) -> bool:
        """Check if system meets requirements.

        Returns:
            ``False`` when Python is older than 3.8 or ``MT5_DIR`` does not
            exist, ``True`` otherwise.
        """
        self.logger.info("Checking system requirements...")
        system = platform.system()
        self.logger.info(f"Platform: {system} ({platform.machine()})")
        self.logger.info(f"Python version: {sys.version}")
        # Check Python version
        if sys.version_info < (3, 8):
            self.logger.error("Python 3.8 or higher is required")
            return False
        # Check if MT5 directory exists
        if not MT5_DIR.exists():
            self.logger.error(f"MT5 directory not found: {MT5_DIR}")
            return False
        self.logger.info("System requirements check passed")
        return True

    def is_component_compatible(self, component: ComponentConfig) -> bool:
        """Check if component is compatible with current platform.

        A component without ``platform_specific`` runs everywhere.  On
        Windows only "windows" components match; on Linux "linux" and "wsl"
        match; on any other platform platform-specific components are
        rejected.
        """
        if component.platform_specific is None:
            return True
        current_platform = platform.system().lower()
        if "windows" in current_platform:
            return component.platform_specific == "windows"
        if "linux" in current_platform:
            return component.platform_specific in ["linux", "wsl"]
        return False

    def find_mt5_executable(self, preferred_path: str) -> Optional[str]:
        """Find MT5 executable, trying preferred path first, then fallback paths.

        Returns:
            The first existing path, or ``None`` when none of them exists.
        """
        # Check preferred path first
        if Path(preferred_path).exists():
            self.logger.info(f"Found MT5 at preferred path: {preferred_path}")
            return preferred_path
        self.logger.info(f"Preferred MT5 path not found: {preferred_path}")
        self.logger.info("Trying fallback paths...")
        # Fallback paths (check all possible MT5 installations)
        fallback_paths = [
            "C:\\Program Files\\MetaTrader 5 EXNESS\\terminal64.exe",
            "C:\\Program Files\\Exness Terminal\\terminal64.exe",
            "C:\\Program Files\\MetaTrader 5\\terminal64.exe",
            os.path.expandvars("%APPDATA%\\MetaQuotes\\Terminal\\terminal64.exe"),
            "C:\\Program Files (x86)\\Exness Terminal\\terminal64.exe",
            "C:\\Program Files (x86)\\MetaTrader 5\\terminal64.exe",
        ]
        for candidate in fallback_paths:
            # The preferred path was already checked above.
            if candidate == preferred_path:
                continue
            if Path(candidate).exists():
                self.logger.info(f"Using fallback MT5 path: {candidate}")
                return candidate
        self.logger.error("No MT5 Terminal found in any of the checked paths")
        return None

    @staticmethod
    def _log_slug(component_name: str) -> str:
        """Turn a component name into a string that is safe in a file name."""
        slug = "".join(ch if ch.isalnum() else "_" for ch in component_name).strip("_")
        return slug or "component"

    def _component_output_log(self, component_name: str) -> Path:
        """Return the path of a fresh log file for one launch of a component."""
        LOGS_DIR.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return LOGS_DIR / f"component_{self._log_slug(component_name)}_{stamp}.log"

    @staticmethod
    def _read_log_tail(log_path, max_bytes: int = 4096) -> str:
        """Return the last ``max_bytes`` of a log file as text ("" if unreadable)."""
        try:
            with open(log_path, "rb") as handle:
                handle.seek(0, os.SEEK_END)
                size = handle.tell()
                handle.seek(max(0, size - max_bytes))
                # Child output may be in any code page; never fail on decoding.
                return handle.read().decode(errors="replace").strip()
        except OSError:
            return ""

    def start_component(self, component: ComponentConfig) -> bool:
        """Start a single component.

        The child's stdout and stderr are written to a log file under
        ``LOGS_DIR`` rather than to pipes: nothing drains a pipe while the
        child keeps running, so a talkative child would eventually block on
        a full pipe buffer.

        Returns:
            ``True`` when the component was skipped (incompatible platform,
            dry run, missing optional MT5 terminal), is still running after
            its wait window, or exited with code 0.  On failure the result
            is ``not component.required``, i.e. ``False`` only for required
            components.
        """
        if not self.is_component_compatible(component):
            self.logger.info(f"Skipping {component.name} (platform incompatible)")
            return True
        self.logger.info(f"Starting {component.name}...")
        if self.dry_run:
            self.logger.info(f"[DRY RUN] Would execute: {component.executable} {' '.join(component.args)}")
            return True
        try:
            # For MT5 Terminal, check if executable exists, try fallback paths if not
            executable = component.executable
            if "terminal64.exe" in executable or "MT5" in component.name or "Exness" in component.name:
                found_path = self.find_mt5_executable(executable)
                if not found_path:
                    self.logger.error(f"MT5 Terminal not found. Tried: {executable}")
                    if component.required:
                        return False
                    self.logger.warning(f"Skipping optional component: {component.name}")
                    return True
                executable = found_path

            cmd = [executable] + list(component.args)
            working_dir = component.working_dir or str(REPO_ROOT)
            output_log = self._component_output_log(component.name)
            # CREATE_NEW_CONSOLE only exists on Windows, hence the guard.
            creationflags = subprocess.CREATE_NEW_CONSOLE if platform.system() == "Windows" else 0
            # The child inherits its own handle to the log file, so ours can
            # be closed as soon as the process has been spawned.
            with open(output_log, "ab") as output_sink:
                process = subprocess.Popen(
                    cmd,
                    cwd=working_dir,
                    stdout=output_sink,
                    stderr=subprocess.STDOUT,
                    creationflags=creationflags,
                )
            self.processes.append(process)
            self.logger.info(f"Started {component.name} (PID: {process.pid})")
            self.logger.info(f"Output of {component.name} is captured in {output_log}")

            if component.wait_seconds > 0:
                self.logger.info(f"Waiting {component.wait_seconds} seconds for {component.name} to initialize...")
                time.sleep(component.wait_seconds)

            # Check if process is still running
            exit_code = process.poll()
            if exit_code is None:
                return True

            # The child already finished, so it is not a live process to
            # monitor or stop later.
            self.processes.remove(process)
            if exit_code != 0:
                self.logger.error(f"{component.name} failed with exit code {exit_code}")
                output_tail = self._read_log_tail(output_log, self._LOG_TAIL_BYTES)
                if output_tail:
                    self.logger.error(f"Error: {output_tail}")
                return not component.required
            self.logger.info(f"{component.name} completed successfully")
            return True
        except FileNotFoundError:
            self.logger.error(f"Executable not found: {component.executable}")
            return not component.required
        except Exception as e:
            self.logger.error(f"Failed to start {component.name}: {e}")
            return not component.required

    def start_all(self) -> bool:
        """Start all configured components with retry logic.

        Each component gets at most ``settings.max_startup_retries`` start
        attempts in total (default 1, i.e. no retry), with a short pause
        before every retry.  The sequence stops at the first required
        component that cannot be started.

        Returns:
            ``True`` when the system check passed and no required component
            failed, ``False`` otherwise.
        """
        if not self.check_system_requirements():
            return False
        self.logger.info("=" * 60)
        self.logger.info("Starting all components...")
        self.logger.info("=" * 60)

        max_attempts = self._max_startup_attempts()
        success = True
        for component in self.components:
            attempts_made = 0
            component_success = False
            while attempts_made < max_attempts:
                if attempts_made > 0:
                    self.logger.info(f"Retry attempt {attempts_made}/{max_attempts - 1} for {component.name}...")
                    time.sleep(self._RETRY_DELAY_SECONDS)  # Brief delay before retry
                attempts_made += 1
                component_success = self.start_component(component)
                if component_success:
                    break

            if component_success:
                continue
            if component.required:
                self.logger.error(f"Failed to start required component: {component.name} after {attempts_made} attempt(s)")
                success = False
                break
            self.logger.warning(f"Optional component {component.name} failed but continuing...")

        if success:
            self.logger.info("=" * 60)
            self.logger.info("All components started successfully")
            self.logger.info(f"Total processes: {len(self.processes)}")
            self.logger.info("=" * 60)
        else:
            self.logger.error("Startup sequence failed")
        return success

    def stop_all(self) -> None:
        """Stop all running processes.

        Each live child is asked to terminate and given 5 seconds; after
        that it is killed and reaped.  Children that already exited are
        skipped.
        """
        self.logger.info("Stopping all processes...")
        for process in self.processes:
            if process.poll() is not None:
                continue  # already gone, nothing to stop
            try:
                process.terminate()
                process.wait(timeout=5)
                self.logger.info(f"Stopped process {process.pid}")
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()  # reap the child so it does not linger as a zombie
                self.logger.warning(f"Force killed process {process.pid}")
            except Exception as e:
                self.logger.error(f"Error stopping process {process.pid}: {e}")

    def monitor_processes(self, duration: Optional[int] = None) -> None:
        """Monitor running processes.

        Polls the started children periodically and logs a warning once for
        each child that has exited.

        Args:
            duration: Seconds to keep monitoring; ``None`` (or 0) monitors
                until interrupted with Ctrl+C.
        """
        self.logger.info("Monitoring processes... Press Ctrl+C to stop")
        started_at = time.monotonic()
        already_reported = set()
        try:
            while True:
                elapsed = time.monotonic() - started_at
                if duration and elapsed >= duration:
                    self.logger.info("Monitoring duration reached")
                    break
                # Check process status
                for process in self.processes:
                    if process in already_reported:
                        continue
                    if process.poll() is not None:
                        self.logger.warning(f"Process {process.pid} has exited with code {process.returncode}")
                        already_reported.add(process)
                # Never sleep past the requested monitoring window.
                pause = self._MONITOR_POLL_SECONDS
                if duration:
                    pause = min(pause, max(0.0, duration - elapsed))
                time.sleep(pause)
        except KeyboardInterrupt:
            self.logger.info("Monitoring interrupted by user")

    def create_default_config(self) -> None:
        """Create default configuration file at ``self.config_file``.

        The parent directory of the target file is created when missing, and
        the config cache is cleared so that a later load sees the new file.
        """
        config_path = Path(self.config_file)
        # The target may live outside CONFIG_DIR (custom --config path).
        config_path.parent.mkdir(parents=True, exist_ok=True)
        default_config = {
            "components": [
                {
                    "name": "Repository Validator",
                    "executable": sys.executable,
                    "args": [str(REPO_ROOT / "scripts" / "ci_validate_repo.py")],
                    "working_dir": str(REPO_ROOT),
                    "wait_seconds": 2,
                    "required": False,
                    "platform_specific": None,
                },
                {
                    "name": "MT5 Terminal",
                    "executable": "C:\\Program Files\\Exness Terminal\\terminal64.exe",
                    "args": ["/portable"],
                    "wait_seconds": 15,
                    "required": True,
                    "platform_specific": "windows",
                },
                {
                    "name": "Custom Python Script",
                    "executable": sys.executable,
                    "args": [str(REPO_ROOT / "scripts" / "your_custom_script.py")],
                    "working_dir": str(REPO_ROOT),
                    "wait_seconds": 5,
                    "required": False,
                    "platform_specific": None,
                },
            ]
        }
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(default_config, f, indent=2)
        # Drop any cached result (e.g. "file missing") for the old state.
        _load_cached_config.cache_clear()
        self.logger.info(f"Created default configuration at {self.config_file}")
def main() -> int:
    """Command-line entry point.

    Parses the CLI options, builds the orchestrator and either writes a
    default config file (``--create-config``) or starts all components,
    optionally monitoring them afterwards (``--monitor``).

    Started processes are deliberately left running when this function
    returns; nothing here stops them.

    Returns:
        Process exit status: 0 on success, 1 when startup fails or an
        unexpected error occurs, 130 when interrupted by the user.
    """
    cli = argparse.ArgumentParser(
        description="Startup orchestrator for MQL5 trading automation"
    )
    cli.add_argument("--config", type=Path, help="Path to configuration file (JSON)")
    cli.add_argument("--dry-run", action="store_true", help="Print actions without executing them")
    cli.add_argument("--create-config", action="store_true", help="Create default configuration file")
    cli.add_argument(
        "--monitor",
        type=int,
        metavar="SECONDS",
        help="Monitor processes for specified duration (0 = infinite)",
    )
    options = cli.parse_args()

    orchestrator = StartupOrchestrator(config_file=options.config, dry_run=options.dry_run)

    if options.create_config:
        orchestrator.create_default_config()
        return 0

    try:
        if not orchestrator.start_all():
            return 1
        if options.monitor is None:
            # No monitoring requested: leave the children running on their own.
            orchestrator.logger.info("Processes started. Use --monitor to monitor or let them run independently.")
        else:
            # A value of 0 means "monitor indefinitely", expressed as None.
            orchestrator.monitor_processes(duration=options.monitor or None)
        return 0
    except KeyboardInterrupt:
        orchestrator.logger.info("Interrupted by user")
        return 130
    except Exception as exc:
        orchestrator.logger.error(f"Unexpected error: {exc}", exc_info=True)
        return 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())