From 81ef193e692dcf76a2fcd72a3995dc7edd017aef Mon Sep 17 00:00:00 2001 From: Adam Date: Fri, 24 Apr 2026 13:38:46 +1000 Subject: [PATCH] refactor(session-state): key continuations per provider to survive provider switches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before, every provider stored its opaque continuation id under the single outbound.db key `sdk_session_id`. Flipping a session's agent_provider (e.g. Codex → Claude) meant the new provider read the old provider's id at wake, handed it to its own SDK, and got a "No conversation found" error that cost the user one sacrificed message before the stale-session recovery path cleared the id. This reshapes session_state so continuations are keyed `continuation:` instead. Consequences: - Per-provider continuations coexist. Flipping Claude → Codex → Claude resumes the Claude thread exactly where it left off, with the intervening Codex thread also still on file. - No provider ever reads another provider's id. Switching costs no sacrificed message and emits no transient error. - Legacy installs are migrated forward on first startup: migrateLegacyContinuation() adopts any pre-existing `sdk_session_id` row into the current provider's slot (best guess — it was whichever provider ran last), then deletes the legacy row unconditionally so it can't poison a future provider's read. runPollLoop now takes providerName alongside the provider instance, and threads it through processQuery to setContinuation on init. Tests: 9 new tests covering set/get isolation across providers, clear-specificity, legacy-adoption, legacy-always-deleted, prefer-existing-slot-over-legacy, and idempotency of a second migration call. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agent-runner/src/db/session-state.test.ts | 100 ++++++++++++++++++ .../agent-runner/src/db/session-state.ts | 62 ++++++++--- container/agent-runner/src/index.ts | 1 + .../agent-runner/src/integration.test.ts | 1 + container/agent-runner/src/poll-loop.ts | 28 +++-- 5 files changed, 172 insertions(+), 20 deletions(-) create mode 100644 container/agent-runner/src/db/session-state.test.ts diff --git a/container/agent-runner/src/db/session-state.test.ts b/container/agent-runner/src/db/session-state.test.ts new file mode 100644 index 000000000..b5aa26948 --- /dev/null +++ b/container/agent-runner/src/db/session-state.test.ts @@ -0,0 +1,100 @@ +import { beforeEach, describe, expect, test } from 'bun:test'; + +import { getOutboundDb, initTestSessionDb } from './connection.js'; +import { + clearContinuation, + getContinuation, + migrateLegacyContinuation, + setContinuation, +} from './session-state.js'; + +beforeEach(() => { + initTestSessionDb(); +}); + +function seedLegacy(value: string): void { + getOutboundDb() + .prepare('INSERT INTO session_state (key, value, updated_at) VALUES (?, ?, ?)') + .run('sdk_session_id', value, new Date().toISOString()); +} + +describe('session-state — per-provider continuations', () => { + test('set/get round-trip, case-insensitive provider key', () => { + setContinuation('claude', 'claude-conv-1'); + expect(getContinuation('claude')).toBe('claude-conv-1'); + expect(getContinuation('Claude')).toBe('claude-conv-1'); + expect(getContinuation('CLAUDE')).toBe('claude-conv-1'); + }); + + test('providers are isolated — switching reads the right slot', () => { + setContinuation('claude', 'claude-conv-1'); + setContinuation('codex', 'codex-thread-xyz'); + + expect(getContinuation('claude')).toBe('claude-conv-1'); + expect(getContinuation('codex')).toBe('codex-thread-xyz'); + }); + + test('clearContinuation only affects the specified provider', () => { + setContinuation('claude', 'keep-me'); + setContinuation('codex', 'drop-me'); + + clearContinuation('codex'); + + expect(getContinuation('claude')).toBe('keep-me'); + expect(getContinuation('codex')).toBeUndefined(); + }); + + test('unknown provider returns undefined', () => { + expect(getContinuation('never-used')).toBeUndefined(); + }); +}); + +describe('session-state — legacy migration', () => { + test('adopts legacy value into current provider when current is empty', () => { + seedLegacy('old-session-id'); + + const adopted = migrateLegacyContinuation('claude'); + + expect(adopted).toBe('old-session-id'); + expect(getContinuation('claude')).toBe('old-session-id'); + }); + + test('always deletes legacy row regardless of migration outcome', () => { + seedLegacy('old-session-id'); + setContinuation('claude', 'existing'); + + migrateLegacyContinuation('claude'); + + // After migration the legacy key must be gone, whether or not it was adopted. + // A subsequent migration for a different provider must not see it. + const resultAfterSecondCall = migrateLegacyContinuation('codex'); + expect(resultAfterSecondCall).toBeUndefined(); + }); + + test('prefers existing current-provider slot over legacy', () => { + seedLegacy('legacy-value'); + setContinuation('claude', 'claude-value'); + + const result = migrateLegacyContinuation('claude'); + + expect(result).toBe('claude-value'); + expect(getContinuation('claude')).toBe('claude-value'); + }); + + test('no legacy row — returns current provider value (possibly undefined)', () => { + expect(migrateLegacyContinuation('claude')).toBeUndefined(); + + setContinuation('codex', 'codex-value'); + expect(migrateLegacyContinuation('codex')).toBe('codex-value'); + }); + + test('migration is idempotent on a second call (legacy already gone)', () => { + seedLegacy('once'); + + const first = migrateLegacyContinuation('claude'); + expect(first).toBe('once'); + + const second = migrateLegacyContinuation('claude'); + expect(second).toBe('once'); + }); +}); diff --git a/container/agent-runner/src/db/session-state.ts b/container/agent-runner/src/db/session-state.ts index a199ae113..9e1230926 100644 --- a/container/agent-runner/src/db/session-state.ts +++ b/container/agent-runner/src/db/session-state.ts @@ -2,12 +2,20 @@ * Persistent key/value state for the container. Lives in outbound.db * (container-owned, already scoped per channel/thread). * - * Primary use: remember the SDK session ID so the agent's conversation - * resumes across container restarts. Cleared by /clear. + * Primary use: remember each provider's opaque continuation id so the + * agent's conversation resumes across container restarts. Keyed per + * provider because continuations are provider-private — a Claude + * conversation id means nothing to Codex and vice versa. Switching + * providers is therefore lossless: each provider's last thread stays + * on file and resumes cleanly if the user flips back. */ import { getOutboundDb } from './connection.js'; -const SDK_SESSION_KEY = 'sdk_session_id'; +const LEGACY_KEY = 'sdk_session_id'; + +function continuationKey(providerName: string): string { + return `continuation:${providerName.toLowerCase()}`; +} function getValue(key: string): string | undefined { const row = getOutboundDb() @@ -18,9 +26,7 @@ function getValue(key: string): string | undefined { function setValue(key: string, value: string): void { getOutboundDb() - .prepare( - 'INSERT OR REPLACE INTO session_state (key, value, updated_at) VALUES (?, ?, ?)', - ) + .prepare('INSERT OR REPLACE INTO session_state (key, value, updated_at) VALUES (?, ?, ?)') .run(key, value, new Date().toISOString()); } @@ -28,14 +34,46 @@ function deleteValue(key: string): void { getOutboundDb().prepare('DELETE FROM session_state WHERE key = ?').run(key); } -export function getStoredSessionId(): string | undefined { - return getValue(SDK_SESSION_KEY); +/** + * One-time migration of the pre-per-provider continuation row. + * + * Before this was keyed per provider, continuations lived under the + * single key `sdk_session_id`. On container start, if that legacy row + * exists and the current provider has no continuation of its own, adopt + * the legacy value into the current provider's slot (best-guess — the + * legacy row was written by whatever provider ran last). The legacy row + * is always deleted so future provider flips never re-read a stale id + * through the wrong lens. + * + * Returns the continuation the caller should use at startup (either the + * current provider's existing value, the adopted legacy value, or + * undefined). + */ +export function migrateLegacyContinuation(providerName: string): string | undefined { + const legacy = getValue(LEGACY_KEY); + const currentKey = continuationKey(providerName); + const current = getValue(currentKey); + + if (legacy === undefined) return current; + + // Always drop the legacy row so no future provider reads it. + deleteValue(LEGACY_KEY); + + // Prefer the current provider's own slot if one already exists. + if (current !== undefined) return current; + + setValue(currentKey, legacy); + return legacy; } -export function setStoredSessionId(sessionId: string): void { - setValue(SDK_SESSION_KEY, sessionId); +export function getContinuation(providerName: string): string | undefined { + return getValue(continuationKey(providerName)); } -export function clearStoredSessionId(): void { - deleteValue(SDK_SESSION_KEY); +export function setContinuation(providerName: string, id: string): void { + setValue(continuationKey(providerName), id); +} + +export function clearContinuation(providerName: string): void { + deleteValue(continuationKey(providerName)); } diff --git a/container/agent-runner/src/index.ts b/container/agent-runner/src/index.ts index 236be4caf..90c690ff2 100644 --- a/container/agent-runner/src/index.ts +++ b/container/agent-runner/src/index.ts @@ -95,6 +95,7 @@ async function main(): Promise { await runPollLoop({ provider, + providerName, cwd: CWD, systemContext: { instructions }, }); diff --git a/container/agent-runner/src/integration.test.ts b/container/agent-runner/src/integration.test.ts index 4a8b0918c..3447c3894 100644 --- a/container/agent-runner/src/integration.test.ts +++ b/container/agent-runner/src/integration.test.ts @@ -98,6 +98,7 @@ async function runPollLoopWithTimeout(provider: MockProvider, signal: AbortSigna return Promise.race([ runPollLoop({ provider, + providerName: 'mock', cwd: '/tmp', }), new Promise((_, reject) => { diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index d93bdd30e..bd48db235 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -2,7 +2,11 @@ import { findByName, getAllDestinations, type DestinationEntry } from './destina import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js'; import { writeMessageOut } from './db/messages-out.js'; import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js'; -import { getStoredSessionId, setStoredSessionId, clearStoredSessionId } from './db/session-state.js'; +import { + clearContinuation, + migrateLegacyContinuation, + setContinuation, +} from './db/session-state.js'; import { formatMessages, extractRouting, categorizeMessage, isClearCommand, stripInternalTags, type RoutingContext } from './formatter.js'; import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js'; @@ -19,6 +23,12 @@ function generateId(): string { export interface PollLoopConfig { provider: AgentProvider; + /** + * Name of the provider (e.g. "claude", "codex", "opencode"). Used to key + * the stored continuation per-provider so flipping providers doesn't + * resurrect a stale id from a different backend. + */ + providerName: string; cwd: string; systemContext?: { instructions?: string; @@ -39,8 +49,9 @@ export async function runPollLoop(config: PollLoopConfig): Promise { // Resume the agent's prior session from a previous container run if one // was persisted. The continuation is opaque to the poll-loop — the // provider decides how to use it (Claude resumes a .jsonl transcript, - // other providers may reload a thread ID, etc.). - let continuation: string | undefined = getStoredSessionId(); + // other providers may reload a thread ID, etc.). Keyed per-provider so + // a Codex thread id never gets handed to Claude or vice versa. + let continuation: string | undefined = migrateLegacyContinuation(config.providerName); if (continuation) { log(`Resuming agent session ${continuation}`); @@ -94,7 +105,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise { if ((msg.kind === 'chat' || msg.kind === 'chat-sdk') && isClearCommand(msg)) { log('Clearing session (resetting continuation)'); continuation = undefined; - clearStoredSessionId(); + clearContinuation(config.providerName); writeMessageOut({ id: generateId(), kind: 'chat', @@ -160,10 +171,10 @@ export async function runPollLoop(config: PollLoopConfig): Promise { const skippedSet = new Set(skipped); const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id)); try { - const result = await processQuery(query, routing, processingIds); + const result = await processQuery(query, routing, processingIds, config.providerName); if (result.continuation && result.continuation !== continuation) { continuation = result.continuation; - setStoredSessionId(continuation); + setContinuation(config.providerName, continuation); } } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); @@ -175,7 +186,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise { if (continuation && config.provider.isSessionInvalid(err)) { log(`Stale session detected (${continuation}) — clearing for next retry`); continuation = undefined; - clearStoredSessionId(); + clearContinuation(config.providerName); } // Write error response so the user knows something went wrong @@ -238,6 +249,7 @@ async function processQuery( query: AgentQuery, routing: RoutingContext, initialBatchIds: string[], + providerName: string, ): Promise { let queryContinuation: string | undefined; let done = false; @@ -288,7 +300,7 @@ async function processQuery( // container died between `init` and `result`, the SDK session was // effectively orphaned and the next message started a blank // Claude session with no prior context. - setStoredSessionId(event.continuation); + setContinuation(providerName, event.continuation); } else if (event.type === 'result') { // A result — with or without text — means the turn is done. Mark // the initial batch completed now so the host sweep doesn't see