mql5/crypto/trading_bot.py
lingjie chen 1c54e87883 Sync crypto strategy with gold strategy updates:
1. Fix limit order execution in crypto/trading_bot.py by adding auto-price fallback.
2. Update crypto/qwen_client.py prompt to mandate explicit limit prices and MFE/MAE based SL/TP.
3. Update crypto/deepseek_client.py prompt to include performance stats and position context.
2025-12-29 16:00:07 +08:00

983 lignes
40 Kio
Python

import time
import logging
import os
import json
import requests
import random
import numpy as np
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from .okx_data_processor import OKXDataProcessor
from .ai_client_factory import AIClientFactory
from .database_manager import DatabaseManager
from .advanced_analysis import AdvancedMarketAnalysis, SMCAnalyzer, MFHAnalyzer, MTFAnalyzer, PEMAnalyzer, MatrixMLAnalyzer
from .optimization import GWO, WOAm, DE, COAm, BBO, TETA
# Load variables from a .env file into the process environment; they are read
# later through os.getenv().
load_dotenv()

# Root logging configuration: INFO level, one shared format, written both to
# "trading_bot.log" and to the console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("trading_bot.log"),
        logging.StreamHandler(),
    ],
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
class HybridOptimizer:
    """Blend per-source ``buy`` / ``sell`` / neutral votes into one weighted signal."""

    # Numeric value of each recognised vote; anything else counts as neutral (0).
    _VOTES = {"buy": 1, "sell": -1}

    def __init__(self):
        # Relative trust placed in each signal source. Sources missing from
        # this table fall back to a weight of 0.5 in combine_signals().
        self.weights = {
            "deepseek": 1.0,
            "qwen": 1.2,
            "crt": 0.8,
            "price_equation": 0.6,
            "tf_visual": 0.5,
            "advanced_tech": 0.7,
            "matrix_ml": 0.9,
            "smc": 1.1,
            "mfh": 0.8,
            "mtf": 0.8,
            "ifvg": 0.7,
            "rvgi_cci": 0.6
        }
        self.history = []

    def combine_signals(self, signals):
        """Combine individual signals into a weighted consensus.

        Args:
            signals (dict): Maps a source name to its signal. ``'buy'`` and
                ``'sell'`` are matched case-insensitively and ignoring
                surrounding whitespace (signals may come from LLM output);
                every other value, including ``None``, is a neutral vote that
                still contributes its weight to the denominator.

        Returns:
            tuple: ``(final_signal, final_score, weights)`` where
            ``final_score`` is the weighted mean vote in ``[-1, 1]`` and
            ``final_signal`` is ``'buy'`` above 0.15, ``'sell'`` below -0.15
            and ``'neutral'`` otherwise. ``weights`` is the optimizer's own
            weight table (not a copy).
        """
        weighted_sum = 0
        total_weight = 0
        for source, signal in signals.items():
            weight = self.weights.get(source, 0.5)
            vote = self._VOTES.get(str(signal).strip().lower(), 0)
            weighted_sum += vote * weight
            total_weight += weight

        if total_weight == 0:
            return "neutral", 0, self.weights

        final_score = weighted_sum / total_weight
        final_signal = "neutral"
        if final_score > 0.15:  # Lower threshold for sensitivity
            final_signal = "buy"
        elif final_score < -0.15:
            final_signal = "sell"
        return final_signal, final_score, self.weights
class CryptoTradingBot:
def __init__(self, symbol='ETH/USDT', timeframe='15m', interval=3600):
    """
    Set up the bot: persistence, market data access, analyzers, optimizers,
    AI clients and Telegram credentials.

    Args:
        symbol (str): Trading pair
        timeframe (str): Candle timeframe (e.g., '15m', '1h', '4h')
        interval (int): Loop interval in seconds
    """
    self.symbol, self.timeframe, self.interval = symbol, timeframe, interval
    self.is_running = False

    # Database file lives next to this module.
    module_dir = os.path.dirname(os.path.realpath(__file__))
    self.db_manager = DatabaseManager(
        db_name=os.path.join(module_dir, 'crypto_trading.db')
    )

    # Exchange data access
    self.data_processor = OKXDataProcessor()

    # Analysis components
    self.advanced_analysis = AdvancedMarketAnalysis()
    self.smc_analyzer = SMCAnalyzer()
    self.mfh_analyzer = MFHAnalyzer()
    self.mtf_analyzer = MTFAnalyzer()
    self.pem_analyzer = PEMAnalyzer()
    self.matrix_ml = MatrixMLAnalyzer()
    self.hybrid_optimizer = HybridOptimizer()

    # Pool of optimization engines, keyed by name
    self.optimizers = {
        "GWO": GWO(),
        "WOAm": WOAm(),
        "DE": DE(),
        "COAm": COAm(),
        "BBO": BBO(),
        "TETA": TETA()
    }
    self.active_optimizer_name = "WOAm"
    self.last_optimization_time = 0
    # Four hours, in seconds, between re-optimization runs
    self.optimization_interval = 3600 * 4

    # Defaults for the short-term strategies until an optimization replaces them
    self.short_term_params = {
        'rvgi_sma': 30,
        'rvgi_cci': 14,
        'ifvg_gap': 50
    }

    # AI clients come from a shared factory
    self.ai_factory = AIClientFactory()
    self.deepseek_client = self.ai_factory.get_client('deepseek')
    self.qwen_client = self.ai_factory.get_client('qwen')

    # Telegram credentials are taken from the environment
    self.telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
    self.telegram_chat_id = os.getenv("TELEGRAM_CHAT_ID")

    both_clients_ready = bool(self.deepseek_client) and bool(self.qwen_client)
    if not both_clients_ready:
        logger.warning("AI Clients not fully initialized. Trading functionality may be limited.")
