Compare commits

...

6 Commits

Author SHA1 Message Date
gavrielc 6313a63d43 chore: whitespace to retrigger CI 2026-05-08 00:46:07 +03:00
gavrielc e9fafee764 chore: trigger CI rerun after rebase 2026-05-08 00:44:44 +03:00
gavrielc 5b66525e54 merge main into skill/a2a-return-path 2026-05-08 00:41:16 +03:00
gavrielc 6c2134694a test: flip A2A return-path test to assert correct behavior
The return-path fix in this PR resolves the bug where A2A replies landed
in the newest session instead of the originating one. Update the test
assertion from documenting the broken state to asserting the fixed state:
response now lands in Slack (origin) not Discord (newest).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-08 00:39:40 +03:00
Doug Daniels 323ba125e7 fix(agent-to-agent): thread in_reply_to through MCP send_message + add peer-affinity fallback
The earlier return-path fix relied on outbound rows carrying `in_reply_to`
so the host could correlate replies back to the originating session. The
container's XML `<message to="...">` path set this correctly, but the MCP
`send_message` and `send_file` tools wrote outbound without it — so any
reply emitted via those tool paths fell through to the legacy "newest
session" tie-break, defeating the fix in practice.

Two changes:

Container side
- New `current-batch.ts` exposes the active batch's `inReplyTo` to MCP
  tools that don't sit on poll-loop's call stack.
- `poll-loop.ts` publishes it before invoking the provider and clears it
  in a `finally` block so subsequent ad-hoc invocations don't inherit
  stale state.
- `mcp-tools/core.ts` reads it and stamps `in_reply_to` on the outbound
  row in both `send_message` and `send_file`.

