fix(agent-runner): require explicit destination addressing, fix per-destination threading

The poll loop had a bare-text routing fallback in dispatchResultText: when
the agent produced text without <message to="..."> wrapping, it would auto-
route to the session's originating channel (via a frozen RoutingContext) or
to the single configured destination. This caused three problems:

1. Routing drift: RoutingContext was extracted once from the initial batch
   and never refreshed. When the initial batch was a null-routed cron task
   and a real chat arrived mid-query, replies were silently dropped to
   scratchpad because the frozen routing had all-null fields.

2. Cross-channel thread bleed: sendToDestination applied a single
   routing.threadId to every outbound message regardless of destination.
   In agent-shared sessions (multiple channels sharing one session), one
   channel's thread ID was stamped onto messages to a different channel.

3. Inconsistent formatting: task, webhook, and system messages had no
   origin metadata in their formatted output, so the agent couldn't tell
   which destination they came from — even when the underlying messages_in
   rows carried routing fields.

Changes:

- Remove the bare-text routing fallbacks in dispatchResultText (both the
  routing-based and single-destination shortcuts). All agent output must
  be wrapped in <message to="name">...</message>. Bare text is scratchpad.

- Update buildDestinationsSection() to require explicit wrapping for all
  groups, including single-destination. No more "no special wrapping
  needed" shortcut.

- Resolve thread_id per-destination via resolveDestinationThread(), which
  queries messages_in for the most recent message matching the target
  channel+platform. Falls back to null (top-level channel message) when
  no prior inbound exists for that destination.

- Extract originAttr() helper in formatter.ts and apply it to all message
  types. Tasks now render as <task from="dest" time="...">, webhooks as
  <webhook from="dest" source="..." event="...">, system responses as
  <system_response from="dest" ...>. The agent always sees where a
  message originated.

- Add a PreCompact shell hook (compact-instructions.ts) that outputs
  custom compaction instructions, telling the compactor to preserve
  recent message XML structure and routing metadata in the summary.
  Wired via settings.json in the .claude-shared scaffold, with a
  migration path (ensurePreCompactHook) for existing groups.

Relation to open PRs:

- #2277 (mergeRouting) becomes unnecessary — the routing fallback it
  patches no longer exists. Can be closed.
- #2327 (post-compaction destination reminder) is complementary — it
  handles the post-compaction push, this handles pre-compaction
  instructions. Both can merge independently.
- #2328 (default routing instruction) is complementary — it adds "reply
  to the from= destination" guidance to the multi-destination section.
  Compatible with the unified instruction format here.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-05-07 19:47:46 +03:00
