fix(container): gate poll loop on trigger=1 to honor accumulate contract

A warm container picks up every pending messages_in row on each poll tick
and calls markProcessing → agent.query → markCompleted. Before this, that
included trigger=0 rows (ignored_message_policy='accumulate' context),
causing the agent to wake and potentially respond to messages the wiring
had explicitly opted out of engaging on — defeating accumulate's "store
as context, don't engage" contract.

Gate the main poll loop with `messages.some(m => m.trigger === 1)` —
mirrors host-side countDueMessages which is already gated. If the batch
has no wake-eligible row, sleep and leave them pending. They ride along
via the same getPendingMessages query the next time a real trigger=1
lands, which is the intended accumulate behavior.

The concurrent active-turn poll (line ~290) is unchanged on purpose —
once the agent has engaged, pushing in accumulate rows mid-turn as
additional context is desired.

initTestSessionDb was missing the trigger and series_id columns on
messages_in, out of sync with the live migration. Added both so the new
tests (and any future trigger-aware tests) can run.

Four tests cover the data contract: trigger=0 rows are returned by
getPendingMessages (so they ride along), the gate predicate correctly
identifies accumulate-only batches, mixed batches pass the gate, and the
schema default of 1 applies when the column is omitted.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-20 11:23:47 +03:00
parent c38e5b11a8
commit 31f2da9585
3 changed files with 64 additions and 4 deletions
@@ -157,7 +157,9 @@ export function initTestSessionDb(): { inbound: Database; outbound: Database } {
status TEXT DEFAULT 'pending',
process_after TEXT,
recurrence TEXT,
series_id TEXT,
tries INTEGER DEFAULT 0,
trigger INTEGER NOT NULL DEFAULT 1,
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
+49 -4
View File
@@ -14,13 +14,13 @@ afterEach(() => {
closeSessionDb();
});
function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string }) {
function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string; trigger?: 0 | 1 }) {
getInboundDb()
.prepare(
`INSERT INTO messages_in (id, kind, timestamp, status, process_after, content)
VALUES (?, ?, datetime('now'), 'pending', ?, ?)`,
`INSERT INTO messages_in (id, kind, timestamp, status, process_after, trigger, content)
VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?)`,
)
.run(id, kind, opts?.processAfter ?? null, JSON.stringify(content));
.run(id, kind, opts?.processAfter ?? null, opts?.trigger ?? 1, JSON.stringify(content));
}
describe('formatter', () => {
@@ -84,6 +84,51 @@ describe('formatter', () => {
});
});
describe('accumulate gate (trigger column)', () => {
it('getPendingMessages returns both trigger=0 and trigger=1 rows', () => {
// trigger=0 rides along as context, trigger=1 is the wake-eligible row.
// The poll loop's gate depends on this data contract.
insertMessage('m1', 'chat', { sender: 'A', text: 'chit chat' }, { trigger: 0 });
insertMessage('m2', 'chat', { sender: 'B', text: 'actual mention' }, { trigger: 1 });
const messages = getPendingMessages();
expect(messages).toHaveLength(2);
const byId = Object.fromEntries(messages.map((m) => [m.id, m]));
expect(byId.m1.trigger).toBe(0);
expect(byId.m2.trigger).toBe(1);
});
it('trigger=0-only batch: gate predicate `some(trigger===1)` is false', () => {
insertMessage('m1', 'chat', { sender: 'A', text: 'noise' }, { trigger: 0 });
insertMessage('m2', 'chat', { sender: 'B', text: 'more noise' }, { trigger: 0 });
const messages = getPendingMessages();
// This is the exact predicate the poll loop uses to skip accumulate-only
// batches — gate should be false, so the loop sleeps without waking the agent.
expect(messages.some((m) => m.trigger === 1)).toBe(false);
});
it('mixed batch: gate is true → loop proceeds, accumulated rows ride along', () => {
insertMessage('m1', 'chat', { sender: 'A', text: 'earlier chatter' }, { trigger: 0 });
insertMessage('m2', 'chat', { sender: 'B', text: 'the real mention' }, { trigger: 1 });
const messages = getPendingMessages();
expect(messages.some((m) => m.trigger === 1)).toBe(true);
// Both messages are present for the formatter → agent sees the prior context.
expect(messages.map((m) => m.id).sort()).toEqual(['m1', 'm2']);
});
it('trigger column defaults to 1 for legacy inserts without explicit value', () => {
// The schema default is 1 (see src/db/schema.ts INBOUND_SCHEMA) — existing
// rows / tests without the column set are effectively wake-eligible.
getInboundDb()
.prepare(
`INSERT INTO messages_in (id, kind, timestamp, status, content)
VALUES ('m1', 'chat', datetime('now'), 'pending', '{"text":"hi"}')`,
)
.run();
const [msg] = getPendingMessages();
expect(msg.trigger).toBe(1);
});
});
describe('routing', () => {
it('should extract routing from messages', () => {
getInboundDb()
+13
View File
@@ -72,6 +72,19 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
continue;
}
// Accumulate gate: if the batch contains only trigger=0 rows
// (context-only, router-stored under ignored_message_policy='accumulate'),
// don't wake the agent. Leave them `pending` — they'll ride along the
// next time a real trigger=1 message lands via this same getPendingMessages
// query. Without this gate, a warm container keeps processing
// (and potentially responding to) every accumulate-only batch, defeating
// the "store as context, don't engage" contract. Host-side countDueMessages
// gates the same way for wake-from-cold (see src/db/session-db.ts).
if (!messages.some((m) => m.trigger === 1)) {
await sleep(POLL_INTERVAL_MS);
continue;
}
const ids = messages.map((m) => m.id);
markProcessing(ids);