def escape_markdown(self, text):
    """Return *text* (coerced to ``str``) with ``_``, ``*``, ``[`` and
    the backtick each prefixed by a backslash."""
    raw = text if isinstance(text, str) else str(text)
    # One translation pass; none of the replacements introduces another
    # special character, so this equals replacing each character in turn.
    escape_table = str.maketrans({special: f'\\{special}' for special in '_*[`'})
    return raw.translate(escape_table)
def send_telegram_message(self, message):
    """Send *message* to the configured Telegram chat (best effort).

    The message is posted with ``parse_mode="Markdown"``. Failures are logged
    and never raised, so a notification problem cannot interrupt trading.
    Nothing is sent when the bot token or chat id is not configured.

    Args:
        message (str): Markdown-formatted text to deliver.
    """
    if not self.telegram_token or not self.telegram_chat_id:
        # Without credentials the request could only fail against a
        # ".../botNone/sendMessage" URL, so skip it.
        logger.debug("Telegram credentials not configured; skipping notification")
        return

    url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
    payload = {
        "chat_id": self.telegram_chat_id,
        "text": message,
        "parse_mode": "Markdown"
    }
    try:
        response = requests.post(url, json=payload, timeout=10)
        if response.status_code != 200:
            logger.error(f"Failed to send Telegram message: {response.text}")
    except Exception as e:
        # Request errors can embed the request URL, which contains the bot
        # token; redact it before it reaches the log file.
        detail = str(e).replace(str(self.telegram_token), "***")
        logger.error(f"Error sending Telegram message: {detail}")
def evaluate_comprehensive_params(self, params, df):
    """
    Comprehensive Objective Function: Evaluates ALL dataframe-based strategy parameters together.

    Replays *df* bar by bar with freshly constructed analyzers, takes a
    majority vote of five signals on each bar, and scores a one-unit trade
    held for exactly one bar.

    Args:
        params: Vector of parameter values, in this order:
            0 smc_ma (int), 1 smc_atr (float), 2 mfh_lr (float),
            3 mfh_horizon (int), 4 pem_fast (int), 5 pem_slow (int),
            6 pem_adx (float), 7 rvgi_sma (int), 8 rvgi_cci (int),
            9 ifvg_gap (int).
        df: Candle dataframe; its 'close' column is used for pricing.

    Returns:
        The simulated profit (final balance minus the 10000 starting
        balance), ``-100`` when no trade was taken, or ``-9999`` when there
        is too little data or the evaluation raised an exception.
    """
    # Running counter, used only for progress logging
    if not hasattr(self, '_opt_counter'):
        self._opt_counter = 0
    self._opt_counter += 1
    if self._opt_counter % 50 == 0:
        logger.info(f"Optimization Progress: {self._opt_counter} evaluations...")

    try:
        # 1. Decode Parameters
        p_smc_ma = int(params[0])
        p_smc_atr = params[1]
        p_mfh_lr = params[2]
        p_mfh_horizon = int(params[3])
        p_pem_fast = int(params[4])
        p_pem_slow = int(params[5])
        p_pem_adx = params[6]
        p_rvgi_sma = int(params[7])
        p_rvgi_cci = int(params[8])
        p_ifvg_gap = int(params[9])

        # 2. Initialize Temporary Analyzers (Fresh State) so a trial never
        # touches the live analyzers held on self.
        tmp_smc = SMCAnalyzer()
        tmp_smc.ma_period = p_smc_ma
        tmp_smc.atr_threshold = p_smc_atr
        tmp_mfh = MFHAnalyzer(learning_rate=p_mfh_lr)
        tmp_mfh.horizon = p_mfh_horizon
        # PEM parameters are passed to analyze() on every call instead of
        # being stored on the analyzer.
        tmp_pem = PEMAnalyzer()
        tmp_adapter = AdvancedMarketAnalysis()

        # 3. Run Simulation
        # Warm-up: longest lookback (at least 50 bars) plus a 10-bar margin;
        # at least 50 further bars are required for the evaluation itself.
        start_idx = max(p_smc_ma, p_pem_slow, 50) + 10
        if len(df) < start_idx + 50:
            return -9999

        balance = 10000.0
        closes = df['close'].values
        trades_count = 0

        # Every bar is visited (step of 1); the final bar is excluded because
        # each trade is scored against the following close.
        for i in range(start_idx, len(df) - 1):
            sub_df = df.iloc[:i + 1]
            curr_price = closes[i]
            next_price = closes[i + 1]

            # MFH Train (Must happen every step for consistency)
            if i > p_mfh_horizon:
                past_ret = (closes[i] - closes[i - p_mfh_horizon]) / closes[i - p_mfh_horizon]
                tmp_mfh.train(past_ret)

            # Signals
            smc_sig = tmp_smc.analyze(sub_df)['signal']
            mfh_sig = tmp_mfh.predict(sub_df)['signal']
            pem_sig = tmp_pem.analyze(
                sub_df,
                ma_fast_period=p_pem_fast,
                ma_slow_period=p_pem_slow,
                adx_threshold=p_pem_adx
            )['signal']
            # Short Term
            ifvg_sig = tmp_adapter.analyze_ifvg(sub_df, min_gap_points=p_ifvg_gap)['signal']
            rvgi_sig = tmp_adapter.analyze_rvgi_cci_strategy(
                sub_df, sma_period=p_rvgi_sma, cci_period=p_rvgi_cci
            )['signal']

            # Combine: net vote of the five signals; a margin of two or more
            # is required to trade.
            votes = 0
            for s in [smc_sig, mfh_sig, pem_sig, ifvg_sig, rvgi_sig]:
                if s == 'buy':
                    votes += 1
                elif s == 'sell':
                    votes -= 1

            # Evaluate: one unit held for exactly one bar
            if votes >= 2:
                trades_count += 1
                balance += (next_price - curr_price)
            elif votes <= -2:
                trades_count += 1
                balance += (curr_price - next_price)

        if trades_count == 0:
            return -100

        # Simple Profit Metric
        return balance - 10000.0
    except Exception as e:
        # Score the candidate as invalid, but leave a trace: otherwise a
        # broken analyzer makes every evaluation fail with no diagnostics.
        logger.warning(f"Parameter evaluation failed, scoring candidate as invalid: {e}")
        return -9999
