markaz_arshy/auto_tune.py
2025-08-12 14:36:24 +00:00

122 lines
3.8 KiB
Python

import sqlite3
def auto_tune_confidence_threshold(
    db_path="learning_log.db",
    rolling_window=20,
    base_threshold=5.0,
    low_winrate=0.45,
    high_winrate=0.7,
    min_threshold=1.0,
    max_threshold=10.0,
    risk_rule=False,
    filter_symbol=None,
    filter_tf=None
):
    """Derive a confidence threshold from the most recent logged trade results.

    Reads the ``pnl`` column of the newest ``rolling_window`` rows of the
    ``learning_log`` table (ordered by ``timestamp``), optionally restricted
    to one ``symbol`` and/or ``tf``, and adjusts ``base_threshold``:

    * win rate below ``low_winrate``: +1.0; above ``high_winrate``: -0.5
    * total PnL of the window negative: +1.0
    * average PnL below 0.2: +0.5
    * maximum peak-to-trough drawdown of the window deeper than -2: +1.0

    When ``risk_rule`` is true the adjustments above are replaced by
    ``base_threshold + (0.55 - winrate) * 5``.

    Args:
        db_path: Path of the SQLite database holding ``learning_log``.
        rolling_window: Number of most recent trades to evaluate.
        base_threshold: Starting value, also the fallback result.
        low_winrate: Win rate below which the threshold is raised.
        high_winrate: Win rate above which the threshold is lowered.
        min_threshold: Lower clamp for the tuned value.
        max_threshold: Upper clamp for the tuned value.
        risk_rule: Use the linear win-rate formula instead of the step rules.
        filter_symbol: If truthy, only rows with this ``symbol`` are used.
        filter_tf: If truthy, only rows with this ``tf`` are used.

    Returns:
        The tuned threshold, clamped to ``[min_threshold, max_threshold]`` and
        rounded to 3 decimals. ``base_threshold`` is returned unchanged (not
        clamped or rounded) when fewer than ``rolling_window`` usable rows
        exist or ``rolling_window`` is not positive.
    """
    # SQLite treats a negative LIMIT as "no limit", which would later divide
    # by a negative window size; there is nothing sensible to tune on.
    if rolling_window <= 0:
        return base_threshold

    # Rows without a PnL cannot be classified as win or loss, so they are
    # excluded instead of failing on a None comparison later.
    conditions = ["pnl IS NOT NULL"]
    params = []
    if filter_symbol:
        conditions.append("symbol=?")
        params.append(filter_symbol)
    if filter_tf:
        conditions.append("tf=?")
        params.append(filter_tf)
    query = (
        "SELECT pnl FROM learning_log WHERE "
        + " AND ".join(conditions)
        + " ORDER BY timestamp DESC LIMIT ?"
    )
    params.append(rolling_window)

    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if len(rows) < rolling_window:
        return base_threshold

    # The query returns newest-first; put the trades back in chronological
    # order so the equity curve below is walked the way it actually evolved.
    pnls = [row[0] for row in reversed(rows)]
    trade_count = len(pnls)
    total_pnl = sum(pnls)
    winrate = sum(1 for p in pnls if p > 0) / trade_count
    avg_pnl = total_pnl / trade_count

    # Maximum drawdown: the largest drop of cumulative PnL below its running
    # peak (the peak starts at the window's starting equity of 0).
    equity = 0.0
    peak = 0.0
    max_dd = 0.0
    for pnl in pnls:
        equity += pnl
        peak = max(peak, equity)
        max_dd = min(max_dd, equity - peak)

    if risk_rule:
        # The linear rule replaces every step adjustment.
        new_threshold = base_threshold + (0.55 - winrate) * 5
    else:
        new_threshold = base_threshold
        if winrate < low_winrate:
            new_threshold = base_threshold + 1.0
        elif winrate > high_winrate:
            new_threshold = base_threshold - 0.5
        if total_pnl < 0:
            new_threshold += 1.0
        if avg_pnl < 0.2:
            new_threshold += 0.5
        if max_dd < -2:
            new_threshold += 1.0

    new_threshold = max(min_threshold, min(new_threshold, max_threshold))
    return round(new_threshold, 3)