parent ba70ddf73a
commit 9db39b291d
6 changed files with 155 additions and 76 deletions
@@ -0,0 +1,34 @@
/**
* PreCompact hook script — outputs custom compaction instructions to stdout.
*
* Claude Code captures the stdout of PreCompact shell hooks and passes it
* as `customInstructions` to the compaction prompt. This ensures the
* compaction summary preserves message routing context that the agent needs
* to correctly address responses.
*
* Invoked by the PreCompact hook in .claude-shared/settings.json:
* "command": "bun /app/src/compact-instructions.ts"
*/
import { getAllDestinations } from './destinations.js';
const destinations = getAllDestinations();
const names = destinations.map((d) => d.name);
const instructions = [
'Preserve the following in the compaction summary:',
'',
'1. For recent messages, keep the full XML structure including all attributes:',
' - <message from="..." sender="..." time="..."> for chat messages',
' - <task from="..." time="..."> for scheduled tasks',
' - <webhook from="..." source="..." event="..."> for webhooks',
' The message content can be summarized if long, but the XML tags and attributes must remain.',
'',
'2. Preserve the chronological message/reply sequence of recent exchanges.',
' The agent needs to see: who said what, in what order, and from which destination.',
'',
'3. The `from` attribute identifies which destination sent the message.',
' The agent MUST wrap all responses in <message to="name">...</message> blocks.',
` Available destinations: ${names.length > 0 ? names.map((n) => `\`${n}\``).join(', ') : '(none)'}`,
];
console.log(instructions.join('\n'));
+9 -17
View File
@@ -102,28 +102,20 @@ function buildDestinationsSection(): string {
].join('\n'); ].join('\n');
} }
// Single-destination shortcut: the agent just writes its response normally. const lines = ['## Sending messages', ''];
if (all.length === 1) { if (all.length === 1) {
const d = all[0]; const d = all[0];
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : ''; const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
return [ lines.push(`Your destination is \`${d.name}\`${label}.`);
'## Sending messages', } else {
'', lines.push('You can send messages to the following destinations:', '');
`Your messages are delivered to \`${d.name}\`${label}. Just write your response directly — no special wrapping needed.`, for (const d of all) {
'', const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
'To mark something as scratchpad (logged but not sent), wrap it in `<internal>...</internal>`.', lines.push(`- \`${d.name}\`${label}`);
'', }
'To send a message mid-response (e.g., an acknowledgment before a long task), call the `send_message` MCP tool.',
].join('\n');
}
const lines = ['## Sending messages', '', 'You can send messages to the following destinations:', ''];
for (const d of all) {
const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : '';
lines.push(`- \`${d.name}\`${label}`);
} }
lines.push(''); lines.push('');
lines.push('To send a message, wrap it in a `<message to="name">...</message>` block.'); lines.push('**Every response must be wrapped** in a `<message to="name">...</message>` block.');
lines.push('You can include multiple `<message>` blocks in one response to send to multiple destinations.'); lines.push('You can include multiple `<message>` blocks in one response to send to multiple destinations.');
lines.push('Text outside of `<message>` blocks is scratchpad — logged but not sent anywhere.'); lines.push('Text outside of `<message>` blocks is scratchpad — logged but not sent anywhere.');
lines.push('Use `<internal>...</internal>` to make scratchpad intent explicit.'); lines.push('Use `<internal>...</internal>` to make scratchpad intent explicit.');
+25 -16
View File
@@ -177,40 +177,49 @@ function formatSingleChat(msg: MessageInRow): string {
const replyPrefix = formatReplyContext(content.replyTo); const replyPrefix = formatReplyContext(content.replyTo);
const attachmentsSuffix = formatAttachments(content.attachments); const attachmentsSuffix = formatAttachments(content.attachments);
// Look up the destination name for the origin (reverse map lookup). const fromAttr = originAttr(msg);
// If not found, fall back to a raw channel:platform_id marker so nothing
// gets silently dropped — this should only happen if the destination was
// removed between when the message was received and when it's being processed.
const fromDest = findByRouting(msg.channel_type, msg.platform_id);
const fromAttr = fromDest
? ` from="${escapeXml(fromDest.name)}"`
: msg.channel_type || msg.platform_id
? ` from="unknown:${escapeXml(msg.channel_type || '')}:${escapeXml(msg.platform_id || '')}"`
: '';
return `<message${idAttr}${fromAttr} sender="${escapeXml(sender)}" time="${escapeXml(time)}"${replyAttr}>${replyPrefix}${escapeXml(text)}${attachmentsSuffix}</message>`; return `<message${idAttr}${fromAttr} sender="${escapeXml(sender)}" time="${escapeXml(time)}"${replyAttr}>${replyPrefix}${escapeXml(text)}${attachmentsSuffix}</message>`;
} }
/**
* Build a ` from="destination_name"` attribute string from a message's routing
* fields. Shared by all formatters so the agent always knows where a message
* originated — critical for explicit addressing.
*/
function originAttr(msg: MessageInRow): string {
const fromDest = findByRouting(msg.channel_type, msg.platform_id);
if (fromDest) return ` from="${escapeXml(fromDest.name)}"`;
if (msg.channel_type || msg.platform_id) {
return ` from="unknown:${escapeXml(msg.channel_type || '')}:${escapeXml(msg.platform_id || '')}"`;
}
return '';
}
function formatTaskMessage(msg: MessageInRow): string { function formatTaskMessage(msg: MessageInRow): string {
const content = parseContent(msg.content); const content = parseContent(msg.content);
const parts = ['[SCHEDULED TASK]']; const from = originAttr(msg);
const time = formatLocalTime(msg.timestamp, TIMEZONE);
const parts: string[] = [];
if (content.scriptOutput) { if (content.scriptOutput) {
parts.push('', 'Script output:', JSON.stringify(content.scriptOutput, null, 2)); parts.push('Script output:', JSON.stringify(content.scriptOutput, null, 2), '');
} }
parts.push('', 'Instructions:', content.prompt || ''); parts.push('Instructions:', content.prompt || '');
return parts.join('\n'); return `<task${from} time="${escapeXml(time)}">${parts.join('\n')}</task>`;
} }
function formatWebhookMessage(msg: MessageInRow): string { function formatWebhookMessage(msg: MessageInRow): string {
const content = parseContent(msg.content); const content = parseContent(msg.content);
const source = content.source || 'unknown'; const source = content.source || 'unknown';
const event = content.event || 'unknown'; const event = content.event || 'unknown';
return `[WEBHOOK: ${source}/${event}]\n\n${JSON.stringify(content.payload || content, null, 2)}`; const from = originAttr(msg);
return `<webhook${from} source="${escapeXml(source)}" event="${escapeXml(event)}">${JSON.stringify(content.payload || content, null, 2)}</webhook>`;
} }
function formatSystemMessage(msg: MessageInRow): string { function formatSystemMessage(msg: MessageInRow): string {
const content = parseContent(msg.content); const content = parseContent(msg.content);
return `[SYSTEM RESPONSE]\n\nAction: ${content.action || 'unknown'}\nStatus: ${content.status || 'unknown'}\nResult: ${JSON.stringify(content.result || null)}`; const from = originAttr(msg);
return `<system_response${from} action="${escapeXml(content.action || 'unknown')}" status="${escapeXml(content.status || 'unknown')}">${JSON.stringify(content.result || null)}</system_response>`;
} }
/** /**
+7 -5
View File
@@ -47,7 +47,7 @@ describe('formatter', () => {
insertMessage('m1', 'task', { prompt: 'Review open PRs' }); insertMessage('m1', 'task', { prompt: 'Review open PRs' });
const messages = getPendingMessages(); const messages = getPendingMessages();
const prompt = formatMessages(messages); const prompt = formatMessages(messages);
expect(prompt).toContain('[SCHEDULED TASK]'); expect(prompt).toContain('<task');
expect(prompt).toContain('Review open PRs'); expect(prompt).toContain('Review open PRs');
}); });
@@ -55,15 +55,17 @@ describe('formatter', () => {
insertMessage('m1', 'webhook', { source: 'github', event: 'push', payload: { ref: 'main' } }); insertMessage('m1', 'webhook', { source: 'github', event: 'push', payload: { ref: 'main' } });
const messages = getPendingMessages(); const messages = getPendingMessages();
const prompt = formatMessages(messages); const prompt = formatMessages(messages);
expect(prompt).toContain('[WEBHOOK: github/push]'); expect(prompt).toContain('<webhook');
expect(prompt).toContain('source="github"');
expect(prompt).toContain('event="push"');
}); });
it('should format system messages', () => { it('should format system messages', () => {
insertMessage('m1', 'system', { action: 'register_group', status: 'success', result: { id: 'ag-1' } }); insertMessage('m1', 'system', { action: 'register_group', status: 'success', result: { id: 'ag-1' } });
const messages = getPendingMessages(); const messages = getPendingMessages();
const prompt = formatMessages(messages); const prompt = formatMessages(messages);
expect(prompt).toContain('[SYSTEM RESPONSE]'); expect(prompt).toContain('<system_response');
expect(prompt).toContain('register_group'); expect(prompt).toContain('action="register_group"');
}); });
it('should handle mixed kinds', () => { it('should handle mixed kinds', () => {
@@ -72,7 +74,7 @@ describe('formatter', () => {
const messages = getPendingMessages(); const messages = getPendingMessages();
const prompt = formatMessages(messages); const prompt = formatMessages(messages);
expect(prompt).toContain('sender="John"'); expect(prompt).toContain('sender="John"');
expect(prompt).toContain('[SYSTEM RESPONSE]'); expect(prompt).toContain('<system_response');
}); });
it('should escape XML in content', () => { it('should escape XML in content', () => {
+36 -38
View File
@@ -1,7 +1,7 @@
import { findByName, getAllDestinations, type DestinationEntry } from './destinations.js'; import { findByName, type DestinationEntry } from './destinations.js';
import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js'; import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
import { writeMessageOut } from './db/messages-out.js'; import { writeMessageOut } from './db/messages-out.js';
import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js'; import { getInboundDb, touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
import { import {
clearContinuation, clearContinuation,
migrateLegacyContinuation, migrateLegacyContinuation,
@@ -396,14 +396,10 @@ function handleEvent(event: ProviderEvent, _routing: RoutingContext): void {
/** /**
* Parse the agent's final text for <message to="name">...</message> blocks * Parse the agent's final text for <message to="name">...</message> blocks
* and dispatch each one to its resolved destination. Text outside of blocks * and dispatch each one to its resolved destination. Text outside of blocks
* (including <internal>...</internal>) is normally scratchpad — logged but * (including <internal>...</internal>) is scratchpad — logged but not sent.
* not sent.
* *
* Single-destination shortcut: if the agent has exactly one configured * The agent must always wrap output in <message to="name">...</message>
* destination AND the output contains zero <message> blocks, the entire * blocks, even with a single destination. Bare text is scratchpad only.
* cleaned text (with <internal> tags stripped) is sent to that destination.
* This preserves the simple case of one user on one channel — the agent
* doesn't need to know about wrapping syntax at all.
*/ */
function dispatchResultText(text: string, routing: RoutingContext): void { function dispatchResultText(text: string, routing: RoutingContext): void {
const MESSAGE_RE = /<message\s+to="([^"]+)"\s*>([\s\S]*?)<\/message>/g; const MESSAGE_RE = /<message\s+to="([^"]+)"\s*>([\s\S]*?)<\/message>/g;
@@ -436,30 +432,6 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
const scratchpad = stripInternalTags(scratchpadParts.join('')); const scratchpad = stripInternalTags(scratchpadParts.join(''));
// Single-destination shortcut: the agent wrote plain text — send to
// the session's originating channel (from session_routing) if available,
// otherwise fall back to the single destination.
if (sent === 0 && scratchpad) {
if (routing.channelType && routing.platformId) {
// Reply to the channel/thread the message came from
writeMessageOut({
id: generateId(),
in_reply_to: routing.inReplyTo,
kind: 'chat',
platform_id: routing.platformId,
channel_type: routing.channelType,
thread_id: routing.threadId,
content: JSON.stringify({ text: scratchpad }),
});
return;
}
const all = getAllDestinations();
if (all.length === 1) {
sendToDestination(all[0], scratchpad, routing);
return;
}
}
if (scratchpad) { if (scratchpad) {
log(`[scratchpad] ${scratchpad.slice(0, 500)}${scratchpad.length > 500 ? '…' : ''}`); log(`[scratchpad] ${scratchpad.slice(0, 500)}${scratchpad.length > 500 ? '…' : ''}`);
} }
@@ -472,20 +444,46 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void { function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void {
const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!; const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!;
const channelType = dest.type === 'channel' ? dest.channelType! : 'agent'; const channelType = dest.type === 'channel' ? dest.channelType! : 'agent';
// Inherit thread_id from the inbound routing context so replies land in the // Resolve thread_id per-destination from the most recent inbound message
// same thread the conversation is in. For non-threaded adapters the router // that came from this same channel+platform. In agent-shared sessions,
// strips thread_id at ingest, so this will already be null. // different destinations have different thread contexts — using a single
// routing.threadId would stamp one channel's thread onto another.
const destRouting = resolveDestinationThread(channelType, platformId);
writeMessageOut({ writeMessageOut({
id: generateId(), id: generateId(),
in_reply_to: routing.inReplyTo, in_reply_to: destRouting?.inReplyTo ?? routing.inReplyTo,
kind: 'chat', kind: 'chat',
platform_id: platformId, platform_id: platformId,
channel_type: channelType, channel_type: channelType,
thread_id: routing.threadId, thread_id: destRouting?.threadId ?? null,
content: JSON.stringify({ text: body }), content: JSON.stringify({ text: body }),
}); });
} }
/**
* Find the thread_id and message id from the most recent inbound message
* matching the given channel+platform. Returns null if no match found.
*/
function resolveDestinationThread(
channelType: string,
platformId: string,
): { threadId: string | null; inReplyTo: string | null } | null {
try {
const db = getInboundDb();
const row = db
.prepare(
`SELECT thread_id, id FROM messages_in
WHERE channel_type = ? AND platform_id = ?
ORDER BY seq DESC LIMIT 1`,
)
.get(channelType, platformId) as { thread_id: string | null; id: string } | undefined;
if (row) return { threadId: row.thread_id, inReplyTo: row.id };
} catch {
// Fall through — DB may not have these columns on older sessions
}
return null;
}
function sleep(ms: number): Promise<void> { function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }
+44
View File
@@ -14,6 +14,18 @@ const DEFAULT_SETTINGS_JSON =
CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1', CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1',
CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0', CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0',
}, },
hooks: {
PreCompact: [
{
hooks: [
{
type: 'command',
command: 'bun /app/src/compact-instructions.ts',
},
],
},
],
},
}, },
null, null,
2, 2,
@@ -71,10 +83,13 @@ export function initGroupFilesystem(group: AgentGroup, opts?: { instructions?: s
if (!fs.existsSync(settingsFile)) { if (!fs.existsSync(settingsFile)) {
fs.writeFileSync(settingsFile, DEFAULT_SETTINGS_JSON); fs.writeFileSync(settingsFile, DEFAULT_SETTINGS_JSON);
initialized.push('settings.json'); initialized.push('settings.json');
} else {
ensurePreCompactHook(settingsFile, initialized);
} }
// Skills directory — created empty here; symlinks are synced at spawn // Skills directory — created empty here; symlinks are synced at spawn
// time by container-runner.ts based on container.json skills selection. // time by container-runner.ts based on container.json skills selection.
// (ensurePreCompactHook is defined after the main function.)
const skillsDst = path.join(claudeDir, 'skills'); const skillsDst = path.join(claudeDir, 'skills');
if (!fs.existsSync(skillsDst)) { if (!fs.existsSync(skillsDst)) {
fs.mkdirSync(skillsDst, { recursive: true }); fs.mkdirSync(skillsDst, { recursive: true });
@@ -90,3 +105,32 @@ export function initGroupFilesystem(group: AgentGroup, opts?: { instructions?: s
}); });
} }
} }
const PRE_COMPACT_COMMAND = 'bun /app/src/compact-instructions.ts';
/**
* Patch an existing settings.json to add the PreCompact hook if missing.
* Runs on every group init so pre-existing groups pick up the hook.
*/
function ensurePreCompactHook(settingsFile: string, initialized: string[]): void {
try {
const raw = fs.readFileSync(settingsFile, 'utf-8');
const settings = JSON.parse(raw);
// Check if there's already a PreCompact hook with our command.
const existing = settings.hooks?.PreCompact as unknown[] | undefined;
if (existing && JSON.stringify(existing).includes(PRE_COMPACT_COMMAND)) return;
// Add the hook, preserving existing hooks.
if (!settings.hooks) settings.hooks = {};
if (!settings.hooks.PreCompact) settings.hooks.PreCompact = [];
settings.hooks.PreCompact.push({
hooks: [{ type: 'command', command: PRE_COMPACT_COMMAND }],
});
fs.writeFileSync(settingsFile, JSON.stringify(settings, null, 2) + '\n');
initialized.push('settings.json (added PreCompact hook)');
} catch {
// Don't break init if settings.json is malformed — it'll use whatever's there.
}
}