From f215fbe3cf51f0cab280febc0fe70559832b5600 Mon Sep 17 00:00:00 2001 From: johba Date: Sat, 14 Mar 2026 16:25:33 +0100 Subject: [PATCH] feat: add Matrix coordination channel, replace openclaw (Closes #8) Add matrix_send() to lib/env.sh and matrix_listener.sh daemon for real-time notifications, threaded escalations, and human-in-the-loop replies. All agents now notify via Matrix instead of openclaw. Co-Authored-By: Claude Opus 4.6 (1M context) --- .env.example | 6 ++ README.md | 13 +++- dev/dev-agent.sh | 2 +- dev/dev-poll.sh | 6 +- factory/PROMPT.md | 6 +- factory/factory-poll.sh | 24 +++++- gardener/gardener-poll.sh | 24 +++++- lib/env.sh | 34 +++++++- lib/matrix_listener.service | 14 ++++ lib/matrix_listener.sh | 150 ++++++++++++++++++++++++++++++++++++ review/review-pr.sh | 10 +-- 11 files changed, 266 insertions(+), 23 deletions(-) create mode 100644 lib/matrix_listener.service create mode 100755 lib/matrix_listener.sh diff --git a/.env.example b/.env.example index 8c911fe..a3afd0c 100644 --- a/.env.example +++ b/.env.example @@ -26,5 +26,11 @@ WOODPECKER_DB_USER=woodpecker WOODPECKER_DB_HOST=127.0.0.1 WOODPECKER_DB_NAME=woodpecker +# ── Matrix (optional — real-time notifications & escalation replies) ────── +MATRIX_HOMESERVER=http://localhost:8008 # Dendrite/Synapse URL +MATRIX_BOT_USER=@factory:your.server # bot's Matrix user ID +MATRIX_TOKEN= # bot's access token +MATRIX_ROOM_ID= # coordination room ID (!xxx:your.server) + # ── Tuning ──────────────────────────────────────────────────────────────── CLAUDE_TIMEOUT=7200 # max seconds per Claude invocation diff --git a/README.md b/README.md index 65b3799..1223e51 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,11 @@ cron (*/10) ──→ review-poll.sh ← finds unreviewed PRs, spawns review cron (daily) ──→ gardener-poll.sh ← backlog grooming (duplicates, stale, tech-debt) └── claude -p: triage → promote/close/escalate + +systemd ──→ matrix_listener.sh ← long-poll daemon for human replies + └── dispatches thread 
replies → supervisor/gardener + +all agents ──→ matrix_send() ← status updates, escalations, merge notifications ``` ## Prerequisites @@ -34,7 +39,7 @@ cron (daily) ──→ gardener-poll.sh ← backlog grooming (duplicates, stale **Optional:** -- [OpenClaw](https://openclaw.ai/) — escalation notifications; when agents hit something they can't resolve, they send a system event via `openclaw` CLI +- [Matrix](https://matrix.org/) homeserver ([Dendrite](https://github.com/matrix-org/dendrite) or Synapse) — real-time notifications, escalation threads with human-in-the-loop replies - [Foundry](https://getfoundry.sh/) (`forge`, `cast`, `anvil`) — only needed if your target project uses Solidity - [Node.js](https://nodejs.org/) — only needed if your target project uses Node @@ -93,8 +98,10 @@ dark-factory/ ├── .env.example # Template — copy to .env, add secrets + project config ├── .gitignore # Excludes .env, logs, state files ├── lib/ -│ ├── env.sh # Shared: load .env, PATH, Codeberg/Woodpecker API helpers -│ └── ci-debug.sh # Woodpecker CI log/failure helper +│ ├── env.sh # Shared: load .env, PATH, API helpers, matrix_send() +│ ├── ci-debug.sh # Woodpecker CI log/failure helper +│ ├── matrix_listener.sh # Matrix long-poll daemon (dispatches replies) +│ └── matrix_listener.service # systemd unit for the listener ├── dev/ │ ├── dev-poll.sh # Cron entry: find ready issues │ └── dev-agent.sh # Implementation agent (claude -p) diff --git a/dev/dev-agent.sh b/dev/dev-agent.sh index ba61dad..fae9ede 100755 --- a/dev/dev-agent.sh +++ b/dev/dev-agent.sh @@ -55,7 +55,7 @@ status() { } notify() { - openclaw system event --text "🔧 dev-agent #${ISSUE}: $*" --mode now 2>/dev/null || true + matrix_send "dev" "🔧 #${ISSUE}: $*" 2>/dev/null || true } cleanup_worktree() { diff --git a/dev/dev-poll.sh b/dev/dev-poll.sh index 4a14be2..772b254 100755 --- a/dev/dev-poll.sh +++ b/dev/dev-poll.sh @@ -157,7 +157,7 @@ if [ "$ORPHAN_COUNT" -gt 0 ]; then "${API}/issues/${ISSUE_NUM}" -d 
'{"state":"closed"}' >/dev/null 2>&1 || true curl -sf -X DELETE -H "Authorization: token ${CODEBERG_TOKEN}" \ "${API}/issues/${ISSUE_NUM}/labels/in-progress" >/dev/null 2>&1 || true - openclaw system event --text "✅ PR #${HAS_PR} merged! Issue #${ISSUE_NUM} done." --mode now 2>/dev/null || true + matrix_send "dev" "✅ PR #${HAS_PR} merged! Issue #${ISSUE_NUM} done." 2>/dev/null || true else log "merge failed (HTTP ${MERGE_CODE})" fi @@ -235,7 +235,7 @@ for i in $(seq 0 $(($(echo "$OPEN_PRS" | jq 'length') - 1))); do curl -sf -X PATCH -H "Authorization: token ${CODEBERG_TOKEN}" \ -H "Content-Type: application/json" \ "${API}/issues/${STUCK_ISSUE}" -d '{"state":"closed"}' >/dev/null 2>&1 || true - openclaw system event --text "✅ PR #${PR_NUM} merged! Issue #${STUCK_ISSUE} done." --mode now 2>/dev/null || true + matrix_send "dev" "✅ PR #${PR_NUM} merged! Issue #${STUCK_ISSUE} done." 2>/dev/null || true fi continue fi @@ -309,7 +309,7 @@ for i in $(seq 0 $((BACKLOG_COUNT - 1))); do curl -sf -X PATCH -H "Authorization: token ${CODEBERG_TOKEN}" \ -H "Content-Type: application/json" \ "${API}/issues/${ISSUE_NUM}" -d '{"state":"closed"}' >/dev/null 2>&1 || true - openclaw system event --text "✅ PR #${EXISTING_PR} merged! Issue #${ISSUE_NUM} done." --mode now 2>/dev/null || true + matrix_send "dev" "✅ PR #${EXISTING_PR} merged! Issue #${ISSUE_NUM} done." 
2>/dev/null || true fi continue diff --git a/factory/PROMPT.md b/factory/PROMPT.md index 96aac64..1a96f8e 100644 --- a/factory/PROMPT.md +++ b/factory/PROMPT.md @@ -40,12 +40,14 @@ This gives you: - `$PROJECT_NAME` — short project name (for worktree prefixes, container names) - `$PRIMARY_BRANCH` — main branch (master or main) - `$FACTORY_ROOT` — path to the dark-factory repo +- `matrix_send <prefix> <message>` — send notifications to the Matrix coordination room ## Escalation -If you can't fix it, escalate to Clawy (the main agent): +If you can't fix it, escalate via Matrix: ```bash -openclaw system event --text "🏭 ESCALATE: <description>" --mode now +source ${FACTORY_ROOT}/lib/env.sh +matrix_send "supervisor" "🏭 ESCALATE: <description>" ``` Do NOT escalate if you can fix it. Do NOT ask permission. Fix first, report after. diff --git a/factory/factory-poll.sh b/factory/factory-poll.sh index f807a18..62a682a 100755 --- a/factory/factory-poll.sh +++ b/factory/factory-poll.sh @@ -36,6 +36,14 @@ status() { flog "$*" } +# ── Check for escalation replies from Matrix ────────────────────────────── +ESCALATION_REPLY="" +if [ -s /tmp/factory-escalation-reply ]; then + ESCALATION_REPLY=$(cat /tmp/factory-escalation-reply) + rm -f /tmp/factory-escalation-reply + flog "Got escalation reply: $(echo "$ESCALATION_REPLY" | head -1)" +fi + # Alerts by priority P0_ALERTS="" P1_ALERTS="" @@ -154,10 +162,10 @@ fi status "P2: checking factory" # CI stuck -STUCK_CI=$(wpdb -c "SELECT count(*) FROM pipelines WHERE repo_id=${WOODPECKER_REPO_ID} AND status='running' AND EXTRACT(EPOCH FROM now() - to_timestamp(started)) > 1200;" 2>/dev/null | xargs) -[ "${STUCK_CI:-0}" -gt 0 ] && p2 "CI: ${STUCK_CI} pipeline(s) running >20min" +STUCK_CI=$(wpdb -c "SELECT count(*) FROM pipelines WHERE repo_id=${WOODPECKER_REPO_ID} AND status='running' AND EXTRACT(EPOCH FROM now() - to_timestamp(started)) > 1200;" 2>/dev/null | xargs || true) +[ "${STUCK_CI:-0}" -gt 0 ] 2>/dev/null && p2 "CI: ${STUCK_CI} pipeline(s) running >20min" -PENDING_CI=$(wpdb -c 
"SELECT count(*) FROM pipelines WHERE repo_id=${WOODPECKER_REPO_ID} AND status='pending' AND EXTRACT(EPOCH FROM now() - to_timestamp(created)) > 1800;" 2>/dev/null | xargs) +PENDING_CI=$(wpdb -c "SELECT count(*) FROM pipelines WHERE repo_id=${WOODPECKER_REPO_ID} AND status='pending' AND EXTRACT(EPOCH FROM now() - to_timestamp(created)) > 1800;" 2>/dev/null | xargs || true) [ "${PENDING_CI:-0}" -gt 0 ] && p2 "CI: ${PENDING_CI} pipeline(s) pending >30min" # Dev-agent health @@ -304,6 +312,10 @@ ALL_ALERTS="${P0_ALERTS}${P1_ALERTS}${P2_ALERTS}${P3_ALERTS}${P4_ALERTS}" if [ -n "$ALL_ALERTS" ]; then ALERT_TEXT=$(echo -e "$ALL_ALERTS") + # Notify Matrix + matrix_send "supervisor" "⚠️ Factory alerts: +${ALERT_TEXT}" 2>/dev/null || true + flog "Invoking claude -p for alerts" CLAUDE_PROMPT="$(cat "$PROMPT_FILE" 2>/dev/null || echo "You are a factory supervisor. Fix the issue below.") @@ -320,6 +332,12 @@ Disk: $(df -h / | awk 'NR==2{printf "%s used of %s (%s)", $3, $2, $5}') Docker: $(sudo docker ps --format '{{.Names}}' 2>/dev/null | wc -l) containers running Claude procs: $(pgrep -f "claude" 2>/dev/null | wc -l) +$(if [ -n "$ESCALATION_REPLY" ]; then echo " +## Human Response to Previous Escalation +${ESCALATION_REPLY} + +Act on this response."; fi) + Fix what you can. Escalate what you can't. Read the relevant best-practices file first." 
CLAUDE_OUTPUT=$(timeout 300 claude -p --model sonnet --dangerously-skip-permissions \ diff --git a/gardener/gardener-poll.sh b/gardener/gardener-poll.sh index 41bb35a..b65a714 100755 --- a/gardener/gardener-poll.sh +++ b/gardener/gardener-poll.sh @@ -53,6 +53,14 @@ trap 'rm -f "$LOCK_FILE"' EXIT log "--- Gardener poll start ---" +# ── Check for escalation replies from Matrix ────────────────────────────── +ESCALATION_REPLY="" +if [ -s /tmp/gardener-escalation-reply ]; then + ESCALATION_REPLY=$(cat /tmp/gardener-escalation-reply) + rm -f /tmp/gardener-escalation-reply + log "Got escalation reply: $(echo "$ESCALATION_REPLY" | head -1)" +fi + # ── Fetch all open issues ───────────────────────────────────────────────── ISSUES_JSON=$(codeberg_api GET "/issues?state=open&type=issues&limit=50&sort=updated&direction=desc" 2>/dev/null || true) if [ -z "$ISSUES_JSON" ] || [ "$ISSUES_JSON" = "null" ]; then @@ -207,7 +215,17 @@ ESCALATE ## Important - You MUST process the tech_debt_promotion items listed above. Read each issue, add acceptance criteria + affected files, then relabel to backlog. - If an issue is ambiguous or needs a design decision, ESCALATE it — don't skip it silently. -- Every tech-debt issue in the list above should result in either an ACTION (promoted) or an ESCALATE (needs decision). Never skip silently." +- Every tech-debt issue in the list above should result in either an ACTION (promoted) or an ESCALATE (needs decision). Never skip silently. +$(if [ -n "$ESCALATION_REPLY" ]; then echo " +## Human Response to Previous Escalation +The human replied with shorthand choices keyed to the previous ESCALATE block. +Format: '1a 2c 3b' means question 1→option (a), question 2→option (c), question 3→option (b). + +Raw reply: +${ESCALATION_REPLY} + +Execute each chosen option NOW via the Codeberg API before processing new items. 
+If a choice is unclear, re-escalate that single item with a clarifying question."; fi)" CLAUDE_OUTPUT=$(cd "${PROJECT_REPO_ROOT}" && CODEBERG_TOKEN="$CODEBERG_TOKEN" timeout "$CLAUDE_TIMEOUT" \ claude -p "$PROMPT" \ @@ -228,8 +246,8 @@ if [ -n "$ESCALATION" ]; then ITEM_COUNT=$(echo "$ESCALATION" | grep -c '.' || true) log "Escalating $ITEM_COUNT items to human" - # Send via openclaw system event - openclaw system event "🌱 Issue Gardener — ${ITEM_COUNT} item(s) need attention + # Send via Matrix (threaded — replies route back via listener) + matrix_send "gardener" "🌱 Issue Gardener — ${ITEM_COUNT} item(s) need attention ${ESCALATION} diff --git a/lib/env.sh b/lib/env.sh index 7f3018a..ab68986 100755 --- a/lib/env.sh +++ b/lib/env.sh @@ -55,7 +55,7 @@ codeberg_api() { woodpecker_api() { local path="$1" shift - curl -sf \ + curl -sfL \ -H "Authorization: Bearer ${WOODPECKER_TOKEN}" \ "${WOODPECKER_SERVER}/api${path}" "$@" } @@ -68,3 +68,35 @@ wpdb() { -d "${WOODPECKER_DB_NAME:-woodpecker}" \ -t "$@" 2>/dev/null } + +# Matrix messaging helper — usage: matrix_send <prefix> <message> [thread_event_id] +# Prints the sent event's event_id on stdout. Messages sent without a thread ID are recorded in MATRIX_THREAD_MAP for listener dispatch. 
+MATRIX_THREAD_MAP="${MATRIX_THREAD_MAP:-/tmp/matrix-thread-map}" +matrix_send() { + { [ -z "${MATRIX_TOKEN:-}" ] || [ -z "${MATRIX_ROOM_ID:-}" ] || [ -z "${MATRIX_HOMESERVER:-}" ]; } && return 0 # Matrix not fully configured: silent no-op (also safe for callers running under set -u) + local prefix="$1" msg="$2" thread_id="${3:-}" + local room_encoded="${MATRIX_ROOM_ID//!/%21}" + local txn="$(date +%s%N)$$" + local body + if [ -n "$thread_id" ]; then + body=$(jq -nc --arg m "[${prefix}] ${msg}" --arg t "$thread_id" \ + '{msgtype:"m.text",body:$m,"m.relates_to":{rel_type:"m.thread",event_id:$t}}') + else + body=$(jq -nc --arg m "[${prefix}] ${msg}" '{msgtype:"m.text",body:$m}') + fi + local response # request below is bounded by --max-time so an unresponsive homeserver cannot stall the calling agent + response=$(curl -s --max-time 10 -X PUT \ + -H "Authorization: Bearer ${MATRIX_TOKEN}" \ + -H "Content-Type: application/json" \ + "${MATRIX_HOMESERVER}/_matrix/client/v3/rooms/${room_encoded}/send/m.room.message/${txn}" \ + -d "$body" 2>/dev/null) || return 0 + local event_id + event_id=$(printf '%s' "$response" | jq -r '.event_id // empty' 2>/dev/null) || return 0 # non-JSON reply: notification stays best-effort + if [ -n "$event_id" ]; then + printf '%s' "$event_id" + # Register thread root for listener dispatch (escalations only) + if [ -z "$thread_id" ]; then + printf '%s\t%s\t%s\n' "$event_id" "$prefix" "$(date +%s)" >> "$MATRIX_THREAD_MAP" 2>/dev/null || true + fi + fi +} diff --git a/lib/matrix_listener.service b/lib/matrix_listener.service new file mode 100644 index 0000000..ef22016 --- /dev/null +++ b/lib/matrix_listener.service @@ -0,0 +1,14 @@ +[Unit] +Description=Dark Factory Matrix Listener +After=network.target dendrite.service + +[Service] +Type=simple +ExecStart=/home/admin/dark-factory/lib/matrix_listener.sh +Restart=always +RestartSec=10 +User=admin +WorkingDirectory=/home/admin/dark-factory + +[Install] +WantedBy=multi-user.target diff --git a/lib/matrix_listener.sh b/lib/matrix_listener.sh new file mode 100755 index 0000000..6c4666f --- /dev/null +++ b/lib/matrix_listener.sh @@ -0,0 +1,150 @@ +#!/usr/bin/env bash +# matrix_listener.sh — Long-poll Matrix sync daemon +# +# Listens for replies in the factory Matrix room and dispatches them +# to the appropriate agent via well-known 
files. +# +# Dispatch: +# Thread reply to [supervisor] message → /tmp/factory-escalation-reply +# Thread reply to [gardener] message → /tmp/gardener-escalation-reply +# +# Run as systemd service (see matrix_listener.service) or manually: +# ./matrix_listener.sh + +set -euo pipefail + +# Load shared environment +source "$(dirname "$0")/../lib/env.sh" + +SINCE_FILE="/tmp/matrix-listener-since" +THREAD_MAP="${MATRIX_THREAD_MAP:-/tmp/matrix-thread-map}" +LOGFILE="${FACTORY_ROOT}/factory/matrix-listener.log" +SYNC_TIMEOUT=30000 # 30s long-poll +BACKOFF=5 +MAX_BACKOFF=60 + +log() { + printf '[%s] listener: %s\n' "$(date -u '+%Y-%m-%d %H:%M:%S UTC')" "$*" >> "$LOGFILE" +} + +# Validate Matrix config (MATRIX_BOT_USER is required: without it the bot's own threaded acks would be dispatched as replies) +if [ -z "${MATRIX_TOKEN:-}" ] || [ -z "${MATRIX_ROOM_ID:-}" ] || [ -z "${MATRIX_HOMESERVER:-}" ] || [ -z "${MATRIX_BOT_USER:-}" ]; then + echo "MATRIX_HOMESERVER, MATRIX_BOT_USER, MATRIX_TOKEN and MATRIX_ROOM_ID must be set in .env" >&2 + exit 1 +fi + +# URL-encode room ID +ROOM_ENCODED="${MATRIX_ROOM_ID//!/%21}" + +# Build sync filter — only our room, only messages +FILTER=$(jq -nc --arg room "$MATRIX_ROOM_ID" '{ + room: { + rooms: [$room], + timeline: {types: ["m.room.message"], limit: 20}, + state: {types: []}, + ephemeral: {types: []} + }, + presence: {types: []} +}') + +# Load previous sync token +SINCE="" +if [ -f "$SINCE_FILE" ]; then + SINCE=$(cat "$SINCE_FILE" 2>/dev/null || true) +fi + +log "started (since=${SINCE:-initial})" + +# Do an initial sync without timeout to catch up, then switch to long-poll +INITIAL=true + +while true; do + # Build sync URL + SYNC_URL="${MATRIX_HOMESERVER}/_matrix/client/v3/sync?filter=$(jq -rn --arg f "$FILTER" '$f | @uri')&timeout=${SYNC_TIMEOUT}" + if [ -n "$SINCE" ]; then + SYNC_URL="${SYNC_URL}&since=${SINCE}" + fi + if [ "$INITIAL" = true ]; then + # First sync: no timeout, just catch up + SYNC_URL="${MATRIX_HOMESERVER}/_matrix/client/v3/sync?filter=$(jq -rn --arg f "$FILTER" '$f | @uri')" + [ -n "$SINCE" ] && SYNC_URL="${SYNC_URL}&since=${SINCE}" + INITIAL=false + fi + + # Long-poll + RESPONSE=$(curl -s --max-time $((SYNC_TIMEOUT / 
1000 + 30)) \ + -H "Authorization: Bearer ${MATRIX_TOKEN}" \ + "$SYNC_URL" 2>/dev/null) || { + log "sync failed, backing off ${BACKOFF}s" + sleep "$BACKOFF" + BACKOFF=$((BACKOFF * 2 > MAX_BACKOFF ? MAX_BACKOFF : BACKOFF * 2)) + continue + } + + # Reset backoff on success + BACKOFF=5 + + # Extract next_batch (|| true: a non-JSON body must reach the retry branch below instead of aborting under set -e/pipefail) + NEXT_BATCH=$(printf '%s' "$RESPONSE" | jq -r '.next_batch // empty' 2>/dev/null) || true + if [ -z "$NEXT_BATCH" ]; then + log "no next_batch in response" + sleep 5 + continue + fi + + # Save cursor + printf '%s' "$NEXT_BATCH" > "$SINCE_FILE" + SINCE="$NEXT_BATCH" + + # Extract timeline events from our room, skipping the bot's own messages (user ID passed via --arg, never spliced into the jq program) + EVENTS=$(printf '%s' "$RESPONSE" | jq -c --arg room "$MATRIX_ROOM_ID" --arg bot "$MATRIX_BOT_USER" ' + .rooms.join[$room].timeline.events[]? | + select(.type == "m.room.message") | + select(.sender != $bot) + ' 2>/dev/null) || continue + + [ -z "$EVENTS" ] && continue + + while IFS= read -r event; do + SENDER=$(printf '%s' "$event" | jq -r '.sender') + BODY=$(printf '%s' "$event" | jq -r '.content.body // ""') + EVENT_ID=$(printf '%s' "$event" | jq -r '.event_id') + + # Check if this is a thread reply (a malformed relation is treated as "not a reply" rather than aborting the daemon) + THREAD_ROOT=$(printf '%s' "$event" | jq -r '.content."m.relates_to" | select(.rel_type == "m.thread") | .event_id // empty' 2>/dev/null) || true + + if [ -z "$THREAD_ROOT" ] || [ -z "$BODY" ]; then + continue + fi + + # Look up thread root in our mapping + if [ ! 
-f "$THREAD_MAP" ]; then + continue + fi + + AGENT=$(awk -F'\t' -v id="$THREAD_ROOT" '$1 == id {print $2}' "$THREAD_MAP" 2>/dev/null) + + if [ -z "$AGENT" ]; then + log "reply to unknown thread ${THREAD_ROOT:0:20} from ${SENDER}" + continue + fi + + log "reply from ${SENDER} to [${AGENT}] thread: ${BODY:0:100}" + + case "$AGENT" in + supervisor) + printf '%s\t%s\t%s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$SENDER" "$BODY" >> /tmp/factory-escalation-reply + # Acknowledge + matrix_send "supervisor" "✓ received, will act on next poll" "$THREAD_ROOT" >/dev/null 2>&1 || true + ;; + gardener) + printf '%s\t%s\t%s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$SENDER" "$BODY" >> /tmp/gardener-escalation-reply + matrix_send "gardener" "✓ received, will act on next poll" "$THREAD_ROOT" >/dev/null 2>&1 || true + ;; + *) + log "no handler for agent '${AGENT}'" + ;; + esac + + done <<< "$EVENTS" +done diff --git a/review/review-pr.sh b/review/review-pr.sh index 3cb9c76..b4d1399 100755 --- a/review/review-pr.sh +++ b/review/review-pr.sh @@ -485,9 +485,7 @@ A maintainer should review this PR manually, or re-trigger with \`--force\`. [ -f "$f" ] && cp "$f" "${LOGDIR}/review-pr${PR_NUMBER}-$(basename "$f")" done - openclaw system event \ - --text "⚠️ PR #${PR_NUMBER} review failed — no valid JSON output" \ - --mode now 2>/dev/null || true + matrix_send "review" "⚠️ PR #${PR_NUMBER} review failed — no valid JSON output" 2>/dev/null || true exit 1 fi @@ -726,9 +724,7 @@ ${FU_DETAILS} log "created ${CREATED_COUNT} follow-up issues total" fi -# --- Notify OpenClaw --- -openclaw system event \ - --text "🤖 PR #${PR_NUMBER} ${REVIEW_TYPE}: ${VERDICT} — ${PR_TITLE}" \ - --mode now 2>/dev/null || true +# --- Notify Matrix --- +matrix_send "review" "🤖 PR #${PR_NUMBER} ${REVIEW_TYPE}: ${VERDICT} — ${PR_TITLE}" 2>/dev/null || true log "DONE: ${VERDICT} (${ELAPSED}s, re-review: ${IS_RE_REVIEW})"