#!/usr/bin/env python3
"""
disinto-chat server — minimal HTTP backend for Claude chat UI.

Routes:
    GET /chat/auth/verify → Caddy forward_auth callback (returns 200+X-Forwarded-User or 401)
    GET /chat/login → 302 to Forgejo OAuth authorize
    GET /chat/oauth/callback → exchange code for token, validate user, set session
    GET /chat/ → serves index.html (session required)
    GET /chat/static/* → serves static assets (session required)
    POST /chat → spawns `claude --print` with user message (session required)
    GET /ws → reserved for future streaming upgrade (returns 501)

OAuth flow:
    1. User hits any /chat/* route without a valid session cookie → 302 /chat/login
    2. /chat/login redirects to Forgejo /login/oauth/authorize
    3. Forgejo redirects back to /chat/oauth/callback with ?code=...&state=...
    4. Server exchanges code for access token, fetches /api/v1/user
    5. Asserts user is in allowlist, sets HttpOnly session cookie
    6. Redirects to /chat/

The claude binary is expected to be mounted from the host at /usr/local/bin/claude.
"""

import datetime
import json
import os
import secrets
import subprocess
import sys
import time
import urllib.error
import urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs, urlencode

# Configuration
HOST = os.environ.get("CHAT_HOST", "0.0.0.0")
PORT = int(os.environ.get("CHAT_PORT", 8080))
UI_DIR = "/var/chat/ui"
STATIC_DIR = os.path.join(UI_DIR, "static")
CLAUDE_BIN = "/usr/local/bin/claude"

# OAuth configuration
FORGE_URL = os.environ.get("FORGE_URL", "http://localhost:3000")
CHAT_OAUTH_CLIENT_ID = os.environ.get("CHAT_OAUTH_CLIENT_ID", "")
CHAT_OAUTH_CLIENT_SECRET = os.environ.get("CHAT_OAUTH_CLIENT_SECRET", "")
EDGE_TUNNEL_FQDN = os.environ.get("EDGE_TUNNEL_FQDN", "")

# Shared secret for Caddy forward_auth verify endpoint (#709).
# When set, only requests carrying this value in X-Forward-Auth-Secret are
# allowed to call /chat/auth/verify. When empty the endpoint is unrestricted
# (acceptable during local dev; production MUST set this).
FORWARD_AUTH_SECRET = os.environ.get("FORWARD_AUTH_SECRET", "")

# Rate limiting / cost caps (#711)
CHAT_MAX_REQUESTS_PER_HOUR = int(os.environ.get("CHAT_MAX_REQUESTS_PER_HOUR", 60))
CHAT_MAX_REQUESTS_PER_DAY = int(os.environ.get("CHAT_MAX_REQUESTS_PER_DAY", 500))
CHAT_MAX_TOKENS_PER_DAY = int(os.environ.get("CHAT_MAX_TOKENS_PER_DAY", 1000000))

# Allowed users — disinto-admin always allowed; CSV allowlist extends it
_allowed_csv = os.environ.get("DISINTO_CHAT_ALLOWED_USERS", "")
ALLOWED_USERS = {"disinto-admin"}
if _allowed_csv:
    ALLOWED_USERS.update(u.strip() for u in _allowed_csv.split(",") if u.strip())

# Session cookie name
SESSION_COOKIE = "disinto_chat_session"
# Session TTL: 24 hours
SESSION_TTL = 24 * 60 * 60

# In-memory session store: token → {"user": str, "expires": float}
_sessions = {}
# Pending OAuth state tokens: state → expires (float)
_oauth_states = {}

# Per-user rate limiting state (#711)
# user → list of request timestamps (for sliding-window hourly/daily caps)
_request_log = {}
# user → {"tokens": int, "date": "YYYY-MM-DD"}
_daily_tokens = {}

# MIME types for static files
MIME_TYPES = {
    ".html": "text/html; charset=utf-8",
    ".js": "application/javascript; charset=utf-8",
    ".css": "text/css; charset=utf-8",
    ".json": "application/json; charset=utf-8",
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".svg": "image/svg+xml",
    ".ico": "image/x-icon",
}


