# algoforge-mcp-server/handlers/contents.py

from forgeclient import request, encode_content, decode_content
def get_file(owner, repo, path, ref=None):
    """Read a file's text content from a repository.

    Args:
        owner: Owner segment of the repository URL.
        repo: Repository name segment of the URL.
        path: Path of the entry inside the repository.
        ref: Optional value sent as the ``ref`` query parameter; the
            parameter is omitted entirely when ``ref`` is falsy.

    Returns:
        On success, a dict with the keys ``path``, ``sha``, ``size`` and
        ``content`` (the response content passed through ``decode_content``).
        Otherwise a dict carrying an ``error`` key: either the response from
        ``request`` returned unchanged, or a message stating that ``path``
        is not a file.
    """
    params = {"ref": ref} if ref else None
    data = request("GET", f"/repos/{owner}/{repo}/contents/{path}", params=params)

    # A JSON array has no ``.get``; without this guard the type check below
    # would raise AttributeError instead of reporting "not a file".
    # NOTE(review): presumably the API answers with a list of entries when
    # ``path`` is a directory (as GitHub/Gitea-style contents endpoints do)
    # -- confirm against the forge's API.
    if isinstance(data, list):
        return {"error": f"{path} is not a file (type: dir)"}

    if "error" in data:
        return data
    if data.get("type") != "file":
        return {"error": f"{path} is not a file (type: {data.get('type')})"}
    return {
        "path": data.get("path"),
        "sha": data.get("sha"),
        "size": data.get("size"),
        "content": decode_content(data.get("content", "")),
    }
def commit_file(owner, repo, path, content, message, branch=None, sha=None):
    """Create a new file or update an existing one with a single commit.

    To update an existing file, the file's current sha must be supplied — the
    handler fetches it automatically when not provided.

    Args:
        owner: Owner segment of the repository URL.
        repo: Repository name segment of the URL.
        path: Path of the file inside the repository.
        content: New file content; passed through ``encode_content`` before
            being sent.
        message: Commit message.
        branch: Optional branch. When truthy it is sent in the request body
            and used as the ``ref`` for the sha lookup.
        sha: Optional sha of the existing file. When ``None``, the handler
            issues a GET for ``path`` and uses the sha it reports, if any.

    Returns:
        On success, a dict with ``path``, ``commit_sha`` and ``html_url``
        (each ``None`` when absent from the response). When the write
        response contains an ``error`` key, that response is returned
        unchanged.
    """
    body = {
        "message": message,
        "content": encode_content(content),
    }
    if branch:
        body["branch"] = branch

    url = f"/repos/{owner}/{repo}/contents/{path}"

    # Decide whether this is a create (POST) or an update (PUT).
    if sha is None:
        existing = request(
            "GET", url,
            params={"ref": branch} if branch else None,
        )
        # Best-effort lookup: anything without a usable sha (an error dict,
        # a non-dict response) is treated as "file does not exist yet".
        if isinstance(existing, dict) and existing.get("sha"):
            sha = existing["sha"]

    if sha:
        body["sha"] = sha
        result = request("PUT", url, json=body)
    else:
        result = request("POST", url, json=body)

    if "error" in result:
        return result

    # ``dict.get(key, {})`` only falls back when the key is missing; a key
    # present with a null value yields None and would break the chained
    # ``.get``. ``or {}`` covers both cases.
    content_info = result.get("content") or {}
    commit = result.get("commit") or {}
    return {
        "path": content_info.get("path"),
        "commit_sha": commit.get("sha"),
        "html_url": commit.get("html_url"),
    }