mirror of
https://github.com/qwibitai/nanoclaw.git
synced 2026-06-24 18:31:31 +08:00
Compare commits
40 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6313a63d43 | |||
| e9fafee764 | |||
| 5b66525e54 | |||
| 6c2134694a | |||
| 323ba125e7 | |||
| 04f03e9064 | |||
| 3b07c0ceaf | |||
| 1a358dc7e3 | |||
| 7da08b3327 | |||
| 684a98d078 | |||
| e1251da394 | |||
| eb6502a1b2 | |||
| 3af6e70c05 | |||
| 8a7311a7bb | |||
| 61ab60041c | |||
| ca17683e32 | |||
| 6a56b10ffc | |||
| 2754f7559a | |||
| 1594a0c682 | |||
| a6995cc17e | |||
| 93732a4978 | |||
| 350d9631fa | |||
| a90104b8e3 | |||
| 708f98e156 | |||
| b40d43725f | |||
| d92c676327 | |||
| 6f0b8f1961 | |||
| 1afbba6a91 | |||
| cd69bf5c45 | |||
| c3d1b3e976 | |||
| 42e8ae004e | |||
| 9ccafcda82 | |||
| 860d1310ca | |||
| 9ca3367229 | |||
| 12719be6e1 | |||
| 57dad14a01 | |||
| 6d8d085f96 | |||
| 877d2a370a | |||
| 8eff3e558c | |||
| 1eb55e85a0 |
@@ -82,11 +82,14 @@ For each target agent group, confirm OneCLI will inject Gmail secrets into its c
|
||||
onecli agents list
|
||||
```
|
||||
|
||||
If that agent's `secretMode` is `all`, you're done — Gmail secrets (identified by OneCLI's Gmail hostPattern) will auto-inject. If it's `selective`, explicitly assign the Gmail secrets:
|
||||
If that agent's `secretMode` is `all`, you're done — Gmail secrets (identified by OneCLI's Gmail hostPattern) will auto-inject. If it's `selective`, explicitly assign the Gmail secrets using the safe merge pattern (`set-secrets` replaces the entire list — always read first):
|
||||
|
||||
```bash
|
||||
onecli secrets list # find Gmail secret IDs (OneCLI creates one per connected app)
|
||||
onecli agents set-secrets --id <agent-id> --secret-ids <gmail-secret-id>
|
||||
GMAIL_IDS=$(onecli secrets list | jq -r '[.data[] | select(.name | test("(?i)gmail")) | .id] | join(",")')
|
||||
CURRENT=$(onecli agents secrets --id <agent-id> | jq -r '[.data[]] | join(",")')
|
||||
MERGED=$(printf '%s' "$CURRENT,$GMAIL_IDS" | tr ',' '\n' | sort -u | paste -sd ',' -)
|
||||
onecli agents set-secrets --id <agent-id> --secret-ids "$MERGED"
|
||||
onecli agents secrets --id <agent-id>
|
||||
```
|
||||
|
||||
## Phase 2: Apply Code Changes
|
||||
|
||||
@@ -0,0 +1,208 @@
|
||||
---
|
||||
name: add-mnemon
|
||||
description: Add persistent graph-based memory via mnemon. Agents recall past context before responding and remember insights after each turn.
|
||||
---
|
||||
|
||||
# Add Mnemon — Persistent Memory
|
||||
|
||||
Installs [mnemon](https://github.com/mnemon-dev/mnemon) in the agent container image. On each container start, `mnemon setup` registers Claude Code hooks that surface relevant memory before the agent responds and store new insights after each turn. Memory is written to the per-agent-group `.claude/` mount and survives container restarts.
|
||||
|
||||
## Provider Compatibility
|
||||
|
||||
**mnemon hooks only work with `--target claude-code`.** If the agent group uses `AGENT_PROVIDER=opencode`, hooks registered by `mnemon setup` will never fire — OpenCode spawns its own process and doesn't invoke the `claude` CLI at all.
|
||||
|
||||
Check your provider:
|
||||
|
||||
```bash
|
||||
grep AGENT_PROVIDER .env groups/*/container.json 2>/dev/null
|
||||
```
|
||||
|
||||
- `AGENT_PROVIDER=claude` (default) — fully compatible, proceed with both Phase 2 steps.
|
||||
- `AGENT_PROVIDER=opencode` — use **Phase 2 (OpenCode path)** instead of the standard entrypoint step.
|
||||
|
||||
## Phase 1: Pre-flight
|
||||
|
||||
### Check if already applied
|
||||
|
||||
```bash
|
||||
grep -q 'MNEMON_VERSION' container/Dockerfile && echo "Already applied" || echo "Not applied"
|
||||
```
|
||||
|
||||
If already applied, skip to Phase 3 (Verify).
|
||||
|
||||
### Check latest mnemon version
|
||||
|
||||
```bash
|
||||
curl -fsSL https://api.github.com/repos/mnemon-dev/mnemon/releases/latest | grep '"tag_name"'
|
||||
```
|
||||
|
||||
Note the version (e.g. `v0.1.1`) — use it as `MNEMON_VERSION` in the next step.
|
||||
|
||||
## Phase 2: Apply Changes (Claude Code path)
|
||||
|
||||
### 1. Dockerfile — install mnemon binary
|
||||
|
||||
Add after the AWS CLI block, before the Bun runtime section:
|
||||
|
||||
```dockerfile
|
||||
# ---- mnemon — persistent agent memory ----------------------------------------
|
||||
ARG MNEMON_VERSION=0.1.1
|
||||
RUN ARCH=$(dpkg --print-architecture) && \
|
||||
curl -fsSL "https://github.com/mnemon-dev/mnemon/releases/download/v${MNEMON_VERSION}/mnemon_${MNEMON_VERSION}_linux_${ARCH}.tar.gz" \
|
||||
| tar -xz -C /usr/local/bin mnemon && \
|
||||
chmod +x /usr/local/bin/mnemon
|
||||
|
||||
ENV MNEMON_DATA_DIR=/home/node/.claude/mnemon
|
||||
```
|
||||
|
||||
`MNEMON_DATA_DIR` points into the per-agent-group `.claude/` mount so memory persists across container restarts. No extra volume mounts needed.
|
||||
|
||||
### 2. Entrypoint — run mnemon setup on each container start
|
||||
|
||||
`mnemon setup` is idempotent. Edit `container/entrypoint.sh` to run it right after `set -e`, before the `cat` that captures stdin:
|
||||
|
||||
```bash
|
||||
#!/bin/bash
|
||||
# NanoClaw agent container entrypoint.
|
||||
#
|
||||
# ...existing header comment...
|
||||
|
||||
set -e
|
||||
|
||||
mnemon setup --target claude-code --yes --global >/dev/stderr 2>&1
|
||||
|
||||
cat > /tmp/input.json
|
||||
|
||||
exec bun run /app/src/index.ts < /tmp/input.json
|
||||
```
|
||||
|
||||
`>/dev/stderr 2>&1` routes all mnemon output to stderr (docker logs) so it doesn't interfere with the JSON stdin handshake between host and agent-runner.
|
||||
|
||||
### 3. Rebuild and smoke-test the image
|
||||
|
||||
```bash
|
||||
./container/build.sh
|
||||
docker run --rm --entrypoint mnemon nanoclaw-agent:latest --version
|
||||
```
|
||||
|
||||
## Phase 3: Restart and Verify
|
||||
|
||||
### Restart the service
|
||||
|
||||
```bash
|
||||
systemctl --user restart nanoclaw # Linux
|
||||
# launchctl kickstart -k gui/$(id -u)/com.nanoclaw # macOS
|
||||
```
|
||||
|
||||
### Confirm mnemon hooks are registered
|
||||
|
||||
After the next container starts, check that setup ran:
|
||||
|
||||
```bash
|
||||
docker logs $(docker ps --filter name=nanoclaw-v2 --format '{{.Names}}' | head -1) 2>&1 | grep -i mnemon
|
||||
```
|
||||
|
||||
Then inspect the hooks inside the running container:
|
||||
|
||||
```bash
|
||||
docker exec $(docker ps --filter name=nanoclaw-v2 --format '{{.Names}}' | head -1) \
|
||||
cat /home/node/.claude/settings.json | grep -A5 mnemon
|
||||
```
|
||||
|
||||
### Test memory recall
|
||||
|
||||
Have a conversation with the agent, then start a new session and reference something from the earlier one. Mnemon should surface the relevant context automatically without you restating it.
|
||||
|
||||
## Phase 2 (OpenCode path) — context injection
|
||||
|
||||
mnemon hooks don't fire under OpenCode. Instead, the agent-runner injects mnemon context directly into every prompt via `wrapPromptWithContext()` in `container/agent-runner/src/providers/opencode.ts`. This is already implemented in NanoClaw — no code changes needed if you're on current `ester`/`main`.
|
||||
|
||||
**How it works:** On each prompt, `readMnemonContext()` checks for `MNEMON_DATA_DIR` (set by the Dockerfile `ENV`). If the env var is present, it reads `$MNEMON_DATA_DIR/prompt/guide.md` (mnemon's custom prompt guide, written by `mnemon setup`) or falls back to an inline guide. The content is prepended as a `<system>` block, instructing the agent to run `mnemon recall` at the start of relevant tasks and `mnemon remember` after key decisions.
|
||||
|
||||
**What this means for the agent:** The agent (running inside OpenCode) can call `mnemon recall`, `mnemon remember`, `mnemon link`, and `mnemon status` via its bash tool. mnemon writes its graph to `$MNEMON_DATA_DIR`, which is in the per-agent-group `.claude/` mount — so memory persists across container restarts.
|
||||
|
||||
**Applying:** Only the Dockerfile step from Phase 2 is needed for OpenCode agents. Skip `container/entrypoint.sh` entirely.
|
||||
|
||||
```dockerfile
|
||||
ARG MNEMON_VERSION=0.1.1
|
||||
RUN ARCH=$(dpkg --print-architecture) && \
|
||||
curl -fsSL "https://github.com/mnemon-dev/mnemon/releases/download/v${MNEMON_VERSION}/mnemon_${MNEMON_VERSION}_linux_${ARCH}.tar.gz" \
|
||||
| tar -xz -C /usr/local/bin mnemon && \
|
||||
chmod +x /usr/local/bin/mnemon
|
||||
ENV MNEMON_DATA_DIR=/home/node/.claude/mnemon
|
||||
```
|
||||
|
||||
Then rebuild: `./container/build.sh`
|
||||
|
||||
### Verify (OpenCode)
|
||||
|
||||
Start a session and ask the agent to run `mnemon status`. It should report empty graphs (no error) on first run.
|
||||
|
||||
```bash
|
||||
# Also confirm the binary is present in the image:
|
||||
docker run --rm --entrypoint mnemon nanoclaw-agent:latest --version
|
||||
```
|
||||
|
||||
## Memory Storage
|
||||
|
||||
Mnemon writes to `/home/node/.claude/mnemon/` inside the container, which maps to the per-agent-group `.claude/` directory on the host. To find the exact host path:
|
||||
|
||||
```bash
|
||||
docker inspect $(docker ps --filter name=nanoclaw-v2 --format '{{.Names}}' | head -1) \
|
||||
--format '{{range .Mounts}}{{if eq .Destination "/home/node/.claude"}}{{.Source}}{{end}}{{end}}'
|
||||
```
|
||||
|
||||
To reset all memory for an agent, stop the container and delete the `mnemon/` subdirectory from that host path.
|
||||
|
||||
## Migration Guide Update
|
||||
|
||||
If you are using `/migrate-nanoclaw`, add these entries to `.nanoclaw-migrations/05-dockerfile.md`:
|
||||
|
||||
**Dockerfile — after AWS CLI, before Bun runtime:**
|
||||
```dockerfile
|
||||
ARG MNEMON_VERSION=0.1.1
|
||||
RUN ARCH=$(dpkg --print-architecture) && \
|
||||
curl -fsSL "https://github.com/mnemon-dev/mnemon/releases/download/v${MNEMON_VERSION}/mnemon_${MNEMON_VERSION}_linux_${ARCH}.tar.gz" \
|
||||
| tar -xz -C /usr/local/bin mnemon && \
|
||||
chmod +x /usr/local/bin/mnemon
|
||||
ENV MNEMON_DATA_DIR=/home/node/.claude/mnemon
|
||||
```
|
||||
|
||||
**`container/entrypoint.sh` — add after `set -e`:**
|
||||
```bash
|
||||
mnemon setup --target claude-code --yes --global >/dev/stderr 2>&1
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### `mnemon: command not found` in container
|
||||
|
||||
The image wasn't rebuilt after adding the Dockerfile layer. Run `./container/build.sh` and restart.
|
||||
|
||||
### Memory not persisting across restarts
|
||||
|
||||
Verify `MNEMON_DATA_DIR` resolves to a mounted path (not an in-container ephemeral directory):
|
||||
|
||||
```bash
|
||||
docker exec <container> sh -c 'ls -la $MNEMON_DATA_DIR'
|
||||
```
|
||||
|
||||
If the directory is empty after conversations, the mount is missing or the path is wrong. Check the host mount with the `docker inspect` command above.
|
||||
|
||||
### Agent not using past memory
|
||||
|
||||
`mnemon setup` writes hooks into `/home/node/.claude/settings.json`. Verify:
|
||||
|
||||
```bash
|
||||
docker exec <container> cat /home/node/.claude/settings.json
|
||||
```
|
||||
|
||||
If the hooks are absent, `mnemon setup` may have failed silently. Check container startup logs for errors from mnemon.
|
||||
|
||||
### Setup fails at container start
|
||||
|
||||
Run setup manually inside a running container to see the full error:
|
||||
|
||||
```bash
|
||||
docker exec -it <container> mnemon setup --target claude-code --yes --global
|
||||
```
|
||||
@@ -132,12 +132,15 @@ Credentials: register provider API keys in OneCLI with the matching `--host-patt
|
||||
|
||||
After adding a secret, **grant the agent access** — agents in `selective` mode only receive secrets they've been explicitly assigned:
|
||||
|
||||
```bash
|
||||
# Find the agent id and secret id, then:
|
||||
onecli agents set-secrets --id <agent-id> --secret-ids <existing-ids>,<new-secret-id>
|
||||
```
|
||||
Use the safe merge pattern — `set-secrets` replaces the entire list, so always read first:
|
||||
|
||||
Always include existing secret IDs in the list — `set-secrets` replaces, not appends.
|
||||
```bash
|
||||
AGENT_ID=$(onecli agents list | jq -r '.data[] | select(.identifier=="<agentGroupId>") | .id')
|
||||
CURRENT=$(onecli agents secrets --id "$AGENT_ID" | jq -r '[.data[]] | join(",")')
|
||||
MERGED=$(printf '%s' "$CURRENT,<new-secret-id>" | tr ',' '\n' | sort -u | paste -sd ',' -)
|
||||
onecli agents set-secrets --id "$AGENT_ID" --secret-ids "$MERGED"
|
||||
onecli agents secrets --id "$AGENT_ID"
|
||||
```
|
||||
|
||||
#### Example: DeepSeek
|
||||
|
||||
|
||||
@@ -284,6 +284,11 @@ If you see `Signal daemon not reachable at 127.0.0.1:7583` and `SIGNAL_MANAGE_DA
|
||||
1. Channel initialized: `grep "Signal channel connected" logs/nanoclaw.log | tail -1`
|
||||
2. Channel wired: `pnpm exec tsx scripts/q.ts data/v2.db "SELECT mg.platform_id, mg.name FROM messaging_groups mg JOIN messaging_group_agents mga ON mg.id = mga.messaging_group_id WHERE mg.channel_type='signal'"`
|
||||
3. Service running: `launchctl print gui/$(id -u)/com.nanoclaw` (macOS) / `systemctl --user status nanoclaw` (Linux)
|
||||
4. **Check for duplicate service instances** — if `logs/nanoclaw.error.log` shows `No adapter for channel type channelType="signal"` despite the adapter starting, two NanoClaw processes are racing. See the `/debug` skill section "No adapter for channel type / Messages silently lost" for the full fix.
|
||||
|
||||
### Messages delivered but never arrive (null platformMsgId)
|
||||
|
||||
Signal responses show `platformMsgId=undefined` in the main log. This means the delivery poll ran but found no adapter — likely a duplicate service instance issue (see above). Affected messages cannot be retried; the user must resend.
|
||||
|
||||
### Lost connection mid-session
|
||||
|
||||
|
||||
@@ -90,12 +90,12 @@ onecli secrets list | grep -i vercel
|
||||
OneCLI uses selective secret mode — secrets must be explicitly assigned to each agent. Get the Vercel secret ID from the output above, then assign it to every agent:
|
||||
|
||||
```bash
|
||||
# For each agent, add the Vercel secret to its assigned secrets list.
|
||||
# First get current assignments, then set them with the new secret appended.
|
||||
VERCEL_SECRET_ID=$(onecli secrets list 2>/dev/null | grep -B2 "Vercel" | grep '"id"' | head -1 | sed 's/.*"id": "//;s/".*//')
|
||||
for agent in $(onecli agents list 2>/dev/null | grep '"id"' | sed 's/.*"id": "//;s/".*//'); do
|
||||
CURRENT=$(onecli agents secrets --id "$agent" 2>/dev/null | grep '"' | grep -v hint | grep -v data | sed 's/.*"//;s/".*//' | tr '\n' ',' | sed 's/,$//')
|
||||
onecli agents set-secrets --id "$agent" --secret-ids "${CURRENT:+$CURRENT,}$VERCEL_SECRET_ID"
|
||||
# set-secrets replaces the entire list — read and merge for each agent.
|
||||
VERCEL_SECRET_ID=$(onecli secrets list | jq -r '.data[] | select(.name | test("(?i)vercel")) | .id' | head -1)
|
||||
for agent in $(onecli agents list | jq -r '.data[].id'); do
|
||||
CURRENT=$(onecli agents secrets --id "$agent" | jq -r '[.data[]] | join(",")')
|
||||
MERGED=$(printf '%s' "$CURRENT,$VERCEL_SECRET_ID" | tr ',' '\n' | sort -u | paste -sd ',' -)
|
||||
onecli agents set-secrets --id "$agent" --secret-ids "$MERGED"
|
||||
done
|
||||
```
|
||||
|
||||
|
||||
@@ -57,7 +57,50 @@ Debug level shows:
|
||||
|
||||
## Common Issues
|
||||
|
||||
### 1. "Claude Code process exited with code 1"
|
||||
### 1. "No adapter for channel type" / Messages silently lost (null platformMsgId)
|
||||
|
||||
**Symptom:** The bot stops replying. `logs/nanoclaw.error.log` shows repeated:
|
||||
```
|
||||
WARN No adapter for channel type channelType="telegram"
|
||||
WARN No adapter for channel type channelType="signal"
|
||||
```
|
||||
The main log shows "Message delivered" entries with `platformMsgId=undefined` — meaning the delivery poll ran, found no adapter, and permanently marked the message as delivered without sending it.
|
||||
|
||||
**Root cause: two NanoClaw service instances running simultaneously.**
|
||||
|
||||
When a second service instance (often `nanoclaw-v2-<id>.service` running alongside `nanoclaw.service`) is active with a stale binary, it has no channel adapters registered. Its delivery poll races against the working instance and wins — permanently marking outbound messages as delivered without ever sending them.
|
||||
|
||||
**Diagnosis:**
|
||||
```bash
|
||||
# Check for duplicate running instances
|
||||
ps aux | grep 'nanoclaw/dist/index.js' | grep -v grep
|
||||
|
||||
# Check which services are active
|
||||
systemctl --user list-units 'nanoclaw*' --all
|
||||
|
||||
# Confirm channel adapters registered by the current process
|
||||
grep "Channel adapter started" logs/nanoclaw.log | tail -10
|
||||
```
|
||||
|
||||
**Fix:**
|
||||
1. Identify which service has the correct binary and EnvironmentFile (the one showing `signal`, `telegram`, `cli` all started in the log).
|
||||
2. Stop and disable the stale duplicate service:
|
||||
```bash
|
||||
systemctl --user stop nanoclaw.service # or whichever is the old one
|
||||
systemctl --user disable nanoclaw.service
|
||||
```
|
||||
3. If the remaining service unit is missing `EnvironmentFile`, add it:
|
||||
```bash
|
||||
# Edit the service unit — add this line under [Service]:
|
||||
# EnvironmentFile=/home/[user]/nanoclaw/.env
|
||||
systemctl --user daemon-reload
|
||||
systemctl --user restart nanoclaw-v2-<id>.service
|
||||
```
|
||||
4. Verify only one instance runs: `ps aux | grep nanoclaw/dist/index.js | grep -v grep`
|
||||
|
||||
**Note:** Messages that were marked delivered with a null `platform_message_id` cannot be automatically retried — they are permanently lost. The user must resend their message.
|
||||
|
||||
### 2. "Claude Code process exited with code 1"
|
||||
|
||||
**Check the container log file** in `groups/{folder}/logs/container-*.log`
|
||||
|
||||
|
||||
@@ -259,6 +259,41 @@ Tell the user:
|
||||
- To manage secrets: `onecli secrets list`, or open ${ONECLI_URL}
|
||||
- To add rate limits or policies: `onecli rules create --help`
|
||||
|
||||
## Granting secrets to agents (safe merge)
|
||||
|
||||
`set-secrets` **replaces** the agent's entire secret list — it never appends. Always read the current list first and merge before calling it. This pattern is canonical across all skills that assign secrets:
|
||||
|
||||
```bash
|
||||
AGENT_ID=$(onecli agents list | jq -r '.data[] | select(.identifier=="<agentGroupId>") | .id')
|
||||
CURRENT=$(onecli agents secrets --id "$AGENT_ID" | jq -r '[.data[]] | join(",")')
|
||||
MERGED=$(printf '%s' "$CURRENT,<new-secret-id>" | tr ',' '\n' | sort -u | paste -sd ',' -)
|
||||
onecli agents set-secrets --id "$AGENT_ID" --secret-ids "$MERGED"
|
||||
onecli agents secrets --id "$AGENT_ID"
|
||||
```
|
||||
|
||||
- `<agentGroupId>` — the `agentGroupId` field in `groups/<folder>/container.json`
|
||||
- `<new-secret-id>` — the `id` from `onecli secrets list`
|
||||
- Multiple new secrets: append them comma-separated before the `printf` step
|
||||
|
||||
### git over HTTPS
|
||||
|
||||
OneCLI's proxy injects credentials proactively — `injections_applied=1` appears in `docker logs onecli` even when git sends no auth header. However, OneCLI sets `SSL_CERT_FILE` for Node/Python/Deno but not `GIT_SSL_CAINFO`. Without it, git rejects the OneCLI MITM certificate.
|
||||
|
||||
**Auth format matters**: GitHub's git smart HTTP protocol (`github.com`) requires `Basic` auth, not `Bearer`. GitHub's REST API (`api.github.com`) accepts `Bearer`. These must be configured as separate secrets with different formats — see `/add-github` for the full setup.
|
||||
|
||||
If an agent uses `git` or `gh`, add to `data/v2-sessions/<agent-group-id>/.claude-shared/settings.json`:
|
||||
|
||||
```json
|
||||
"GIT_SSL_CAINFO": "/tmp/onecli-combined-ca.pem",
|
||||
"GIT_TERMINAL_PROMPT": "0",
|
||||
"GIT_CONFIG_COUNT": "1",
|
||||
"GIT_CONFIG_KEY_0": "credential.helper",
|
||||
"GIT_CONFIG_VALUE_0": "",
|
||||
"GH_TOKEN": "ghp_onecli_proxy_replaces_this"
|
||||
```
|
||||
|
||||
**Debugging injection**: `docker logs onecli 2>&1 | grep "github.com"` shows every request with `injections_applied=N` and the HTTP status. If `injections_applied=1` but status is still 401, the injected credential value is wrong or uses the wrong auth format for that endpoint.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
**"OneCLI gateway not reachable" in logs:** The gateway isn't running. Check with `curl -sf ${ONECLI_URL}/health`. Start it with `onecli start` if needed.
|
||||
|
||||
@@ -91,7 +91,13 @@ RUN --mount=type=cache,target=/root/.bun/install/cache \
|
||||
# the SDK fails at spawn time with "native binary not found".
|
||||
ENV PNPM_HOME="/pnpm"
|
||||
ENV PATH="$PNPM_HOME:$PATH"
|
||||
RUN corepack enable
|
||||
# Pin pnpm to match the host (package.json packageManager). pnpm 11 stopped
|
||||
# honoring `only-built-dependencies[]=` in .npmrc for global installs, which
|
||||
# silently skips claude-code's native-binary postinstall and agent-browser's
|
||||
# bin chmod — the agent then crashes at runtime with "native binary not
|
||||
# installed". Keep this in lockstep with package.json's `packageManager`.
|
||||
ARG PNPM_VERSION=10.33.0
|
||||
RUN corepack enable && corepack prepare pnpm@${PNPM_VERSION} --activate
|
||||
|
||||
RUN --mount=type=cache,target=/root/.cache/pnpm \
|
||||
echo "only-built-dependencies[]=agent-browser" > /root/.npmrc && \
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Per-batch context the poll loop publishes for downstream consumers
|
||||
* (MCP tools, etc.) that don't sit on the poll-loop's call stack.
|
||||
*
|
||||
* Today the only field is `inReplyTo` — the id of the first inbound
|
||||
* message in the batch the agent is currently processing. MCP tools like
|
||||
* `send_message` and `send_file` read this and stamp it onto the outbound
|
||||
* row so the host's a2a return-path routing can correlate replies back to
|
||||
* the originating session.
|
||||
*
|
||||
* This is module-level state on purpose: the agent-runner is single-process
|
||||
* and processes one batch at a time. Poll-loop calls `setCurrentInReplyTo`
|
||||
* before invoking the provider and `clearCurrentInReplyTo` after the batch
|
||||
* completes (or errors out).
|
||||
*/
|
||||
let currentInReplyTo: string | null = null;
|
||||
|
||||
export function setCurrentInReplyTo(id: string | null): void {
|
||||
currentInReplyTo = id;
|
||||
}
|
||||
|
||||
export function clearCurrentInReplyTo(): void {
|
||||
currentInReplyTo = null;
|
||||
}
|
||||
|
||||
export function getCurrentInReplyTo(): string | null {
|
||||
return currentInReplyTo;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
|
||||
|
||||
import { closeSessionDb, getInboundDb, initTestSessionDb } from './db/connection.js';
|
||||
import { buildSystemPromptAddendum } from './destinations.js';
|
||||
|
||||
beforeEach(() => {
|
||||
initTestSessionDb();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
closeSessionDb();
|
||||
});
|
||||
|
||||
function seedDestination(name: string, displayName: string, channelType: string, platformId: string): void {
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||
VALUES (?, ?, 'channel', ?, ?, NULL)`,
|
||||
)
|
||||
.run(name, displayName, channelType, platformId);
|
||||
}
|
||||
|
||||
describe('buildSystemPromptAddendum — multi-destination routing guidance', () => {
|
||||
it('includes default-routing nudge when there are >1 destinations', () => {
|
||||
seedDestination('casa', 'Casa', 'whatsapp', 'group-1@g.us');
|
||||
seedDestination('whatsapp-mg-17780', 'whatsapp-mg-17780', 'whatsapp', 'phone-2@s.whatsapp.net');
|
||||
|
||||
const prompt = buildSystemPromptAddendum('Casa');
|
||||
|
||||
expect(prompt).toContain('Default routing');
|
||||
expect(prompt).toContain('from="name"');
|
||||
expect(prompt).toContain('`casa`');
|
||||
expect(prompt).toContain('`whatsapp-mg-17780`');
|
||||
});
|
||||
|
||||
it('requires explicit wrapping even for a single destination', () => {
|
||||
seedDestination('casa', 'Casa', 'whatsapp', 'group-1@g.us');
|
||||
|
||||
const prompt = buildSystemPromptAddendum('Casa');
|
||||
|
||||
expect(prompt).toContain('Every response must be wrapped');
|
||||
expect(prompt).toContain('<message to="name">');
|
||||
expect(prompt).toContain('`casa`');
|
||||
});
|
||||
|
||||
it('handles the no-destination case without crashing', () => {
|
||||
const prompt = buildSystemPromptAddendum('Casa');
|
||||
|
||||
expect(prompt).toContain('no configured destinations');
|
||||
expect(prompt).not.toContain('Default routing');
|
||||
});
|
||||
|
||||
it('includes default-routing and wrapping instructions for single destination', () => {
|
||||
seedDestination('casa', 'Casa', 'whatsapp', 'group-1@g.us');
|
||||
|
||||
const prompt = buildSystemPromptAddendum('Casa');
|
||||
|
||||
expect(prompt).toContain('Every response must be wrapped');
|
||||
expect(prompt).toContain('<message to="name">');
|
||||
expect(prompt).toContain('Default routing');
|
||||
expect(prompt).toContain('`casa`');
|
||||
});
|
||||
});
|
||||
@@ -120,6 +120,10 @@ function buildDestinationsSection(): string {
|
||||
lines.push('Text outside of `<message>` blocks is scratchpad — logged but not sent anywhere.');
|
||||
lines.push('Use `<internal>...</internal>` to make scratchpad intent explicit.');
|
||||
lines.push('');
|
||||
lines.push(
|
||||
'**Default routing**: when replying to an incoming message, address the same destination the message came `from` — every inbound `<message>` tag carries a `from="name"` attribute that names the origin destination. Only address a different destination when the request itself asks you to (e.g., "tell Laura that…").',
|
||||
);
|
||||
lines.push('');
|
||||
lines.push(
|
||||
'To send a message mid-response (e.g., an acknowledgment before a long task), call the `send_message` MCP tool with the `to` parameter set to a destination name.',
|
||||
);
|
||||
|
||||
@@ -112,6 +112,125 @@ describe('poll loop integration', () => {
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('bare text produces no outbound messages (scratchpad only)', async () => {
|
||||
insertMessage('m1', { sender: 'Alice', text: 'hello' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||
|
||||
// Agent responds with bare text — no <message to="..."> wrapping
|
||||
const provider = new MockProvider({}, () => 'I am thinking about this...');
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||
|
||||
// Wait long enough for the poll loop to process
|
||||
await sleep(1000);
|
||||
controller.abort();
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(0);
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('unknown destination is dropped, valid destination is sent', async () => {
|
||||
insertMessage('m1', { sender: 'Alice', text: 'hi' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||
|
||||
const provider = new MockProvider(
|
||||
{},
|
||||
() => '<message to="nonexistent">dropped</message><message to="discord-test">delivered</message>',
|
||||
);
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||
controller.abort();
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
// Only the valid destination should produce output
|
||||
expect(out).toHaveLength(1);
|
||||
expect(JSON.parse(out[0].content).text).toBe('delivered');
|
||||
expect(out[0].platform_id).toBe('chan-1');
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('multiple <message> blocks each produce an outbound message', async () => {
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||
VALUES ('slack-test', 'Slack Test', 'channel', 'slack', 'chan-2', NULL)`,
|
||||
)
|
||||
.run();
|
||||
|
||||
insertMessage('m1', { sender: 'Alice', text: 'broadcast' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||
|
||||
const provider = new MockProvider(
|
||||
{},
|
||||
() => '<message to="discord-test">for discord</message><message to="slack-test">for slack</message>',
|
||||
);
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length >= 2, 2000);
|
||||
controller.abort();
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(2);
|
||||
const discord = out.find((m) => m.platform_id === 'chan-1');
|
||||
const slack = out.find((m) => m.platform_id === 'chan-2');
|
||||
expect(discord).toBeDefined();
|
||||
expect(JSON.parse(discord!.content).text).toBe('for discord');
|
||||
expect(slack).toBeDefined();
|
||||
expect(JSON.parse(slack!.content).text).toBe('for slack');
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('sends null thread_id when no prior inbound from destination', async () => {
|
||||
// Seed a second destination that has NO inbound messages
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||
VALUES ('slack-new', 'Slack New', 'channel', 'slack', 'chan-new', NULL)`,
|
||||
)
|
||||
.run();
|
||||
|
||||
// Only insert a message from discord — slack-new has never sent anything
|
||||
insertMessage('m1', { sender: 'Alice', text: 'tell slack' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'discord-thread' });
|
||||
|
||||
const provider = new MockProvider({}, () => '<message to="slack-new">hello slack</message>');
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||
controller.abort();
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].platform_id).toBe('chan-new');
|
||||
expect(out[0].thread_id).toBeNull();
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('resolves most recent thread_id when destination has multiple inbound messages', async () => {
|
||||
// Two messages from same destination, different threads
|
||||
insertMessage('m-old', { sender: 'Alice', text: 'old' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'thread-old' });
|
||||
insertMessage('m-new', { sender: 'Alice', text: 'new' }, { platformId: 'chan-1', channelType: 'discord', threadId: 'thread-new' });
|
||||
|
||||
const provider = new MockProvider({}, () => '<message to="discord-test">reply</message>');
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||
controller.abort();
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].thread_id).toBe('thread-new');
|
||||
expect(out[0].in_reply_to).toBe('m-new');
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('should process messages arriving after loop starts', async () => {
|
||||
const provider = new MockProvider({}, () => '<message to="discord-test">Processed</message>');
|
||||
const controller = new AbortController();
|
||||
@@ -129,8 +248,161 @@ describe('poll loop integration', () => {
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('internal tags between message blocks are stripped from scratchpad', async () => {
|
||||
insertMessage('m1', { sender: 'Alice', text: 'hi' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||
|
||||
const provider = new MockProvider(
|
||||
{},
|
||||
() => '<internal>thinking about this...</internal><message to="discord-test">answer</message><internal>done thinking</internal>',
|
||||
);
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||
controller.abort();
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(JSON.parse(out[0].content).text).toBe('answer');
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('handles mixed task + chat batch with correct origin metadata', async () => {
|
||||
// Seed destination for routing lookup
|
||||
insertMessage('m-chat', { sender: 'Alice', text: 'check this' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||
// Task with same routing — simulates a scheduled task in a channel session
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content)
|
||||
VALUES ('t-task', 'task', datetime('now'), 'pending', 'chan-1', 'discord', ?)`,
|
||||
)
|
||||
.run(JSON.stringify({ prompt: 'daily check' }));
|
||||
|
||||
const provider = new MockProvider({}, () => '<message to="discord-test">done</message>');
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length > 0, 2000);
|
||||
controller.abort();
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].platform_id).toBe('chan-1');
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('should inject destination reminder after a compacted event', async () => {
|
||||
// Two destinations — required for the reminder to fire (single-destination
|
||||
// groups have a fallback path that works without <message to="…"> wrapping).
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||
VALUES ('discord-second', 'Discord Second', 'channel', 'discord', 'chan-2', NULL)`,
|
||||
)
|
||||
.run();
|
||||
|
||||
insertMessage('m1', { sender: 'Alice', text: 'First message' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||
|
||||
const provider = new CompactingProvider();
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2500);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length > 0, 2500);
|
||||
controller.abort();
|
||||
|
||||
expect(provider.pushes.length).toBeGreaterThanOrEqual(1);
|
||||
const reminder = provider.pushes.find((p) => p.includes('Context was just compacted'));
|
||||
expect(reminder).toBeDefined();
|
||||
expect(reminder).toContain('2 destinations');
|
||||
expect(reminder).toContain('discord-test');
|
||||
expect(reminder).toContain('discord-second');
|
||||
expect(reminder).toContain('<message to="name">');
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
|
||||
it('should NOT inject destination reminder with a single destination', async () => {
|
||||
insertMessage('m1', { sender: 'Alice', text: 'First message' }, { platformId: 'chan-1', channelType: 'discord' });
|
||||
|
||||
const provider = new CompactingProvider();
|
||||
const controller = new AbortController();
|
||||
const loopPromise = runPollLoopWithTimeout(provider as unknown as MockProvider, controller.signal, 2500);
|
||||
|
||||
await waitFor(() => getUndeliveredMessages().length > 0, 2500);
|
||||
controller.abort();
|
||||
|
||||
// Only the original prompt push (if any) — no reminder, since beforeEach
|
||||
// seeds exactly one destination.
|
||||
const reminders = provider.pushes.filter((p) => p.includes('Context was just compacted'));
|
||||
expect(reminders).toHaveLength(0);
|
||||
|
||||
await loopPromise.catch(() => {});
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Provider that emits a single compacted event mid-stream, then returns a
|
||||
* result. Captures every push() call so tests can assert on the injected
|
||||
* reminder content.
|
||||
*/
|
||||
class CompactingProvider {
|
||||
readonly supportsNativeSlashCommands = false;
|
||||
readonly pushes: string[] = [];
|
||||
|
||||
isSessionInvalid(): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
query(_input: { prompt: string; cwd: string }) {
|
||||
const pushes = this.pushes;
|
||||
let ended = false;
|
||||
let aborted = false;
|
||||
let resolveWaiter: (() => void) | null = null;
|
||||
|
||||
async function* events() {
|
||||
yield { type: 'activity' as const };
|
||||
yield { type: 'init' as const, continuation: 'compaction-test-session' };
|
||||
yield { type: 'activity' as const };
|
||||
yield { type: 'compacted' as const, text: 'Context compacted (50,000 tokens compacted).' };
|
||||
|
||||
// Wait for poll-loop to push the reminder (or end / abort)
|
||||
await new Promise<void>((resolve) => {
|
||||
resolveWaiter = resolve;
|
||||
// Belt-and-braces: don't hang forever if the reminder never arrives
|
||||
setTimeout(resolve, 200);
|
||||
});
|
||||
|
||||
yield { type: 'activity' as const };
|
||||
yield { type: 'result' as const, text: '<message to="discord-test">ack</message>' };
|
||||
while (!ended && !aborted) {
|
||||
await new Promise<void>((resolve) => {
|
||||
resolveWaiter = resolve;
|
||||
setTimeout(resolve, 50);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
push(message: string) {
|
||||
pushes.push(message);
|
||||
resolveWaiter?.();
|
||||
},
|
||||
end() {
|
||||
ended = true;
|
||||
resolveWaiter?.();
|
||||
},
|
||||
abort() {
|
||||
aborted = true;
|
||||
resolveWaiter?.();
|
||||
},
|
||||
events: events(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Helper: run poll loop until aborted or timeout
|
||||
async function runPollLoopWithTimeout(provider: MockProvider, signal: AbortSignal, timeoutMs: number): Promise<void> {
|
||||
return Promise.race([
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Tests for the core MCP tools' interaction with the per-batch routing
|
||||
* context. The agent-runner sets a current `inReplyTo` at the top of each
|
||||
* batch in poll-loop, and outbound writes from MCP tools (send_message,
|
||||
* send_file) must pick it up so a2a return-path routing on the host can
|
||||
* correlate replies back to the originating session.
|
||||
*/
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
|
||||
|
||||
import { initTestSessionDb, closeSessionDb, getInboundDb } from '../db/connection.js';
|
||||
import { getUndeliveredMessages } from '../db/messages-out.js';
|
||||
import { setCurrentInReplyTo, clearCurrentInReplyTo } from '../current-batch.js';
|
||||
import { sendMessage } from './core.js';
|
||||
|
||||
beforeEach(() => {
|
||||
initTestSessionDb();
|
||||
// Seed a peer agent destination
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||
VALUES ('peer', 'Peer', 'agent', NULL, NULL, 'ag-peer')`,
|
||||
)
|
||||
.run();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearCurrentInReplyTo();
|
||||
closeSessionDb();
|
||||
});
|
||||
|
||||
describe('send_message MCP tool — in_reply_to plumbing', () => {
|
||||
it('stamps current batch in_reply_to on outbound rows', async () => {
|
||||
setCurrentInReplyTo('inbound-msg-1');
|
||||
|
||||
await sendMessage.handler({ to: 'peer', text: 'hello' });
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].in_reply_to).toBe('inbound-msg-1');
|
||||
});
|
||||
|
||||
it('writes null when no batch is active', async () => {
|
||||
// No setCurrentInReplyTo before this call — simulates ad-hoc / out-of-batch invocation.
|
||||
await sendMessage.handler({ to: 'peer', text: 'hello' });
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].in_reply_to).toBeNull();
|
||||
});
|
||||
});
|
||||
@@ -9,6 +9,7 @@
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { getCurrentInReplyTo } from '../current-batch.js';
|
||||
import { findByName, getAllDestinations } from '../destinations.js';
|
||||
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
|
||||
import { getSessionRouting } from '../db/session-routing.js';
|
||||
@@ -50,9 +51,7 @@ function destinationList(): string {
|
||||
*/
|
||||
function resolveRouting(
|
||||
to: string | undefined,
|
||||
):
|
||||
| { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string }
|
||||
| { error: string } {
|
||||
): { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } | { error: string } {
|
||||
if (!to) {
|
||||
// Default: reply to whatever thread/channel this session is bound to.
|
||||
const session = getSessionRouting();
|
||||
@@ -82,9 +81,7 @@ function resolveRouting(
|
||||
// preserve the thread_id so replies land in the correct thread.
|
||||
const session = getSessionRouting();
|
||||
const threadId =
|
||||
session.channel_type === dest.channelType && session.platform_id === dest.platformId
|
||||
? session.thread_id
|
||||
: null;
|
||||
session.channel_type === dest.channelType && session.platform_id === dest.platformId ? session.thread_id : null;
|
||||
return {
|
||||
channel_type: dest.channelType!,
|
||||
platform_id: dest.platformId!,
|
||||
@@ -98,12 +95,14 @@ function resolveRouting(
|
||||
export const sendMessage: McpToolDefinition = {
|
||||
tool: {
|
||||
name: 'send_message',
|
||||
description:
|
||||
'Send a message to a named destination. If you have only one destination, you can omit `to`.',
|
||||
description: 'Send a message to a named destination. If you have only one destination, you can omit `to`.',
|
||||
inputSchema: {
|
||||
type: 'object' as const,
|
||||
properties: {
|
||||
to: { type: 'string', description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.' },
|
||||
to: {
|
||||
type: 'string',
|
||||
description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.',
|
||||
},
|
||||
text: { type: 'string', description: 'Message content' },
|
||||
},
|
||||
required: ['text'],
|
||||
@@ -119,6 +118,7 @@ export const sendMessage: McpToolDefinition = {
|
||||
const id = generateId();
|
||||
const seq = writeMessageOut({
|
||||
id,
|
||||
in_reply_to: getCurrentInReplyTo(),
|
||||
kind: 'chat',
|
||||
platform_id: routing.platform_id,
|
||||
channel_type: routing.channel_type,
|
||||
@@ -165,6 +165,7 @@ export const sendFile: McpToolDefinition = {
|
||||
|
||||
writeMessageOut({
|
||||
id,
|
||||
in_reply_to: getCurrentInReplyTo(),
|
||||
kind: 'chat',
|
||||
platform_id: routing.platform_id,
|
||||
channel_type: routing.channel_type,
|
||||
|
||||
@@ -149,6 +149,76 @@ describe('routing', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('origin metadata (from= attribute)', () => {
|
||||
function seedDestination(name: string, channelType: string, platformId: string): void {
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||
VALUES (?, ?, 'channel', ?, ?, NULL)`,
|
||||
)
|
||||
.run(name, name, channelType, platformId);
|
||||
}
|
||||
|
||||
function insertWithRouting(id: string, kind: string, content: object, channelType: string | null, platformId: string | null): void {
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, content)
|
||||
VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?)`,
|
||||
)
|
||||
.run(id, kind, platformId, channelType, JSON.stringify(content));
|
||||
}
|
||||
|
||||
it('chat message includes from= when destination matches', () => {
|
||||
seedDestination('discord-main', 'discord', 'chan-1');
|
||||
insertWithRouting('m1', 'chat', { sender: 'Alice', text: 'hi' }, 'discord', 'chan-1');
|
||||
const prompt = formatMessages(getPendingMessages());
|
||||
expect(prompt).toContain('from="discord-main"');
|
||||
});
|
||||
|
||||
it('chat message falls back to raw routing when no destination matches', () => {
|
||||
insertWithRouting('m1', 'chat', { sender: 'Alice', text: 'hi' }, 'telegram', 'chat-999');
|
||||
const prompt = formatMessages(getPendingMessages());
|
||||
expect(prompt).toContain('from="unknown:telegram:chat-999"');
|
||||
});
|
||||
|
||||
it('chat message omits from= when routing is null', () => {
|
||||
insertMessage('m1', 'chat', { sender: 'Alice', text: 'hi' });
|
||||
const prompt = formatMessages(getPendingMessages());
|
||||
expect(prompt).not.toContain('from=');
|
||||
});
|
||||
|
||||
it('task message includes from= when destination matches', () => {
|
||||
seedDestination('slack-ops', 'slack', 'C-OPS');
|
||||
insertWithRouting('t1', 'task', { prompt: 'check status' }, 'slack', 'C-OPS');
|
||||
const prompt = formatMessages(getPendingMessages());
|
||||
expect(prompt).toContain('<task');
|
||||
expect(prompt).toContain('from="slack-ops"');
|
||||
});
|
||||
|
||||
it('task message omits from= when routing is null', () => {
|
||||
insertMessage('t1', 'task', { prompt: 'check status' });
|
||||
const prompt = formatMessages(getPendingMessages());
|
||||
expect(prompt).toContain('<task');
|
||||
expect(prompt).not.toContain('from=');
|
||||
});
|
||||
|
||||
it('webhook message includes from= when destination matches', () => {
|
||||
seedDestination('github-ch', 'github', 'repo-1');
|
||||
insertWithRouting('w1', 'webhook', { source: 'github', event: 'push', payload: {} }, 'github', 'repo-1');
|
||||
const prompt = formatMessages(getPendingMessages());
|
||||
expect(prompt).toContain('<webhook');
|
||||
expect(prompt).toContain('from="github-ch"');
|
||||
});
|
||||
|
||||
it('system message includes from= when destination matches', () => {
|
||||
seedDestination('discord-main', 'discord', 'chan-1');
|
||||
insertWithRouting('s1', 'system', { action: 'test', status: 'ok', result: null }, 'discord', 'chan-1');
|
||||
const prompt = formatMessages(getPendingMessages());
|
||||
expect(prompt).toContain('<system_response');
|
||||
expect(prompt).toContain('from="discord-main"');
|
||||
});
|
||||
});
|
||||
|
||||
describe('mock provider', () => {
|
||||
it('should produce init + result events', async () => {
|
||||
const provider = new MockProvider({}, (prompt) => `Echo: ${prompt}`);
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
import { findByName, type DestinationEntry } from './destinations.js';
|
||||
import { findByName, getAllDestinations, type DestinationEntry } from './destinations.js';
|
||||
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
|
||||
import { writeMessageOut } from './db/messages-out.js';
|
||||
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
||||
import { clearContinuation, migrateLegacyContinuation, setContinuation } from './db/session-state.js';
|
||||
import { clearCurrentInReplyTo, setCurrentInReplyTo } from './current-batch.js';
|
||||
import {
|
||||
clearContinuation,
|
||||
migrateLegacyContinuation,
|
||||
setContinuation,
|
||||
} from './db/session-state.js';
|
||||
import { formatMessages, extractRouting, categorizeMessage, isClearCommand, isRunnerCommand, stripInternalTags, type RoutingContext } from './formatter.js';
|
||||
formatMessages,
|
||||
extractRouting,
|
||||
categorizeMessage,
|
||||
isClearCommand,
|
||||
isRunnerCommand,
|
||||
stripInternalTags,
|
||||
type RoutingContext,
|
||||
} from './formatter.js';
|
||||
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
|
||||
|
||||
const POLL_INTERVAL_MS = 1000;
|
||||
@@ -170,6 +175,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
||||
// Process the query while concurrently polling for new messages
|
||||
const skippedSet = new Set(skipped);
|
||||
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
|
||||
// Publish the batch's in_reply_to so MCP tools (send_message, send_file)
|
||||
// can stamp it on outbound rows — needed for a2a return-path routing.
|
||||
setCurrentInReplyTo(routing.inReplyTo);
|
||||
try {
|
||||
const result = await processQuery(query, routing, processingIds, config.providerName);
|
||||
if (result.continuation && result.continuation !== continuation) {
|
||||
@@ -198,6 +206,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
||||
thread_id: routing.threadId,
|
||||
content: JSON.stringify({ text: `Error: ${errMsg}` }),
|
||||
});
|
||||
} finally {
|
||||
clearCurrentInReplyTo();
|
||||
}
|
||||
|
||||
// Ensure completed even if processQuery ended without a result event
|
||||
@@ -366,6 +376,23 @@ async function processQuery(
|
||||
if (event.text) {
|
||||
dispatchResultText(event.text, routing);
|
||||
}
|
||||
} else if (event.type === 'compacted') {
|
||||
// The SDK auto-compacted the conversation. After compaction the
|
||||
// model often drops the learned `<message to="…">` wrapping
|
||||
// discipline (the destinations are still in the system prompt,
|
||||
// but the behavioral pattern is summarized away). Inject a
|
||||
// reminder back into the live query so the next turn re-anchors
|
||||
// on the destination model. Only do this when there's >1
|
||||
// destination — single-destination groups have a fallback that
|
||||
// works without wrapping. See qwibitai/nanoclaw#2325.
|
||||
const destinations = getAllDestinations();
|
||||
if (destinations.length > 1) {
|
||||
const names = destinations.map((d) => d.name).join(', ');
|
||||
query.push(
|
||||
`[system] Context was just compacted. Reminder: you have ${destinations.length} destinations (${names}). ` +
|
||||
`Use <message to="name"> blocks to address them. Bare text goes to the scratchpad fallback only.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@@ -385,11 +412,16 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
|
||||
log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`);
|
||||
break;
|
||||
case 'error':
|
||||
log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`);
|
||||
log(
|
||||
`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`,
|
||||
);
|
||||
break;
|
||||
case 'progress':
|
||||
log(`Progress: ${event.message}`);
|
||||
break;
|
||||
case 'compacted':
|
||||
log(`Compacted: ${event.text}`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -329,7 +329,7 @@ export class ClaudeProvider implements AgentProvider {
|
||||
} else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'compact_boundary') {
|
||||
const meta = (message as { compact_metadata?: { pre_tokens?: number } }).compact_metadata;
|
||||
const detail = meta?.pre_tokens ? ` (${meta.pre_tokens.toLocaleString()} tokens compacted)` : '';
|
||||
yield { type: 'result', text: `Context compacted${detail}.` };
|
||||
yield { type: 'compacted', text: `Context compacted${detail}.` };
|
||||
} else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'task_notification') {
|
||||
const tn = message as { summary?: string };
|
||||
yield { type: 'progress', message: tn.summary || 'Task notification' };
|
||||
|
||||
@@ -79,4 +79,12 @@ export type ProviderEvent =
|
||||
* event (tool call, thinking, partial message, anything) so the
|
||||
* poll-loop's idle timer stays honest during long tool runs.
|
||||
*/
|
||||
| { type: 'activity' };
|
||||
| { type: 'activity' }
|
||||
/**
|
||||
* The provider's underlying SDK auto-compacted the conversation context.
|
||||
* The poll-loop reacts by injecting a destination reminder back into
|
||||
* the live query so the agent doesn't drop `<message to="…">` wrapping
|
||||
* after compaction. Distinct from `result` so it doesn't mark the turn
|
||||
* completed or get dispatched as a chat message. See qwibitai/nanoclaw#2325.
|
||||
*/
|
||||
| { type: 'compacted'; text: string };
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "nanoclaw",
|
||||
"version": "2.0.33",
|
||||
"version": "2.0.40",
|
||||
"description": "Personal Claude assistant. Lightweight, secure, customizable.",
|
||||
"type": "module",
|
||||
"packageManager": "pnpm@10.33.0",
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="90" height="20" role="img" aria-label="141k tokens, 71% of context window">
|
||||
<title>141k tokens, 71% of context window</title>
|
||||
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="90" height="20" role="img" aria-label="147k tokens, 74% of context window">
|
||||
<title>147k tokens, 74% of context window</title>
|
||||
<linearGradient id="s" x2="0" y2="100%">
|
||||
<stop offset="0" stop-color="#bbb" stop-opacity=".1"/>
|
||||
<stop offset="1" stop-opacity=".1"/>
|
||||
@@ -15,8 +15,8 @@
|
||||
<g fill="#fff" text-anchor="middle" font-family="Verdana,Geneva,DejaVu Sans,sans-serif" font-size="11">
|
||||
<text aria-hidden="true" x="26" y="15" fill="#010101" fill-opacity=".3">tokens</text>
|
||||
<text x="26" y="14">tokens</text>
|
||||
<text aria-hidden="true" x="71" y="15" fill="#010101" fill-opacity=".3">141k</text>
|
||||
<text x="71" y="14">141k</text>
|
||||
<text aria-hidden="true" x="71" y="15" fill="#010101" fill-opacity=".3">147k</text>
|
||||
<text x="71" y="14">147k</text>
|
||||
</g>
|
||||
</g>
|
||||
</a>
|
||||
|
||||
|
Before Width: | Height: | Size: 1.1 KiB After Width: | Height: | Size: 1.1 KiB |
+46
-4
@@ -468,7 +468,7 @@ async function main(): Promise<void> {
|
||||
} else if (channelChoice === 'imessage') {
|
||||
result = await runIMessageChannel(displayName!);
|
||||
} else if (channelChoice === 'other') {
|
||||
await askOtherChannelName();
|
||||
result = await askOtherChannelName();
|
||||
} else {
|
||||
p.log.info(
|
||||
brandBody(
|
||||
@@ -740,12 +740,38 @@ async function runAuthStep(): Promise<void> {
|
||||
label: 'Paste an Anthropic API key',
|
||||
hint: 'pay-per-use via console.anthropic.com',
|
||||
},
|
||||
{
|
||||
value: 'skip',
|
||||
label: "Skip — I'll connect later",
|
||||
hint: 'not recommended — Claude helps debug setup issues',
|
||||
},
|
||||
],
|
||||
}),
|
||||
) as 'subscription' | 'oauth' | 'api';
|
||||
) as 'subscription' | 'oauth' | 'api' | 'skip';
|
||||
setupLog.userInput('auth_method', method);
|
||||
phEmit('auth_method_chosen', { method });
|
||||
|
||||
if (method === 'skip') {
|
||||
const confirmed = ensureAnswer(
|
||||
await p.confirm({
|
||||
message:
|
||||
"Skip Claude sign-in? The agent won't be able to run until you connect, and we won't be able to help debug setup errors.",
|
||||
initialValue: false,
|
||||
}),
|
||||
);
|
||||
if (!confirmed) {
|
||||
// Loop back to the auth picker so they can choose a real method.
|
||||
return runAuthStep();
|
||||
}
|
||||
setupLog.step('auth', 'skipped', 0, { REASON: 'user-skipped' });
|
||||
p.log.warn(
|
||||
brandBody(
|
||||
'Claude sign-in skipped. Re-run setup or run `bash nanoclaw.sh` to finish later.',
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (method === 'subscription') {
|
||||
await runSubscriptionAuth();
|
||||
} else {
|
||||
@@ -1099,10 +1125,26 @@ async function askChannelChoice(): Promise<ChannelChoice> {
|
||||
return choice;
|
||||
}
|
||||
|
||||
async function askOtherChannelName(): Promise<void> {
|
||||
async function askOtherChannelName(): Promise<void | typeof BACK_TO_CHANNEL_SELECTION> {
|
||||
const action = ensureAnswer(
|
||||
await brightSelect<'type' | 'back'>({
|
||||
message: 'Which channel would you like to install?',
|
||||
options: [
|
||||
{
|
||||
value: 'type',
|
||||
label: 'Type the channel name',
|
||||
hint: 'e.g. matrix, github, linear, webex',
|
||||
},
|
||||
{ value: 'back', label: '← Back to channel selection' },
|
||||
],
|
||||
initialValue: 'type',
|
||||
}),
|
||||
);
|
||||
if (action === 'back') return BACK_TO_CHANNEL_SELECTION;
|
||||
|
||||
const answer = ensureAnswer(
|
||||
await p.text({
|
||||
message: 'Which channel would you like to install?',
|
||||
message: 'Channel name',
|
||||
placeholder: 'e.g. matrix, github, linear, webex',
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -307,8 +307,14 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
|
||||
// Start local HTTP server to receive forwarded Gateway events (including interactions)
|
||||
const webhookUrl = await startLocalWebhookServer(gatewayAdapter, setupConfig, config.botToken);
|
||||
|
||||
// Exponential backoff capped at 1h. Without this, an unrecoverable
|
||||
// failure (e.g., TokenInvalid) restarts ~10×/sec and Discord's
|
||||
// Cloudflare layer issues a multi-hour IP block. A run that lasts
|
||||
// longer than 5 minutes counts as healthy and resets the counter.
|
||||
let consecutiveFailures = 0;
|
||||
const startGateway = () => {
|
||||
if (gatewayAbort?.signal.aborted) return;
|
||||
const startedAt = Date.now();
|
||||
// Capture the long-running listener promise via waitUntil
|
||||
let listenerPromise: Promise<unknown> | undefined;
|
||||
gatewayAdapter.startGatewayListener!(
|
||||
@@ -323,21 +329,30 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
|
||||
).then(() => {
|
||||
// startGatewayListener resolves immediately with a Response;
|
||||
// the actual work is in the listenerPromise passed to waitUntil
|
||||
if (listenerPromise) {
|
||||
listenerPromise
|
||||
.then(() => {
|
||||
if (!gatewayAbort?.signal.aborted) {
|
||||
log.info('Gateway listener expired, restarting', { adapter: adapter.name });
|
||||
startGateway();
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
if (!gatewayAbort?.signal.aborted) {
|
||||
log.error('Gateway listener error, restarting in 5s', { adapter: adapter.name, err });
|
||||
setTimeout(startGateway, 5000);
|
||||
}
|
||||
if (!listenerPromise) return;
|
||||
const reschedule = (err?: unknown) => {
|
||||
if (gatewayAbort?.signal.aborted) return;
|
||||
const ranForMs = Date.now() - startedAt;
|
||||
if (ranForMs > 5 * 60 * 1000) consecutiveFailures = 0;
|
||||
else consecutiveFailures++;
|
||||
const delayMs = Math.min(60 * 60 * 1000, 2 ** consecutiveFailures * 1000);
|
||||
if (err) {
|
||||
log.error('Gateway listener error, retrying', {
|
||||
adapter: adapter.name,
|
||||
err,
|
||||
consecutiveFailures,
|
||||
delayMs,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
log.info('Gateway listener expired, restarting', {
|
||||
adapter: adapter.name,
|
||||
consecutiveFailures,
|
||||
delayMs,
|
||||
});
|
||||
}
|
||||
setTimeout(startGateway, delayMs);
|
||||
};
|
||||
listenerPromise.then(() => reschedule()).catch(reschedule);
|
||||
});
|
||||
};
|
||||
startGateway();
|
||||
|
||||
+7
-1
@@ -171,7 +171,13 @@ CREATE TABLE IF NOT EXISTS messages_in (
|
||||
platform_id TEXT,
|
||||
channel_type TEXT,
|
||||
thread_id TEXT,
|
||||
content TEXT NOT NULL
|
||||
content TEXT NOT NULL,
|
||||
-- For agent-to-agent inbound rows: the source session that emitted the
|
||||
-- triggering outbound. Used as a return path when the target replies —
|
||||
-- the reply routes back to this exact session, not to the source agent
|
||||
-- group's "newest" session. NULL on channel-side inbound and on a2a rows
|
||||
-- written before this column existed.
|
||||
source_session_id TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id);
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { describe, it, expect, afterEach } from 'vitest';
|
||||
|
||||
import { migrateMessagesInTable } from './session-db.js';
|
||||
import { getInboundSourceSessionId, migrateMessagesInTable } from './session-db.js';
|
||||
|
||||
const TEST_DIR = '/tmp/nanoclaw-session-db-test';
|
||||
const DB_PATH = path.join(TEST_DIR, 'inbound.db');
|
||||
@@ -55,4 +55,40 @@ describe('migrateMessagesInTable', () => {
|
||||
expect(row.series_id).toBe('legacy-1');
|
||||
db.close();
|
||||
});
|
||||
|
||||
it('adds source_session_id on a legacy DB, leaves existing rows NULL, is idempotent', () => {
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
fs.mkdirSync(TEST_DIR, { recursive: true });
|
||||
|
||||
const db = new Database(DB_PATH);
|
||||
db.exec(`
|
||||
CREATE TABLE messages_in (
|
||||
id TEXT PRIMARY KEY,
|
||||
seq INTEGER UNIQUE,
|
||||
kind TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
status TEXT DEFAULT 'pending',
|
||||
process_after TEXT,
|
||||
recurrence TEXT,
|
||||
tries INTEGER DEFAULT 0,
|
||||
platform_id TEXT,
|
||||
channel_type TEXT,
|
||||
thread_id TEXT,
|
||||
content TEXT NOT NULL
|
||||
);
|
||||
`);
|
||||
db.prepare(
|
||||
"INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES (?, ?, 'chat', datetime('now'), 'pending', '{}')",
|
||||
).run('legacy-2', 2);
|
||||
|
||||
migrateMessagesInTable(db);
|
||||
migrateMessagesInTable(db); // idempotent
|
||||
|
||||
const cols = (db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name);
|
||||
expect(cols).toContain('source_session_id');
|
||||
|
||||
expect(getInboundSourceSessionId(db, 'legacy-2')).toBeNull();
|
||||
expect(getInboundSourceSessionId(db, 'does-not-exist')).toBeNull();
|
||||
db.close();
|
||||
});
|
||||
});
|
||||
|
||||
+53
-2
@@ -108,14 +108,21 @@ export function insertMessage(
|
||||
* Host countDueMessages gates on this; container reads everything.
|
||||
*/
|
||||
trigger?: 0 | 1;
|
||||
/**
|
||||
* For agent-to-agent inbound: the source session id that emitted the
|
||||
* outbound message which became this inbound row. Used as the return
|
||||
* path for the target's reply. NULL on channel-side inbound.
|
||||
*/
|
||||
sourceSessionId?: string | null;
|
||||
},
|
||||
): void {
|
||||
db.prepare(
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger)
|
||||
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger)`,
|
||||
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id)
|
||||
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId)`,
|
||||
).run({
|
||||
...message,
|
||||
trigger: message.trigger ?? 1,
|
||||
sourceSessionId: message.sourceSessionId ?? null,
|
||||
seq: nextEvenSeq(db),
|
||||
});
|
||||
}
|
||||
@@ -239,6 +246,7 @@ export interface OutboundMessage {
|
||||
channel_type: string | null;
|
||||
thread_id: string | null;
|
||||
content: string;
|
||||
in_reply_to: string | null;
|
||||
}
|
||||
|
||||
export function getDueOutboundMessages(db: Database.Database): OutboundMessage[] {
|
||||
@@ -305,4 +313,47 @@ export function migrateMessagesInTable(db: Database.Database): void {
|
||||
// the agent" semantics, so backfill 1 and default 1 for new inserts.
|
||||
db.prepare('ALTER TABLE messages_in ADD COLUMN trigger INTEGER NOT NULL DEFAULT 1').run();
|
||||
}
|
||||
if (!cols.has('source_session_id')) {
|
||||
// For agent-to-agent return-path routing. NULL on existing rows is fine —
|
||||
// their replies fall back to the legacy "newest active session" lookup.
|
||||
db.prepare('ALTER TABLE messages_in ADD COLUMN source_session_id TEXT').run();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up an inbound row's source_session_id by its message id. Returns null
|
||||
* if the row doesn't exist or the column is NULL (channel inbound or
|
||||
* pre-migration a2a inbound). Used by a2a routing to route replies back to
|
||||
* the originating session.
|
||||
*/
|
||||
export function getInboundSourceSessionId(db: Database.Database, messageId: string): string | null {
|
||||
const row = db.prepare('SELECT source_session_id FROM messages_in WHERE id = ?').get(messageId) as
|
||||
| { source_session_id: string | null }
|
||||
| undefined;
|
||||
return row?.source_session_id ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the source_session_id of the most recent a2a inbound row from a
|
||||
* specific peer (by agent group id). Used as a peer-affinity fallback in
|
||||
* a2a routing when an outbound reply has no `in_reply_to` (e.g. the
|
||||
* container's send_message MCP tool path didn't thread the batch's
|
||||
* in_reply_to through).
|
||||
*
|
||||
* Heuristic: "the last time this peer talked to me, which session was it?"
|
||||
* Returns null when no prior a2a inbound from that peer carries a
|
||||
* non-null source_session_id (typical for pre-migration installs).
|
||||
*/
|
||||
export function getMostRecentPeerSourceSessionId(db: Database.Database, peerAgentGroupId: string): string | null {
|
||||
const row = db
|
||||
.prepare(
|
||||
`SELECT source_session_id FROM messages_in
|
||||
WHERE channel_type = 'agent'
|
||||
AND platform_id = ?
|
||||
AND source_session_id IS NOT NULL
|
||||
ORDER BY seq DESC
|
||||
LIMIT 1`,
|
||||
)
|
||||
.get(peerAgentGroupId) as { source_session_id: string | null } | undefined;
|
||||
return row?.source_session_id ?? null;
|
||||
}
|
||||
|
||||
@@ -239,6 +239,7 @@ async function deliverMessage(
|
||||
channel_type: string | null;
|
||||
thread_id: string | null;
|
||||
content: string;
|
||||
in_reply_to: string | null;
|
||||
},
|
||||
session: Session,
|
||||
inDb: Database.Database,
|
||||
|
||||
+453
-1
@@ -11,6 +11,7 @@ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||
import {
|
||||
initTestDb,
|
||||
closeDb,
|
||||
getDb,
|
||||
runMigrations,
|
||||
createAgentGroup,
|
||||
createMessagingGroup,
|
||||
@@ -19,6 +20,7 @@ import {
|
||||
import {
|
||||
resolveSession,
|
||||
writeSessionMessage,
|
||||
writeSessionRouting,
|
||||
initSessionFolder,
|
||||
sessionDir,
|
||||
inboundDbPath,
|
||||
@@ -26,7 +28,7 @@ import {
|
||||
readOutboxFiles,
|
||||
clearOutbox,
|
||||
} from './session-manager.js';
|
||||
import { getSession, findSession } from './db/sessions.js';
|
||||
import { getSession, findSession, findSessionByAgentGroup } from './db/sessions.js';
|
||||
import type { InboundEvent } from './channels/adapter.js';
|
||||
|
||||
// Mock container runner to prevent actual Docker spawning
|
||||
@@ -595,6 +597,456 @@ describe('router', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('routing metadata preservation', () => {
|
||||
beforeEach(() => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Test Agent',
|
||||
folder: 'test-agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroup({
|
||||
id: 'mg-1',
|
||||
channel_type: 'discord',
|
||||
platform_id: 'chan-123',
|
||||
name: 'General',
|
||||
is_group: 1,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroupAgent({
|
||||
id: 'mga-1',
|
||||
messaging_group_id: 'mg-1',
|
||||
agent_group_id: 'ag-1',
|
||||
engage_mode: 'pattern',
|
||||
engage_pattern: '.',
|
||||
sender_scope: 'all',
|
||||
ignored_message_policy: 'drop',
|
||||
session_mode: 'shared',
|
||||
priority: 0,
|
||||
created_at: now(),
|
||||
});
|
||||
});
|
||||
|
||||
it('routed message carries platformId, channelType, threadId on the messages_in row', async () => {
|
||||
const { routeInbound } = await import('./router.js');
|
||||
|
||||
await routeInbound({
|
||||
channelType: 'discord',
|
||||
platformId: 'chan-123',
|
||||
threadId: 'thread-42',
|
||||
message: { id: 'msg-r1', kind: 'chat', content: JSON.stringify({ sender: 'A', text: 'hi' }), timestamp: now() },
|
||||
});
|
||||
|
||||
const session = findSession('mg-1', null);
|
||||
const db = new Database(inboundDbPath('ag-1', session!.id));
|
||||
const row = db
|
||||
.prepare('SELECT platform_id, channel_type, thread_id FROM messages_in WHERE id LIKE ?')
|
||||
.get('msg-r1%') as {
|
||||
platform_id: string | null;
|
||||
channel_type: string | null;
|
||||
thread_id: string | null;
|
||||
};
|
||||
db.close();
|
||||
|
||||
expect(row.platform_id).toBe('chan-123');
|
||||
expect(row.channel_type).toBe('discord');
|
||||
expect(row.thread_id).toBe('thread-42');
|
||||
});
|
||||
|
||||
it('fan-out gives each agent its own routing, not leaked from sibling', async () => {
|
||||
const { routeInbound } = await import('./router.js');
|
||||
|
||||
createAgentGroup({
|
||||
id: 'ag-2',
|
||||
name: 'Agent Two',
|
||||
folder: 'agent-two',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroupAgent({
|
||||
id: 'mga-2',
|
||||
messaging_group_id: 'mg-1',
|
||||
agent_group_id: 'ag-2',
|
||||
engage_mode: 'pattern',
|
||||
engage_pattern: '.',
|
||||
sender_scope: 'all',
|
||||
ignored_message_policy: 'drop',
|
||||
session_mode: 'shared',
|
||||
priority: 0,
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
await routeInbound({
|
||||
channelType: 'discord',
|
||||
platformId: 'chan-123',
|
||||
threadId: 'thread-fanout',
|
||||
message: { id: 'msg-fo', kind: 'chat', content: JSON.stringify({ text: 'fan' }), timestamp: now() },
|
||||
});
|
||||
|
||||
// Both agents should have the message with correct routing
|
||||
const { getSessionsByAgentGroup } = await import('./db/sessions.js');
|
||||
for (const agId of ['ag-1', 'ag-2']) {
|
||||
const sessions = getSessionsByAgentGroup(agId);
|
||||
expect(sessions).toHaveLength(1);
|
||||
const db = new Database(inboundDbPath(agId, sessions[0].id));
|
||||
const row = db.prepare('SELECT platform_id, channel_type, thread_id FROM messages_in LIMIT 1').get() as {
|
||||
platform_id: string | null;
|
||||
channel_type: string | null;
|
||||
thread_id: string | null;
|
||||
};
|
||||
db.close();
|
||||
expect(row.platform_id).toBe('chan-123');
|
||||
expect(row.channel_type).toBe('discord');
|
||||
expect(row.thread_id).toBe('thread-fanout');
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('writeSessionRouting', () => {
|
||||
it('populates session_routing from the messaging group', () => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
folder: 'agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroup({
|
||||
id: 'mg-1',
|
||||
channel_type: 'telegram',
|
||||
platform_id: 'tg:12345',
|
||||
name: 'Chat',
|
||||
is_group: 0,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
writeSessionRouting('ag-1', session.id);
|
||||
|
||||
const db = new Database(inboundDbPath('ag-1', session.id));
|
||||
const row = db.prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1').get() as
|
||||
| {
|
||||
channel_type: string | null;
|
||||
platform_id: string | null;
|
||||
thread_id: string | null;
|
||||
}
|
||||
| undefined;
|
||||
db.close();
|
||||
|
||||
expect(row).toBeDefined();
|
||||
expect(row!.channel_type).toBe('telegram');
|
||||
expect(row!.platform_id).toBe('tg:12345');
|
||||
expect(row!.thread_id).toBeNull();
|
||||
});
|
||||
|
||||
it('writes null routing for agent-shared session (no messaging group)', () => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
folder: 'agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
const { session } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||
writeSessionRouting('ag-1', session.id);
|
||||
|
||||
const db = new Database(inboundDbPath('ag-1', session.id));
|
||||
const row = db.prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1').get() as
|
||||
| {
|
||||
channel_type: string | null;
|
||||
platform_id: string | null;
|
||||
thread_id: string | null;
|
||||
}
|
||||
| undefined;
|
||||
db.close();
|
||||
|
||||
expect(row).toBeDefined();
|
||||
expect(row!.channel_type).toBeNull();
|
||||
expect(row!.platform_id).toBeNull();
|
||||
expect(row!.thread_id).toBeNull();
|
||||
});
|
||||
|
||||
it('includes thread_id from per-thread session', () => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
folder: 'agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroup({
|
||||
id: 'mg-1',
|
||||
channel_type: 'discord',
|
||||
platform_id: 'chan-123',
|
||||
name: 'General',
|
||||
is_group: 1,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
const { session } = resolveSession('ag-1', 'mg-1', 'thread-77', 'per-thread');
|
||||
writeSessionRouting('ag-1', session.id);
|
||||
|
||||
const db = new Database(inboundDbPath('ag-1', session.id));
|
||||
const row = db.prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1').get() as
|
||||
| {
|
||||
channel_type: string | null;
|
||||
platform_id: string | null;
|
||||
thread_id: string | null;
|
||||
}
|
||||
| undefined;
|
||||
db.close();
|
||||
|
||||
expect(row).toBeDefined();
|
||||
expect(row!.channel_type).toBe('discord');
|
||||
expect(row!.platform_id).toBe('chan-123');
|
||||
expect(row!.thread_id).toBe('thread-77');
|
||||
});
|
||||
});
|
||||
|
||||
describe('agent-shared session resolution', () => {
|
||||
it('resolves to the same session on repeated calls', () => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
folder: 'agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
const { session: s1, created: c1 } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||
const { session: s2, created: c2 } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||
|
||||
expect(c1).toBe(true);
|
||||
expect(c2).toBe(false);
|
||||
expect(s1.id).toBe(s2.id);
|
||||
});
|
||||
|
||||
it('agent-shared session has null messaging_group_id', () => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
folder: 'agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
const { session } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||
expect(session.messaging_group_id).toBeNull();
|
||||
});
|
||||
|
||||
// BUG (#2332): agent-shared resolveSession reuses an existing channel-bound
|
||||
// session via findSessionByAgentGroup instead of creating a dedicated
|
||||
// agent-shared session. The two cannot coexist today — the agent-shared
|
||||
// call finds the channel session and returns it. This test documents the
|
||||
// current (broken) behavior; fixing #2332 should make it pass as written.
|
||||
it.skip('agent-shared and channel-bound sessions coexist for the same agent group', () => {
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
folder: 'agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroup({
|
||||
id: 'mg-1',
|
||||
channel_type: 'discord',
|
||||
platform_id: 'chan-123',
|
||||
name: 'General',
|
||||
is_group: 1,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
const { session: shared } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
const { session: agentShared } = resolveSession('ag-1', null, null, 'agent-shared');
|
||||
|
||||
expect(shared.id).not.toBe(agentShared.id);
|
||||
expect(shared.messaging_group_id).toBe('mg-1');
|
||||
expect(agentShared.messaging_group_id).toBeNull();
|
||||
});
|
||||
|
||||
it('findSessionByAgentGroup returns existing channel-bound session (bug #2332)', () => {
|
||||
// Documents the current behavior: findSessionByAgentGroup doesn't
|
||||
// distinguish agent-shared from channel-bound. When a channel session
|
||||
// exists, agent-shared resolution reuses it instead of creating a
|
||||
// separate session. This is the root cause of A2A misrouting.
|
||||
createAgentGroup({
|
||||
id: 'ag-1',
|
||||
name: 'Agent',
|
||||
folder: 'agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroup({
|
||||
id: 'mg-1',
|
||||
channel_type: 'discord',
|
||||
platform_id: 'chan-123',
|
||||
name: 'General',
|
||||
is_group: 1,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
const { session: channelSession } = resolveSession('ag-1', 'mg-1', null, 'shared');
|
||||
const found = findSessionByAgentGroup('ag-1');
|
||||
|
||||
// Bug: picks the channel session — an agent-shared call would get this
|
||||
// instead of a dedicated session.
|
||||
expect(found).toBeDefined();
|
||||
expect(found!.id).toBe(channelSession.id);
|
||||
expect(found!.messaging_group_id).toBe('mg-1'); // should be null for agent-shared
|
||||
});
|
||||
});
|
||||
|
||||
describe('agent-to-agent routing', () => {
|
||||
beforeEach(() => {
|
||||
createAgentGroup({
|
||||
id: 'ag-pa',
|
||||
name: 'PA',
|
||||
folder: 'pa-agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
createMessagingGroup({
|
||||
id: 'mg-slack',
|
||||
channel_type: 'slack',
|
||||
platform_id: 'C-GENERAL',
|
||||
name: 'Slack General',
|
||||
is_group: 1,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
createAgentGroup({
|
||||
id: 'ag-researcher',
|
||||
name: 'Researcher',
|
||||
folder: 'researcher-agent',
|
||||
agent_provider: null,
|
||||
created_at: now(),
|
||||
});
|
||||
|
||||
// Wire bidirectional A2A destinations (table created by runMigrations)
|
||||
const db = getDb();
|
||||
db.prepare(
|
||||
`INSERT OR IGNORE INTO agent_destinations (agent_group_id, local_name, target_type, target_id, created_at)
|
||||
VALUES ('ag-pa', 'researcher', 'agent', 'ag-researcher', ?)`,
|
||||
).run(now());
|
||||
db.prepare(
|
||||
`INSERT OR IGNORE INTO agent_destinations (agent_group_id, local_name, target_type, target_id, created_at)
|
||||
VALUES ('ag-researcher', 'pa', 'agent', 'ag-pa', ?)`,
|
||||
).run(now());
|
||||
});
|
||||
|
||||
it('A2A outbound lands in a session for the target agent', async () => {
|
||||
const { routeAgentMessage } = await import('./modules/agent-to-agent/agent-route.js');
|
||||
|
||||
const { session: paSlackSession } = resolveSession('ag-pa', 'mg-slack', null, 'shared');
|
||||
|
||||
await routeAgentMessage(
|
||||
{ id: 'out-a2a-1', platform_id: 'ag-researcher', content: JSON.stringify({ text: 'research this' }) },
|
||||
paSlackSession,
|
||||
);
|
||||
|
||||
const { getSessionsByAgentGroup } = await import('./db/sessions.js');
|
||||
const researcherSessions = getSessionsByAgentGroup('ag-researcher');
|
||||
expect(researcherSessions.length).toBeGreaterThanOrEqual(1);
|
||||
|
||||
const rDb = new Database(inboundDbPath('ag-researcher', researcherSessions[0].id));
|
||||
const rows = rDb.prepare('SELECT platform_id, channel_type, content FROM messages_in').all() as Array<{
|
||||
platform_id: string | null;
|
||||
channel_type: string | null;
|
||||
content: string;
|
||||
}>;
|
||||
rDb.close();
|
||||
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].channel_type).toBe('agent');
|
||||
expect(rows[0].platform_id).toBe('ag-pa');
|
||||
expect(JSON.parse(rows[0].content).text).toBe('research this');
|
||||
});
|
||||
|
||||
it('A2A return path routes to originating session, not newest (#2332)', async () => {
|
||||
// PA has Slack session, then gets wired to Discord (newer session).
|
||||
// Researcher responds to PA. With the return-path fix, the reply
|
||||
// routes back to the Slack session (originator) not Discord (newest).
|
||||
const { routeAgentMessage } = await import('./modules/agent-to-agent/agent-route.js');
|
||||
|
||||
const { session: paSlackSession } = resolveSession('ag-pa', 'mg-slack', null, 'shared');
|
||||
|
||||
createMessagingGroup({
|
||||
id: 'mg-discord',
|
||||
channel_type: 'discord',
|
||||
platform_id: 'chan-discord',
|
||||
name: 'Discord',
|
||||
is_group: 0,
|
||||
unknown_sender_policy: 'public',
|
||||
created_at: now(),
|
||||
});
|
||||
const { session: paDiscordSession } = resolveSession('ag-pa', 'mg-discord', null, 'shared');
|
||||
|
||||
// PA sends from Slack
|
||||
await routeAgentMessage(
|
||||
{ id: 'out-fwd', platform_id: 'ag-researcher', content: JSON.stringify({ text: 'research' }) },
|
||||
paSlackSession,
|
||||
);
|
||||
|
||||
// Researcher responds back to PA
|
||||
const { getSessionsByAgentGroup } = await import('./db/sessions.js');
|
||||
const researcherSession = getSessionsByAgentGroup('ag-researcher')[0];
|
||||
|
||||
await routeAgentMessage(
|
||||
{ id: 'out-reply', platform_id: 'ag-pa', content: JSON.stringify({ text: 'found it' }) },
|
||||
researcherSession,
|
||||
);
|
||||
|
||||
const slackDb = new Database(inboundDbPath('ag-pa', paSlackSession.id));
|
||||
const slackA2a = slackDb.prepare("SELECT * FROM messages_in WHERE channel_type = 'agent'").all();
|
||||
slackDb.close();
|
||||
|
||||
const discordDb = new Database(inboundDbPath('ag-pa', paDiscordSession.id));
|
||||
const discordA2a = discordDb.prepare("SELECT * FROM messages_in WHERE channel_type = 'agent'").all();
|
||||
discordDb.close();
|
||||
|
||||
// Fixed: response lands in Slack (origin) not Discord (newest)
|
||||
expect(slackA2a).toHaveLength(1);
|
||||
expect(discordA2a).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('BUG: A2A-only session gets null session_routing (#2332)', async () => {
|
||||
// Researcher only has an agent-shared session (no channel wiring).
|
||||
// writeSessionRouting writes nulls because messaging_group_id is null.
|
||||
const { routeAgentMessage } = await import('./modules/agent-to-agent/agent-route.js');
|
||||
|
||||
const { session: paSession } = resolveSession('ag-pa', 'mg-slack', null, 'shared');
|
||||
await routeAgentMessage(
|
||||
{ id: 'out-1', platform_id: 'ag-researcher', content: JSON.stringify({ text: 'go' }) },
|
||||
paSession,
|
||||
);
|
||||
|
||||
const { getSessionsByAgentGroup } = await import('./db/sessions.js');
|
||||
const researcherSessions = getSessionsByAgentGroup('ag-researcher');
|
||||
expect(researcherSessions).toHaveLength(1);
|
||||
|
||||
writeSessionRouting('ag-researcher', researcherSessions[0].id);
|
||||
|
||||
const rDb = new Database(inboundDbPath('ag-researcher', researcherSessions[0].id));
|
||||
const routing = rDb.prepare('SELECT channel_type, platform_id FROM session_routing WHERE id = 1').get() as {
|
||||
channel_type: string | null;
|
||||
platform_id: string | null;
|
||||
} | undefined;
|
||||
rDb.close();
|
||||
|
||||
// BUG: session_routing is all null — researcher has no default routing
|
||||
expect(routing).toBeDefined();
|
||||
expect(routing!.channel_type).toBeNull();
|
||||
expect(routing!.platform_id).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('delivery', () => {
|
||||
it('should detect undelivered messages in outbound DB', () => {
|
||||
createAgentGroup({
|
||||
|
||||
@@ -1,20 +1,53 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import Database from 'better-sqlite3';
|
||||
import fs from 'fs';
|
||||
import { describe, expect, it, beforeEach, afterEach, vi } from 'vitest';
|
||||
|
||||
import { isSafeAttachmentName } from './agent-route.js';
|
||||
import { isSafeAttachmentName, routeAgentMessage } from './agent-route.js';
|
||||
import { createDestination } from './db/agent-destinations.js';
|
||||
import { initTestDb, closeDb, runMigrations, createAgentGroup } from '../../db/index.js';
|
||||
import { createSession } from '../../db/sessions.js';
|
||||
import { initSessionFolder, inboundDbPath } from '../../session-manager.js';
|
||||
import type { Session } from '../../types.js';
|
||||
|
||||
vi.mock('../../container-runner.js', () => ({
|
||||
wakeContainer: vi.fn().mockResolvedValue(undefined),
|
||||
isContainerRunning: vi.fn().mockReturnValue(false),
|
||||
getActiveContainerCount: vi.fn().mockReturnValue(0),
|
||||
killContainer: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('../../config.js', async () => {
|
||||
const actual = await vi.importActual('../../config.js');
|
||||
return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-a2a-route' };
|
||||
});
|
||||
|
||||
const TEST_DIR = '/tmp/nanoclaw-test-a2a-route';
|
||||
|
||||
function now(): string {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function readInbound(agentGroupId: string, sessionId: string) {
|
||||
const db = new Database(inboundDbPath(agentGroupId, sessionId), { readonly: true });
|
||||
const rows = db
|
||||
.prepare('SELECT id, platform_id, channel_type, content, source_session_id FROM messages_in ORDER BY seq')
|
||||
.all() as Array<{
|
||||
id: string;
|
||||
platform_id: string | null;
|
||||
channel_type: string | null;
|
||||
content: string;
|
||||
source_session_id: string | null;
|
||||
}>;
|
||||
db.close();
|
||||
return rows;
|
||||
}
|
||||
|
||||
/**
|
||||
* `forwardAttachedFiles` has a filesystem side that's awkward to unit-test
|
||||
* without mocking DATA_DIR. The guarantee worth pinning is that the
|
||||
* filename validator rejects everything that could escape the inbox dir —
|
||||
* `forwardAttachedFiles` runs this guard before any I/O, so traversal is
|
||||
* impossible as long as this matrix holds.
|
||||
*/
|
||||
describe('isSafeAttachmentName', () => {
|
||||
it('accepts plain filenames', () => {
|
||||
expect(isSafeAttachmentName('baby-duck.png')).toBe(true);
|
||||
expect(isSafeAttachmentName('file with spaces.pdf')).toBe(true);
|
||||
expect(isSafeAttachmentName('report.v2.docx')).toBe(true);
|
||||
expect(isSafeAttachmentName('.hidden')).toBe(true); // leading dot is fine, just not `.` / `..`
|
||||
expect(isSafeAttachmentName('.hidden')).toBe(true);
|
||||
});
|
||||
|
||||
it('rejects empty / sentinel values', () => {
|
||||
@@ -44,3 +77,200 @@ describe('isSafeAttachmentName', () => {
|
||||
expect(isSafeAttachmentName(undefined as unknown as string)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Return-path routing: when an a2a reply targets an agent group with multiple
|
||||
* sessions, it must land in the *originating* session — not the newest one.
|
||||
*
|
||||
* Setup: agent A has two active sessions S1 (older) + S2 (newer).
|
||||
* Agent B is the peer A talks to. Bidirectional destinations wired.
|
||||
*/
|
||||
describe('routeAgentMessage return-path', () => {
|
||||
const A = 'ag-A';
|
||||
const B = 'ag-B';
|
||||
let S1: Session;
|
||||
let S2: Session;
|
||||
let SB: Session;
|
||||
|
||||
beforeEach(() => {
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
fs.mkdirSync(TEST_DIR, { recursive: true });
|
||||
|
||||
const db = initTestDb();
|
||||
runMigrations(db);
|
||||
|
||||
createAgentGroup({ id: A, name: 'A', folder: 'a', agent_provider: null, created_at: now() });
|
||||
createAgentGroup({ id: B, name: 'B', folder: 'b', agent_provider: null, created_at: now() });
|
||||
|
||||
// S1 (older), S2 (newer) — both active sessions on A.
|
||||
S1 = {
|
||||
id: 'sess-A-old',
|
||||
agent_group_id: A,
|
||||
messaging_group_id: null,
|
||||
thread_id: null,
|
||||
agent_provider: null,
|
||||
status: 'active',
|
||||
container_status: 'stopped',
|
||||
last_active: null,
|
||||
created_at: '2026-01-01T00:00:00.000Z',
|
||||
};
|
||||
S2 = {
|
||||
id: 'sess-A-new',
|
||||
agent_group_id: A,
|
||||
messaging_group_id: null,
|
||||
thread_id: null,
|
||||
agent_provider: null,
|
||||
status: 'active',
|
||||
container_status: 'stopped',
|
||||
last_active: null,
|
||||
created_at: '2026-02-01T00:00:00.000Z',
|
||||
};
|
||||
SB = {
|
||||
id: 'sess-B',
|
||||
agent_group_id: B,
|
||||
messaging_group_id: null,
|
||||
thread_id: null,
|
||||
agent_provider: null,
|
||||
status: 'active',
|
||||
container_status: 'stopped',
|
||||
last_active: null,
|
||||
created_at: '2026-01-15T00:00:00.000Z',
|
||||
};
|
||||
createSession(S1);
|
||||
createSession(S2);
|
||||
createSession(SB);
|
||||
initSessionFolder(A, S1.id);
|
||||
initSessionFolder(A, S2.id);
|
||||
initSessionFolder(B, SB.id);
|
||||
|
||||
createDestination({
|
||||
agent_group_id: A,
|
||||
local_name: 'b',
|
||||
target_type: 'agent',
|
||||
target_id: B,
|
||||
created_at: now(),
|
||||
});
|
||||
createDestination({
|
||||
agent_group_id: B,
|
||||
local_name: 'a',
|
||||
target_type: 'agent',
|
||||
target_id: A,
|
||||
created_at: now(),
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
closeDb();
|
||||
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
|
||||
});
|
||||
|
||||
it('forward direction: stamps source_session_id on the target inbound row', async () => {
|
||||
// A.S1 emits an outbound a2a to B.
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-A-S1',
|
||||
platform_id: B,
|
||||
content: JSON.stringify({ text: 'hello B' }),
|
||||
in_reply_to: null,
|
||||
},
|
||||
S1,
|
||||
);
|
||||
|
||||
const bRows = readInbound(B, SB.id);
|
||||
expect(bRows).toHaveLength(1);
|
||||
expect(bRows[0].platform_id).toBe(A);
|
||||
expect(bRows[0].source_session_id).toBe(S1.id); // <- the return address
|
||||
});
|
||||
|
||||
it('reply direction: routes back to the originating session, not the newest', async () => {
|
||||
// A.S1 sends to B.
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-A-S1',
|
||||
platform_id: B,
|
||||
content: JSON.stringify({ text: 'ping' }),
|
||||
in_reply_to: null,
|
||||
},
|
||||
S1,
|
||||
);
|
||||
|
||||
// Capture the synthetic id the host stamped on B's inbound — that's what
|
||||
// B's container would reference as `in_reply_to` when replying.
|
||||
const bRows = readInbound(B, SB.id);
|
||||
const yId = bRows[0].id;
|
||||
|
||||
// B replies to that message.
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-B',
|
||||
platform_id: A,
|
||||
content: JSON.stringify({ text: 'pong' }),
|
||||
in_reply_to: yId,
|
||||
},
|
||||
SB,
|
||||
);
|
||||
|
||||
const s1Rows = readInbound(A, S1.id);
|
||||
const s2Rows = readInbound(A, S2.id);
|
||||
|
||||
// The reply lands in S1 (originator) even though S2 is newer.
|
||||
expect(s1Rows).toHaveLength(1);
|
||||
expect(s1Rows[0].platform_id).toBe(B);
|
||||
expect(JSON.parse(s1Rows[0].content).text).toBe('pong');
|
||||
expect(s2Rows).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('fallback: a2a with no in_reply_to falls through to newest-session lookup', async () => {
|
||||
// No prior conversation. B initiates an a2a to A out of the blue.
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-B-fresh',
|
||||
platform_id: A,
|
||||
content: JSON.stringify({ text: 'unsolicited' }),
|
||||
in_reply_to: null,
|
||||
},
|
||||
SB,
|
||||
);
|
||||
|
||||
// Newest session wins (current heuristic, preserved).
|
||||
const s1Rows = readInbound(A, S1.id);
|
||||
const s2Rows = readInbound(A, S2.id);
|
||||
expect(s1Rows).toHaveLength(0);
|
||||
expect(s2Rows).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('peer-affinity fallback: with no in_reply_to, routes to most recent peer-source session', async () => {
|
||||
// A.S1 sends to B (establishing affinity: B's last contact from A was via S1).
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-A-S1-pre',
|
||||
platform_id: B,
|
||||
content: JSON.stringify({ text: 'context-establishing' }),
|
||||
in_reply_to: null,
|
||||
},
|
||||
S1,
|
||||
);
|
||||
|
||||
// B sends a follow-up but its container forgot to set in_reply_to (e.g.
|
||||
// emitted via an MCP tool path that doesn't thread the batch's in_reply_to
|
||||
// through). The host should still route this to S1 because S1 is the
|
||||
// session most recently in conversation with B — not the chronologically
|
||||
// newest session of A.
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-B-followup',
|
||||
platform_id: A,
|
||||
content: JSON.stringify({ text: 'standing by' }),
|
||||
in_reply_to: null,
|
||||
},
|
||||
SB,
|
||||
);
|
||||
|
||||
const s1Rows = readInbound(A, S1.id);
|
||||
const s2Rows = readInbound(A, S2.id);
|
||||
// Affinity wins: reply to S1, not the newer S2.
|
||||
expect(s1Rows).toHaveLength(1);
|
||||
expect(JSON.parse(s1Rows[0].content).text).toBe('standing by');
|
||||
expect(s2Rows).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -23,10 +23,11 @@ import path from 'path';
|
||||
|
||||
import { isSafeAttachmentName } from '../../attachment-safety.js';
|
||||
import { getAgentGroup } from '../../db/agent-groups.js';
|
||||
import { getInboundSourceSessionId, getMostRecentPeerSourceSessionId } from '../../db/session-db.js';
|
||||
import { getSession } from '../../db/sessions.js';
|
||||
import { wakeContainer } from '../../container-runner.js';
|
||||
import { log } from '../../log.js';
|
||||
import { resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js';
|
||||
import { openInboundDb, resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js';
|
||||
import type { Session } from '../../types.js';
|
||||
import { hasDestination } from './db/agent-destinations.js';
|
||||
|
||||
@@ -101,6 +102,61 @@ export interface RoutableAgentMessage {
|
||||
id: string;
|
||||
platform_id: string | null;
|
||||
content: string;
|
||||
/**
|
||||
* For replies, the id of the inbound message being replied to. The
|
||||
* container's formatter sets this from the first inbound in the batch
|
||||
* (`container/agent-runner/src/formatter.ts`). Used here to route the
|
||||
* reply back to the originating session — see `resolveTargetSession`.
|
||||
*/
|
||||
in_reply_to: string | null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick which session of `targetAgentGroupId` should receive this a2a message.
|
||||
*
|
||||
* Three layers, highest-fidelity first:
|
||||
*
|
||||
* 1. **Direct return-path** (in_reply_to lookup): if the message is a reply
|
||||
* (`in_reply_to` set), open the source agent's inbound DB and read the
|
||||
* triggering row's `source_session_id`. That column was stamped when the
|
||||
* original outbound was routed — it's the session that started the
|
||||
* conversation, and replies should land there even when the target has
|
||||
* multiple active sessions.
|
||||
*
|
||||
* 2. **Peer-affinity fallback**: if (1) misses (in_reply_to is null or the
|
||||
* referenced row isn't an a2a inbound), look up the most recent a2a
|
||||
* inbound *from the target agent group* in source's inbound and use its
|
||||
* `source_session_id`. The intuition: the last time this peer talked to
|
||||
* me, which target session was driving? Route the reply there, since
|
||||
* that's the session most plausibly in active conversation.
|
||||
*
|
||||
* 3. **Newest active session**: legacy heuristic. Used when no prior a2a
|
||||
* has been recorded with `source_session_id` (e.g. fresh installs,
|
||||
* pre-migration data).
|
||||
*/
|
||||
function resolveTargetSession(msg: RoutableAgentMessage, sourceSession: Session, targetAgentGroupId: string): Session {
|
||||
const srcDb = openInboundDb(sourceSession.agent_group_id, sourceSession.id);
|
||||
let originSessionId: string | null = null;
|
||||
try {
|
||||
if (msg.in_reply_to) {
|
||||
originSessionId = getInboundSourceSessionId(srcDb, msg.in_reply_to);
|
||||
}
|
||||
if (!originSessionId) {
|
||||
// Peer-affinity fallback — covers the case where the container's
|
||||
// outbound write didn't carry in_reply_to (e.g. legacy MCP send_message
|
||||
// path, container running pre-fix code).
|
||||
originSessionId = getMostRecentPeerSourceSessionId(srcDb, targetAgentGroupId);
|
||||
}
|
||||
} finally {
|
||||
srcDb.close();
|
||||
}
|
||||
if (originSessionId) {
|
||||
const candidate = getSession(originSessionId);
|
||||
if (candidate && candidate.agent_group_id === targetAgentGroupId && candidate.status === 'active') {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
return resolveSession(targetAgentGroupId, null, null, 'agent-shared').session;
|
||||
}
|
||||
|
||||
export async function routeAgentMessage(msg: RoutableAgentMessage, session: Session): Promise<void> {
|
||||
@@ -119,7 +175,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess
|
||||
if (!getAgentGroup(targetAgentGroupId)) {
|
||||
throw new Error(`target agent group ${targetAgentGroupId} not found for message ${msg.id}`);
|
||||
}
|
||||
const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared');
|
||||
const targetSession = resolveTargetSession(msg, session, targetAgentGroupId);
|
||||
const a2aMsgId = `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
|
||||
// If the source message references files (via `send_file`), forward the
|
||||
@@ -137,6 +193,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess
|
||||
channelType: 'agent',
|
||||
threadId: null,
|
||||
content: forwardedContent,
|
||||
sourceSessionId: session.id,
|
||||
});
|
||||
log.info('Agent message routed', {
|
||||
from: session.agent_group_id,
|
||||
|
||||
@@ -210,6 +210,12 @@ export function writeSessionMessage(
|
||||
* a trigger-1 message does arrive.
|
||||
*/
|
||||
trigger?: 0 | 1;
|
||||
/**
|
||||
* For agent-to-agent inbound: the source session id that emitted the
|
||||
* outbound message which became this inbound row. Used as the return
|
||||
* path so the target's reply routes back to that exact session.
|
||||
*/
|
||||
sourceSessionId?: string | null;
|
||||
},
|
||||
): void {
|
||||
// Extract base64 attachment data, save to inbox, replace with file paths
|
||||
@@ -228,6 +234,7 @@ export function writeSessionMessage(
|
||||
processAfter: message.processAfter ?? null,
|
||||
recurrence: message.recurrence ?? null,
|
||||
trigger: message.trigger ?? 1,
|
||||
sourceSessionId: message.sourceSessionId ?? null,
|
||||
});
|
||||
} finally {
|
||||
db.close();
|
||||
|
||||
Reference in New Issue
Block a user