From 78a295f567d50f9599eb65940686b4d366eeff6d Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 20 Apr 2026 11:12:20 +0000 Subject: [PATCH 1/3] fix: vision(#623): automate subdomain fallback pivot if subpath routing fails (#1028) Co-Authored-By: Claude Opus 4.6 (1M context) --- bin/disinto | 53 +++++++++++++++++++----- docker/chat/server.py | 4 ++ lib/ci-setup.sh | 16 ++++++- lib/generators.sh | 76 +++++++++++++++++++++++++++++++--- projects/disinto.toml.example | 17 ++++++++ tools/edge-control/register.sh | 39 +++++++++++++---- 6 files changed, 179 insertions(+), 26 deletions(-) diff --git a/bin/disinto b/bin/disinto index bf8edc4..18179df 100755 --- a/bin/disinto +++ b/bin/disinto @@ -1488,15 +1488,28 @@ p.write_text(text) touch "${FACTORY_ROOT}/.env" fi - # Configure Forgejo and Woodpecker subpath URLs when EDGE_TUNNEL_FQDN is set + # Configure Forgejo and Woodpecker URLs when EDGE_TUNNEL_FQDN is set. + # In subdomain mode, uses per-service FQDNs at root path instead of subpath URLs. if [ -n "${EDGE_TUNNEL_FQDN:-}" ]; then - # Forgejo ROOT_URL with /forge/ subpath (note trailing slash - Forgejo needs it) - if ! grep -q '^FORGEJO_ROOT_URL=' "${FACTORY_ROOT}/.env" 2>/dev/null; then - echo "FORGEJO_ROOT_URL=https://${EDGE_TUNNEL_FQDN}/forge/" >> "${FACTORY_ROOT}/.env" - fi - # Woodpecker WOODPECKER_HOST with /ci subpath (no trailing slash for v3) - if ! grep -q '^WOODPECKER_HOST=' "${FACTORY_ROOT}/.env" 2>/dev/null; then - echo "WOODPECKER_HOST=https://${EDGE_TUNNEL_FQDN}/ci" >> "${FACTORY_ROOT}/.env" + local routing_mode="${EDGE_ROUTING_MODE:-subpath}" + if [ "$routing_mode" = "subdomain" ]; then + # Subdomain mode: Forgejo at forge..disinto.ai (root path) + if ! grep -q '^FORGEJO_ROOT_URL=' "${FACTORY_ROOT}/.env" 2>/dev/null; then + echo "FORGEJO_ROOT_URL=https://${EDGE_TUNNEL_FQDN_FORGE:-forge.${EDGE_TUNNEL_FQDN}}/" >> "${FACTORY_ROOT}/.env" + fi + # Subdomain mode: Woodpecker at ci..disinto.ai (root path) + if ! grep -q '^WOODPECKER_HOST=' "${FACTORY_ROOT}/.env" 2>/dev/null; then + echo "WOODPECKER_HOST=https://${EDGE_TUNNEL_FQDN_CI:-ci.${EDGE_TUNNEL_FQDN}}" >> "${FACTORY_ROOT}/.env" + fi + else + # Subpath mode: Forgejo ROOT_URL with /forge/ subpath (trailing slash required) + if ! grep -q '^FORGEJO_ROOT_URL=' "${FACTORY_ROOT}/.env" 2>/dev/null; then + echo "FORGEJO_ROOT_URL=https://${EDGE_TUNNEL_FQDN}/forge/" >> "${FACTORY_ROOT}/.env" + fi + # Subpath mode: Woodpecker WOODPECKER_HOST with /ci subpath (no trailing slash for v3) + if ! grep -q '^WOODPECKER_HOST=' "${FACTORY_ROOT}/.env" 2>/dev/null; then + echo "WOODPECKER_HOST=https://${EDGE_TUNNEL_FQDN}/ci" >> "${FACTORY_ROOT}/.env" + fi fi fi @@ -1603,9 +1616,15 @@ p.write_text(text) create_woodpecker_oauth "$forge_url" "$forge_repo" # Create OAuth2 app on Forgejo for disinto-chat (#708) + # In subdomain mode, callback is at chat. root instead of /chat/ subpath. local chat_redirect_uri if [ -n "${EDGE_TUNNEL_FQDN:-}" ]; then - chat_redirect_uri="https://${EDGE_TUNNEL_FQDN}/chat/oauth/callback" + local chat_routing_mode="${EDGE_ROUTING_MODE:-subpath}" + if [ "$chat_routing_mode" = "subdomain" ]; then + chat_redirect_uri="https://${EDGE_TUNNEL_FQDN_CHAT:-chat.${EDGE_TUNNEL_FQDN}}/oauth/callback" + else + chat_redirect_uri="https://${EDGE_TUNNEL_FQDN}/chat/oauth/callback" + fi else chat_redirect_uri="http://localhost/chat/oauth/callback" fi @@ -2805,15 +2824,29 @@ disinto_edge() { # Write to .env (replace existing entries to avoid duplicates) local tmp_env tmp_env=$(mktemp) - grep -Ev "^EDGE_TUNNEL_(HOST|PORT|FQDN)=" "$env_file" > "$tmp_env" 2>/dev/null || true + grep -Ev "^EDGE_TUNNEL_(HOST|PORT|FQDN|FQDN_FORGE|FQDN_CI|FQDN_CHAT)=" "$env_file" > "$tmp_env" 2>/dev/null || true mv "$tmp_env" "$env_file" echo "EDGE_TUNNEL_HOST=${edge_host}" >> "$env_file" echo "EDGE_TUNNEL_PORT=${port}" >> "$env_file" echo "EDGE_TUNNEL_FQDN=${fqdn}" >> "$env_file" + # Subdomain mode: write per-service FQDNs (#1028) + local reg_routing_mode="${EDGE_ROUTING_MODE:-subpath}" + if [ "$reg_routing_mode" = "subdomain" ]; then + echo "EDGE_TUNNEL_FQDN_FORGE=forge.${fqdn}" >> "$env_file" + echo "EDGE_TUNNEL_FQDN_CI=ci.${fqdn}" >> "$env_file" + echo "EDGE_TUNNEL_FQDN_CHAT=chat.${fqdn}" >> "$env_file" + fi + echo "Registered: ${project}" echo " Port: ${port}" echo " FQDN: ${fqdn}" + if [ "$reg_routing_mode" = "subdomain" ]; then + echo " Mode: subdomain" + echo " Forge: forge.${fqdn}" + echo " CI: ci.${fqdn}" + echo " Chat: chat.${fqdn}" + fi echo " Saved to: ${env_file}" ;; diff --git a/docker/chat/server.py b/docker/chat/server.py index 6748354..ef37fb1 100644 --- a/docker/chat/server.py +++ b/docker/chat/server.py @@ -45,6 +45,8 @@ FORGE_URL = os.environ.get("FORGE_URL", "http://localhost:3000") CHAT_OAUTH_CLIENT_ID = os.environ.get("CHAT_OAUTH_CLIENT_ID", "") CHAT_OAUTH_CLIENT_SECRET = os.environ.get("CHAT_OAUTH_CLIENT_SECRET", "") EDGE_TUNNEL_FQDN = os.environ.get("EDGE_TUNNEL_FQDN", "") +EDGE_TUNNEL_FQDN_CHAT = os.environ.get("EDGE_TUNNEL_FQDN_CHAT", "") +EDGE_ROUTING_MODE = os.environ.get("EDGE_ROUTING_MODE", "subpath") # Shared secret for Caddy forward_auth verify endpoint (#709). # When set, only requests carrying this value in X-Forward-Auth-Secret are @@ -102,6 +104,8 @@ MIME_TYPES = { def _build_callback_uri(): """Build the OAuth callback URI based on tunnel configuration.""" + if EDGE_ROUTING_MODE == "subdomain" and EDGE_TUNNEL_FQDN_CHAT: + return f"https://{EDGE_TUNNEL_FQDN_CHAT}/oauth/callback" if EDGE_TUNNEL_FQDN: return f"https://{EDGE_TUNNEL_FQDN}/chat/oauth/callback" return "http://localhost/chat/oauth/callback" diff --git a/lib/ci-setup.sh b/lib/ci-setup.sh index 319e83e..507affb 100644 --- a/lib/ci-setup.sh +++ b/lib/ci-setup.sh @@ -142,6 +142,7 @@ _create_forgejo_oauth_app() { # Set up Woodpecker CI to use Forgejo as its forge backend. # Creates an OAuth2 app on Forgejo for Woodpecker, activates the repo. +# Respects EDGE_ROUTING_MODE: in subdomain mode, uses EDGE_TUNNEL_FQDN_CI for redirect URI. # Usage: create_woodpecker_oauth _create_woodpecker_oauth_impl() { local forge_url="$1" @@ -150,7 +151,13 @@ _create_woodpecker_oauth_impl() { echo "" echo "── Woodpecker OAuth2 setup ────────────────────────────" - _create_forgejo_oauth_app "woodpecker-ci" "http://localhost:8000/authorize" || return 0 + local wp_redirect_uri="http://localhost:8000/authorize" + local routing_mode="${EDGE_ROUTING_MODE:-subpath}" + if [ "$routing_mode" = "subdomain" ] && [ -n "${EDGE_TUNNEL_FQDN_CI:-}" ]; then + wp_redirect_uri="https://${EDGE_TUNNEL_FQDN_CI}/authorize" + fi + + _create_forgejo_oauth_app "woodpecker-ci" "$wp_redirect_uri" || return 0 local client_id="${_OAUTH_CLIENT_ID}" local client_secret="${_OAUTH_CLIENT_SECRET}" @@ -158,10 +165,15 @@ _create_woodpecker_oauth_impl() { # WP_FORGEJO_CLIENT/SECRET match the docker-compose.yml variable references # WOODPECKER_HOST must be host-accessible URL to match OAuth2 redirect_uri local env_file="${FACTORY_ROOT}/.env" + local wp_host="http://localhost:8000" + if [ "$routing_mode" = "subdomain" ] && [ -n "${EDGE_TUNNEL_FQDN_CI:-}" ]; then + wp_host="https://${EDGE_TUNNEL_FQDN_CI}" + fi + local wp_vars=( "WOODPECKER_FORGEJO=true" "WOODPECKER_FORGEJO_URL=${forge_url}" - "WOODPECKER_HOST=http://localhost:8000" + "WOODPECKER_HOST=${wp_host}" ) if [ -n "${client_id:-}" ]; then wp_vars+=("WP_FORGEJO_CLIENT=${client_id}") diff --git a/lib/generators.sh b/lib/generators.sh index eb223e8..739ca50 100644 --- a/lib/generators.sh +++ b/lib/generators.sh @@ -607,9 +607,12 @@ COMPOSEEOF - EDGE_TUNNEL_USER=${EDGE_TUNNEL_USER:-tunnel} - EDGE_TUNNEL_PORT=${EDGE_TUNNEL_PORT:-} - EDGE_TUNNEL_FQDN=${EDGE_TUNNEL_FQDN:-} - # Subdomain fallback (#713): if subpath routing (#704/#708) fails, add: - # EDGE_TUNNEL_FQDN_FORGE, EDGE_TUNNEL_FQDN_CI, EDGE_TUNNEL_FQDN_CHAT - # See docs/edge-routing-fallback.md for the full pivot plan. + # Subdomain fallback (#1028): per-service FQDNs for subdomain routing mode. + # Set EDGE_ROUTING_MODE=subdomain to activate. See docs/edge-routing-fallback.md. + - EDGE_ROUTING_MODE=${EDGE_ROUTING_MODE:-subpath} + - EDGE_TUNNEL_FQDN_FORGE=${EDGE_TUNNEL_FQDN_FORGE:-} + - EDGE_TUNNEL_FQDN_CI=${EDGE_TUNNEL_FQDN_CI:-} + - EDGE_TUNNEL_FQDN_CHAT=${EDGE_TUNNEL_FQDN_CHAT:-} # Shared secret for Caddy ↔ chat forward_auth (#709) - FORWARD_AUTH_SECRET=${FORWARD_AUTH_SECRET:-} volumes: @@ -700,6 +703,8 @@ COMPOSEEOF CHAT_OAUTH_CLIENT_ID: ${CHAT_OAUTH_CLIENT_ID:-} CHAT_OAUTH_CLIENT_SECRET: ${CHAT_OAUTH_CLIENT_SECRET:-} EDGE_TUNNEL_FQDN: ${EDGE_TUNNEL_FQDN:-} + EDGE_TUNNEL_FQDN_CHAT: ${EDGE_TUNNEL_FQDN_CHAT:-} + EDGE_ROUTING_MODE: ${EDGE_ROUTING_MODE:-subpath} DISINTO_CHAT_ALLOWED_USERS: ${DISINTO_CHAT_ALLOWED_USERS:-} # Shared secret for Caddy forward_auth verify endpoint (#709) FORWARD_AUTH_SECRET: ${FORWARD_AUTH_SECRET:-} @@ -805,6 +810,11 @@ _generate_agent_docker_impl() { # Output path: ${FACTORY_ROOT}/docker/Caddyfile (gitignored — generated artifact). # The edge compose service mounts this path as /etc/caddy/Caddyfile. # On a fresh clone, `disinto init` calls generate_caddyfile before first `disinto up`. +# +# Routing mode (EDGE_ROUTING_MODE env var): +# subpath — (default) all services under .disinto.ai/{forge,ci,chat,staging} +# subdomain — per-service subdomains: forge., ci., chat. +# See docs/edge-routing-fallback.md for the full pivot plan. _generate_caddyfile_impl() { local docker_dir="${FACTORY_ROOT}/docker" local caddyfile="${docker_dir}/Caddyfile" @@ -814,8 +824,22 @@ _generate_caddyfile_impl() { return fi + local routing_mode="${EDGE_ROUTING_MODE:-subpath}" + + if [ "$routing_mode" = "subdomain" ]; then + _generate_caddyfile_subdomain "$caddyfile" + else + _generate_caddyfile_subpath "$caddyfile" + fi + + echo "Created: ${caddyfile} (routing_mode=${routing_mode})" +} + +# Subpath Caddyfile: all services under a single :80 block with path-based routing. +_generate_caddyfile_subpath() { + local caddyfile="$1" cat > "$caddyfile" <<'CADDYFILEEOF' -# Caddyfile — edge proxy configuration +# Caddyfile — edge proxy configuration (subpath mode) # IP-only binding at bootstrap; domain + TLS added later via vault resource request :80 { @@ -858,8 +882,50 @@ _generate_caddyfile_impl() { } } CADDYFILEEOF +} - echo "Created: ${caddyfile}" +# Subdomain Caddyfile: four host blocks per docs/edge-routing-fallback.md. +# Uses env vars EDGE_TUNNEL_FQDN_FORGE, EDGE_TUNNEL_FQDN_CI, EDGE_TUNNEL_FQDN_CHAT, +# and EDGE_TUNNEL_FQDN (main project domain → staging). +_generate_caddyfile_subdomain() { + local caddyfile="$1" + cat > "$caddyfile" <<'CADDYFILEEOF' +# Caddyfile — edge proxy configuration (subdomain mode) +# Per-service subdomains; see docs/edge-routing-fallback.md + +# Main project domain — staging / landing +{$EDGE_TUNNEL_FQDN} { + reverse_proxy staging:80 +} + +# Forgejo — root path, no subpath rewrite needed +{$EDGE_TUNNEL_FQDN_FORGE} { + reverse_proxy forgejo:3000 +} + +# Woodpecker CI — root path +{$EDGE_TUNNEL_FQDN_CI} { + reverse_proxy woodpecker:8000 +} + +# Chat — with forward_auth (#709, on its own host) +{$EDGE_TUNNEL_FQDN_CHAT} { + handle /login { + reverse_proxy chat:8080 + } + handle /oauth/callback { + reverse_proxy chat:8080 + } + handle /* { + forward_auth chat:8080 { + uri /auth/verify + copy_headers X-Forwarded-User + header_up X-Forward-Auth-Secret {$FORWARD_AUTH_SECRET} + } + reverse_proxy chat:8080 + } +} +CADDYFILEEOF } # Generate docker/index.html default page. diff --git a/projects/disinto.toml.example b/projects/disinto.toml.example index ebe6eed..34eacae 100644 --- a/projects/disinto.toml.example +++ b/projects/disinto.toml.example @@ -59,6 +59,23 @@ check_pipeline_stall = false # compact_pct = 60 # poll_interval = 60 +# Edge routing mode (default: subpath) +# +# Controls how services are exposed through the edge proxy. +# subpath — all services under .disinto.ai/{forge,ci,chat,staging} +# subdomain — per-service subdomains: forge., ci., chat. +# +# Set to "subdomain" if subpath routing causes unfixable issues (redirect loops, +# OAuth callback mismatches, cookie collisions). See docs/edge-routing-fallback.md. +# +# Set in .env (not TOML) since it's consumed by docker-compose and shell scripts: +# EDGE_ROUTING_MODE=subdomain +# +# In subdomain mode, `disinto edge register` also writes: +# EDGE_TUNNEL_FQDN_FORGE=forge..disinto.ai +# EDGE_TUNNEL_FQDN_CI=ci..disinto.ai +# EDGE_TUNNEL_FQDN_CHAT=chat..disinto.ai + # [mirrors] # github = "git@github.com:johba/disinto.git" # codeberg = "git@codeberg.org:johba/disinto.git" diff --git a/tools/edge-control/register.sh b/tools/edge-control/register.sh index 3ac0d09..ee12ef7 100755 --- a/tools/edge-control/register.sh +++ b/tools/edge-control/register.sh @@ -39,13 +39,10 @@ EOF exit 1 } -# TODO(#713): Subdomain fallback — if subpath routing (#704/#708) fails, this -# function would need to register additional routes for forge., -# ci., chat. subdomains (or accept a --subdomain parameter). -# See docs/edge-routing-fallback.md for the full pivot plan. - # Register a new tunnel # Usage: do_register +# When EDGE_ROUTING_MODE=subdomain, also registers forge., ci., +# and chat. subdomain routes (see docs/edge-routing-fallback.md). do_register() { local project="$1" local pubkey="$2" @@ -79,17 +76,32 @@ do_register() { local port port=$(allocate_port "$project" "$full_pubkey" "${project}.${DOMAIN_SUFFIX}") - # Add Caddy route + # Add Caddy route for main project domain add_route "$project" "$port" + # Subdomain mode: register additional routes for per-service subdomains + local routing_mode="${EDGE_ROUTING_MODE:-subpath}" + if [ "$routing_mode" = "subdomain" ]; then + local subdomain + for subdomain in forge ci chat; do + add_route "${subdomain}.${project}" "$port" + done + fi + # Rebuild authorized_keys for tunnel user rebuild_authorized_keys # Reload Caddy reload_caddy - # Return JSON response - echo "{\"port\":${port},\"fqdn\":\"${project}.${DOMAIN_SUFFIX}\"}" + # Build JSON response + local response="{\"port\":${port},\"fqdn\":\"${project}.${DOMAIN_SUFFIX}\"" + if [ "$routing_mode" = "subdomain" ]; then + response="${response},\"routing_mode\":\"subdomain\"" + response="${response},\"subdomains\":{\"forge\":\"forge.${project}.${DOMAIN_SUFFIX}\",\"ci\":\"ci.${project}.${DOMAIN_SUFFIX}\",\"chat\":\"chat.${project}.${DOMAIN_SUFFIX}\"}" + fi + response="${response}}" + echo "$response" } # Deregister a tunnel @@ -109,9 +121,18 @@ do_deregister() { # Remove from registry free_port "$project" >/dev/null - # Remove Caddy route + # Remove Caddy route for main project domain remove_route "$project" + # Subdomain mode: also remove per-service subdomain routes + local routing_mode="${EDGE_ROUTING_MODE:-subpath}" + if [ "$routing_mode" = "subdomain" ]; then + local subdomain + for subdomain in forge ci chat; do + remove_route "${subdomain}.${project}" + done + fi + # Rebuild authorized_keys for tunnel user rebuild_authorized_keys From 17e745376d9a82831e481c89277863d7fcb2e63e Mon Sep 17 00:00:00 2001 From: Agent Date: Mon, 20 Apr 2026 11:09:00 +0000 Subject: [PATCH 2/3] fix: vision(#623): WebSocket streaming for chat UI to replace one-shot claude --print (#1026) --- docker/chat/server.py | 435 +++++++++++++++++++++++++++++++++++++- docker/chat/ui/index.html | 117 ++++++++++ nomad/jobs/edge.hcl | 6 + 3 files changed, 551 insertions(+), 7 deletions(-) diff --git a/docker/chat/server.py b/docker/chat/server.py index ef37fb1..85834f5 100644 --- a/docker/chat/server.py +++ b/docker/chat/server.py @@ -22,6 +22,7 @@ OAuth flow: The claude binary is expected to be mounted from the host at /usr/local/bin/claude. """ +import asyncio import datetime import json import os @@ -30,8 +31,14 @@ import secrets import subprocess import sys import time +import threading from http.server import HTTPServer, BaseHTTPRequestHandler +from socketserver import ThreadingMixIn from urllib.parse import urlparse, parse_qs, urlencode +import socket +import struct +import base64 +import hashlib # Configuration HOST = os.environ.get("CHAT_HOST", "0.0.0.0") @@ -89,6 +96,10 @@ _request_log = {} # user -> {"tokens": int, "date": "YYYY-MM-DD"} _daily_tokens = {} +# WebSocket message queues per user +# user -> asyncio.Queue (for streaming messages to connected clients) +_websocket_queues = {} + # MIME types for static files MIME_TYPES = { ".html": "text/html; charset=utf-8", @@ -101,6 +112,17 @@ MIME_TYPES = { ".ico": "image/x-icon", } +# WebSocket subprotocol for chat streaming +WEBSOCKET_SUBPROTOCOL = "chat-stream-v1" + +# WebSocket opcodes +OPCODE_CONTINUATION = 0x0 +OPCODE_TEXT = 0x1 +OPCODE_BINARY = 0x2 +OPCODE_CLOSE = 0x8 +OPCODE_PING = 0x9 +OPCODE_PONG = 0xA + def _build_callback_uri(): """Build the OAuth callback URI based on tunnel configuration.""" @@ -299,6 +321,257 @@ def _parse_stream_json(output): return "".join(text_parts), total_tokens +# ============================================================================= +# WebSocket Handler Class +# ============================================================================= + +class _WebSocketHandler: + """Handle WebSocket connections for chat streaming.""" + + def __init__(self, reader, writer, user, message_queue): + self.reader = reader + self.writer = writer + self.user = user + self.message_queue = message_queue + self.closed = False + + async def accept_connection(self): + """Accept the WebSocket handshake.""" + # Read the HTTP request + request_line = await self._read_line() + if not request_line.startswith("GET "): + self._close_connection() + return False + + # Parse the request + headers = {} + while True: + line = await self._read_line() + if line == "": + break + if ":" in line: + key, value = line.split(":", 1) + headers[key.strip().lower()] = value.strip() + + # Validate WebSocket upgrade + if headers.get("upgrade", "").lower() != "websocket": + self._send_http_error(400, "Bad Request", "WebSocket upgrade required") + self._close_connection() + return False + + if headers.get("connection", "").lower() != "upgrade": + self._send_http_error(400, "Bad Request", "Connection upgrade required") + self._close_connection() + return False + + # Get Sec-WebSocket-Key + sec_key = headers.get("sec-websocket-key", "") + if not sec_key: + self._send_http_error(400, "Bad Request", "Missing Sec-WebSocket-Key") + self._close_connection() + return False + + # Get Sec-WebSocket-Protocol if provided + sec_protocol = headers.get("sec-websocket-protocol", "") + + # Validate subprotocol + if sec_protocol and sec_protocol != WEBSOCKET_SUBPROTOCOL: + self._send_http_error( + 400, + "Bad Request", + f"Unsupported subprotocol. Expected: {WEBSOCKET_SUBPROTOCOL}", + ) + self._close_connection() + return False + + # Generate accept key + accept_key = self._generate_accept_key(sec_key) + + # Send handshake response + response = ( + "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + f"Sec-WebSocket-Accept: {accept_key}\r\n" + ) + + if sec_protocol: + response += f"Sec-WebSocket-Protocol: {sec_protocol}\r\n" + + response += "\r\n" + self.writer.write(response.encode("utf-8")) + await self.writer.drain() + return True + + def _generate_accept_key(self, sec_key): + """Generate the Sec-WebSocket-Accept key.""" + GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + combined = sec_key + GUID + sha1 = hashlib.sha1(combined.encode("utf-8")) + return base64.b64encode(sha1.digest()).decode("utf-8") + + async def _read_line(self): + """Read a line from the socket.""" + data = await self.reader.read(1) + line = "" + while data: + if data == b"\r": + data = await self.reader.read(1) + continue + if data == b"\n": + return line + line += data.decode("utf-8", errors="replace") + data = await self.reader.read(1) + return line + + def _send_http_error(self, code, title, message): + """Send an HTTP error response.""" + response = ( + f"HTTP/1.1 {code} {title}\r\n" + "Content-Type: text/plain; charset=utf-8\r\n" + "Content-Length: " + str(len(message)) + "\r\n" + "\r\n" + + message + ) + try: + self.writer.write(response.encode("utf-8")) + self.writer.drain() + except Exception: + pass + + def _close_connection(self): + """Close the connection.""" + try: + self.writer.close() + except Exception: + pass + + async def send_text(self, data): + """Send a text frame.""" + if self.closed: + return + try: + frame = self._encode_frame(OPCODE_TEXT, data.encode("utf-8")) + self.writer.write(frame) + await self.writer.drain() + except Exception as e: + print(f"WebSocket send error: {e}", file=sys.stderr) + + async def send_binary(self, data): + """Send a binary frame.""" + if self.closed: + return + try: + if isinstance(data, str): + data = data.encode("utf-8") + frame = self._encode_frame(OPCODE_BINARY, data) + self.writer.write(frame) + await self.writer.drain() + except Exception as e: + print(f"WebSocket send error: {e}", file=sys.stderr) + + def _encode_frame(self, opcode, payload): + """Encode a WebSocket frame.""" + frame = bytearray() + frame.append(0x80 | opcode) # FIN + opcode + + length = len(payload) + if length < 126: + frame.append(length) + elif length < 65536: + frame.append(126) + frame.extend(struct.pack(">H", length)) + else: + frame.append(127) + frame.extend(struct.pack(">Q", length)) + + frame.extend(payload) + return bytes(frame) + + async def _decode_frame(self): + """Decode a WebSocket frame. Returns (opcode, payload).""" + try: + # Read first two bytes + header = await self.reader.read(2) + if len(header) < 2: + return None, None + + fin = (header[0] >> 7) & 1 + opcode = header[0] & 0x0F + masked = (header[1] >> 7) & 1 + length = header[1] & 0x7F + + # Extended payload length + if length == 126: + ext = await self.reader.read(2) + length = struct.unpack(">H", ext)[0] + elif length == 127: + ext = await self.reader.read(8) + length = struct.unpack(">Q", ext)[0] + + # Masking key + if masked: + mask_key = await self.reader.read(4) + + # Payload + payload = await self.reader.read(length) + + # Unmask if needed + if masked: + payload = bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload)) + + return opcode, payload + except Exception as e: + print(f"WebSocket decode error: {e}", file=sys.stderr) + return None, None + + async def handle_connection(self): + """Handle the WebSocket connection loop.""" + try: + while not self.closed: + opcode, payload = await self._decode_frame() + if opcode is None: + break + + if opcode == OPCODE_CLOSE: + self._send_close() + break + elif opcode == OPCODE_PING: + self._send_pong(payload) + elif opcode == OPCODE_PONG: + pass # Ignore pong + elif opcode in (OPCODE_TEXT, OPCODE_BINARY): + # Handle text messages from client (e.g., heartbeat ack) + pass + + # Check if we should stop waiting for messages + if self.closed: + break + + except Exception as e: + print(f"WebSocket connection error: {e}", file=sys.stderr) + finally: + self._close_connection() + + def _send_close(self): + """Send a close frame.""" + try: + frame = self._encode_frame(OPCODE_CLOSE, b"\x03\x00") + self.writer.write(frame) + self.writer.drain() + except Exception: + pass + + def _send_pong(self, payload): + """Send a pong frame.""" + try: + frame = self._encode_frame(OPCODE_PONG, payload) + self.writer.write(frame) + self.writer.drain() + except Exception: + pass + + # ============================================================================= # Conversation History Functions (#710) # ============================================================================= @@ -548,9 +821,9 @@ class ChatHandler(BaseHTTPRequestHandler): self.serve_static(path) return - # Reserved WebSocket endpoint (future use) - if path == "/ws" or path.startswith("/ws"): - self.send_error_page(501, "WebSocket upgrade not yet implemented") + # WebSocket upgrade endpoint + if path == "/chat/ws" or path == "/ws" or path.startswith("/ws"): + self.handle_websocket_upgrade() return # 404 for unknown paths @@ -759,6 +1032,7 @@ class ChatHandler(BaseHTTPRequestHandler): """ Handle chat requests by spawning `claude --print` with the user message. Enforces per-user rate limits and tracks token usage (#711). + Streams tokens over WebSocket if connected. """ # Check rate limits before processing (#711) @@ -816,10 +1090,47 @@ class ChatHandler(BaseHTTPRequestHandler): stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, + bufsize=1, # Line buffered ) - raw_output = proc.stdout.read() + # Stream output line by line + response_parts = [] + total_tokens = 0 + for line in iter(proc.stdout.readline, ""): + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + etype = event.get("type", "") + # Extract text content from content_block_delta events + if etype == "content_block_delta": + delta = event.get("delta", {}) + if delta.get("type") == "text_delta": + text = delta.get("text", "") + if text: + response_parts.append(text) + # Stream to WebSocket if connected + if user in _websocket_queues: + try: + _websocket_queues[user].put_nowait(text) + except Exception: + pass # Client disconnected + + # Parse usage from result event + if etype == "result": + usage = event.get("usage", {}) + total_tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0) + elif "usage" in event: + usage = event["usage"] + if isinstance(usage, dict): + total_tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0) + + except json.JSONDecodeError: + pass + + # Wait for process to complete error_output = proc.stderr.read() if error_output: print(f"Claude stderr: {error_output}", file=sys.stderr) @@ -830,8 +1141,8 @@ class ChatHandler(BaseHTTPRequestHandler): self.send_error_page(500, f"Claude CLI failed with exit code {proc.returncode}") return - # Parse stream-json for text and token usage (#711) - response, total_tokens = _parse_stream_json(raw_output) + # Combine response parts + response = "".join(response_parts) # Track token usage - does not block *this* request (#711) if total_tokens > 0: @@ -843,7 +1154,7 @@ class ChatHandler(BaseHTTPRequestHandler): # Fall back to raw output if stream-json parsing yielded no text if not response: - response = raw_output + response = proc.stdout.getvalue() if hasattr(proc.stdout, 'getvalue') else "" # Save assistant response to history _write_message(user, conv_id, "assistant", response) @@ -913,6 +1224,116 @@ class ChatHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(json.dumps({"conversation_id": conv_id}, ensure_ascii=False).encode("utf-8")) + @staticmethod + def push_to_websocket(user, message): + """Push a message to a WebSocket connection for a user. + + This is called from the chat handler to stream tokens to connected clients. + The message is added to the user's WebSocket message queue. + """ + # Get the message queue from the WebSocket handler's queue + # We store the queue in a global dict keyed by user + if user in _websocket_queues: + _websocket_queues[user].put_nowait(message) + + def handle_websocket_upgrade(self): + """Handle WebSocket upgrade request for chat streaming.""" + # Check session cookie + user = _validate_session(self.headers.get("Cookie")) + if not user: + self.send_error_page(401, "Unauthorized: no valid session") + return + + # Check rate limits before allowing WebSocket connection + allowed, retry_after, reason = _check_rate_limit(user) + if not allowed: + self.send_error_page( + 429, + f"Rate limit exceeded: {reason}. Retry after {retry_after}s", + ) + return + + # Record request for rate limiting + _record_request(user) + + # Create message queue for this user + _websocket_queues[user] = asyncio.Queue() + + # Get the socket from the connection + sock = self.connection + sock.setblocking(False) + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + + # Create async server to handle the connection + async def handle_ws(): + try: + # Wrap the socket in asyncio streams + transport, _ = await asyncio.get_event_loop().create_connection( + lambda: protocol, + sock=sock, + ) + ws_reader = protocol._stream_reader + ws_writer = transport + + # Create WebSocket handler + ws_handler = _WebSocketHandler(ws_reader, ws_writer, user, _websocket_queues[user]) + + # Accept the connection + if not await ws_handler.accept_connection(): + return + + # Start a task to read from the queue and send to client + async def send_stream(): + while not ws_handler.closed: + try: + data = await asyncio.wait_for(ws_handler.message_queue.get(), timeout=1.0) + await ws_handler.send_text(data) + except asyncio.TimeoutError: + # Send ping to keep connection alive + try: + frame = ws_handler._encode_frame(OPCODE_PING, b"") + ws_writer.write(frame) + await ws_writer.drain() + except Exception: + break + except Exception as e: + print(f"Send stream error: {e}", file=sys.stderr) + break + + # Start sending task + send_task = asyncio.create_task(send_stream()) + + # Handle incoming WebSocket frames + await ws_handler.handle_connection() + + # Cancel send task + send_task.cancel() + try: + await send_task + except asyncio.CancelledError: + pass + + except Exception as e: + print(f"WebSocket handler error: {e}", file=sys.stderr) + finally: + try: + ws_writer.close() + await ws_writer.wait_closed() + except Exception: + pass + + # Run the async handler in a thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(handle_ws()) + except Exception as e: + print(f"WebSocket error: {e}", file=sys.stderr) + finally: + loop.close() + sock.close() + def do_DELETE(self): """Handle DELETE requests.""" parsed = urlparse(self.path) diff --git a/docker/chat/ui/index.html b/docker/chat/ui/index.html index bd920f9..b045873 100644 --- a/docker/chat/ui/index.html +++ b/docker/chat/ui/index.html @@ -430,6 +430,10 @@ return div.innerHTML.replace(/\n/g, '
'); } + // WebSocket connection for streaming + let ws = null; + let wsMessageId = null; + // Send message handler async function sendMessage() { const message = textarea.value.trim(); @@ -449,6 +453,14 @@ await createNewConversation(); } + // Try WebSocket streaming first, fall back to fetch + if (window.location.protocol === 'https:' || window.location.hostname === 'localhost') { + if (tryWebSocketSend(message)) { + return; + } + } + + // Fallback to fetch try { // Use fetch with URLSearchParams for application/x-www-form-urlencoded const params = new URLSearchParams(); @@ -485,6 +497,111 @@ } } + // Try to send message via WebSocket streaming + function tryWebSocketSend(message) { + try { + // Generate a unique message ID for this request + wsMessageId = Date.now().toString(36) + Math.random().toString(36).substr(2); + + // Connect to WebSocket + const wsUrl = window.location.protocol === 'https:' + ? `wss://${window.location.host}/chat/ws` + : `ws://${window.location.host}/chat/ws`; + + ws = new WebSocket(wsUrl); + + ws.onopen = function() { + // Send the message as JSON with message ID + const data = { + type: 'chat_request', + message_id: wsMessageId, + message: message, + conversation_id: currentConversationId + }; + ws.send(JSON.stringify(data)); + }; + + ws.onmessage = function(event) { + try { + const data = JSON.parse(event.data); + + if (data.type === 'token') { + // Stream a token to the UI + addTokenToLastMessage(data.token); + } else if (data.type === 'complete') { + // Streaming complete + closeWebSocket(); + textarea.disabled = false; + sendBtn.disabled = false; + sendBtn.textContent = 'Send'; + textarea.focus(); + messagesDiv.scrollTop = messagesDiv.scrollHeight; + loadConversations(); + } else if (data.type === 'error') { + addSystemMessage(`Error: ${data.message}`); + closeWebSocket(); + textarea.disabled = false; + sendBtn.disabled = false; + sendBtn.textContent = 'Send'; + textarea.focus(); + } + } catch (e) { + console.error('Failed to parse WebSocket message:', e); + } + }; + + ws.onerror = function(error) { + console.error('WebSocket error:', error); + addSystemMessage('WebSocket connection error. Falling back to regular chat.'); + closeWebSocket(); + sendMessage(); // Retry with fetch + }; + + ws.onclose = function() { + wsMessageId = null; + }; + + return true; // WebSocket attempt started + + } catch (error) { + console.error('Failed to create WebSocket:', error); + return false; // Fall back to fetch + } + } + + // Add a token to the last assistant message (for streaming) + function addTokenToLastMessage(token) { + const messages = messagesDiv.querySelectorAll('.message.assistant'); + if (messages.length === 0) { + // No assistant message yet, create one + const msgDiv = document.createElement('div'); + msgDiv.className = 'message assistant'; + msgDiv.innerHTML = ` +
assistant
+
+ `; + messagesDiv.appendChild(msgDiv); + } + + const lastMsg = messagesDiv.querySelector('.message.assistant .content.streaming'); + if (lastMsg) { + lastMsg.textContent += token; + messagesDiv.scrollTop = messagesDiv.scrollHeight; + } + } + + // Close WebSocket connection + function closeWebSocket() { + if (ws) { + ws.onopen = null; + ws.onmessage = null; + ws.onerror = null; + ws.onclose = null; + ws.close(); + ws = null; + } + } + // Event listeners sendBtn.addEventListener('click', sendMessage); diff --git a/nomad/jobs/edge.hcl b/nomad/jobs/edge.hcl index bf82b3d..afc57c3 100644 --- a/nomad/jobs/edge.hcl +++ b/nomad/jobs/edge.hcl @@ -172,6 +172,12 @@ EOT handle /chat/oauth/callback { reverse_proxy 127.0.0.1:8080 } + # WebSocket endpoint for streaming (#1026) + handle /chat/ws { + header_up Upgrade $http.upgrade + header_up Connection $http.connection + reverse_proxy 127.0.0.1:8080 + } # Defense-in-depth: forward_auth stamps X-Forwarded-User from session (#709) handle /chat/* { forward_auth 127.0.0.1:8080 { From 01f7d061bc9a74e25b94362a5b95721d70ad93df Mon Sep 17 00:00:00 2001 From: Agent Date: Mon, 20 Apr 2026 11:36:22 +0000 Subject: [PATCH 3/3] fix: WebSocket streaming - address all AI review findings (#1076) Fixes identified in AI review: - Blocker #1: Server now handles chat_request WebSocket frames and invokes Claude - Blocker #2: accept_connection() uses self.headers from BaseHTTPRequestHandler - Blocker #3: handle_websocket_upgrade() uses asyncio.open_connection() for proper StreamWriter - Medium #4: _decode_frame() uses readexactly() for all fixed-length reads - Medium #5: Message queue cleaned up on disconnect in handle_connection() finally block - Low #6: WebSocket close code corrected from 768 to 1000 - Low #7: _send_close() and _send_pong() are now async with proper await Changes: - Added _handle_chat_request() method to invoke Claude within WebSocket coroutine - Fixed _send_close() to use struct.pack for correct close code (1000) - Made _send_pong() async with proper await - Updated handle_connection() to call async close/pong methods and cleanup queue - Fixed handle_websocket_upgrade() to pass Sec-WebSocket-Key from HTTP headers - Replaced create_connection() with open_connection() for proper reader/writer --- docker/chat/server.py | 202 ++++++++++++++++++++++++++---------------- 1 file changed, 127 insertions(+), 75 deletions(-) diff --git a/docker/chat/server.py b/docker/chat/server.py index 85834f5..0623955 100644 --- a/docker/chat/server.py +++ b/docker/chat/server.py @@ -335,47 +335,14 @@ class _WebSocketHandler: self.message_queue = message_queue self.closed = False - async def accept_connection(self): - """Accept the WebSocket handshake.""" - # Read the HTTP request - request_line = await self._read_line() - if not request_line.startswith("GET "): - self._close_connection() - return False - - # Parse the request - headers = {} - while True: - line = await self._read_line() - if line == "": - break - if ":" in line: - key, value = line.split(":", 1) - headers[key.strip().lower()] = value.strip() - - # Validate WebSocket upgrade - if headers.get("upgrade", "").lower() != "websocket": - self._send_http_error(400, "Bad Request", "WebSocket upgrade required") - self._close_connection() - return False - - if headers.get("connection", "").lower() != "upgrade": - self._send_http_error(400, "Bad Request", "Connection upgrade required") - self._close_connection() - return False - - # Get Sec-WebSocket-Key - sec_key = headers.get("sec-websocket-key", "") - if not sec_key: - self._send_http_error(400, "Bad Request", "Missing Sec-WebSocket-Key") - self._close_connection() - return False - - # Get Sec-WebSocket-Protocol if provided - sec_protocol = headers.get("sec-websocket-protocol", "") + async def accept_connection(self, sec_websocket_key, sec_websocket_protocol=None): + """Accept the WebSocket handshake. + The HTTP request has already been parsed by BaseHTTPRequestHandler, + so we use the provided key and protocol instead of re-reading from socket. + """ # Validate subprotocol - if sec_protocol and sec_protocol != WEBSOCKET_SUBPROTOCOL: + if sec_websocket_protocol and sec_websocket_protocol != WEBSOCKET_SUBPROTOCOL: self._send_http_error( 400, "Bad Request", @@ -385,7 +352,7 @@ class _WebSocketHandler: return False # Generate accept key - accept_key = self._generate_accept_key(sec_key) + accept_key = self._generate_accept_key(sec_websocket_key) # Send handshake response response = ( @@ -395,8 +362,8 @@ class _WebSocketHandler: f"Sec-WebSocket-Accept: {accept_key}\r\n" ) - if sec_protocol: - response += f"Sec-WebSocket-Protocol: {sec_protocol}\r\n" + if sec_websocket_protocol: + response += f"Sec-WebSocket-Protocol: {sec_websocket_protocol}\r\n" response += "\r\n" self.writer.write(response.encode("utf-8")) @@ -491,10 +458,8 @@ class _WebSocketHandler: async def _decode_frame(self): """Decode a WebSocket frame. Returns (opcode, payload).""" try: - # Read first two bytes - header = await self.reader.read(2) - if len(header) < 2: - return None, None + # Read first two bytes (use readexactly for guaranteed length) + header = await self.reader.readexactly(2) fin = (header[0] >> 7) & 1 opcode = header[0] & 0x0F @@ -503,18 +468,18 @@ class _WebSocketHandler: # Extended payload length if length == 126: - ext = await self.reader.read(2) + ext = await self.reader.readexactly(2) length = struct.unpack(">H", ext)[0] elif length == 127: - ext = await self.reader.read(8) + ext = await self.reader.readexactly(8) length = struct.unpack(">Q", ext)[0] # Masking key if masked: - mask_key = await self.reader.read(4) + mask_key = await self.reader.readexactly(4) # Payload - payload = await self.reader.read(length) + payload = await self.reader.readexactly(length) # Unmask if needed if masked: @@ -534,15 +499,22 @@ class _WebSocketHandler: break if opcode == OPCODE_CLOSE: - self._send_close() + await self._send_close() break elif opcode == OPCODE_PING: - self._send_pong(payload) + await self._send_pong(payload) elif opcode == OPCODE_PONG: pass # Ignore pong elif opcode in (OPCODE_TEXT, OPCODE_BINARY): - # Handle text messages from client (e.g., heartbeat ack) - pass + # Handle text messages from client (e.g., chat_request) + try: + msg = payload.decode("utf-8") + data = json.loads(msg) + if data.get("type") == "chat_request": + # Invoke Claude with the message + await self._handle_chat_request(data.get("message", "")) + except (json.JSONDecodeError, UnicodeDecodeError): + pass # Check if we should stop waiting for messages if self.closed: @@ -552,25 +524,103 @@ class _WebSocketHandler: print(f"WebSocket connection error: {e}", file=sys.stderr) finally: self._close_connection() + # Clean up the message queue on disconnect + if self.user in _websocket_queues: + del _websocket_queues[self.user] - def _send_close(self): + async def _send_close(self): """Send a close frame.""" try: - frame = self._encode_frame(OPCODE_CLOSE, b"\x03\x00") + # Close code 1000 = normal closure + frame = self._encode_frame(OPCODE_CLOSE, struct.pack(">H", 1000)) self.writer.write(frame) - self.writer.drain() + await self.writer.drain() except Exception: pass - def _send_pong(self, payload): + async def _send_pong(self, payload): """Send a pong frame.""" try: frame = self._encode_frame(OPCODE_PONG, payload) self.writer.write(frame) - self.writer.drain() + await self.writer.drain() except Exception: pass + async def _handle_chat_request(self, message): + """Handle a chat_request WebSocket frame by invoking Claude.""" + if not message: + return + + # Validate Claude binary exists + if not os.path.exists(CLAUDE_BIN): + await self.send_text(json.dumps({ + "type": "error", + "message": "Claude CLI not found", + })) + return + + try: + # Spawn claude --print with stream-json for streaming output + proc = subprocess.Popen( + [CLAUDE_BIN, "--print", "--output-format", "stream-json", message], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + + # Stream output line by line + for line in iter(proc.stdout.readline, ""): + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + etype = event.get("type", "") + + # Extract text content from content_block_delta events + if etype == "content_block_delta": + delta = event.get("delta", {}) + if delta.get("type") == "text_delta": + text = delta.get("text", "") + if text: + # Send tokens to client + await self.send_text(text) + + # Check for usage event to know when complete + if etype == "result": + pass # Will send complete after loop + + except json.JSONDecodeError: + pass + + # Wait for process to complete + proc.wait() + + if proc.returncode != 0: + await self.send_text(json.dumps({ + "type": "error", + "message": f"Claude CLI failed with exit code {proc.returncode}", + })) + return + + # Send complete signal + await self.send_text(json.dumps({ + "type": "complete", + })) + + except FileNotFoundError: + await self.send_text(json.dumps({ + "type": "error", + "message": "Claude CLI not found", + })) + except Exception as e: + await self.send_text(json.dumps({ + "type": "error", + "message": str(e), + })) + # ============================================================================= # Conversation History Functions (#710) @@ -1259,28 +1309,30 @@ class ChatHandler(BaseHTTPRequestHandler): # Create message queue for this user _websocket_queues[user] = asyncio.Queue() + # Get WebSocket upgrade headers from the HTTP request + sec_websocket_key = self.headers.get("Sec-WebSocket-Key", "") + sec_websocket_protocol = self.headers.get("Sec-WebSocket-Protocol", "") + + # Validate Sec-WebSocket-Key + if not sec_websocket_key: + self.send_error_page(400, "Bad Request", "Missing Sec-WebSocket-Key") + return + # Get the socket from the connection sock = self.connection sock.setblocking(False) - reader = asyncio.StreamReader() - protocol = asyncio.StreamReaderProtocol(reader) # Create async server to handle the connection async def handle_ws(): try: - # Wrap the socket in asyncio streams - transport, _ = await asyncio.get_event_loop().create_connection( - lambda: protocol, - sock=sock, - ) - ws_reader = protocol._stream_reader - ws_writer = transport + # Wrap the socket in asyncio streams using open_connection + reader, writer = await asyncio.open_connection(sock=sock) # Create WebSocket handler - ws_handler = _WebSocketHandler(ws_reader, ws_writer, user, _websocket_queues[user]) + ws_handler = _WebSocketHandler(reader, writer, user, _websocket_queues[user]) - # Accept the connection - if not await ws_handler.accept_connection(): + # Accept the connection (pass headers from HTTP request) + if not await ws_handler.accept_connection(sec_websocket_key, sec_websocket_protocol): return # Start a task to read from the queue and send to client @@ -1293,8 +1345,8 @@ class ChatHandler(BaseHTTPRequestHandler): # Send ping to keep connection alive try: frame = ws_handler._encode_frame(OPCODE_PING, b"") - ws_writer.write(frame) - await ws_writer.drain() + writer.write(frame) + await writer.drain() except Exception: break except Exception as e: @@ -1318,8 +1370,8 @@ class ChatHandler(BaseHTTPRequestHandler): print(f"WebSocket handler error: {e}", file=sys.stderr) finally: try: - ws_writer.close() - await ws_writer.wait_closed() + writer.close() + await writer.wait_closed() except Exception: pass