# mql5/crypto/trading_bot.py
#
# 983 lines
# 40 KiB
# Python
# (code-viewer navigation residue: Raw / Permalink / Normal View / History)

import time
import logging
import os
import json
# (code-viewer blame timestamp: 2025-12-28 01:17:57 +08:00)
import requests
import random
import numpy as np
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from .okx_data_processor import OKXDataProcessor
from .ai_client_factory import AIClientFactory
from .database_manager import DatabaseManager
from .advanced_analysis import AdvancedMarketAnalysis, SMCAnalyzer, MFHAnalyzer, MTFAnalyzer, PEMAnalyzer, MatrixMLAnalyzer
from .optimization import GWO, WOAm, DE, COAm, BBO, TETA
# Read settings from a local .env file into the process environment.
load_dotenv()

# Root logging: INFO and above go to both "trading_bot.log" and the console,
# sharing one timestamped format.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.FileHandler("trading_bot.log"), logging.StreamHandler()],
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
class HybridOptimizer:
    """Blend per-source 'buy' / 'sell' / other votes into one weighted signal."""

    def __init__(self):
        # Relative weight given to each signal source; sources missing from
        # this table are weighted 0.5 by combine_signals().
        self.weights = {
            "deepseek": 1.0,
            "qwen": 1.2,
            "crt": 0.8,
            "price_equation": 0.6,
            "tf_visual": 0.5,
            "advanced_tech": 0.7,
            "matrix_ml": 0.9,
            "smc": 1.1,
            "mfh": 0.8,
            "mtf": 0.8,
            "ifvg": 0.7,
            "rvgi_cci": 0.6
        }
        self.history = []

    def combine_signals(self, signals):
        """
        Compute the weighted average vote of all sources.

        Args:
            signals (dict): Maps source name -> signal. 'buy' counts as +1,
                'sell' as -1, anything else as 0.

        Returns:
            tuple: (final_signal, final_score, weights) where final_signal is
            "buy" when the score exceeds 0.15, "sell" when it is below -0.15
            and "neutral" otherwise. With no (or zero-weight) input the result
            is ("neutral", 0, weights).
        """
        score_sum = 0
        weight_sum = 0
        for source_name, vote_label in signals.items():
            source_weight = self.weights.get(source_name, 0.5)
            vote = 1 if vote_label == 'buy' else (-1 if vote_label == 'sell' else 0)
            score_sum += vote * source_weight
            weight_sum += source_weight

        if weight_sum == 0:
            return "neutral", 0, self.weights

        final_score = score_sum / weight_sum
        # Deliberately low thresholds so the combined signal stays sensitive.
        if final_score > 0.15:
            verdict = "buy"
        elif final_score < -0.15:
            verdict = "sell"
        else:
            verdict = "neutral"
        return verdict, final_score, self.weights
class CryptoTradingBot:
def __init__(self, symbol='ETH/USDT', timeframe='15m', interval=3600):
    """
    Build the bot: configuration, database, data source, analyzers,
    optimizer pool, AI clients and Telegram settings.

    Args:
        symbol (str): Trading pair
        timeframe (str): Candle timeframe (e.g., '15m', '1h', '4h')
        interval (int): Loop interval in seconds
    """
    self.symbol, self.timeframe, self.interval = symbol, timeframe, interval
    self.is_running = False

    # Database file 'crypto_trading.db' is placed next to this module.
    module_dir = os.path.dirname(os.path.realpath(__file__))
    self.db_manager = DatabaseManager(
        db_name=os.path.join(module_dir, 'crypto_trading.db')
    )

    # Exchange data access.
    self.data_processor = OKXDataProcessor()

    # Analysis components.
    self.advanced_analysis = AdvancedMarketAnalysis()
    self.smc_analyzer = SMCAnalyzer()
    self.mfh_analyzer = MFHAnalyzer()
    self.mtf_analyzer = MTFAnalyzer()
    self.pem_analyzer = PEMAnalyzer()
    self.matrix_ml = MatrixMLAnalyzer()
    self.hybrid_optimizer = HybridOptimizer()

    # Pool of optimization engines, instantiated in a fixed order.
    self.optimizers = {}
    for label, factory in (
        ("GWO", GWO),
        ("WOAm", WOAm),
        ("DE", DE),
        ("COAm", COAm),
        ("BBO", BBO),
        ("TETA", TETA),
    ):
        self.optimizers[label] = factory()
    self.active_optimizer_name = "WOAm"
    self.last_optimization_time = 0
    self.optimization_interval = 4 * 3600  # re-optimize every 4 hours

    # Parameters of the short-term strategies (replaced after optimization).
    self.short_term_params = dict(rvgi_sma=30, rvgi_cci=14, ifvg_gap=50)

    # AI clients.
    self.ai_factory = AIClientFactory()
    self.deepseek_client = self.ai_factory.get_client('deepseek')
    self.qwen_client = self.ai_factory.get_client('qwen')

    # Telegram settings come from the environment.
    self.telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
    self.telegram_chat_id = os.getenv("TELEGRAM_CHAT_ID")

    if not (self.deepseek_client and self.qwen_client):
        logger.warning("AI Clients not fully initialized. Trading functionality may be limited.")
