TSNDep/Src/Ops/JsonFIle.py
Nique_372 54bd302736
2026-04-25 06:51:23 -05:00

182 lines
No EOL
6.3 KiB
Python

#+------------------------------------------------------------------+
#| Imports |
#+------------------------------------------------------------------+
from Src.Ops.Def import *
import json
#+------------------------------------------------------------------+
#| Load |
#+------------------------------------------------------------------+
def load_json(json_path: str) -> dict | None:
"""Carga un JSON"""
try:
with open(json_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
click.echo(f"No encontrado: {json_path}", err=True)
return None
except json.JSONDecodeError as e:
click.echo(f"JSON invalido: {e}", err=True)
return None
#+------------------------------------------------------------------+
#| Save |
#+------------------------------------------------------------------+
def save_json(json_path: str, data: dict) -> bool:
    """Serialize *data* to *json_path* as pretty-printed JSON.

    Args:
        json_path: Destination path (overwritten if it already exists).
        data: JSON-serializable mapping to persist.

    Returns:
        True on success, False if writing failed (best-effort by design:
        the error is echoed instead of propagated).
    """
    try:
        # Explicit UTF-8 so the file round-trips with load_json regardless
        # of the platform locale encoding.
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)
        return True
    except Exception as e:
        click.echo(f"Error guardando JSON: {e}", err=True)
        return False
#+------------------------------------------------------------------+
#| Validate |
#+------------------------------------------------------------------+
def validate_json(data: dict) -> tuple[bool, list[str]]:
    """Validate the structure of a dependencies.json document.

    Args:
        data: Parsed JSON document to check.

    Returns:
        (is_valid, errors) — is_valid is True when no structural problem
        was found; errors holds one message per problem detected.
    """
    errors: list[str] = []

    # --- "repos": required array of objects with name/url/rama/comment.
    if "repos" not in data or not isinstance(data["repos"], list):
        errors.append("'repos' debe ser un array")
    else:
        for repo in data["repos"]:
            # Guard: a non-object entry would otherwise crash on .get().
            if not isinstance(repo, dict):
                errors.append("Repo debe ser un objeto")
                continue
            if not isinstance(repo.get("name"), str) or not repo.get("name"):
                errors.append("Repo sin 'name' válido")
            if not isinstance(repo.get("url"), str) or not repo.get("url"):
                errors.append("Repo sin 'url' válida")
            if not isinstance(repo.get("rama"), str) or not repo.get("rama"):
                errors.append("Repo sin 'rama' válida")
            if not isinstance(repo.get("comment"), str):
                errors.append("Repo sin 'comment' válido")

    # --- "other_languages": optional array of {name, version?} objects.
    if "other_languages" in data:
        if not isinstance(data["other_languages"], list):
            errors.append("'other_languages' debe ser un array")
        else:
            for lang in data["other_languages"]:
                if not isinstance(lang, dict):
                    errors.append("Lenguaje debe ser un objeto")
                    continue
                if not isinstance(lang.get("name"), str) or not lang.get("name"):
                    errors.append("Lenguaje sin 'name' válido")
                if "version" in lang and not isinstance(lang["version"], str):
                    errors.append(f"Lenguaje '{lang.get('name')}' con 'version' inválida")

    # --- "hooks": optional object mapping hook type -> list of commands.
    if "hooks" in data:
        if not isinstance(data["hooks"], dict):
            errors.append("'hooks' debe ser un objeto")
        else:
            for hook_type, commands in data["hooks"].items():
                if not isinstance(commands, list):
                    errors.append(f"Hook '{hook_type}' debe ser un array")
                    continue
                for cmd in commands:
                    if not isinstance(cmd, dict):
                        errors.append("Hook debe ser un objeto")
                        continue
                    if not isinstance(cmd.get("command"), str) or not cmd.get("command"):
                        errors.append("Hook sin 'command' válido")
                    if "permitir_fallo" in cmd and not isinstance(cmd["permitir_fallo"], bool):
                        errors.append("'permitir_fallo' debe ser boolean")
                    if "timeout_ms" in cmd:
                        if not isinstance(cmd["timeout_ms"], int) or cmd["timeout_ms"] <= 10:
                            errors.append("'timeout_ms' debe ser int > 10")
    return (len(errors) == 0, errors)
#+------------------------------------------------------------------+
#| Export functions |
#+------------------------------------------------------------------+
def export_dependencies(repo_path: str, root: str) -> dict:
    """Export every dependency reachable from *repo_path*, recursively.

    Args:
        repo_path: Repository whose dependencies.json is the starting point.
        root: Directory under which dependency repos are checked out.

    Returns:
        Flat dict with "_metadata" (source repo name + ISO timestamp),
        "repos" (deduplicated list of exported repo entries) and
        "total_deps" (length of that list).
    """
    # Proper import instead of the original __import__('datetime') hack;
    # kept local so the block needs no top-of-file change.
    from datetime import datetime

    exported_data: dict = {
        "_metadata": {
            "exported_from": os.path.basename(repo_path),
            "exported_date": datetime.now().isoformat(),
        },
        "repos": [],
        "total_deps": 0,  # filled in after the walk below
    }

    # Shared across the whole recursion so each repo is exported once.
    visited: set = set()
    export_recursive(
        repo_path=repo_path,
        root=root,
        visited=visited,
        exported_data=exported_data,
    )
    exported_data["total_deps"] = len(exported_data["repos"])
    return exported_data
def export_recursive(
    repo_path: str,
    root: str,
    visited: set,
    exported_data: dict
):
    """Read one repo's dependencies.json and append its repos to the export.

    Args:
        repo_path: Repo directory containing a dependencies.json manifest.
        root: Directory under which dependency checkouts live.
        visited: Repo names already exported; mutated in place, dedupes
            repeats and breaks dependency cycles.
        exported_data: Accumulator whose "repos" list is appended to.
    """
    json_path: str = os.path.join(repo_path, "dependencies.json")
    # A repo without a manifest simply contributes nothing.
    if not os.path.exists(json_path):
        return
    data = load_json(json_path)
    if not data:
        return

    for repo in data.get("repos", []):
        name: str = repo["name"]
        if name in visited:
            continue
        visited.add(name)
        # Where this dependency should be checked out on disk.
        dep_checkout: str = os.path.join(root, name)
        exported_data["repos"].append({
            "name": name,
            "url": repo["url"],
            "rama": repo.get("rama", "main"),
            "comment": repo.get("comment", ""),
        })
        # BUG FIX: the original recursed into export_dependencies(), whose
        # signature (repo_path, root) accepts no visited/exported_data
        # kwargs — a guaranteed TypeError on any nested dependency, and it
        # would have reset the accumulator anyway. Recurse into this
        # function so the shared visited/exported_data keep accumulating.
        if os.path.exists(dep_checkout):
            export_recursive(
                repo_path=dep_checkout,
                root=root,
                visited=visited,
                exported_data=exported_data,
            )