TSNDep/Src/Ops/Git.py
Nique_372 54bd302736
2026-04-25 06:51:23 -05:00

367 lines
No EOL
12 KiB
Python

#+------------------------------------------------------------------+
#| Imports |
#+------------------------------------------------------------------+
# Base definitions for the utilities
# json
from Src.Ops.JsonFIle import *
#---
# This file defines the utilities for working with repos.
#
#+------------------------------------------------------------------+
#| |
#+------------------------------------------------------------------+
def clone_repo(url: str, target_path: str, rama: str) -> bool:
    """
    Clone a repository into target_path.

    Runs ``git clone -b <rama> <url> <target_path>`` with a 60-second timeout.

    Args:
        url: location of the repository to clone.
        target_path: directory the clone is written to.
        rama: branch name handed to ``git clone -b``.

    Returns:
        True when git exits successfully; False when git exits with a
        non-zero status or the timeout expires (the failure is echoed to
        stderr).
    """
    #--- Get the name
    # Only used to tag the log line. Take the last path component: searching
    # for the first "/" kept nearly the whole path (leading slash included)
    # and did nothing for paths without a forward slash.
    repo_name: str = os.path.basename(os.path.normpath(target_path))
    #--- Clone
    try:
        subprocess.run(
            ["git", "clone", "-b", rama, url, target_path],
            check=True,
            timeout=60,
        )
        return True
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
        click.echo(f"[CLONE:{repo_name}] Clone failed: {e}", err=True)
        return False
#+------------------------------------------------------------------+
#| |
#+------------------------------------------------------------------+
def update_repo(repo_path: str, rama: str) -> bool:
    """
    Update an existing local repository.

    Runs ``git checkout <rama>`` (30-second timeout) followed by ``git pull``
    (60-second timeout), both against repo_path via ``git -C``.

    Args:
        repo_path: directory of the local repository.
        rama: branch to check out before pulling.

    Returns:
        True when both git commands succeed; False when either exits with a
        non-zero status or times out (the failure is echoed to stderr).
    """
    #--- Get the name
    # Only used to tag the log line. Take the last path component: searching
    # for the first "/" kept nearly the whole path (leading slash included)
    # and did nothing for paths without a forward slash.
    repo_name: str = os.path.basename(os.path.normpath(repo_path))
    #--- Update
    try:
        subprocess.run(["git", "-C", repo_path, "checkout", rama], check=True, timeout=30)
        subprocess.run(["git", "-C", repo_path, "pull"], check=True, timeout=60)
        return True
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
        click.echo(f"[UPDATE:{repo_name}] Update failed: {e}", err=True)
        return False
#+------------------------------------------------------------------+
#| |
#+------------------------------------------------------------------+
def repo_exists(repo_path: str) -> bool:
    """Return True when repo_path exists and holds a ".git" entry."""
    # Guard first: a missing directory can never be a repository.
    if not os.path.exists(repo_path):
        return False
    git_entry = os.path.join(repo_path, ".git")
    return os.path.exists(git_entry)
