#+------------------------------------------------------------------+ #| Imports | #+------------------------------------------------------------------+ # definiciones bases para las utiliades # json from Src.Ops.JsonFIle import * #--- # Este archivo se definen las utildiades para trabajar con repos.. # #+------------------------------------------------------------------+ #| | #+------------------------------------------------------------------+ def clone_repo(url: str, target_path: str, rama: str) -> bool: """ Clona un repo en target_path Retorna True si éxito, False si falla """ #--- Obtenemos nombre start_barra : int = target_path.find("/") start_barra = start_barra if start_barra >= 0 else 0 repo_name : str = target_path[start_barra:len(target_path)] # extraemos el nombre #--- Clonamos try: subprocess.run( ["git", "clone", "-b", rama, url, target_path], check=True, timeout=60 ) return True except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: click.echo(f"[CLONE:{repo_name}] Clone failed: {e}", err=True) return False #+------------------------------------------------------------------+ #| | #+------------------------------------------------------------------+ def update_repo(repo_path: str, rama: str) -> bool: """ Pull un repo existente Retorna True si éxito, False si falla """ #--- Obtenemos nombre start_barra : int = repo_path.find("/") start_barra = start_barra if start_barra >= 0 else 0 repo_name : str = repo_path[start_barra:len(repo_path)] # extraemos el nombre #--- Update try: subprocess.run(["git", "-C", repo_path, "checkout", rama], check=True, timeout=30) subprocess.run(["git", "-C", repo_path, "pull"], check=True, timeout=60) return True except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: click.echo(f"[UPDATE:{repo_name}] Update failed: {e}", err=True) return False #+------------------------------------------------------------------+ #| | #+------------------------------------------------------------------+ def repo_exists(repo_path: str) -> bool: """Verifica si un repo existe localmente""" return os.path.exists(repo_path) and os.path.exists(os.path.join(repo_path, ".git")) #+------------------------------------------------------------------+ #| | #+------------------------------------------------------------------+ # repo_path = path al repo como tal # root: path al root como tal (ejemplo folder share proyects) # Nota a este punto dicho repo ya deberia de existir... (por eso en install lo clonamso de base) def process_dependencies(repo_path : str, root: str, visited: set, on_pre_process: callable, on_post_process : callable) -> bool: """Procesa dependencias recursivamente""" #--- Obtener el JSON # Obtenemos el path del dependencies.json deps_json_path = os.path.join(repo_path, "dependencies.json") # Check exist if not os.path.exists(deps_json_path): repo_basename = os.path.basename(repo_path) click.echo(f"[INSTALL:{repo_basename}] No dependencies.json found") return True # Cargar JSON data : dict = load_json(deps_json_path) if data is None: return False # Validar el json is_valid, errors = validate_json(data) if not is_valid: repo_basename = os.path.basename(repo_path) click.echo(f"[INSTALL:{repo_basename}] Invalid JSON:", err=True) for error in errors: click.echo(f"[INSTALL:{repo_basename}] - {error}", err=True) return False #--- Hook if on_pre_process: on_pre_process(data["hooks"], "pre_install") #--- Iterar sobre todos los repos repos = data.get("repos", []) repo_basename = os.path.basename(repo_path) click.echo(f"[INSTALL:{repo_basename}] Found {len(repos)} repositories") all_success : bool = True # Por defecto true (se acepta que no haya repos) for repo in repos: name : str = repo["name"] url : str = repo["url"] rama : str = repo.get("rama", "main") #--- Evitar duplicados if name in visited: click.echo(f"[INSTALL:{name}] Already processed, skipping") continue visited.add(name) #--- Clone o update repo_target_path : str = os.path.join(root, name) if repo_exists(repo_target_path): click.echo(f"[INSTALL:{name}] Updating ({rama})") success = update_repo(repo_target_path, rama) else: click.echo(f"[INSTALL:{name}] Cloning from {url}") success = clone_repo(url, repo_target_path, rama) #--- Check resultaado if not success: click.echo(f"[INSTALL:{name}] Failed", err=True) all_success = False continue #--- Recursión click.echo(f"[INSTALL:{name}] Processing sub-dependencies...") # Siguiente if not process_dependencies(repo_target_path, root, visited, on_pre_process): all_success = False #--- Ejecutamos el hook 2 # Solo si no es None if on_post_process and data.get("hooks", {}).get("post_install_only_on_success", False): on_post_process(data["hooks"], "post_install") return all_success #+------------------------------------------------------------------+ #| | #+------------------------------------------------------------------+ # Actuliza dependecias def update_dependencies(repo_path: str, root: str, visited: set, on_pre_process: callable = None, on_post_process: callable = None) -> bool: """Actualiza dependencias recursivamente (solo pull, no clone)""" #--- Obtener el JSON deps_json_path = os.path.join(repo_path, "dependencies.json") # Check exist if not os.path.exists(deps_json_path): repo_basename = os.path.basename(repo_path) click.echo(f"[UPDATE:{repo_basename}] No dependencies.json found") return True # Cargar JSON data: dict = load_json(deps_json_path) if data is None: return False # Validar el json is_valid, errors = validate_json(data) if not is_valid: repo_basename = os.path.basename(repo_path) click.echo(f"[UPDATE:{repo_basename}] Invalid JSON:", err=True) for error in errors: click.echo(f"[UPDATE:{repo_basename}] - {error}", err=True) return False #--- Hook PRE if on_pre_process: on_pre_process(data.get("hooks", {}), "pre_update") #--- Iterar sobre todos los repos repos = data.get("repos", []) repo_basename = os.path.basename(repo_path) click.echo(f"[UPDATE:{repo_basename}] Found {len(repos)} repositories to update") all_success: bool = True for repo in repos: name: str = repo["name"] rama: str = repo.get("rama", "main") #--- Evitar duplicados if name in visited: click.echo(f"[UPDATE:{name}] Already updated, skipping") continue visited.add(name) #--- Update (solo pull, no clone) repo_target_path: str = os.path.join(root, name) # Verificar que exista if not repo_exists(repo_target_path): click.echo(f"[UPDATE:{name}] Repository does not exist (must run install first)", err=True) all_success = False continue # Pull click.echo(f"[UPDATE:{name}] Updating repository (branch: {rama})") #--- Check resultado if not update_repo(repo_target_path, rama): click.echo(f"[UPDATE:{name}] Error updating repository", err=True) all_success = False continue #--- Recursión click.echo(f"[UPDATE:{name}] Updating sub-dependencies...") if not update_dependencies(repo_target_path, root, visited, on_pre_process, on_post_process): all_success = False #--- Hook POST if on_post_process and data.get("hooks", {}).get("post_update_only_on_success", False): if all_success: on_post_process(data.get("hooks", {}), "post_update") return all_success #+------------------------------------------------------------------+ #| | #+------------------------------------------------------------------+ # Imprime la rama de dpendeicas de un repo en base a su path.. y datos out\in para la recuidivdida.. def imprimir_dependencias_repo( path: str, prev_str: str, current_depth: int, max_depth: int ): """ Imprime arbol de dependencias recursivamente Args: path: ruta del repo prev_str: string de prefijo visual current_depth: profundidad actual max_depth: profundidad máxima """ #--- Verificar límite de profundidad if current_depth >= max_depth: return #--- Cargar JSON json_path : str = os.path.join(path, "dependencies.json") if not os.path.exists(json_path): click.echo(f"{prev_str}No dependencies.json found") return data: dict = load_json(json_path) if not data: click.echo(f"{prev_str}Failed to load JSON") return if "repos" not in data: click.echo(f"{prev_str}JSON without valid structure") return repos : list[dict] = data.get("repos", []) repos_size : int = len(repos) if repos_size < 1: return #--- Iterar sobre repos for index, repo in enumerate(repos): name: str = repo["name"] # Decidir símbolo if (index == repos_size - 1): current_char: str = "└── " next_prev_str: str = prev_str + " " else: current_char: str = "├── " next_prev_str: str = prev_str + "| " click.echo(f"{prev_str}{current_char}{name}/") # Recursión con nueva profundidad repo_path: str = os.path.join(os.getcwd(), name) if os.path.exists(repo_path): imprimir_dependencias_repo( repo_path, next_prev_str, current_depth=current_depth + 1, # Incrementar profundidad max_depth=max_depth ) #+------------------------------------------------------------------+ #| | #+------------------------------------------------------------------+ def check_repo(path: str): """Verify that all repositories in dependencies.json are cloned""" repo_name: str = os.path.basename(path) click.echo(f"[CHECK:{repo_name}] Starting verification") #--- Cargar JSON json_path: str = os.path.join(path, "dependencies.json") if not os.path.exists(json_path): click.echo(f"[CHECK:{repo_name}] No dependencies.json found") return data: dict = load_json(json_path) if not data: click.echo(f"[CHECK:{repo_name}] Failed to load JSON") return # validate_json retorna (bool, list) is_valid, errors = validate_json(data) if not is_valid: click.echo(f"[CHECK:{repo_name}] Invalid JSON:", err=True) for error in errors: click.echo(f"[CHECK:{repo_name}] - {error}", err=True) return repos: list[dict] = data.get("repos", []) repos_size: int = len(repos) if repos_size < 1: click.echo(f"[CHECK:{repo_name}] No dependencies found") return click.echo(f"[CHECK:{repo_name}] Verifying {repos_size} repositories...") #--- Iterar sobre repos ok_count: int = 0 fail_count: int = 0 for repo in repos: name: str = repo["name"] repo_path: str = os.path.join(os.getcwd(), name) # Verificar que exista if repo_exists(repo_path): click.echo(f"[CHECK:{repo_name}] [OK] {name}") ok_count += 1 else: click.echo(f"[CHECK:{repo_name}] [FAIL] {name} (not cloned)") fail_count += 1 #--- Resumen de las depdneicas click.echo(f"[CHECK:{repo_name}] Result: {ok_count} OK, {fail_count} FAIL") click.echo(f"[CHECK:{repo_name}] Verification completed")