#!/usr/bin/env python3
"""
disinto-chat server — minimal HTTP backend for Claude chat UI.

Routes:
    GET    /chat/auth/verify     -> Caddy forward_auth callback (returns 200+X-Forwarded-User or 401)
    GET    /chat/login           -> 302 to Forgejo OAuth authorize
    GET    /chat/oauth/callback  -> exchange code for token, validate user, set session
    GET    /chat/                -> serves index.html (session required)
    GET    /chat/static/*        -> serves static assets (session required)
    GET    /chat/history         -> list the user's conversations (session required)
    GET    /chat/history/<id>    -> fetch one conversation (session required)
    DELETE /chat/history/<id>    -> delete one conversation (session required)
    POST   /chat                 -> spawns `claude --print` with user message (session required)
    POST   /chat/new             -> allocate a new conversation id (session required)
    POST   /chat/action/*        -> ci-run / issue-create / pr-create (session required)
    GET    /ws                   -> reserved for future streaming upgrade (returns 501)

OAuth flow:
    1. User hits any /chat/* route without a valid session cookie -> 302 /chat/login
    2. /chat/login redirects to Forgejo /login/oauth/authorize
    3. Forgejo redirects back to /chat/oauth/callback with ?code=...&state=...
    4. Server exchanges code for access token, fetches /api/v1/user
    5. Asserts user is in allowlist, sets HttpOnly session cookie
    6. Redirects to /chat/

The claude binary is expected to be mounted from the host at /usr/local/bin/claude.
"""

import datetime
import json
import os
import re
import secrets
import subprocess
import sys
import time
import urllib.request
import urllib.error
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs, urlencode

# Configuration
HOST = os.environ.get("CHAT_HOST", "0.0.0.0")
PORT = int(os.environ.get("CHAT_PORT", 8080))
UI_DIR = "/var/chat/ui"
STATIC_DIR = os.path.join(UI_DIR, "static")
CLAUDE_BIN = "/usr/local/bin/claude"

# OAuth configuration
# FORGE_URL was previously assigned twice with two different defaults; the
# later assignment always won, so its default is the single one kept here.
FORGE_URL = os.environ.get("FORGE_URL", "http://forgejo:3000")
CHAT_OAUTH_CLIENT_ID = os.environ.get("CHAT_OAUTH_CLIENT_ID", "")
CHAT_OAUTH_CLIENT_SECRET = os.environ.get("CHAT_OAUTH_CLIENT_SECRET", "")
EDGE_TUNNEL_FQDN = os.environ.get("EDGE_TUNNEL_FQDN", "")

# Shared secret for Caddy forward_auth verify endpoint (#709).
# When set, only requests carrying this value in X-Forward-Auth-Secret are
# allowed to call /chat/auth/verify. When empty the endpoint is unrestricted
# (acceptable during local dev; production MUST set this).
FORWARD_AUTH_SECRET = os.environ.get("FORWARD_AUTH_SECRET", "")

# Rate limiting / cost caps (#711)
CHAT_MAX_REQUESTS_PER_HOUR = int(os.environ.get("CHAT_MAX_REQUESTS_PER_HOUR", 60))
CHAT_MAX_REQUESTS_PER_DAY = int(os.environ.get("CHAT_MAX_REQUESTS_PER_DAY", 500))
CHAT_MAX_TOKENS_PER_DAY = int(os.environ.get("CHAT_MAX_TOKENS_PER_DAY", 1000000))

# Action endpoints configuration (#712); these also use FORGE_URL from above.
WOODPECKER_TOKEN = os.environ.get("WOODPECKER_TOKEN", "")
WOODPECKER_URL = os.environ.get("WOODPECKER_URL", "http://woodpecker:8000")
FORGE_TOKEN = os.environ.get("FORGE_TOKEN", "")
FORGE_OWNER = os.environ.get("FORGE_OWNER", "")
FORGE_REPO = os.environ.get("FORGE_REPO", "")

# Allowed users - disinto-admin always allowed; CSV allowlist extends it
_allowed_csv = os.environ.get("DISINTO_CHAT_ALLOWED_USERS", "")
ALLOWED_USERS = {"disinto-admin"}
if _allowed_csv:
    ALLOWED_USERS.update(u.strip() for u in _allowed_csv.split(",") if u.strip())

# Session cookie name
SESSION_COOKIE = "disinto_chat_session"

# Session TTL: 24 hours
SESSION_TTL = 24 * 60 * 60

# Chat history directory (bind-mounted from host)
CHAT_HISTORY_DIR = os.environ.get("CHAT_HISTORY_DIR", "/var/lib/chat/history")

# Regex for valid conversation_id (12-char hex, no slashes)
CONVERSATION_ID_PATTERN = re.compile(r"^[0-9a-f]{12}$")

# In-memory session store: token -> {"user": str, "expires": float}
_sessions = {}

# Pending OAuth state tokens: state -> expires (float)
_oauth_states = {}

# Per-user rate limiting state (#711)
# user -> list of request timestamps (for sliding-window hourly/daily caps)
_request_log = {}
# user -> {"tokens": int, "date": "YYYY-MM-DD"}
_daily_tokens = {}

