MQL5-Google-Onedrive/scripts/market_research.py
copilot-swe-agent[bot] e7f2d678e2 Add shared utilities and refactor AI client duplications
Co-authored-by: Mouy-leng <199350297+Mouy-leng@users.noreply.github.com>
2026-02-11 08:36:58 +00:00

217 lines
7.9 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Market Research Script
Connects to Gemini and Jules to analyze market data and generate research reports.
"""
import json
import concurrent.futures
from datetime import datetime
# Use shared utilities to reduce code duplication
from common.logger_config import setup_basic_logging
from common.paths import DOCS_DIR, DATA_DIR
from common.config_loader import load_env
from common.ai_client import GeminiClient, JulesClient
# Setup logging using shared config
logger = setup_basic_logging()
# Load environment variables
load_env()
def get_market_data():
"""
Fetch market data using yfinance if available, otherwise use simulation.
"""
data = None
# ⚡ Optimization: Lazy import heavy dependencies
try:
import yfinance as yf
import pandas as pd
yfinance_available = True
except ImportError:
yfinance_available = False
if yfinance_available:
try:
logger.info("Fetching real market data via yfinance...")
symbols = ["EURUSD=X", "GBPUSD=X", "GC=F", "BTC-USD"]
market_data = {"timestamp": datetime.now().isoformat(), "symbols": {}}
# ⚡ Performance Optimization: Batch fetch all symbols to reduce HTTP requests (~3x speedup)
try:
# group_by='ticker' ensures consistent structure (MultiIndex if >1 symbol)
tickers_data = yf.download(symbols, period="14d", interval="1d", group_by='ticker', progress=False)
except Exception as e:
logger.error(f"Bulk download failed: {e}")
tickers_data = None
if tickers_data is not None and not tickers_data.empty:
# ⚡ Performance Optimization: Move structural checks outside the loop
is_multi = isinstance(tickers_data.columns, pd.MultiIndex)
# Determine which symbols are actually available in the downloaded data
if is_multi:
available_symbols = [s for s in symbols if s in tickers_data.columns.levels[0]]
elif len(symbols) == 1:
# If not MultiIndex and only 1 symbol requested, it's that symbol
available_symbols = symbols
else:
# Logic Error Fix: If structure is flat but multiple symbols requested,
# we can't safely assign it to any/all symbols.
available_symbols = []
for sym in available_symbols:
try:
hist = tickers_data[sym] if is_multi else tickers_data
if hist is not None and not hist.empty:
# Clean up data
if 'Close' in hist.columns:
hist = hist.dropna(subset=['Close'])
else:
continue
if hist.empty:
continue
current_price = hist['Close'].iloc[-1]
# Check if we have enough data
prev_price = hist['Close'].iloc[-2] if len(hist) > 1 else current_price
# Simple Trend (Price vs 5-day SMA)
sma_5 = hist['Close'].tail(5).mean()
trend = "UP" if current_price > sma_5 else "DOWN"
# Volatility (High - Low)
daily_range = hist['High'].iloc[-1] - hist['Low'].iloc[-1]
volatility = "HIGH" if daily_range > (current_price * 0.01) else "LOW" # Arbitrary threshold
market_data["symbols"][sym] = {
"price": round(current_price, 4),
"trend": trend,
"volatility": volatility,
"history_last_5_closes": [round(x, 4) for x in hist['Close'].tail(5).tolist()]
}
except Exception as e:
logger.warning(f"Failed to process {sym}: {e}")
if market_data["symbols"]:
data = market_data
except Exception as e:
logger.error(f"yfinance failed: {e}")
if data:
return data
logger.info("Using simulated/fallback market data.")
return {
"timestamp": datetime.now().isoformat(),
"symbols": {
"EURUSD": {"price": 1.0850, "trend": "UP", "rsi": 65.5, "volatility": "MEDIUM"},
"GBPUSD": {"price": 1.2700, "trend": "SIDEWAYS", "rsi": 50.2, "volatility": "LOW"},
"XAUUSD": {"price": 2030.50, "trend": "DOWN", "rsi": 35.0, "volatility": "HIGH"}
}
}
def analyze_with_gemini(data):
"""
Send data to Gemini for analysis using shared AI client.
"""
gemini = GeminiClient()
if not gemini.is_available():
logger.warning("GEMINI_API_KEY/GOOGLE_API_KEY not found. Skipping Gemini analysis.")
return None
prompt = f"""
Analyze the following market data and provide a research report for a trading bot.
Focus on:
1. Current market regime (Trending, Ranging, Volatile).
2. Potential trade setups based on Price Action and Trend.
3. Risk management suggestions.
Data:
{json.dumps(data, indent=2)}
"""
try:
response = gemini.generate(prompt)
return response if response else f"Gemini analysis failed"
except Exception as e:
logger.error(f"Gemini analysis failed: {e}")
return f"Gemini analysis failed: {e}"
def analyze_with_jules(data):
"""
Send data to Jules for analysis using shared AI client.
"""
jules = JulesClient()
if not jules.is_available():
return None
prompt = f"""
You are an expert market analyst. Analyze the following market data and provide a research report for a trading bot.
Focus on:
1. Macro view and Sentiment.
2. Specific trade ideas.
3. Correlation analysis if multiple symbols provided.
Data:
{json.dumps(data, indent=2)}
"""
try:
response = jules.generate(prompt)
return response if response else "Jules analysis failed"
except Exception as e:
logger.error(f"Jules analysis failed: {e}")
error_msg = f"Jules analysis failed: {e}"
if "NameResolutionError" in str(e) or "Failed to resolve" in str(e):
error_msg += "\n\n**Hint:** The Jules API URL might be incorrect. Please check `JULES_API_URL` in `.env`."
return error_msg
def main():
logger.info("Starting Market Research...")
# Ensure directories exist
DOCS_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)
data = get_market_data()
logger.info(f"Market data loaded for {len(data.get('symbols', {}))} symbols.")
# Save raw data snapshot
with open(DATA_DIR / "market_snapshot.json", 'w') as f:
json.dump(data, f, indent=2)
# Parallelize AI analysis calls
with concurrent.futures.ThreadPoolExecutor() as executor:
future_gemini = executor.submit(analyze_with_gemini, data)
future_jules = executor.submit(analyze_with_jules, data)
gemini_report = future_gemini.result()
jules_report = future_jules.result()
report_path = DOCS_DIR / "market_research_report.md"
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = f"# Market Research Report\n\nGenerated: {timestamp}\n\n"
if gemini_report:
content += f"## Gemini Analysis\n\n{gemini_report}\n\n"
if jules_report:
content += f"## Jules Analysis\n\n{jules_report}\n\n"
if not gemini_report and not jules_report:
content += "## Analysis Failed\n\nNo AI providers were available or both failed."
with open(report_path, 'w') as f:
f.write(content)
logger.info(f"Report saved to {report_path}")
if __name__ == "__main__":
main()