From e4181f5451f1ac514c94ab6d4e737bfad5cb5076 Mon Sep 17 00:00:00 2001 From: Charlie Savage Date: Sat, 2 May 2026 22:45:23 -0700 Subject: [PATCH 1/2] =?UTF-8?q?fix(host-sweep):=20regression=20in=20#2183?= =?UTF-8?q?=20=E2=80=94=20orphan-claim=20delete=20missed=20in=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #2183 added orphan-claim cleanup that reopens `outbound.db` by session path (`openOutboundDbRw(session.agent_group_id, session.id)`) so the delete runs against a writable handle even when callers pass a readonly one. That works for the production caller — there's a real on-disk session DB at the expected path. The test wrapper `_resetStuckProcessingRowsForTesting` (introduced in the same series, #2151) is called with in-memory DBs that have no on-disk path. The reopen creates a fresh empty file at `/v2-sessions/ag-test/sess-test/outbound.db`, runs the delete against that, and leaves the in-memory `outDb` (which the test reads afterward) untouched. The two `resetStuckProcessingRows — orphan claim cleanup` tests assert `getProcessingClaims(outDb).toEqual([])` after the call and fail on the row that's still there. Fix: drop the `_…ForTesting` wrapper, export `resetStuckProcessingRows` directly with an optional `writableOutDb` parameter. When omitted (production), the function reopens `outbound.db` RW by session path — existing behavior, existing safety guarantee. When provided (tests, or any future caller that already holds a writable handle), the function uses it directly and skips the reopen. The optional parameter has a real meaning, not a "for tests" hack. Public API surface change: `_resetStuckProcessingRowsForTesting` is gone, `resetStuckProcessingRows` is now exported. No other callers inside the repo besides the test. --- src/host-sweep.test.ts | 11 +++-------- src/host-sweep.ts | 40 +++++++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/host-sweep.test.ts b/src/host-sweep.test.ts index bd2e233db..155b1b125 100644 --- a/src/host-sweep.test.ts +++ b/src/host-sweep.test.ts @@ -7,12 +7,7 @@ import Database from 'better-sqlite3'; import { describe, expect, it } from 'vitest'; import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js'; -import { - ABSOLUTE_CEILING_MS, - CLAIM_STUCK_MS, - _resetStuckProcessingRowsForTesting, - decideStuckAction, -} from './host-sweep.js'; +import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, resetStuckProcessingRows, decideStuckAction } from './host-sweep.js'; import type { Session } from './types.js'; const BASE = Date.parse('2026-04-20T12:00:00.000Z'); @@ -253,7 +248,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => { // Sanity: the orphan claim is what would trip claim-stuck. expect(getProcessingClaims(outDb)).toHaveLength(1); - _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling'); + resetStuckProcessingRows(inDb, outDb, fakeSession(), 'absolute-ceiling', outDb); // Regression assertion: orphan claim is gone — next sweep tick will see // an empty claims list and not kill the freshly respawned container. @@ -285,7 +280,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => { .run(claimedAt, future); outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt); - _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck'); + resetStuckProcessingRows(inDb, outDb, fakeSession(), 'claim-stuck', outDb); expect(getProcessingClaims(outDb)).toEqual([]); const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number }; diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 09c82ac76..b10ee0df2 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -250,20 +250,28 @@ function enforceRunningContainerSla( resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck'); } -export function _resetStuckProcessingRowsForTesting( - inDb: Database.Database, - outDb: Database.Database, - session: Session, - reason: string, -): void { - resetStuckProcessingRows(inDb, outDb, session, reason); -} - -function resetStuckProcessingRows( +/** + * Reset retries on inbound rows the container claimed but never acked, and + * delete the orphan `processing_ack` rows so the next sweep tick doesn't + * see them. + * + * Safe to call only when the container that owned `outbound.db` is dead — + * production callers invoke this either in the `!alive` branch or right + * after `killContainer`. Without that guarantee, the orphan-claim delete + * would race the container's own writer. + * + * `writableOutDb` is the same handle outbound writes go through. When + * omitted (typical production path) the function reopens `outbound.db` + * read-write by session path for the delete and closes that handle on + * exit. Callers that already hold a writable handle — including tests + * using in-memory DBs — can pass it in to skip the reopen. + */ +export function resetStuckProcessingRows( inDb: Database.Database, outDb: Database.Database, session: Session, reason: string, + writableOutDb?: Database.Database, ): void { const claims = getProcessingClaims(outDb); const now = Date.now(); @@ -300,19 +308,17 @@ function resetStuckProcessingRows( // would re-read them, see the old status_changed timestamp, conclude the // freshly respawned container is stuck, and SIGKILL it before its // agent-runner has a chance to run clearStaleProcessingAcks() on startup. - // We're safe to write outbound.db here because we just killed the container - // that owned it (or it crashed and left no writer behind). - // outDb was opened readonly for reads above; reopen with write access for this delete. - let outDbRw: Database.Database | null = null; + const ownsDb = !writableOutDb; + let useDb: Database.Database | null = writableOutDb ?? null; try { - outDbRw = openOutboundDbRw(session.agent_group_id, session.id); - const cleared = deleteOrphanProcessingClaims(outDbRw); + if (!useDb) useDb = openOutboundDbRw(session.agent_group_id, session.id); + const cleared = deleteOrphanProcessingClaims(useDb); if (cleared > 0) { log.info('Cleared orphan processing claims', { sessionId: session.id, cleared, reason }); } } catch (err) { log.warn('Failed to clear orphan processing claims', { sessionId: session.id, err }); } finally { - outDbRw?.close(); + if (ownsDb) useDb?.close(); } } From a870e7ebf24f2aface4a4359d75955f9ab79917b Mon Sep 17 00:00:00 2001 From: gavrielc Date: Tue, 5 May 2026 15:56:08 +0300 Subject: [PATCH 2/2] fix: keep resetStuckProcessingRows private, restore test wrapper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test wrapper forwards the in-memory outDb as the writable handle, avoiding the filesystem reopen that fails in CI. The function stays private — the optional writableOutDb param is an internal detail, not a public API. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/host-sweep.test.ts | 11 ++++++++--- src/host-sweep.ts | 27 ++++++++++----------------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/host-sweep.test.ts b/src/host-sweep.test.ts index 155b1b125..bd2e233db 100644 --- a/src/host-sweep.test.ts +++ b/src/host-sweep.test.ts @@ -7,7 +7,12 @@ import Database from 'better-sqlite3'; import { describe, expect, it } from 'vitest'; import { deleteOrphanProcessingClaims, getProcessingClaims } from './db/session-db.js'; -import { ABSOLUTE_CEILING_MS, CLAIM_STUCK_MS, resetStuckProcessingRows, decideStuckAction } from './host-sweep.js'; +import { + ABSOLUTE_CEILING_MS, + CLAIM_STUCK_MS, + _resetStuckProcessingRowsForTesting, + decideStuckAction, +} from './host-sweep.js'; import type { Session } from './types.js'; const BASE = Date.parse('2026-04-20T12:00:00.000Z'); @@ -248,7 +253,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => { // Sanity: the orphan claim is what would trip claim-stuck. expect(getProcessingClaims(outDb)).toHaveLength(1); - resetStuckProcessingRows(inDb, outDb, fakeSession(), 'absolute-ceiling', outDb); + _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'absolute-ceiling'); // Regression assertion: orphan claim is gone — next sweep tick will see // an empty claims list and not kill the freshly respawned container. @@ -280,7 +285,7 @@ describe('resetStuckProcessingRows — orphan claim cleanup', () => { .run(claimedAt, future); outDb.prepare("INSERT INTO processing_ack VALUES ('m-2', 'processing', ?)").run(claimedAt); - resetStuckProcessingRows(inDb, outDb, fakeSession(), 'claim-stuck', outDb); + _resetStuckProcessingRowsForTesting(inDb, outDb, fakeSession(), 'claim-stuck'); expect(getProcessingClaims(outDb)).toEqual([]); const row = inDb.prepare('SELECT tries FROM messages_in WHERE id = ?').get('m-2') as { tries: number }; diff --git a/src/host-sweep.ts b/src/host-sweep.ts index b10ee0df2..93a7e8729 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -250,23 +250,16 @@ function enforceRunningContainerSla( resetStuckProcessingRows(inDb, outDb, session, 'claim-stuck'); } -/** - * Reset retries on inbound rows the container claimed but never acked, and - * delete the orphan `processing_ack` rows so the next sweep tick doesn't - * see them. - * - * Safe to call only when the container that owned `outbound.db` is dead — - * production callers invoke this either in the `!alive` branch or right - * after `killContainer`. Without that guarantee, the orphan-claim delete - * would race the container's own writer. - * - * `writableOutDb` is the same handle outbound writes go through. When - * omitted (typical production path) the function reopens `outbound.db` - * read-write by session path for the delete and closes that handle on - * exit. Callers that already hold a writable handle — including tests - * using in-memory DBs — can pass it in to skip the reopen. - */ -export function resetStuckProcessingRows( +export function _resetStuckProcessingRowsForTesting( + inDb: Database.Database, + outDb: Database.Database, + session: Session, + reason: string, +): void { + resetStuckProcessingRows(inDb, outDb, session, reason, outDb); +} + +function resetStuckProcessingRows( inDb: Database.Database, outDb: Database.Database, session: Session,