# MIME types for static files
MIME_TYPES = {
    ".html": "text/html; charset=utf-8",
    ".js": "application/javascript; charset=utf-8",
    ".css": "text/css; charset=utf-8",
    ".json": "application/json; charset=utf-8",
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".svg": "image/svg+xml",
    ".ico": "image/x-icon",
}


def _build_callback_uri():
    """Build the OAuth callback URI based on tunnel configuration.

    Returns the https URL on EDGE_TUNNEL_FQDN when it is set, otherwise the
    plain-http localhost callback.
    """
    if EDGE_TUNNEL_FQDN:
        return f"https://{EDGE_TUNNEL_FQDN}/chat/oauth/callback"
    return "http://localhost/chat/oauth/callback"


def _session_cookie_flags():
    """Return cookie flags appropriate for the deployment mode.

    The Secure flag is only added when EDGE_TUNNEL_FQDN is set, i.e. when
    the callback URI built by this module is https.
    """
    flags = "HttpOnly; SameSite=Lax; Path=/chat"
    if EDGE_TUNNEL_FQDN:
        flags += "; Secure"
    return flags


def _validate_session(cookie_header):
    """Check the session cookie and return the username if valid, else None.

    Args:
        cookie_header: raw value of the Cookie request header (may be None).

    Only the first cookie named SESSION_COOKIE is considered. An unknown or
    expired token yields None; an expired entry is also removed from the
    in-memory store.
    """
    if not cookie_header:
        return None
    prefix = SESSION_COOKIE + "="
    for part in cookie_header.split(";"):
        part = part.strip()
        if not part.startswith(prefix):
            continue
        token = part[len(prefix):]
        session = _sessions.get(token)
        if session and session["expires"] > time.time():
            return session["user"]
        # Expired or unknown - clean up and stop at the first session cookie.
        _sessions.pop(token, None)
        return None
    return None


def _gc_sessions():
    """Remove expired sessions and OAuth states (called opportunistically)."""
    now = time.time()
    for token in [k for k, v in _sessions.items() if v["expires"] <= now]:
        del _sessions[token]
    for state in [k for k, expiry in _oauth_states.items() if expiry <= now]:
        del _oauth_states[state]


