fix: feat: Forgejo API mock server for CI smoke tests (#123) #125

Merged
dev-qwen merged 2 commits from fix/issue-123 into main 2026-04-01 19:16:35 +00:00

636
tests/mock-forgejo.py Executable file
View file

@ -0,0 +1,636 @@
#!/usr/bin/env python3
"""Mock Forgejo API server for CI smoke tests.
Implements 15 Forgejo API endpoints that disinto init calls.
State stored in-memory (dicts), responds instantly.
"""
import base64
import hashlib
import json
import os
import re
import signal
import socket
import sys
import threading
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from urllib.parse import parse_qs, urlparse
# Global state
state = {
"users": {}, # key: username -> user object
"tokens": {}, # key: token_sha1 -> token object
"repos": {}, # key: "owner/repo" -> repo object
"orgs": {}, # key: orgname -> org object
"labels": {}, # key: "owner/repo" -> list of labels
"collaborators": {}, # key: "owner/repo" -> set of usernames
"protections": {}, # key: "owner/repo" -> list of protections
"oauth2_apps": [], # list of oauth2 app objects
}
next_ids = {"users": 1, "tokens": 1, "repos": 1, "orgs": 1, "labels": 1, "oauth2_apps": 1}
SHUTDOWN_REQUESTED = False
def log_request(handler, method, path, status):
"""Log request details."""
print(f"[{handler.log_date_time_string()}] {method} {path} {status}", file=sys.stderr)
def json_response(handler, status, data):
"""Send JSON response."""
body = json.dumps(data).encode("utf-8")
handler.send_response(status)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", len(body))
handler.end_headers()
handler.wfile.write(body)
def basic_auth_user(handler):
"""Extract username from Basic auth header. Returns None if invalid."""
auth_header = handler.headers.get("Authorization", "")
if not auth_header.startswith("Basic "):
return None
try:
decoded = base64.b64decode(auth_header[6:]).decode("utf-8")
username, _ = decoded.split(":", 1)
return username
except Exception:
return None
def token_auth_valid(handler):
"""Check if Authorization header contains token. Doesn't validate value."""
auth_header = handler.headers.get("Authorization", "")
return auth_header.startswith("token ")
def require_token(handler):
"""Require token auth. Return user or None if invalid."""
if not token_auth_valid(handler):
return None
return True # Any token is valid for mock purposes
def require_basic_auth(handler, required_user=None):
"""Require basic auth. Return username or None if invalid."""
username = basic_auth_user(handler)
if username is None:
return None
# Check user exists in state
if username not in state["users"]:
return None
if required_user and username != required_user:
return None
return username
class ForgejoHandler(BaseHTTPRequestHandler):
"""HTTP request handler for mock Forgejo API."""
def log_message(self, format, *args):
"""Override to use our logging."""
pass # We log in do_request
def do_request(self, method):
"""Route request to appropriate handler."""
parsed = urlparse(self.path)
path = parsed.path
query = parse_qs(parsed.query)
log_request(self, method, self.path, "PENDING")
# Strip /api/v1/ prefix for routing (or leading slash for other routes)
route_path = path
if route_path.startswith("/api/v1/"):
route_path = route_path[8:]
elif route_path.startswith("/"):
route_path = route_path.lstrip("/")
# Route to handler
try:
# First try exact match (with / replaced by _)
handler_path = route_path.replace("/", "_")
handler_name = f"handle_{method}_{handler_path}"
handler = getattr(self, handler_name, None)
if handler:
handler(query)
else:
# Try pattern matching for routes with dynamic segments
self._handle_patterned_route(method, route_path, query)
except Exception as e:
log_request(self, method, self.path, 500)
json_response(self, 500, {"message": str(e)})
def _handle_patterned_route(self, method, route_path, query):
"""Handle routes with dynamic segments using pattern matching."""
# Define patterns: (regex, handler_name)
patterns = [
# Users patterns
(r"^users/([^/]+)$", f"handle_{method}_users_username"),
(r"^users/([^/]+)/tokens$", f"handle_{method}_users_username_tokens"),
# Repos patterns
(r"^repos/([^/]+)/([^/]+)$", f"handle_{method}_repos_owner_repo"),
(r"^repos/([^/]+)/([^/]+)/labels$", f"handle_{method}_repos_owner_repo_labels"),
(r"^repos/([^/]+)/([^/]+)/branch_protections$", f"handle_{method}_repos_owner_repo_branch_protections"),
(r"^repos/([^/]+)/([^/]+)/collaborators/([^/]+)$", f"handle_{method}_repos_owner_repo_collaborators_collaborator"),
# Org patterns
(r"^orgs/([^/]+)/repos$", f"handle_{method}_orgs_org_repos"),
# User patterns
(r"^user/repos$", f"handle_{method}_user_repos"),
(r"^user/applications/oauth2$", f"handle_{method}_user_applications_oauth2"),
# Admin patterns
(r"^admin/users$", f"handle_{method}_admin_users"),
(r"^admin/users/([^/]+)$", f"handle_{method}_admin_users_username"),
# Org patterns
(r"^orgs$", f"handle_{method}_orgs"),
]
for pattern, handler_name in patterns:
if re.match(pattern, route_path):
handler = getattr(self, handler_name, None)
if handler:
handler(query)
return
self.handle_404()
def do_GET(self):
self.do_request("GET")
def do_POST(self):
self.do_request("POST")
def do_PATCH(self):
self.do_request("PATCH")
def do_PUT(self):
self.do_request("PUT")
def handle_GET_version(self, query):
"""GET /api/v1/version"""
json_response(self, 200, {"version": "11.0.0-mock"})
def handle_GET_users_username(self, query):
"""GET /api/v1/users/{username}"""
# Extract username from path
parts = self.path.split("/")
if len(parts) >= 5:
username = parts[4]
else:
json_response(self, 404, {"message": "user does not exist"})
return
if username in state["users"]:
json_response(self, 200, state["users"][username])
else:
json_response(self, 404, {"message": "user does not exist"})
def handle_GET_repos_owner_repo(self, query):
"""GET /api/v1/repos/{owner}/{repo}"""
parts = self.path.split("/")
if len(parts) >= 6:
owner = parts[4]
repo = parts[5]
else:
json_response(self, 404, {"message": "repository not found"})
return
key = f"{owner}/{repo}"
if key in state["repos"]:
json_response(self, 200, state["repos"][key])
else:
json_response(self, 404, {"message": "repository not found"})
def handle_GET_repos_owner_repo_labels(self, query):
"""GET /api/v1/repos/{owner}/{repo}/labels"""
parts = self.path.split("/")
if len(parts) >= 6:
owner = parts[4]
repo = parts[5]
else:
json_response(self, 404, {"message": "repository not found"})
return
require_token(self)
key = f"{owner}/{repo}"
if key in state["labels"]:
json_response(self, 200, state["labels"][key])
else:
json_response(self, 200, [])
def handle_GET_user_applications_oauth2(self, query):
"""GET /api/v1/user/applications/oauth2"""
require_token(self)
json_response(self, 200, state["oauth2_apps"])
def handle_GET_mock_shutdown(self, query):
"""GET /mock/shutdown"""
global SHUTDOWN_REQUESTED
SHUTDOWN_REQUESTED = True
json_response(self, 200, {"status": "shutdown"})
def handle_POST_admin_users(self, query):
"""POST /api/v1/admin/users"""
require_token(self)
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
username = data.get("username")
email = data.get("email")
if not username or not email:
json_response(self, 400, {"message": "username and email are required"})
return
user_id = next_ids["users"]
next_ids["users"] += 1
user = {
"id": user_id,
"login": username,
"email": email,
"full_name": data.get("full_name", ""),
"is_admin": data.get("admin", False),
"must_change_password": data.get("must_change_password", False),
"login_name": data.get("login_name", username),
"visibility": data.get("visibility", "public"),
"avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(email.encode()).hexdigest()}",
}
state["users"][username] = user
json_response(self, 201, user)
def handle_POST_users_username_tokens(self, query):
"""POST /api/v1/users/{username}/tokens"""
username = require_basic_auth(self)
if not username:
json_response(self, 401, {"message": "invalid authentication"})
return
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
token_name = data.get("name")
if not token_name:
json_response(self, 400, {"message": "name is required"})
return
token_id = next_ids["tokens"]
next_ids["tokens"] += 1
# Deterministic token: sha256(username + name)[:40]
token_str = hashlib.sha256(f"{username}{token_name}".encode()).hexdigest()[:40]
token = {
"id": token_id,
"name": token_name,
"sha1": token_str,
"scopes": data.get("scopes", ["all"]),
"created_at": "2026-04-01T00:00:00Z",
"expires_at": None,
"username": username, # Store username for lookup
}
state["tokens"][token_str] = token
json_response(self, 201, token)
def handle_POST_orgs(self, query):
"""POST /api/v1/orgs"""
require_token(self)
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
username = data.get("username")
if not username:
json_response(self, 400, {"message": "username is required"})
return
org_id = next_ids["orgs"]
next_ids["orgs"] += 1
org = {
"id": org_id,
"username": username,
"full_name": username,
"avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(username.encode()).hexdigest()}",
"visibility": data.get("visibility", "public"),
}
state["orgs"][username] = org
json_response(self, 201, org)
def handle_POST_orgs_org_repos(self, query):
"""POST /api/v1/orgs/{org}/repos"""
require_token(self)
parts = self.path.split("/")
if len(parts) >= 6:
org = parts[4]
else:
json_response(self, 404, {"message": "organization not found"})
return
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
repo_name = data.get("name")
if not repo_name:
json_response(self, 400, {"message": "name is required"})
return
repo_id = next_ids["repos"]
next_ids["repos"] += 1
key = f"{org}/{repo_name}"
repo = {
"id": repo_id,
"full_name": key,
"name": repo_name,
"owner": {"id": state["orgs"][org]["id"], "login": org},
"empty": False,
"default_branch": data.get("default_branch", "main"),
"description": data.get("description", ""),
"private": data.get("private", False),
"html_url": f"https://example.com/{key}",
"ssh_url": f"git@example.com:{key}.git",
"clone_url": f"https://example.com/{key}.git",
"created_at": "2026-04-01T00:00:00Z",
}
state["repos"][key] = repo
json_response(self, 201, repo)
def handle_POST_user_repos(self, query):
"""POST /api/v1/user/repos"""
require_token(self)
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
repo_name = data.get("name")
if not repo_name:
json_response(self, 400, {"message": "name is required"})
return
# Get authenticated user from token
auth_header = self.headers.get("Authorization", "")
token = auth_header.split(" ", 1)[1] if " " in auth_header else ""
# Find user by token (use stored username field)
owner = None
for tok_sha1, tok in state["tokens"].items():
if tok_sha1 == token:
owner = tok.get("username")
break
if not owner:
json_response(self, 401, {"message": "invalid token"})
return
repo_id = next_ids["repos"]
next_ids["repos"] += 1
key = f"{owner}/{repo_name}"
repo = {
"id": repo_id,
"full_name": key,
"name": repo_name,
"owner": {"id": state["users"].get(owner, {}).get("id", 0), "login": owner},
"empty": False,
"default_branch": data.get("default_branch", "main"),
"description": data.get("description", ""),
"private": data.get("private", False),
"html_url": f"https://example.com/{key}",
"ssh_url": f"git@example.com:{key}.git",
"clone_url": f"https://example.com/{key}.git",
"created_at": "2026-04-01T00:00:00Z",
}
state["repos"][key] = repo
json_response(self, 201, repo)
def handle_POST_repos_owner_repo_labels(self, query):
"""POST /api/v1/repos/{owner}/{repo}/labels"""
require_token(self)
parts = self.path.split("/")
if len(parts) >= 6:
owner = parts[4]
repo = parts[5]
else:
json_response(self, 404, {"message": "repository not found"})
return
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
label_name = data.get("name")
label_color = data.get("color")
if not label_name or not label_color:
json_response(self, 400, {"message": "name and color are required"})
return
label_id = next_ids["labels"]
next_ids["labels"] += 1
key = f"{owner}/{repo}"
label = {
"id": label_id,
"name": label_name,
"color": label_color,
"description": data.get("description", ""),
"url": f"https://example.com/api/v1/repos/{key}/labels/{label_id}",
}
if key not in state["labels"]:
state["labels"][key] = []
state["labels"][key].append(label)
json_response(self, 201, label)
def handle_POST_repos_owner_repo_branch_protections(self, query):
"""POST /api/v1/repos/{owner}/{repo}/branch_protections"""
require_token(self)
parts = self.path.split("/")
if len(parts) >= 6:
owner = parts[4]
repo = parts[5]
else:
json_response(self, 404, {"message": "repository not found"})
return
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
branch_name = data.get("branch_name", "main")
key = f"{owner}/{repo}"
# Generate unique ID for protection
if key in state["protections"]:
protection_id = len(state["protections"][key]) + 1
else:
protection_id = 1
protection = {
"id": protection_id,
"repo_id": state["repos"].get(key, {}).get("id", 0),
"branch_name": branch_name,
"rule_name": data.get("rule_name", branch_name),
"enable_push": data.get("enable_push", False),
"enable_merge_whitelist": data.get("enable_merge_whitelist", True),
"merge_whitelist_usernames": data.get("merge_whitelist_usernames", ["admin"]),
"required_approvals": data.get("required_approvals", 1),
"apply_to_admins": data.get("apply_to_admins", True),
}
if key not in state["protections"]:
state["protections"][key] = []
state["protections"][key].append(protection)
json_response(self, 201, protection)
def handle_POST_user_applications_oauth2(self, query):
"""POST /api/v1/user/applications/oauth2"""
require_token(self)
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
app_name = data.get("name")
if not app_name:
json_response(self, 400, {"message": "name is required"})
return
app_id = next_ids["oauth2_apps"]
next_ids["oauth2_apps"] += 1
app = {
"id": app_id,
"name": app_name,
"client_id": str(uuid.uuid4()),
"client_secret": hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest(),
"redirect_uris": data.get("redirect_uris", []),
"confidential_client": data.get("confidential_client", True),
"created_at": "2026-04-01T00:00:00Z",
}
state["oauth2_apps"].append(app)
json_response(self, 201, app)
def handle_PATCH_admin_users_username(self, query):
"""PATCH /api/v1/admin/users/{username}"""
if not require_token(self):
json_response(self, 401, {"message": "invalid authentication"})
return
parts = self.path.split("/")
if len(parts) >= 6:
username = parts[5]
else:
json_response(self, 404, {"message": "user does not exist"})
return
if username not in state["users"]:
json_response(self, 404, {"message": "user does not exist"})
return
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
user = state["users"][username]
for key, value in data.items():
# Map 'admin' to 'is_admin' for consistency
update_key = 'is_admin' if key == 'admin' else key
if update_key in user:
user[update_key] = value
json_response(self, 200, user)
def handle_PUT_repos_owner_repo_collaborators_collaborator(self, query):
"""PUT /api/v1/repos/{owner}/{repo}/collaborators/{collaborator}"""
require_token(self)
parts = self.path.split("/")
if len(parts) >= 8:
owner = parts[4]
repo = parts[5]
collaborator = parts[7]
else:
json_response(self, 404, {"message": "repository not found"})
return
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length).decode("utf-8")
data = json.loads(body) if body else {}
key = f"{owner}/{repo}"
if key not in state["collaborators"]:
state["collaborators"][key] = set()
state["collaborators"][key].add(collaborator)
self.send_response(204)
self.send_header("Content-Length", 0)
self.end_headers()
def handle_404(self):
"""Return 404 for unknown routes."""
json_response(self, 404, {"message": "route not found"})
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
"""Threaded HTTP server for handling concurrent requests."""
daemon_threads = True
def main():
"""Start the mock server."""
global SHUTDOWN_REQUESTED
port = int(os.environ.get("MOCK_FORGE_PORT", 3000))
server = ThreadingHTTPServer(("0.0.0.0", port), ForgejoHandler)
try:
server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except OSError:
pass # Not all platforms support this
print(f"Mock Forgejo server starting on port {port}", file=sys.stderr)
def shutdown_handler(signum, frame):
global SHUTDOWN_REQUESTED
SHUTDOWN_REQUESTED = True
# Can't call server.shutdown() directly from signal handler in threaded server
threading.Thread(target=server.shutdown, daemon=True).start()
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.shutdown()
print("Mock Forgejo server stopped", file=sys.stderr)
if __name__ == "__main__":
main()