fix(agent-to-agent): route a2a replies back to the originating session

When a target agent group has multiple active sessions (e.g. a parent
wired in `shared` mode with both Signal and email), every a2a reply
landed in the newest session via `findSessionByAgentGroup`'s
`ORDER BY created_at DESC LIMIT 1`. Conversations split-brain: the
session that asked the question never sees the answer.

Stamp `source_session_id` on the synthetic a2a inbound row when the host
routes an outbound, then on reply look up that column via the reply's
`in_reply_to` and route the response back to the originating session.
Falls back to the prior "newest active" heuristic when there's no
`in_reply_to` (fresh-initiated a2a) or the lookup misses.

- `messages_in` gains a nullable `source_session_id` column; existing
  rows stay NULL and route via the fallback path.
- The migration runs on every `openInboundDb` call, mirroring the
  existing `series_id` / `trigger` additive migrations.
- Container side is unchanged — formatter already populates
  `in_reply_to` on outbound replies.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Doug Daniels
2026-05-04 16:36:55 -04:00
committed by gavrielc
parent 1a358dc7e3
commit 04f03e9064
7 changed files with 332 additions and 16 deletions
+7 -1
View File
@@ -171,7 +171,13 @@ CREATE TABLE IF NOT EXISTS messages_in (
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
content TEXT NOT NULL,
-- For agent-to-agent inbound rows: the source session that emitted the
-- triggering outbound. Used as a return path when the target replies —
-- the reply routes back to this exact session, not to the source agent
-- group's "newest" session. NULL on channel-side inbound and on a2a rows
-- written before this column existed.
source_session_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id);
+37 -1
View File
@@ -10,7 +10,7 @@ import fs from 'fs';
import path from 'path';
import { describe, it, expect, afterEach } from 'vitest';
import { migrateMessagesInTable } from './session-db.js';
import { getInboundSourceSessionId, migrateMessagesInTable } from './session-db.js';
const TEST_DIR = '/tmp/nanoclaw-session-db-test';
const DB_PATH = path.join(TEST_DIR, 'inbound.db');
@@ -55,4 +55,40 @@ describe('migrateMessagesInTable', () => {
expect(row.series_id).toBe('legacy-1');
db.close();
});
it('adds source_session_id on a legacy DB, leaves existing rows NULL, is idempotent', () => {
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
fs.mkdirSync(TEST_DIR, { recursive: true });
const db = new Database(DB_PATH);
db.exec(`
CREATE TABLE messages_in (
id TEXT PRIMARY KEY,
seq INTEGER UNIQUE,
kind TEXT NOT NULL,
timestamp TEXT NOT NULL,
status TEXT DEFAULT 'pending',
process_after TEXT,
recurrence TEXT,
tries INTEGER DEFAULT 0,
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
);
`);
db.prepare(
"INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES (?, ?, 'chat', datetime('now'), 'pending', '{}')",
).run('legacy-2', 2);
migrateMessagesInTable(db);
migrateMessagesInTable(db); // idempotent
const cols = (db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name);
expect(cols).toContain('source_session_id');
expect(getInboundSourceSessionId(db, 'legacy-2')).toBeNull();
expect(getInboundSourceSessionId(db, 'does-not-exist')).toBeNull();
db.close();
});
});
+28 -2
View File
@@ -108,14 +108,21 @@ export function insertMessage(
* Host countDueMessages gates on this; container reads everything.
*/
trigger?: 0 | 1;
/**
* For agent-to-agent inbound: the source session id that emitted the
* outbound message which became this inbound row. Used as the return
* path for the target's reply. NULL on channel-side inbound.
*/
sourceSessionId?: string | null;
},
): void {
db.prepare(
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger)
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger)`,
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id)
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId)`,
).run({
...message,
trigger: message.trigger ?? 1,
sourceSessionId: message.sourceSessionId ?? null,
seq: nextEvenSeq(db),
});
}
@@ -239,6 +246,7 @@ export interface OutboundMessage {
channel_type: string | null;
thread_id: string | null;
content: string;
in_reply_to: string | null;
}
export function getDueOutboundMessages(db: Database.Database): OutboundMessage[] {
@@ -305,4 +313,22 @@ export function migrateMessagesInTable(db: Database.Database): void {
// the agent" semantics, so backfill 1 and default 1 for new inserts.
db.prepare('ALTER TABLE messages_in ADD COLUMN trigger INTEGER NOT NULL DEFAULT 1').run();
}
if (!cols.has('source_session_id')) {
// For agent-to-agent return-path routing. NULL on existing rows is fine —
// their replies fall back to the legacy "newest active session" lookup.
db.prepare('ALTER TABLE messages_in ADD COLUMN source_session_id TEXT').run();
}
}
/**
* Look up an inbound row's source_session_id by its message id. Returns null
* if the row doesn't exist or the column is NULL (channel inbound or
* pre-migration a2a inbound). Used by a2a routing to route replies back to
* the originating session.
*/
export function getInboundSourceSessionId(db: Database.Database, messageId: string): string | null {
const row = db.prepare('SELECT source_session_id FROM messages_in WHERE id = ?').get(messageId) as
| { source_session_id: string | null }
| undefined;
return row?.source_session_id ?? null;
}
+1
View File
@@ -239,6 +239,7 @@ async function deliverMessage(
channel_type: string | null;
thread_id: string | null;
content: string;
in_reply_to: string | null;
},
session: Session,
inDb: Database.Database,
+205 -10
View File
@@ -1,20 +1,53 @@
import { describe, expect, it } from 'vitest';
import Database from 'better-sqlite3';
import fs from 'fs';
import { describe, expect, it, beforeEach, afterEach, vi } from 'vitest';
import { isSafeAttachmentName } from './agent-route.js';
import { isSafeAttachmentName, routeAgentMessage } from './agent-route.js';
import { createDestination } from './db/agent-destinations.js';
import { initTestDb, closeDb, runMigrations, createAgentGroup } from '../../db/index.js';
import { createSession } from '../../db/sessions.js';
import { initSessionFolder, inboundDbPath } from '../../session-manager.js';
import type { Session } from '../../types.js';
vi.mock('../../container-runner.js', () => ({
wakeContainer: vi.fn().mockResolvedValue(undefined),
isContainerRunning: vi.fn().mockReturnValue(false),
getActiveContainerCount: vi.fn().mockReturnValue(0),
killContainer: vi.fn(),
}));
vi.mock('../../config.js', async () => {
const actual = await vi.importActual('../../config.js');
return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-a2a-route' };
});
const TEST_DIR = '/tmp/nanoclaw-test-a2a-route';
function now(): string {
return new Date().toISOString();
}
function readInbound(agentGroupId: string, sessionId: string) {
const db = new Database(inboundDbPath(agentGroupId, sessionId), { readonly: true });
const rows = db
.prepare('SELECT id, platform_id, channel_type, content, source_session_id FROM messages_in ORDER BY seq')
.all() as Array<{
id: string;
platform_id: string | null;
channel_type: string | null;
content: string;
source_session_id: string | null;
}>;
db.close();
return rows;
}
/**
* `forwardAttachedFiles` has a filesystem side that's awkward to unit-test
* without mocking DATA_DIR. The guarantee worth pinning is that the
* filename validator rejects everything that could escape the inbox dir —
* `forwardAttachedFiles` runs this guard before any I/O, so traversal is
* impossible as long as this matrix holds.
*/
describe('isSafeAttachmentName', () => {
it('accepts plain filenames', () => {
expect(isSafeAttachmentName('baby-duck.png')).toBe(true);
expect(isSafeAttachmentName('file with spaces.pdf')).toBe(true);
expect(isSafeAttachmentName('report.v2.docx')).toBe(true);
expect(isSafeAttachmentName('.hidden')).toBe(true); // leading dot is fine, just not `.` / `..`
expect(isSafeAttachmentName('.hidden')).toBe(true);
});
it('rejects empty / sentinel values', () => {
@@ -44,3 +77,165 @@ describe('isSafeAttachmentName', () => {
expect(isSafeAttachmentName(undefined as unknown as string)).toBe(false);
});
});
/**
* Return-path routing: when an a2a reply targets an agent group with multiple
* sessions, it must land in the *originating* session — not the newest one.
*
* Setup: agent A has two active sessions S1 (older) + S2 (newer).
* Agent B is the peer A talks to. Bidirectional destinations wired.
*/
describe('routeAgentMessage return-path', () => {
const A = 'ag-A';
const B = 'ag-B';
let S1: Session;
let S2: Session;
let SB: Session;
beforeEach(() => {
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
fs.mkdirSync(TEST_DIR, { recursive: true });
const db = initTestDb();
runMigrations(db);
createAgentGroup({ id: A, name: 'A', folder: 'a', agent_provider: null, created_at: now() });
createAgentGroup({ id: B, name: 'B', folder: 'b', agent_provider: null, created_at: now() });
// S1 (older), S2 (newer) — both active sessions on A.
S1 = {
id: 'sess-A-old',
agent_group_id: A,
messaging_group_id: null,
thread_id: null,
agent_provider: null,
status: 'active',
container_status: 'stopped',
last_active: null,
created_at: '2026-01-01T00:00:00.000Z',
};
S2 = {
id: 'sess-A-new',
agent_group_id: A,
messaging_group_id: null,
thread_id: null,
agent_provider: null,
status: 'active',
container_status: 'stopped',
last_active: null,
created_at: '2026-02-01T00:00:00.000Z',
};
SB = {
id: 'sess-B',
agent_group_id: B,
messaging_group_id: null,
thread_id: null,
agent_provider: null,
status: 'active',
container_status: 'stopped',
last_active: null,
created_at: '2026-01-15T00:00:00.000Z',
};
createSession(S1);
createSession(S2);
createSession(SB);
initSessionFolder(A, S1.id);
initSessionFolder(A, S2.id);
initSessionFolder(B, SB.id);
createDestination({
agent_group_id: A,
local_name: 'b',
target_type: 'agent',
target_id: B,
created_at: now(),
});
createDestination({
agent_group_id: B,
local_name: 'a',
target_type: 'agent',
target_id: A,
created_at: now(),
});
});
afterEach(() => {
closeDb();
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
});
it('forward direction: stamps source_session_id on the target inbound row', async () => {
// A.S1 emits an outbound a2a to B.
await routeAgentMessage(
{
id: 'msg-from-A-S1',
platform_id: B,
content: JSON.stringify({ text: 'hello B' }),
in_reply_to: null,
},
S1,
);
const bRows = readInbound(B, SB.id);
expect(bRows).toHaveLength(1);
expect(bRows[0].platform_id).toBe(A);
expect(bRows[0].source_session_id).toBe(S1.id); // <- the return address
});
it('reply direction: routes back to the originating session, not the newest', async () => {
// A.S1 sends to B.
await routeAgentMessage(
{
id: 'msg-from-A-S1',
platform_id: B,
content: JSON.stringify({ text: 'ping' }),
in_reply_to: null,
},
S1,
);
// Capture the synthetic id the host stamped on B's inbound — that's what
// B's container would reference as `in_reply_to` when replying.
const bRows = readInbound(B, SB.id);
const yId = bRows[0].id;
// B replies to that message.
await routeAgentMessage(
{
id: 'msg-from-B',
platform_id: A,
content: JSON.stringify({ text: 'pong' }),
in_reply_to: yId,
},
SB,
);
const s1Rows = readInbound(A, S1.id);
const s2Rows = readInbound(A, S2.id);
// The reply lands in S1 (originator) even though S2 is newer.
expect(s1Rows).toHaveLength(1);
expect(s1Rows[0].platform_id).toBe(B);
expect(JSON.parse(s1Rows[0].content).text).toBe('pong');
expect(s2Rows).toHaveLength(0);
});
it('fallback: a2a with no in_reply_to falls through to newest-session lookup', async () => {
// No prior conversation. B initiates an a2a to A out of the blue.
await routeAgentMessage(
{
id: 'msg-from-B-fresh',
platform_id: A,
content: JSON.stringify({ text: 'unsolicited' }),
in_reply_to: null,
},
SB,
);
// Newest session wins (current heuristic, preserved).
const s1Rows = readInbound(A, S1.id);
const s2Rows = readInbound(A, S2.id);
expect(s1Rows).toHaveLength(0);
expect(s2Rows).toHaveLength(1);
});
});
+47 -2
View File
@@ -23,10 +23,11 @@ import path from 'path';
import { isSafeAttachmentName } from '../../attachment-safety.js';
import { getAgentGroup } from '../../db/agent-groups.js';
import { getInboundSourceSessionId } from '../../db/session-db.js';
import { getSession } from '../../db/sessions.js';
import { wakeContainer } from '../../container-runner.js';
import { log } from '../../log.js';
import { resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js';
import { openInboundDb, resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js';
import type { Session } from '../../types.js';
import { hasDestination } from './db/agent-destinations.js';
@@ -101,6 +102,49 @@ export interface RoutableAgentMessage {
id: string;
platform_id: string | null;
content: string;
/**
* For replies, the id of the inbound message being replied to. The
* container's formatter sets this from the first inbound in the batch
* (`container/agent-runner/src/formatter.ts`). Used here to route the
* reply back to the originating session — see `resolveTargetSession`.
*/
in_reply_to: string | null;
}
/**
* Pick which session of `targetAgentGroupId` should receive this a2a message.
*
* Return-path lookup: if the message is a reply (`in_reply_to` set), open the
* source agent's inbound DB and read the original triggering row's
* `source_session_id`. That column was stamped when the original outbound was
* routed — it's the session that started the conversation, and replies should
* land there even when the target agent group has multiple active sessions.
*
* Falls back to `resolveSession(..., 'agent-shared')` (which selects the
* newest active session) when:
* - the message has no `in_reply_to` (fresh-initiated a2a), OR
* - the referenced row isn't in source's inbound (cross-batch reference), OR
* - the referenced row's source_session_id is NULL (channel inbound or
* pre-migration row), OR
* - the recovered session no longer exists / belongs to a different agent.
*/
function resolveTargetSession(msg: RoutableAgentMessage, sourceSession: Session, targetAgentGroupId: string): Session {
if (msg.in_reply_to) {
const srcDb = openInboundDb(sourceSession.agent_group_id, sourceSession.id);
let originSessionId: string | null;
try {
originSessionId = getInboundSourceSessionId(srcDb, msg.in_reply_to);
} finally {
srcDb.close();
}
if (originSessionId) {
const candidate = getSession(originSessionId);
if (candidate && candidate.agent_group_id === targetAgentGroupId && candidate.status === 'active') {
return candidate;
}
}
}
return resolveSession(targetAgentGroupId, null, null, 'agent-shared').session;
}
export async function routeAgentMessage(msg: RoutableAgentMessage, session: Session): Promise<void> {
@@ -119,7 +163,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess
if (!getAgentGroup(targetAgentGroupId)) {
throw new Error(`target agent group ${targetAgentGroupId} not found for message ${msg.id}`);
}
const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared');
const targetSession = resolveTargetSession(msg, session, targetAgentGroupId);
const a2aMsgId = `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
// If the source message references files (via `send_file`), forward the
@@ -137,6 +181,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess
channelType: 'agent',
threadId: null,
content: forwardedContent,
sourceSessionId: session.id,
});
log.info('Agent message routed', {
from: session.agent_group_id,
+7
View File
@@ -210,6 +210,12 @@ export function writeSessionMessage(
* a trigger-1 message does arrive.
*/
trigger?: 0 | 1;
/**
* For agent-to-agent inbound: the source session id that emitted the
* outbound message which became this inbound row. Used as the return
* path so the target's reply routes back to that exact session.
*/
sourceSessionId?: string | null;
},
): void {
// Extract base64 attachment data, save to inbox, replace with file paths
@@ -228,6 +234,7 @@ export function writeSessionMessage(
processAfter: message.processAfter ?? null,
recurrence: message.recurrence ?? null,
trigger: message.trigger ?? 1,
sourceSessionId: message.sourceSessionId ?? null,
});
} finally {
db.close();