#+------------------------------------------------------------------+
#| |
#+------------------------------------------------------------------+
# repo_path: path to the repo itself
# root: path to the root folder (e.g. the shared projects folder)
# Note: at this point the repo should already exist (that is why install clones it up front)
def process_dependencies(repo_path : str, root: str, visited: set, on_pre_process: callable = None, on_post_process : callable = None) -> bool:
    """
    Install the dependencies of a repository recursively.

    Reads ``dependencies.json`` from repo_path, then updates (when already
    present under root) or clones every listed repository, and recurses into
    each one that was fetched successfully.

    Args:
        repo_path: path of the repository whose dependencies are processed.
        root: directory under which each dependency lives (``root/<name>``).
        visited: names already handled in this run; mutated in place so a
            dependency listed by several repos is processed only once.
        on_pre_process: optional hook, called as
            ``on_pre_process(hooks, "pre_install")`` before the repos are
            handled.
        on_post_process: optional hook, called as
            ``on_post_process(hooks, "post_install")`` after the repos are
            handled (see the gating note at the end of the body).

    Returns:
        True when there is no dependencies.json or every dependency (and its
        sub-dependencies) succeeded; False when load_json returns None, the
        JSON fails validation, or any dependency failed.
    """
    repo_basename: str = os.path.basename(repo_path)
    #--- Get the JSON
    deps_json_path = os.path.join(repo_path, "dependencies.json")
    # A repo without a manifest simply has nothing to install
    if not os.path.exists(deps_json_path):
        click.echo(f"[INSTALL:{repo_basename}] No dependencies.json found")
        return True
    # Load JSON
    data : dict = load_json(deps_json_path)
    if data is None:
        return False
    # Validate JSON
    is_valid, errors = validate_json(data)
    if not is_valid:
        click.echo(f"[INSTALL:{repo_basename}] Invalid JSON:", err=True)
        for error in errors:
            click.echo(f"[INSTALL:{repo_basename}] - {error}", err=True)
        return False
    #--- Pre hook
    # .get() so a manifest without a "hooks" section does not raise KeyError
    hooks : dict = data.get("hooks", {})
    if on_pre_process:
        on_pre_process(hooks, "pre_install")
    #--- Iterate over all repos
    repos = data.get("repos", [])
    click.echo(f"[INSTALL:{repo_basename}] Found {len(repos)} repositories")
    all_success : bool = True  # True by default (having no repos is acceptable)
    for repo in repos:
        name : str = repo["name"]
        url : str = repo["url"]
        rama : str = repo.get("rama", "main")
        #--- Avoid duplicates
        if name in visited:
            click.echo(f"[INSTALL:{name}] Already processed, skipping")
            continue
        visited.add(name)
        #--- Clone or update
        repo_target_path : str = os.path.join(root, name)
        if repo_exists(repo_target_path):
            click.echo(f"[INSTALL:{name}] Updating ({rama})")
            success = update_repo(repo_target_path, rama)
        else:
            click.echo(f"[INSTALL:{name}] Cloning from {url}")
            success = clone_repo(url, repo_target_path, rama)
        #--- Check result
        if not success:
            click.echo(f"[INSTALL:{name}] Failed", err=True)
            all_success = False
            continue
        #--- Recursion
        click.echo(f"[INSTALL:{name}] Processing sub-dependencies...")
        # Forward BOTH hooks: leaving on_post_process out made the recursive
        # call fail with TypeError (missing positional argument).
        if not process_dependencies(repo_target_path, root, visited, on_pre_process, on_post_process):
            all_success = False
    #--- Post hook
    # Gated by "post_install_only_on_success", so it must not fire after a
    # failure (same rule update_dependencies applies to its post hook).
    # NOTE(review): when the flag is absent/false the post hook never runs,
    # mirroring update_dependencies - confirm that is the intended meaning.
    if on_post_process and hooks.get("post_install_only_on_success", False):
        if all_success:
            on_post_process(hooks, "post_install")
    return all_success
#+------------------------------------------------------------------+
#| |
#+------------------------------------------------------------------+
# Updates dependencies
def update_dependencies(repo_path: str, root: str, visited: set, on_pre_process: callable = None, on_post_process: callable = None) -> bool:
    """
    Update dependencies recursively (pull only, never clone).

    Args:
        repo_path: path of the repository whose dependencies.json is read.
        root: directory under which each dependency lives (``root/<name>``).
        visited: names already handled in this run; mutated in place.
        on_pre_process: optional hook, called as
            ``on_pre_process(hooks, "pre_update")`` before the repos are
            handled.
        on_post_process: optional hook, called as
            ``on_post_process(hooks, "post_update")`` when the manifest sets
            ``post_update_only_on_success`` and every update succeeded.

    Returns:
        True when there is no dependencies.json or every listed repository
        (and its sub-dependencies) updated; False otherwise.
    """
    #--- Locate and load the manifest
    manifest_path = os.path.join(repo_path, "dependencies.json")
    if not os.path.exists(manifest_path):
        click.echo(f"[UPDATE:{os.path.basename(repo_path)}] No dependencies.json found")
        return True
    manifest: dict = load_json(manifest_path)
    if manifest is None:
        return False
    #--- Validate it
    valid, problems = validate_json(manifest)
    if not valid:
        label = os.path.basename(repo_path)
        click.echo(f"[UPDATE:{label}] Invalid JSON:", err=True)
        for problem in problems:
            click.echo(f"[UPDATE:{label}] - {problem}", err=True)
        return False
    #--- Pre hook
    if on_pre_process:
        on_pre_process(manifest.get("hooks", {}), "pre_update")
    #--- Walk the listed repositories
    entries = manifest.get("repos", [])
    label = os.path.basename(repo_path)
    click.echo(f"[UPDATE:{label}] Found {len(entries)} repositories to update")
    failures: int = 0
    for entry in entries:
        dep_name: str = entry["name"]
        branch: str = entry.get("rama", "main")
        # Each dependency is handled once per run
        if dep_name in visited:
            click.echo(f"[UPDATE:{dep_name}] Already updated, skipping")
            continue
        visited.add(dep_name)
        dep_path: str = os.path.join(root, dep_name)
        # Update never clones: a missing repo counts as a failure
        if not repo_exists(dep_path):
            click.echo(f"[UPDATE:{dep_name}] Repository does not exist (must run install first)", err=True)
            failures += 1
            continue
        click.echo(f"[UPDATE:{dep_name}] Updating repository (branch: {branch})")
        if not update_repo(dep_path, branch):
            click.echo(f"[UPDATE:{dep_name}] Error updating repository", err=True)
            failures += 1
            continue
        # Descend only into repositories that updated cleanly
        click.echo(f"[UPDATE:{dep_name}] Updating sub-dependencies...")
        if not update_dependencies(dep_path, root, visited, on_pre_process, on_post_process):
            failures += 1
    all_ok: bool = failures == 0
    #--- Post hook (flag checked first, then the outcome)
    if on_post_process and manifest.get("hooks", {}).get("post_update_only_on_success", False) and all_ok:
        on_post_process(manifest.get("hooks", {}), "post_update")
    return all_ok