Host side
- New `getMostRecentPeerSourceSessionId` in session-db.ts.
- `resolveTargetSession` in agent-route.ts now tries three layers:
  direct in_reply_to lookup → peer-affinity ("most recent a2a from this
  peer") → legacy newest-session. Peer affinity covers the gap when
  containers running pre-fix code or unusual call paths still emit
  outbound without `in_reply_to`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 00:38:53 +03:00
Doug Daniels 04f03e9064 fix(agent-to-agent): route a2a replies back to the originating session
When a target agent group has multiple active sessions (e.g. a parent
wired in `shared` mode with both Signal and email), every a2a reply
landed in the newest session via `findSessionByAgentGroup`'s
`ORDER BY created_at DESC LIMIT 1`. Conversations split-brain: the
session that asked the question never sees the answer.

Stamp `source_session_id` on the synthetic a2a inbound row when the host
routes an outbound, then on reply look up that column via the reply's
`in_reply_to` and route the response back to the originating session.
Falls back to the prior "newest active" heuristic when there's no
`in_reply_to` (fresh-initiated a2a) or the lookup misses.

- `messages_in` gains a nullable `source_session_id` column; existing
  rows stay NULL and route via the fallback path.
- The migration runs on every `openInboundDb` call, mirroring the
  existing `series_id` / `trigger` additive migrations.
- Container side is unchanged — formatter already populates
  `in_reply_to` on outbound replies.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 00:38:37 +03:00
12 changed files with 517 additions and 39 deletions
@@ -0,0 +1,29 @@
/**
* Per-batch context the poll loop publishes for downstream consumers
* (MCP tools, etc.) that don't sit on the poll-loop's call stack.
*
* Today the only field is `inReplyTo` — the id of the first inbound
* message in the batch the agent is currently processing. MCP tools like
* `send_message` and `send_file` read this and stamp it onto the outbound
* row so the host's a2a return-path routing can correlate replies back to
* the originating session.
*
* This is module-level state on purpose: the agent-runner is single-process
* and processes one batch at a time. Poll-loop calls `setCurrentInReplyTo`
* before invoking the provider and `clearCurrentInReplyTo` after the batch
* completes (or errors out).
*/
let currentInReplyTo: string | null = null;
export function setCurrentInReplyTo(id: string | null): void {
currentInReplyTo = id;
}
export function clearCurrentInReplyTo(): void {
currentInReplyTo = null;
}
export function getCurrentInReplyTo(): string | null {
return currentInReplyTo;
}
@@ -0,0 +1,50 @@
/**
* Tests for the core MCP tools' interaction with the per-batch routing
* context. The agent-runner sets a current `inReplyTo` at the top of each
* batch in poll-loop, and outbound writes from MCP tools (send_message,
* send_file) must pick it up so a2a return-path routing on the host can
* correlate replies back to the originating session.
*/
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
import { initTestSessionDb, closeSessionDb, getInboundDb } from '../db/connection.js';
import { getUndeliveredMessages } from '../db/messages-out.js';
import { setCurrentInReplyTo, clearCurrentInReplyTo } from '../current-batch.js';
import { sendMessage } from './core.js';
beforeEach(() => {
initTestSessionDb();
// Seed a peer agent destination
getInboundDb()
.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES ('peer', 'Peer', 'agent', NULL, NULL, 'ag-peer')`,
)
.run();
});
afterEach(() => {
clearCurrentInReplyTo();
closeSessionDb();
});
describe('send_message MCP tool — in_reply_to plumbing', () => {
it('stamps current batch in_reply_to on outbound rows', async () => {
setCurrentInReplyTo('inbound-msg-1');
await sendMessage.handler({ to: 'peer', text: 'hello' });
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].in_reply_to).toBe('inbound-msg-1');
});
it('writes null when no batch is active', async () => {
// No setCurrentInReplyTo before this call — simulates ad-hoc / out-of-batch invocation.
await sendMessage.handler({ to: 'peer', text: 'hello' });
const out = getUndeliveredMessages();
expect(out).toHaveLength(1);
expect(out[0].in_reply_to).toBeNull();
});
});
+10 -9
View File
@@ -9,6 +9,7 @@
import fs from 'fs';
import path from 'path';
import { getCurrentInReplyTo } from '../current-batch.js';
import { findByName, getAllDestinations } from '../destinations.js';
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
import { getSessionRouting } from '../db/session-routing.js';
@@ -50,9 +51,7 @@ function destinationList(): string {
*/
function resolveRouting(
to: string | undefined,
):
| { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string }
| { error: string } {
): { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } | { error: string } {
if (!to) {
// Default: reply to whatever thread/channel this session is bound to.
const session = getSessionRouting();
@@ -82,9 +81,7 @@ function resolveRouting(
// preserve the thread_id so replies land in the correct thread.
const session = getSessionRouting();
const threadId =
session.channel_type === dest.channelType && session.platform_id === dest.platformId
? session.thread_id
: null;
session.channel_type === dest.channelType && session.platform_id === dest.platformId ? session.thread_id : null;
return {
channel_type: dest.channelType!,
platform_id: dest.platformId!,
@@ -98,12 +95,14 @@ function resolveRouting(
export const sendMessage: McpToolDefinition = {
tool: {
name: 'send_message',
description:
'Send a message to a named destination. If you have only one destination, you can omit `to`.',
description: 'Send a message to a named destination. If you have only one destination, you can omit `to`.',
inputSchema: {
type: 'object' as const,
properties: {
to: { type: 'string', description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.' },
to: {
type: 'string',
description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.',
},
text: { type: 'string', description: 'Message content' },
},
required: ['text'],
@@ -119,6 +118,7 @@ export const sendMessage: McpToolDefinition = {
const id = generateId();
const seq = writeMessageOut({
id,
in_reply_to: getCurrentInReplyTo(),
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,
@@ -165,6 +165,7 @@ export const sendFile: McpToolDefinition = {
writeMessageOut({
id,
in_reply_to: getCurrentInReplyTo(),
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,
+18 -6
View File
@@ -2,12 +2,17 @@ import { findByName, getAllDestinations, type DestinationEntry } from './destina
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
import { writeMessageOut } from './db/messages-out.js';
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
import { clearContinuation, migrateLegacyContinuation, setContinuation } from './db/session-state.js';
import { clearCurrentInReplyTo, setCurrentInReplyTo } from './current-batch.js';
import {
clearContinuation,
migrateLegacyContinuation,
setContinuation,
} from './db/session-state.js';
import { formatMessages, extractRouting, categorizeMessage, isClearCommand, isRunnerCommand, stripInternalTags, type RoutingContext } from './formatter.js';
formatMessages,
extractRouting,
categorizeMessage,
isClearCommand,
isRunnerCommand,
stripInternalTags,
type RoutingContext,
} from './formatter.js';
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
const POLL_INTERVAL_MS = 1000;
@@ -170,6 +175,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
// Process the query while concurrently polling for new messages
const skippedSet = new Set(skipped);
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
// Publish the batch's in_reply_to so MCP tools (send_message, send_file)
// can stamp it on outbound rows — needed for a2a return-path routing.
setCurrentInReplyTo(routing.inReplyTo);
try {
const result = await processQuery(query, routing, processingIds, config.providerName);
if (result.continuation && result.continuation !== continuation) {
@@ -198,6 +206,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
thread_id: routing.threadId,
content: JSON.stringify({ text: `Error: ${errMsg}` }),
});
} finally {
clearCurrentInReplyTo();
}
// Ensure completed even if processQuery ended without a result event
@@ -402,7 +412,9 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`);
break;
case 'error':
log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`);
log(
`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`,
);
break;
case 'progress':
log(`Progress: ${event.message}`);
+7 -1
View File
@@ -171,7 +171,13 @@ CREATE TABLE IF NOT EXISTS messages_in (
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
content TEXT NOT NULL,
-- For agent-to-agent inbound rows: the source session that emitted the
-- triggering outbound. Used as a return path when the target replies —
-- the reply routes back to this exact session, not to the source agent
-- group's "newest" session. NULL on channel-side inbound and on a2a rows
-- written before this column existed.
source_session_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id);
+37 -1
View File
@@ -10,7 +10,7 @@ import fs from 'fs';
import path from 'path';
import { describe, it, expect, afterEach } from 'vitest';
import { migrateMessagesInTable } from './session-db.js';
import { getInboundSourceSessionId, migrateMessagesInTable } from './session-db.js';
const TEST_DIR = '/tmp/nanoclaw-session-db-test';
const DB_PATH = path.join(TEST_DIR, 'inbound.db');
@@ -55,4 +55,40 @@ describe('migrateMessagesInTable', () => {
expect(row.series_id).toBe('legacy-1');
db.close();
});
it('adds source_session_id on a legacy DB, leaves existing rows NULL, is idempotent', () => {
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
fs.mkdirSync(TEST_DIR, { recursive: true });
const db = new Database(DB_PATH);
db.exec(`
CREATE TABLE messages_in (
id TEXT PRIMARY KEY,
seq INTEGER UNIQUE,
kind TEXT NOT NULL,
timestamp TEXT NOT NULL,
status TEXT DEFAULT 'pending',
process_after TEXT,
recurrence TEXT,
tries INTEGER DEFAULT 0,
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
);
`);
db.prepare(
"INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES (?, ?, 'chat', datetime('now'), 'pending', '{}')",
).run('legacy-2', 2);
migrateMessagesInTable(db);
migrateMessagesInTable(db); // idempotent
const cols = (db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name);
expect(cols).toContain('source_session_id');
expect(getInboundSourceSessionId(db, 'legacy-2')).toBeNull();
expect(getInboundSourceSessionId(db, 'does-not-exist')).toBeNull();
db.close();
});
});
+53 -2
View File
@@ -108,14 +108,21 @@ export function insertMessage(
* Host countDueMessages gates on this; container reads everything.
*/
trigger?: 0 | 1;
/**
* For agent-to-agent inbound: the source session id that emitted the
* outbound message which became this inbound row. Used as the return
* path for the target's reply. NULL on channel-side inbound.
*/
sourceSessionId?: string | null;
},
): void {
db.prepare(
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger)
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger)`,
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger, source_session_id)
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger, @sourceSessionId)`,
).run({
...message,
trigger: message.trigger ?? 1,
sourceSessionId: message.sourceSessionId ?? null,
seq: nextEvenSeq(db),
});
}
@@ -239,6 +246,7 @@ export interface OutboundMessage {
channel_type: string | null;
thread_id: string | null;
content: string;
in_reply_to: string | null;
}
export function getDueOutboundMessages(db: Database.Database): OutboundMessage[] {
@@ -305,4 +313,47 @@ export function migrateMessagesInTable(db: Database.Database): void {
// the agent" semantics, so backfill 1 and default 1 for new inserts.
db.prepare('ALTER TABLE messages_in ADD COLUMN trigger INTEGER NOT NULL DEFAULT 1').run();
}
if (!cols.has('source_session_id')) {
// For agent-to-agent return-path routing. NULL on existing rows is fine —
// their replies fall back to the legacy "newest active session" lookup.
db.prepare('ALTER TABLE messages_in ADD COLUMN source_session_id TEXT').run();
}
}
/**
* Look up an inbound row's source_session_id by its message id. Returns null
* if the row doesn't exist or the column is NULL (channel inbound or
* pre-migration a2a inbound). Used by a2a routing to route replies back to
* the originating session.
*/
export function getInboundSourceSessionId(db: Database.Database, messageId: string): string | null {
const row = db.prepare('SELECT source_session_id FROM messages_in WHERE id = ?').get(messageId) as
| { source_session_id: string | null }
| undefined;
return row?.source_session_id ?? null;
}
/**
* Find the source_session_id of the most recent a2a inbound row from a
* specific peer (by agent group id). Used as a peer-affinity fallback in
* a2a routing when an outbound reply has no `in_reply_to` (e.g. the
* container's send_message MCP tool path didn't thread the batch's
* in_reply_to through).
*
* Heuristic: "the last time this peer talked to me, which session was it?"
* Returns null when no prior a2a inbound from that peer carries a
* non-null source_session_id (typical for pre-migration installs).
*/
export function getMostRecentPeerSourceSessionId(db: Database.Database, peerAgentGroupId: string): string | null {
const row = db
.prepare(
`SELECT source_session_id FROM messages_in
WHERE channel_type = 'agent'
AND platform_id = ?
AND source_session_id IS NOT NULL
ORDER BY seq DESC
LIMIT 1`,
)
.get(peerAgentGroupId) as { source_session_id: string | null } | undefined;
return row?.source_session_id ?? null;
}
+1
View File
@@ -239,6 +239,7 @@ async function deliverMessage(
channel_type: string | null;
thread_id: string | null;
content: string;
in_reply_to: string | null;
},
session: Session,
inDb: Database.Database,
+6 -8
View File
@@ -969,12 +969,10 @@ describe('agent-to-agent routing', () => {
expect(JSON.parse(rows[0].content).text).toBe('research this');
});
it('BUG: A2A return path resolves to wrong session when multiple channel sessions exist (#2332)', async () => {
it('A2A return path routes to originating session, not newest (#2332)', async () => {
// PA has Slack session, then gets wired to Discord (newer session).
// Researcher responds to PA. routeAgentMessage calls
// resolveSession('ag-pa', null, null, 'agent-shared') which calls
// findSessionByAgentGroup — picks newest (Discord) instead of the
// Slack session that originated the A2A call.
// Researcher responds to PA. With the return-path fix, the reply
// routes back to the Slack session (originator) not Discord (newest).
const { routeAgentMessage } = await import('./modules/agent-to-agent/agent-route.js');
const { session: paSlackSession } = resolveSession('ag-pa', 'mg-slack', null, 'shared');
@@ -1013,9 +1011,9 @@ describe('agent-to-agent routing', () => {
const discordA2a = discordDb.prepare("SELECT * FROM messages_in WHERE channel_type = 'agent'").all();
discordDb.close();
// Document the bug: response lands in Discord (newest) not Slack (origin)
expect(discordA2a).toHaveLength(1); // BUG: should be 0
expect(slackA2a).toHaveLength(0); // BUG: should be 1
// Fixed: response lands in Slack (origin) not Discord (newest)
expect(slackA2a).toHaveLength(1);
expect(discordA2a).toHaveLength(0);
});
it('BUG: A2A-only session gets null session_routing (#2332)', async () => {
+240 -10
View File
@@ -1,20 +1,53 @@
import { describe, expect, it } from 'vitest';
import Database from 'better-sqlite3';
import fs from 'fs';
import { describe, expect, it, beforeEach, afterEach, vi } from 'vitest';
import { isSafeAttachmentName } from './agent-route.js';
import { isSafeAttachmentName, routeAgentMessage } from './agent-route.js';
import { createDestination } from './db/agent-destinations.js';
import { initTestDb, closeDb, runMigrations, createAgentGroup } from '../../db/index.js';
import { createSession } from '../../db/sessions.js';
import { initSessionFolder, inboundDbPath } from '../../session-manager.js';
import type { Session } from '../../types.js';
vi.mock('../../container-runner.js', () => ({
wakeContainer: vi.fn().mockResolvedValue(undefined),
isContainerRunning: vi.fn().mockReturnValue(false),
getActiveContainerCount: vi.fn().mockReturnValue(0),
killContainer: vi.fn(),
}));
vi.mock('../../config.js', async () => {
const actual = await vi.importActual('../../config.js');
return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-a2a-route' };
});
const TEST_DIR = '/tmp/nanoclaw-test-a2a-route';
function now(): string {
return new Date().toISOString();
}
function readInbound(agentGroupId: string, sessionId: string) {
const db = new Database(inboundDbPath(agentGroupId, sessionId), { readonly: true });
const rows = db
.prepare('SELECT id, platform_id, channel_type, content, source_session_id FROM messages_in ORDER BY seq')
.all() as Array<{
id: string;
platform_id: string | null;
channel_type: string | null;
content: string;
source_session_id: string | null;
}>;
db.close();
return rows;
}
/**
* `forwardAttachedFiles` has a filesystem side that's awkward to unit-test
* without mocking DATA_DIR. The guarantee worth pinning is that the
* filename validator rejects everything that could escape the inbox dir —
* `forwardAttachedFiles` runs this guard before any I/O, so traversal is
* impossible as long as this matrix holds.
*/
describe('isSafeAttachmentName', () => {
it('accepts plain filenames', () => {
expect(isSafeAttachmentName('baby-duck.png')).toBe(true);
expect(isSafeAttachmentName('file with spaces.pdf')).toBe(true);
expect(isSafeAttachmentName('report.v2.docx')).toBe(true);
expect(isSafeAttachmentName('.hidden')).toBe(true); // leading dot is fine, just not `.` / `..`
expect(isSafeAttachmentName('.hidden')).toBe(true);
});
it('rejects empty / sentinel values', () => {
@@ -44,3 +77,200 @@ describe('isSafeAttachmentName', () => {
expect(isSafeAttachmentName(undefined as unknown as string)).toBe(false);
});
});
/**
* Return-path routing: when an a2a reply targets an agent group with multiple
* sessions, it must land in the *originating* session — not the newest one.
*
* Setup: agent A has two active sessions S1 (older) + S2 (newer).
* Agent B is the peer A talks to. Bidirectional destinations wired.
*/
describe('routeAgentMessage return-path', () => {
const A = 'ag-A';
const B = 'ag-B';
let S1: Session;
let S2: Session;
let SB: Session;
beforeEach(() => {
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
fs.mkdirSync(TEST_DIR, { recursive: true });
const db = initTestDb();
runMigrations(db);
createAgentGroup({ id: A, name: 'A', folder: 'a', agent_provider: null, created_at: now() });
createAgentGroup({ id: B, name: 'B', folder: 'b', agent_provider: null, created_at: now() });
// S1 (older), S2 (newer) — both active sessions on A.
S1 = {
id: 'sess-A-old',
agent_group_id: A,
messaging_group_id: null,
thread_id: null,
agent_provider: null,
status: 'active',
container_status: 'stopped',
last_active: null,
created_at: '2026-01-01T00:00:00.000Z',
};
S2 = {
id: 'sess-A-new',
agent_group_id: A,
messaging_group_id: null,
thread_id: null,
agent_provider: null,
status: 'active',
container_status: 'stopped',
last_active: null,
created_at: '2026-02-01T00:00:00.000Z',
};
SB = {
id: 'sess-B',
agent_group_id: B,
messaging_group_id: null,
thread_id: null,
agent_provider: null,
status: 'active',
container_status: 'stopped',
last_active: null,
created_at: '2026-01-15T00:00:00.000Z',
};
createSession(S1);
createSession(S2);
createSession(SB);
initSessionFolder(A, S1.id);
initSessionFolder(A, S2.id);
initSessionFolder(B, SB.id);
createDestination({
agent_group_id: A,
local_name: 'b',
target_type: 'agent',
target_id: B,
created_at: now(),
});
createDestination({
agent_group_id: B,
local_name: 'a',
target_type: 'agent',
target_id: A,
created_at: now(),
});
});
afterEach(() => {
closeDb();
if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
});
it('forward direction: stamps source_session_id on the target inbound row', async () => {
// A.S1 emits an outbound a2a to B.
await routeAgentMessage(
{
id: 'msg-from-A-S1',
platform_id: B,
content: JSON.stringify({ text: 'hello B' }),
in_reply_to: null,
},
S1,
);
const bRows = readInbound(B, SB.id);
expect(bRows).toHaveLength(1);
expect(bRows[0].platform_id).toBe(A);
expect(bRows[0].source_session_id).toBe(S1.id); // <- the return address
});
it('reply direction: routes back to the originating session, not the newest', async () => {
// A.S1 sends to B.
await routeAgentMessage(
{
id: 'msg-from-A-S1',
platform_id: B,
content: JSON.stringify({ text: 'ping' }),
in_reply_to: null,
},
S1,
);
// Capture the synthetic id the host stamped on B's inbound — that's what
// B's container would reference as `in_reply_to` when replying.
const bRows = readInbound(B, SB.id);
const yId = bRows[0].id;
// B replies to that message.
await routeAgentMessage(
{
id: 'msg-from-B',
platform_id: A,
content: JSON.stringify({ text: 'pong' }),
in_reply_to: yId,
},
SB,
);
const s1Rows = readInbound(A, S1.id);
const s2Rows = readInbound(A, S2.id);
// The reply lands in S1 (originator) even though S2 is newer.
expect(s1Rows).toHaveLength(1);
expect(s1Rows[0].platform_id).toBe(B);
expect(JSON.parse(s1Rows[0].content).text).toBe('pong');
expect(s2Rows).toHaveLength(0);
});
it('fallback: a2a with no in_reply_to falls through to newest-session lookup', async () => {
// No prior conversation. B initiates an a2a to A out of the blue.
await routeAgentMessage(
{
id: 'msg-from-B-fresh',
platform_id: A,
content: JSON.stringify({ text: 'unsolicited' }),
in_reply_to: null,
},
SB,
);
// Newest session wins (current heuristic, preserved).
const s1Rows = readInbound(A, S1.id);
const s2Rows = readInbound(A, S2.id);
expect(s1Rows).toHaveLength(0);
expect(s2Rows).toHaveLength(1);
});
it('peer-affinity fallback: with no in_reply_to, routes to most recent peer-source session', async () => {
// A.S1 sends to B (establishing affinity: B's last contact from A was via S1).
await routeAgentMessage(
{
id: 'msg-from-A-S1-pre',
platform_id: B,
content: JSON.stringify({ text: 'context-establishing' }),
in_reply_to: null,
},
S1,
);
// B sends a follow-up but its container forgot to set in_reply_to (e.g.
// emitted via an MCP tool path that doesn't thread the batch's in_reply_to
// through). The host should still route this to S1 because S1 is the
// session most recently in conversation with B — not the chronologically
// newest session of A.
await routeAgentMessage(
{
id: 'msg-from-B-followup',
platform_id: A,
content: JSON.stringify({ text: 'standing by' }),
in_reply_to: null,
},
SB,
);
const s1Rows = readInbound(A, S1.id);
const s2Rows = readInbound(A, S2.id);
// Affinity wins: reply to S1, not the newer S2.
expect(s1Rows).toHaveLength(1);
expect(JSON.parse(s1Rows[0].content).text).toBe('standing by');
expect(s2Rows).toHaveLength(0);
});
});
+59 -2
View File
@@ -23,10 +23,11 @@ import path from 'path';
import { isSafeAttachmentName } from '../../attachment-safety.js';
import { getAgentGroup } from '../../db/agent-groups.js';
import { getInboundSourceSessionId, getMostRecentPeerSourceSessionId } from '../../db/session-db.js';
import { getSession } from '../../db/sessions.js';
import { wakeContainer } from '../../container-runner.js';
import { log } from '../../log.js';
import { resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js';
import { openInboundDb, resolveSession, sessionDir, writeSessionMessage } from '../../session-manager.js';
import type { Session } from '../../types.js';
import { hasDestination } from './db/agent-destinations.js';
@@ -101,6 +102,61 @@ export interface RoutableAgentMessage {
id: string;
platform_id: string | null;
content: string;
/**
* For replies, the id of the inbound message being replied to. The
* container's formatter sets this from the first inbound in the batch
* (`container/agent-runner/src/formatter.ts`). Used here to route the
* reply back to the originating session — see `resolveTargetSession`.
*/
in_reply_to: string | null;
}
/**
* Pick which session of `targetAgentGroupId` should receive this a2a message.
*
* Three layers, highest-fidelity first:
*
* 1. **Direct return-path** (in_reply_to lookup): if the message is a reply
* (`in_reply_to` set), open the source agent's inbound DB and read the
* triggering row's `source_session_id`. That column was stamped when the
* original outbound was routed — it's the session that started the
* conversation, and replies should land there even when the target has
* multiple active sessions.
*
* 2. **Peer-affinity fallback**: if (1) misses (in_reply_to is null or the
* referenced row isn't an a2a inbound), look up the most recent a2a
* inbound *from the target agent group* in source's inbound and use its
* `source_session_id`. The intuition: the last time this peer talked to
* me, which target session was driving? Route the reply there, since
* that's the session most plausibly in active conversation.
*
* 3. **Newest active session**: legacy heuristic. Used when no prior a2a
* has been recorded with `source_session_id` (e.g. fresh installs,
* pre-migration data).
*/
function resolveTargetSession(msg: RoutableAgentMessage, sourceSession: Session, targetAgentGroupId: string): Session {
const srcDb = openInboundDb(sourceSession.agent_group_id, sourceSession.id);
let originSessionId: string | null = null;
try {
if (msg.in_reply_to) {
originSessionId = getInboundSourceSessionId(srcDb, msg.in_reply_to);
}
if (!originSessionId) {
// Peer-affinity fallback — covers the case where the container's
// outbound write didn't carry in_reply_to (e.g. legacy MCP send_message
// path, container running pre-fix code).
originSessionId = getMostRecentPeerSourceSessionId(srcDb, targetAgentGroupId);
}
} finally {
srcDb.close();
}
if (originSessionId) {
const candidate = getSession(originSessionId);
if (candidate && candidate.agent_group_id === targetAgentGroupId && candidate.status === 'active') {
return candidate;
}
}
return resolveSession(targetAgentGroupId, null, null, 'agent-shared').session;
}
export async function routeAgentMessage(msg: RoutableAgentMessage, session: Session): Promise<void> {
@@ -119,7 +175,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess
if (!getAgentGroup(targetAgentGroupId)) {
throw new Error(`target agent group ${targetAgentGroupId} not found for message ${msg.id}`);
}
const { session: targetSession } = resolveSession(targetAgentGroupId, null, null, 'agent-shared');
const targetSession = resolveTargetSession(msg, session, targetAgentGroupId);
const a2aMsgId = `a2a-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
// If the source message references files (via `send_file`), forward the
@@ -137,6 +193,7 @@ export async function routeAgentMessage(msg: RoutableAgentMessage, session: Sess
channelType: 'agent',
threadId: null,
content: forwardedContent,
sourceSessionId: session.id,
});
log.info('Agent message routed', {
from: session.agent_group_id,
+7
View File
@@ -210,6 +210,12 @@ export function writeSessionMessage(
* a trigger-1 message does arrive.
*/
trigger?: 0 | 1;
/**
* For agent-to-agent inbound: the source session id that emitted the
* outbound message which became this inbound row. Used as the return
* path so the target's reply routes back to that exact session.
*/
sourceSessionId?: string | null;
},
): void {
// Extract base64 attachment data, save to inbox, replace with file paths
@@ -228,6 +234,7 @@ export function writeSessionMessage(
processAfter: message.processAfter ?? null,
recurrence: message.recurrence ?? null,
trigger: message.trigger ?? 1,
sourceSessionId: message.sourceSessionId ?? null,
});
} finally {
db.close();