From 107945f10c09d6934566202fe6857c5a87ea3b82 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Fri, 8 May 2026 00:48:10 +0300 Subject: [PATCH] fix(agent-to-agent): route A2A replies back to originating session (#2267) Squash merge of PR #2267 by ddaniels. When an agent group has more than one active session, A2A replies landed in the newest session via findSessionByAgentGroup's ORDER BY created_at DESC. The session that asked the question never saw the answer. Adds origin-aware return-path routing with three layers: 1. Direct return-path: if the reply has in_reply_to, look up the triggering inbound row's source_session_id and route there. 2. Peer-affinity fallback: find the most recent A2A inbound from this peer and use its source_session_id. 3. Legacy fallback: newest active session (pre-migration compat). Container-side: MCP send_message/send_file now thread the current batch's in_reply_to through to outbound rows via current-batch.ts. Also flips our A2A bug-documenting test (#2332) from asserting the broken behavior to asserting the fixed behavior. Co-Authored-By: Doug Daniels Co-Authored-By: Claude Opus 4.6 (1M context) --- container/agent-runner/src/current-batch.ts | 29 ++ .../agent-runner/src/mcp-tools/core.test.ts | 50 ++++ container/agent-runner/src/mcp-tools/core.ts | 19 +- container/agent-runner/src/poll-loop.ts | 24 +- src/db/schema.ts | 8 +- src/db/session-db.test.ts | 38 ++- src/db/session-db.ts | 55 +++- src/delivery.ts | 1 + src/host-core.test.ts | 14 +- .../agent-to-agent/agent-route.test.ts | 250 +++++++++++++++++- src/modules/agent-to-agent/agent-route.ts | 61 ++++- src/session-manager.ts | 7 + 12 files changed, 517 insertions(+), 39 deletions(-) create mode 100644 container/agent-runner/src/current-batch.ts create mode 100644 container/agent-runner/src/mcp-tools/core.test.ts diff --git a/container/agent-runner/src/current-batch.ts b/container/agent-runner/src/current-batch.ts new file mode 100644 index 000000000..b699c13ff --- /dev/null +++ b/container/agent-runner/src/current-batch.ts @@ -0,0 +1,29 @@ +/** + * Per-batch context the poll loop publishes for downstream consumers + * (MCP tools, etc.) that don't sit on the poll-loop's call stack. + * + * Today the only field is `inReplyTo` — the id of the first inbound + * message in the batch the agent is currently processing. MCP tools like + * `send_message` and `send_file` read this and stamp it onto the outbound + * row so the host's a2a return-path routing can correlate replies back to + * the originating session. + * + * This is module-level state on purpose: the agent-runner is single-process + * and processes one batch at a time. Poll-loop calls `setCurrentInReplyTo` + * before invoking the provider and `clearCurrentInReplyTo` after the batch + * completes (or errors out). + */ +let currentInReplyTo: string | null = null; + +export function setCurrentInReplyTo(id: string | null): void { + currentInReplyTo = id; +} + +export function clearCurrentInReplyTo(): void { + currentInReplyTo = null; +} + +export function getCurrentInReplyTo(): string | null { + return currentInReplyTo; +} + diff --git a/container/agent-runner/src/mcp-tools/core.test.ts b/container/agent-runner/src/mcp-tools/core.test.ts new file mode 100644 index 000000000..4cef950fb --- /dev/null +++ b/container/agent-runner/src/mcp-tools/core.test.ts @@ -0,0 +1,50 @@ +/** + * Tests for the core MCP tools' interaction with the per-batch routing + * context. The agent-runner sets a current `inReplyTo` at the top of each + * batch in poll-loop, and outbound writes from MCP tools (send_message, + * send_file) must pick it up so a2a return-path routing on the host can + * correlate replies back to the originating session. + */ +import { describe, it, expect, beforeEach, afterEach } from 'bun:test'; + +import { initTestSessionDb, closeSessionDb, getInboundDb } from '../db/connection.js'; +import { getUndeliveredMessages } from '../db/messages-out.js'; +import { setCurrentInReplyTo, clearCurrentInReplyTo } from '../current-batch.js'; +import { sendMessage } from './core.js'; + +beforeEach(() => { + initTestSessionDb(); + // Seed a peer agent destination + getInboundDb() + .prepare( + `INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id) + VALUES ('peer', 'Peer', 'agent', NULL, NULL, 'ag-peer')`, + ) + .run(); +}); + +afterEach(() => { + clearCurrentInReplyTo(); + closeSessionDb(); +}); + +describe('send_message MCP tool — in_reply_to plumbing', () => { + it('stamps current batch in_reply_to on outbound rows', async () => { + setCurrentInReplyTo('inbound-msg-1'); + + await sendMessage.handler({ to: 'peer', text: 'hello' }); + + const out = getUndeliveredMessages(); + expect(out).toHaveLength(1); + expect(out[0].in_reply_to).toBe('inbound-msg-1'); + }); + + it('writes null when no batch is active', async () => { + // No setCurrentInReplyTo before this call — simulates ad-hoc / out-of-batch invocation. + await sendMessage.handler({ to: 'peer', text: 'hello' }); + + const out = getUndeliveredMessages(); + expect(out).toHaveLength(1); + expect(out[0].in_reply_to).toBeNull(); + }); +}); diff --git a/container/agent-runner/src/mcp-tools/core.ts b/container/agent-runner/src/mcp-tools/core.ts index bf89ef826..48f87d596 100644 --- a/container/agent-runner/src/mcp-tools/core.ts +++ b/container/agent-runner/src/mcp-tools/core.ts @@ -9,6 +9,7 @@ import fs from 'fs'; import path from 'path'; +import { getCurrentInReplyTo } from '../current-batch.js'; import { findByName, getAllDestinations } from '../destinations.js'; import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js'; import { getSessionRouting } from '../db/session-routing.js'; @@ -50,9 +51,7 @@ function destinationList(): string { */ function resolveRouting( to: string | undefined, -): - | { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } - | { error: string } { +): { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } | { error: string } { if (!to) { // Default: reply to whatever thread/channel this session is bound to. const session = getSessionRouting(); @@ -82,9 +81,7 @@ function resolveRouting( // preserve the thread_id so replies land in the correct thread. const session = getSessionRouting(); const threadId = - session.channel_type === dest.channelType && session.platform_id === dest.platformId - ? session.thread_id - : null; + session.channel_type === dest.channelType && session.platform_id === dest.platformId ? session.thread_id : null; return { channel_type: dest.channelType!, platform_id: dest.platformId!, @@ -98,12 +95,14 @@ function resolveRouting( export const sendMessage: McpToolDefinition = { tool: { name: 'send_message', - description: - 'Send a message to a named destination. If you have only one destination, you can omit `to`.', + description: 'Send a message to a named destination. If you have only one destination, you can omit `to`.', inputSchema: { type: 'object' as const, properties: { - to: { type: 'string', description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.' }, + to: { + type: 'string', + description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.', + }, text: { type: 'string', description: 'Message content' }, }, required: ['text'], @@ -119,6 +118,7 @@ export const sendMessage: McpToolDefinition = { const id = generateId(); const seq = writeMessageOut({ id, + in_reply_to: getCurrentInReplyTo(), kind: 'chat', platform_id: routing.platform_id, channel_type: routing.channel_type, @@ -165,6 +165,7 @@ export const sendFile: McpToolDefinition = { writeMessageOut({ id, + in_reply_to: getCurrentInReplyTo(), kind: 'chat', platform_id: routing.platform_id, channel_type: routing.channel_type, diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index f22fc7d91..e0ac7226d 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -2,12 +2,17 @@ import { findByName, getAllDestinations, type DestinationEntry } from './destina import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js'; import { writeMessageOut } from './db/messages-out.js'; import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js'; +import { clearContinuation, migrateLegacyContinuation, setContinuation } from './db/session-state.js'; +import { clearCurrentInReplyTo, setCurrentInReplyTo } from './current-batch.js'; import { - clearContinuation, - migrateLegacyContinuation, - setContinuation, -} from './db/session-state.js'; -import { formatMessages, extractRouting, categorizeMessage, isClearCommand, isRunnerCommand, stripInternalTags, type RoutingContext } from './formatter.js'; + formatMessages, + extractRouting, + categorizeMessage, + isClearCommand, + isRunnerCommand, + stripInternalTags, + type RoutingContext, +} from './formatter.js'; import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js'; const POLL_INTERVAL_MS = 1000; @@ -170,6 +175,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise { // Process the query while concurrently polling for new messages const skippedSet = new Set(skipped); const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id)); + // Publish the batch's in_reply_to so MCP tools (send_message, send_file) + // can stamp it on outbound rows — needed for a2a return-path routing. + setCurrentInReplyTo(routing.inReplyTo); try { const result = await processQuery(query, routing, processingIds, config.providerName); if (result.continuation && result.continuation !== continuation) { @@ -198,6 +206,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise { thread_id: routing.threadId, content: JSON.stringify({ text: `Error: ${errMsg}` }), }); + } finally { + clearCurrentInReplyTo(); } // Ensure completed even if processQuery ended without a result event @@ -402,7 +412,9 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void { log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`); break; case 'error': - log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`); + log( + `Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`, + ); break; case 'progress': log(`Progress: ${event.message}`); diff --git a/src/db/schema.ts b/src/db/schema.ts index 8433035be..48d9ce397 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -171,7 +171,13 @@ CREATE TABLE IF NOT EXISTS messages_in ( platform_id TEXT, channel_type TEXT, thread_id TEXT, - content TEXT NOT NULL + content TEXT NOT NULL, + -- For agent-to-agent inbound rows: the source session that emitted the + -- triggering outbound. Used as a return path when the target replies — + -- the reply routes back to this exact session, not to the source agent + -- group's "newest" session. NULL on channel-side inbound and on a2a rows + -- written before this column existed. + source_session_id TEXT ); CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id); diff --git a/src/db/session-db.test.ts b/src/db/session-db.test.ts index 530790094..a20210050 100644 --- a/src/db/session-db.test.ts +++ b/src/db/session-db.test.ts @@ -10,7 +10,7 @@ import fs from 'fs'; import path from 'path'; import { describe, it, expect, afterEach } from 'vitest'; -import { migrateMessagesInTable } from './session-db.js'; +import { getInboundSourceSessionId, migrateMessagesInTable } from './session-db.js'; const TEST_DIR = '/tmp/nanoclaw-session-db-test'; const DB_PATH = path.join(TEST_DIR, 'inbound.db'); @@ -55,4 +55,40 @@ describe('migrateMessagesInTable', () => { expect(row.series_id).toBe('legacy-1'); db.close(); }); + + it('adds source_session_id on a legacy DB, leaves existing rows NULL, is idempotent', () => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + + const db = new Database(DB_PATH); + db.exec(` + CREATE TABLE messages_in ( + id TEXT PRIMARY KEY, + seq INTEGER UNIQUE, + kind TEXT NOT NULL, + timestamp TEXT NOT NULL, + status TEXT DEFAULT 'pending', + process_after TEXT, + recurrence TEXT, + tries INTEGER DEFAULT 0, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + content TEXT NOT NULL + ); + `); + db.prepare( + "INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES (?, ?, 'chat', datetime('now'), 'pending', '{}')", + ).run('legacy-2', 2); + + migrateMessagesInTable(db); + migrateMessagesInTable(db); // idempotent + + const cols = (db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name); + expect(cols).toContain('source_session_id'); + + expect(getInboundSourceSessionId(db, 'legacy-2')).toBeNull(); + expect(getInboundSourceSessionId(db, 'does-not-exist')).toBeNull(); + db.close(); + }); }); diff --git a/src/db/session-db.ts b/src/db/session-db.ts index addc39dbf..671370225 100644 --- a/src/db/session-db.ts +++ b/src/db/session-db.ts @@ -108,14 +108,21 @@ export function insertMessage( * Host countDueMessages gates on this; container reads everything. */ trigger?: 0 | 1; + /** + * For agent-to-agent inbound: the source session id that emitted the + * outbound message which became this inbound row. Used as the return + * path for the target's reply. NULL on channel-side inbound. + */ + sourceSessionId?: string | null; }, ): void { db.prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger) - VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger)`, + `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id) + VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId)`, ).run({ ...message, trigger: message.trigger ?? 1, + sourceSessionId: message.sourceSessionId ?? null, seq: nextEvenSeq(db), }); } @@ -239,6 +246,7 @@ export interface OutboundMessage { channel_type: string | null; thread_id: string | null; content: string; + in_reply_to: string | null; } export function getDueOutboundMessages(db: Database.Database): OutboundMessage[] { @@ -305,4 +313,47 @@ export function migrateMessagesInTable(db: Database.Database): void { // the agent" semantics, so backfill 1 and default 1 for new inserts. db.prepare('ALTER TABLE messages_in ADD COLUMN trigger INTEGER NOT NULL DEFAULT 1').run(); } + if (!cols.has('source_session_id')) { + // For agent-to-agent return-path routing. NULL on existing rows is fine — + // their replies fall back to the legacy "newest active session" lookup. + db.prepare('ALTER TABLE messages_in ADD COLUMN source_session_id TEXT').run(); + } +} + +/** + * Look up an inbound row's source_session_id by its message id. Returns null + * if the row doesn't exist or the column is NULL (channel inbound or + * pre-migration a2a inbound). Used by a2a routing to route replies back to + * the originating session. + */ +export function getInboundSourceSessionId(db: Database.Database, messageId: string): string | null { + const row = db.prepare('SELECT source_session_id FROM messages_in WHERE id = ?').get(messageId) as + | { source_session_id: string | null } + | undefined; + return row?.source_session_id ?? null; +} + +/** + * Find the source_session_id of the most recent a2a inbound row from a + * specific peer (by agent group id). Used as a peer-affinity fallback in + * a2a routing when an outbound reply has no `in_reply_to` (e.g. the + * container's send_message MCP tool path didn't thread the batch's + * in_reply_to through). + * + * Heuristic: "the last time this peer talked to me, which session was it?" + * Returns null when no prior a2a inbound from that peer carries a + * non-null source_session_id (typical for pre-migration installs). + */ +export function getMostRecentPeerSourceSessionId(db: Database.Database, peerAgentGroupId: string): string | null { + const row = db + .prepare( + `SELECT source_session_id FROM messages_in + WHERE channel_type = 'agent' + AND platform_id = ? + AND source_session_id IS NOT NULL + ORDER BY seq DESC + LIMIT 1`, + ) + .get(peerAgentGroupId) as { source_session_id: string | null } | undefined; + return row?.source_session_id ?? null; } diff --git a/src/delivery.ts b/src/delivery.ts index 036153a8b..a47fec270 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -239,6 +239,7 @@ async function deliverMessage( channel_type: string | null; thread_id: string | null; content: string; + in_reply_to: string | null; }, session: Session, inDb: Database.Database, diff --git a/src/host-core.test.ts b/src/host-core.test.ts index 976544f17..b9ba62ada 100644 --- a/src/host-core.test.ts +++ b/src/host-core.test.ts @@ -969,12 +969,10 @@ describe('agent-to-agent routing', () => { expect(JSON.parse(rows[0].content).text).toBe('research this'); }); - it('BUG: A2A return path resolves to wrong session when multiple channel sessions exist (#2332)', async () => { + it('A2A return path routes to originating session, not newest (#2332)', async () => { // PA has Slack session, then gets wired to Discord (newer session). - // Researcher responds to PA. routeAgentMessage calls - // resolveSession('ag-pa', null, null, 'agent-shared') which calls - // findSessionByAgentGroup — picks newest (Discord) instead of the - // Slack session that originated the A2A call. + // Researcher responds to PA. With the return-path fix, the reply + // routes back to the Slack session (originator) not Discord (newest). const { routeAgentMessage } = await import('./modules/agent-to-agent/agent-route.js'); const { session: paSlackSession } = resolveSession('ag-pa', 'mg-slack', null, 'shared'); @@ -1013,9 +1011,9 @@ describe('agent-to-agent routing', () => { const discordA2a = discordDb.prepare("SELECT * FROM messages_in WHERE channel_type = 'agent'").all(); discordDb.close(); - // Document the bug: response lands in Discord (newest) not Slack (origin) - expect(discordA2a).toHaveLength(1); // BUG: should be 0 - expect(slackA2a).toHaveLength(0); // BUG: should be 1 + // Fixed: response lands in Slack (origin) not Discord (newest) + expect(slackA2a).toHaveLength(1); + expect(discordA2a).toHaveLength(0); }); it('BUG: A2A-only session gets null session_routing (#2332)', async () => { diff --git a/src/modules/agent-to-agent/agent-route.test.ts b/src/modules/agent-to-agent/agent-route.test.ts index 4d48f6f7b..274565d2c 100644 --- a/src/modules/agent-to-agent/agent-route.test.ts +++ b/src/modules/agent-to-agent/agent-route.test.ts @@ -1,20 +1,53 @@ -import { describe, expect, it } from 'vitest'; +import Database from 'better-sqlite3'; +import fs from 'fs'; +import { describe, expect, it, beforeEach, afterEach, vi } from 'vitest'; -import { isSafeAttachmentName } from './agent-route.js'; +import { isSafeAttachmentName, routeAgentMessage } from './agent-route.js'; +import { createDestination } from './db/agent-destinations.js'; +import { initTestDb, closeDb, runMigrations, createAgentGroup } from '../../db/index.js'; +import { createSession } from '../../db/sessions.js'; +import { initSessionFolder, inboundDbPath } from '../../session-manager.js'; +import type { Session } from '../../types.js'; + +vi.mock('../../container-runner.js', () => ({ + wakeContainer: vi.fn().mockResolvedValue(undefined), + isContainerRunning: vi.fn().mockReturnValue(false), + getActiveContainerCount: vi.fn().mockReturnValue(0), + killContainer: vi.fn(), +})); + +vi.mock('../../config.js', async () => { + const actual = await vi.importActual('../../config.js'); + return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-a2a-route' }; +}); + +const TEST_DIR = '/tmp/nanoclaw-test-a2a-route'; + +function now(): string { + return new Date().toISOString(); +} + +function readInbound(agentGroupId: string, sessionId: string) { + const db = new Database(inboundDbPath(agentGroupId, sessionId), { readonly: true }); + const rows = db + .prepare('SELECT id, platform_id, channel_type, content, source_session_id FROM messages_in ORDER BY seq') + .all() as Array<{ + id: string; + platform_id: string | null; + channel_type: string | null; + content: string; + source_session_id: string | null; + }>; + db.close(); + return rows; +} -/** - * `forwardAttachedFiles` has a filesystem side that's awkward to unit-test - * without mocking DATA_DIR. The guarantee worth pinning is that the - * filename validator rejects everything that could escape the inbox dir — - * `forwardAttachedFiles` runs this guard before any I/O, so traversal is - * impossible as long as this matrix holds. - */ describe('isSafeAttachmentName', () => { it('accepts plain filenames', () => { expect(isSafeAttachmentName('baby-duck.png')).toBe(true); expect(isSafeAttachmentName('file with spaces.pdf')).toBe(true); expect(isSafeAttachmentName('report.v2.docx')).toBe(true); - expect(isSafeAttachmentName('.hidden')).toBe(true); // leading dot is fine, just not `.` / `..` + expect(isSafeAttachmentName('.hidden')).toBe(true); }); it('rejects empty / sentinel values', () => { @@ -44,3 +77,200 @@ describe('isSafeAttachmentName', () => { expect(isSafeAttachmentName(undefined as unknown as string)).toBe(false); }); }); + +/** + * Return-path routing: when an a2a reply targets an agent group with multiple + * sessions, it must land in the *originating* session — not the newest one. + * + * Setup: agent A has two active sessions S1 (older) + S2 (newer). + * Agent B is the peer A talks to. Bidirectional destinations wired. + */ +describe('routeAgentMessage return-path', () => { + const A = 'ag-A'; + const B = 'ag-B'; + let S1: Session; + let S2: Session; + let SB: Session; + + beforeEach(() => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + + const db = initTestDb(); + runMigrations(db); + + createAgentGroup({ id: A, name: 'A', folder: 'a', agent_provider: null, created_at: now() }); + createAgentGroup({ id: B, name: 'B', folder: 'b', agent_provider: null, created_at: now() }); + + // S1 (older), S2 (newer) — both active sessions on A. + S1 = { + id: 'sess-A-old', + agent_group_id: A, + messaging_group_id: null, + thread_id: null, + agent_provider: null, + status: 'active', + container_status: 'stopped', + last_active: null, + created_at: '2026-01-01T00:00:00.000Z', + }; + S2 = { + id: 'sess-A-new', + agent_group_id: A, + messaging_group_id: null, + thread_id: null, + agent_provider: null, + status: 'active', + container_status: 'stopped', + last_active: null, + created_at: '2026-02-01T00:00:00.000Z', + }; + SB = { + id: 'sess-B', + agent_group_id: B, + messaging_group_id: null, + thread_id: null, + agent_provider: null, + status: 'active', + container_status: 'stopped', + last_active: null, + created_at: '2026-01-15T00:00:00.000Z', + }; + createSession(S1); + createSession(S2); + createSession(SB); + initSessionFolder(A, S1.id); + initSessionFolder(A, S2.id); + initSessionFolder(B, SB.id); + + createDestination({ + agent_group_id: A, + local_name: 'b', + target_type: 'agent', + target_id: B, + created_at: now(), + }); + createDestination({ + agent_group_id: B, + local_name: 'a', + target_type: 'agent', + target_id: A, + created_at: now(), + }); + }); + + afterEach(() => { + closeDb(); + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + }); + + it('forward direction: stamps source_session_id on the target inbound row', async () => { + // A.S1 emits an outbound a2a to B. + await routeAgentMessage( + { + id: 'msg-from-A-S1', + platform_id: B, + content: JSON.stringify({ text: 'hello B' }), + in_reply_to: null, + }, + S1, + ); + + const bRows = readInbound(B, SB.id); + expect(bRows).toHaveLength(1); + expect(bRows[0].platform_id).toBe(A); + expect(bRows[0].source_session_id).toBe(S1.id); // <- the return address + }); + + it('reply direction: routes back to the originating session, not the newest', async () => { + // A.S1 sends to B. + await routeAgentMessage( + { + id: 'msg-from-A-S1', + platform_id: B, + content: JSON.stringify({ text: 'ping' }), + in_reply_to: null, + }, + S1, + ); + + // Capture the synthetic id the host stamped on B's inbound — that's what + // B's container would reference as `in_reply_to` when replying. + const bRows = readInbound(B, SB.id); + const yId = bRows[0].id; + + // B replies to that message. + await routeAgentMessage( + { + id: 'msg-from-B', + platform_id: A, + content: JSON.stringify({ text: 'pong' }), + in_reply_to: yId, + }, + SB, + ); + + const s1Rows = readInbound(A, S1.id); + const s2Rows = readInbound(A, S2.id); + + // The reply lands in S1 (originator) even though S2 is newer. + expect(s1Rows).toHaveLength(1); + expect(s1Rows[0].platform_id).toBe(B); + expect(JSON.parse(s1Rows[0].content).text).toBe('pong'); + expect(s2Rows).toHaveLength(0); + }); + + it('fallback: a2a with no in_reply_to falls through to newest-session lookup', async () => { + // No prior conversation. B initiates an a2a to A out of the blue. + await routeAgentMessage( + { + id: 'msg-from-B-fresh', + platform_id: A, + content: JSON.stringify({ text: 'unsolicited' }), + in_reply_to: null, + }, + SB, + ); + + // Newest session wins (current heuristic, preserved). + const s1Rows = readInbound(A, S1.id); + const s2Rows = readInbound(A, S2.id); + expect(s1Rows).toHaveLength(0); + expect(s2Rows).toHaveLength(1); + }); + + it('peer-affinity fallback: with no in_reply_to, routes to most recent peer-source session', async () => { + // A.S1 sends to B (establishing affinity: B's last contact from A was via S1). + await routeAgentMessage( + { + id: 'msg-from-A-S1-pre', + platform_id: B, + content: JSON.stringify({ text: 'context-establishing' }), + in_reply_to: null, + }, + S1, + ); + + // B sends a follow-up but its container forgot to set in_reply_to (e.g. + // emitted via an MCP tool path that doesn't thread the batch's in_reply_to + // through). The host should still route this to S1 because S1 is the + // session most recently in conversation with B — not the chronologically + // newest session of A. + await routeAgentMessage( + { + id: 'msg-from-B-followup', + platform_id: A, + content: JSON.stringify({ text: 'standing by' }), + in_reply_to: null, + }, + SB, + ); + + const s1Rows = readInbound(A, S1.id); + const s2Rows = readInbound(A, S2.id); + // Affinity wins: reply to S1, not the newer S2. + expect(s1Rows).toHaveLength(1); + expect(JSON.parse(s1Rows[0].content).text).toBe('standing by'); + expect(s2Rows).toHaveLength(0); + }); +}); diff --git a/src/modules/agent-to-agent/agent-route.ts b/src/modules/agent-to-agent/agent-route.ts index 613a1edfb..58e141960 100644 --- a/src/modules/agent-to-agent/agent-route.ts +++ b/src/modules/agent-to-agent/agent-route.ts @@ -23,10 +23,11 @@ import path from 'path'; import { isSafeAttachmentName } from '../../attachment-safety.js'; import { getAgentGroup } from '../../db/agent-groups.js'; +import { getInboundSourceSessionId, getMostRecentPeerSourceSessionId } from '../../db/session-db.js'; import { getSession } from '../../db/sessions.js'; import { wakeContainer } from '../../container-runner.js'; import { log } from '../../log.js'; -import { resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js'; +import { openInboundDb, resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js'; import type { Session } from '../../types.js'; import { hasDestination } from './db/agent-destinations.js'; @@ -101,6 +102,61 @@ export interface RoutableAgentMessage { id: string; platform_id: string | null; content: string; + /** + * For replies, the id of the inbound message being replied to. The + * container's formatter sets this from the first inbound in the batch + * (`container/agent-runner/src/formatter.ts`). Used here to route the + * reply back to the originating session — see `resolveTargetSession`. + */ + in_reply_to: string | null; +} + +/** + * Pick which session of `targetAgentGroupId` should receive this a2a message. + * + * Three layers, highest-fidelity first: + * + * 1. **Direct return-path** (in_reply_to lookup): if the message is a reply + * (`in_reply_to` set), open the source agent's inbound DB and read the + * triggering row's `source_session_id`. That column was stamped when the + * original outbound was routed — it's the session that started the + * conversation, and replies should land there even when the target has + * multiple active sessions. + * + * 2. **Peer-affinity fallback**: if (1) misses (in_reply_to is null or the + * referenced row isn't an a2a inbound), look up the most recent a2a + * inbound *from the target agent group* in source's inbound and use its + * `source_session_id`. The intuition: the last time this peer talked to + * me, which target session was driving? Route the reply there, since + * that's the session most plausibly in active conversation. + * + * 3. **Newest active session**: legacy heuristic. Used when no prior a2a + * has been recorded with `source_session_id` (e.g. fresh installs, + * pre-migration data). + */ +function resolveTargetSession(msg: RoutableAgentMessage, sourceSession: Session, targetAgentGroupId: string): Session { + const srcDb = openInboundDb(sourceSession.agent_group_id, sourceSession.id); + let originSessionId: string | null = null; + try { + if (msg.in_reply_to) { + originSessionId = getInboundSourceSessionId(srcDb, msg.in_reply_to); + } + if (!originSessionId) { + // Peer-affinity fallback — covers the case where the container's + // outbound write didn't carry in_reply_to (e.g. legacy MCP send_message + // path, container running pre-fix code). + originSessionId = getMostRecentPeerSourceSessionId(srcDb, targetAgentGroupId); + } + } finally { + srcDb.close(); + } + if (originSessionId) { + const candidate = getSession(originSessionId); + if (candidate && candidate.agent_group_id === targetAgentGroupId && candidate.status === 'active') { + return candidate; + } + } + return resolveSession(targetAgentGroupId, null, null, 'agent-shared').session; } export async function routeAgentMessage(msg: RoutableAgentMessage, session: Session): Promise { @@ -119,7 +175,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess if (!getAgentGroup(targetAgentGroupId)) { throw new Error(`target agent group ${targetAgentGroupId} not found for message ${msg.id}`); } - const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared'); + const targetSession = resolveTargetSession(msg, session, targetAgentGroupId); const a2aMsgId = `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; // If the source message references files (via `send_file`), forward the @@ -137,6 +193,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess channelType: 'agent', threadId: null, content: forwardedContent, + sourceSessionId: session.id, }); log.info('Agent message routed', { from: session.agent_group_id, diff --git a/src/session-manager.ts b/src/session-manager.ts index e3f3f7a0c..5c423eaeb 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -210,6 +210,12 @@ export function writeSessionMessage( * a trigger-1 message does arrive. */ trigger?: 0 | 1; + /** + * For agent-to-agent inbound: the source session id that emitted the + * outbound message which became this inbound row. Used as the return + * path so the target's reply routes back to that exact session. + */ + sourceSessionId?: string | null; }, ): void { // Extract base64 attachment data, save to inbox, replace with file paths @@ -228,6 +234,7 @@ export function writeSessionMessage( processAfter: message.processAfter ?? null, recurrence: message.recurrence ?? null, trigger: message.trigger ?? 1, + sourceSessionId: message.sourceSessionId ?? null, }); } finally { db.close();