TSNDep/tsndep/Ops/JsonFIle.py
Nique_372 c0c2855891 # Actulizacion importante en el gestor tsndep
- Correcion de export
- Correcion de update\install
- Se agrego las variables ENV con sintaxis "${{}}"
2026-04-27 07:30:55 -05:00

183 lines
No EOL
6.3 KiB
Python

#+------------------------------------------------------------------+
#| Imports |
#+------------------------------------------------------------------+
from .Def import *
import json
#+------------------------------------------------------------------+
#| Load |
#+------------------------------------------------------------------+
def load_json(json_path: str) -> dict | None:
"""Carga un JSON"""
try:
with open(json_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
click.echo(f"No encontrado: {json_path}", err=True)
return None
except json.JSONDecodeError as e:
click.echo(f"JSON invalido: {e}", err=True)
return None
#+------------------------------------------------------------------+
#| Save |
#+------------------------------------------------------------------+
def save_json(json_path: str, data: dict) -> bool:
"""Guarda un JSON"""
try:
with open(json_path, 'w') as f:
json.dump(data, f, indent=4)
return True
except Exception as e:
click.echo(f"Error guardando JSON: {e}", err=True)
return False
#+------------------------------------------------------------------+
#| Validate |
#+------------------------------------------------------------------+
def validate_json(data: dict) -> tuple[bool, list[str]]:
"""Valida estructura de dependencies.json"""
errors = []
# Validar repos
if "repos" not in data or not isinstance(data["repos"], list):
errors.append("'repos' debe ser un array")
else:
for repo in data["repos"]:
if not isinstance(repo.get("name"), str) or not repo.get("name"):
errors.append("Repo sin 'name' válido")
if not isinstance(repo.get("url"), str) or not repo.get("url"):
errors.append(f"Repo sin 'url' válida")
if not isinstance(repo.get("rama"), str) or not repo.get("rama"):
errors.append(f"Repo sin 'rama' válida")
if not isinstance(repo.get("comment"), str):
errors.append(f"Repo sin 'comment' válido")
# Validar other_languages (opcional)
if "other_languages" in data:
if not isinstance(data["other_languages"], list):
errors.append("'other_languages' debe ser un array")
else:
for lang in data["other_languages"]:
if not isinstance(lang.get("name"), str) or not lang.get("name"):
errors.append("Lenguaje sin 'name' válido")
if "version" in lang and not isinstance(lang["version"], str):
errors.append(f"Lenguaje '{lang.get('name')}' con 'version' inválida")
# Validar hooks (opcional)
if "hooks" in data:
if not isinstance(data["hooks"], dict):
errors.append("'hooks' debe ser un objeto")
else:
# Validar que cada hook sea un array si existe
hook_arrays = ["pre_install", "post_install", "pre_update", "post_update"]
for hook_name in hook_arrays:
if hook_name in data["hooks"]:
if not isinstance(data["hooks"][hook_name], list):
errors.append(f"'{hook_name}' debe ser un array")
# Validacion interna la omitimos por ahora...
# Validar booleanos de condición si existen
hook_conditions = ["post_install_only_on_success", "post_update_only_on_success"]
for condition in hook_conditions:
if condition in data["hooks"]:
if not isinstance(data["hooks"][condition], bool):
errors.append(f"'{condition}' debe ser boolean")
return (len(errors) == 0, errors)
#+------------------------------------------------------------------+
#| Export funcionts |
#+------------------------------------------------------------------+
def export_dependencies(repo_path: str, root: str) -> dict:
"""
Exporta todas las dependencias de forma recursiva
Retorna dict con estructura plana
"""
#--- Inicializar estructura
exported_data: dict = {
"_metadata": {
"exported_from": os.path.basename(repo_path),
"exported_date": __import__('datetime').datetime.now().isoformat(),
"total_deps": 0
},
"repos": []
}
#--- Procesar dependencias
visited : set = set()
export_recursive(
repo_path=repo_path,
root=root,
visited=visited,
exported_data=exported_data
)
# Actualizar total_deps
exported_data["_metadata"]["total_deps"] = len(exported_data["repos"])
return exported_data
def export_recursive(
repo_path: str,
root: str,
visited: set,
exported_data: dict
):
"""Procesa recursivamente y agrega repos al export dict"""
#--- Cargar JSON del repo actual
json_path : str = os.path.join(repo_path, "dependencies.json")
#print(json_path)
# Check existe
if not os.path.exists(json_path):
return
# Cargamos
data : dict = load_json(json_path)
if not data:
return
# Obtenemos
repos : list[dict] = data.get("repos", [])
#--- Iterar sobre repos
for repo in repos:
name : str = repo["name"]
# Evitar duplicados
if name in visited:
continue
visited.add(name)
# Obtener nombres de deps de este repo
repo_path_current : str = os.path.join(root, name)
# Crear entrada para export
export_repo: dict = {
"name": name,
"url": repo["url"],
"rama": repo.get("rama", "main"),
"comment": repo.get("comment", "")
}
# Agregar al export
exported_data["repos"].append(export_repo)
# Recursión
if os.path.exists(repo_path_current):
export_recursive(
repo_path=repo_path_current,
root=root,
visited=visited,
exported_data=exported_data
)