fix: feat: Forgejo API mock server for CI smoke tests (#123) #125
1 changed files with 636 additions and 0 deletions
636
tests/mock-forgejo.py
Executable file
636
tests/mock-forgejo.py
Executable file
|
|
@ -0,0 +1,636 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Mock Forgejo API server for CI smoke tests.
|
||||
|
||||
Implements 15 Forgejo API endpoints that disinto init calls.
|
||||
State stored in-memory (dicts), responds instantly.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import uuid
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from socketserver import ThreadingMixIn
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
# Global state
|
||||
state = {
|
||||
"users": {}, # key: username -> user object
|
||||
"tokens": {}, # key: token_sha1 -> token object
|
||||
"repos": {}, # key: "owner/repo" -> repo object
|
||||
"orgs": {}, # key: orgname -> org object
|
||||
"labels": {}, # key: "owner/repo" -> list of labels
|
||||
"collaborators": {}, # key: "owner/repo" -> set of usernames
|
||||
"protections": {}, # key: "owner/repo" -> list of protections
|
||||
"oauth2_apps": [], # list of oauth2 app objects
|
||||
}
|
||||
|
||||
next_ids = {"users": 1, "tokens": 1, "repos": 1, "orgs": 1, "labels": 1, "oauth2_apps": 1}
|
||||
|
||||
SHUTDOWN_REQUESTED = False
|
||||
|
||||
|
||||
def log_request(handler, method, path, status):
|
||||
"""Log request details."""
|
||||
print(f"[{handler.log_date_time_string()}] {method} {path} {status}", file=sys.stderr)
|
||||
|
||||
|
||||
def json_response(handler, status, data):
|
||||
"""Send JSON response."""
|
||||
body = json.dumps(data).encode("utf-8")
|
||||
handler.send_response(status)
|
||||
handler.send_header("Content-Type", "application/json")
|
||||
handler.send_header("Content-Length", len(body))
|
||||
handler.end_headers()
|
||||
handler.wfile.write(body)
|
||||
|
||||
|
||||
def basic_auth_user(handler):
|
||||
"""Extract username from Basic auth header. Returns None if invalid."""
|
||||
auth_header = handler.headers.get("Authorization", "")
|
||||
if not auth_header.startswith("Basic "):
|
||||
return None
|
||||
try:
|
||||
decoded = base64.b64decode(auth_header[6:]).decode("utf-8")
|
||||
username, _ = decoded.split(":", 1)
|
||||
return username
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def token_auth_valid(handler):
|
||||
"""Check if Authorization header contains token. Doesn't validate value."""
|
||||
auth_header = handler.headers.get("Authorization", "")
|
||||
return auth_header.startswith("token ")
|
||||
|
||||
|
||||
def require_token(handler):
|
||||
"""Require token auth. Return user or None if invalid."""
|
||||
if not token_auth_valid(handler):
|
||||
return None
|
||||
return True # Any token is valid for mock purposes
|
||||
|
||||
|
||||
def require_basic_auth(handler, required_user=None):
|
||||
"""Require basic auth. Return username or None if invalid."""
|
||||
username = basic_auth_user(handler)
|
||||
if username is None:
|
||||
return None
|
||||
# Check user exists in state
|
||||
if username not in state["users"]:
|
||||
return None
|
||||
if required_user and username != required_user:
|
||||
return None
|
||||
return username
|
||||
|
||||
|
||||
class ForgejoHandler(BaseHTTPRequestHandler):
|
||||
"""HTTP request handler for mock Forgejo API."""
|
||||
|
||||
def log_message(self, format, *args):
|
||||
"""Override to use our logging."""
|
||||
pass # We log in do_request
|
||||
|
||||
def do_request(self, method):
|
||||
"""Route request to appropriate handler."""
|
||||
parsed = urlparse(self.path)
|
||||
path = parsed.path
|
||||
query = parse_qs(parsed.query)
|
||||
|
||||
log_request(self, method, self.path, "PENDING")
|
||||
|
||||
# Strip /api/v1/ prefix for routing (or leading slash for other routes)
|
||||
route_path = path
|
||||
if route_path.startswith("/api/v1/"):
|
||||
route_path = route_path[8:]
|
||||
elif route_path.startswith("/"):
|
||||
route_path = route_path.lstrip("/")
|
||||
|
||||
# Route to handler
|
||||
try:
|
||||
# First try exact match (with / replaced by _)
|
||||
handler_path = route_path.replace("/", "_")
|
||||
handler_name = f"handle_{method}_{handler_path}"
|
||||
handler = getattr(self, handler_name, None)
|
||||
|
||||
if handler:
|
||||
handler(query)
|
||||
else:
|
||||
# Try pattern matching for routes with dynamic segments
|
||||
self._handle_patterned_route(method, route_path, query)
|
||||
except Exception as e:
|
||||
log_request(self, method, self.path, 500)
|
||||
json_response(self, 500, {"message": str(e)})
|
||||
|
||||
def _handle_patterned_route(self, method, route_path, query):
|
||||
"""Handle routes with dynamic segments using pattern matching."""
|
||||
# Define patterns: (regex, handler_name)
|
||||
patterns = [
|
||||
# Users patterns
|
||||
(r"^users/([^/]+)$", f"handle_{method}_users_username"),
|
||||
(r"^users/([^/]+)/tokens$", f"handle_{method}_users_username_tokens"),
|
||||
# Repos patterns
|
||||
(r"^repos/([^/]+)/([^/]+)$", f"handle_{method}_repos_owner_repo"),
|
||||
(r"^repos/([^/]+)/([^/]+)/labels$", f"handle_{method}_repos_owner_repo_labels"),
|
||||
(r"^repos/([^/]+)/([^/]+)/branch_protections$", f"handle_{method}_repos_owner_repo_branch_protections"),
|
||||
(r"^repos/([^/]+)/([^/]+)/collaborators/([^/]+)$", f"handle_{method}_repos_owner_repo_collaborators_collaborator"),
|
||||
# Org patterns
|
||||
(r"^orgs/([^/]+)/repos$", f"handle_{method}_orgs_org_repos"),
|
||||
# User patterns
|
||||
(r"^user/repos$", f"handle_{method}_user_repos"),
|
||||
(r"^user/applications/oauth2$", f"handle_{method}_user_applications_oauth2"),
|
||||
# Admin patterns
|
||||
(r"^admin/users$", f"handle_{method}_admin_users"),
|
||||
(r"^admin/users/([^/]+)$", f"handle_{method}_admin_users_username"),
|
||||
# Org patterns
|
||||
(r"^orgs$", f"handle_{method}_orgs"),
|
||||
]
|
||||
|
||||
for pattern, handler_name in patterns:
|
||||
if re.match(pattern, route_path):
|
||||
handler = getattr(self, handler_name, None)
|
||||
if handler:
|
||||
handler(query)
|
||||
return
|
||||
|
||||
self.handle_404()
|
||||
|
||||
def do_GET(self):
|
||||
self.do_request("GET")
|
||||
|
||||
def do_POST(self):
|
||||
self.do_request("POST")
|
||||
|
||||
def do_PATCH(self):
|
||||
self.do_request("PATCH")
|
||||
|
||||
def do_PUT(self):
|
||||
self.do_request("PUT")
|
||||
|
||||
def handle_GET_version(self, query):
|
||||
"""GET /api/v1/version"""
|
||||
json_response(self, 200, {"version": "11.0.0-mock"})
|
||||
|
||||
def handle_GET_users_username(self, query):
|
||||
"""GET /api/v1/users/{username}"""
|
||||
# Extract username from path
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 5:
|
||||
username = parts[4]
|
||||
else:
|
||||
json_response(self, 404, {"message": "user does not exist"})
|
||||
return
|
||||
|
||||
if username in state["users"]:
|
||||
json_response(self, 200, state["users"][username])
|
||||
else:
|
||||
json_response(self, 404, {"message": "user does not exist"})
|
||||
|
||||
def handle_GET_repos_owner_repo(self, query):
|
||||
"""GET /api/v1/repos/{owner}/{repo}"""
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 6:
|
||||
owner = parts[4]
|
||||
repo = parts[5]
|
||||
else:
|
||||
json_response(self, 404, {"message": "repository not found"})
|
||||
return
|
||||
|
||||
key = f"{owner}/{repo}"
|
||||
if key in state["repos"]:
|
||||
json_response(self, 200, state["repos"][key])
|
||||
else:
|
||||
json_response(self, 404, {"message": "repository not found"})
|
||||
|
||||
def handle_GET_repos_owner_repo_labels(self, query):
|
||||
"""GET /api/v1/repos/{owner}/{repo}/labels"""
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 6:
|
||||
owner = parts[4]
|
||||
repo = parts[5]
|
||||
else:
|
||||
json_response(self, 404, {"message": "repository not found"})
|
||||
return
|
||||
|
||||
require_token(self)
|
||||
|
||||
key = f"{owner}/{repo}"
|
||||
if key in state["labels"]:
|
||||
json_response(self, 200, state["labels"][key])
|
||||
else:
|
||||
json_response(self, 200, [])
|
||||
|
||||
def handle_GET_user_applications_oauth2(self, query):
|
||||
"""GET /api/v1/user/applications/oauth2"""
|
||||
require_token(self)
|
||||
json_response(self, 200, state["oauth2_apps"])
|
||||
|
||||
def handle_GET_mock_shutdown(self, query):
|
||||
"""GET /mock/shutdown"""
|
||||
global SHUTDOWN_REQUESTED
|
||||
SHUTDOWN_REQUESTED = True
|
||||
json_response(self, 200, {"status": "shutdown"})
|
||||
|
||||
def handle_POST_admin_users(self, query):
|
||||
"""POST /api/v1/admin/users"""
|
||||
require_token(self)
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
username = data.get("username")
|
||||
email = data.get("email")
|
||||
|
||||
if not username or not email:
|
||||
json_response(self, 400, {"message": "username and email are required"})
|
||||
return
|
||||
|
||||
user_id = next_ids["users"]
|
||||
next_ids["users"] += 1
|
||||
|
||||
user = {
|
||||
"id": user_id,
|
||||
"login": username,
|
||||
"email": email,
|
||||
"full_name": data.get("full_name", ""),
|
||||
"is_admin": data.get("admin", False),
|
||||
"must_change_password": data.get("must_change_password", False),
|
||||
"login_name": data.get("login_name", username),
|
||||
"visibility": data.get("visibility", "public"),
|
||||
"avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(email.encode()).hexdigest()}",
|
||||
}
|
||||
|
||||
state["users"][username] = user
|
||||
json_response(self, 201, user)
|
||||
|
||||
def handle_POST_users_username_tokens(self, query):
|
||||
"""POST /api/v1/users/{username}/tokens"""
|
||||
username = require_basic_auth(self)
|
||||
if not username:
|
||||
json_response(self, 401, {"message": "invalid authentication"})
|
||||
return
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
token_name = data.get("name")
|
||||
if not token_name:
|
||||
json_response(self, 400, {"message": "name is required"})
|
||||
return
|
||||
|
||||
token_id = next_ids["tokens"]
|
||||
next_ids["tokens"] += 1
|
||||
|
||||
# Deterministic token: sha256(username + name)[:40]
|
||||
token_str = hashlib.sha256(f"{username}{token_name}".encode()).hexdigest()[:40]
|
||||
|
||||
token = {
|
||||
"id": token_id,
|
||||
"name": token_name,
|
||||
"sha1": token_str,
|
||||
"scopes": data.get("scopes", ["all"]),
|
||||
"created_at": "2026-04-01T00:00:00Z",
|
||||
"expires_at": None,
|
||||
"username": username, # Store username for lookup
|
||||
}
|
||||
|
||||
state["tokens"][token_str] = token
|
||||
json_response(self, 201, token)
|
||||
|
||||
def handle_POST_orgs(self, query):
|
||||
"""POST /api/v1/orgs"""
|
||||
require_token(self)
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
username = data.get("username")
|
||||
if not username:
|
||||
json_response(self, 400, {"message": "username is required"})
|
||||
return
|
||||
|
||||
org_id = next_ids["orgs"]
|
||||
next_ids["orgs"] += 1
|
||||
|
||||
org = {
|
||||
"id": org_id,
|
||||
"username": username,
|
||||
"full_name": username,
|
||||
"avatar_url": f"https://seccdn.libravatar.org/avatar/{hashlib.md5(username.encode()).hexdigest()}",
|
||||
"visibility": data.get("visibility", "public"),
|
||||
}
|
||||
|
||||
state["orgs"][username] = org
|
||||
json_response(self, 201, org)
|
||||
|
||||
def handle_POST_orgs_org_repos(self, query):
|
||||
"""POST /api/v1/orgs/{org}/repos"""
|
||||
require_token(self)
|
||||
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 6:
|
||||
org = parts[4]
|
||||
else:
|
||||
json_response(self, 404, {"message": "organization not found"})
|
||||
return
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
repo_name = data.get("name")
|
||||
if not repo_name:
|
||||
json_response(self, 400, {"message": "name is required"})
|
||||
return
|
||||
|
||||
repo_id = next_ids["repos"]
|
||||
next_ids["repos"] += 1
|
||||
|
||||
key = f"{org}/{repo_name}"
|
||||
repo = {
|
||||
"id": repo_id,
|
||||
"full_name": key,
|
||||
"name": repo_name,
|
||||
"owner": {"id": state["orgs"][org]["id"], "login": org},
|
||||
"empty": False,
|
||||
"default_branch": data.get("default_branch", "main"),
|
||||
"description": data.get("description", ""),
|
||||
"private": data.get("private", False),
|
||||
"html_url": f"https://example.com/{key}",
|
||||
"ssh_url": f"git@example.com:{key}.git",
|
||||
"clone_url": f"https://example.com/{key}.git",
|
||||
"created_at": "2026-04-01T00:00:00Z",
|
||||
}
|
||||
|
||||
state["repos"][key] = repo
|
||||
json_response(self, 201, repo)
|
||||
|
||||
def handle_POST_user_repos(self, query):
|
||||
"""POST /api/v1/user/repos"""
|
||||
require_token(self)
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
repo_name = data.get("name")
|
||||
if not repo_name:
|
||||
json_response(self, 400, {"message": "name is required"})
|
||||
return
|
||||
|
||||
# Get authenticated user from token
|
||||
auth_header = self.headers.get("Authorization", "")
|
||||
token = auth_header.split(" ", 1)[1] if " " in auth_header else ""
|
||||
|
||||
# Find user by token (use stored username field)
|
||||
owner = None
|
||||
for tok_sha1, tok in state["tokens"].items():
|
||||
if tok_sha1 == token:
|
||||
owner = tok.get("username")
|
||||
break
|
||||
|
||||
if not owner:
|
||||
json_response(self, 401, {"message": "invalid token"})
|
||||
return
|
||||
|
||||
repo_id = next_ids["repos"]
|
||||
next_ids["repos"] += 1
|
||||
|
||||
key = f"{owner}/{repo_name}"
|
||||
repo = {
|
||||
"id": repo_id,
|
||||
"full_name": key,
|
||||
"name": repo_name,
|
||||
"owner": {"id": state["users"].get(owner, {}).get("id", 0), "login": owner},
|
||||
"empty": False,
|
||||
"default_branch": data.get("default_branch", "main"),
|
||||
"description": data.get("description", ""),
|
||||
"private": data.get("private", False),
|
||||
"html_url": f"https://example.com/{key}",
|
||||
"ssh_url": f"git@example.com:{key}.git",
|
||||
"clone_url": f"https://example.com/{key}.git",
|
||||
"created_at": "2026-04-01T00:00:00Z",
|
||||
}
|
||||
|
||||
state["repos"][key] = repo
|
||||
json_response(self, 201, repo)
|
||||
|
||||
def handle_POST_repos_owner_repo_labels(self, query):
|
||||
"""POST /api/v1/repos/{owner}/{repo}/labels"""
|
||||
require_token(self)
|
||||
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 6:
|
||||
owner = parts[4]
|
||||
repo = parts[5]
|
||||
else:
|
||||
json_response(self, 404, {"message": "repository not found"})
|
||||
return
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
label_name = data.get("name")
|
||||
label_color = data.get("color")
|
||||
|
||||
if not label_name or not label_color:
|
||||
json_response(self, 400, {"message": "name and color are required"})
|
||||
return
|
||||
|
||||
label_id = next_ids["labels"]
|
||||
next_ids["labels"] += 1
|
||||
|
||||
key = f"{owner}/{repo}"
|
||||
label = {
|
||||
"id": label_id,
|
||||
"name": label_name,
|
||||
"color": label_color,
|
||||
"description": data.get("description", ""),
|
||||
"url": f"https://example.com/api/v1/repos/{key}/labels/{label_id}",
|
||||
}
|
||||
|
||||
if key not in state["labels"]:
|
||||
state["labels"][key] = []
|
||||
state["labels"][key].append(label)
|
||||
json_response(self, 201, label)
|
||||
|
||||
def handle_POST_repos_owner_repo_branch_protections(self, query):
|
||||
"""POST /api/v1/repos/{owner}/{repo}/branch_protections"""
|
||||
require_token(self)
|
||||
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 6:
|
||||
owner = parts[4]
|
||||
repo = parts[5]
|
||||
else:
|
||||
json_response(self, 404, {"message": "repository not found"})
|
||||
return
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
branch_name = data.get("branch_name", "main")
|
||||
key = f"{owner}/{repo}"
|
||||
|
||||
# Generate unique ID for protection
|
||||
if key in state["protections"]:
|
||||
protection_id = len(state["protections"][key]) + 1
|
||||
else:
|
||||
protection_id = 1
|
||||
|
||||
protection = {
|
||||
"id": protection_id,
|
||||
"repo_id": state["repos"].get(key, {}).get("id", 0),
|
||||
"branch_name": branch_name,
|
||||
"rule_name": data.get("rule_name", branch_name),
|
||||
"enable_push": data.get("enable_push", False),
|
||||
"enable_merge_whitelist": data.get("enable_merge_whitelist", True),
|
||||
"merge_whitelist_usernames": data.get("merge_whitelist_usernames", ["admin"]),
|
||||
"required_approvals": data.get("required_approvals", 1),
|
||||
"apply_to_admins": data.get("apply_to_admins", True),
|
||||
}
|
||||
|
||||
if key not in state["protections"]:
|
||||
state["protections"][key] = []
|
||||
state["protections"][key].append(protection)
|
||||
json_response(self, 201, protection)
|
||||
|
||||
def handle_POST_user_applications_oauth2(self, query):
|
||||
"""POST /api/v1/user/applications/oauth2"""
|
||||
require_token(self)
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
app_name = data.get("name")
|
||||
if not app_name:
|
||||
json_response(self, 400, {"message": "name is required"})
|
||||
return
|
||||
|
||||
app_id = next_ids["oauth2_apps"]
|
||||
next_ids["oauth2_apps"] += 1
|
||||
|
||||
app = {
|
||||
"id": app_id,
|
||||
"name": app_name,
|
||||
"client_id": str(uuid.uuid4()),
|
||||
"client_secret": hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest(),
|
||||
"redirect_uris": data.get("redirect_uris", []),
|
||||
"confidential_client": data.get("confidential_client", True),
|
||||
"created_at": "2026-04-01T00:00:00Z",
|
||||
}
|
||||
|
||||
state["oauth2_apps"].append(app)
|
||||
json_response(self, 201, app)
|
||||
|
||||
def handle_PATCH_admin_users_username(self, query):
|
||||
"""PATCH /api/v1/admin/users/{username}"""
|
||||
if not require_token(self):
|
||||
json_response(self, 401, {"message": "invalid authentication"})
|
||||
return
|
||||
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 6:
|
||||
username = parts[5]
|
||||
else:
|
||||
json_response(self, 404, {"message": "user does not exist"})
|
||||
return
|
||||
|
||||
if username not in state["users"]:
|
||||
json_response(self, 404, {"message": "user does not exist"})
|
||||
return
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
user = state["users"][username]
|
||||
for key, value in data.items():
|
||||
# Map 'admin' to 'is_admin' for consistency
|
||||
update_key = 'is_admin' if key == 'admin' else key
|
||||
if update_key in user:
|
||||
user[update_key] = value
|
||||
|
||||
json_response(self, 200, user)
|
||||
|
||||
def handle_PUT_repos_owner_repo_collaborators_collaborator(self, query):
|
||||
"""PUT /api/v1/repos/{owner}/{repo}/collaborators/{collaborator}"""
|
||||
require_token(self)
|
||||
|
||||
parts = self.path.split("/")
|
||||
if len(parts) >= 8:
|
||||
owner = parts[4]
|
||||
repo = parts[5]
|
||||
collaborator = parts[7]
|
||||
else:
|
||||
json_response(self, 404, {"message": "repository not found"})
|
||||
return
|
||||
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length).decode("utf-8")
|
||||
data = json.loads(body) if body else {}
|
||||
|
||||
key = f"{owner}/{repo}"
|
||||
if key not in state["collaborators"]:
|
||||
state["collaborators"][key] = set()
|
||||
state["collaborators"][key].add(collaborator)
|
||||
|
||||
self.send_response(204)
|
||||
self.send_header("Content-Length", 0)
|
||||
self.end_headers()
|
||||
|
||||
def handle_404(self):
|
||||
"""Return 404 for unknown routes."""
|
||||
json_response(self, 404, {"message": "route not found"})
|
||||
|
||||
|
||||
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
|
||||
"""Threaded HTTP server for handling concurrent requests."""
|
||||
daemon_threads = True
|
||||
|
||||
|
||||
def main():
|
||||
"""Start the mock server."""
|
||||
global SHUTDOWN_REQUESTED
|
||||
|
||||
port = int(os.environ.get("MOCK_FORGE_PORT", 3000))
|
||||
server = ThreadingHTTPServer(("0.0.0.0", port), ForgejoHandler)
|
||||
try:
|
||||
server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
except OSError:
|
||||
pass # Not all platforms support this
|
||||
|
||||
print(f"Mock Forgejo server starting on port {port}", file=sys.stderr)
|
||||
|
||||
def shutdown_handler(signum, frame):
|
||||
global SHUTDOWN_REQUESTED
|
||||
SHUTDOWN_REQUESTED = True
|
||||
# Can't call server.shutdown() directly from signal handler in threaded server
|
||||
threading.Thread(target=server.shutdown, daemon=True).start()
|
||||
|
||||
signal.signal(signal.SIGTERM, shutdown_handler)
|
||||
signal.signal(signal.SIGINT, shutdown_handler)
|
||||
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
server.shutdown()
|
||||
print("Mock Forgejo server stopped", file=sys.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue