diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index bd48db235..986489f92 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -260,31 +260,69 @@ async function processQuery( // Stream liveness is decided host-side via the heartbeat file + processing // claim age (see src/host-sweep.ts); if something is truly stuck, the host // will kill the container and messages get reset to pending. + let pollInFlight = false; const pollHandle = setInterval(() => { - if (done) return; + if (done || pollInFlight) return; + pollInFlight = true; - // Skip system messages (MCP tool responses) and /clear (needs fresh query). - // Thread routing is the router's concern — if a message landed in this - // session, the agent should see it. Per-thread sessions already isolate - // threads into separate containers; shared sessions intentionally merge - // everything. Filtering on thread_id here caused deadlocks when the - // initial batch and follow-ups had mismatched thread_ids (e.g. a - // host-generated welcome trigger with null thread vs a Discord DM reply). - const newMessages = getPendingMessages().filter((m) => { - if (m.kind === 'system') return false; - if ((m.kind === 'chat' || m.kind === 'chat-sdk') && isClearCommand(m)) return false; - return true; - }); - if (newMessages.length > 0) { - const newIds = newMessages.map((m) => m.id); - markProcessing(newIds); + void (async () => { + try { + // Skip system messages (MCP tool responses) and /clear (needs fresh query). + // Thread routing is the router's concern — if a message landed in this + // session, the agent should see it. Per-thread sessions already isolate + // threads into separate containers; shared sessions intentionally merge + // everything. Filtering on thread_id here caused deadlocks when the + // initial batch and follow-ups had mismatched thread_ids (e.g. a + // host-generated welcome trigger with null thread vs a Discord DM reply). + const newMessages = getPendingMessages().filter((m) => { + if (m.kind === 'system') return false; + if ((m.kind === 'chat' || m.kind === 'chat-sdk') && isClearCommand(m)) return false; + return true; + }); + if (newMessages.length === 0) return; - const prompt = formatMessages(newMessages); - log(`Pushing ${newMessages.length} follow-up message(s) into active query`); - query.push(prompt); + const newIds = newMessages.map((m) => m.id); + markProcessing(newIds); - markCompleted(newIds); - } + // Run pre-task scripts on follow-ups too — without this, a task that + // arrives during an active query (e.g. a */10 monitoring cron) bypasses + // its script gate and always wakes the agent, defeating the gate. + // Mirrors the initial-batch hook above. + let keep = newMessages; + let skipped: string[] = []; + // MODULE-HOOK:scheduling-pre-task-followup:start + const { applyPreTaskScripts } = await import('./scheduling/task-script.js'); + const preTask = await applyPreTaskScripts(newMessages); + keep = preTask.keep; + skipped = preTask.skipped; + if (skipped.length > 0) { + markCompleted(skipped); + log(`Pre-task script skipped ${skipped.length} follow-up task(s): ${skipped.join(', ')}`); + } + // MODULE-HOOK:scheduling-pre-task-followup:end + + if (keep.length === 0) return; + // Re-check done — the outer query may have finished while the script + // was awaited. Pushing into a closed stream is wasted work; the + // claimed messages get released by the host's processing-claim sweep. + if (done) return; + + const keptIds = keep.map((m) => m.id); + const prompt = formatMessages(keep); + log(`Pushing ${keep.length} follow-up message(s) into active query`); + query.push(prompt); + markCompleted(keptIds); + } catch (err) { + // Without this catch the rejection escapes the void IIFE and Node + // terminates the container on unhandled-rejection. The initial-batch + // path is wrapped by processQuery's outer try/catch; the follow-up + // path is not, so it needs its own. + const errMsg = err instanceof Error ? err.message : String(err); + log(`Follow-up poll error: ${errMsg}`); + } finally { + pollInFlight = false; + } + })(); }, ACTIVE_POLL_INTERVAL_MS); try {