def optimize_strategy_parameters(self, df):
    """
    Comprehensive Optimization: Tunes ALL strategy parameters using Auto-AO.

    Picks one optimizer at random from ``self.optimizers``, searches the
    10-dimensional parameter space with ``evaluate_comprehensive_params`` as
    the objective and, when a profitable parameter set is found, applies it
    to the live analyzers, ``self.pem_params`` and ``self.short_term_params``
    and reports it via Telegram. Otherwise the current parameters are kept.

    Args:
        df: Candle dataframe with generated features. At least 200 rows are
            required; only the most recent 300 are used.
    """
    logger.info("Starting Comprehensive Strategy Optimization (Auto-AO)...")
    # Reset progress counter
    self._opt_counter = 0

    if df is None or len(df) < 200:
        logger.warning("Insufficient data for optimization, skipping")
        return

    # Reduce data size for speed (300 candles)
    if len(df) > 300:
        df = df.tail(300).copy()

    # The objective rejects any candidate whose longest lookback plus a
    # 10-bar margin and 50 evaluation bars exceeds the data. Cap the long
    # lookbacks so the optimizer never spends evaluations on such
    # candidates.
    long_lookback_hi = max(100, min(300, len(df) - 60))

    # 2. Define Search Space (10 Dimensions)
    bounds = [
        (100, long_lookback_hi),  # smc_ma
        (0.001, 0.005),           # smc_atr
        (0.001, 0.1),             # mfh_lr
        (3, 10),                  # mfh_horizon
        (10, 50),                 # pem_fast
        (100, long_lookback_hi),  # pem_slow
        (15.0, 30.0),             # pem_adx
        (10, 50),                 # rvgi_sma
        (10, 30),                 # rvgi_cci
        (10, 100)                 # ifvg_gap
    ]
    steps = [10, 0.0005, 0.005, 1, 5, 10, 1.0, 2, 2, 5]

    # 3. Objective
    def objective(params):
        return self.evaluate_comprehensive_params(params, df)

    # 4. Optimizer
    algo_name = random.choice(list(self.optimizers.keys()))
    optimizer = self.optimizers[algo_name]
    # Adjust population size for realtime performance
    if hasattr(optimizer, 'pop_size'):
        optimizer.pop_size = 20
    # Not every optimizer necessarily exposes pop_size; do not fail on the log line.
    pop_size = getattr(optimizer, 'pop_size', 'n/a')
    logger.info(f"Selected Optimizer: {algo_name} (Pop: {pop_size})")

    # 5. Run
    best_params, best_score = optimizer.optimize(
        objective,
        bounds,
        steps=steps,
        epochs=4
    )

    # 6. Apply Results
    # Only a strictly positive score is an improvement worth applying: the
    # objective returns -100 for "no trades", -9999 for invalid candidates
    # and a negative value for a losing simulation.
    if best_score > 0:
        logger.info(f"Optimization Complete! Best Score: {best_score:.2f}")

        # Extract
        p_smc_ma = int(best_params[0])
        p_smc_atr = best_params[1]
        p_mfh_lr = best_params[2]
        p_mfh_horizon = int(best_params[3])
        p_pem_fast = int(best_params[4])
        p_pem_slow = int(best_params[5])
        p_pem_adx = best_params[6]
        p_rvgi_sma = int(best_params[7])
        p_rvgi_cci = int(best_params[8])
        p_ifvg_gap = int(best_params[9])

        # Apply
        self.smc_analyzer.ma_period = p_smc_ma
        self.smc_analyzer.atr_threshold = p_smc_atr
        self.mfh_analyzer.learning_rate = p_mfh_lr
        self.mfh_analyzer.horizon = p_mfh_horizon
        # PEM takes its parameters per analyze() call, so they are stored
        # here for analyze_market() to pass along.
        self.pem_params = {
            'ma_fast': p_pem_fast,
            'ma_slow': p_pem_slow,
            'adx_threshold': p_pem_adx
        }
        self.short_term_params = {
            'rvgi_sma': p_rvgi_sma,
            'rvgi_cci': p_rvgi_cci,
            'ifvg_gap': p_ifvg_gap
        }

        msg = (
            f"🧬 *Comprehensive Optimization ({algo_name})*\n"
            f"Score: {best_score:.2f}\n"
            f"• SMC: MA={p_smc_ma}, ATR={p_smc_atr:.4f}\n"
            f"• MFH: LR={p_mfh_lr:.3f}, H={p_mfh_horizon}\n"
            f"• PEM: Fast={p_pem_fast}, Slow={p_pem_slow}, ADX={p_pem_adx:.1f}\n"
            f"• ST: RVGI({p_rvgi_sma},{p_rvgi_cci}), IFVG({p_ifvg_gap})"
        )
        self.send_telegram_message(msg)
        logger.info(f"Updated all strategy params: {msg}")
    else:
        logger.warning("Optimization failed to find positive score, keeping original params")