def escape_markdown(self, text):
    """
    Return *text* with each of the characters ``_``, ``*``, ``[`` and the
    backtick prefixed by a backslash.

    Non-string input is converted with ``str()`` first.
    """
    as_text = text if isinstance(text, str) else str(text)
    # One translate pass; the inserted backslashes are never re-escaped,
    # which matches escaping each character in turn.
    escape_table = {ord(symbol): '\\' + symbol for symbol in '_*[`'}
    return as_text.translate(escape_table)
def send_telegram_message(self, message):
    """
    Send *message* to the configured Telegram chat (best effort).

    The message is first sent with ``parse_mode="Markdown"``. If Telegram
    answers HTTP 400 the message is retried once as plain text, so that a
    formatting problem does not drop the notification.
    Nothing is sent when the bot token or chat id is not configured.

    Args:
        message (str): Text to send.

    Returns:
        None. Failures are logged and never raised.
    """
    token = self.telegram_token
    chat_id = self.telegram_chat_id
    if not token or not chat_id:
        # Without credentials the request could only fail (URL "botNone").
        logger.debug("Telegram credentials not configured; message not sent.")
        return

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": message,
        "parse_mode": "Markdown"
    }
    try:
        response = requests.post(url, json=payload, timeout=10)
        if response.status_code == 400:
            logger.warning(f"Telegram rejected Markdown message, retrying as plain text: {response.text}")
            payload.pop("parse_mode")
            response = requests.post(url, json=payload, timeout=10)
        if response.status_code != 200:
            logger.error(f"Failed to send Telegram message: {response.text}")
    except Exception as e:
        # Exception text from the HTTP layer can include the request URL,
        # which contains the bot token; keep the secret out of the logs.
        safe_error = str(e).replace(token, "<redacted>")
        logger.error(f"Error sending Telegram message: {safe_error}")
