From 323b1d390ba0dea485c3c7263b27b6ce2cb5372c Mon Sep 17 00:00:00 2001 From: Agent Date: Wed, 1 Apr 2026 19:00:12 +0000 Subject: [PATCH] fix: feat: Forgejo API mock server for CI smoke tests (#123) --- tests/mock-forgejo.py | 631 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 631 insertions(+) create mode 100755 tests/mock-forgejo.py diff --git a/tests/mock-forgejo.py b/tests/mock-forgejo.py new file mode 100755 index 0000000..456eabc --- /dev/null +++ b/tests/mock-forgejo.py @@ -0,0 +1,631 @@ +#!/usr/bin/env python3 +"""Mock Forgejo API server for CI smoke tests. + +Implements 15 Forgejo API endpoints that disinto init calls. +State stored in-memory (dicts), responds instantly. +""" + +import base64 +import hashlib +import json +import os +import re +import signal +import sys +import uuid +from http.server import HTTPServer, BaseHTTPRequestHandler +from socketserver import ThreadingMixIn +from urllib.parse import parse_qs, urlparse + +# Global state +state = { + "users": {}, # key: username -> user object + "tokens": {}, # key: token_sha1 -> token object + "repos": {}, # key: "owner/repo" -> repo object + "orgs": {}, # key: orgname -> org object + "labels": {}, # key: "owner/repo" -> list of labels + "collaborators": {}, # key: "owner/repo" -> set of usernames + "protections": {}, # key: "owner/repo" -> list of protections + "oauth2_apps": [], # list of oauth2 app objects +} + +next_ids = {"users": 1, "tokens": 1, "repos": 1, "orgs": 1, "labels": 1, "oauth2_apps": 1} + +SHUTDOWN_REQUESTED = False + + +def log_request(handler, method, path, status): + """Log request details.""" + print(f"[{handler.log_date_time_string()}] {method} {path} {status}", file=sys.stderr) + + +def json_response(handler, status, data): + """Send JSON response.""" + body = json.dumps(data).encode("utf-8") + handler.send_response(status) + handler.send_header("Content-Type", "application/json") + handler.send_header("Content-Length", len(body)) + handler.end_headers() + handler.wfile.write(body) + + +def basic_auth_user(handler): + """Extract username from Basic auth header. Returns None if invalid.""" + auth_header = handler.headers.get("Authorization", "") + if not auth_header.startswith("Basic "): + return None + try: + decoded = base64.b64decode(auth_header[6:]).decode("utf-8") + username, _ = decoded.split(":", 1) + return username + except Exception: + return None + + +def token_auth_valid(handler): + """Check if Authorization header contains token. Doesn't validate value.""" + auth_header = handler.headers.get("Authorization", "") + return auth_header.startswith("token ") + + +def require_token(handler): + """Require token auth. Return user or None if invalid.""" + if not token_auth_valid(handler): + return None + return True # Any token is valid for mock purposes + + +def require_basic_auth(handler, required_user=None): + """Require basic auth. Return username or None if invalid.""" + username = basic_auth_user(handler) + if username is None: + return None + # Check user exists in state + if username not in state["users"]: + return None + if required_user and username != required_user: + return None + return username + + +class ForgejoHandler(BaseHTTPRequestHandler): + """HTTP request handler for mock Forgejo API.""" + + def log_message(self, format, *args): + """Override to use our logging.""" + pass # We log in do_request + + def do_request(self, method): + """Route request to appropriate handler.""" + parsed = urlparse(self.path) + path = parsed.path + query = parse_qs(parsed.query) + + log_request(self, method, self.path, "PENDING") + + # Strip /api/v1/ prefix for routing + route_path = path + if route_path.startswith("/api/v1/"): + route_path = route_path[8:] + + # Route to handler + try: + # First try exact match (with / replaced by _) + handler_path = route_path.replace("/", "_") + handler_name = f"handle_{method}_{handler_path}" + handler = getattr(self, handler_name, None) + + if handler: + handler(query) + else: + # Try pattern matching for routes with dynamic segments + self._handle_patterned_route(method, route_path, query) + except Exception as e: + log_request(self, method, self.path, 500) + json_response(self, 500, {"message": str(e)}) + + def _handle_patterned_route(self, method, route_path, query): + """Handle routes with dynamic segments using pattern matching.""" + # Define patterns: (regex, handler_name) + patterns = [ + # Users patterns + (r"^users/([^/]+)$", f"handle_{method}_users_username"), + (r"^users/([^/]+)/tokens$", f"handle_{method}_users_username_tokens"), + # Repos patterns + (r"^repos/([^/]+)/([^/]+)$", f"handle_{method}_repos_owner_repo"), + (r"^repos/([^/]+)/([^/]+)/labels$", f"handle_{method}_repos_owner_repo_labels"), + (r"^repos/([^/]+)/([^/]+)/branch_protections$", f"handle_{method}_repos_owner_repo_branch_protections"), + (r"^repos/([^/]+)/([^/]+)/collaborators/([^/]+)$", f"handle_{method}_repos_owner_repo_collaborators_collaborator"), + # Org patterns + (r"^orgs/([^/]+)/repos$", f"handle_{method}_orgs_org_repos"), + # User patterns + (r"^user/repos$", f"handle_{method}_user_repos"), + (r"^user/applications/oauth2$", f"handle_{method}_user_applications_oauth2"), + # Admin patterns + (r"^admin/users$", f"handle_{method}_admin_users"), + (r"^admin/users/([^/]+)$", f"handle_{method}_admin_users_username"), + # Org patterns + (r"^orgs$", f"handle_{method}_orgs"), + # OAuth2 patterns + (r"^user/applications/oauth2$", f"handle_{method}_user_applications_oauth2"), + ] + + for pattern, handler_name in patterns: + if re.match(pattern, route_path): + handler = getattr(self, handler_name, None) + if handler: + handler(query) + return + + self.handle_404() + + def do_GET(self): + self.do_request("GET") + + def do_POST(self): + self.do_request("POST") + + def do_PATCH(self): + self.do_request("PATCH") + + def do_PUT(self): + self.do_request("PUT") + + def handle_GET_version(self, query): + """GET /api/v1/version""" + json_response(self, 200, {"version": "11.0.0-mock"}) + + def handle_GET_users_username(self, query): + """GET /api/v1/users/{username}""" + # Extract username from path + parts = self.path.split("/") + if len(parts) >= 5: + username = parts[4] + else: + json_response(self, 404, {"message": "user does not exist"}) + return + + if username in state["users"]: + json_response(self, 200, state["users"][username]) + else: + json_response(self, 404, {"message": "user does not exist"}) + + def handle_GET_repos_owner_repo(self, query): + """GET /api/v1/repos/{owner}/{repo}""" + parts = self.path.split("/") + if len(parts) >= 6: + owner = parts[4] + repo = parts[5] + else: + json_response(self, 404, {"message": "repository not found"}) + return + + key = f"{owner}/{repo}" + if key in state["repos"]: + json_response(self, 200, state["repos"][key]) + else: + json_response(self, 404, {"message": "repository not found"}) + + def handle_GET_repos_owner_repo_labels(self, query): + """GET /api/v1/repos/{owner}/{repo}/labels""" + parts = self.path.split("/") + if len(parts) >= 6: + owner = parts[4] + repo = parts[5] + else: + json_response(self, 404, {"message": "repository not found"}) + return + + require_token(self) + + key = f"{owner}/{repo}" + if key in state["labels"]: + json_response(self, 200, state["labels"][key]) + else: + json_response(self, 200, []) + + def handle_GET_user_applications_oauth2(self, query): + """GET /api/v1/user/applications/oauth2""" + require_token(self) + json_response(self, 200, state["oauth2_apps"]) + + def handle_GET_mock_shutdown(self, query): + """GET /mock/shutdown""" + global SHUTDOWN_REQUESTED + SHUTDOWN_REQUESTED = True + json_response(self, 200, {"status": "shutdown"}) + + def handle_POST_admin_users(self, query): + """POST /api/v1/admin/users""" + require_token(self) + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + username = data.get("username") + email = data.get("email") + + if not username or not email: + json_response(self, 400, {"message": "username and email are required"}) + return + + user_id = next_ids["users"] + next_ids["users"] += 1 + + user = { + "id": user_id, + "login": username, + "email": email, + "full_name": data.get("full_name", ""), + "is_admin": data.get("admin", False), + "must_change_password": data.get("must_change_password", False), + "login_name": data.get("login_name", username), + "visibility": data.get("visibility", "public"), + "avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(email.encode()).hexdigest()}", + } + + state["users"][username] = user + json_response(self, 201, user) + + def handle_POST_users_username_tokens(self, query): + """POST /api/v1/users/{username}/tokens""" + username = require_basic_auth(self) + if not username: + json_response(self, 401, {"message": "invalid authentication"}) + return + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + token_name = data.get("name") + if not token_name: + json_response(self, 400, {"message": "name is required"}) + return + + token_id = next_ids["tokens"] + next_ids["tokens"] += 1 + + # Deterministic token: sha256(username + name)[:40] + token_str = hashlib.sha256(f"{username}{token_name}".encode()).hexdigest()[:40] + + token = { + "id": token_id, + "name": token_name, + "sha1": token_str, + "scopes": data.get("scopes", ["all"]), + "created_at": "2026-04-01T00:00:00Z", + "expires_at": None, + } + + state["tokens"][token_str] = token + json_response(self, 201, token) + + def handle_POST_orgs(self, query): + """POST /api/v1/orgs""" + require_token(self) + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + username = data.get("username") + if not username: + json_response(self, 400, {"message": "username is required"}) + return + + org_id = next_ids["orgs"] + next_ids["orgs"] += 1 + + org = { + "id": org_id, + "username": username, + "full_name": username, + "avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(username.encode()).hexdigest()}", + "visibility": data.get("visibility", "public"), + } + + state["orgs"][username] = org + json_response(self, 201, org) + + def handle_POST_orgs_org_repos(self, query): + """POST /api/v1/orgs/{org}/repos""" + require_token(self) + + parts = self.path.split("/") + if len(parts) >= 6: + org = parts[4] + else: + json_response(self, 404, {"message": "organization not found"}) + return + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + repo_name = data.get("name") + if not repo_name: + json_response(self, 400, {"message": "name is required"}) + return + + repo_id = next_ids["repos"] + next_ids["repos"] += 1 + + key = f"{org}/{repo_name}" + repo = { + "id": repo_id, + "full_name": key, + "name": repo_name, + "owner": {"id": state["orgs"][org]["id"], "login": org}, + "empty": False, + "default_branch": data.get("default_branch", "main"), + "description": data.get("description", ""), + "private": data.get("private", False), + "html_url": f"https://example.com/{key}", + "ssh_url": f"git@example.com:{key}.git", + "clone_url": f"https://example.com/{key}.git", + "created_at": "2026-04-01T00:00:00Z", + } + + state["repos"][key] = repo + json_response(self, 201, repo) + + def handle_POST_user_repos(self, query): + """POST /api/v1/user/repos""" + require_token(self) + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + repo_name = data.get("name") + if not repo_name: + json_response(self, 400, {"message": "name is required"}) + return + + # Get authenticated user from token + auth_header = self.headers.get("Authorization", "") + token = auth_header.split(" ", 1)[1] if " " in auth_header else "" + + # Find user by token + owner = None + for uname, tok in state["tokens"].items(): + if tok.get("sha1") == token: + owner = uname + break + + if not owner: + json_response(self, 401, {"message": "invalid token"}) + return + + repo_id = next_ids["repos"] + next_ids["repos"] += 1 + + key = f"{owner}/{repo_name}" + repo = { + "id": repo_id, + "full_name": key, + "name": repo_name, + "owner": {"id": state["users"].get(owner, {}).get("id", 0), "login": owner}, + "empty": False, + "default_branch": data.get("default_branch", "main"), + "description": data.get("description", ""), + "private": data.get("private", False), + "html_url": f"https://example.com/{key}", + "ssh_url": f"git@example.com:{key}.git", + "clone_url": f"https://example.com/{key}.git", + "created_at": "2026-04-01T00:00:00Z", + } + + state["repos"][key] = repo + json_response(self, 201, repo) + + def handle_POST_repos_owner_repo_labels(self, query): + """POST /api/v1/repos/{owner}/{repo}/labels""" + require_token(self) + + parts = self.path.split("/") + if len(parts) >= 6: + owner = parts[4] + repo = parts[5] + else: + json_response(self, 404, {"message": "repository not found"}) + return + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + label_name = data.get("name") + label_color = data.get("color") + + if not label_name or not label_color: + json_response(self, 400, {"message": "name and color are required"}) + return + + label_id = next_ids["labels"] + next_ids["labels"] += 1 + + key = f"{owner}/{repo}" + label = { + "id": label_id, + "name": label_name, + "color": label_color, + "description": data.get("description", ""), + "url": f"https://example.com/api/v1/repos/{key}/labels/{label_id}", + } + + if key not in state["labels"]: + state["labels"][key] = [] + state["labels"][key].append(label) + json_response(self, 201, label) + + def handle_POST_repos_owner_repo_branch_protections(self, query): + """POST /api/v1/repos/{owner}/{repo}/branch_protections""" + require_token(self) + + parts = self.path.split("/") + if len(parts) >= 6: + owner = parts[4] + repo = parts[5] + else: + json_response(self, 404, {"message": "repository not found"}) + return + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + branch_name = data.get("branch_name", "main") + key = f"{owner}/{repo}" + + # Generate unique ID for protection + if key in state["protections"]: + protection_id = len(state["protections"][key]) + 1 + else: + protection_id = 1 + + protection = { + "id": protection_id, + "repo_id": state["repos"].get(key, {}).get("id", 0), + "branch_name": branch_name, + "rule_name": data.get("rule_name", branch_name), + "enable_push": data.get("enable_push", False), + "enable_merge_whitelist": data.get("enable_merge_whitelist", True), + "merge_whitelist_usernames": data.get("merge_whitelist_usernames", ["admin"]), + "required_approvals": data.get("required_approvals", 1), + "apply_to_admins": data.get("apply_to_admins", True), + } + + if key not in state["protections"]: + state["protections"][key] = [] + state["protections"][key].append(protection) + json_response(self, 201, protection) + + def handle_POST_user_applications_oauth2(self, query): + """POST /api/v1/user/applications/oauth2""" + require_token(self) + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + app_name = data.get("name") + if not app_name: + json_response(self, 400, {"message": "name is required"}) + return + + app_id = next_ids["oauth2_apps"] + next_ids["oauth2_apps"] += 1 + + app = { + "id": app_id, + "name": app_name, + "client_id": str(uuid.uuid4()), + "client_secret": hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest(), + "redirect_uris": data.get("redirect_uris", []), + "confidential_client": data.get("confidential_client", True), + "created_at": "2026-04-01T00:00:00Z", + } + + state["oauth2_apps"].append(app) + json_response(self, 201, app) + + def handle_PATCH_admin_users_username(self, query): + """PATCH /api/v1/admin/users/{username}""" + if not require_token(self): + json_response(self, 401, {"message": "invalid authentication"}) + return + + parts = self.path.split("/") + if len(parts) >= 6: + username = parts[5] + else: + json_response(self, 404, {"message": "user does not exist"}) + return + + if username not in state["users"]: + json_response(self, 404, {"message": "user does not exist"}) + return + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + user = state["users"][username] + for key, value in data.items(): + # Map 'admin' to 'is_admin' for consistency + update_key = 'is_admin' if key == 'admin' else key + if update_key in user: + user[update_key] = value + + json_response(self, 200, user) + + def handle_PUT_repos_owner_repo_collaborators_collaborator(self, query): + """PUT /api/v1/repos/{owner}/{repo}/collaborators/{collaborator}""" + require_token(self) + + parts = self.path.split("/") + if len(parts) >= 7: + owner = parts[4] + repo = parts[5] + collaborator = parts[6] + else: + json_response(self, 404, {"message": "repository not found"}) + return + + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + data = json.loads(body) if body else {} + + key = f"{owner}/{repo}" + if key not in state["collaborators"]: + state["collaborators"][key] = set() + state["collaborators"][key].add(collaborator) + + self.send_response(204) + self.send_header("Content-Length", 0) + self.end_headers() + + def handle_404(self): + """Return 404 for unknown routes.""" + json_response(self, 404, {"message": "route not found"}) + + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + """Threaded HTTP server for handling concurrent requests.""" + daemon_threads = True + + +def main(): + """Start the mock server.""" + global SHUTDOWN_REQUESTED + + port = int(os.environ.get("MOCK_FORGE_PORT", 3000)) + server = ThreadingHTTPServer(("0.0.0.0", port), ForgejoHandler) + try: + server.socket.setsockopt(2, 4, 1) # SO_REUSEADDR + except OSError: + pass # Not all platforms support this + + print(f"Mock Forgejo server starting on port {port}", file=sys.stderr) + + def shutdown_handler(signum, frame): + global SHUTDOWN_REQUESTED + SHUTDOWN_REQUESTED = True + + signal.signal(signal.SIGTERM, shutdown_handler) + signal.signal(signal.SIGINT, shutdown_handler) + + try: + server.serve_forever() + except KeyboardInterrupt: + pass + finally: + server.shutdown() + print("Mock Forgejo server stopped", file=sys.stderr) + + +if __name__ == "__main__": + main()