def calculate_optimized_sl_tp(self, trade_type, price, atr, market_context=None):
    """
    Calculate optimized SL/TP based on ATR, MFE/MAE stats, and market structure

    Distances start from ATR multiples (TP 2.0x, SL 1.5x). When more than
    ten recorded trades with usable MFE/MAE values are available, they are
    replaced by the 60th percentile of MFE (TP) and the 90th percentile of
    MAE (SL), both read as percentages of price. Finally the take-profit is
    pulled in front of the nearest opposing zone from *market_context* when
    that zone is closer than the base target.

    Args:
        trade_type (str): Any string containing ``'buy'`` is a long trade;
            everything else is treated as a short trade.
        price (float): Reference entry price.
        atr (float): Current ATR. ``None``, NaN, zero or negative values
            fall back to 1% of *price*.
        market_context (dict | None): Optional dict whose ``'active_zones'``
            list holds zone dicts with ``'type'`` (``'bearish'`` or
            ``'bullish'``) and ``'bottom'`` / ``'top'`` prices.

    Returns:
        tuple: ``(final_sl, final_tp)`` price levels.
    """
    # `not atr > 0` is also true for NaN, which `atr <= 0` would let through
    # and which would turn both levels into NaN.
    if atr is None or not atr > 0:
        atr = price * 0.01  # Fallback 1%

    # 1. Base Volatility
    mfe_tp_dist = atr * 2.0
    mae_sl_dist = atr * 1.5

    # 2. Historical Stats (MFE/MAE)
    try:
        stats = self.db_manager.get_trade_performance_stats(limit=100)
        if isinstance(stats, list):
            trades = stats
        elif isinstance(stats, dict) and 'recent_trades' in stats:
            trades = stats['recent_trades']
        else:
            trades = []

        if trades and len(trades) > 10:
            mfes = [t.get('mfe', 0) for t in trades if t.get('mfe', 0) > 0]
            maes = [abs(t.get('mae', 0)) for t in trades if abs(t.get('mae', 0)) > 0]
            if mfes and maes:
                # Use 60th percentile for TP (Conservative)
                # Use 90th percentile for SL (Wide enough to survive noise)
                opt_tp_pct = np.percentile(mfes, 60) / 100.0
                opt_sl_pct = np.percentile(maes, 90) / 100.0
                mfe_tp_dist = price * opt_tp_pct
                mae_sl_dist = price * opt_sl_pct
    except Exception as e:
        # Best effort: keep the ATR-based distances when stats are unusable.
        logger.warning(f"MFE/MAE calc failed: {e}")

    # 3. Market Structure (Supply/Demand/FVG)
    is_buy = 'buy' in trade_type
    zones = (market_context or {}).get('active_zones') or []
    struct_tp_price = 0.0
    if is_buy:
        # Find TP: Lowest Bearish FVG above price
        resistance = [
            zone['bottom'] for zone in zones
            if zone.get('type') == 'bearish' and (zone.get('bottom') or 0) > price
        ]
        if resistance:
            struct_tp_price = min(resistance)
    else:
        # Find TP: Highest Bullish FVG below price
        support = [
            zone['top'] for zone in zones
            if zone.get('type') == 'bullish' and (zone.get('top') or 0) < price
        ]
        if support:
            struct_tp_price = max(support)

    # 4. Final Calculation
    # The structural target stops short of the zone by a small buffer. It is
    # used only when the zone is closer than the base target AND the buffered
    # level is still on the profitable side of the entry price; a zone within
    # one buffer of price would otherwise put the TP behind the entry.
    buffer = atr * 0.1
    if is_buy:
        final_tp = price + mfe_tp_dist
        final_sl = price - mae_sl_dist
        buffered_tp = struct_tp_price - buffer
        if price < struct_tp_price < final_tp and buffered_tp > price:
            final_tp = buffered_tp
    else:
        final_tp = price - mfe_tp_dist
        final_sl = price + mae_sl_dist
        buffered_tp = struct_tp_price + buffer
        if struct_tp_price > 0 and final_tp < struct_tp_price < price and buffered_tp < price:
            final_tp = buffered_tp

    return final_sl, final_tp