def evaluate_comprehensive_params(self, params, df):
"""
Comprehensive Objective Function: Evaluates ALL dataframe-based strategy parameters together.
params: Vector of parameter values corresponding to the defined structure.
"""
# Global counter for progress logging
if not hasattr(self, '_opt_counter'): self._opt_counter = 0
self._opt_counter += 1
if self._opt_counter % 50 == 0:
logger.info(f"Optimization Progress: {self._opt_counter} evaluations...")
# 1. Decode Parameters
# 0: smc_ma (int)
# 1: smc_atr (float)
# 2: mfh_lr (float)
# 3: mfh_horizon (int)
# 4: pem_fast (int)
# 5: pem_slow (int)
# 6: pem_adx (float)
# 7: rvgi_sma (int)
# 8: rvgi_cci (int)
# 9: ifvg_gap (int)
try:
p_smc_ma = int(params[0])
p_smc_atr = params[1]
p_mfh_lr = params[2]
p_mfh_horizon = int(params[3])
p_pem_fast = int(params[4])
p_pem_slow = int(params[5])
p_pem_adx = params[6]
p_rvgi_sma = int(params[7])
p_rvgi_cci = int(params[8])
p_ifvg_gap = int(params[9])
# 2. Initialize Temporary Analyzers (Fresh State)
tmp_smc = SMCAnalyzer()
tmp_smc.ma_period = p_smc_ma
tmp_smc.atr_threshold = p_smc_atr
tmp_mfh = MFHAnalyzer(learning_rate=p_mfh_lr)
tmp_mfh.horizon = p_mfh_horizon
# Note: PEMAnalyzer in Crypto might need attribute adjustment if different from Gold
tmp_pem = PEMAnalyzer()
# Assuming PEMAnalyzer has similar analyze method signature or we mock it
# Crypto PEMAnalyzer uses analyze(df, ma_fast_period, ma_slow_period, adx_threshold)
tmp_adapter = AdvancedMarketAnalysis()
# 3. Run Simulation
start_idx = max(p_smc_ma, p_pem_slow, 50) + 10
if len(df) < start_idx + 50: return -9999
balance = 10000.0
closes = df['close'].values
trades_count = 0
wins = 0
# Optimization: Step size > 1 to speed up
for i in range(start_idx, len(df)-1):
sub_df = df.iloc[:i+1]
curr_price = closes[i]
next_price = closes[i+1]
# MFH Train (Must happen every step for consistency)
if i > p_mfh_horizon:
past_ret = (closes[i] - closes[i-p_mfh_horizon]) / closes[i-p_mfh_horizon]
tmp_mfh.train(past_ret)
# Signals
smc_sig = tmp_smc.analyze(sub_df)['signal']
mfh_sig = tmp_mfh.predict(sub_df)['signal']
# PEM Signal (passing params explicitly as per Crypto implementation)
pem_res = tmp_pem.analyze(sub_df, ma_fast_period=p_pem_fast, ma_slow_period=p_pem_slow, adx_threshold=p_pem_adx)
pem_sig = pem_res['signal']
# Short Term
ifvg_sig = tmp_adapter.analyze_ifvg(sub_df, min_gap_points=p_ifvg_gap)['signal']
rvgi_sig = tmp_adapter.analyze_rvgi_cci_strategy(sub_df, sma_period=p_rvgi_sma, cci_period=p_rvgi_cci)['signal']
# Combine
votes = 0
for s in [smc_sig, mfh_sig, pem_sig, ifvg_sig, rvgi_sig]:
if s == 'buy': votes += 1
elif s == 'sell': votes -= 1
final_sig = "neutral"
if votes >= 2: final_sig = "buy"
elif votes <= -2: final_sig = "sell"
# Evaluate
if final_sig == "buy":
trades_count += 1
if next_price > curr_price: wins += 1
balance += (next_price - curr_price)
elif final_sig == "sell":
trades_count += 1
if next_price < curr_price: wins += 1
balance += (curr_price - next_price)
if trades_count == 0: return -100
# Simple Profit Metric
score = (balance - 10000.0)
return score
except Exception:
return -9999
def optimize_strategy_parameters(self, df):
    """
    Comprehensive Optimization: Tunes ALL strategy parameters using Auto-AO.

    A randomly chosen optimizer from ``self.optimizers`` searches the
    10-dimensional parameter space, scoring candidates with
    ``evaluate_comprehensive_params`` on the most recent 300 candles.
    When a usable result is found it is applied to the live analyzers,
    ``self.pem_params`` and ``self.short_term_params``, and a summary is
    sent to Telegram. Otherwise the current parameters are kept.

    Args:
        df: Candle DataFrame; skipped when None or shorter than 200 rows.

    Returns:
        None.
    """
    logger.info("Starting Comprehensive Strategy Optimization (Auto-AO)...")
    # Reset progress counter
    self._opt_counter = 0
    if df is None or len(df) < 200:
        logger.warning("Insufficient data for optimization, skipping")
        return
    # Reduce data size for speed (300 candles)
    if len(df) > 300:
        df = df.tail(300).copy()

    # Search space (10 dimensions), in the order expected by
    # evaluate_comprehensive_params.
    bounds = [
        (100, 300),      # smc_ma
        (0.001, 0.005),  # smc_atr
        (0.001, 0.1),    # mfh_lr
        (3, 10),         # mfh_horizon
        (10, 50),        # pem_fast
        (100, 300),      # pem_slow
        (15.0, 30.0),    # pem_adx
        (10, 50),        # rvgi_sma
        (10, 30),        # rvgi_cci
        (10, 100)        # ifvg_gap
    ]
    steps = [10, 0.0005, 0.005, 1, 5, 10, 1.0, 2, 2, 5]

    def objective(params):
        return self.evaluate_comprehensive_params(params, df)

    algo_name = random.choice(list(self.optimizers.keys()))
    optimizer = self.optimizers[algo_name]
    # Adjust population size for realtime performance
    if hasattr(optimizer, 'pop_size'):
        optimizer.pop_size = 20
    # Not every optimizer necessarily exposes pop_size; reading it
    # unguarded here used to defeat the hasattr() check above.
    pop_label = getattr(optimizer, 'pop_size', 'default')
    logger.info(f"Selected Optimizer: {algo_name} (Pop: {pop_label})")

    try:
        best_params, best_score = optimizer.optimize(
            objective,
            bounds,
            steps=steps,
            epochs=4
        )
    except Exception:
        # An optimizer failure must not take down the trading cycle.
        logger.exception(f"Optimizer {algo_name} failed, keeping original params")
        return

    # -9999 marks a failed evaluation; anything above -1000 is accepted.
    # NOTE(review): the "no trades" sentinel (-100) and genuinely negative
    # scores also pass this threshold and get applied - confirm intended.
    if best_params is None or best_score is None or not best_score > -1000:
        logger.warning("Optimization produced no usable result (score <= -1000), keeping original params")
        return

    logger.info(f"Optimization Complete! Best Score: {best_score:.2f}")
    p_smc_ma = int(best_params[0])
    p_smc_atr = best_params[1]
    p_mfh_lr = best_params[2]
    p_mfh_horizon = int(best_params[3])
    p_pem_fast = int(best_params[4])
    p_pem_slow = int(best_params[5])
    p_pem_adx = best_params[6]
    p_rvgi_sma = int(best_params[7])
    p_rvgi_cci = int(best_params[8])
    p_ifvg_gap = int(best_params[9])

    # Apply to the live analyzers.
    self.smc_analyzer.ma_period = p_smc_ma
    self.smc_analyzer.atr_threshold = p_smc_atr
    self.mfh_analyzer.learning_rate = p_mfh_lr
    self.mfh_analyzer.horizon = p_mfh_horizon
    # PEM takes its parameters per analyze() call, so they are stored here
    # and read back when the live analysis runs.
    self.pem_params = {
        'ma_fast': p_pem_fast,
        'ma_slow': p_pem_slow,
        'adx_threshold': p_pem_adx
    }
    self.short_term_params = {
        'rvgi_sma': p_rvgi_sma,
        'rvgi_cci': p_rvgi_cci,
        'ifvg_gap': p_ifvg_gap
    }

    msg = (
        f"🧬 *Comprehensive Optimization ({algo_name})*\n"
        f"Score: {best_score:.2f}\n"
        f"• SMC: MA={p_smc_ma}, ATR={p_smc_atr:.4f}\n"
        f"• MFH: LR={p_mfh_lr:.3f}, H={p_mfh_horizon}\n"
        f"• PEM: Fast={p_pem_fast}, Slow={p_pem_slow}, ADX={p_pem_adx:.1f}\n"
        f"• ST: RVGI({p_rvgi_sma},{p_rvgi_cci}), IFVG({p_ifvg_gap})"
    )
    self.send_telegram_message(msg)
    logger.info(f"Updated all strategy params: {msg}")
