feat: add Matrix coordination channel, replace openclaw (Closes #8)
Add matrix_send() to lib/env.sh and matrix_listener.sh daemon for real-time notifications, threaded escalations, and human-in-the-loop replies. All agents now notify via Matrix instead of openclaw. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
90ef03a304
commit
f215fbe3cf
11 changed files with 266 additions and 23 deletions
34
lib/env.sh
34
lib/env.sh
|
|
@ -55,7 +55,7 @@ codeberg_api() {
|
|||
woodpecker_api() {
|
||||
local path="$1"
|
||||
shift
|
||||
curl -sf \
|
||||
curl -sfL \
|
||||
-H "Authorization: Bearer ${WOODPECKER_TOKEN}" \
|
||||
"${WOODPECKER_SERVER}/api${path}" "$@"
|
||||
}
|
||||
|
|
@ -68,3 +68,35 @@ wpdb() {
|
|||
-d "${WOODPECKER_DB_NAME:-woodpecker}" \
|
||||
-t "$@" 2>/dev/null
|
||||
}
|
||||
|
||||
# Matrix messaging helper — usage: matrix_send <prefix> <message> [thread_event_id]
|
||||
# Returns event_id on stdout. Registers threads for listener dispatch.
|
||||
MATRIX_THREAD_MAP="${MATRIX_THREAD_MAP:-/tmp/matrix-thread-map}"
|
||||
matrix_send() {
|
||||
[ -z "${MATRIX_TOKEN:-}" ] && return 0
|
||||
local prefix="$1" msg="$2" thread_id="${3:-}"
|
||||
local room_encoded="${MATRIX_ROOM_ID//!/%21}"
|
||||
local txn="$(date +%s%N)$$"
|
||||
local body
|
||||
if [ -n "$thread_id" ]; then
|
||||
body=$(jq -nc --arg m "[${prefix}] ${msg}" --arg t "$thread_id" \
|
||||
'{msgtype:"m.text",body:$m,"m.relates_to":{rel_type:"m.thread",event_id:$t}}')
|
||||
else
|
||||
body=$(jq -nc --arg m "[${prefix}] ${msg}" '{msgtype:"m.text",body:$m}')
|
||||
fi
|
||||
local response
|
||||
response=$(curl -s -X PUT \
|
||||
-H "Authorization: Bearer ${MATRIX_TOKEN}" \
|
||||
-H "Content-Type: application/json" \
|
||||
"${MATRIX_HOMESERVER}/_matrix/client/v3/rooms/${room_encoded}/send/m.room.message/${txn}" \
|
||||
-d "$body" 2>/dev/null) || return 0
|
||||
local event_id
|
||||
event_id=$(printf '%s' "$response" | jq -r '.event_id // empty' 2>/dev/null)
|
||||
if [ -n "$event_id" ]; then
|
||||
printf '%s' "$event_id"
|
||||
# Register thread root for listener dispatch (escalations only)
|
||||
if [ -z "$thread_id" ]; then
|
||||
printf '%s\t%s\t%s\n' "$event_id" "$prefix" "$(date +%s)" >> "$MATRIX_THREAD_MAP" 2>/dev/null || true
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
|
|
|||
14
lib/matrix_listener.service
Normal file
14
lib/matrix_listener.service
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
[Unit]
|
||||
Description=Dark Factory Matrix Listener
|
||||
After=network.target dendrite.service
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart=/home/admin/dark-factory/lib/matrix_listener.sh
|
||||
Restart=always
|
||||
RestartSec=10
|
||||
User=admin
|
||||
WorkingDirectory=/home/admin/dark-factory
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
150
lib/matrix_listener.sh
Executable file
150
lib/matrix_listener.sh
Executable file
|
|
@ -0,0 +1,150 @@
|
|||
#!/usr/bin/env bash
|
||||
# matrix_listener.sh — Long-poll Matrix sync daemon
|
||||
#
|
||||
# Listens for replies in the factory Matrix room and dispatches them
|
||||
# to the appropriate agent via well-known files.
|
||||
#
|
||||
# Dispatch:
|
||||
# Thread reply to [supervisor] message → /tmp/factory-escalation-reply
|
||||
# Thread reply to [gardener] message → /tmp/gardener-escalation-reply
|
||||
#
|
||||
# Run as systemd service (see matrix_listener.service) or manually:
|
||||
# ./matrix_listener.sh
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
# Load shared environment
|
||||
source "$(dirname "$0")/../lib/env.sh"
|
||||
|
||||
SINCE_FILE="/tmp/matrix-listener-since"
|
||||
THREAD_MAP="${MATRIX_THREAD_MAP:-/tmp/matrix-thread-map}"
|
||||
LOGFILE="${FACTORY_ROOT}/factory/matrix-listener.log"
|
||||
SYNC_TIMEOUT=30000 # 30s long-poll
|
||||
BACKOFF=5
|
||||
MAX_BACKOFF=60
|
||||
|
||||
log() {
|
||||
printf '[%s] listener: %s\n' "$(date -u '+%Y-%m-%d %H:%M:%S UTC')" "$*" >> "$LOGFILE"
|
||||
}
|
||||
|
||||
# Validate Matrix config
|
||||
if [ -z "${MATRIX_TOKEN:-}" ] || [ -z "${MATRIX_ROOM_ID:-}" ]; then
|
||||
echo "MATRIX_TOKEN and MATRIX_ROOM_ID must be set in .env" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# URL-encode room ID
|
||||
ROOM_ENCODED="${MATRIX_ROOM_ID//!/%21}"
|
||||
|
||||
# Build sync filter — only our room, only messages
|
||||
FILTER=$(jq -nc --arg room "$MATRIX_ROOM_ID" '{
|
||||
room: {
|
||||
rooms: [$room],
|
||||
timeline: {types: ["m.room.message"], limit: 20},
|
||||
state: {types: []},
|
||||
ephemeral: {types: []}
|
||||
},
|
||||
presence: {types: []}
|
||||
}')
|
||||
|
||||
# Load previous sync token
|
||||
SINCE=""
|
||||
if [ -f "$SINCE_FILE" ]; then
|
||||
SINCE=$(cat "$SINCE_FILE" 2>/dev/null || true)
|
||||
fi
|
||||
|
||||
log "started (since=${SINCE:-initial})"
|
||||
|
||||
# Do an initial sync without timeout to catch up, then switch to long-poll
|
||||
INITIAL=true
|
||||
|
||||
while true; do
|
||||
# Build sync URL
|
||||
SYNC_URL="${MATRIX_HOMESERVER}/_matrix/client/v3/sync?filter=$(jq -rn --arg f "$FILTER" '$f | @uri')&timeout=${SYNC_TIMEOUT}"
|
||||
if [ -n "$SINCE" ]; then
|
||||
SYNC_URL="${SYNC_URL}&since=${SINCE}"
|
||||
fi
|
||||
if [ "$INITIAL" = true ]; then
|
||||
# First sync: no timeout, just catch up
|
||||
SYNC_URL="${MATRIX_HOMESERVER}/_matrix/client/v3/sync?filter=$(jq -rn --arg f "$FILTER" '$f | @uri')"
|
||||
[ -n "$SINCE" ] && SYNC_URL="${SYNC_URL}&since=${SINCE}"
|
||||
INITIAL=false
|
||||
fi
|
||||
|
||||
# Long-poll
|
||||
RESPONSE=$(curl -s --max-time $((SYNC_TIMEOUT / 1000 + 30)) \
|
||||
-H "Authorization: Bearer ${MATRIX_TOKEN}" \
|
||||
"$SYNC_URL" 2>/dev/null) || {
|
||||
log "sync failed, backing off ${BACKOFF}s"
|
||||
sleep "$BACKOFF"
|
||||
BACKOFF=$((BACKOFF * 2 > MAX_BACKOFF ? MAX_BACKOFF : BACKOFF * 2))
|
||||
continue
|
||||
}
|
||||
|
||||
# Reset backoff on success
|
||||
BACKOFF=5
|
||||
|
||||
# Extract next_batch
|
||||
NEXT_BATCH=$(printf '%s' "$RESPONSE" | jq -r '.next_batch // empty' 2>/dev/null)
|
||||
if [ -z "$NEXT_BATCH" ]; then
|
||||
log "no next_batch in response"
|
||||
sleep 5
|
||||
continue
|
||||
fi
|
||||
|
||||
# Save cursor
|
||||
printf '%s' "$NEXT_BATCH" > "$SINCE_FILE"
|
||||
SINCE="$NEXT_BATCH"
|
||||
|
||||
# Extract timeline events from our room
|
||||
EVENTS=$(printf '%s' "$RESPONSE" | jq -c --arg room "$MATRIX_ROOM_ID" '
|
||||
.rooms.join[$room].timeline.events[]? |
|
||||
select(.type == "m.room.message") |
|
||||
select(.sender != "'"${MATRIX_BOT_USER}"'")
|
||||
' 2>/dev/null) || continue
|
||||
|
||||
[ -z "$EVENTS" ] && continue
|
||||
|
||||
while IFS= read -r event; do
|
||||
SENDER=$(printf '%s' "$event" | jq -r '.sender')
|
||||
BODY=$(printf '%s' "$event" | jq -r '.content.body // ""')
|
||||
EVENT_ID=$(printf '%s' "$event" | jq -r '.event_id')
|
||||
|
||||
# Check if this is a thread reply
|
||||
THREAD_ROOT=$(printf '%s' "$event" | jq -r '.content."m.relates_to" | select(.rel_type == "m.thread") | .event_id // empty' 2>/dev/null)
|
||||
|
||||
if [ -z "$THREAD_ROOT" ] || [ -z "$BODY" ]; then
|
||||
continue
|
||||
fi
|
||||
|
||||
# Look up thread root in our mapping
|
||||
if [ ! -f "$THREAD_MAP" ]; then
|
||||
continue
|
||||
fi
|
||||
|
||||
AGENT=$(awk -F'\t' -v id="$THREAD_ROOT" '$1 == id {print $2}' "$THREAD_MAP" 2>/dev/null)
|
||||
|
||||
if [ -z "$AGENT" ]; then
|
||||
log "reply to unknown thread ${THREAD_ROOT:0:20} from ${SENDER}"
|
||||
continue
|
||||
fi
|
||||
|
||||
log "reply from ${SENDER} to [${AGENT}] thread: ${BODY:0:100}"
|
||||
|
||||
case "$AGENT" in
|
||||
supervisor)
|
||||
printf '%s\t%s\t%s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$SENDER" "$BODY" >> /tmp/factory-escalation-reply
|
||||
# Acknowledge
|
||||
matrix_send "supervisor" "✓ received, will act on next poll" "$THREAD_ROOT" >/dev/null 2>&1 || true
|
||||
;;
|
||||
gardener)
|
||||
printf '%s\t%s\t%s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$SENDER" "$BODY" >> /tmp/gardener-escalation-reply
|
||||
matrix_send "gardener" "✓ received, will act on next poll" "$THREAD_ROOT" >/dev/null 2>&1 || true
|
||||
;;
|
||||
*)
|
||||
log "no handler for agent '${AGENT}'"
|
||||
;;
|
||||
esac
|
||||
|
||||
done <<< "$EVENTS"
|
||||
done
|
||||
Loading…
Add table
Add a link
Reference in a new issue