def auto_tune_min_distance_pips(
    db_path="learning_log.db",
    rolling_window=20,
    base_distance=4000,
    low_winrate=0.45,
    high_winrate=0.7,
    min_distance=1000,
    max_distance=10000,
    reward_risk_rule=True,
    filter_symbol=None,
    filter_tf=None
):
    """Derive a minimum distance value from the most recent logged trade results.

    Reads the ``pnl`` column of the newest ``rolling_window`` rows of the
    ``learning_log`` table (ordered by ``timestamp``), optionally restricted
    to one ``symbol`` and/or ``tf``, and adjusts ``base_distance``:

    * win rate below ``low_winrate``: +1000; above ``high_winrate``: -800
    * average PnL below 0.2: +500
    * with ``reward_risk_rule``: reward/risk ratio below 1.0: +1000,
      above 2.0: -600

    The reward/risk ratio is the mean winning PnL divided by the mean absolute
    non-winning PnL; it defaults to 1.0 when either side is empty or the
    non-winning side sums to zero.

    Args:
        db_path: Path of the SQLite database holding ``learning_log``.
        rolling_window: Number of most recent trades to evaluate.
        base_distance: Starting value, also the fallback result.
        low_winrate: Win rate below which the distance is widened.
        high_winrate: Win rate above which the distance is narrowed.
        min_distance: Lower clamp for the tuned value.
        max_distance: Upper clamp for the tuned value.
        reward_risk_rule: Whether the reward/risk adjustment is applied.
        filter_symbol: If truthy, only rows with this ``symbol`` are used.
        filter_tf: If truthy, only rows with this ``tf`` are used.

    Returns:
        The tuned distance as an ``int`` clamped to
        ``[min_distance, max_distance]``. ``base_distance`` is returned
        unchanged (not clamped or converted) when fewer than
        ``rolling_window`` usable rows exist or ``rolling_window`` is not
        positive.
    """
    # SQLite treats a negative LIMIT as "no limit", which would later divide
    # by a negative window size; there is nothing sensible to tune on.
    if rolling_window <= 0:
        return base_distance

    # Rows without a PnL cannot be classified as win or loss, so they are
    # excluded instead of failing on a None comparison later.
    conditions = ["pnl IS NOT NULL"]
    params = []
    if filter_symbol:
        conditions.append("symbol=?")
        params.append(filter_symbol)
    if filter_tf:
        conditions.append("tf=?")
        params.append(filter_tf)
    query = (
        "SELECT pnl FROM learning_log WHERE "
        + " AND ".join(conditions)
        + " ORDER BY timestamp DESC LIMIT ?"
    )
    params.append(rolling_window)

    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if len(rows) < rolling_window:
        return base_distance

    pnls = [row[0] for row in rows]
    trade_count = len(pnls)
    winrate = sum(1 for p in pnls if p > 0) / trade_count
    avg_pnl = sum(pnls) / trade_count

    rewards = [p for p in pnls if p > 0]
    # NOTE(review): breakeven trades (pnl == 0) land on the risk side, which
    # lowers the mean loss and therefore raises the ratio - confirm intended.
    risks = [abs(p) for p in pnls if p <= 0]
    if rewards and risks and sum(risks) != 0:
        mean_reward = sum(rewards) / len(rewards)
        mean_risk = sum(risks) / len(risks)
        reward_risk = mean_reward / mean_risk
    else:
        # Ratio is undefined without both sides; 1.0 triggers no adjustment.
        reward_risk = 1.0

    new_distance = base_distance
    if winrate < low_winrate:
        new_distance += 1000
    elif winrate > high_winrate:
        new_distance -= 800
    if avg_pnl < 0.2:
        new_distance += 500
    if reward_risk_rule:
        if reward_risk < 1.0:
            new_distance += 1000
        elif reward_risk > 2.0:
            new_distance -= 600

    return int(max(min_distance, min(new_distance, max_distance)))
def is_on_grid(entry_price: float, existing_prices: list, grid_pips: float, pip_size: float = 0.01) -> bool:
    """Report whether ``entry_price`` falls too close to an existing price.

    The grid spacing is ``grid_pips * pip_size``. A price counts as "on the
    grid" when its absolute distance to any entry of ``existing_prices`` is
    strictly less than half of that spacing.

    Args:
        entry_price: Candidate price to check.
        existing_prices: Prices to compare against.
        grid_pips: Grid spacing expressed in pips.
        pip_size: Price value of one pip.

    Returns:
        True if at least one existing price lies within the half-grid buffer,
        otherwise False (including when ``existing_prices`` is empty).
    """
    # Half a grid step is the buffer around each existing price.
    half_grid = (grid_pips * pip_size) / 2
    return any(abs(entry_price - level) < half_grid for level in existing_prices)