def _build_callback_uri():
    """Build the OAuth callback URI based on tunnel configuration.

    Returns the https URI on EDGE_TUNNEL_FQDN when it is set, otherwise the
    plain-http localhost URI.
    """
    if EDGE_TUNNEL_FQDN:
        return f"https://{EDGE_TUNNEL_FQDN}/chat/oauth/callback"
    return "http://localhost/chat/oauth/callback"


def _session_cookie_flags():
    """Return cookie flags appropriate for the deployment mode.

    The ``Secure`` flag is appended only when EDGE_TUNNEL_FQDN is set, i.e.
    when the callback URI is served over https.
    """
    flags = "HttpOnly; SameSite=Lax; Path=/chat"
    if EDGE_TUNNEL_FQDN:
        flags += "; Secure"
    return flags


def _validate_session(cookie_header):
    """Check session cookie and return username if valid, else None.

    Only the first cookie named SESSION_COOKIE in the header is considered.
    A token that is unknown or expired is removed from the session store.
    """
    if not cookie_header:
        return None
    prefix = SESSION_COOKIE + "="
    for part in cookie_header.split(";"):
        part = part.strip()
        if not part.startswith(prefix):
            continue
        token = part[len(prefix):]
        session = _sessions.get(token)
        if session and session["expires"] > time.time():
            return session["user"]
        # Expired — clean up
        _sessions.pop(token, None)
        return None
    return None


def _gc_sessions():
    """Remove expired sessions and OAuth states (called opportunistically)."""
    now = time.time()
    # Collect keys first: deleting while iterating a dict raises RuntimeError.
    expired = [k for k, v in _sessions.items() if v["expires"] <= now]
    for k in expired:
        del _sessions[k]
    expired_states = [k for k, v in _oauth_states.items() if v <= now]
    for k in expired_states:
        del _oauth_states[k]


def _forge_json(req, failure_label):
    """Send *req* to Forgejo and return the decoded JSON object, or None.

    Network errors, undecodable or invalid JSON bodies, and JSON values that
    are not objects are logged to stderr (prefixed with *failure_label*) and
    mapped to None so that callers can answer with a 502.
    """
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            payload = json.loads(resp.read().decode())
    # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
    except (urllib.error.URLError, ValueError, OSError) as e:
        print(f"{failure_label}: {e}", file=sys.stderr)
        return None
    if not isinstance(payload, dict):
        print(f"{failure_label}: unexpected JSON payload", file=sys.stderr)
        return None
    return payload