#+------------------------------------------------------------------+
#| |
#+------------------------------------------------------------------+
# Prints the dependency tree of a repo given its path, plus the in/out data needed for the recursion.
def imprimir_dependencias_repo(
    path: str,
    prev_str: str,
    current_depth: int,
    max_depth: int
):
    """
    Print the dependency tree of a repository recursively.

    Args:
        path: path of the repository whose dependencies.json is read.
        prev_str: visual prefix printed before every line of this level.
        current_depth: depth of this level; nothing is printed once it
            reaches max_depth.
        max_depth: maximum depth to descend to.
    """
    #--- Check the depth limit
    if current_depth >= max_depth:
        return
    #--- Load JSON
    json_path : str = os.path.join(path, "dependencies.json")
    if not os.path.exists(json_path):
        click.echo(f"{prev_str}No dependencies.json found")
        return
    data: dict = load_json(json_path)
    if not data:
        click.echo(f"{prev_str}Failed to load JSON")
        return
    if "repos" not in data:
        click.echo(f"{prev_str}JSON without valid structure")
        return
    repos : list[dict] = data.get("repos", [])
    repos_size : int = len(repos)
    if repos_size < 1:
        return
    #--- Iterate over repos
    for index, repo in enumerate(repos):
        name: str = repo["name"]
        # Pick the connector. The prefix handed to the children is exactly as
        # wide as the connector (4 columns) so nested levels line up under
        # their parent; the last entry stops drawing the vertical bar.
        if index == repos_size - 1:
            current_char: str = "└── "
            next_prev_str: str = prev_str + "    "
        else:
            current_char: str = "├── "
            next_prev_str: str = prev_str + "|   "
        click.echo(f"{prev_str}{current_char}{name}/")
        # Recurse one level deeper; dependencies are looked up by name in the
        # current working directory, and skipped when not present there.
        repo_path: str = os.path.join(os.getcwd(), name)
        if os.path.exists(repo_path):
            imprimir_dependencias_repo(
                repo_path,
                next_prev_str,
                current_depth=current_depth + 1,
                max_depth=max_depth
            )
#+------------------------------------------------------------------+
#| |
#+------------------------------------------------------------------+
def check_repo(path: str):
    """
    Report which repositories listed in dependencies.json are cloned.

    Each dependency is looked up by name in the current working directory;
    one OK/FAIL line is echoed per dependency, followed by a summary.

    Args:
        path: path of the repository whose dependencies.json is checked.
    """
    label: str = os.path.basename(path)
    click.echo(f"[CHECK:{label}] Starting verification")
    #--- Load the manifest
    manifest_path: str = os.path.join(path, "dependencies.json")
    if not os.path.exists(manifest_path):
        click.echo(f"[CHECK:{label}] No dependencies.json found")
        return
    manifest: dict = load_json(manifest_path)
    if not manifest:
        click.echo(f"[CHECK:{label}] Failed to load JSON")
        return
    #--- Validate it (validate_json returns a (bool, list) pair)
    valid, problems = validate_json(manifest)
    if not valid:
        click.echo(f"[CHECK:{label}] Invalid JSON:", err=True)
        for problem in problems:
            click.echo(f"[CHECK:{label}] - {problem}", err=True)
        return
    entries: list[dict] = manifest.get("repos", [])
    total: int = len(entries)
    if total == 0:
        click.echo(f"[CHECK:{label}] No dependencies found")
        return
    click.echo(f"[CHECK:{label}] Verifying {total} repositories...")
    #--- Check every listed repository
    present: int = 0
    for entry in entries:
        dep_name: str = entry["name"]
        if not repo_exists(os.path.join(os.getcwd(), dep_name)):
            click.echo(f"[CHECK:{label}] [FAIL] {dep_name} (not cloned)")
            continue
        click.echo(f"[CHECK:{label}] [OK] {dep_name}")
        present += 1
    # Every entry is either present or missing, so the rest are failures
    missing: int = total - present
    #--- Summary
    click.echo(f"[CHECK:{label}] Result: {present} OK, {missing} FAIL")
    click.echo(f"[CHECK:{label}] Verification completed")