367 lines
No EOL
12 KiB
Python
367 lines
No EOL
12 KiB
Python
#+------------------------------------------------------------------+
|
|
#| Imports |
|
|
#+------------------------------------------------------------------+
|
|
# Base definitions for the utilities
|
|
# json
|
|
from Src.Ops.JsonFIle import *
|
|
#---
|
|
# This file defines the utilities for working with repos.
|
|
#
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| |
|
|
#+------------------------------------------------------------------+
|
|
def clone_repo(url: str, target_path: str, rama: str) -> bool:
    """
    Clone a git repository into ``target_path``.

    Runs ``git clone -b <rama> <url> <target_path>`` with a 60-second timeout.

    Args:
        url: remote URL handed to ``git clone``.
        target_path: local directory to clone into.
        rama: branch name handed to ``-b``.

    Returns:
        True if the clone succeeded; False if git exited with a non-zero
        status, timed out, or could not be launched at all.
    """
    #--- Repo name (only used to tag the error message)
    # Take the last path component so the tag is the repo name rather than
    # the path tail; normpath tolerates a trailing separator.
    repo_name: str = os.path.basename(os.path.normpath(target_path))

    #--- Clone
    try:
        subprocess.run(
            ["git", "clone", "-b", rama, url, target_path],
            check=True,
            timeout=60,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        # OSError covers the case where the git executable cannot be started,
        # so the function honours its "False on failure" contract.
        click.echo(f"[CLONE:{repo_name}] Clone failed: {e}", err=True)
        return False

    return True
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| |
|
|
#+------------------------------------------------------------------+
|
|
def update_repo(repo_path: str, rama: str) -> bool:
    """
    Check out ``rama`` in an existing repo and pull it.

    Runs ``git -C <repo_path> checkout <rama>`` (30-second timeout) followed
    by ``git -C <repo_path> pull`` (60-second timeout).

    Args:
        repo_path: path of the local repository.
        rama: branch to check out before pulling.

    Returns:
        True if both commands succeeded; False if either exited with a
        non-zero status, timed out, or git could not be launched at all.
    """
    #--- Repo name (only used to tag the error message)
    # Take the last path component so the tag is the repo name rather than
    # the path tail; normpath tolerates a trailing separator.
    repo_name: str = os.path.basename(os.path.normpath(repo_path))

    #--- Update
    try:
        subprocess.run(["git", "-C", repo_path, "checkout", rama], check=True, timeout=30)
        subprocess.run(["git", "-C", repo_path, "pull"], check=True, timeout=60)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        # OSError covers the case where the git executable cannot be started,
        # so the function honours its "False on failure" contract.
        click.echo(f"[UPDATE:{repo_name}] Update failed: {e}", err=True)
        return False

    return True
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| |
|
|
#+------------------------------------------------------------------+
|
|
def repo_exists(repo_path: str) -> bool:
    """Return True when ``repo_path`` exists and contains a ``.git`` entry."""
    if not os.path.exists(repo_path):
        return False

    git_entry = os.path.join(repo_path, ".git")
    return os.path.exists(git_entry)
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| |
|
|
#+------------------------------------------------------------------+
|
|
# repo_path: path to the repo itself
# root: path to the root folder (e.g. the shared projects folder)
# Note: at this point the repo should already exist (that is why install clones it first)
|
|
def process_dependencies(repo_path : str, root: str, visited: set, on_pre_process: callable, on_post_process : callable) -> bool:
    """
    Install the dependencies listed in a repo's dependencies.json, recursively.

    Every entry under ``"repos"`` is updated when it already exists under
    ``root`` and cloned otherwise; its own dependencies.json is then
    processed the same way.

    Args:
        repo_path: path of the repo whose dependencies.json is read.
        root: folder under which each dependency lives (``root/<name>``).
        visited: names of dependencies already handled. Mutated in place so a
            dependency reachable through several paths is processed once.
        on_pre_process: if truthy, called as
            ``on_pre_process(hooks, "pre_install")`` before iterating.
        on_post_process: if truthy, called as
            ``on_post_process(hooks, "post_install")`` after iterating, only
            when the ``post_install_only_on_success`` hook flag is set and
            every dependency succeeded.

    Returns:
        True when there is no dependencies.json or every dependency (and
        sub-dependency) was processed; False when the JSON could not be
        loaded or validated, or any dependency failed.
    """
    repo_basename: str = os.path.basename(repo_path)

    #--- Load the JSON
    deps_json_path = os.path.join(repo_path, "dependencies.json")

    # A repo without dependencies.json simply has nothing to install
    if not os.path.exists(deps_json_path):
        click.echo(f"[INSTALL:{repo_basename}] No dependencies.json found")
        return True

    data : dict = load_json(deps_json_path)
    if data is None:
        return False

    # validate_json is unpacked as (is_valid, errors)
    is_valid, errors = validate_json(data)
    if not is_valid:
        click.echo(f"[INSTALL:{repo_basename}] Invalid JSON:", err=True)
        for error in errors:
            click.echo(f"[INSTALL:{repo_basename}] - {error}", err=True)
        return False

    # "hooks" is optional: fall back to an empty dict instead of raising
    hooks : dict = data.get("hooks") or {}

    #--- Pre hook
    if on_pre_process:
        on_pre_process(hooks, "pre_install")

    #--- Iterate over all the repos
    repos = data.get("repos", [])
    click.echo(f"[INSTALL:{repo_basename}] Found {len(repos)} repositories")

    all_success : bool = True  # True by default (having no repos is accepted)
    for repo in repos:
        name : str = repo["name"]
        url : str = repo["url"]
        rama : str = repo.get("rama", "main")

        #--- Avoid duplicates
        if name in visited:
            click.echo(f"[INSTALL:{name}] Already processed, skipping")
            continue

        visited.add(name)

        #--- Clone or update
        repo_target_path : str = os.path.join(root, name)

        if repo_exists(repo_target_path):
            click.echo(f"[INSTALL:{name}] Updating ({rama})")
            success = update_repo(repo_target_path, rama)
        else:
            click.echo(f"[INSTALL:{name}] Cloning from {url}")
            success = clone_repo(url, repo_target_path, rama)

        #--- Check the result
        if not success:
            click.echo(f"[INSTALL:{name}] Failed", err=True)
            all_success = False
            continue

        #--- Recursion
        click.echo(f"[INSTALL:{name}] Processing sub-dependencies...")

        # Both hooks are forwarded so nested repos run theirs as well
        if not process_dependencies(repo_target_path, root, visited, on_pre_process, on_post_process):
            all_success = False

    #--- Post hook
    # Runs only when the flag is set AND everything succeeded.
    # NOTE(review): when the flag is falsy the post hook never runs — confirm
    # that is the intended meaning of "only_on_success".
    if on_post_process and hooks.get("post_install_only_on_success", False) and all_success:
        on_post_process(hooks, "post_install")

    return all_success
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| |
|
|
#+------------------------------------------------------------------+
|
|
# Updates dependencies
|
|
def update_dependencies(repo_path: str, root: str, visited: set, on_pre_process: callable = None, on_post_process: callable = None) -> bool:
    """
    Update the dependencies listed in a repo's dependencies.json, recursively.

    Only existing repos are pulled; nothing is cloned. A dependency missing
    under ``root`` is reported and counted as a failure.

    Args:
        repo_path: path of the repo whose dependencies.json is read.
        root: folder under which each dependency lives (``root/<name>``).
        visited: names of dependencies already handled; mutated in place.
        on_pre_process: if truthy, called with the hooks dict and
            ``"pre_update"`` before iterating.
        on_post_process: if truthy, called with the hooks dict and
            ``"post_update"`` after iterating, only when the
            ``post_update_only_on_success`` flag is set and all succeeded.

    Returns:
        True when there is no dependencies.json or every dependency was
        updated; False otherwise.
    """
    #--- Locate and load the JSON
    manifest_path = os.path.join(repo_path, "dependencies.json")
    owner = os.path.basename(repo_path)

    if not os.path.exists(manifest_path):
        click.echo(f"[UPDATE:{owner}] No dependencies.json found")
        return True

    data: dict = load_json(manifest_path)
    if data is None:
        return False

    #--- Validate it
    is_valid, errors = validate_json(data)
    if not is_valid:
        click.echo(f"[UPDATE:{owner}] Invalid JSON:", err=True)
        for problem in errors:
            click.echo(f"[UPDATE:{owner}] - {problem}", err=True)
        return False

    #--- Pre hook
    if on_pre_process:
        on_pre_process(data.get("hooks", {}), "pre_update")

    #--- Walk every declared repo
    repos = data.get("repos", [])
    click.echo(f"[UPDATE:{owner}] Found {len(repos)} repositories to update")

    all_success: bool = True
    for entry in repos:
        name: str = entry["name"]
        rama: str = entry.get("rama", "main")

        # Each dependency is handled at most once
        if name in visited:
            click.echo(f"[UPDATE:{name}] Already updated, skipping")
            continue
        visited.add(name)

        dep_path: str = os.path.join(root, name)

        if not repo_exists(dep_path):
            # Pull only: a missing repo is an error here, never cloned
            click.echo(f"[UPDATE:{name}] Repository does not exist (must run install first)", err=True)
            all_success = False
        else:
            click.echo(f"[UPDATE:{name}] Updating repository (branch: {rama})")
            if not update_repo(dep_path, rama):
                click.echo(f"[UPDATE:{name}] Error updating repository", err=True)
                all_success = False
            else:
                # Descend only into repos that were pulled successfully
                click.echo(f"[UPDATE:{name}] Updating sub-dependencies...")
                if not update_dependencies(dep_path, root, visited, on_pre_process, on_post_process):
                    all_success = False

    #--- Post hook (flag set and everything succeeded)
    if on_post_process and data.get("hooks", {}).get("post_update_only_on_success", False) and all_success:
        on_post_process(data.get("hooks", {}), "post_update")

    return all_success
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| |
|
|
#+------------------------------------------------------------------+
|
|
# Prints the dependency tree of a repo given its path, plus the in/out state carried through the recursion.
|
|
def imprimir_dependencias_repo(
    path: str,
    prev_str: str,
    current_depth: int,
    max_depth: int
):
    """
    Print the dependency tree of a repo, recursively.

    Args:
        path: path of the repo whose dependencies.json is read.
        prev_str: visual prefix printed before every line of this level.
        current_depth: depth of this call.
        max_depth: depth at which recursion stops (nothing is printed once
            ``current_depth`` reaches it).
    """
    #--- Depth limit
    if current_depth >= max_depth:
        return

    #--- Load the JSON
    manifest_path: str = os.path.join(path, "dependencies.json")
    if not os.path.exists(manifest_path):
        click.echo(f"{prev_str}No dependencies.json found")
        return

    data: dict = load_json(manifest_path)
    if not data:
        click.echo(f"{prev_str}Failed to load JSON")
        return

    if "repos" not in data:
        click.echo(f"{prev_str}JSON without valid structure")
        return

    repos: list[dict] = data.get("repos", [])
    total: int = len(repos)
    if total < 1:
        return

    #--- Print each child, then descend into it
    last_index: int = total - 1
    for position, entry in enumerate(repos):
        name: str = entry["name"]

        # The last sibling closes the branch; the others keep it open
        is_last: bool = position == last_index
        connector: str = "└── " if is_last else "├── "
        child_prefix: str = prev_str + (" " if is_last else "| ")

        click.echo(f"{prev_str}{connector}{name}/")

        # Children are looked up relative to the current working directory
        child_path: str = os.path.join(os.getcwd(), name)
        if os.path.exists(child_path):
            imprimir_dependencias_repo(
                child_path,
                child_prefix,
                current_depth=current_depth + 1,
                max_depth=max_depth
            )
|
|
|
|
|
|
#+------------------------------------------------------------------+
|
|
#| |
|
|
#+------------------------------------------------------------------+
|
|
def check_repo(path: str):
    """
    Report whether every repo listed in dependencies.json is cloned.

    Each dependency is looked up under the current working directory and
    reported as OK or FAIL, followed by a summary line. Nothing is returned.

    Args:
        path: path of the repo whose dependencies.json is read.
    """
    repo_name: str = os.path.basename(path)
    click.echo(f"[CHECK:{repo_name}] Starting verification")

    #--- Load the JSON
    manifest_path: str = os.path.join(path, "dependencies.json")
    if not os.path.exists(manifest_path):
        click.echo(f"[CHECK:{repo_name}] No dependencies.json found")
        return

    data: dict = load_json(manifest_path)
    if not data:
        click.echo(f"[CHECK:{repo_name}] Failed to load JSON")
        return

    # validate_json is unpacked as (is_valid, errors)
    is_valid, errors = validate_json(data)
    if not is_valid:
        click.echo(f"[CHECK:{repo_name}] Invalid JSON:", err=True)
        for problem in errors:
            click.echo(f"[CHECK:{repo_name}] - {problem}", err=True)
        return

    repos: list[dict] = data.get("repos", [])
    total: int = len(repos)
    if total < 1:
        click.echo(f"[CHECK:{repo_name}] No dependencies found")
        return

    click.echo(f"[CHECK:{repo_name}] Verifying {total} repositories...")

    #--- Check every declared repo
    ok_count: int = 0
    fail_count: int = 0
    for entry in repos:
        name: str = entry["name"]
        candidate: str = os.path.join(os.getcwd(), name)

        if not repo_exists(candidate):
            click.echo(f"[CHECK:{repo_name}] [FAIL] {name} (not cloned)")
            fail_count += 1
            continue

        click.echo(f"[CHECK:{repo_name}] [OK] {name}")
        ok_count += 1

    #--- Summary
    click.echo(f"[CHECK:{repo_name}] Result: {ok_count} OK, {fail_count} FAIL")
    click.echo(f"[CHECK:{repo_name}] Verification completed")