diff --git a/tests/mock-forgejo.py b/tests/mock-forgejo.py deleted file mode 100755 index df05db7..0000000 --- a/tests/mock-forgejo.py +++ /dev/null @@ -1,636 +0,0 @@ -#!/usr/bin/env python3 -"""Mock Forgejo API server for CI smoke tests. - -Implements 15 Forgejo API endpoints that disinto init calls. -State stored in-memory (dicts), responds instantly. -""" - -import base64 -import hashlib -import json -import os -import re -import signal -import socket -import sys -import threading -import uuid -from http.server import HTTPServer, BaseHTTPRequestHandler -from socketserver import ThreadingMixIn -from urllib.parse import parse_qs, urlparse - -# Global state -state = { - "users": {}, # key: username -> user object - "tokens": {}, # key: token_sha1 -> token object - "repos": {}, # key: "owner/repo" -> repo object - "orgs": {}, # key: orgname -> org object - "labels": {}, # key: "owner/repo" -> list of labels - "collaborators": {}, # key: "owner/repo" -> set of usernames - "protections": {}, # key: "owner/repo" -> list of protections - "oauth2_apps": [], # list of oauth2 app objects -} - -next_ids = {"users": 1, "tokens": 1, "repos": 1, "orgs": 1, "labels": 1, "oauth2_apps": 1} - -SHUTDOWN_REQUESTED = False - - -def log_request(handler, method, path, status): - """Log request details.""" - print(f"[{handler.log_date_time_string()}] {method} {path} {status}", file=sys.stderr) - - -def json_response(handler, status, data): - """Send JSON response.""" - body = json.dumps(data).encode("utf-8") - handler.send_response(status) - handler.send_header("Content-Type", "application/json") - handler.send_header("Content-Length", len(body)) - handler.end_headers() - handler.wfile.write(body) - - -def basic_auth_user(handler): - """Extract username from Basic auth header. Returns None if invalid.""" - auth_header = handler.headers.get("Authorization", "") - if not auth_header.startswith("Basic "): - return None - try: - decoded = base64.b64decode(auth_header[6:]).decode("utf-8") - username, _ = decoded.split(":", 1) - return username - except Exception: - return None - - -def token_auth_valid(handler): - """Check if Authorization header contains token. Doesn't validate value.""" - auth_header = handler.headers.get("Authorization", "") - return auth_header.startswith("token ") - - -def require_token(handler): - """Require token auth. Return user or None if invalid.""" - if not token_auth_valid(handler): - return None - return True # Any token is valid for mock purposes - - -def require_basic_auth(handler, required_user=None): - """Require basic auth. Return username or None if invalid.""" - username = basic_auth_user(handler) - if username is None: - return None - # Check user exists in state - if username not in state["users"]: - return None - if required_user and username != required_user: - return None - return username - - -class ForgejoHandler(BaseHTTPRequestHandler): - """HTTP request handler for mock Forgejo API.""" - - def log_message(self, format, *args): - """Override to use our logging.""" - pass # We log in do_request - - def do_request(self, method): - """Route request to appropriate handler.""" - parsed = urlparse(self.path) - path = parsed.path - query = parse_qs(parsed.query) - - log_request(self, method, self.path, "PENDING") - - # Strip /api/v1/ prefix for routing (or leading slash for other routes) - route_path = path - if route_path.startswith("/api/v1/"): - route_path = route_path[8:] - elif route_path.startswith("/"): - route_path = route_path.lstrip("/") - - # Route to handler - try: - # First try exact match (with / replaced by _) - handler_path = route_path.replace("/", "_") - handler_name = f"handle_{method}_{handler_path}" - handler = getattr(self, handler_name, None) - - if handler: - handler(query) - else: - # Try pattern matching for routes with dynamic segments - self._handle_patterned_route(method, route_path, query) - except Exception as e: - log_request(self, method, self.path, 500) - json_response(self, 500, {"message": str(e)}) - - def _handle_patterned_route(self, method, route_path, query): - """Handle routes with dynamic segments using pattern matching.""" - # Define patterns: (regex, handler_name) - patterns = [ - # Users patterns - (r"^users/([^/]+)$", f"handle_{method}_users_username"), - (r"^users/([^/]+)/tokens$", f"handle_{method}_users_username_tokens"), - # Repos patterns - (r"^repos/([^/]+)/([^/]+)$", f"handle_{method}_repos_owner_repo"), - (r"^repos/([^/]+)/([^/]+)/labels$", f"handle_{method}_repos_owner_repo_labels"), - (r"^repos/([^/]+)/([^/]+)/branch_protections$", f"handle_{method}_repos_owner_repo_branch_protections"), - (r"^repos/([^/]+)/([^/]+)/collaborators/([^/]+)$", f"handle_{method}_repos_owner_repo_collaborators_collaborator"), - # Org patterns - (r"^orgs/([^/]+)/repos$", f"handle_{method}_orgs_org_repos"), - # User patterns - (r"^user/repos$", f"handle_{method}_user_repos"), - (r"^user/applications/oauth2$", f"handle_{method}_user_applications_oauth2"), - # Admin patterns - (r"^admin/users$", f"handle_{method}_admin_users"), - (r"^admin/users/([^/]+)$", f"handle_{method}_admin_users_username"), - # Org patterns - (r"^orgs$", f"handle_{method}_orgs"), - ] - - for pattern, handler_name in patterns: - if re.match(pattern, route_path): - handler = getattr(self, handler_name, None) - if handler: - handler(query) - return - - self.handle_404() - - def do_GET(self): - self.do_request("GET") - - def do_POST(self): - self.do_request("POST") - - def do_PATCH(self): - self.do_request("PATCH") - - def do_PUT(self): - self.do_request("PUT") - - def handle_GET_version(self, query): - """GET /api/v1/version""" - json_response(self, 200, {"version": "11.0.0-mock"}) - - def handle_GET_users_username(self, query): - """GET /api/v1/users/{username}""" - # Extract username from path - parts = self.path.split("/") - if len(parts) >= 5: - username = parts[4] - else: - json_response(self, 404, {"message": "user does not exist"}) - return - - if username in state["users"]: - json_response(self, 200, state["users"][username]) - else: - json_response(self, 404, {"message": "user does not exist"}) - - def handle_GET_repos_owner_repo(self, query): - """GET /api/v1/repos/{owner}/{repo}""" - parts = self.path.split("/") - if len(parts) >= 6: - owner = parts[4] - repo = parts[5] - else: - json_response(self, 404, {"message": "repository not found"}) - return - - key = f"{owner}/{repo}" - if key in state["repos"]: - json_response(self, 200, state["repos"][key]) - else: - json_response(self, 404, {"message": "repository not found"}) - - def handle_GET_repos_owner_repo_labels(self, query): - """GET /api/v1/repos/{owner}/{repo}/labels""" - parts = self.path.split("/") - if len(parts) >= 6: - owner = parts[4] - repo = parts[5] - else: - json_response(self, 404, {"message": "repository not found"}) - return - - require_token(self) - - key = f"{owner}/{repo}" - if key in state["labels"]: - json_response(self, 200, state["labels"][key]) - else: - json_response(self, 200, []) - - def handle_GET_user_applications_oauth2(self, query): - """GET /api/v1/user/applications/oauth2""" - require_token(self) - json_response(self, 200, state["oauth2_apps"]) - - def handle_GET_mock_shutdown(self, query): - """GET /mock/shutdown""" - global SHUTDOWN_REQUESTED - SHUTDOWN_REQUESTED = True - json_response(self, 200, {"status": "shutdown"}) - - def handle_POST_admin_users(self, query): - """POST /api/v1/admin/users""" - require_token(self) - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - username = data.get("username") - email = data.get("email") - - if not username or not email: - json_response(self, 400, {"message": "username and email are required"}) - return - - user_id = next_ids["users"] - next_ids["users"] += 1 - - user = { - "id": user_id, - "login": username, - "email": email, - "full_name": data.get("full_name", ""), - "is_admin": data.get("admin", False), - "must_change_password": data.get("must_change_password", False), - "login_name": data.get("login_name", username), - "visibility": data.get("visibility", "public"), - "avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(email.encode()).hexdigest()}", - } - - state["users"][username] = user - json_response(self, 201, user) - - def handle_POST_users_username_tokens(self, query): - """POST /api/v1/users/{username}/tokens""" - username = require_basic_auth(self) - if not username: - json_response(self, 401, {"message": "invalid authentication"}) - return - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - token_name = data.get("name") - if not token_name: - json_response(self, 400, {"message": "name is required"}) - return - - token_id = next_ids["tokens"] - next_ids["tokens"] += 1 - - # Deterministic token: sha256(username + name)[:40] - token_str = hashlib.sha256(f"{username}{token_name}".encode()).hexdigest()[:40] - - token = { - "id": token_id, - "name": token_name, - "sha1": token_str, - "scopes": data.get("scopes", ["all"]), - "created_at": "2026-04-01T00:00:00Z", - "expires_at": None, - "username": username, # Store username for lookup - } - - state["tokens"][token_str] = token - json_response(self, 201, token) - - def handle_POST_orgs(self, query): - """POST /api/v1/orgs""" - require_token(self) - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - username = data.get("username") - if not username: - json_response(self, 400, {"message": "username is required"}) - return - - org_id = next_ids["orgs"] - next_ids["orgs"] += 1 - - org = { - "id": org_id, - "username": username, - "full_name": username, - "avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(username.encode()).hexdigest()}", - "visibility": data.get("visibility", "public"), - } - - state["orgs"][username] = org - json_response(self, 201, org) - - def handle_POST_orgs_org_repos(self, query): - """POST /api/v1/orgs/{org}/repos""" - require_token(self) - - parts = self.path.split("/") - if len(parts) >= 6: - org = parts[4] - else: - json_response(self, 404, {"message": "organization not found"}) - return - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - repo_name = data.get("name") - if not repo_name: - json_response(self, 400, {"message": "name is required"}) - return - - repo_id = next_ids["repos"] - next_ids["repos"] += 1 - - key = f"{org}/{repo_name}" - repo = { - "id": repo_id, - "full_name": key, - "name": repo_name, - "owner": {"id": state["orgs"][org]["id"], "login": org}, - "empty": False, - "default_branch": data.get("default_branch", "main"), - "description": data.get("description", ""), - "private": data.get("private", False), - "html_url": f"https://example.com/{key}", - "ssh_url": f"git@example.com:{key}.git", - "clone_url": f"https://example.com/{key}.git", - "created_at": "2026-04-01T00:00:00Z", - } - - state["repos"][key] = repo - json_response(self, 201, repo) - - def handle_POST_user_repos(self, query): - """POST /api/v1/user/repos""" - require_token(self) - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - repo_name = data.get("name") - if not repo_name: - json_response(self, 400, {"message": "name is required"}) - return - - # Get authenticated user from token - auth_header = self.headers.get("Authorization", "") - token = auth_header.split(" ", 1)[1] if " " in auth_header else "" - - # Find user by token (use stored username field) - owner = None - for tok_sha1, tok in state["tokens"].items(): - if tok_sha1 == token: - owner = tok.get("username") - break - - if not owner: - json_response(self, 401, {"message": "invalid token"}) - return - - repo_id = next_ids["repos"] - next_ids["repos"] += 1 - - key = f"{owner}/{repo_name}" - repo = { - "id": repo_id, - "full_name": key, - "name": repo_name, - "owner": {"id": state["users"].get(owner, {}).get("id", 0), "login": owner}, - "empty": False, - "default_branch": data.get("default_branch", "main"), - "description": data.get("description", ""), - "private": data.get("private", False), - "html_url": f"https://example.com/{key}", - "ssh_url": f"git@example.com:{key}.git", - "clone_url": f"https://example.com/{key}.git", - "created_at": "2026-04-01T00:00:00Z", - } - - state["repos"][key] = repo - json_response(self, 201, repo) - - def handle_POST_repos_owner_repo_labels(self, query): - """POST /api/v1/repos/{owner}/{repo}/labels""" - require_token(self) - - parts = self.path.split("/") - if len(parts) >= 6: - owner = parts[4] - repo = parts[5] - else: - json_response(self, 404, {"message": "repository not found"}) - return - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - label_name = data.get("name") - label_color = data.get("color") - - if not label_name or not label_color: - json_response(self, 400, {"message": "name and color are required"}) - return - - label_id = next_ids["labels"] - next_ids["labels"] += 1 - - key = f"{owner}/{repo}" - label = { - "id": label_id, - "name": label_name, - "color": label_color, - "description": data.get("description", ""), - "url": f"https://example.com/api/v1/repos/{key}/labels/{label_id}", - } - - if key not in state["labels"]: - state["labels"][key] = [] - state["labels"][key].append(label) - json_response(self, 201, label) - - def handle_POST_repos_owner_repo_branch_protections(self, query): - """POST /api/v1/repos/{owner}/{repo}/branch_protections""" - require_token(self) - - parts = self.path.split("/") - if len(parts) >= 6: - owner = parts[4] - repo = parts[5] - else: - json_response(self, 404, {"message": "repository not found"}) - return - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - branch_name = data.get("branch_name", "main") - key = f"{owner}/{repo}" - - # Generate unique ID for protection - if key in state["protections"]: - protection_id = len(state["protections"][key]) + 1 - else: - protection_id = 1 - - protection = { - "id": protection_id, - "repo_id": state["repos"].get(key, {}).get("id", 0), - "branch_name": branch_name, - "rule_name": data.get("rule_name", branch_name), - "enable_push": data.get("enable_push", False), - "enable_merge_whitelist": data.get("enable_merge_whitelist", True), - "merge_whitelist_usernames": data.get("merge_whitelist_usernames", ["admin"]), - "required_approvals": data.get("required_approvals", 1), - "apply_to_admins": data.get("apply_to_admins", True), - } - - if key not in state["protections"]: - state["protections"][key] = [] - state["protections"][key].append(protection) - json_response(self, 201, protection) - - def handle_POST_user_applications_oauth2(self, query): - """POST /api/v1/user/applications/oauth2""" - require_token(self) - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - app_name = data.get("name") - if not app_name: - json_response(self, 400, {"message": "name is required"}) - return - - app_id = next_ids["oauth2_apps"] - next_ids["oauth2_apps"] += 1 - - app = { - "id": app_id, - "name": app_name, - "client_id": str(uuid.uuid4()), - "client_secret": hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest(), - "redirect_uris": data.get("redirect_uris", []), - "confidential_client": data.get("confidential_client", True), - "created_at": "2026-04-01T00:00:00Z", - } - - state["oauth2_apps"].append(app) - json_response(self, 201, app) - - def handle_PATCH_admin_users_username(self, query): - """PATCH /api/v1/admin/users/{username}""" - if not require_token(self): - json_response(self, 401, {"message": "invalid authentication"}) - return - - parts = self.path.split("/") - if len(parts) >= 6: - username = parts[5] - else: - json_response(self, 404, {"message": "user does not exist"}) - return - - if username not in state["users"]: - json_response(self, 404, {"message": "user does not exist"}) - return - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - user = state["users"][username] - for key, value in data.items(): - # Map 'admin' to 'is_admin' for consistency - update_key = 'is_admin' if key == 'admin' else key - if update_key in user: - user[update_key] = value - - json_response(self, 200, user) - - def handle_PUT_repos_owner_repo_collaborators_collaborator(self, query): - """PUT /api/v1/repos/{owner}/{repo}/collaborators/{collaborator}""" - require_token(self) - - parts = self.path.split("/") - if len(parts) >= 8: - owner = parts[4] - repo = parts[5] - collaborator = parts[7] - else: - json_response(self, 404, {"message": "repository not found"}) - return - - content_length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(content_length).decode("utf-8") - data = json.loads(body) if body else {} - - key = f"{owner}/{repo}" - if key not in state["collaborators"]: - state["collaborators"][key] = set() - state["collaborators"][key].add(collaborator) - - self.send_response(204) - self.send_header("Content-Length", 0) - self.end_headers() - - def handle_404(self): - """Return 404 for unknown routes.""" - json_response(self, 404, {"message": "route not found"}) - - -class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): - """Threaded HTTP server for handling concurrent requests.""" - daemon_threads = True - - -def main(): - """Start the mock server.""" - global SHUTDOWN_REQUESTED - - port = int(os.environ.get("MOCK_FORGE_PORT", 3000)) - server = ThreadingHTTPServer(("0.0.0.0", port), ForgejoHandler) - try: - server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - except OSError: - pass # Not all platforms support this - - print(f"Mock Forgejo server starting on port {port}", file=sys.stderr) - - def shutdown_handler(signum, frame): - global SHUTDOWN_REQUESTED - SHUTDOWN_REQUESTED = True - # Can't call server.shutdown() directly from signal handler in threaded server - threading.Thread(target=server.shutdown, daemon=True).start() - - signal.signal(signal.SIGTERM, shutdown_handler) - signal.signal(signal.SIGINT, shutdown_handler) - - try: - server.serve_forever() - except KeyboardInterrupt: - pass - finally: - server.shutdown() - print("Mock Forgejo server stopped", file=sys.stderr) - - -if __name__ == "__main__": - main()