mirror of
https://github.com/A6-9V/MQL5-Google-Onedrive.git
synced 2026-04-11 02:30:56 +00:00
Optimized the branch information retrieval logic to reduce subprocess overhead.
- Used `git for-each-ref` with `%(ahead-behind)` so merge status is checked with O(1) subprocess calls instead of one per branch.
- Implemented a robust fallback for Git versions < 2.41.0.
- Added dynamic default branch detection (main/master).
- Consolidated local, remote, and merged branch retrieval.
- Fixed current branch marker detection.

This change reduces the number of Git subprocess calls in the main reporting path while maintaining full compatibility with older environments.
283 lines
8.9 KiB
Python
283 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Working Tree Review Script
|
|
Reviews all git branches, stashes, and working trees for the repository
|
|
"""
|
|
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
|
|
# Absolute, symlink-resolved location of this script file.
_SCRIPT_PATH = Path(__file__).resolve()
# Repository root: the parent of the directory that contains this script.
REPO_ROOT = _SCRIPT_PATH.parents[1]
|
|
|
|
|
|
def run_git_command(cmd, capture_output=True, timeout=30):
    """Run a git command inside the repository and return the result.

    Args:
        cmd: Arguments passed to ``git`` (without the leading ``"git"``),
            e.g. ``["status", "--short"]``.
        capture_output: When true, stdout/stderr are captured and exposed as
            text on the returned object; otherwise they are inherited from
            the parent process.
        timeout: Maximum number of seconds to wait for git to finish.

    Returns:
        The ``subprocess.CompletedProcess`` of the invocation (callers check
        ``returncode`` themselves), or ``None`` when the command timed out or
        could not be started. Both failure cases are reported on stderr.
    """
    try:
        return subprocess.run(
            ["git", *cmd],
            cwd=REPO_ROOT,
            capture_output=capture_output,
            text=True,
            timeout=timeout,
            encoding="utf-8",
            errors="replace",
        )
    except subprocess.TimeoutExpired:
        # Callers treat None as "nothing to report", so a silent timeout would
        # make the report look clean; say what happened instead.
        printable = " ".join(str(arg) for arg in cmd)
        print(f"Git command timed out after {timeout}s: git {printable}", file=sys.stderr)
        return None
    except Exception as e:
        # Best-effort boundary: e.g. git missing from PATH. Report and carry on.
        print(f"Error running git command: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def get_git_version():
    """⚡ Bolt: Get git version as a tuple of integers.

    Returns:
        A ``(major, minor, patch)`` tuple parsed from the output of
        ``git version``, or ``(0, 0, 0)`` when git cannot be run or the
        output contains no version number.
    """
    import re  # local import: keeps this function self-contained

    res = run_git_command(["version"])
    if not res or res.returncode != 0:
        return (0, 0, 0)

    # Look for the first dotted number rather than the last word: vendor
    # builds append extra text, e.g. "git version 2.39.3 (Apple Git-146)"
    # or "git version 2.41.0.windows.1".
    match = re.search(r"(\d+)\.(\d+)(?:\.(\d+))?", res.stdout or "")
    if match is None:
        return (0, 0, 0)

    # Always return three components; a shorter tuple such as (2, 41) would
    # compare as lower than (2, 41, 0).
    return tuple(int(part) if part else 0 for part in match.groups())
|
|
|
|
|
|
def get_default_branch():
    """⚡ Bolt: Detect default branch name (main or master).

    Each candidate is probed with ``git rev-parse --verify``; the first one
    that resolves is returned. When neither resolves, "main" is returned.
    """
    candidates = ("main", "master")
    for name in candidates:
        probe = run_git_command(["rev-parse", "--verify", name], capture_output=True)
        if probe is None or probe.returncode != 0:
            continue
        return name
    return "main"
|
|
|
|
|
|
def _current_branch_name():
    """Return the checked-out branch name, or "" when it cannot be determined."""
    # ``branch --show-current`` needs Git 2.22+; ``symbolic-ref`` is tried
    # when the first command fails (e.g. on older Git).
    attempts = (
        ["branch", "--show-current"],
        ["symbolic-ref", "--quiet", "--short", "HEAD"],
    )
    for args in attempts:
        res = run_git_command(args)
        if res and res.returncode == 0:
            return res.stdout.strip()
    return ""


def _parse_branch_listing(output):
    """Extract plain branch names from ``git branch``-style output."""
    names = []
    for raw in output.splitlines():
        # The first two columns hold the "*" (current branch) or "+"
        # (checked out in another worktree) marker.
        name = raw[2:].strip() if raw[:2] in ("* ", "+ ", "  ") else raw.strip()
        # Ref names cannot contain spaces, so entries like
        # "origin/HEAD -> origin/main" or "(HEAD detached at ...)" are skipped.
        if name and " " not in name:
            names.append(name)
    return names


def _list_branches(args):
    """Run a ``git branch`` variant and return its branch names ([] on failure)."""
    res = run_git_command(args)
    if not res or res.returncode != 0:
        return []
    return _parse_branch_listing(res.stdout)


def _collect_branches_bulk(default_branch):
    """Collect branch data with one ``for-each-ref`` call; None if it fails."""
    # ⚡ Bolt: the %(ahead-behind) atom (Git 2.41+) yields local branches, remote
    # branches and merge status from a single subprocess call.
    fmt = "%(refname:short)|%(ahead-behind:" + default_branch + ")"
    result = run_git_command(["for-each-ref", f"--format={fmt}", "refs/heads", "refs/remotes/origin"])
    if not result or result.returncode != 0:
        return None

    default_remote = f"origin/{default_branch}"
    local, remote, merged, unmerged = [], [], [], []
    for line in result.stdout.splitlines():
        # Split on the LAST "|": a branch name may contain "|", the counts cannot.
        name, sep, counts_text = line.rpartition("|")
        if not sep or not name:
            continue
        # Skip the symbolic remote HEAD entry (exact match only, so branches
        # that merely contain "HEAD" in their name are kept).
        if name in ("origin", "origin/HEAD"):
            continue
        if not name.startswith("origin/"):
            local.append(name)
            continue
        if name == default_remote:
            continue

        remote.append(name)
        counts = counts_text.split()
        # Counts are "<ahead> <behind>"; a branch with zero commits ahead of
        # the default branch is treated as merged.
        is_merged = len(counts) == 2 and counts[0].isdigit() and int(counts[0]) == 0
        (merged if is_merged else unmerged).append(name)
    return local, remote, merged, unmerged


def _collect_branches_fallback(default_branch):
    """Collect branch data with separate ``git branch`` calls."""
    default_remote = f"origin/{default_branch}"

    def is_reportable_remote(name):
        # Mirror the bulk path: only origin branches, never the default branch.
        return name.startswith("origin/") and name != default_remote

    local = _list_branches(["branch"])
    remote = [b for b in _list_branches(["branch", "-r"]) if is_reportable_remote(b)]
    merged = [
        b
        for b in _list_branches(["branch", "-r", "--merged", default_branch])
        if is_reportable_remote(b)
    ]
    merged_lookup = set(merged)
    unmerged = [b for b in remote if b not in merged_lookup]
    return local, remote, merged, unmerged


def get_branch_info():
    """⚡ Bolt: Optimized branch info retrieval using a single bulk Git metadata call when supported.

    Prints the "BRANCH REVIEW" section: local branches (the current one is
    marked with "*"), origin branches grouped by the first path component of
    their name, and how many origin branches are merged / not merged into the
    default branch. On Git older than 2.41, or if the bulk query fails, the
    data is gathered with individual ``git branch`` calls instead.
    """
    print("=" * 80)
    print("BRANCH REVIEW")
    print("=" * 80)
    print()

    default_branch = get_default_branch()
    current_branch = _current_branch_name()

    collected = None
    if get_git_version() >= (2, 41, 0):
        collected = _collect_branches_bulk(default_branch)
    if collected is None:
        # Standard O(N) fallback for older Git or failures
        collected = _collect_branches_fallback(default_branch)
    local_branches, remote_branches, merged_branches, unmerged_branches = collected

    # Local branches
    print(f"📌 Local Branches: {len(local_branches)}")
    for branch in local_branches:
        marker = "*" if branch == current_branch else " "
        print(f" {marker} {branch}")
    print()

    # Remote branches
    print(f"🌐 Remote Branches: {len(remote_branches)}")

    # Group by prefix
    branch_groups = defaultdict(list)
    for branch in remote_branches:
        # Strip only the leading remote name, not later "origin/" occurrences.
        short = branch[len("origin/"):] if branch.startswith("origin/") else branch
        parts = short.split("/")
        prefix = parts[0] if len(parts) > 1 else "other"
        branch_groups[prefix].append(branch)

    for prefix, branches in sorted(branch_groups.items()):
        print(f"\n {prefix.upper()}: {len(branches)} branches")
        for branch in branches[:5]:  # Show first 5
            print(f" - {branch}")
        if len(branches) > 5:
            print(f" ... and {len(branches) - 5} more")
    print()

    # Merged branches
    if merged_branches:
        print(f"✅ Merged into {default_branch}: {len(merged_branches)} branches")
        print(" (These can potentially be deleted)")
        print()

    # Unmerged branches
    if unmerged_branches:
        print(f"⚠️ Not merged into {default_branch}: {len(unmerged_branches)} branches")
        print(" (These may contain unmerged changes)")
        print()
|
|
|
|
|
|
def get_stash_info():
    """Print the stash section: a count followed by one line per stash entry.

    When ``git stash list`` fails or prints nothing, a "no stashes" notice is
    printed instead.
    """
    rule = "=" * 80
    print(rule)
    print("STASH REVIEW")
    print(rule)
    print()

    listing = run_git_command(["stash", "list"])
    succeeded = bool(listing) and listing.returncode == 0
    output = listing.stdout.strip() if succeeded else ""

    if not output:
        print("📦 No stashes found")
    else:
        entries = output.split("\n")
        print(f"📦 Stashes: {len(entries)}")
        for entry in entries:
            print(f" - {entry}")
    print()
|
|
|
|
|
|
def get_worktree_info():
    """Print the worktree section: every non-empty line of ``git worktree list``.

    When the command fails, a "no additional worktrees" notice is printed
    instead.
    """
    rule = "=" * 80
    print(rule)
    print("WORKTREE REVIEW")
    print(rule)
    print()

    listing = run_git_command(["worktree", "list"])
    if listing is None or listing.returncode != 0:
        print("🌳 No additional worktrees found")
    else:
        entries = []
        for row in listing.stdout.strip().split("\n"):
            row = row.strip()
            if row:
                entries.append(row)
        print(f"🌳 Worktrees: {len(entries)}")
        for entry in entries:
            print(f" - {entry}")
    print()
|
|
|
|
|
|
def get_status_info():
    """Get current working tree status.

    Prints the uncommitted changes reported by ``git status --short`` (or a
    clean-tree notice), then the branch header of ``git status -sb`` when it
    reports that the branch is ahead of or behind its upstream.
    """
    print("=" * 80)
    print("WORKING TREE STATUS")
    print("=" * 80)
    print()

    result = run_git_command(["status", "--short"])
    if result and result.returncode == 0:
        if result.stdout.strip():
            print("⚠️ Uncommitted changes:")
            print(result.stdout)
        else:
            print("✅ Working tree is clean")
        print()

    # Check if ahead/behind
    result = run_git_command(["status", "-sb"])
    if result and result.returncode == 0:
        for line in result.stdout.splitlines():
            # Only the "## branch...upstream [ahead N, behind M]" header holds
            # tracking info; file entries such as " M docs/behind.md" must
            # not be mistaken for it.
            if not line.startswith("##"):
                continue
            # "[" is not allowed in ref names, so the text after the first
            # "[" is the tracking summary, not part of a branch name.
            _, bracket, tracking = line.partition("[")
            if bracket and ("ahead" in tracking or "behind" in tracking):
                print(f"📊 {line}")
                print()
|
|
|
|
|
|
def get_unpushed_commits():
    """Get commits that haven't been pushed.

    Lists the commits reachable from HEAD but not from the default branch on
    ``origin`` (``origin/main`` or ``origin/master``, as detected by
    ``get_default_branch``). If the comparison itself fails, that is reported
    rather than claiming everything is pushed.
    """
    print("=" * 80)
    print("UNPUSHED COMMITS")
    print("=" * 80)
    print()

    # Use the detected default branch instead of a hard-coded "main" so
    # repositories whose default branch is "master" are handled too.
    base = f"origin/{get_default_branch()}"
    result = run_git_command(["log", "--oneline", f"{base}..HEAD"])

    if not result or result.returncode != 0:
        # A failed git call (e.g. the remote ref does not exist) is not
        # evidence that all commits are pushed.
        print(f"⚠️ Could not compare HEAD against {base}")
    elif result.stdout.strip():
        commits = result.stdout.strip().split("\n")
        print(f"📤 Unpushed commits: {len(commits)}")
        for commit in commits:
            print(f" - {commit}")
    else:
        print("✅ All commits are pushed")
    print()
|
|
|
|
|
|
def get_recent_activity():
    """Print the recent-activity section.

    Shows the output of ``git log --all --oneline --graph --decorate -15``
    when the command succeeds.
    """
    rule = "=" * 80
    print(rule)
    print("RECENT ACTIVITY")
    print(rule)
    print()

    log_args = ["log", "--all", "--oneline", "--graph", "--decorate", "-15"]
    history = run_git_command(log_args)
    if history is not None and history.returncode == 0:
        print(history.stdout)
    print()
|
|
|
|
|
|
def main():
    """Main review function.

    Prints the report header with a timestamp, runs each review section in
    order, and finishes with a fixed list of recommendations.
    """
    rule = "=" * 80

    print("\n" + rule)
    print("WORKING TREE REVIEW REPORT")
    print(f"Generated: {datetime.now():%Y-%m-%d %H:%M:%S}")
    print(rule)
    print()

    # Sections are printed in this order.
    sections = (
        get_status_info,
        get_unpushed_commits,
        get_branch_info,
        get_stash_info,
        get_worktree_info,
        get_recent_activity,
    )
    for section in sections:
        section()

    print(rule)
    print("REVIEW COMPLETE")
    print(rule)
    print()

    recommendations = (
        "1. Push any unpushed commits",
        "2. Review and potentially delete merged branches",
        "3. Consider merging or closing unmerged branches",
        "4. Clean up old stashes if no longer needed",
    )
    print("Recommendations:")
    for tip in recommendations:
        print(tip)
    print()
|
|
|
|
|
|
# Script entry point: generate the report only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|