mirror of
https://github.com/qwibitai/nanoclaw.git
synced 2026-06-04 10:14:47 +08:00
fix(agent-runner): require explicit destination addressing, fix per-destination threading
The poll loop had a bare-text routing fallback in dispatchResultText: when the agent produced text without <message to="..."> wrapping, it would auto- route to the session's originating channel (via a frozen RoutingContext) or to the single configured destination. This caused three problems: 1. Routing drift: RoutingContext was extracted once from the initial batch and never refreshed. When the initial batch was a null-routed cron task and a real chat arrived mid-query, replies were silently dropped to scratchpad because the frozen routing had all-null fields. 2. Cross-channel thread bleed: sendToDestination applied a single routing.threadId to every outbound message regardless of destination. In agent-shared sessions (multiple channels sharing one session), one channel's thread ID was stamped onto messages to a different channel. 3. Inconsistent formatting: task, webhook, and system messages had no origin metadata in their formatted output, so the agent couldn't tell which destination they came from — even when the underlying messages_in rows carried routing fields. Changes: - Remove the bare-text routing fallbacks in dispatchResultText (both the routing-based and single-destination shortcuts). All agent output must be wrapped in <message to="name">...</message>. Bare text is scratchpad. - Update buildDestinationsSection() to require explicit wrapping for all groups, including single-destination. No more "no special wrapping needed" shortcut. - Resolve thread_id per-destination via resolveDestinationThread(), which queries messages_in for the most recent message matching the target channel+platform. Falls back to null (top-level channel message) when no prior inbound exists for that destination. - Extract originAttr() helper in formatter.ts and apply it to all message types. Tasks now render as <task from="dest" time="...">, webhooks as <webhook from="dest" source="..." event="...">, system responses as <system_response from="dest" ...>. The agent always sees where a message originated. - Add a PreCompact shell hook (compact-instructions.ts) that outputs custom compaction instructions, telling the compactor to preserve recent message XML structure and routing metadata in the summary. Wired via settings.json in the .claude-shared scaffold, with a migration path (ensurePreCompactHook) for existing groups. Relation to open PRs: - #2277 (mergeRouting) becomes unnecessary — the routing fallback it patches no longer exists. Can be closed. - #2327 (post-compaction destination reminder) is complementary — it handles the post-compaction push, this handles pre-compaction instructions. Both can merge independently. - #2328 (default routing instruction) is complementary — it adds "reply to the from= destination" guidance to the multi-destination section. Compatible with the unified instruction format here. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,34 @@
|
||||
/**
|
||||
* PreCompact hook script — outputs custom compaction instructions to stdout.
|
||||
*
|
||||
* Claude Code captures the stdout of PreCompact shell hooks and passes it
|
||||
* as `customInstructions` to the compaction prompt. This ensures the
|
||||
* compaction summary preserves message routing context that the agent needs
|
||||
* to correctly address responses.
|
||||
*
|
||||
* Invoked by the PreCompact hook in .claude-shared/settings.json:
|
||||
* "command": "bun /app/src/compact-instructions.ts"
|
||||
*/
|
||||
import { getAllDestinations } from './destinations.js';
|
||||
|
||||
const destinations = getAllDestinations();
|
||||
const names = destinations.map((d) => d.name);
|
||||
|
||||
const instructions = [
|
||||
'Preserve the following in the compaction summary:',
|
||||
'',
|
||||
'1. For recent messages, keep the full XML structure including all attributes:',
|
||||
' - <message from="..." sender="..." time="..."> for chat messages',
|
||||
' - <task from="..." time="..."> for scheduled tasks',
|
||||
' - <webhook from="..." source="..." event="..."> for webhooks',
|
||||
' The message content can be summarized if long, but the XML tags and attributes must remain.',
|
||||
'',
|
||||
'2. Preserve the chronological message/reply sequence of recent exchanges.',
|
||||
' The agent needs to see: who said what, in what order, and from which destination.',
|
||||
'',
|
||||
'3. The `from` attribute identifies which destination sent the message.',
|
||||
' The agent MUST wrap all responses in <message to="name">...</message> blocks.',
|
||||
` Available destinations: ${names.length > 0 ? names.map((n) => `\`${n}\``).join(', ') : '(none)'}`,
|
||||
];
|
||||
|
||||
console.log(instructions.join('\n'));
|
||||
@@ -102,28 +102,20 @@ function buildDestinationsSection(): string {
|
||||
].join('\n');
|
||||
}
|
||||
|
||||
// Single-destination shortcut: the agent just writes its response normally.
|
||||
const lines = ['## Sending messages', ''];
|
||||
if (all.length === 1) {
|
||||
const d = all[0];
|
||||
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
|
||||
return [
|
||||
'## Sending messages',
|
||||
'',
|
||||
`Your messages are delivered to \`${d.name}\`${label}. Just write your response directly — no special wrapping needed.`,
|
||||
'',
|
||||
'To mark something as scratchpad (logged but not sent), wrap it in `<internal>...</internal>`.',
|
||||
'',
|
||||
'To send a message mid-response (e.g., an acknowledgment before a long task), call the `send_message` MCP tool.',
|
||||
].join('\n');
|
||||
}
|
||||
|
||||
const lines = ['## Sending messages', '', 'You can send messages to the following destinations:', ''];
|
||||
for (const d of all) {
|
||||
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
|
||||
lines.push(`- \`${d.name}\`${label}`);
|
||||
lines.push(`Your destination is \`${d.name}\`${label}.`);
|
||||
} else {
|
||||
lines.push('You can send messages to the following destinations:', '');
|
||||
for (const d of all) {
|
||||
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
|
||||
lines.push(`- \`${d.name}\`${label}`);
|
||||
}
|
||||
}
|
||||
lines.push('');
|
||||
lines.push('To send a message, wrap it in a `<message to="name">...</message>` block.');
|
||||
lines.push('**Every response must be wrapped** in a `<message to="name">...</message>` block.');
|
||||
lines.push('You can include multiple `<message>` blocks in one response to send to multiple destinations.');
|
||||
lines.push('Text outside of `<message>` blocks is scratchpad — logged but not sent anywhere.');
|
||||
lines.push('Use `<internal>...</internal>` to make scratchpad intent explicit.');
|
||||
|
||||
@@ -177,40 +177,49 @@ function formatSingleChat(msg: MessageInRow): string {
|
||||
const replyPrefix = formatReplyContext(content.replyTo);
|
||||
const attachmentsSuffix = formatAttachments(content.attachments);
|
||||
|
||||
// Look up the destination name for the origin (reverse map lookup).
|
||||
// If not found, fall back to a raw channel:platform_id marker so nothing
|
||||
// gets silently dropped — this should only happen if the destination was
|
||||
// removed between when the message was received and when it's being processed.
|
||||
const fromDest = findByRouting(msg.channel_type, msg.platform_id);
|
||||
const fromAttr = fromDest
|
||||
? ` from="${escapeXml(fromDest.name)}"`
|
||||
: msg.channel_type || msg.platform_id
|
||||
? ` from="unknown:${escapeXml(msg.channel_type || '')}:${escapeXml(msg.platform_id || '')}"`
|
||||
: '';
|
||||
const fromAttr = originAttr(msg);
|
||||
|
||||
return `<message${idAttr}${fromAttr} sender="${escapeXml(sender)}" time="${escapeXml(time)}"${replyAttr}>${replyPrefix}${escapeXml(text)}${attachmentsSuffix}</message>`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a ` from="destination_name"` attribute string from a message's routing
|
||||
* fields. Shared by all formatters so the agent always knows where a message
|
||||
* originated — critical for explicit addressing.
|
||||
*/
|
||||
function originAttr(msg: MessageInRow): string {
|
||||
const fromDest = findByRouting(msg.channel_type, msg.platform_id);
|
||||
if (fromDest) return ` from="${escapeXml(fromDest.name)}"`;
|
||||
if (msg.channel_type || msg.platform_id) {
|
||||
return ` from="unknown:${escapeXml(msg.channel_type || '')}:${escapeXml(msg.platform_id || '')}"`;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
function formatTaskMessage(msg: MessageInRow): string {
|
||||
const content = parseContent(msg.content);
|
||||
const parts = ['[SCHEDULED TASK]'];
|
||||
const from = originAttr(msg);
|
||||
const time = formatLocalTime(msg.timestamp, TIMEZONE);
|
||||
const parts: string[] = [];
|
||||
if (content.scriptOutput) {
|
||||
parts.push('', 'Script output:', JSON.stringify(content.scriptOutput, null, 2));
|
||||
parts.push('Script output:', JSON.stringify(content.scriptOutput, null, 2), '');
|
||||
}
|
||||
parts.push('', 'Instructions:', content.prompt || '');
|
||||
return parts.join('\n');
|
||||
parts.push('Instructions:', content.prompt || '');
|
||||
return `<task${from} time="${escapeXml(time)}">${parts.join('\n')}</task>`;
|
||||
}
|
||||
|
||||
function formatWebhookMessage(msg: MessageInRow): string {
|
||||
const content = parseContent(msg.content);
|
||||
const source = content.source || 'unknown';
|
||||
const event = content.event || 'unknown';
|
||||
return `[WEBHOOK: ${source}/${event}]\n\n${JSON.stringify(content.payload || content, null, 2)}`;
|
||||
const from = originAttr(msg);
|
||||
return `<webhook${from} source="${escapeXml(source)}" event="${escapeXml(event)}">${JSON.stringify(content.payload || content, null, 2)}</webhook>`;
|
||||
}
|
||||
|
||||
function formatSystemMessage(msg: MessageInRow): string {
|
||||
const content = parseContent(msg.content);
|
||||
return `[SYSTEM RESPONSE]\n\nAction: ${content.action || 'unknown'}\nStatus: ${content.status || 'unknown'}\nResult: ${JSON.stringify(content.result || null)}`;
|
||||
const from = originAttr(msg);
|
||||
return `<system_response${from} action="${escapeXml(content.action || 'unknown')}" status="${escapeXml(content.status || 'unknown')}">${JSON.stringify(content.result || null)}</system_response>`;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -47,7 +47,7 @@ describe('formatter', () => {
|
||||
insertMessage('m1', 'task', { prompt: 'Review open PRs' });
|
||||
const messages = getPendingMessages();
|
||||
const prompt = formatMessages(messages);
|
||||
expect(prompt).toContain('[SCHEDULED TASK]');
|
||||
expect(prompt).toContain('<task');
|
||||
expect(prompt).toContain('Review open PRs');
|
||||
});
|
||||
|
||||
@@ -55,15 +55,17 @@ describe('formatter', () => {
|
||||
insertMessage('m1', 'webhook', { source: 'github', event: 'push', payload: { ref: 'main' } });
|
||||
const messages = getPendingMessages();
|
||||
const prompt = formatMessages(messages);
|
||||
expect(prompt).toContain('[WEBHOOK: github/push]');
|
||||
expect(prompt).toContain('<webhook');
|
||||
expect(prompt).toContain('source="github"');
|
||||
expect(prompt).toContain('event="push"');
|
||||
});
|
||||
|
||||
it('should format system messages', () => {
|
||||
insertMessage('m1', 'system', { action: 'register_group', status: 'success', result: { id: 'ag-1' } });
|
||||
const messages = getPendingMessages();
|
||||
const prompt = formatMessages(messages);
|
||||
expect(prompt).toContain('[SYSTEM RESPONSE]');
|
||||
expect(prompt).toContain('register_group');
|
||||
expect(prompt).toContain('<system_response');
|
||||
expect(prompt).toContain('action="register_group"');
|
||||
});
|
||||
|
||||
it('should handle mixed kinds', () => {
|
||||
@@ -72,7 +74,7 @@ describe('formatter', () => {
|
||||
const messages = getPendingMessages();
|
||||
const prompt = formatMessages(messages);
|
||||
expect(prompt).toContain('sender="John"');
|
||||
expect(prompt).toContain('[SYSTEM RESPONSE]');
|
||||
expect(prompt).toContain('<system_response');
|
||||
});
|
||||
|
||||
it('should escape XML in content', () => {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { findByName, getAllDestinations, type DestinationEntry } from './destinations.js';
|
||||
import { findByName, type DestinationEntry } from './destinations.js';
|
||||
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
|
||||
import { writeMessageOut } from './db/messages-out.js';
|
||||
import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
||||
import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
|
||||
import {
|
||||
clearContinuation,
|
||||
migrateLegacyContinuation,
|
||||
@@ -396,14 +396,10 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
|
||||
/**
|
||||
* Parse the agent's final text for <message to="name">...</message> blocks
|
||||
* and dispatch each one to its resolved destination. Text outside of blocks
|
||||
* (including <internal>...</internal>) is normally scratchpad — logged but
|
||||
* not sent.
|
||||
* (including <internal>...</internal>) is scratchpad — logged but not sent.
|
||||
*
|
||||
* Single-destination shortcut: if the agent has exactly one configured
|
||||
* destination AND the output contains zero <message> blocks, the entire
|
||||
* cleaned text (with <internal> tags stripped) is sent to that destination.
|
||||
* This preserves the simple case of one user on one channel — the agent
|
||||
* doesn't need to know about wrapping syntax at all.
|
||||
* The agent must always wrap output in <message to="name">...</message>
|
||||
* blocks, even with a single destination. Bare text is scratchpad only.
|
||||
*/
|
||||
function dispatchResultText(text: string, routing: RoutingContext): void {
|
||||
const MESSAGE_RE = /<message\s+to="([^"]+)"\s*>([\s\S]*?)<\/message>/g;
|
||||
@@ -436,30 +432,6 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
|
||||
|
||||
const scratchpad = stripInternalTags(scratchpadParts.join(''));
|
||||
|
||||
// Single-destination shortcut: the agent wrote plain text — send to
|
||||
// the session's originating channel (from session_routing) if available,
|
||||
// otherwise fall back to the single destination.
|
||||
if (sent === 0 && scratchpad) {
|
||||
if (routing.channelType && routing.platformId) {
|
||||
// Reply to the channel/thread the message came from
|
||||
writeMessageOut({
|
||||
id: generateId(),
|
||||
in_reply_to: routing.inReplyTo,
|
||||
kind: 'chat',
|
||||
platform_id: routing.platformId,
|
||||
channel_type: routing.channelType,
|
||||
thread_id: routing.threadId,
|
||||
content: JSON.stringify({ text: scratchpad }),
|
||||
});
|
||||
return;
|
||||
}
|
||||
const all = getAllDestinations();
|
||||
if (all.length === 1) {
|
||||
sendToDestination(all[0], scratchpad, routing);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (scratchpad) {
|
||||
log(`[scratchpad] ${scratchpad.slice(0, 500)}${scratchpad.length > 500 ? '…' : ''}`);
|
||||
}
|
||||
@@ -472,20 +444,46 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
|
||||
function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void {
|
||||
const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!;
|
||||
const channelType = dest.type === 'channel' ? dest.channelType! : 'agent';
|
||||
// Inherit thread_id from the inbound routing context so replies land in the
|
||||
// same thread the conversation is in. For non-threaded adapters the router
|
||||
// strips thread_id at ingest, so this will already be null.
|
||||
// Resolve thread_id per-destination from the most recent inbound message
|
||||
// that came from this same channel+platform. In agent-shared sessions,
|
||||
// different destinations have different thread contexts — using a single
|
||||
// routing.threadId would stamp one channel's thread onto another.
|
||||
const destRouting = resolveDestinationThread(channelType, platformId);
|
||||
writeMessageOut({
|
||||
id: generateId(),
|
||||
in_reply_to: routing.inReplyTo,
|
||||
in_reply_to: destRouting?.inReplyTo ?? routing.inReplyTo,
|
||||
kind: 'chat',
|
||||
platform_id: platformId,
|
||||
channel_type: channelType,
|
||||
thread_id: routing.threadId,
|
||||
thread_id: destRouting?.threadId ?? null,
|
||||
content: JSON.stringify({ text: body }),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the thread_id and message id from the most recent inbound message
|
||||
* matching the given channel+platform. Returns null if no match found.
|
||||
*/
|
||||
function resolveDestinationThread(
|
||||
channelType: string,
|
||||
platformId: string,
|
||||
): { threadId: string | null; inReplyTo: string | null } | null {
|
||||
try {
|
||||
const db = getInboundDb();
|
||||
const row = db
|
||||
.prepare(
|
||||
`SELECT thread_id, id FROM messages_in
|
||||
WHERE channel_type = ? AND platform_id = ?
|
||||
ORDER BY seq DESC LIMIT 1`,
|
||||
)
|
||||
.get(channelType, platformId) as { thread_id: string | null; id: string } | undefined;
|
||||
if (row) return { threadId: row.thread_id, inReplyTo: row.id };
|
||||
} catch {
|
||||
// Fall through — DB may not have these columns on older sessions
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
@@ -14,6 +14,18 @@ const DEFAULT_SETTINGS_JSON =
|
||||
CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1',
|
||||
CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0',
|
||||
},
|
||||
hooks: {
|
||||
PreCompact: [
|
||||
{
|
||||
hooks: [
|
||||
{
|
||||
type: 'command',
|
||||
command: 'bun /app/src/compact-instructions.ts',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
@@ -71,10 +83,13 @@ export function initGroupFilesystem(group: AgentGroup, opts?: { instructions?: s
|
||||
if (!fs.existsSync(settingsFile)) {
|
||||
fs.writeFileSync(settingsFile, DEFAULT_SETTINGS_JSON);
|
||||
initialized.push('settings.json');
|
||||
} else {
|
||||
ensurePreCompactHook(settingsFile, initialized);
|
||||
}
|
||||
|
||||
// Skills directory — created empty here; symlinks are synced at spawn
|
||||
// time by container-runner.ts based on container.json skills selection.
|
||||
// (ensurePreCompactHook is defined after the main function.)
|
||||
const skillsDst = path.join(claudeDir, 'skills');
|
||||
if (!fs.existsSync(skillsDst)) {
|
||||
fs.mkdirSync(skillsDst, { recursive: true });
|
||||
@@ -90,3 +105,32 @@ export function initGroupFilesystem(group: AgentGroup, opts?: { instructions?: s
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const PRE_COMPACT_COMMAND = 'bun /app/src/compact-instructions.ts';
|
||||
|
||||
/**
|
||||
* Patch an existing settings.json to add the PreCompact hook if missing.
|
||||
* Runs on every group init so pre-existing groups pick up the hook.
|
||||
*/
|
||||
function ensurePreCompactHook(settingsFile: string, initialized: string[]): void {
|
||||
try {
|
||||
const raw = fs.readFileSync(settingsFile, 'utf-8');
|
||||
const settings = JSON.parse(raw);
|
||||
|
||||
// Check if there's already a PreCompact hook with our command.
|
||||
const existing = settings.hooks?.PreCompact as unknown[] | undefined;
|
||||
if (existing && JSON.stringify(existing).includes(PRE_COMPACT_COMMAND)) return;
|
||||
|
||||
// Add the hook, preserving existing hooks.
|
||||
if (!settings.hooks) settings.hooks = {};
|
||||
if (!settings.hooks.PreCompact) settings.hooks.PreCompact = [];
|
||||
settings.hooks.PreCompact.push({
|
||||
hooks: [{ type: 'command', command: PRE_COMPACT_COMMAND }],
|
||||
});
|
||||
|
||||
fs.writeFileSync(settingsFile, JSON.stringify(settings, null, 2) + '\n');
|
||||
initialized.push('settings.json (added PreCompact hook)');
|
||||
} catch {
|
||||
// Don't break init if settings.json is malformed — it'll use whatever's there.
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user