def _exchange_code_for_token(code):
    """Exchange an authorization code for an access token via Forgejo.

    Returns the decoded JSON response, or None when the request fails or
    the response cannot be decoded (the failure is logged to stderr).
    """
    data = urlencode({
        "grant_type": "authorization_code",
        "code": code,
        "client_id": CHAT_OAUTH_CLIENT_ID,
        "client_secret": CHAT_OAUTH_CLIENT_SECRET,
        "redirect_uri": _build_callback_uri(),
    }).encode()
    req = urllib.request.Request(
        f"{FORGE_URL}/login/oauth/access_token",
        data=data,
        headers={
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read().decode())
    # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
    except (urllib.error.URLError, ValueError, OSError) as e:
        print(f"OAuth token exchange failed: {e}", file=sys.stderr)
        return None


def _fetch_user(access_token):
    """Fetch the authenticated user from the Forgejo API.

    Returns the decoded JSON response of GET /api/v1/user, or None when the
    request fails or the response cannot be decoded (logged to stderr).
    """
    req = urllib.request.Request(
        f"{FORGE_URL}/api/v1/user",
        headers={
            "Authorization": f"token {access_token}",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read().decode())
    # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
    except (urllib.error.URLError, ValueError, OSError) as e:
        print(f"User fetch failed: {e}", file=sys.stderr)
        return None


# =============================================================================
# Rate Limiting Functions (#711)
# =============================================================================


def _check_rate_limit(user):
    """Check per-user rate limits (#711).

    Checks, in order, the hourly request cap, the daily request cap and the
    daily token cap. As a side effect, request timestamps older than 24h
    are pruned and a stale daily token counter is reset.

    Returns:
        (allowed, retry_after, reason): ``allowed`` is a bool; when it is
        False, ``retry_after`` is a positive number of seconds and
        ``reason`` names the cap that was hit. When allowed, the tuple is
        ``(True, 0, "")``.
    """
    now = time.time()
    # Read the calendar date once so every "today" below agrees, even when
    # the call straddles local midnight.
    today_date = datetime.date.today()
    today = today_date.isoformat()
    start_of_day = time.mktime(today_date.timetuple())
    until_tomorrow = max(int(start_of_day + 86400 - now) + 1, 1)

    # Prune old entries from request log
    timestamps = [t for t in _request_log.get(user, []) if t > now - 86400]
    _request_log[user] = timestamps

    # Hourly request cap (sliding window)
    one_hour_ago = now - 3600
    hourly = [t for t in timestamps if t > one_hour_ago]
    if len(hourly) >= CHAT_MAX_REQUESTS_PER_HOUR:
        if hourly:
            retry_after = int(min(hourly) + 3600 - now) + 1
        else:
            # A cap of 0 blocks every request; there is no oldest entry
            # whose expiry would free a slot, so report a full window.
            retry_after = 3600
        return False, max(retry_after, 1), "hourly request limit"

    # Daily request cap (calendar day, local time)
    daily = [t for t in timestamps if t >= start_of_day]
    if len(daily) >= CHAT_MAX_REQUESTS_PER_DAY:
        return False, until_tomorrow, "daily request limit"

    # Daily token cap
    token_info = _daily_tokens.get(user, {"tokens": 0, "date": today})
    if token_info["date"] != today:
        token_info = {"tokens": 0, "date": today}
        _daily_tokens[user] = token_info
    if token_info["tokens"] >= CHAT_MAX_TOKENS_PER_DAY:
        return False, until_tomorrow, "daily token limit"

    return True, 0, ""


def _record_request(user):
    """Record a request timestamp for the user (#711)."""
    _request_log.setdefault(user, []).append(time.time())


def _record_tokens(user, tokens):
    """Add ``tokens`` to the user's token counter for today (#711).

    A counter carrying a previous date is reset before adding.
    """
    today = datetime.date.today().isoformat()
    token_info = _daily_tokens.get(user, {"tokens": 0, "date": today})
    if token_info["date"] != today:
        token_info = {"tokens": 0, "date": today}
    token_info["tokens"] += tokens
    _daily_tokens[user] = token_info


def _sum_usage(usage):
    """Return input_tokens + output_tokens from a usage mapping.

    Anything that is not a dict, and any non-integer count, contributes 0.
    """
    if not isinstance(usage, dict):
        return 0
    total = 0
    for key in ("input_tokens", "output_tokens"):
        count = usage.get(key, 0)
        if isinstance(count, int) and not isinstance(count, bool):
            total += count
    return total


def _parse_stream_json(output):
    """Parse stream-json output from claude --print (#711).

    Each non-empty line is decoded as one JSON event. Lines that are not
    valid JSON, or that decode to something other than an object, are
    skipped.

    Text is collected from ``content_block_delta`` events carrying a
    ``text_delta`` and from ``assistant`` events. For the latter the
    content is read from the event's top-level ``content`` or, when that is
    absent, from ``message.content``; it may be a string or a list of
    blocks with a ``text`` field.

    Token usage is taken from the last event that carries it: a ``result``
    event always sets the total (0 when it has no usable usage), any other
    event sets it only when it has a dict-valued ``usage``.

    Returns:
        (text_content, total_tokens)
    """
    text_parts = []
    total_tokens = 0

    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            # Valid JSON but not an event object (e.g. a bare number).
            continue

        etype = event.get("type", "")

        # Collect assistant text
        if etype == "content_block_delta":
            delta = event.get("delta", {})
            if isinstance(delta, dict) and delta.get("type") == "text_delta":
                text = delta.get("text", "")
                if isinstance(text, str):
                    text_parts.append(text)
        elif etype == "assistant":
            content = event.get("content")
            if content is None:
                message = event.get("message")
                if isinstance(message, dict):
                    content = message.get("content")
            if isinstance(content, str):
                text_parts.append(content)
            elif isinstance(content, list):
                for block in content:
                    if not isinstance(block, dict):
                        continue
                    text = block.get("text")
                    if isinstance(text, str) and text:
                        text_parts.append(text)

        # Parse usage
        if etype == "result":
            total_tokens = _sum_usage(event.get("usage", {}))
        elif isinstance(event.get("usage"), dict):
            total_tokens = _sum_usage(event["usage"])

    return "".join(text_parts), total_tokens


# =============================================================================
# Conversation History Functions (#710)
# =============================================================================
def _generate_conversation_id():
    """Generate a new conversation ID (12-char lowercase hex string)."""
    return secrets.token_hex(6)


def _validate_conversation_id(conv_id):
    """Return True only if ``conv_id`` is exactly 12 lowercase hex characters.

    Conversation ids become file names, so the check is strict: non-string
    values are rejected, and ``fullmatch`` is used because ``$`` with
    ``match`` would also accept an id followed by a trailing newline.
    """
    return (
        isinstance(conv_id, str)
        and CONVERSATION_ID_PATTERN.fullmatch(conv_id) is not None
    )


def _get_user_history_dir(user):
    """Get the history directory path for a user."""
    return os.path.join(CHAT_HISTORY_DIR, user)


def _get_conversation_path(user, conv_id):
    """Get the full path to a conversation file (``<conv_id>.ndjson``)."""
    return os.path.join(_get_user_history_dir(user), f"{conv_id}.ndjson")


def _ensure_user_dir(user):
    """Ensure the user's history directory exists and return its path."""
    user_dir = _get_user_history_dir(user)
    os.makedirs(user_dir, exist_ok=True)
    return user_dir


def _append_record(user, conv_id, record):
    """Append one JSON record as a line to the user's conversation file."""
    conv_path = _get_conversation_path(user, conv_id)
    _ensure_user_dir(user)
    with open(conv_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")


def _write_message(user, conv_id, role, content):
    """Append a message to a conversation file in NDJSON format."""
    _append_record(user, conv_id, {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "user": user,
        "role": role,
        "content": content,
    })


def _read_conversation(user, conv_id):
    """Read all messages from a conversation file.

    Returns a list of decoded records (malformed lines are skipped), or
    None when the file does not exist or cannot be read.
    """
    conv_path = _get_conversation_path(user, conv_id)
    messages = []
    try:
        with open(conv_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    messages.append(json.loads(line))
                except json.JSONDecodeError:
                    # Skip malformed lines
                    continue
    except OSError:
        # Includes FileNotFoundError; opening directly avoids an
        # exists-then-open race with a concurrent delete.
        return None
    return messages


def _list_user_conversations(user):
    """List all conversations of a user with a preview of the first message.

    Returns a list of dicts with keys ``id``, ``created_at``, ``preview``
    (first 50 characters of the first message, "..." appended when
    truncated) and ``message_count``, sorted by ``created_at`` descending.
    Files whose name is not a valid conversation id are ignored. A missing
    or unreadable directory yields an empty list.
    """
    user_dir = _get_user_history_dir(user)
    try:
        filenames = os.listdir(user_dir)
    except OSError:
        return []

    suffix = ".ndjson"
    conversations = []
    for filename in filenames:
        if not filename.endswith(suffix):
            continue
        conv_id = filename[:-len(suffix)]
        if not _validate_conversation_id(conv_id):
            continue
        messages = _read_conversation(user, conv_id)
        if not messages:
            # Empty (or unreadable) conversation file
            conversations.append({
                "id": conv_id,
                "created_at": "",
                "preview": "(empty)",
                "message_count": 0,
            })
            continue
        # Records are arbitrary JSON; tolerate a first record that is not
        # an object or whose fields are not strings.
        first_msg = messages[0] if isinstance(messages[0], dict) else {}
        content = first_msg.get("content", "")
        if not isinstance(content, str):
            content = ""
        created_at = first_msg.get("ts", "")
        if not isinstance(created_at, str):
            created_at = ""
        preview = content[:50]
        if len(content) > 50:
            preview += "..."
        conversations.append({
            "id": conv_id,
            "created_at": created_at,
            "preview": preview,
            "message_count": len(messages),
        })

    # Sort by created_at descending
    conversations.sort(key=lambda conv: conv["created_at"] or "", reverse=True)
    return conversations


def _delete_conversation(user, conv_id):
    """Delete a conversation file. Returns True if a file was removed."""
    try:
        os.remove(_get_conversation_path(user, conv_id))
    except FileNotFoundError:
        return False
    return True


# =============================================================================
# Action Endpoints (#712)
# =============================================================================


def _write_action_record(user, conv_id, action_type, payload, response_data):
    """Write an action record to the conversation history."""
    _append_record(user, conv_id, {
        "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "user": user,
        "role": "action",
        "action_type": action_type,
        "payload": payload,
        "response": response_data,
    })


def _post_json(url, token, payload):
    """POST ``payload`` as JSON to ``url`` with token auth; never raises.

    Returns ``{"success": True, "data": <decoded response>}`` on success,
    otherwise ``{"success": False, "error": <str>}``, plus ``"details"``
    when an HTTP error response carried a JSON body.
    """
    try:
        req = urllib.request.Request(
            url,
            data=json.dumps(payload).encode("utf-8"),
            headers={
                "Authorization": f"token {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read().decode())
        return {"success": True, "data": result}
    except urllib.error.HTTPError as e:
        try:
            details = json.loads(e.read().decode())
        except (json.JSONDecodeError, UnicodeDecodeError):
            return {"success": False, "error": str(e.reason)}
        return {"success": False, "error": str(e.reason), "details": details}
    except urllib.error.URLError as e:
        return {"success": False, "error": f"Network error: {e.reason}"}
    except json.JSONDecodeError as e:
        return {"success": False, "error": f"Invalid JSON response: {e}"}
    except Exception as e:
        # Boundary handler: callers relay the result dict to the client, so
        # any remaining failure is reported instead of raised.
        return {"success": False, "error": str(e)}


def _trigger_woodpecker_pipeline(repo, branch):
    """Trigger a Woodpecker CI pipeline for the given branch.

    Woodpecker API: POST /api/v1/repos/{owner}/{repo}/pipeline
    The target repository comes from FORGE_OWNER / FORGE_REPO; the ``repo``
    argument is accepted for interface compatibility but not used.
    Returns dict with success status and response data.

    NOTE(review): the endpoint path and the "token" authorization scheme
    are kept as they were; confirm both against the deployed Woodpecker
    version's API documentation.
    """
    if not WOODPECKER_TOKEN:
        return {"success": False, "error": "WOODPECKER_TOKEN not configured"}
    if not FORGE_OWNER or not FORGE_REPO:
        return {"success": False, "error": "FORGE_OWNER and FORGE_REPO not configured"}
    url = f"{WOODPECKER_URL}/api/v1/repos/{FORGE_OWNER}/{FORGE_REPO}/pipeline"
    return _post_json(url, WOODPECKER_TOKEN, {"branch": branch, "event": "push"})


def _create_forgejo_issue(title, body, labels=None):
    """Create a Forgejo issue.

    Forgejo API: POST /api/v1/repos/{owner}/{repo}/issues
    ``labels`` is only sent when non-empty.
    Returns dict with success status and response data.
    """
    if not FORGE_TOKEN:
        return {"success": False, "error": "FORGE_TOKEN not configured"}
    if not FORGE_OWNER or not FORGE_REPO:
        return {"success": False, "error": "FORGE_OWNER and FORGE_REPO not configured"}
    if not title:
        return {"success": False, "error": "Title is required"}
    payload = {"title": title, "body": body or ""}
    if labels:
        payload["labels"] = labels
    url = f"{FORGE_URL}/api/v1/repos/{FORGE_OWNER}/{FORGE_REPO}/issues"
    return _post_json(url, FORGE_TOKEN, payload)


def _create_forgejo_pull_request(head, base, title, body=None):
    """Create a Forgejo pull request.

    Forgejo API: POST /api/v1/repos/{owner}/{repo}/pulls
    Returns dict with success status and response data.
    """
    if not FORGE_TOKEN:
        return {"success": False, "error": "FORGE_TOKEN not configured"}
    if not FORGE_OWNER or not FORGE_REPO:
        return {"success": False, "error": "FORGE_OWNER and FORGE_REPO not configured"}
    if not head or not base or not title:
        return {"success": False, "error": "head, base, and title are required"}
    payload = {"head": head, "base": base, "title": title, "body": body or ""}
    url = f"{FORGE_URL}/api/v1/repos/{FORGE_OWNER}/{FORGE_REPO}/pulls"
    return _post_json(url, FORGE_TOKEN, payload)


class ChatHandler(BaseHTTPRequestHandler):
    """HTTP request handler for disinto-chat with Forgejo OAuth."""

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def log_message(self, format, *args):
        """Log to stderr."""
        print(f"[{self.log_date_time_string()}] {format % args}", file=sys.stderr)

    def send_error_page(self, code, message=None):
        """Send a plain-text response with status ``code`` and optional body."""
        self.send_response(code)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.end_headers()
        if message:
            self.wfile.write(message.encode("utf-8"))

    def _send_json(self, payload, status=200):
        """Serialize ``payload`` as UTF-8 JSON and send it with ``status``."""
        data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _read_body(self, empty_message):
        """Read the request body according to Content-Length.

        Returns the body bytes, or None after sending a 400 when the header
        is not an integer or the length is not positive (``empty_message``
        is the response text for the latter case).
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            self.send_error_page(400, "Invalid Content-Length header")
            return None
        if content_length <= 0:
            # A negative length would make rfile.read() block until EOF.
            self.send_error_page(400, empty_message)
            return None
        return self.rfile.read(content_length)

    # ------------------------------------------------------------------
    # Authentication helpers
    # ------------------------------------------------------------------

    def _require_session(self):
        """Check session; redirect to /chat/login if missing.

        Returns username or None (in which case the 302 was already sent).
        """
        user = _validate_session(self.headers.get("Cookie"))
        if user:
            return user
        self.send_response(302)
        self.send_header("Location", "/chat/login")
        self.end_headers()
        return None

    def _check_forwarded_user(self, session_user):
        """Defense-in-depth: verify X-Forwarded-User matches session user (#709).

        Returns True if the request may proceed, False if a 403 was sent.
        When X-Forwarded-User is absent (forward_auth removed from Caddy),
        the request is rejected - fail-closed by design.
        """
        forwarded = self.headers.get("X-Forwarded-User")
        rid = self.headers.get("X-Request-Id", "-")
        if not forwarded:
            print(
                f"WARN: missing X-Forwarded-User for session_user={session_user} "
                f"req_id={rid} - fail-closed (#709)",
                file=sys.stderr,
            )
            self.send_error_page(403, "Forbidden: missing forwarded-user header")
            return False
        if forwarded != session_user:
            print(
                f"WARN: X-Forwarded-User mismatch: header={forwarded} "
                f"session={session_user} req_id={rid} (#709)",
                file=sys.stderr,
            )
            self.send_error_page(403, "Forbidden: user identity mismatch")
            return False
        return True

    def _authenticate(self):
        """Run the session check and the forwarded-user check.

        Returns the username, or None when a response (302 or 403) has
        already been sent and the caller must stop.
        """
        user = self._require_session()
        if not user:
            return None
        if not self._check_forwarded_user(user):
            return None
        return user

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def do_GET(self):
        """Handle GET requests."""
        parsed = urlparse(self.path)
        path = parsed.path

        # Verify endpoint for Caddy forward_auth (#709)
        if path == "/chat/auth/verify":
            self.handle_auth_verify()
            return

        # OAuth routes (no session required)
        if path == "/chat/login":
            self.handle_login()
            return
        if path == "/chat/oauth/callback":
            self.handle_oauth_callback(parsed.query)
            return

        # Conversation list endpoint: GET /chat/history
        if path == "/chat/history":
            user = self._authenticate()
            if user:
                self.handle_conversation_list(user)
            return

        # Single conversation endpoint: GET /chat/history/<conversation_id>
        if path.startswith("/chat/history/"):
            user = self._authenticate()
            if user:
                conv_id = path[len("/chat/history/"):]
                self.handle_conversation_get(user, conv_id)
            return

        # Serve index.html at root
        if path in ("/", "/chat", "/chat/"):
            user = self._authenticate()
            if user:
                self.serve_index()
            return

        # Serve static files
        if path.startswith(("/chat/static/", "/static/")):
            user = self._authenticate()
            if user:
                self.serve_static(path)
            return

        # Reserved WebSocket endpoint (future use)
        if path.startswith("/ws"):
            self.send_error_page(501, "WebSocket upgrade not yet implemented")
            return

        # 404 for unknown paths
        self.send_error_page(404, "Not found")

    def do_POST(self):
        """Handle POST requests."""
        path = urlparse(self.path).path

        # Action endpoints (#712)
        if path in (
            "/chat/action/ci-run",
            "/chat/action/issue-create",
            "/chat/action/pr-create",
        ):
            user = self._authenticate()
            if user:
                self.handle_action(user, path)
            return

        # New conversation endpoint (session required)
        if path == "/chat/new":
            user = self._authenticate()
            if user:
                self.handle_new_conversation(user)
            return

        # Chat endpoint (session required)
        if path in ("/chat", "/chat/"):
            user = self._authenticate()
            if user:
                self.handle_chat(user)
            return

        # 404 for unknown paths
        self.send_error_page(404, "Not found")

    def do_DELETE(self):
        """Handle DELETE requests."""
        path = urlparse(self.path).path

        # Delete conversation endpoint
        if path.startswith("/chat/history/"):
            user = self._authenticate()
            if user:
                conv_id = path[len("/chat/history/"):]
                self.handle_conversation_delete(user, conv_id)
            return

        # 404 for unknown paths
        self.send_error_page(404, "Not found")

    # ------------------------------------------------------------------
    # Auth / OAuth handlers
    # ------------------------------------------------------------------

    def handle_auth_verify(self):
        """Caddy forward_auth callback - validate session and return X-Forwarded-User (#709).

        Caddy calls this endpoint for every /chat/* request. If the session
        cookie is valid the endpoint returns 200 with the X-Forwarded-User
        header set to the session username. Otherwise it returns 401 so
        Caddy knows the request is unauthenticated.

        Access control: when FORWARD_AUTH_SECRET is configured, the request
        must carry a matching X-Forward-Auth-Secret header (shared secret
        between Caddy and the chat backend); a mismatch yields 403.
        """
        # Shared-secret gate
        if FORWARD_AUTH_SECRET:
            provided = self.headers.get("X-Forward-Auth-Secret", "")
            # compare_digest raises TypeError for str arguments containing
            # non-ASCII characters, so compare bytes. http.server decodes
            # header bytes as ISO-8859-1; re-encoding recovers them.
            provided_bytes = provided.encode("latin-1", "replace")
            expected_bytes = FORWARD_AUTH_SECRET.encode("utf-8")
            if not secrets.compare_digest(provided_bytes, expected_bytes):
                self.send_error_page(403, "Forbidden: invalid forward-auth secret")
                return

        user = _validate_session(self.headers.get("Cookie"))
        if not user:
            self.send_error_page(401, "Unauthorized: no valid session")
            return

        self.send_response(200)
        self.send_header("X-Forwarded-User", user)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.end_headers()
        self.wfile.write(b"ok")

    def handle_login(self):
        """Redirect to Forgejo OAuth authorize endpoint."""
        _gc_sessions()
        if not CHAT_OAUTH_CLIENT_ID:
            self.send_error_page(
                500, "Chat OAuth not configured (CHAT_OAUTH_CLIENT_ID missing)"
            )
            return

        state = secrets.token_urlsafe(32)
        _oauth_states[state] = time.time() + 600  # 10 min validity

        params = urlencode({
            "client_id": CHAT_OAUTH_CLIENT_ID,
            "redirect_uri": _build_callback_uri(),
            "response_type": "code",
            "state": state,
        })
        self.send_response(302)
        self.send_header("Location", f"{FORGE_URL}/login/oauth/authorize?{params}")
        self.end_headers()

    def handle_oauth_callback(self, query_string):
        """Exchange authorization code for token, validate user, set session."""
        params = parse_qs(query_string)
        code = params.get("code", [""])[0]
        state = params.get("state", [""])[0]

        # Validate state (single use: it is removed from the pending set)
        expected_expiry = _oauth_states.pop(state, None) if state else None
        if not expected_expiry or expected_expiry < time.time():
            self.send_error_page(400, "Invalid or expired OAuth state")
            return
        if not code:
            self.send_error_page(400, "Missing authorization code")
            return

        # Exchange code for access token
        token_resp = _exchange_code_for_token(code)
        if not token_resp or "access_token" not in token_resp:
            self.send_error_page(502, "Failed to obtain access token from Forgejo")
            return
        access_token = token_resp["access_token"]

        # Fetch user info
        user_info = _fetch_user(access_token)
        if not user_info or "login" not in user_info:
            self.send_error_page(502, "Failed to fetch user info from Forgejo")
            return
        username = user_info["login"]

        # Check allowlist
        if username not in ALLOWED_USERS:
            self.send_error_page(
                403,
                f"Not authorised: user '{username}' is not in the allowed users list.\n",
            )
            return

        # Create session
        session_token = secrets.token_urlsafe(48)
        _sessions[session_token] = {
            "user": username,
            "expires": time.time() + SESSION_TTL,
        }

        cookie_flags = _session_cookie_flags()
        self.send_response(302)
        self.send_header("Set-Cookie", f"{SESSION_COOKIE}={session_token}; {cookie_flags}")
        self.send_header("Location", "/chat/")
        self.end_headers()

    # ------------------------------------------------------------------
    # UI / static files
    # ------------------------------------------------------------------

    def serve_index(self):
        """Serve the main index.html file."""
        index_path = os.path.join(UI_DIR, "index.html")
        try:
            with open(index_path, "rb") as f:
                content = f.read()
        except FileNotFoundError:
            self.send_error_page(500, "UI not found")
            return
        except OSError as e:
            self.send_error_page(500, f"Error reading index.html: {e}")
            return
        self.send_response(200)
        self.send_header("Content-Type", MIME_TYPES[".html"])
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def serve_static(self, path):
        """Serve static files from the static directory.

        ``path`` starts with /chat/static/ or /static/. Any relative path
        containing ".." or starting with "/" is rejected with 403; anything
        that is not a regular file yields 404.
        """
        # Strip /chat/static/ or /static/ prefix
        if path.startswith("/chat/static/"):
            relative_path = path[len("/chat/static/"):]
        else:
            relative_path = path[len("/static/"):]

        if ".." in relative_path or relative_path.startswith("/"):
            self.send_error_page(403, "Forbidden")
            return

        file_path = os.path.join(STATIC_DIR, relative_path)
        # isfile (not exists): a directory must be a 404, not a read error.
        if not os.path.isfile(file_path):
            self.send_error_page(404, "Not found")
            return

        # Determine MIME type
        _, ext = os.path.splitext(file_path)
        content_type = MIME_TYPES.get(ext.lower(), "application/octet-stream")

        try:
            with open(file_path, "rb") as f:
                content = f.read()
        except OSError as e:
            self.send_error_page(500, f"Error reading file: {e}")
            return
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    # ------------------------------------------------------------------
    # Chat
    # ------------------------------------------------------------------

    def _send_rate_limit_response(self, retry_after, reason):
        """Send a 429 response with Retry-After header and HTMX fragment (#711)."""
        # NOTE(review): the markup of the wrapper element was not recoverable
        # from the source under review; a <div class="rate-limit-error"> is
        # assumed here - confirm against what the UI expects.
        body = (
            '<div class="rate-limit-error">'
            f"Rate limit exceeded: {reason}. "
            f"Please try again in {retry_after} seconds."
            "</div>"
        ).encode("utf-8")
        self.send_response(429)
        self.send_header("Retry-After", str(retry_after))
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def handle_chat(self, user):
        """Handle chat requests by spawning `claude --print` with the user message.

        The form-encoded body must carry ``message`` and may carry
        ``conversation_id``; a missing or invalid id is replaced by a fresh
        one. Both the user message and the assistant response are appended
        to the conversation history. Enforces per-user rate limits and
        tracks token usage (#711).

        ``user`` is the session user already authenticated by the caller.
        """
        # Check rate limits before processing (#711)
        allowed, retry_after, reason = _check_rate_limit(user)
        if not allowed:
            self._send_rate_limit_response(retry_after, reason)
            return

        # Read and parse the form-encoded request body
        body = self._read_body("No message provided")
        if body is None:
            return
        try:
            params = parse_qs(body.decode("utf-8"))
        except UnicodeDecodeError:
            self.send_error_page(400, "Invalid message format")
            return
        message = params.get("message", [""])[0]
        conv_id = params.get("conversation_id", [None])[0]
        if not message:
            self.send_error_page(400, "Empty message")
            return

        # Validate Claude binary exists
        if not os.path.exists(CLAUDE_BIN):
            self.send_error_page(500, "Claude CLI not found")
            return

        # Generate new conversation ID if not provided
        if not conv_id or not _validate_conversation_id(conv_id):
            conv_id = _generate_conversation_id()

        # Record request for rate limiting (#711)
        _record_request(user)

        try:
            # Save user message to history
            _write_message(user, conv_id, "user", message)

            # Spawn claude --print with stream-json for token tracking (#711).
            # subprocess.run drains stdout and stderr together; reading one
            # pipe to EOF before the other can deadlock once the child
            # fills the unread pipe.
            # NOTE(review): the message is passed as a bare argv element, so
            # one starting with "-" may be parsed as a CLI option - confirm
            # and consider stdin or a "--" separator.
            # NOTE(review): some CLI versions may require --verbose together
            # with --print and stream-json - confirm against the mounted CLI.
            proc = subprocess.run(
                [CLAUDE_BIN, "--print", "--output-format", "stream-json", message],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False,
            )
            if proc.stderr:
                print(f"Claude stderr: {proc.stderr}", file=sys.stderr)
            if proc.returncode != 0:
                self.send_error_page(
                    500, f"Claude CLI failed with exit code {proc.returncode}"
                )
                return

            # Parse stream-json for text and token usage (#711)
            response, total_tokens = _parse_stream_json(proc.stdout)

            # Track token usage - does not block *this* request (#711)
            if total_tokens > 0:
                _record_tokens(user, total_tokens)
                print(
                    f"Token usage: user={user} tokens={total_tokens}",
                    file=sys.stderr,
                )

            # Fall back to raw output if stream-json parsing yielded no text
            if not response:
                response = proc.stdout

            # Save assistant response to history
            _write_message(user, conv_id, "assistant", response)

            self._send_json({"response": response, "conversation_id": conv_id})
        except FileNotFoundError:
            self.send_error_page(500, "Claude CLI not found")
        except Exception as e:
            # Request boundary: report the failure instead of dropping the
            # connection, and keep a trace in the server log.
            print(f"Chat request failed for user={user}: {e!r}", file=sys.stderr)
            self.send_error_page(500, f"Error: {e}")

    # =======================================================================
    # Conversation History Handlers
    # =======================================================================

    def handle_conversation_list(self, user):
        """List all conversations for the logged-in user."""
        self._send_json(_list_user_conversations(user))

    def handle_conversation_get(self, user, conv_id):
        """Get a specific conversation for the logged-in user."""
        # Validate conversation_id format
        if not _validate_conversation_id(conv_id):
            self.send_error_page(400, "Invalid conversation ID")
            return
        messages = _read_conversation(user, conv_id)
        if messages is None:
            self.send_error_page(404, "Conversation not found")
            return
        self._send_json(messages)

    def handle_conversation_delete(self, user, conv_id):
        """Delete a specific conversation for the logged-in user."""
        # Validate conversation_id format
        if not _validate_conversation_id(conv_id):
            self.send_error_page(400, "Invalid conversation ID")
            return
        if _delete_conversation(user, conv_id):
            self.send_response(204)  # No Content
            self.end_headers()
        else:
            self.send_error_page(404, "Conversation not found")

    def handle_new_conversation(self, user):
        """Create a new conversation ID and return it (no file is created)."""
        self._send_json({"conversation_id": _generate_conversation_id()})

    # =======================================================================
    # Action Handler (#712)
    # =======================================================================

    def handle_action(self, user, path):
        """Handle action requests (ci-run, issue-create, pr-create).

        The JSON body must be an object carrying a valid ``conversation_id``
        plus the fields of the action. The outcome is appended to that
        conversation's history and returned as JSON with status 200, even
        when the upstream call failed (see the ``success`` field).
        """
        # Determine action type from path
        action_type = path[len("/chat/action/"):]

        body = self._read_body("No request body provided")
        if body is None:
            return
        try:
            request_data = json.loads(body.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.send_error_page(400, "Invalid JSON in request body")
            return
        if not isinstance(request_data, dict):
            self.send_error_page(400, "Request body must be a JSON object")
            return

        # The conversation id becomes a file name, so it must pass strict
        # validation. Accepting an arbitrary string here would allow writes
        # outside the user's history directory (e.g. "../../x").
        conv_id = request_data.get("conversation_id")
        if not conv_id:
            self.send_error_page(400, "conversation_id is required")
            return
        if not _validate_conversation_id(conv_id):
            self.send_error_page(400, "Invalid conversation ID")
            return

        # Route to appropriate handler
        if action_type == "ci-run":
            repo = request_data.get("repo")
            branch = request_data.get("branch")
            if not repo or not branch:
                self.send_error_page(400, "repo and branch are required for ci-run")
                return
            payload = {"repo": repo, "branch": branch}
            result = _trigger_woodpecker_pipeline(repo, branch)
        elif action_type == "issue-create":
            title = request_data.get("title")
            body_text = request_data.get("body", "")
            labels = request_data.get("labels", [])
            if not title:
                self.send_error_page(400, "title is required for issue-create")
                return
            payload = {"title": title, "body": body_text, "labels": labels}
            result = _create_forgejo_issue(title, body_text, labels)
        elif action_type == "pr-create":
            head = request_data.get("head")
            base = request_data.get("base")
            title = request_data.get("title")
            body_text = request_data.get("body", "")
            if not head or not base or not title:
                self.send_error_page(
                    400, "head, base, and title are required for pr-create"
                )
                return
            payload = {"head": head, "base": base, "title": title, "body": body_text}
            result = _create_forgejo_pull_request(head, base, title, body_text)
        else:
            self.send_error_page(404, f"Unknown action type: {action_type}")
            return

        # Log the action to history
        _write_action_record(user, conv_id, action_type, payload, result)

        # Send response
        self._send_json(result)


def main():
    """Start the HTTP server and serve until interrupted."""
    print(f"Starting disinto-chat server on {HOST}:{PORT}", file=sys.stderr)
    print(f"UI available at http://localhost:{PORT}/chat/", file=sys.stderr)
    if CHAT_OAUTH_CLIENT_ID:
        print(f"OAuth enabled (client_id={CHAT_OAUTH_CLIENT_ID[:8]}...)", file=sys.stderr)
        print(f"Allowed users: {', '.join(sorted(ALLOWED_USERS))}", file=sys.stderr)
    else:
        print("WARNING: CHAT_OAUTH_CLIENT_ID not set - OAuth disabled", file=sys.stderr)
    if FORWARD_AUTH_SECRET:
        print("forward_auth secret configured (#709)", file=sys.stderr)
    else:
        print(
            "WARNING: FORWARD_AUTH_SECRET not set - verify endpoint unrestricted",
            file=sys.stderr,
        )
    print(
        f"Rate limits (#711): {CHAT_MAX_REQUESTS_PER_HOUR}/hr, "
        f"{CHAT_MAX_REQUESTS_PER_DAY}/day, "
        f"{CHAT_MAX_TOKENS_PER_DAY} tokens/day",
        file=sys.stderr,
    )
    # The context manager closes the listening socket on exit.
    with HTTPServer((HOST, PORT), ChatHandler) as httpd:
        httpd.serve_forever()


if __name__ == "__main__":
    main()