def calculate_optimized_sl_tp(self, trade_type, price, atr, market_context=None):
"""
Calculate optimized SL/TP based on ATR, MFE/MAE stats, and market structure
"""
if atr <= 0:
atr = price * 0.01 # Fallback 1%
# 1. Base Volatility
mfe_tp_dist = atr * 2.0
mae_sl_dist = atr * 1.5
# 2. Historical Stats (MFE/MAE)
try:
stats = self.db_manager.get_trade_performance_stats(limit=100)
trades = []
if isinstance(stats, list):
trades = stats
elif isinstance(stats, dict) and 'recent_trades' in stats:
trades = stats['recent_trades']
if trades and len(trades) > 10:
mfes = [t.get('mfe', 0) for t in trades if t.get('mfe', 0) > 0]
maes = [abs(t.get('mae', 0)) for t in trades if abs(t.get('mae', 0)) > 0]
if mfes and maes:
# Use 60th percentile for TP (Conservative)
# Use 90th percentile for SL (Wide enough to survive noise)
opt_tp_pct = np.percentile(mfes, 60) / 100.0
opt_sl_pct = np.percentile(maes, 90) / 100.0
mfe_tp_dist = price * opt_tp_pct
mae_sl_dist = price * opt_sl_pct
except Exception as e:
logger.warning(f"MFE/MAE calc failed: {e}")
# 3. Market Structure (Supply/Demand/FVG)
struct_tp_price = 0.0
if market_context:
is_buy = 'buy' in trade_type
if is_buy:
# Find TP: Lowest Bearish FVG above price
resistance_candidates = []
if 'active_zones' in market_context:
for zone in market_context['active_zones']:
if zone.get('type') == 'bearish' and zone.get('bottom', 0) > price:
resistance_candidates.append(zone['bottom'])
if resistance_candidates:
struct_tp_price = min(resistance_candidates)
else:
# Find TP: Highest Bullish FVG below price
support_candidates = []
if 'active_zones' in market_context:
for zone in market_context['active_zones']:
if zone.get('type') == 'bullish' and zone.get('top', 0) < price:
support_candidates.append(zone['top'])
if support_candidates:
struct_tp_price = max(support_candidates)
# 4. Final Calculation
final_sl = 0.0
final_tp = 0.0
if 'buy' in trade_type:
base_tp = price + mfe_tp_dist
base_sl = price - mae_sl_dist
if struct_tp_price > price:
if struct_tp_price < base_tp:
final_tp = struct_tp_price - (atr * 0.1)
else:
final_tp = base_tp
else:
final_tp = base_tp
final_sl = base_sl
else:
base_tp = price - mfe_tp_dist
base_sl = price + mae_sl_dist
if struct_tp_price > 0 and struct_tp_price < price:
if struct_tp_price > base_tp:
final_tp = struct_tp_price + (atr * 0.1)
else:
final_tp = base_tp
else:
final_tp = base_tp
final_sl = base_sl
return final_sl, final_tp
def analyze_market(self):
    """
    Run one full market analysis pass.

    Fetches candles, periodically re-optimizes strategy parameters, trains
    the local models on the latest candle, runs every local strategy,
    requests the DeepSeek structure analysis and combines all signals with
    the hybrid optimizer. The result is also logged to the database.

    Returns:
        tuple: ``(df, structure_analysis)`` where ``structure_analysis`` is
        the DeepSeek result dict extended with 'technical_signals',
        'hybrid_signal' and 'hybrid_score'; ``(None, None)`` when candle
        data or the DeepSeek analysis is unavailable.
    """
    logger.info(f"Fetching data for {self.symbol}...")
    # Larger limit so the optimizer has enough history.
    df = self.data_processor.get_historical_data(self.symbol, self.timeframe, limit=600)
    if df is None or df.empty:
        logger.error("Failed to fetch historical data")
        return None, None
    df = self.data_processor.generate_features(df)

    # Periodic re-optimization. A failure there must not abort the cycle,
    # and the timestamp is advanced either way so a broken optimizer is
    # not retried on every single cycle.
    current_time = time.time()
    if current_time - self.last_optimization_time > self.optimization_interval:
        try:
            self.optimize_strategy_parameters(df)
        except Exception:
            logger.exception("Strategy optimization failed, continuing with current params")
        self.last_optimization_time = current_time

    # --- Self-Learning: Train Local Models ---
    if len(df) > 2:
        # NOTE(review): this is an absolute price change, while the
        # optimizer's objective trains MFH on a fractional return -
        # confirm which scale MFHAnalyzer.train() expects.
        actual_return = df['close'].iloc[-1] - df['close'].iloc[-2]
        self.mfh_analyzer.train(actual_return)
        self.matrix_ml.train(actual_return)

    # Prepare data for AI analysis
    latest_data = df.iloc[-1].to_dict()
    recent_candles = df.iloc[-5:].reset_index().to_dict('records')
    market_data = {
        "symbol": self.symbol,
        "timeframe": self.timeframe,
        "current_price": latest_data.get('close'),
        "indicators": {
            "ema_fast": latest_data.get('ema_fast'),
            "ema_slow": latest_data.get('ema_slow'),
            "rsi": latest_data.get('rsi'),
            "atr": latest_data.get('atr'),
            "volatility": latest_data.get('volatility')
        },
        "recent_candles": recent_candles
    }

    # Fetch current positions (best effort).
    current_positions = []
    try:
        positions = self.data_processor.exchange.fetch_positions([self.symbol])
        current_positions = [p for p in positions if float(p.get('contracts') or 0) > 0]
    except Exception as e:
        logger.warning(f"Failed to fetch current positions: {e}")

    technical_signals = market_data.get('indicators', {})

    # --- Advanced Algorithm Integration ---
    # 1. CRT
    crt_analysis = self.advanced_analysis.analyze_crt_strategy(df)
    # 2. IFVG (optimized gap)
    ifvg_analysis = self.advanced_analysis.analyze_ifvg(
        df,
        min_gap_points=self.short_term_params.get('ifvg_gap', 50)
    )
    # 3. RVGI + CCI (optimized periods)
    rvgi_analysis = self.advanced_analysis.analyze_rvgi_cci_strategy(
        df,
        sma_period=self.short_term_params.get('rvgi_sma', 30),
        cci_period=self.short_term_params.get('rvgi_cci', 14)
    )
    # 4. Market Regime
    regime_analysis = self.advanced_analysis.detect_market_regime(df)
    # 5. SMC
    smc_analysis = self.smc_analyzer.analyze(df)
    # 6. MFH
    mfh_analysis = self.mfh_analyzer.predict(df)
    # 7. MTF: compare against a higher timeframe (4h for intraday, else 1d).
    tf_lower = self.timeframe.lower()
    htf_timeframe = '4h' if tf_lower in ['1h', '15m', '30m'] else '1d'
    df_htf = self.data_processor.get_historical_data(self.symbol, htf_timeframe, limit=100)
    mtf_analysis = {"signal": "neutral", "reason": "No HTF Data"}
    if df_htf is not None and not df_htf.empty:
        mtf_analysis = self.mtf_analyzer.analyze(df, df_htf)
    # 8. PEM (optimized params once an optimization has run)
    # NOTE(review): the fallback defaults have fast (108) > slow (60),
    # whereas the optimizer searches fast in 10-50 and slow in 100-300 -
    # confirm the defaults are not swapped.
    pem_p = getattr(self, 'pem_params', {})
    pem_analysis = self.pem_analyzer.analyze(
        df,
        ma_fast_period=pem_p.get('ma_fast', 108),
        ma_slow_period=pem_p.get('ma_slow', 60),
        adx_threshold=pem_p.get('adx_threshold', 20)
    )
    # 9. MatrixML
    returns_data = df['close'].diff().dropna().values
    matrix_ml_analysis = self.matrix_ml.predict(returns_data)

    extra_analysis_data = {
        "technical_indicators": technical_signals,
        "crt_strategy": crt_analysis,
        "ifvg_strategy": ifvg_analysis,
        "rvgi_cci_strategy": rvgi_analysis,
        "market_regime": regime_analysis,
        "smc_strategy": smc_analysis,
        "mfh_strategy": mfh_analysis,
        "mtf_strategy": mtf_analysis,
        "pem_strategy": pem_analysis,
        "matrix_ml_strategy": matrix_ml_analysis
    }
    logger.info(f"Signals: SMC={smc_analysis['signal']}, MFH={mfh_analysis['signal']}, PEM={pem_analysis['signal']}, IFVG={ifvg_analysis['signal']}")

    # --- DeepSeek ---
    # The constructor only warns when a client is missing, so guard here
    # instead of failing with an AttributeError on None.
    if not self.deepseek_client:
        logger.error("DeepSeek client not initialized, skipping market structure analysis")
        return None, None
    logger.info("Requesting DeepSeek market structure analysis...")
    performance_stats = []
    try:
        performance_stats = self.db_manager.get_trade_performance_stats(limit=50)
    except Exception as e:
        logger.warning(f"Failed to load trade performance stats: {e}")
    structure_analysis = self.deepseek_client.analyze_market_structure(
        market_data,
        current_positions=current_positions,
        extra_analysis=extra_analysis_data,
        performance_stats=performance_stats
    )
    if not isinstance(structure_analysis, dict):
        logger.error("DeepSeek returned no usable analysis")
        return None, None

    ds_signal = structure_analysis.get('preliminary_signal', 'neutral')

    # Combine Signals using HybridOptimizer logic
    all_signals = {
        "deepseek": ds_signal,
        "crt": crt_analysis['signal'],
        "price_equation": pem_analysis['signal'],
        "matrix_ml": matrix_ml_analysis['signal'],
        "smc": smc_analysis['signal'],
        "mfh": mfh_analysis['signal'],
        "mtf": mtf_analysis['signal'],
        "ifvg": ifvg_analysis['signal'],
        "rvgi_cci": rvgi_analysis['signal']
    }
    final_signal, final_score, _ = self.hybrid_optimizer.combine_signals(all_signals)
    logger.info(f"Hybrid Signal: {final_signal} (Score: {final_score:.2f})")

    # Attach local results so the decision step can see them.
    structure_analysis['technical_signals'] = extra_analysis_data
    structure_analysis['hybrid_signal'] = final_signal
    structure_analysis['hybrid_score'] = final_score
    try:
        self.db_manager.log_analysis(
            symbol=self.symbol,
            market_state=structure_analysis.get('market_state'),
            structure_score=structure_analysis.get('structure_score'),
            ai_decision=None,
            raw_analysis=structure_analysis
        )
    except Exception as e:
        logger.error(f"Failed to log analysis: {e}")
    return df, structure_analysis