def analyze_market(self):
    """Analyze market using DeepSeek

    Fetches candles, generates features, re-optimizes strategy parameters
    when the optimization interval has elapsed, runs every local analyzer,
    asks DeepSeek for a market-structure analysis and merges everything into
    a hybrid signal.

    Returns:
        tuple: ``(df, structure_analysis)`` where ``structure_analysis`` is
        the DeepSeek result dict extended with ``'technical_signals'``,
        ``'hybrid_signal'`` and ``'hybrid_score'``; ``(None, None)`` when no
        historical data is available. When the DeepSeek client is missing or
        returns nothing usable, the dict contains only the locally computed
        entries.
    """
    logger.info(f"Fetching data for {self.symbol}...")
    # Increase limit to allow for optimization history
    df = self.data_processor.get_historical_data(self.symbol, self.timeframe, limit=600)
    if df is None or df.empty:
        logger.error("Failed to fetch historical data")
        return None, None

    # Generate features
    df = self.data_processor.generate_features(df)

    # Check if we need to run optimization
    current_time = time.time()
    if current_time - self.last_optimization_time > self.optimization_interval:
        self.optimize_strategy_parameters(df)
        self.last_optimization_time = current_time

    # --- Self-Learning: Train Local Models ---
    # NOTE(review): this feeds the absolute one-bar price change, while the
    # optimizer's objective trains MFH on a relative multi-bar return —
    # confirm which form the analyzers expect.
    if len(df) > 2:
        current_close = df['close'].iloc[-1]
        prev_close = df['close'].iloc[-2]
        actual_return = current_close - prev_close
        # Train MFH
        self.mfh_analyzer.train(actual_return)
        # Train MatrixML
        self.matrix_ml.train(actual_return)

    # Prepare data for AI analysis
    latest_data = df.iloc[-1].to_dict()
    recent_candles = df.iloc[-5:].reset_index().to_dict('records')
    market_data = {
        "symbol": self.symbol,
        "timeframe": self.timeframe,
        "current_price": latest_data.get('close'),
        "indicators": {
            "ema_fast": latest_data.get('ema_fast'),
            "ema_slow": latest_data.get('ema_slow'),
            "rsi": latest_data.get('rsi'),
            "atr": latest_data.get('atr'),
            "volatility": latest_data.get('volatility')
        },
        "recent_candles": recent_candles
    }

    # Fetch current positions (best effort: analysis continues without them)
    current_positions = []
    try:
        positions = self.data_processor.exchange.fetch_positions([self.symbol])
        current_positions = [p for p in positions if float(p['contracts']) > 0]
    except Exception as e:
        logger.warning(f"Could not fetch current positions for analysis: {e}")

    technical_signals = market_data.get('indicators', {})

    # --- Advanced Algorithm Integration ---
    # 1. CRT
    crt_analysis = self.advanced_analysis.analyze_crt_strategy(df)
    # 2. IFVG (Use optimized)
    ifvg_analysis = self.advanced_analysis.analyze_ifvg(
        df,
        min_gap_points=self.short_term_params.get('ifvg_gap', 50)
    )
    # 3. RVGI + CCI (Use optimized)
    rvgi_analysis = self.advanced_analysis.analyze_rvgi_cci_strategy(
        df,
        sma_period=self.short_term_params.get('rvgi_sma', 30),
        cci_period=self.short_term_params.get('rvgi_cci', 14)
    )
    # 4. Market Regime
    regime_analysis = self.advanced_analysis.detect_market_regime(df)
    # 5. SMC
    smc_analysis = self.smc_analyzer.analyze(df)
    # 6. MFH
    mfh_analysis = self.mfh_analyzer.predict(df)
    # 7. MTF: compare against a higher timeframe (4h for intraday, else 1d)
    tf_lower = self.timeframe.lower()
    htf_timeframe = '4h' if tf_lower in ['1h', '15m', '30m'] else '1d'
    df_htf = self.data_processor.get_historical_data(self.symbol, htf_timeframe, limit=100)
    mtf_analysis = {"signal": "neutral", "reason": "No HTF Data"}
    if df_htf is not None and not df_htf.empty:
        mtf_analysis = self.mtf_analyzer.analyze(df, df_htf)
    # 8. PEM (Use optimized params once an optimization has stored them)
    # NOTE(review): the default fast period (108) is larger than the default
    # slow period (60) — confirm this is intended.
    pem_p = getattr(self, 'pem_params', {})
    pem_analysis = self.pem_analyzer.analyze(
        df,
        ma_fast_period=pem_p.get('ma_fast', 108),
        ma_slow_period=pem_p.get('ma_slow', 60),
        adx_threshold=pem_p.get('adx_threshold', 20)
    )
    # 9. MatrixML
    returns_data = df['close'].diff().dropna().values
    matrix_ml_analysis = self.matrix_ml.predict(returns_data)

    # Combine
    extra_analysis_data = {
        "technical_indicators": technical_signals,
        "crt_strategy": crt_analysis,
        "ifvg_strategy": ifvg_analysis,
        "rvgi_cci_strategy": rvgi_analysis,
        "market_regime": regime_analysis,
        "smc_strategy": smc_analysis,
        "mfh_strategy": mfh_analysis,
        "mtf_strategy": mtf_analysis,
        "pem_strategy": pem_analysis,
        "matrix_ml_strategy": matrix_ml_analysis
    }
    logger.info(f"Signals: SMC={smc_analysis['signal']}, MFH={mfh_analysis['signal']}, PEM={pem_analysis['signal']}, IFVG={ifvg_analysis['signal']}")

    # 1. DeepSeek
    structure_analysis = None
    if self.deepseek_client:
        logger.info("Requesting DeepSeek market structure analysis...")
        # Get performance stats for DeepSeek context
        performance_stats = []
        try:
            performance_stats = self.db_manager.get_trade_performance_stats(limit=50)
        except Exception as e:
            logger.warning(f"Could not load performance stats for DeepSeek: {e}")
        structure_analysis = self.deepseek_client.analyze_market_structure(
            market_data,
            current_positions=current_positions,
            extra_analysis=extra_analysis_data,
            performance_stats=performance_stats
        )
    else:
        # The constructor tolerates a missing client, so this method must too.
        logger.warning("DeepSeek client unavailable; continuing with local signals only")
    if not isinstance(structure_analysis, dict):
        structure_analysis = {}

    # DeepSeek Signal Logic (similar to Gold)
    ds_signal = structure_analysis.get('preliminary_signal', 'neutral')

    # Combine Signals using HybridOptimizer logic
    all_signals = {
        "deepseek": ds_signal,
        "crt": crt_analysis['signal'],
        "price_equation": pem_analysis['signal'],
        "matrix_ml": matrix_ml_analysis['signal'],
        "smc": smc_analysis['signal'],
        "mfh": mfh_analysis['signal'],
        "mtf": mtf_analysis['signal'],
        "ifvg": ifvg_analysis['signal'],
        "rvgi_cci": rvgi_analysis['signal']
    }
    final_signal, final_score, _ = self.hybrid_optimizer.combine_signals(all_signals)
    logger.info(f"Hybrid Signal: {final_signal} (Score: {final_score:.2f})")

    structure_analysis['technical_signals'] = extra_analysis_data
    structure_analysis['hybrid_signal'] = final_signal  # Pass to Qwen
    structure_analysis['hybrid_score'] = final_score

    try:
        self.db_manager.log_analysis(
            symbol=self.symbol,
            market_state=structure_analysis.get('market_state'),
            structure_score=structure_analysis.get('structure_score'),
            ai_decision=None,
            raw_analysis=structure_analysis
        )
    except Exception as e:
        logger.error(f"Failed to log analysis: {e}")

    return df, structure_analysis