def _exchange_code_for_token(code):
    """Exchange an authorization code for an access token via Forgejo.

    Returns the decoded JSON response object, or None on any failure.
    """
    data = urlencode({
        "grant_type": "authorization_code",
        "code": code,
        "client_id": CHAT_OAUTH_CLIENT_ID,
        "client_secret": CHAT_OAUTH_CLIENT_SECRET,
        "redirect_uri": _build_callback_uri(),
    }).encode()
    req = urllib.request.Request(
        f"{FORGE_URL}/login/oauth/access_token",
        data=data,
        headers={
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )
    return _forge_json(req, "OAuth token exchange failed")


def _fetch_user(access_token):
    """Fetch the authenticated user from Forgejo API.

    Returns the decoded JSON user object, or None on any failure.
    """
    req = urllib.request.Request(
        f"{FORGE_URL}/api/v1/user",
        headers={
            "Authorization": f"token {access_token}",
            "Accept": "application/json",
        },
    )
    return _forge_json(req, "User fetch failed")


def _check_rate_limit(user):
    """Check per-user rate limits. Returns (allowed, retry_after, reason) (#711).

    Checks hourly request cap, daily request cap, and daily token cap, in
    that order. As a side effect, request timestamps older than 24 hours are
    pruned from the user's request log.
    """
    now = time.time()
    one_hour_ago = now - 3600
    today_date = datetime.date.today()
    today = today_date.isoformat()
    # Local midnight as a Unix timestamp; the daily caps reset at this boundary.
    start_of_day = time.mktime(today_date.timetuple())
    next_day = start_of_day + 86400

    # Prune old entries from request log
    timestamps = [t for t in _request_log.get(user, []) if t > now - 86400]
    _request_log[user] = timestamps

    # Hourly request cap
    hourly = [t for t in timestamps if t > one_hour_ago]
    if len(hourly) >= CHAT_MAX_REQUESTS_PER_HOUR:
        if hourly:
            retry_after = int(min(hourly) + 3600 - now) + 1
        else:
            # Cap configured as 0: no request exists to age out of the
            # window, so report a full window instead of calling min([]).
            retry_after = 3600
        return False, max(retry_after, 1), "hourly request limit"

    # Daily request cap
    daily = [t for t in timestamps if t >= start_of_day]
    if len(daily) >= CHAT_MAX_REQUESTS_PER_DAY:
        retry_after = int(next_day - now) + 1
        return False, max(retry_after, 1), "daily request limit"

    # Daily token cap
    token_info = _daily_tokens.get(user, {"tokens": 0, "date": today})
    if token_info["date"] != today:
        # Counter belongs to a previous day: start a fresh one.
        token_info = {"tokens": 0, "date": today}
        _daily_tokens[user] = token_info
    if token_info["tokens"] >= CHAT_MAX_TOKENS_PER_DAY:
        retry_after = int(next_day - now) + 1
        return False, max(retry_after, 1), "daily token limit"

    return True, 0, ""


def _record_request(user):
    """Record a request timestamp for the user (#711)."""
    _request_log.setdefault(user, []).append(time.time())


def _record_tokens(user, tokens):
    """Record token usage for the user (#711).

    The per-user counter is reset when its stored date is not today.
    """
    today = datetime.date.today().isoformat()
    token_info = _daily_tokens.get(user, {"tokens": 0, "date": today})
    if token_info["date"] != today:
        token_info = {"tokens": 0, "date": today}
    token_info["tokens"] += tokens
    _daily_tokens[user] = token_info


def _usage_total(usage):
    """Return input+output token count from a usage object, or None if malformed."""
    if not isinstance(usage, dict):
        return None
    total = 0
    for key in ("input_tokens", "output_tokens"):
        value = usage.get(key, 0)
        if not isinstance(value, (int, float)):
            return None
        total += value
    return total


def _parse_stream_json(output):
    """Parse stream-json output from claude --print (#711).

    Returns (text_content, total_tokens). Falls back gracefully if the usage
    event is absent or malformed: lines that are not JSON objects are
    skipped, and a malformed usage value leaves the running total unchanged.
    """
    text_parts = []
    total_tokens = 0
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A line such as `42` or `[1]` is valid JSON but not an event object.
        if not isinstance(event, dict):
            continue

        etype = event.get("type", "")

        # Collect assistant text
        if etype == "content_block_delta":
            delta = event.get("delta", {})
            if isinstance(delta, dict) and delta.get("type") == "text_delta":
                text = delta.get("text", "")
                if isinstance(text, str):
                    text_parts.append(text)
        elif etype == "assistant":
            # Full assistant message (non-streaming)
            content = event.get("content", "")
            if isinstance(content, str):
                text_parts.append(content)
            elif isinstance(content, list):
                text_parts.extend(
                    block["text"]
                    for block in content
                    if isinstance(block, dict) and isinstance(block.get("text"), str)
                )

        # Parse usage from result event (or any other event carrying usage)
        if etype == "result":
            counted = _usage_total(event.get("usage", {}))
        elif "usage" in event:
            counted = _usage_total(event["usage"])
        else:
            counted = None
        if counted is not None:
            total_tokens = counted

    return "".join(text_parts), total_tokens


class ChatHandler(BaseHTTPRequestHandler):
    """HTTP request handler for disinto-chat with Forgejo OAuth."""

    def log_message(self, format, *args):
        """Log to stderr."""
        print(f"[{self.log_date_time_string()}] {format % args}", file=sys.stderr)

    def send_error_page(self, code, message=None):
        """Custom error response: status *code* with an optional plain-text body."""
        self.send_response(code)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.end_headers()
        if message:
            self.wfile.write(message.encode("utf-8"))

    def _require_session(self):
        """Check session; redirect to /chat/login if missing. Returns username or None."""
        user = _validate_session(self.headers.get("Cookie"))
        if user:
            return user
        self.send_response(302)
        self.send_header("Location", "/chat/login")
        self.end_headers()
        return None

    def _check_forwarded_user(self, session_user):
        """Defense-in-depth: verify X-Forwarded-User matches session user (#709).

        Returns True if the request may proceed, False if a 403 was sent.
        When X-Forwarded-User is absent (forward_auth removed from Caddy),
        the request is rejected — fail-closed by design.
        """
        forwarded = self.headers.get("X-Forwarded-User")
        if not forwarded:
            rid = self.headers.get("X-Request-Id", "-")
            print(
                f"WARN: missing X-Forwarded-User for session_user={session_user} "
                f"req_id={rid} — fail-closed (#709)",
                file=sys.stderr,
            )
            self.send_error_page(403, "Forbidden: missing forwarded-user header")
            return False
        if forwarded != session_user:
            rid = self.headers.get("X-Request-Id", "-")
            print(
                f"WARN: X-Forwarded-User mismatch: header={forwarded} "
                f"session={session_user} req_id={rid} (#709)",
                file=sys.stderr,
            )
            self.send_error_page(403, "Forbidden: user identity mismatch")
            return False
        return True

    def do_GET(self):
        """Handle GET requests."""
        parsed = urlparse(self.path)
        path = parsed.path

        # Verify endpoint for Caddy forward_auth (#709)
        if path == "/chat/auth/verify":
            self.handle_auth_verify()
            return

        # OAuth routes (no session required)
        if path == "/chat/login":
            self.handle_login()
            return
        if path == "/chat/oauth/callback":
            self.handle_oauth_callback(parsed.query)
            return

        # Serve index.html at root
        if path in ("/", "/chat", "/chat/"):
            user = self._require_session()
            if not user:
                return
            if not self._check_forwarded_user(user):
                return
            self.serve_index()
            return

        # Serve static files
        if path.startswith(("/chat/static/", "/static/")):
            user = self._require_session()
            if not user:
                return
            if not self._check_forwarded_user(user):
                return
            self.serve_static(path)
            return

        # Reserved WebSocket endpoint (future use); matches any path that
        # begins with "/ws", including "/ws" itself.
        if path.startswith("/ws"):
            self.send_error_page(501, "WebSocket upgrade not yet implemented")
            return

        # 404 for unknown paths
        self.send_error_page(404, "Not found")

    def do_POST(self):
        """Handle POST requests."""
        parsed = urlparse(self.path)
        path = parsed.path

        # Chat endpoint (session required)
        if path in ("/chat", "/chat/"):
            user = self._require_session()
            if not user:
                return
            if not self._check_forwarded_user(user):
                return
            self.handle_chat(user)
            return

        # 404 for unknown paths
        self.send_error_page(404, "Not found")

    def handle_auth_verify(self):
        """Caddy forward_auth callback — validate session and return X-Forwarded-User (#709).

        Caddy calls this endpoint for every /chat/* request. If the session
        cookie is valid the endpoint returns 200 with the X-Forwarded-User
        header set to the session username. Otherwise it returns 401 so Caddy
        knows the request is unauthenticated.

        Access control: when FORWARD_AUTH_SECRET is configured, the request
        must carry a matching X-Forward-Auth-Secret header (shared secret
        between Caddy and the chat backend).
        """
        # Shared-secret gate
        if FORWARD_AUTH_SECRET:
            provided = self.headers.get("X-Forward-Auth-Secret", "")
            # compare_digest() raises TypeError for str arguments containing
            # non-ASCII characters, so compare the encoded bytes instead.
            matches = secrets.compare_digest(
                provided.encode("utf-8", "surrogateescape"),
                FORWARD_AUTH_SECRET.encode("utf-8", "surrogateescape"),
            )
            if not matches:
                self.send_error_page(403, "Forbidden: invalid forward-auth secret")
                return

        user = _validate_session(self.headers.get("Cookie"))
        if not user:
            self.send_error_page(401, "Unauthorized: no valid session")
            return

        self.send_response(200)
        self.send_header("X-Forwarded-User", user)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.end_headers()
        self.wfile.write(b"ok")

    def handle_login(self):
        """Redirect to Forgejo OAuth authorize endpoint."""
        _gc_sessions()

        if not CHAT_OAUTH_CLIENT_ID:
            self.send_error_page(500, "Chat OAuth not configured (CHAT_OAUTH_CLIENT_ID missing)")
            return

        state = secrets.token_urlsafe(32)
        _oauth_states[state] = time.time() + 600  # 10 min validity

        params = urlencode({
            "client_id": CHAT_OAUTH_CLIENT_ID,
            "redirect_uri": _build_callback_uri(),
            "response_type": "code",
            "state": state,
        })
        self.send_response(302)
        self.send_header("Location", f"{FORGE_URL}/login/oauth/authorize?{params}")
        self.end_headers()

    def handle_oauth_callback(self, query_string):
        """Exchange authorization code for token, validate user, set session."""
        params = parse_qs(query_string)
        code = params.get("code", [""])[0]
        state = params.get("state", [""])[0]

        # Validate state; pop() makes every state token single-use.
        expected_expiry = _oauth_states.pop(state, None) if state else None
        if not expected_expiry or expected_expiry < time.time():
            self.send_error_page(400, "Invalid or expired OAuth state")
            return

        if not code:
            self.send_error_page(400, "Missing authorization code")
            return

        # Exchange code for access token
        token_resp = _exchange_code_for_token(code)
        if not token_resp or "access_token" not in token_resp:
            self.send_error_page(502, "Failed to obtain access token from Forgejo")
            return
        access_token = token_resp["access_token"]

        # Fetch user info
        user_info = _fetch_user(access_token)
        if not user_info or "login" not in user_info:
            self.send_error_page(502, "Failed to fetch user info from Forgejo")
            return
        username = user_info["login"]

        # Check allowlist
        if username not in ALLOWED_USERS:
            self.send_response(403)
            self.send_header("Content-Type", "text/plain; charset=utf-8")
            self.end_headers()
            self.wfile.write(
                f"Not authorised: user '{username}' is not in the allowed users list.\n".encode()
            )
            return

        # Create session
        session_token = secrets.token_urlsafe(48)
        _sessions[session_token] = {
            "user": username,
            "expires": time.time() + SESSION_TTL,
        }

        cookie_flags = _session_cookie_flags()
        self.send_response(302)
        self.send_header("Set-Cookie", f"{SESSION_COOKIE}={session_token}; {cookie_flags}")
        self.send_header("Location", "/chat/")
        self.end_headers()

    def serve_index(self):
        """Serve the main index.html file."""
        index_path = os.path.join(UI_DIR, "index.html")
        if not os.path.exists(index_path):
            self.send_error_page(500, "UI not found")
            return
        # Only the read is guarded: once headers are written, a failure must
        # not trigger a second status line via send_error_page().
        try:
            with open(index_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (IOError, UnicodeDecodeError) as e:
            self.send_error_page(500, f"Error reading index.html: {e}")
            return
        payload = content.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", MIME_TYPES[".html"])
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def serve_static(self, path):
        """Serve static files from the static directory.

        Responds 403 for paths containing ".." or an absolute remainder, 404
        when the target is not a regular file, and 500 when reading fails.
        """
        # Strip /chat/static/ or /static/ prefix
        if path.startswith("/chat/static/"):
            relative_path = path[len("/chat/static/"):]
        else:
            relative_path = path[len("/static/"):]

        if ".." in relative_path or relative_path.startswith("/"):
            self.send_error_page(403, "Forbidden")
            return

        file_path = os.path.join(STATIC_DIR, relative_path)
        # isfile() rather than exists(): a directory (including the empty
        # relative path) is "not found", not a read error.
        if not os.path.isfile(file_path):
            self.send_error_page(404, "Not found")
            return

        # Determine MIME type
        _, ext = os.path.splitext(file_path)
        content_type = MIME_TYPES.get(ext.lower(), "application/octet-stream")

        try:
            with open(file_path, "rb") as f:
                content = f.read()
        except IOError as e:
            self.send_error_page(500, f"Error reading file: {e}")
            return
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    # NOTE(review): the original continues here with
    #     def _send_rate_limit_response(self, retry_after, reason):
    #         """Send a 429 response with Retry-After header and HTMX fragment (#711)."""
    #         body = ( f'…
    # The method is cut off at the end of the visible chunk, so it is not
    # reproduced; re-attach it from the following lines when restoring the file.