mirror of
https://github.com/qwibitai/nanoclaw.git
synced 2026-06-12 18:11:51 +08:00
fix(agent-to-agent): thread in_reply_to through MCP send_message + add peer-affinity fallback
The earlier return-path fix relied on outbound rows carrying `in_reply_to`
so the host could correlate replies back to the originating session. The
container's XML `<message to="...">` path set this correctly, but the MCP
`send_message` and `send_file` tools wrote outbound without it — so any
reply emitted via those tool paths fell through to the legacy "newest
session" tie-break, defeating the fix in practice.
Two changes:
Container side
- New `current-batch.ts` exposes the active batch's `inReplyTo` to MCP
tools that don't sit on poll-loop's call stack.
- `poll-loop.ts` publishes it before invoking the provider and clears it
in a `finally` block so subsequent ad-hoc invocations don't inherit
stale state.
- `mcp-tools/core.ts` reads it and stamps `in_reply_to` on the outbound
row in both `send_message` and `send_file`.
Host side
- New `getMostRecentPeerSourceSessionId` in session-db.ts.
- `resolveTargetSession` in agent-route.ts now tries three layers:
direct in_reply_to lookup → peer-affinity ("most recent a2a from this
peer") → legacy newest-session. Peer affinity covers the gap when
containers running pre-fix code or unusual call paths still emit
outbound without `in_reply_to`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,28 @@
|
||||
/**
|
||||
* Per-batch context the poll loop publishes for downstream consumers
|
||||
* (MCP tools, etc.) that don't sit on the poll-loop's call stack.
|
||||
*
|
||||
* Today the only field is `inReplyTo` — the id of the first inbound
|
||||
* message in the batch the agent is currently processing. MCP tools like
|
||||
* `send_message` and `send_file` read this and stamp it onto the outbound
|
||||
* row so the host's a2a return-path routing can correlate replies back to
|
||||
* the originating session.
|
||||
*
|
||||
* This is module-level state on purpose: the agent-runner is single-process
|
||||
* and processes one batch at a time. Poll-loop calls `setCurrentInReplyTo`
|
||||
* before invoking the provider and `clearCurrentInReplyTo` after the batch
|
||||
* completes (or errors out).
|
||||
*/
|
||||
let currentInReplyTo: string | null = null;
|
||||
|
||||
export function setCurrentInReplyTo(id: string | null): void {
|
||||
currentInReplyTo = id;
|
||||
}
|
||||
|
||||
export function clearCurrentInReplyTo(): void {
|
||||
currentInReplyTo = null;
|
||||
}
|
||||
|
||||
export function getCurrentInReplyTo(): string | null {
|
||||
return currentInReplyTo;
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Tests for the core MCP tools' interaction with the per-batch routing
|
||||
* context. The agent-runner sets a current `inReplyTo` at the top of each
|
||||
* batch in poll-loop, and outbound writes from MCP tools (send_message,
|
||||
* send_file) must pick it up so a2a return-path routing on the host can
|
||||
* correlate replies back to the originating session.
|
||||
*/
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
|
||||
|
||||
import { initTestSessionDb, closeSessionDb, getInboundDb } from '../db/connection.js';
|
||||
import { getUndeliveredMessages } from '../db/messages-out.js';
|
||||
import { setCurrentInReplyTo, clearCurrentInReplyTo } from '../current-batch.js';
|
||||
import { sendMessage } from './core.js';
|
||||
|
||||
beforeEach(() => {
|
||||
initTestSessionDb();
|
||||
// Seed a peer agent destination
|
||||
getInboundDb()
|
||||
.prepare(
|
||||
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
|
||||
VALUES ('peer', 'Peer', 'agent', NULL, NULL, 'ag-peer')`,
|
||||
)
|
||||
.run();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearCurrentInReplyTo();
|
||||
closeSessionDb();
|
||||
});
|
||||
|
||||
describe('send_message MCP tool — in_reply_to plumbing', () => {
|
||||
it('stamps current batch in_reply_to on outbound rows', async () => {
|
||||
setCurrentInReplyTo('inbound-msg-1');
|
||||
|
||||
await sendMessage.handler({ to: 'peer', text: 'hello' });
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].in_reply_to).toBe('inbound-msg-1');
|
||||
});
|
||||
|
||||
it('writes null when no batch is active', async () => {
|
||||
// No setCurrentInReplyTo before this call — simulates ad-hoc / out-of-batch invocation.
|
||||
await sendMessage.handler({ to: 'peer', text: 'hello' });
|
||||
|
||||
const out = getUndeliveredMessages();
|
||||
expect(out).toHaveLength(1);
|
||||
expect(out[0].in_reply_to).toBeNull();
|
||||
});
|
||||
});
|
||||
@@ -9,6 +9,7 @@
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { getCurrentInReplyTo } from '../current-batch.js';
|
||||
import { findByName, getAllDestinations } from '../destinations.js';
|
||||
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
|
||||
import { getSessionRouting } from '../db/session-routing.js';
|
||||
@@ -50,9 +51,7 @@ function destinationList(): string {
|
||||
*/
|
||||
function resolveRouting(
|
||||
to: string | undefined,
|
||||
):
|
||||
| { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string }
|
||||
| { error: string } {
|
||||
): { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string } | { error: string } {
|
||||
if (!to) {
|
||||
// Default: reply to whatever thread/channel this session is bound to.
|
||||
const session = getSessionRouting();
|
||||
@@ -82,9 +81,7 @@ function resolveRouting(
|
||||
// preserve the thread_id so replies land in the correct thread.
|
||||
const session = getSessionRouting();
|
||||
const threadId =
|
||||
session.channel_type === dest.channelType && session.platform_id === dest.platformId
|
||||
? session.thread_id
|
||||
: null;
|
||||
session.channel_type === dest.channelType && session.platform_id === dest.platformId ? session.thread_id : null;
|
||||
return {
|
||||
channel_type: dest.channelType!,
|
||||
platform_id: dest.platformId!,
|
||||
@@ -98,12 +95,14 @@ function resolveRouting(
|
||||
export const sendMessage: McpToolDefinition = {
|
||||
tool: {
|
||||
name: 'send_message',
|
||||
description:
|
||||
'Send a message to a named destination. If you have only one destination, you can omit `to`.',
|
||||
description: 'Send a message to a named destination. If you have only one destination, you can omit `to`.',
|
||||
inputSchema: {
|
||||
type: 'object' as const,
|
||||
properties: {
|
||||
to: { type: 'string', description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.' },
|
||||
to: {
|
||||
type: 'string',
|
||||
description: 'Destination name (e.g., "family", "worker-1"). Optional if you have only one destination.',
|
||||
},
|
||||
text: { type: 'string', description: 'Message content' },
|
||||
},
|
||||
required: ['text'],
|
||||
@@ -119,6 +118,7 @@ export const sendMessage: McpToolDefinition = {
|
||||
const id = generateId();
|
||||
const seq = writeMessageOut({
|
||||
id,
|
||||
in_reply_to: getCurrentInReplyTo(),
|
||||
kind: 'chat',
|
||||
platform_id: routing.platform_id,
|
||||
channel_type: routing.channel_type,
|
||||
@@ -165,6 +165,7 @@ export const sendFile: McpToolDefinition = {
|
||||
|
||||
writeMessageOut({
|
||||
id,
|
||||
in_reply_to: getCurrentInReplyTo(),
|
||||
kind: 'chat',
|
||||
platform_id: routing.platform_id,
|
||||
channel_type: routing.channel_type,
|
||||
|
||||
@@ -2,12 +2,17 @@ import { findByName, getAllDestinations, type DestinationEntry } from './destina
|
||||
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
|
||||
import { writeMessageOut } from './db/messages-out.js';
|
||||
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
||||
import { clearContinuation, migrateLegacyContinuation, setContinuation } from './db/session-state.js';
|
||||
import { clearCurrentInReplyTo, setCurrentInReplyTo } from './current-batch.js';
|
||||
import {
|
||||
clearContinuation,
|
||||
migrateLegacyContinuation,
|
||||
setContinuation,
|
||||
} from './db/session-state.js';
|
||||
import { formatMessages, extractRouting, categorizeMessage, isClearCommand, isRunnerCommand, stripInternalTags, type RoutingContext } from './formatter.js';
|
||||
formatMessages,
|
||||
extractRouting,
|
||||
categorizeMessage,
|
||||
isClearCommand,
|
||||
isRunnerCommand,
|
||||
stripInternalTags,
|
||||
type RoutingContext,
|
||||
} from './formatter.js';
|
||||
import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js';
|
||||
|
||||
const POLL_INTERVAL_MS = 1000;
|
||||
@@ -170,6 +175,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
||||
// Process the query while concurrently polling for new messages
|
||||
const skippedSet = new Set(skipped);
|
||||
const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id));
|
||||
// Publish the batch's in_reply_to so MCP tools (send_message, send_file)
|
||||
// can stamp it on outbound rows — needed for a2a return-path routing.
|
||||
setCurrentInReplyTo(routing.inReplyTo);
|
||||
try {
|
||||
const result = await processQuery(query, routing, processingIds, config.providerName);
|
||||
if (result.continuation && result.continuation !== continuation) {
|
||||
@@ -198,6 +206,8 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
|
||||
thread_id: routing.threadId,
|
||||
content: JSON.stringify({ text: `Error: ${errMsg}` }),
|
||||
});
|
||||
} finally {
|
||||
clearCurrentInReplyTo();
|
||||
}
|
||||
|
||||
// Ensure completed even if processQuery ended without a result event
|
||||
@@ -402,7 +412,9 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
|
||||
log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`);
|
||||
break;
|
||||
case 'error':
|
||||
log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`);
|
||||
log(
|
||||
`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`,
|
||||
);
|
||||
break;
|
||||
case 'progress':
|
||||
log(`Progress: ${event.message}`);
|
||||
|
||||
@@ -332,3 +332,28 @@ export function getInboundSourceSessionId(db: Database.Database, messageId: stri
|
||||
| undefined;
|
||||
return row?.source_session_id ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the source_session_id of the most recent a2a inbound row from a
|
||||
* specific peer (by agent group id). Used as a peer-affinity fallback in
|
||||
* a2a routing when an outbound reply has no `in_reply_to` (e.g. the
|
||||
* container's send_message MCP tool path didn't thread the batch's
|
||||
* in_reply_to through).
|
||||
*
|
||||
* Heuristic: "the last time this peer talked to me, which session was it?"
|
||||
* Returns null when no prior a2a inbound from that peer carries a
|
||||
* non-null source_session_id (typical for pre-migration installs).
|
||||
*/
|
||||
export function getMostRecentPeerSourceSessionId(db: Database.Database, peerAgentGroupId: string): string | null {
|
||||
const row = db
|
||||
.prepare(
|
||||
`SELECT source_session_id FROM messages_in
|
||||
WHERE channel_type = 'agent'
|
||||
AND platform_id = ?
|
||||
AND source_session_id IS NOT NULL
|
||||
ORDER BY seq DESC
|
||||
LIMIT 1`,
|
||||
)
|
||||
.get(peerAgentGroupId) as { source_session_id: string | null } | undefined;
|
||||
return row?.source_session_id ?? null;
|
||||
}
|
||||
|
||||
@@ -238,4 +238,39 @@ describe('routeAgentMessage return-path', () => {
|
||||
expect(s1Rows).toHaveLength(0);
|
||||
expect(s2Rows).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('peer-affinity fallback: with no in_reply_to, routes to most recent peer-source session', async () => {
|
||||
// A.S1 sends to B (establishing affinity: B's last contact from A was via S1).
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-A-S1-pre',
|
||||
platform_id: B,
|
||||
content: JSON.stringify({ text: 'context-establishing' }),
|
||||
in_reply_to: null,
|
||||
},
|
||||
S1,
|
||||
);
|
||||
|
||||
// B sends a follow-up but its container forgot to set in_reply_to (e.g.
|
||||
// emitted via an MCP tool path that doesn't thread the batch's in_reply_to
|
||||
// through). The host should still route this to S1 because S1 is the
|
||||
// session most recently in conversation with B — not the chronologically
|
||||
// newest session of A.
|
||||
await routeAgentMessage(
|
||||
{
|
||||
id: 'msg-from-B-followup',
|
||||
platform_id: A,
|
||||
content: JSON.stringify({ text: 'standing by' }),
|
||||
in_reply_to: null,
|
||||
},
|
||||
SB,
|
||||
);
|
||||
|
||||
const s1Rows = readInbound(A, S1.id);
|
||||
const s2Rows = readInbound(A, S2.id);
|
||||
// Affinity wins: reply to S1, not the newer S2.
|
||||
expect(s1Rows).toHaveLength(1);
|
||||
expect(JSON.parse(s1Rows[0].content).text).toBe('standing by');
|
||||
expect(s2Rows).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -23,7 +23,7 @@ import path from 'path';
|
||||
|
||||
import { isSafeAttachmentName } from '../../attachment-safety.js';
|
||||
import { getAgentGroup } from '../../db/agent-groups.js';
|
||||
import { getInboundSourceSessionId } from '../../db/session-db.js';
|
||||
import { getInboundSourceSessionId, getMostRecentPeerSourceSessionId } from '../../db/session-db.js';
|
||||
import { getSession } from '../../db/sessions.js';
|
||||
import { wakeContainer } from '../../container-runner.js';
|
||||
import { log } from '../../log.js';
|
||||
@@ -114,34 +114,46 @@ export interface RoutableAgentMessage {
|
||||
/**
|
||||
* Pick which session of `targetAgentGroupId` should receive this a2a message.
|
||||
*
|
||||
* Return-path lookup: if the message is a reply (`in_reply_to` set), open the
|
||||
* source agent's inbound DB and read the original triggering row's
|
||||
* `source_session_id`. That column was stamped when the original outbound was
|
||||
* routed — it's the session that started the conversation, and replies should
|
||||
* land there even when the target agent group has multiple active sessions.
|
||||
* Three layers, highest-fidelity first:
|
||||
*
|
||||
* Falls back to `resolveSession(..., 'agent-shared')` (which selects the
|
||||
* newest active session) when:
|
||||
* - the message has no `in_reply_to` (fresh-initiated a2a), OR
|
||||
* - the referenced row isn't in source's inbound (cross-batch reference), OR
|
||||
* - the referenced row's source_session_id is NULL (channel inbound or
|
||||
* pre-migration row), OR
|
||||
* - the recovered session no longer exists / belongs to a different agent.
|
||||
* 1. **Direct return-path** (in_reply_to lookup): if the message is a reply
|
||||
* (`in_reply_to` set), open the source agent's inbound DB and read the
|
||||
* triggering row's `source_session_id`. That column was stamped when the
|
||||
* original outbound was routed — it's the session that started the
|
||||
* conversation, and replies should land there even when the target has
|
||||
* multiple active sessions.
|
||||
*
|
||||
* 2. **Peer-affinity fallback**: if (1) misses (in_reply_to is null or the
|
||||
* referenced row isn't an a2a inbound), look up the most recent a2a
|
||||
* inbound *from the target agent group* in source's inbound and use its
|
||||
* `source_session_id`. The intuition: the last time this peer talked to
|
||||
* me, which target session was driving? Route the reply there, since
|
||||
* that's the session most plausibly in active conversation.
|
||||
*
|
||||
* 3. **Newest active session**: legacy heuristic. Used when no prior a2a
|
||||
* has been recorded with `source_session_id` (e.g. fresh installs,
|
||||
* pre-migration data).
|
||||
*/
|
||||
function resolveTargetSession(msg: RoutableAgentMessage, sourceSession: Session, targetAgentGroupId: string): Session {
|
||||
if (msg.in_reply_to) {
|
||||
const srcDb = openInboundDb(sourceSession.agent_group_id, sourceSession.id);
|
||||
let originSessionId: string | null;
|
||||
try {
|
||||
const srcDb = openInboundDb(sourceSession.agent_group_id, sourceSession.id);
|
||||
let originSessionId: string | null = null;
|
||||
try {
|
||||
if (msg.in_reply_to) {
|
||||
originSessionId = getInboundSourceSessionId(srcDb, msg.in_reply_to);
|
||||
} finally {
|
||||
srcDb.close();
|
||||
}
|
||||
if (originSessionId) {
|
||||
const candidate = getSession(originSessionId);
|
||||
if (candidate && candidate.agent_group_id === targetAgentGroupId && candidate.status === 'active') {
|
||||
return candidate;
|
||||
}
|
||||
if (!originSessionId) {
|
||||
// Peer-affinity fallback — covers the case where the container's
|
||||
// outbound write didn't carry in_reply_to (e.g. legacy MCP send_message
|
||||
// path, container running pre-fix code).
|
||||
originSessionId = getMostRecentPeerSourceSessionId(srcDb, targetAgentGroupId);
|
||||
}
|
||||
} finally {
|
||||
srcDb.close();
|
||||
}
|
||||
if (originSessionId) {
|
||||
const candidate = getSession(originSessionId);
|
||||
if (candidate && candidate.agent_group_id === targetAgentGroupId && candidate.status === 'active') {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
return resolveSession(targetAgentGroupId, null, null, 'agent-shared').session;
|
||||
|
||||
Reference in New Issue
Block a user