def make_decision(self, df, structure_analysis):
    """Make trading decision using Qwen

    Builds the account/market context, asks Qwen for a decision, fills in
    missing SL/TP levels with ``calculate_optimized_sl_tp`` and sends a
    summary to Telegram.

    Args:
        df: Feature dataframe returned by ``analyze_market``; only the last
            row is read.
        structure_analysis (dict): Analysis dict returned by
            ``analyze_market``.

    Returns:
        dict | None: The decision with ``'exit_conditions'`` completed, or
        ``None`` when the Qwen client is unavailable or returned no usable
        decision.
    """
    if not self.qwen_client:
        return None

    latest_data = df.iloc[-1].to_dict()
    balance = self.data_processor.get_account_balance('USDT')
    available_usdt = balance['free'] if balance else 0.0
    total_equity = balance['total'] if balance else available_usdt

    # Fetch actual positions (best effort)
    valid_positions = []
    try:
        positions = self.data_processor.exchange.fetch_positions([self.symbol])
        for pos in positions:
            if float(pos['contracts']) > 0:
                valid_positions.append({
                    "symbol": pos['symbol'],
                    "side": pos['side'],
                    "contracts": float(pos['contracts']),
                    "unrealized_pnl": pos['unrealizedPnl'],
                    "leverage": pos['leverage']
                })
    except Exception as e:
        logger.warning(f"Could not fetch positions for decision context: {e}")

    # Open Orders (best effort)
    open_orders = []
    try:
        raw_orders = self.data_processor.get_open_orders(self.symbol)
        for o in raw_orders:
            open_orders.append({"id": o['id'], "type": o['type'], "price": o['price']})
    except Exception as e:
        logger.warning(f"Could not fetch open orders for decision context: {e}")

    market_data = {
        "symbol": self.symbol,
        "price": latest_data.get('close'),
        "indicators": {
            "rsi": latest_data.get('rsi'),
            "atr": latest_data.get('atr')
        },
        "account_info": {"available_usdt": available_usdt, "total_equity": total_equity},
        "open_orders": open_orders
    }

    # Feedback
    performance_stats = []
    try:
        performance_stats = self.db_manager.get_trade_performance_stats(limit=50)
    except Exception as e:
        logger.warning(f"Could not load performance stats for Qwen: {e}")

    logger.info("Requesting Qwen strategy optimization...")
    decision = self.qwen_client.optimize_strategy_logic(
        structure_analysis,
        market_data,
        current_positions=valid_positions,
        performance_stats=performance_stats
    )
    if not isinstance(decision, dict):
        logger.error("Qwen returned no usable decision")
        return None

    # Check if Qwen provided SL/TP, if not, use Optimized Calculation
    exit_cond = decision.get('exit_conditions') or {}
    if not exit_cond.get('sl_price') or not exit_cond.get('tp_price'):
        logger.info("Qwen did not provide SL/TP, using Optimized Calculation")
        action = decision.get('action', 'hold')
        # The SL/TP side must follow the trade the levels will protect:
        # every sell spelling that execution accepts maps to "sell", and
        # for hold/close actions the existing position decides. Execution
        # applies these levels to an open position on 'hold', so defaulting
        # to "buy" would invert the stops of a short.
        if action in ('sell', 'limit_sell', 'sell_limit'):
            trade_dir = "sell"
        elif action in ('buy', 'limit_buy', 'buy_limit'):
            trade_dir = "buy"
        elif valid_positions and valid_positions[0]['side'] == 'short':
            trade_dir = "sell"
        else:
            trade_dir = "buy"

        atr = latest_data.get('atr', 0)
        price = latest_data.get('close')
        market_context = structure_analysis.get('technical_signals', {}).get('ifvg_strategy', {})
        calc_sl, calc_tp = self.calculate_optimized_sl_tp(trade_dir, price, atr, market_context=market_context)
        if not exit_cond.get('sl_price'):
            exit_cond['sl_price'] = calc_sl
        if not exit_cond.get('tp_price'):
            exit_cond['tp_price'] = calc_tp
        decision['exit_conditions'] = exit_cond

    # Send Telegram (a formatting problem must not discard the decision)
    try:
        sl_price = exit_cond.get('sl_price')
        tp_price = exit_cond.get('tp_price')
        action = decision.get('action')
        display_action = action
        if action == 'hold' and not valid_positions:
            display_action = "WAITING FOR MARKET DIRECTION ⏳"

        ds_signal = structure_analysis.get('preliminary_signal', 'N/A')
        ds_score = structure_analysis.get('structure_score', 0)
        hybrid_signal = structure_analysis.get('hybrid_signal', 'N/A')
        hybrid_score = structure_analysis.get('hybrid_score', 0)
        # Tolerate a missing action or size instead of dropping the message.
        size_pct = float(decision.get('position_size') or 0) * 100

        msg = (
            f"🤖 *AI Crypto Strategy Insight*\n"
            f"Symbol: `{self.symbol}` | TF: `{self.timeframe}`\n"
            f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
            f"🧠 *AI Consensus*\n"
            f"• Decision: *{str(display_action).upper()}*\n"
            f"• Qwen Action: `{self.escape_markdown(action)}`\n"
            f"• Hybrid Signal: `{hybrid_signal}` ({hybrid_score:.2f})\n"
            f"• DeepSeek: `{ds_signal}` ({ds_score}/100)\n\n"
            f"📝 *Rationale*: _{self.escape_markdown(decision.get('strategy_rationale'))}_\n\n"
            f"🎯 *Setup*\n"
            f"• SL: `{sl_price}`\n"
            f"• TP: `{tp_price}`\n"
            f"• Lev: {decision.get('leverage')}x | Size: {size_pct:.1f}%\n\n"
            f"📊 *Market State*: `{structure_analysis.get('market_state')}`"
        )
        self.send_telegram_message(msg)
    except Exception as e:
        logger.error(f"Failed to construct telegram: {e}")

    return decision
def execute_trade(self, decision):
    """Execute trade based on decision

    Handles, in order: closing a position (``close_buy`` / ``close_sell``),
    refreshing SL/TP on ``hold``, reversing or refreshing an existing
    position, and opening a new market or limit order with attached SL/TP.

    Args:
        decision (dict): Decision produced by ``make_decision``. Keys read:
            ``action``, ``strategy_rationale``, ``exit_conditions``
            (``sl_price`` / ``tp_price``), ``entry_conditions``
            (``limit_price``), ``leverage`` and ``position_size``.
    """
    action = decision.get('action')
    rationale = decision.get('strategy_rationale', 'No rationale provided')
    buy_actions = ('buy', 'buy_limit', 'limit_buy')
    sell_actions = ('sell', 'sell_limit', 'limit_sell')

    # Determine current positions status for logic handling
    target_pos = None
    try:
        positions = self.data_processor.exchange.fetch_positions([self.symbol])
        if positions:
            for p in positions:
                if float(p['contracts']) > 0:
                    target_pos = p
                    break
    except Exception as e:
        logger.error(f"Failed to fetch positions during execution: {e}")

    # --- CASE 1: Close Logic (close_buy / close_sell) ---
    if action in ['close_buy', 'close_sell']:
        if target_pos:
            logger.info(f"Executing CLOSE position for {self.symbol} based on AI decision")
            try:
                pos_side = target_pos['side']  # 'long' or 'short'
                close_side = 'sell' if pos_side == 'long' else 'buy'
                close_amount = float(target_pos['contracts'])
                self.data_processor.cancel_all_orders(self.symbol)
                order = self.data_processor.create_order(self.symbol, close_side, close_amount, type='market')
                if order:
                    self.send_telegram_message(f"🚫 *Position Closed*\nSymbol: `{self.symbol}`\nReason: AI Signal `{action}`")
            except Exception as e:
                logger.error(f"Failed to close position: {e}")
        return

    # --- CASE 2: Hold / Update SL/TP Logic ---
    if action == 'hold':
        if target_pos:
            exit_conditions = decision.get('exit_conditions') or {}
            new_sl = exit_conditions.get('sl_price')
            new_tp = exit_conditions.get('tp_price')
            if new_sl or new_tp:
                logger.info(f"Updating SL/TP for existing position: SL={new_sl}, TP={new_tp}")
                pos_side = target_pos['side']
                sl_tp_side = 'sell' if pos_side == 'long' else 'buy'
                pos_amount = float(target_pos['contracts'])
                self.data_processor.cancel_all_orders(self.symbol)
                self.data_processor.place_sl_tp_order(self.symbol, sl_tp_side, pos_amount, sl_price=new_sl, tp_price=new_tp)
                self.send_telegram_message(f"🔄 *Updated SL/TP*\nSymbol: `{self.symbol}`\nNew SL: `{new_sl}`\nNew TP: `{new_tp}`")
        return

    # --- CASE 3: Open New Position (buy / sell, market or limit) ---
    if target_pos:
        pos_side = target_pos['side']
        # Limit actions carry a direction too. Without them here, a
        # limit-sell against a long would be treated as "same direction" and
        # sell-side SL/TP would be attached to the long position.
        is_reversal = (
            (action in buy_actions and pos_side == 'short')
            or (action in sell_actions and pos_side == 'long')
        )
        if is_reversal:
            logger.info(f"Reversal signal detected. Closing existing {pos_side} position.")
            try:
                close_side = 'sell' if pos_side == 'long' else 'buy'
                close_amount = float(target_pos['contracts'])
                self.data_processor.cancel_all_orders(self.symbol)
                self.data_processor.create_order(self.symbol, close_side, close_amount, type='market')
                self.send_telegram_message(f"🔄 *Position Reversal Initiated*")
                # Brief pause before opening the opposite side.
                time.sleep(1)
            except Exception as e:
                logger.error(f"Failed to close position for reversal: {e}")
                return
        else:
            # Same direction, update SL/TP
            logger.info(f"Signal {action} matches existing {pos_side} position. Updating SL/TP.")
            exit_conditions = decision.get('exit_conditions') or {}
            new_sl = exit_conditions.get('sl_price')
            new_tp = exit_conditions.get('tp_price')
            if new_sl or new_tp:
                sl_tp_side = 'sell' if pos_side == 'long' else 'buy'
                pos_amount = float(target_pos['contracts'])
                self.data_processor.cancel_all_orders(self.symbol)
                self.data_processor.place_sl_tp_order(self.symbol, sl_tp_side, pos_amount, sl_price=new_sl, tp_price=new_tp)
            return

    # Execute New Trade
    # `or` guards against keys that are present but None.
    leverage = int(decision.get('leverage') or 1)
    volume_percent = float(decision.get('position_size') or 0.0)
    volume_percent = max(0.0, min(1.0, volume_percent))
    if volume_percent <= 0:
        return

    balance = self.data_processor.get_account_balance('USDT')
    available_usdt = balance['free'] if balance else 0.0
    target_usdt = available_usdt * volume_percent
    # Keep a 1% reserve when (almost) the whole balance is committed.
    if volume_percent > 0.95:
        target_usdt *= 0.99

    current_price = self.data_processor.get_current_price(self.symbol)
    if not current_price:
        return

    target_position_value = target_usdt * leverage
    amount_eth = target_position_value / current_price
    contract_size = self.data_processor.get_contract_size(self.symbol) or 0.1
    num_contracts = int(amount_eth / contract_size)
    if num_contracts < 1:
        logger.warning(f"Calculated contracts < 1. Required margin too high.")
        return

    # Prepare SL/TP as an algo order attached to the entry order
    exit_conditions = decision.get('exit_conditions') or {}
    sl_price = exit_conditions.get('sl_price')
    tp_price = exit_conditions.get('tp_price')
    order_params = {}
    if sl_price or tp_price:
        algo_order = {'tpTriggerPxType': 'last', 'slTriggerPxType': 'last'}
        if tp_price:
            algo_order['tpTriggerPx'] = str(tp_price)
            algo_order['tpOrdPx'] = '-1'
        if sl_price:
            algo_order['slTriggerPx'] = str(sl_price)
            algo_order['slOrdPx'] = '-1'
        order_params['attachAlgoOrds'] = [algo_order]

    try:
        self.data_processor.set_leverage(self.symbol, leverage)
        order = None
        if action == 'buy':
            order = self.data_processor.create_order(self.symbol, 'buy', num_contracts, type='market', params=order_params)
        elif action == 'sell':
            order = self.data_processor.create_order(self.symbol, 'sell', num_contracts, type='market', params=order_params)
        elif action in buy_actions:
            # A limit buy priced above the market simply fills immediately
            # as a taker order, so it is submitted as-is.
            lp = self._resolve_limit_price(decision, 'buy', current_price)
            if lp:
                order = self.data_processor.create_order(self.symbol, 'buy', num_contracts, type='limit', price=lp, params=order_params)
        elif action in sell_actions:
            lp = self._resolve_limit_price(decision, 'sell', current_price)
            if lp:
                order = self.data_processor.create_order(self.symbol, 'sell', num_contracts, type='limit', price=lp, params=order_params)

        if order:
            trade_record = {
                'symbol': self.symbol, 'action': action, 'order_type': 'limit' if 'limit' in action else 'market',
                'contracts': num_contracts, 'price': current_price, 'leverage': leverage,
                'order_id': order.get('id'), 'strategy_rationale': rationale
            }
            self.db_manager.log_trade(trade_record)
            exec_msg = (
                f"✅ *Trade Executed*\nAction: `{action.upper()}`\nSymbol: `{self.symbol}`\n"
                f"Contracts: `{num_contracts}`\nPrice: `{trade_record['price']}`\nLeverage: `{leverage}x`"
            )
            self.send_telegram_message(exec_msg)
    except Exception as e:
        logger.error(f"Failed to execute trade: {e}")

