Compare commits

...

4 commits

Author SHA1 Message Date
Agent
01f7d061bc fix: WebSocket streaming - address all AI review findings (#1076)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/nomad-validate Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
ci/woodpecker/pr/edge-subpath Pipeline was successful
ci/woodpecker/pr/nomad-validate Pipeline was successful
ci/woodpecker/pr/secret-scan Pipeline was successful
Fixes identified in AI review:
- Blocker #1: Server now handles chat_request WebSocket frames and invokes Claude
- Blocker #2: accept_connection() uses self.headers from BaseHTTPRequestHandler
- Blocker #3: handle_websocket_upgrade() uses asyncio.open_connection() for proper StreamWriter
- Medium #4: _decode_frame() uses readexactly() for all fixed-length reads
- Medium #5: Message queue cleaned up on disconnect in handle_connection() finally block
- Low #6: WebSocket close code corrected from 768 to 1000
- Low #7: _send_close() and _send_pong() are now async with proper await

Changes:
- Added _handle_chat_request() method to invoke Claude within WebSocket coroutine
- Fixed _send_close() to use struct.pack for correct close code (1000)
- Made _send_pong() async with proper await
- Updated handle_connection() to call async close/pong methods and cleanup queue
- Fixed handle_websocket_upgrade() to pass Sec-WebSocket-Key from HTTP headers
- Replaced create_connection() with open_connection() for proper reader/writer
2026-04-20 11:36:27 +00:00
Agent
17e745376d fix: vision(#623): WebSocket streaming for chat UI to replace one-shot claude --print (#1026) 2026-04-20 11:36:27 +00:00
aa87639356 Merge pull request 'fix: vision(#623): automate subdomain fallback pivot if subpath routing fails (#1028)' (#1078) from fix/issue-1028 into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/nomad-validate Pipeline was successful
2026-04-20 11:28:44 +00:00
Claude
78a295f567 fix: vision(#623): automate subdomain fallback pivot if subpath routing fails (#1028)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/nomad-validate Pipeline was successful
ci/woodpecker/pr/ci Pipeline was successful
ci/woodpecker/pr/edge-subpath Pipeline was successful
ci/woodpecker/pr/nomad-validate Pipeline was successful
ci/woodpecker/pr/smoke-init Pipeline was successful
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 11:12:20 +00:00
8 changed files with 782 additions and 33 deletions

View file

@ -1488,15 +1488,28 @@ p.write_text(text)
touch "${FACTORY_ROOT}/.env" touch "${FACTORY_ROOT}/.env"
fi fi
# Configure Forgejo and Woodpecker subpath URLs when EDGE_TUNNEL_FQDN is set # Configure Forgejo and Woodpecker URLs when EDGE_TUNNEL_FQDN is set.
# In subdomain mode, uses per-service FQDNs at root path instead of subpath URLs.
if [ -n "${EDGE_TUNNEL_FQDN:-}" ]; then if [ -n "${EDGE_TUNNEL_FQDN:-}" ]; then
# Forgejo ROOT_URL with /forge/ subpath (note trailing slash - Forgejo needs it) local routing_mode="${EDGE_ROUTING_MODE:-subpath}"
if ! grep -q '^FORGEJO_ROOT_URL=' "${FACTORY_ROOT}/.env" 2>/dev/null; then if [ "$routing_mode" = "subdomain" ]; then
echo "FORGEJO_ROOT_URL=https://${EDGE_TUNNEL_FQDN}/forge/" >> "${FACTORY_ROOT}/.env" # Subdomain mode: Forgejo at forge.<project>.disinto.ai (root path)
fi if ! grep -q '^FORGEJO_ROOT_URL=' "${FACTORY_ROOT}/.env" 2>/dev/null; then
# Woodpecker WOODPECKER_HOST with /ci subpath (no trailing slash for v3) echo "FORGEJO_ROOT_URL=https://${EDGE_TUNNEL_FQDN_FORGE:-forge.${EDGE_TUNNEL_FQDN}}/" >> "${FACTORY_ROOT}/.env"
if ! grep -q '^WOODPECKER_HOST=' "${FACTORY_ROOT}/.env" 2>/dev/null; then fi
echo "WOODPECKER_HOST=https://${EDGE_TUNNEL_FQDN}/ci" >> "${FACTORY_ROOT}/.env" # Subdomain mode: Woodpecker at ci.<project>.disinto.ai (root path)
if ! grep -q '^WOODPECKER_HOST=' "${FACTORY_ROOT}/.env" 2>/dev/null; then
echo "WOODPECKER_HOST=https://${EDGE_TUNNEL_FQDN_CI:-ci.${EDGE_TUNNEL_FQDN}}" >> "${FACTORY_ROOT}/.env"
fi
else
# Subpath mode: Forgejo ROOT_URL with /forge/ subpath (trailing slash required)
if ! grep -q '^FORGEJO_ROOT_URL=' "${FACTORY_ROOT}/.env" 2>/dev/null; then
echo "FORGEJO_ROOT_URL=https://${EDGE_TUNNEL_FQDN}/forge/" >> "${FACTORY_ROOT}/.env"
fi
# Subpath mode: Woodpecker WOODPECKER_HOST with /ci subpath (no trailing slash for v3)
if ! grep -q '^WOODPECKER_HOST=' "${FACTORY_ROOT}/.env" 2>/dev/null; then
echo "WOODPECKER_HOST=https://${EDGE_TUNNEL_FQDN}/ci" >> "${FACTORY_ROOT}/.env"
fi
fi fi
fi fi
@ -1603,9 +1616,15 @@ p.write_text(text)
create_woodpecker_oauth "$forge_url" "$forge_repo" create_woodpecker_oauth "$forge_url" "$forge_repo"
# Create OAuth2 app on Forgejo for disinto-chat (#708) # Create OAuth2 app on Forgejo for disinto-chat (#708)
# In subdomain mode, callback is at chat.<project> root instead of /chat/ subpath.
local chat_redirect_uri local chat_redirect_uri
if [ -n "${EDGE_TUNNEL_FQDN:-}" ]; then if [ -n "${EDGE_TUNNEL_FQDN:-}" ]; then
chat_redirect_uri="https://${EDGE_TUNNEL_FQDN}/chat/oauth/callback" local chat_routing_mode="${EDGE_ROUTING_MODE:-subpath}"
if [ "$chat_routing_mode" = "subdomain" ]; then
chat_redirect_uri="https://${EDGE_TUNNEL_FQDN_CHAT:-chat.${EDGE_TUNNEL_FQDN}}/oauth/callback"
else
chat_redirect_uri="https://${EDGE_TUNNEL_FQDN}/chat/oauth/callback"
fi
else else
chat_redirect_uri="http://localhost/chat/oauth/callback" chat_redirect_uri="http://localhost/chat/oauth/callback"
fi fi
@ -2805,15 +2824,29 @@ disinto_edge() {
# Write to .env (replace existing entries to avoid duplicates) # Write to .env (replace existing entries to avoid duplicates)
local tmp_env local tmp_env
tmp_env=$(mktemp) tmp_env=$(mktemp)
grep -Ev "^EDGE_TUNNEL_(HOST|PORT|FQDN)=" "$env_file" > "$tmp_env" 2>/dev/null || true grep -Ev "^EDGE_TUNNEL_(HOST|PORT|FQDN|FQDN_FORGE|FQDN_CI|FQDN_CHAT)=" "$env_file" > "$tmp_env" 2>/dev/null || true
mv "$tmp_env" "$env_file" mv "$tmp_env" "$env_file"
echo "EDGE_TUNNEL_HOST=${edge_host}" >> "$env_file" echo "EDGE_TUNNEL_HOST=${edge_host}" >> "$env_file"
echo "EDGE_TUNNEL_PORT=${port}" >> "$env_file" echo "EDGE_TUNNEL_PORT=${port}" >> "$env_file"
echo "EDGE_TUNNEL_FQDN=${fqdn}" >> "$env_file" echo "EDGE_TUNNEL_FQDN=${fqdn}" >> "$env_file"
# Subdomain mode: write per-service FQDNs (#1028)
local reg_routing_mode="${EDGE_ROUTING_MODE:-subpath}"
if [ "$reg_routing_mode" = "subdomain" ]; then
echo "EDGE_TUNNEL_FQDN_FORGE=forge.${fqdn}" >> "$env_file"
echo "EDGE_TUNNEL_FQDN_CI=ci.${fqdn}" >> "$env_file"
echo "EDGE_TUNNEL_FQDN_CHAT=chat.${fqdn}" >> "$env_file"
fi
echo "Registered: ${project}" echo "Registered: ${project}"
echo " Port: ${port}" echo " Port: ${port}"
echo " FQDN: ${fqdn}" echo " FQDN: ${fqdn}"
if [ "$reg_routing_mode" = "subdomain" ]; then
echo " Mode: subdomain"
echo " Forge: forge.${fqdn}"
echo " CI: ci.${fqdn}"
echo " Chat: chat.${fqdn}"
fi
echo " Saved to: ${env_file}" echo " Saved to: ${env_file}"
;; ;;

View file

@ -22,6 +22,7 @@ OAuth flow:
The claude binary is expected to be mounted from the host at /usr/local/bin/claude. The claude binary is expected to be mounted from the host at /usr/local/bin/claude.
""" """
import asyncio
import datetime import datetime
import json import json
import os import os
@ -30,8 +31,14 @@ import secrets
import subprocess import subprocess
import sys import sys
import time import time
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from urllib.parse import urlparse, parse_qs, urlencode from urllib.parse import urlparse, parse_qs, urlencode
import socket
import struct
import base64
import hashlib
# Configuration # Configuration
HOST = os.environ.get("CHAT_HOST", "0.0.0.0") HOST = os.environ.get("CHAT_HOST", "0.0.0.0")
@ -45,6 +52,8 @@ FORGE_URL = os.environ.get("FORGE_URL", "http://localhost:3000")
CHAT_OAUTH_CLIENT_ID = os.environ.get("CHAT_OAUTH_CLIENT_ID", "") CHAT_OAUTH_CLIENT_ID = os.environ.get("CHAT_OAUTH_CLIENT_ID", "")
CHAT_OAUTH_CLIENT_SECRET = os.environ.get("CHAT_OAUTH_CLIENT_SECRET", "") CHAT_OAUTH_CLIENT_SECRET = os.environ.get("CHAT_OAUTH_CLIENT_SECRET", "")
EDGE_TUNNEL_FQDN = os.environ.get("EDGE_TUNNEL_FQDN", "") EDGE_TUNNEL_FQDN = os.environ.get("EDGE_TUNNEL_FQDN", "")
EDGE_TUNNEL_FQDN_CHAT = os.environ.get("EDGE_TUNNEL_FQDN_CHAT", "")
EDGE_ROUTING_MODE = os.environ.get("EDGE_ROUTING_MODE", "subpath")
# Shared secret for Caddy forward_auth verify endpoint (#709). # Shared secret for Caddy forward_auth verify endpoint (#709).
# When set, only requests carrying this value in X-Forward-Auth-Secret are # When set, only requests carrying this value in X-Forward-Auth-Secret are
@ -87,6 +96,10 @@ _request_log = {}
# user -> {"tokens": int, "date": "YYYY-MM-DD"} # user -> {"tokens": int, "date": "YYYY-MM-DD"}
_daily_tokens = {} _daily_tokens = {}
# WebSocket message queues per user
# user -> asyncio.Queue (for streaming messages to connected clients)
_websocket_queues = {}
# MIME types for static files # MIME types for static files
MIME_TYPES = { MIME_TYPES = {
".html": "text/html; charset=utf-8", ".html": "text/html; charset=utf-8",
@ -99,9 +112,22 @@ MIME_TYPES = {
".ico": "image/x-icon", ".ico": "image/x-icon",
} }
# WebSocket subprotocol for chat streaming
WEBSOCKET_SUBPROTOCOL = "chat-stream-v1"
# WebSocket opcodes
OPCODE_CONTINUATION = 0x0
OPCODE_TEXT = 0x1
OPCODE_BINARY = 0x2
OPCODE_CLOSE = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xA
def _build_callback_uri(): def _build_callback_uri():
"""Build the OAuth callback URI based on tunnel configuration.""" """Build the OAuth callback URI based on tunnel configuration."""
if EDGE_ROUTING_MODE == "subdomain" and EDGE_TUNNEL_FQDN_CHAT:
return f"https://{EDGE_TUNNEL_FQDN_CHAT}/oauth/callback"
if EDGE_TUNNEL_FQDN: if EDGE_TUNNEL_FQDN:
return f"https://{EDGE_TUNNEL_FQDN}/chat/oauth/callback" return f"https://{EDGE_TUNNEL_FQDN}/chat/oauth/callback"
return "http://localhost/chat/oauth/callback" return "http://localhost/chat/oauth/callback"
@ -295,6 +321,307 @@ def _parse_stream_json(output):
return "".join(text_parts), total_tokens return "".join(text_parts), total_tokens
# =============================================================================
# WebSocket Handler Class
# =============================================================================
class _WebSocketHandler:
"""Handle WebSocket connections for chat streaming."""
def __init__(self, reader, writer, user, message_queue):
self.reader = reader
self.writer = writer
self.user = user
self.message_queue = message_queue
self.closed = False
async def accept_connection(self, sec_websocket_key, sec_websocket_protocol=None):
"""Accept the WebSocket handshake.
The HTTP request has already been parsed by BaseHTTPRequestHandler,
so we use the provided key and protocol instead of re-reading from socket.
"""
# Validate subprotocol
if sec_websocket_protocol and sec_websocket_protocol != WEBSOCKET_SUBPROTOCOL:
self._send_http_error(
400,
"Bad Request",
f"Unsupported subprotocol. Expected: {WEBSOCKET_SUBPROTOCOL}",
)
self._close_connection()
return False
# Generate accept key
accept_key = self._generate_accept_key(sec_websocket_key)
# Send handshake response
response = (
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
f"Sec-WebSocket-Accept: {accept_key}\r\n"
)
if sec_websocket_protocol:
response += f"Sec-WebSocket-Protocol: {sec_websocket_protocol}\r\n"
response += "\r\n"
self.writer.write(response.encode("utf-8"))
await self.writer.drain()
return True
def _generate_accept_key(self, sec_key):
"""Generate the Sec-WebSocket-Accept key."""
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
combined = sec_key + GUID
sha1 = hashlib.sha1(combined.encode("utf-8"))
return base64.b64encode(sha1.digest()).decode("utf-8")
async def _read_line(self):
"""Read a line from the socket."""
data = await self.reader.read(1)
line = ""
while data:
if data == b"\r":
data = await self.reader.read(1)
continue
if data == b"\n":
return line
line += data.decode("utf-8", errors="replace")
data = await self.reader.read(1)
return line
def _send_http_error(self, code, title, message):
"""Send an HTTP error response."""
response = (
f"HTTP/1.1 {code} {title}\r\n"
"Content-Type: text/plain; charset=utf-8\r\n"
"Content-Length: " + str(len(message)) + "\r\n"
"\r\n"
+ message
)
try:
self.writer.write(response.encode("utf-8"))
self.writer.drain()
except Exception:
pass
def _close_connection(self):
"""Close the connection."""
try:
self.writer.close()
except Exception:
pass
async def send_text(self, data):
"""Send a text frame."""
if self.closed:
return
try:
frame = self._encode_frame(OPCODE_TEXT, data.encode("utf-8"))
self.writer.write(frame)
await self.writer.drain()
except Exception as e:
print(f"WebSocket send error: {e}", file=sys.stderr)
async def send_binary(self, data):
"""Send a binary frame."""
if self.closed:
return
try:
if isinstance(data, str):
data = data.encode("utf-8")
frame = self._encode_frame(OPCODE_BINARY, data)
self.writer.write(frame)
await self.writer.drain()
except Exception as e:
print(f"WebSocket send error: {e}", file=sys.stderr)
def _encode_frame(self, opcode, payload):
"""Encode a WebSocket frame."""
frame = bytearray()
frame.append(0x80 | opcode) # FIN + opcode
length = len(payload)
if length < 126:
frame.append(length)
elif length < 65536:
frame.append(126)
frame.extend(struct.pack(">H", length))
else:
frame.append(127)
frame.extend(struct.pack(">Q", length))
frame.extend(payload)
return bytes(frame)
async def _decode_frame(self):
"""Decode a WebSocket frame. Returns (opcode, payload)."""
try:
# Read first two bytes (use readexactly for guaranteed length)
header = await self.reader.readexactly(2)
fin = (header[0] >> 7) & 1
opcode = header[0] & 0x0F
masked = (header[1] >> 7) & 1
length = header[1] & 0x7F
# Extended payload length
if length == 126:
ext = await self.reader.readexactly(2)
length = struct.unpack(">H", ext)[0]
elif length == 127:
ext = await self.reader.readexactly(8)
length = struct.unpack(">Q", ext)[0]
# Masking key
if masked:
mask_key = await self.reader.readexactly(4)
# Payload
payload = await self.reader.readexactly(length)
# Unmask if needed
if masked:
payload = bytes(b ^ mask_key[i % 4] for i, b in enumerate(payload))
return opcode, payload
except Exception as e:
print(f"WebSocket decode error: {e}", file=sys.stderr)
return None, None
async def handle_connection(self):
"""Handle the WebSocket connection loop."""
try:
while not self.closed:
opcode, payload = await self._decode_frame()
if opcode is None:
break
if opcode == OPCODE_CLOSE:
await self._send_close()
break
elif opcode == OPCODE_PING:
await self._send_pong(payload)
elif opcode == OPCODE_PONG:
pass # Ignore pong
elif opcode in (OPCODE_TEXT, OPCODE_BINARY):
# Handle text messages from client (e.g., chat_request)
try:
msg = payload.decode("utf-8")
data = json.loads(msg)
if data.get("type") == "chat_request":
# Invoke Claude with the message
await self._handle_chat_request(data.get("message", ""))
except (json.JSONDecodeError, UnicodeDecodeError):
pass
# Check if we should stop waiting for messages
if self.closed:
break
except Exception as e:
print(f"WebSocket connection error: {e}", file=sys.stderr)
finally:
self._close_connection()
# Clean up the message queue on disconnect
if self.user in _websocket_queues:
del _websocket_queues[self.user]
async def _send_close(self):
"""Send a close frame."""
try:
# Close code 1000 = normal closure
frame = self._encode_frame(OPCODE_CLOSE, struct.pack(">H", 1000))
self.writer.write(frame)
await self.writer.drain()
except Exception:
pass
async def _send_pong(self, payload):
"""Send a pong frame."""
try:
frame = self._encode_frame(OPCODE_PONG, payload)
self.writer.write(frame)
await self.writer.drain()
except Exception:
pass
async def _handle_chat_request(self, message):
"""Handle a chat_request WebSocket frame by invoking Claude."""
if not message:
return
# Validate Claude binary exists
if not os.path.exists(CLAUDE_BIN):
await self.send_text(json.dumps({
"type": "error",
"message": "Claude CLI not found",
}))
return
try:
# Spawn claude --print with stream-json for streaming output
proc = subprocess.Popen(
[CLAUDE_BIN, "--print", "--output-format", "stream-json", message],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
# Stream output line by line
for line in iter(proc.stdout.readline, ""):
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
etype = event.get("type", "")
# Extract text content from content_block_delta events
if etype == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta":
text = delta.get("text", "")
if text:
# Send tokens to client
await self.send_text(text)
# Check for usage event to know when complete
if etype == "result":
pass # Will send complete after loop
except json.JSONDecodeError:
pass
# Wait for process to complete
proc.wait()
if proc.returncode != 0:
await self.send_text(json.dumps({
"type": "error",
"message": f"Claude CLI failed with exit code {proc.returncode}",
}))
return
# Send complete signal
await self.send_text(json.dumps({
"type": "complete",
}))
except FileNotFoundError:
await self.send_text(json.dumps({
"type": "error",
"message": "Claude CLI not found",
}))
except Exception as e:
await self.send_text(json.dumps({
"type": "error",
"message": str(e),
}))
# ============================================================================= # =============================================================================
# Conversation History Functions (#710) # Conversation History Functions (#710)
# ============================================================================= # =============================================================================
@ -544,9 +871,9 @@ class ChatHandler(BaseHTTPRequestHandler):
self.serve_static(path) self.serve_static(path)
return return
# Reserved WebSocket endpoint (future use) # WebSocket upgrade endpoint
if path == "/ws" or path.startswith("/ws"): if path == "/chat/ws" or path == "/ws" or path.startswith("/ws"):
self.send_error_page(501, "WebSocket upgrade not yet implemented") self.handle_websocket_upgrade()
return return
# 404 for unknown paths # 404 for unknown paths
@ -755,6 +1082,7 @@ class ChatHandler(BaseHTTPRequestHandler):
""" """
Handle chat requests by spawning `claude --print` with the user message. Handle chat requests by spawning `claude --print` with the user message.
Enforces per-user rate limits and tracks token usage (#711). Enforces per-user rate limits and tracks token usage (#711).
Streams tokens over WebSocket if connected.
""" """
# Check rate limits before processing (#711) # Check rate limits before processing (#711)
@ -812,10 +1140,47 @@ class ChatHandler(BaseHTTPRequestHandler):
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, text=True,
bufsize=1, # Line buffered
) )
raw_output = proc.stdout.read() # Stream output line by line
response_parts = []
total_tokens = 0
for line in iter(proc.stdout.readline, ""):
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
etype = event.get("type", "")
# Extract text content from content_block_delta events
if etype == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta":
text = delta.get("text", "")
if text:
response_parts.append(text)
# Stream to WebSocket if connected
if user in _websocket_queues:
try:
_websocket_queues[user].put_nowait(text)
except Exception:
pass # Client disconnected
# Parse usage from result event
if etype == "result":
usage = event.get("usage", {})
total_tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
elif "usage" in event:
usage = event["usage"]
if isinstance(usage, dict):
total_tokens = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
except json.JSONDecodeError:
pass
# Wait for process to complete
error_output = proc.stderr.read() error_output = proc.stderr.read()
if error_output: if error_output:
print(f"Claude stderr: {error_output}", file=sys.stderr) print(f"Claude stderr: {error_output}", file=sys.stderr)
@ -826,8 +1191,8 @@ class ChatHandler(BaseHTTPRequestHandler):
self.send_error_page(500, f"Claude CLI failed with exit code {proc.returncode}") self.send_error_page(500, f"Claude CLI failed with exit code {proc.returncode}")
return return
# Parse stream-json for text and token usage (#711) # Combine response parts
response, total_tokens = _parse_stream_json(raw_output) response = "".join(response_parts)
# Track token usage - does not block *this* request (#711) # Track token usage - does not block *this* request (#711)
if total_tokens > 0: if total_tokens > 0:
@ -839,7 +1204,7 @@ class ChatHandler(BaseHTTPRequestHandler):
# Fall back to raw output if stream-json parsing yielded no text # Fall back to raw output if stream-json parsing yielded no text
if not response: if not response:
response = raw_output response = proc.stdout.getvalue() if hasattr(proc.stdout, 'getvalue') else ""
# Save assistant response to history # Save assistant response to history
_write_message(user, conv_id, "assistant", response) _write_message(user, conv_id, "assistant", response)
@ -909,6 +1274,118 @@ class ChatHandler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(json.dumps({"conversation_id": conv_id}, ensure_ascii=False).encode("utf-8")) self.wfile.write(json.dumps({"conversation_id": conv_id}, ensure_ascii=False).encode("utf-8"))
@staticmethod
def push_to_websocket(user, message):
"""Push a message to a WebSocket connection for a user.
This is called from the chat handler to stream tokens to connected clients.
The message is added to the user's WebSocket message queue.
"""
# Get the message queue from the WebSocket handler's queue
# We store the queue in a global dict keyed by user
if user in _websocket_queues:
_websocket_queues[user].put_nowait(message)
def handle_websocket_upgrade(self):
"""Handle WebSocket upgrade request for chat streaming."""
# Check session cookie
user = _validate_session(self.headers.get("Cookie"))
if not user:
self.send_error_page(401, "Unauthorized: no valid session")
return
# Check rate limits before allowing WebSocket connection
allowed, retry_after, reason = _check_rate_limit(user)
if not allowed:
self.send_error_page(
429,
f"Rate limit exceeded: {reason}. Retry after {retry_after}s",
)
return
# Record request for rate limiting
_record_request(user)
# Create message queue for this user
_websocket_queues[user] = asyncio.Queue()
# Get WebSocket upgrade headers from the HTTP request
sec_websocket_key = self.headers.get("Sec-WebSocket-Key", "")
sec_websocket_protocol = self.headers.get("Sec-WebSocket-Protocol", "")
# Validate Sec-WebSocket-Key
if not sec_websocket_key:
self.send_error_page(400, "Bad Request", "Missing Sec-WebSocket-Key")
return
# Get the socket from the connection
sock = self.connection
sock.setblocking(False)
# Create async server to handle the connection
async def handle_ws():
try:
# Wrap the socket in asyncio streams using open_connection
reader, writer = await asyncio.open_connection(sock=sock)
# Create WebSocket handler
ws_handler = _WebSocketHandler(reader, writer, user, _websocket_queues[user])
# Accept the connection (pass headers from HTTP request)
if not await ws_handler.accept_connection(sec_websocket_key, sec_websocket_protocol):
return
# Start a task to read from the queue and send to client
async def send_stream():
while not ws_handler.closed:
try:
data = await asyncio.wait_for(ws_handler.message_queue.get(), timeout=1.0)
await ws_handler.send_text(data)
except asyncio.TimeoutError:
# Send ping to keep connection alive
try:
frame = ws_handler._encode_frame(OPCODE_PING, b"")
writer.write(frame)
await writer.drain()
except Exception:
break
except Exception as e:
print(f"Send stream error: {e}", file=sys.stderr)
break
# Start sending task
send_task = asyncio.create_task(send_stream())
# Handle incoming WebSocket frames
await ws_handler.handle_connection()
# Cancel send task
send_task.cancel()
try:
await send_task
except asyncio.CancelledError:
pass
except Exception as e:
print(f"WebSocket handler error: {e}", file=sys.stderr)
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
# Run the async handler in a thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(handle_ws())
except Exception as e:
print(f"WebSocket error: {e}", file=sys.stderr)
finally:
loop.close()
sock.close()
def do_DELETE(self): def do_DELETE(self):
"""Handle DELETE requests.""" """Handle DELETE requests."""
parsed = urlparse(self.path) parsed = urlparse(self.path)

View file

@ -430,6 +430,10 @@
return div.innerHTML.replace(/\n/g, '<br>'); return div.innerHTML.replace(/\n/g, '<br>');
} }
// WebSocket connection for streaming
let ws = null;
let wsMessageId = null;
// Send message handler // Send message handler
async function sendMessage() { async function sendMessage() {
const message = textarea.value.trim(); const message = textarea.value.trim();
@ -449,6 +453,14 @@
await createNewConversation(); await createNewConversation();
} }
// Try WebSocket streaming first, fall back to fetch
if (window.location.protocol === 'https:' || window.location.hostname === 'localhost') {
if (tryWebSocketSend(message)) {
return;
}
}
// Fallback to fetch
try { try {
// Use fetch with URLSearchParams for application/x-www-form-urlencoded // Use fetch with URLSearchParams for application/x-www-form-urlencoded
const params = new URLSearchParams(); const params = new URLSearchParams();
@ -485,6 +497,111 @@
} }
} }
// Try to send message via WebSocket streaming
function tryWebSocketSend(message) {
try {
// Generate a unique message ID for this request
wsMessageId = Date.now().toString(36) + Math.random().toString(36).substr(2);
// Connect to WebSocket
const wsUrl = window.location.protocol === 'https:'
? `wss://${window.location.host}/chat/ws`
: `ws://${window.location.host}/chat/ws`;
ws = new WebSocket(wsUrl);
ws.onopen = function() {
// Send the message as JSON with message ID
const data = {
type: 'chat_request',
message_id: wsMessageId,
message: message,
conversation_id: currentConversationId
};
ws.send(JSON.stringify(data));
};
ws.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
if (data.type === 'token') {
// Stream a token to the UI
addTokenToLastMessage(data.token);
} else if (data.type === 'complete') {
// Streaming complete
closeWebSocket();
textarea.disabled = false;
sendBtn.disabled = false;
sendBtn.textContent = 'Send';
textarea.focus();
messagesDiv.scrollTop = messagesDiv.scrollHeight;
loadConversations();
} else if (data.type === 'error') {
addSystemMessage(`Error: ${data.message}`);
closeWebSocket();
textarea.disabled = false;
sendBtn.disabled = false;
sendBtn.textContent = 'Send';
textarea.focus();
}
} catch (e) {
console.error('Failed to parse WebSocket message:', e);
}
};
ws.onerror = function(error) {
console.error('WebSocket error:', error);
addSystemMessage('WebSocket connection error. Falling back to regular chat.');
closeWebSocket();
sendMessage(); // Retry with fetch
};
ws.onclose = function() {
wsMessageId = null;
};
return true; // WebSocket attempt started
} catch (error) {
console.error('Failed to create WebSocket:', error);
return false; // Fall back to fetch
}
}
// Add a token to the last assistant message (for streaming)
function addTokenToLastMessage(token) {
const messages = messagesDiv.querySelectorAll('.message.assistant');
if (messages.length === 0) {
// No assistant message yet, create one
const msgDiv = document.createElement('div');
msgDiv.className = 'message assistant';
msgDiv.innerHTML = `
<div class="role">assistant</div>
<div class="content streaming"></div>
`;
messagesDiv.appendChild(msgDiv);
}
const lastMsg = messagesDiv.querySelector('.message.assistant .content.streaming');
if (lastMsg) {
lastMsg.textContent += token;
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
}
// Close WebSocket connection
function closeWebSocket() {
if (ws) {
ws.onopen = null;
ws.onmessage = null;
ws.onerror = null;
ws.onclose = null;
ws.close();
ws = null;
}
}
// Event listeners // Event listeners
sendBtn.addEventListener('click', sendMessage); sendBtn.addEventListener('click', sendMessage);

View file

@ -142,6 +142,7 @@ _create_forgejo_oauth_app() {
# Set up Woodpecker CI to use Forgejo as its forge backend. # Set up Woodpecker CI to use Forgejo as its forge backend.
# Creates an OAuth2 app on Forgejo for Woodpecker, activates the repo. # Creates an OAuth2 app on Forgejo for Woodpecker, activates the repo.
# Respects EDGE_ROUTING_MODE: in subdomain mode, uses EDGE_TUNNEL_FQDN_CI for redirect URI.
# Usage: create_woodpecker_oauth <forge_url> <repo_slug> # Usage: create_woodpecker_oauth <forge_url> <repo_slug>
_create_woodpecker_oauth_impl() { _create_woodpecker_oauth_impl() {
local forge_url="$1" local forge_url="$1"
@ -150,7 +151,13 @@ _create_woodpecker_oauth_impl() {
echo "" echo ""
echo "── Woodpecker OAuth2 setup ────────────────────────────" echo "── Woodpecker OAuth2 setup ────────────────────────────"
_create_forgejo_oauth_app "woodpecker-ci" "http://localhost:8000/authorize" || return 0 local wp_redirect_uri="http://localhost:8000/authorize"
local routing_mode="${EDGE_ROUTING_MODE:-subpath}"
if [ "$routing_mode" = "subdomain" ] && [ -n "${EDGE_TUNNEL_FQDN_CI:-}" ]; then
wp_redirect_uri="https://${EDGE_TUNNEL_FQDN_CI}/authorize"
fi
_create_forgejo_oauth_app "woodpecker-ci" "$wp_redirect_uri" || return 0
local client_id="${_OAUTH_CLIENT_ID}" local client_id="${_OAUTH_CLIENT_ID}"
local client_secret="${_OAUTH_CLIENT_SECRET}" local client_secret="${_OAUTH_CLIENT_SECRET}"
@ -158,10 +165,15 @@ _create_woodpecker_oauth_impl() {
# WP_FORGEJO_CLIENT/SECRET match the docker-compose.yml variable references # WP_FORGEJO_CLIENT/SECRET match the docker-compose.yml variable references
# WOODPECKER_HOST must be host-accessible URL to match OAuth2 redirect_uri # WOODPECKER_HOST must be host-accessible URL to match OAuth2 redirect_uri
local env_file="${FACTORY_ROOT}/.env" local env_file="${FACTORY_ROOT}/.env"
local wp_host="http://localhost:8000"
if [ "$routing_mode" = "subdomain" ] && [ -n "${EDGE_TUNNEL_FQDN_CI:-}" ]; then
wp_host="https://${EDGE_TUNNEL_FQDN_CI}"
fi
local wp_vars=( local wp_vars=(
"WOODPECKER_FORGEJO=true" "WOODPECKER_FORGEJO=true"
"WOODPECKER_FORGEJO_URL=${forge_url}" "WOODPECKER_FORGEJO_URL=${forge_url}"
"WOODPECKER_HOST=http://localhost:8000" "WOODPECKER_HOST=${wp_host}"
) )
if [ -n "${client_id:-}" ]; then if [ -n "${client_id:-}" ]; then
wp_vars+=("WP_FORGEJO_CLIENT=${client_id}") wp_vars+=("WP_FORGEJO_CLIENT=${client_id}")

View file

@ -607,9 +607,12 @@ COMPOSEEOF
- EDGE_TUNNEL_USER=${EDGE_TUNNEL_USER:-tunnel} - EDGE_TUNNEL_USER=${EDGE_TUNNEL_USER:-tunnel}
- EDGE_TUNNEL_PORT=${EDGE_TUNNEL_PORT:-} - EDGE_TUNNEL_PORT=${EDGE_TUNNEL_PORT:-}
- EDGE_TUNNEL_FQDN=${EDGE_TUNNEL_FQDN:-} - EDGE_TUNNEL_FQDN=${EDGE_TUNNEL_FQDN:-}
# Subdomain fallback (#713): if subpath routing (#704/#708) fails, add: # Subdomain fallback (#1028): per-service FQDNs for subdomain routing mode.
# EDGE_TUNNEL_FQDN_FORGE, EDGE_TUNNEL_FQDN_CI, EDGE_TUNNEL_FQDN_CHAT # Set EDGE_ROUTING_MODE=subdomain to activate. See docs/edge-routing-fallback.md.
# See docs/edge-routing-fallback.md for the full pivot plan. - EDGE_ROUTING_MODE=${EDGE_ROUTING_MODE:-subpath}
- EDGE_TUNNEL_FQDN_FORGE=${EDGE_TUNNEL_FQDN_FORGE:-}
- EDGE_TUNNEL_FQDN_CI=${EDGE_TUNNEL_FQDN_CI:-}
- EDGE_TUNNEL_FQDN_CHAT=${EDGE_TUNNEL_FQDN_CHAT:-}
# Shared secret for Caddy ↔ chat forward_auth (#709) # Shared secret for Caddy ↔ chat forward_auth (#709)
- FORWARD_AUTH_SECRET=${FORWARD_AUTH_SECRET:-} - FORWARD_AUTH_SECRET=${FORWARD_AUTH_SECRET:-}
volumes: volumes:
@ -700,6 +703,8 @@ COMPOSEEOF
CHAT_OAUTH_CLIENT_ID: ${CHAT_OAUTH_CLIENT_ID:-} CHAT_OAUTH_CLIENT_ID: ${CHAT_OAUTH_CLIENT_ID:-}
CHAT_OAUTH_CLIENT_SECRET: ${CHAT_OAUTH_CLIENT_SECRET:-} CHAT_OAUTH_CLIENT_SECRET: ${CHAT_OAUTH_CLIENT_SECRET:-}
EDGE_TUNNEL_FQDN: ${EDGE_TUNNEL_FQDN:-} EDGE_TUNNEL_FQDN: ${EDGE_TUNNEL_FQDN:-}
EDGE_TUNNEL_FQDN_CHAT: ${EDGE_TUNNEL_FQDN_CHAT:-}
EDGE_ROUTING_MODE: ${EDGE_ROUTING_MODE:-subpath}
DISINTO_CHAT_ALLOWED_USERS: ${DISINTO_CHAT_ALLOWED_USERS:-} DISINTO_CHAT_ALLOWED_USERS: ${DISINTO_CHAT_ALLOWED_USERS:-}
# Shared secret for Caddy forward_auth verify endpoint (#709) # Shared secret for Caddy forward_auth verify endpoint (#709)
FORWARD_AUTH_SECRET: ${FORWARD_AUTH_SECRET:-} FORWARD_AUTH_SECRET: ${FORWARD_AUTH_SECRET:-}
@ -805,6 +810,11 @@ _generate_agent_docker_impl() {
# Output path: ${FACTORY_ROOT}/docker/Caddyfile (gitignored — generated artifact). # Output path: ${FACTORY_ROOT}/docker/Caddyfile (gitignored — generated artifact).
# The edge compose service mounts this path as /etc/caddy/Caddyfile. # The edge compose service mounts this path as /etc/caddy/Caddyfile.
# On a fresh clone, `disinto init` calls generate_caddyfile before first `disinto up`. # On a fresh clone, `disinto init` calls generate_caddyfile before first `disinto up`.
#
# Routing mode (EDGE_ROUTING_MODE env var):
# subpath — (default) all services under <project>.disinto.ai/{forge,ci,chat,staging}
# subdomain — per-service subdomains: forge.<project>, ci.<project>, chat.<project>
# See docs/edge-routing-fallback.md for the full pivot plan.
_generate_caddyfile_impl() { _generate_caddyfile_impl() {
local docker_dir="${FACTORY_ROOT}/docker" local docker_dir="${FACTORY_ROOT}/docker"
local caddyfile="${docker_dir}/Caddyfile" local caddyfile="${docker_dir}/Caddyfile"
@ -814,8 +824,22 @@ _generate_caddyfile_impl() {
return return
fi fi
local routing_mode="${EDGE_ROUTING_MODE:-subpath}"
if [ "$routing_mode" = "subdomain" ]; then
_generate_caddyfile_subdomain "$caddyfile"
else
_generate_caddyfile_subpath "$caddyfile"
fi
echo "Created: ${caddyfile} (routing_mode=${routing_mode})"
}
# Subpath Caddyfile: all services under a single :80 block with path-based routing.
_generate_caddyfile_subpath() {
local caddyfile="$1"
cat > "$caddyfile" <<'CADDYFILEEOF' cat > "$caddyfile" <<'CADDYFILEEOF'
# Caddyfile — edge proxy configuration # Caddyfile — edge proxy configuration (subpath mode)
# IP-only binding at bootstrap; domain + TLS added later via vault resource request # IP-only binding at bootstrap; domain + TLS added later via vault resource request
:80 { :80 {
@ -858,8 +882,50 @@ _generate_caddyfile_impl() {
} }
} }
CADDYFILEEOF CADDYFILEEOF
}
echo "Created: ${caddyfile}" # Subdomain Caddyfile: four host blocks per docs/edge-routing-fallback.md.
# Uses env vars EDGE_TUNNEL_FQDN_FORGE, EDGE_TUNNEL_FQDN_CI, EDGE_TUNNEL_FQDN_CHAT,
# and EDGE_TUNNEL_FQDN (main project domain → staging).
_generate_caddyfile_subdomain() {
local caddyfile="$1"
cat > "$caddyfile" <<'CADDYFILEEOF'
# Caddyfile — edge proxy configuration (subdomain mode)
# Per-service subdomains; see docs/edge-routing-fallback.md
# Main project domain — staging / landing
{$EDGE_TUNNEL_FQDN} {
reverse_proxy staging:80
}
# Forgejo — root path, no subpath rewrite needed
{$EDGE_TUNNEL_FQDN_FORGE} {
reverse_proxy forgejo:3000
}
# Woodpecker CI — root path
{$EDGE_TUNNEL_FQDN_CI} {
reverse_proxy woodpecker:8000
}
# Chat — with forward_auth (#709, on its own host)
{$EDGE_TUNNEL_FQDN_CHAT} {
handle /login {
reverse_proxy chat:8080
}
handle /oauth/callback {
reverse_proxy chat:8080
}
handle /* {
forward_auth chat:8080 {
uri /auth/verify
copy_headers X-Forwarded-User
header_up X-Forward-Auth-Secret {$FORWARD_AUTH_SECRET}
}
reverse_proxy chat:8080
}
}
CADDYFILEEOF
} }
# Generate docker/index.html default page. # Generate docker/index.html default page.

View file

@ -172,6 +172,12 @@ EOT
handle /chat/oauth/callback { handle /chat/oauth/callback {
reverse_proxy 127.0.0.1:8080 reverse_proxy 127.0.0.1:8080
} }
# WebSocket endpoint for streaming (#1026)
handle /chat/ws {
header_up Upgrade $http.upgrade
header_up Connection $http.connection
reverse_proxy 127.0.0.1:8080
}
# Defense-in-depth: forward_auth stamps X-Forwarded-User from session (#709) # Defense-in-depth: forward_auth stamps X-Forwarded-User from session (#709)
handle /chat/* { handle /chat/* {
forward_auth 127.0.0.1:8080 { forward_auth 127.0.0.1:8080 {

View file

@ -59,6 +59,23 @@ check_pipeline_stall = false
# compact_pct = 60 # compact_pct = 60
# poll_interval = 60 # poll_interval = 60
# Edge routing mode (default: subpath)
#
# Controls how services are exposed through the edge proxy.
# subpath — all services under <project>.disinto.ai/{forge,ci,chat,staging}
# subdomain — per-service subdomains: forge.<project>, ci.<project>, chat.<project>
#
# Set to "subdomain" if subpath routing causes unfixable issues (redirect loops,
# OAuth callback mismatches, cookie collisions). See docs/edge-routing-fallback.md.
#
# Set in .env (not TOML) since it's consumed by docker-compose and shell scripts:
# EDGE_ROUTING_MODE=subdomain
#
# In subdomain mode, `disinto edge register` also writes:
# EDGE_TUNNEL_FQDN_FORGE=forge.<project>.disinto.ai
# EDGE_TUNNEL_FQDN_CI=ci.<project>.disinto.ai
# EDGE_TUNNEL_FQDN_CHAT=chat.<project>.disinto.ai
# [mirrors] # [mirrors]
# github = "git@github.com:johba/disinto.git" # github = "git@github.com:johba/disinto.git"
# codeberg = "git@codeberg.org:johba/disinto.git" # codeberg = "git@codeberg.org:johba/disinto.git"

View file

@ -39,13 +39,10 @@ EOF
exit 1 exit 1
} }
# TODO(#713): Subdomain fallback — if subpath routing (#704/#708) fails, this
# function would need to register additional routes for forge.<project>,
# ci.<project>, chat.<project> subdomains (or accept a --subdomain parameter).
# See docs/edge-routing-fallback.md for the full pivot plan.
# Register a new tunnel # Register a new tunnel
# Usage: do_register <project> <pubkey> # Usage: do_register <project> <pubkey>
# When EDGE_ROUTING_MODE=subdomain, also registers forge.<project>, ci.<project>,
# and chat.<project> subdomain routes (see docs/edge-routing-fallback.md).
do_register() { do_register() {
local project="$1" local project="$1"
local pubkey="$2" local pubkey="$2"
@ -79,17 +76,32 @@ do_register() {
local port local port
port=$(allocate_port "$project" "$full_pubkey" "${project}.${DOMAIN_SUFFIX}") port=$(allocate_port "$project" "$full_pubkey" "${project}.${DOMAIN_SUFFIX}")
# Add Caddy route # Add Caddy route for main project domain
add_route "$project" "$port" add_route "$project" "$port"
# Subdomain mode: register additional routes for per-service subdomains
local routing_mode="${EDGE_ROUTING_MODE:-subpath}"
if [ "$routing_mode" = "subdomain" ]; then
local subdomain
for subdomain in forge ci chat; do
add_route "${subdomain}.${project}" "$port"
done
fi
# Rebuild authorized_keys for tunnel user # Rebuild authorized_keys for tunnel user
rebuild_authorized_keys rebuild_authorized_keys
# Reload Caddy # Reload Caddy
reload_caddy reload_caddy
# Return JSON response # Build JSON response
echo "{\"port\":${port},\"fqdn\":\"${project}.${DOMAIN_SUFFIX}\"}" local response="{\"port\":${port},\"fqdn\":\"${project}.${DOMAIN_SUFFIX}\""
if [ "$routing_mode" = "subdomain" ]; then
response="${response},\"routing_mode\":\"subdomain\""
response="${response},\"subdomains\":{\"forge\":\"forge.${project}.${DOMAIN_SUFFIX}\",\"ci\":\"ci.${project}.${DOMAIN_SUFFIX}\",\"chat\":\"chat.${project}.${DOMAIN_SUFFIX}\"}"
fi
response="${response}}"
echo "$response"
} }
# Deregister a tunnel # Deregister a tunnel
@ -109,9 +121,18 @@ do_deregister() {
# Remove from registry # Remove from registry
free_port "$project" >/dev/null free_port "$project" >/dev/null
# Remove Caddy route # Remove Caddy route for main project domain
remove_route "$project" remove_route "$project"
# Subdomain mode: also remove per-service subdomain routes
local routing_mode="${EDGE_ROUTING_MODE:-subpath}"
if [ "$routing_mode" = "subdomain" ]; then
local subdomain
for subdomain in forge ci chat; do
remove_route "${subdomain}.${project}"
done
fi
# Rebuild authorized_keys for tunnel user # Rebuild authorized_keys for tunnel user
rebuild_authorized_keys rebuild_authorized_keys