def make_decision(self, df, structure_analysis):
"""Make trading decision using Qwen"""
if not self.qwen_client: return None
latest_data = df.iloc[-1].to_dict()
balance = self.data_processor.get_account_balance('USDT')
available_usdt = balance['free'] if balance else 0.0
total_equity = balance['total'] if balance else available_usdt
# Fetch actual positions
valid_positions = []
try:
positions = self.data_processor.exchange.fetch_positions([self.symbol])
for pos in positions:
if float(pos['contracts']) > 0:
valid_positions.append({
"symbol": pos['symbol'],
"side": pos['side'],
"contracts": float(pos['contracts']),
"unrealized_pnl": pos['unrealizedPnl'],
"leverage": pos['leverage']
})
except Exception:
pass
# Open Orders
open_orders = []
try:
raw_orders = self.data_processor.get_open_orders(self.symbol)
for o in raw_orders:
open_orders.append({"id": o['id'], "type": o['type'], "price": o['price']})
except Exception:
pass
market_data = {
"symbol": self.symbol,
"price": latest_data.get('close'),
"indicators": {
"rsi": latest_data.get('rsi'),
"atr": latest_data.get('atr')
},
"account_info": {"available_usdt": available_usdt, "total_equity": total_equity},
"open_orders": open_orders
}
# Feedback
performance_stats = []
try:
performance_stats = self.db_manager.get_trade_performance_stats(limit=50)
except Exception:
pass
logger.info("Requesting Qwen strategy optimization...")
decision = self.qwen_client.optimize_strategy_logic(
structure_analysis,
market_data,
current_positions=valid_positions,
performance_stats=performance_stats
)
# Check if Qwen provided SL/TP, if not, use Optimized Calculation
exit_cond = decision.get('exit_conditions', {})
if not exit_cond.get('sl_price') or not exit_cond.get('tp_price'):
logger.info("Qwen did not provide SL/TP, using Optimized Calculation")
trade_dir = "buy"
action = decision.get('action', 'hold')
if action in ['sell', 'limit_sell']: trade_dir = "sell"
atr = latest_data.get('atr', 0)
price = latest_data.get('close')
market_context = structure_analysis.get('technical_signals', {}).get('ifvg_strategy', {})
calc_sl, calc_tp = self.calculate_optimized_sl_tp(trade_dir, price, atr, market_context=market_context)
2025-12-28 01:17:57 +08:00
if not exit_cond.get('sl_price'): exit_cond['sl_price'] = calc_sl
if not exit_cond.get('tp_price'): exit_cond['tp_price'] = calc_tp
decision['exit_conditions'] = exit_cond
# Send Telegram
try:
sl_price = exit_cond.get('sl_price')
tp_price = exit_cond.get('tp_price')
action = decision.get('action')
display_action = action
if action == 'hold' and not valid_positions:
display_action = "WAITING FOR MARKET DIRECTION ⏳"
ds_signal = structure_analysis.get('preliminary_signal', 'N/A')
ds_score = structure_analysis.get('structure_score', 0)
hybrid_signal = structure_analysis.get('hybrid_signal', 'N/A')
hybrid_score = structure_analysis.get('hybrid_score', 0)
2025-12-28 01:17:57 +08:00
msg = (
f"🤖 *AI Crypto Strategy Insight*\n"
f"Symbol: `{self.symbol}` | TF: `{self.timeframe}`\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"🧠 *AI Consensus*\n"
f"• Decision: *{display_action.upper()}*\n"
f"• Qwen Action: `{self.escape_markdown(action)}`\n"
f"• Hybrid Signal: `{hybrid_signal}` ({hybrid_score:.2f})\n"
f"• DeepSeek: `{ds_signal}` ({ds_score}/100)\n\n"
f"📝 *Rationale*: _{self.escape_markdown(decision.get('strategy_rationale'))}_\n\n"
f"🎯 *Setup*\n"
2025-12-28 01:17:57 +08:00
f"• SL: `{sl_price}`\n"
f"• TP: `{tp_price}`\n"
f"• Lev: {decision.get('leverage')}x | Size: {float(decision.get('position_size', 0))*100:.1f}%\n\n"
f"📊 *Market State*: `{structure_analysis.get('market_state')}`"
2025-12-28 01:17:57 +08:00
)
self.send_telegram_message(msg)
except Exception as e:
logger.error(f"Failed to construct telegram: {e}")
2025-12-28 01:17:57 +08:00
return decision
def execute_trade(self, decision):
"""Execute trade based on decision"""
action = decision.get('action')
rationale = decision.get('strategy_rationale', 'No rationale provided')
# Determine current positions status for logic handling
target_pos = None
try:
positions = self.data_processor.exchange.fetch_positions([self.symbol])
if positions:
for p in positions:
if float(p['contracts']) > 0:
target_pos = p
break
except Exception as e:
logger.error(f"Failed to fetch positions during execution: {e}")
# --- CASE 1: Close Logic (close_buy / close_sell) ---
if action in ['close_buy', 'close_sell']:
if target_pos:
logger.info(f"Executing CLOSE position for {self.symbol} based on AI decision")
try:
pos_side = target_pos['side'] # 'long' or 'short'
close_side = 'sell' if pos_side == 'long' else 'buy'
close_amount = float(target_pos['contracts'])
self.data_processor.cancel_all_orders(self.symbol)
order = self.data_processor.create_order(self.symbol, close_side, close_amount, type='market')
if order:
self.send_telegram_message(f"🚫 *Position Closed*\nSymbol: `{self.symbol}`\nReason: AI Signal `{action}`")
except Exception as e:
logger.error(f"Failed to close position: {e}")
return
# --- CASE 2: Hold / Update SL/TP Logic ---
if action == 'hold':
if target_pos:
exit_conditions = decision.get('exit_conditions', {})
new_sl = exit_conditions.get('sl_price')
new_tp = exit_conditions.get('tp_price')
if new_sl or new_tp:
logger.info(f"Updating SL/TP for existing position: SL={new_sl}, TP={new_tp}")
pos_side = target_pos['side']
sl_tp_side = 'sell' if pos_side == 'long' else 'buy'
pos_amount = float(target_pos['contracts'])
self.data_processor.cancel_all_orders(self.symbol)
self.data_processor.place_sl_tp_order(self.symbol, sl_tp_side, pos_amount, sl_price=new_sl, tp_price=new_tp)
self.send_telegram_message(f"🔄 *Updated SL/TP*\nSymbol: `{self.symbol}`\nNew SL: `{new_sl}`\nNew TP: `{new_tp}`")
return
# --- CASE 3: Open New Position (buy / sell) ---
if target_pos:
pos_side = target_pos['side']
is_reversal = (action == 'buy' and pos_side == 'short') or (action == 'sell' and pos_side == 'long')
if is_reversal:
logger.info(f"Reversal signal detected. Closing existing {pos_side} position.")
try:
close_side = 'sell' if pos_side == 'long' else 'buy'
close_amount = float(target_pos['contracts'])
self.data_processor.cancel_all_orders(self.symbol)
self.data_processor.create_order(self.symbol, close_side, close_amount, type='market')
self.send_telegram_message(f"🔄 *Position Reversal Initiated*")
time.sleep(1)
except Exception as e:
logger.error(f"Failed to close position for reversal: {e}")
return
else:
# Same direction, update SL/TP
logger.info(f"Signal {action} matches existing {pos_side} position. Updating SL/TP.")
exit_conditions = decision.get('exit_conditions', {})
new_sl = exit_conditions.get('sl_price')
new_tp = exit_conditions.get('tp_price')
if new_sl or new_tp:
sl_tp_side = 'sell' if pos_side == 'long' else 'buy'
pos_amount = float(target_pos['contracts'])
self.data_processor.cancel_all_orders(self.symbol)
self.data_processor.place_sl_tp_order(self.symbol, sl_tp_side, pos_amount, sl_price=new_sl, tp_price=new_tp)
return
# Execute New Trade
leverage = int(decision.get('leverage', 1))
volume_percent = float(decision.get('position_size', 0.0))
volume_percent = max(0.0, min(1.0, volume_percent))
if volume_percent <= 0: return
balance = self.data_processor.get_account_balance('USDT')
available_usdt = balance['free'] if balance else 0.0
target_usdt = available_usdt * volume_percent
if volume_percent > 0.95: target_usdt *= 0.99
current_price = self.data_processor.get_current_price(self.symbol)
if not current_price: return
target_position_value = target_usdt * leverage
amount_eth = target_position_value / current_price
contract_size = self.data_processor.get_contract_size(self.symbol) or 0.1
num_contracts = int(amount_eth / contract_size)
if num_contracts < 1:
logger.warning(f"Calculated contracts < 1. Required margin too high.")
return
# Prepare SL/TP
exit_conditions = decision.get('exit_conditions', {})
sl_price = exit_conditions.get('sl_price')
tp_price = exit_conditions.get('tp_price')
order_params = {}
if sl_price or tp_price:
algo_order = {'tpTriggerPxType': 'last', 'slTriggerPxType': 'last'}
if tp_price: algo_order['tpTriggerPx'] = str(tp_price); algo_order['tpOrdPx'] = '-1'
if sl_price: algo_order['slTriggerPx'] = str(sl_price); algo_order['slOrdPx'] = '-1'
order_params['attachAlgoOrds'] = [algo_order]
try:
self.data_processor.set_leverage(self.symbol, leverage)
order = None
if action == 'buy':
order = self.data_processor.create_order(self.symbol, 'buy', num_contracts, type='market', params=order_params)
elif action == 'sell':
order = self.data_processor.create_order(self.symbol, 'sell', num_contracts, type='market', params=order_params)
elif action in ['buy_limit', 'limit_buy']:
lp = decision.get('entry_conditions', {}).get('limit_price')
# Auto-fallback if limit price is invalid
if not lp or float(lp) <= 0:
logger.warning("LLM suggested Limit Buy but no price provided. Using ATR fallback.")
atr = latest_data.get('atr', 0)
if atr > 0:
lp = current_price - (atr * 0.5)
logger.info(f"Auto-set Limit Buy Price: {lp:.2f}")
if lp and float(lp) > 0:
# Determine Limit vs Stop
if float(lp) > current_price:
# Price > Current = Stop Buy (Breakout)
# OKX uses 'trigger' orders for stop, but 'limit' for standard limit.
# Standard Limit Buy must be < Current Price.
# If we want a Stop Buy, we need a trigger order.
# For simplicity in this bot, we might convert Stop Buy to Market if close, or use trigger.
# Let's assume standard limit for now, and if price > current, we might need to use 'market' or skip.
# Actually, placing a limit buy above market price executes immediately as taker (market).
# So we can just place it.
pass
order = self.data_processor.create_order(self.symbol, 'buy', num_contracts, type='limit', price=lp, params=order_params)
elif action in ['sell_limit', 'limit_sell']:
lp = decision.get('entry_conditions', {}).get('limit_price')
# Auto-fallback
if not lp or float(lp) <= 0:
logger.warning("LLM suggested Limit Sell but no price provided. Using ATR fallback.")
atr = latest_data.get('atr', 0)
if atr > 0:
lp = current_price + (atr * 0.5)
logger.info(f"Auto-set Limit Sell Price: {lp:.2f}")
if lp and float(lp) > 0:
order = self.data_processor.create_order(self.symbol, 'sell', num_contracts, type='limit', price=lp, params=order_params)
if order:
trade_record = {
'symbol': self.symbol, 'action': action, 'order_type': 'limit' if 'limit' in action else 'market',
'contracts': num_contracts, 'price': current_price, 'leverage': leverage,
'order_id': order.get('id'), 'strategy_rationale': rationale
}
self.db_manager.log_trade(trade_record)
exec_msg = (
f"✅ *Trade Executed*\nAction: `{action.upper()}`\nSymbol: `{self.symbol}`\n"
f"Contracts: `{num_contracts}`\nPrice: `{trade_record['price']}`\nLeverage: `{leverage}x`"
)
self.send_telegram_message(exec_msg)
2025-12-28 01:17:57 +08:00
except Exception as e:
logger.error(f"Failed to execute trade: {e}")
def run_once(self):
    """
    Perform one analyze -> decide -> execute pass.

    Any exception raised along the way is logged with its traceback and
    not propagated.
    """
    try:
        logger.info("Starting trading cycle...")
        df, analysis = self.analyze_market()
        have_inputs = not (df is None or analysis is None)
        # A decision is only requested when both analysis outputs exist.
        decision = self.make_decision(df, analysis) if have_inputs else None
        if decision:
            self.execute_trade(decision)
        logger.info("Trading cycle completed")
    except Exception as e:
        logger.error(f"Error in trading cycle: {e}", exc_info=True)
def start(self):
    """
    Run trading cycles back to back, sleeping ``self.interval`` seconds
    after each one, for as long as ``self.is_running`` stays true.
    """
    self.is_running = True
    logger.info(f"Starting bot for {self.symbol} with {self.interval}s interval")
    while True:
        if not self.is_running:
            break
        self.run_once()
        logger.info(f"Waiting {self.interval} seconds before next analysis...")
        time.sleep(self.interval)
if __name__ == "__main__":
    # Script entry point: trade ETH/USDT:USDT on 15-minute candles and
    # run one cycle every 900 seconds.
    bot = CryptoTradingBot(
        symbol='ETH/USDT:USDT',
        timeframe='15m',
        interval=900,
    )
    bot.start()