def _resolve_limit_price(self, decision, side, current_price):
    """Return a usable limit price for *side* ('buy' or 'sell'), or None.

    Uses the price from ``decision['entry_conditions']['limit_price']`` when
    it is a positive number; otherwise falls back to half an ATR below
    (buy) or above (sell) *current_price*. Returns ``None`` when neither is
    available, in which case no order should be placed.
    """
    raw_price = (decision.get('entry_conditions') or {}).get('limit_price')
    try:
        limit_price = float(raw_price) if raw_price else 0.0
    except (TypeError, ValueError):
        limit_price = 0.0
    if limit_price > 0:
        return limit_price

    label = 'Buy' if side == 'buy' else 'Sell'
    logger.warning(f"LLM suggested Limit {label} but no price provided. Using ATR fallback.")
    atr = self._latest_atr()
    if atr <= 0:
        logger.warning(f"No ATR available; skipping Limit {label} order.")
        return None
    offset = atr * 0.5
    limit_price = current_price - offset if side == 'buy' else current_price + offset
    logger.info(f"Auto-set Limit {label} Price: {limit_price:.2f}")
    return limit_price

def _latest_atr(self):
    """Return the most recent ATR for the bot's symbol/timeframe, or 0.0.

    Re-fetches candles and regenerates features because the execution step
    does not receive the analysis dataframe.
    """
    try:
        # assumes 100 candles are enough for generate_features to produce 'atr' — TODO confirm
        df = self.data_processor.get_historical_data(self.symbol, self.timeframe, limit=100)
        if df is None or df.empty:
            return 0.0
        df = self.data_processor.generate_features(df)
        atr = float(df['atr'].iloc[-1])
        # `atr > 0` is False for NaN as well as for non-positive values.
        return atr if atr > 0 else 0.0
    except Exception as e:
        logger.warning(f"Could not determine ATR for limit price fallback: {e}")
        return 0.0
def run_once(self):
    """Run one analyze -> decide -> execute cycle.

    Any exception raised during the cycle is logged with its traceback and
    swallowed, so a failing cycle never propagates to the caller.
    """
    try:
        logger.info("Starting trading cycle...")
        frame, analysis = self.analyze_market()
        # A decision is only requested when analysis produced both results.
        inputs_ready = frame is not None and analysis is not None
        decision = self.make_decision(frame, analysis) if inputs_ready else None
        if decision:
            self.execute_trade(decision)
        logger.info("Trading cycle completed")
    except Exception as e:
        logger.error(f"Error in trading cycle: {e}", exc_info=True)
def start(self):
    """Run trading cycles back to back, sleeping ``self.interval`` seconds
    after each one, for as long as ``self.is_running`` stays true."""
    self.is_running = True
    logger.info(f"Starting bot for {self.symbol} with {self.interval}s interval")
    while True:
        # The flag is checked before each cycle.
        if not self.is_running:
            break
        self.run_once()
        logger.info(f"Waiting {self.interval} seconds before next analysis...")
        time.sleep(self.interval)
# Script entry point: trade the ETH/USDT:USDT market on 15-minute candles,
# running one cycle every 900 seconds.
if __name__ == "__main__":
    bot = CryptoTradingBot(
        symbol='ETH/USDT:USDT',
        timeframe='15m',
        interval=900,
    )
    bot.start()