fix: vision(#623): Forgejo OAuth gate for disinto-chat (#708)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
ci/woodpecker/pr/smoke-init Pipeline was successful

Gate /chat/* behind Forgejo OAuth2 authorization-code flow.

- Extract generic _create_forgejo_oauth_app() helper in lib/ci-setup.sh;
  Woodpecker OAuth becomes a thin wrapper, chat gets its own app.
- bin/disinto init now creates TWO OAuth apps (woodpecker-ci + disinto-chat)
  and writes CHAT_OAUTH_CLIENT_ID / CHAT_OAUTH_CLIENT_SECRET to .env.
- docker/chat/server.py: new routes /chat/login (→ Forgejo authorize),
  /chat/oauth/callback (code→token exchange, user allowlist check, session
  cookie). All other /chat/* routes require a valid session or redirect to
  /chat/login. Session store is in-memory with 24h TTL.
- lib/generators.sh: pass FORGE_URL, CHAT_OAUTH_CLIENT_ID,
  CHAT_OAUTH_CLIENT_SECRET, EDGE_TUNNEL_FQDN, DISINTO_CHAT_ALLOWED_USERS
  to the chat container environment.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude 2026-04-12 01:52:16 +00:00
parent cf4e9983c2
commit 30e19f71e2
6 changed files with 376 additions and 69 deletions

View file

@ -3,19 +3,32 @@
disinto-chat server minimal HTTP backend for Claude chat UI.
Routes:
GET / serves index.html
GET /static/* serves static assets (htmx.min.js, etc.)
POST /chat spawns `claude --print` with user message, returns response
GET /ws reserved for future streaming upgrade (returns 501)
GET /chat/login 302 to Forgejo OAuth authorize
GET /chat/oauth/callback exchange code for token, validate user, set session
GET /chat/ serves index.html (session required)
GET /chat/static/* serves static assets (session required)
POST /chat spawns `claude --print` with user message (session required)
GET /ws reserved for future streaming upgrade (returns 501)
OAuth flow:
1. User hits any /chat/* route without a valid session cookie 302 /chat/login
2. /chat/login redirects to Forgejo /login/oauth/authorize
3. Forgejo redirects back to /chat/oauth/callback with ?code=...&state=...
4. Server exchanges code for access token, fetches /api/v1/user
5. Asserts user is in allowlist, sets HttpOnly session cookie
6. Redirects to /chat/
The claude binary is expected to be mounted from the host at /usr/local/bin/claude.
"""
import json
import os
import secrets
import subprocess
import sys
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from urllib.parse import urlparse, parse_qs, urlencode
# Configuration
HOST = os.environ.get("CHAT_HOST", "0.0.0.0")
@ -24,6 +37,30 @@ UI_DIR = "/var/chat/ui"
STATIC_DIR = os.path.join(UI_DIR, "static")
CLAUDE_BIN = "/usr/local/bin/claude"
# OAuth configuration
FORGE_URL = os.environ.get("FORGE_URL", "http://localhost:3000")
CHAT_OAUTH_CLIENT_ID = os.environ.get("CHAT_OAUTH_CLIENT_ID", "")
CHAT_OAUTH_CLIENT_SECRET = os.environ.get("CHAT_OAUTH_CLIENT_SECRET", "")
EDGE_TUNNEL_FQDN = os.environ.get("EDGE_TUNNEL_FQDN", "")
# Allowed users — disinto-admin always allowed; CSV allowlist extends it
_allowed_csv = os.environ.get("DISINTO_CHAT_ALLOWED_USERS", "")
ALLOWED_USERS = {"disinto-admin"}
if _allowed_csv:
ALLOWED_USERS.update(u.strip() for u in _allowed_csv.split(",") if u.strip())
# Session cookie name
SESSION_COOKIE = "disinto_chat_session"
# Session TTL: 24 hours
SESSION_TTL = 24 * 60 * 60
# In-memory session store: token → {"user": str, "expires": float}
_sessions = {}
# Pending OAuth state tokens: state → expires (float)
_oauth_states = {}
# MIME types for static files
MIME_TYPES = {
".html": "text/html; charset=utf-8",
@ -37,14 +74,101 @@ MIME_TYPES = {
}
def _build_callback_uri():
"""Build the OAuth callback URI based on tunnel configuration."""
if EDGE_TUNNEL_FQDN:
return f"https://{EDGE_TUNNEL_FQDN}/chat/oauth/callback"
return "http://localhost/chat/oauth/callback"
def _session_cookie_flags():
"""Return cookie flags appropriate for the deployment mode."""
flags = "HttpOnly; SameSite=Lax; Path=/chat"
if EDGE_TUNNEL_FQDN:
flags += "; Secure"
return flags
def _validate_session(cookie_header):
"""Check session cookie and return username if valid, else None."""
if not cookie_header:
return None
for part in cookie_header.split(";"):
part = part.strip()
if part.startswith(SESSION_COOKIE + "="):
token = part[len(SESSION_COOKIE) + 1:]
session = _sessions.get(token)
if session and session["expires"] > time.time():
return session["user"]
# Expired — clean up
_sessions.pop(token, None)
return None
return None
def _gc_sessions():
"""Remove expired sessions (called opportunistically)."""
now = time.time()
expired = [k for k, v in _sessions.items() if v["expires"] <= now]
for k in expired:
del _sessions[k]
expired_states = [k for k, v in _oauth_states.items() if v <= now]
for k in expired_states:
del _oauth_states[k]
def _exchange_code_for_token(code):
"""Exchange an authorization code for an access token via Forgejo."""
import urllib.request
import urllib.error
data = urlencode({
"grant_type": "authorization_code",
"code": code,
"client_id": CHAT_OAUTH_CLIENT_ID,
"client_secret": CHAT_OAUTH_CLIENT_SECRET,
"redirect_uri": _build_callback_uri(),
}).encode()
req = urllib.request.Request(
f"{FORGE_URL}/login/oauth/access_token",
data=data,
headers={"Accept": "application/json", "Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode())
except (urllib.error.URLError, json.JSONDecodeError, OSError) as e:
print(f"OAuth token exchange failed: {e}", file=sys.stderr)
return None
def _fetch_user(access_token):
"""Fetch the authenticated user from Forgejo API."""
import urllib.request
import urllib.error
req = urllib.request.Request(
f"{FORGE_URL}/api/v1/user",
headers={"Authorization": f"token {access_token}", "Accept": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode())
except (urllib.error.URLError, json.JSONDecodeError, OSError) as e:
print(f"User fetch failed: {e}", file=sys.stderr)
return None
class ChatHandler(BaseHTTPRequestHandler):
"""HTTP request handler for disinto-chat."""
"""HTTP request handler for disinto-chat with Forgejo OAuth."""
def log_message(self, format, *args):
"""Log to stdout instead of stderr."""
"""Log to stderr."""
print(f"[{self.log_date_time_string()}] {format % args}", file=sys.stderr)
def send_error(self, code, message=None):
def send_error_page(self, code, message=None):
"""Custom error response."""
self.send_response(code)
self.send_header("Content-Type", "text/plain; charset=utf-8")
@ -52,47 +176,148 @@ class ChatHandler(BaseHTTPRequestHandler):
if message:
self.wfile.write(message.encode("utf-8"))
def _require_session(self):
"""Check session; redirect to /chat/login if missing. Returns username or None."""
user = _validate_session(self.headers.get("Cookie"))
if user:
return user
self.send_response(302)
self.send_header("Location", "/chat/login")
self.end_headers()
return None
def do_GET(self):
"""Handle GET requests."""
parsed = urlparse(self.path)
path = parsed.path
# OAuth routes (no session required)
if path == "/chat/login":
self.handle_login()
return
if path == "/chat/oauth/callback":
self.handle_oauth_callback(parsed.query)
return
# Serve index.html at root
if path == "/" or path == "/chat":
if path in ("/", "/chat", "/chat/"):
if not self._require_session():
return
self.serve_index()
return
# Serve static files
if path.startswith("/static/"):
if path.startswith("/chat/static/") or path.startswith("/static/"):
if not self._require_session():
return
self.serve_static(path)
return
# Reserved WebSocket endpoint (future use)
if path == "/ws" or path.startswith("/ws"):
self.send_error(501, "WebSocket upgrade not yet implemented")
self.send_error_page(501, "WebSocket upgrade not yet implemented")
return
# 404 for unknown paths
self.send_error(404, "Not found")
self.send_error_page(404, "Not found")
def do_POST(self):
"""Handle POST requests."""
parsed = urlparse(self.path)
path = parsed.path
# Chat endpoint
if path == "/chat" or path == "/chat/":
# Chat endpoint (session required)
if path in ("/chat", "/chat/"):
if not self._require_session():
return
self.handle_chat()
return
# 404 for unknown paths
self.send_error(404, "Not found")
self.send_error_page(404, "Not found")
def handle_login(self):
"""Redirect to Forgejo OAuth authorize endpoint."""
_gc_sessions()
if not CHAT_OAUTH_CLIENT_ID:
self.send_error_page(500, "Chat OAuth not configured (CHAT_OAUTH_CLIENT_ID missing)")
return
state = secrets.token_urlsafe(32)
_oauth_states[state] = time.time() + 600 # 10 min validity
params = urlencode({
"client_id": CHAT_OAUTH_CLIENT_ID,
"redirect_uri": _build_callback_uri(),
"response_type": "code",
"state": state,
})
self.send_response(302)
self.send_header("Location", f"{FORGE_URL}/login/oauth/authorize?{params}")
self.end_headers()
def handle_oauth_callback(self, query_string):
"""Exchange authorization code for token, validate user, set session."""
params = parse_qs(query_string)
code = params.get("code", [""])[0]
state = params.get("state", [""])[0]
# Validate state
expected_expiry = _oauth_states.pop(state, None) if state else None
if not expected_expiry or expected_expiry < time.time():
self.send_error_page(400, "Invalid or expired OAuth state")
return
if not code:
self.send_error_page(400, "Missing authorization code")
return
# Exchange code for access token
token_resp = _exchange_code_for_token(code)
if not token_resp or "access_token" not in token_resp:
self.send_error_page(502, "Failed to obtain access token from Forgejo")
return
access_token = token_resp["access_token"]
# Fetch user info
user_info = _fetch_user(access_token)
if not user_info or "login" not in user_info:
self.send_error_page(502, "Failed to fetch user info from Forgejo")
return
username = user_info["login"]
# Check allowlist
if username not in ALLOWED_USERS:
self.send_response(403)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.end_headers()
self.wfile.write(
f"Not authorised: user '{username}' is not in the allowed users list.\n".encode()
)
return
# Create session
session_token = secrets.token_urlsafe(48)
_sessions[session_token] = {
"user": username,
"expires": time.time() + SESSION_TTL,
}
cookie_flags = _session_cookie_flags()
self.send_response(302)
self.send_header("Set-Cookie", f"{SESSION_COOKIE}={session_token}; {cookie_flags}")
self.send_header("Location", "/chat/")
self.end_headers()
def serve_index(self):
"""Serve the main index.html file."""
index_path = os.path.join(UI_DIR, "index.html")
if not os.path.exists(index_path):
self.send_error(500, "UI not found")
self.send_error_page(500, "UI not found")
return
try:
@ -104,19 +329,23 @@ class ChatHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(content.encode("utf-8"))
except IOError as e:
self.send_error(500, f"Error reading index.html: {e}")
self.send_error_page(500, f"Error reading index.html: {e}")
def serve_static(self, path):
"""Serve static files from the static directory."""
# Sanitize path to prevent directory traversal
relative_path = path[len("/static/"):]
# Strip /chat/static/ or /static/ prefix
if path.startswith("/chat/static/"):
relative_path = path[len("/chat/static/"):]
else:
relative_path = path[len("/static/"):]
if ".." in relative_path or relative_path.startswith("/"):
self.send_error(403, "Forbidden")
self.send_error_page(403, "Forbidden")
return
file_path = os.path.join(STATIC_DIR, relative_path)
if not os.path.exists(file_path):
self.send_error(404, "Not found")
self.send_error_page(404, "Not found")
return
# Determine MIME type
@ -132,7 +361,7 @@ class ChatHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(content)
except IOError as e:
self.send_error(500, f"Error reading file: {e}")
self.send_error_page(500, f"Error reading file: {e}")
def handle_chat(self):
"""
@ -142,7 +371,7 @@ class ChatHandler(BaseHTTPRequestHandler):
# Read request body
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self.send_error(400, "No message provided")
self.send_error_page(400, "No message provided")
return
body = self.rfile.read(content_length)
@ -152,16 +381,16 @@ class ChatHandler(BaseHTTPRequestHandler):
params = parse_qs(body_str)
message = params.get("message", [""])[0]
except (UnicodeDecodeError, KeyError):
self.send_error(400, "Invalid message format")
self.send_error_page(400, "Invalid message format")
return
if not message:
self.send_error(400, "Empty message")
self.send_error_page(400, "Empty message")
return
# Validate Claude binary exists
if not os.path.exists(CLAUDE_BIN):
self.send_error(500, "Claude CLI not found")
self.send_error_page(500, "Claude CLI not found")
return
try:
@ -186,7 +415,7 @@ class ChatHandler(BaseHTTPRequestHandler):
# Check for errors
if proc.returncode != 0:
self.send_error(500, f"Claude CLI failed with exit code {proc.returncode}")
self.send_error_page(500, f"Claude CLI failed with exit code {proc.returncode}")
return
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
@ -195,9 +424,9 @@ class ChatHandler(BaseHTTPRequestHandler):
self.wfile.write(response.encode("utf-8"))
except FileNotFoundError:
self.send_error(500, "Claude CLI not found")
self.send_error_page(500, "Claude CLI not found")
except Exception as e:
self.send_error(500, f"Error: {e}")
self.send_error_page(500, f"Error: {e}")
def main():
@ -205,7 +434,12 @@ def main():
server_address = (HOST, PORT)
httpd = HTTPServer(server_address, ChatHandler)
print(f"Starting disinto-chat server on {HOST}:{PORT}", file=sys.stderr)
print(f"UI available at http://localhost:{PORT}/", file=sys.stderr)
print(f"UI available at http://localhost:{PORT}/chat/", file=sys.stderr)
if CHAT_OAUTH_CLIENT_ID:
print(f"OAuth enabled (client_id={CHAT_OAUTH_CLIENT_ID[:8]}...)", file=sys.stderr)
print(f"Allowed users: {', '.join(sorted(ALLOWED_USERS))}", file=sys.stderr)
else:
print("WARNING: CHAT_OAUTH_CLIENT_ID not set — OAuth disabled", file=sys.stderr)
httpd.serve_forever()