diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts
index 954ebbcf8..1f1c40776 100644
--- a/container/agent-runner/src/db/connection.ts
+++ b/container/agent-runner/src/db/connection.ts
@@ -38,6 +38,16 @@ export function getOutboundDb(): Database.Database {
     _outbound.pragma('journal_mode = DELETE');
     _outbound.pragma('busy_timeout = 5000');
     _outbound.pragma('foreign_keys = ON');
+    // Lightweight forward-compat: session_state was added after the initial
+    // v2 schema, so older session DBs don't have it. Create it on demand
+    // instead of requiring a formal migration pass.
+    _outbound.exec(`
+      CREATE TABLE IF NOT EXISTS session_state (
+        key TEXT PRIMARY KEY,
+        value TEXT NOT NULL,
+        updated_at TEXT NOT NULL
+      );
+    `);
   }
   return _outbound;
 }
@@ -126,6 +136,11 @@ export function initTestSessionDb(): { inbound: Database.Database; outbound: Dat
       status TEXT NOT NULL,
       status_changed TEXT NOT NULL
     );
+    CREATE TABLE session_state (
+      key TEXT PRIMARY KEY,
+      value TEXT NOT NULL,
+      updated_at TEXT NOT NULL
+    );
   `);
 
   return { inbound: _inbound, outbound: _outbound };
diff --git a/container/agent-runner/src/db/session-state.ts b/container/agent-runner/src/db/session-state.ts
new file mode 100644
index 000000000..a199ae113
--- /dev/null
+++ b/container/agent-runner/src/db/session-state.ts
@@ -0,0 +1,48 @@
+/**
+ * Persistent key/value state for the container. Lives in outbound.db
+ * (container-owned, already scoped per channel/thread).
+ *
+ * Primary use: remember the SDK session ID so the agent's conversation
+ * resumes across container restarts. Cleared by /clear.
+ */
+import { getOutboundDb } from './connection.js';
+
+/** Row key under which the SDK session ID is stored in session_state. */
+const SDK_SESSION_KEY = 'sdk_session_id';
+
+/** Looks up `key` in session_state; returns undefined when no row matches. */
+function getValue(key: string): string | undefined {
+  const row = getOutboundDb()
+    .prepare('SELECT value FROM session_state WHERE key = ?')
+    .get(key) as { value: string } | undefined;
+  return row?.value;
+}
+
+/** Inserts or replaces the row for `key`, stamping updated_at with the current time as an ISO string. */
+function setValue(key: string, value: string): void {
+  getOutboundDb()
+    .prepare(
+      'INSERT OR REPLACE INTO session_state (key, value, updated_at) VALUES (?, ?, ?)',
+    )
+    .run(key, value, new Date().toISOString());
+}
+
+/** Deletes the session_state row for `key`. */
+function deleteValue(key: string): void {
+  getOutboundDb().prepare('DELETE FROM session_state WHERE key = ?').run(key);
+}
+
+/** Returns the persisted SDK session ID, or undefined when none is stored. */
+export function getStoredSessionId(): string | undefined {
+  return getValue(SDK_SESSION_KEY);
+}
+
+/** Persists `sessionId` under the SDK session key, replacing any previous value. */
+export function setStoredSessionId(sessionId: string): void {
+  setValue(SDK_SESSION_KEY, sessionId);
+}
+
+/** Removes the persisted SDK session ID. */
+export function clearStoredSessionId(): void {
+  deleteValue(SDK_SESSION_KEY);
+}
diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts
index 83d03166c..52b383990 100644
--- a/container/agent-runner/src/poll-loop.ts
+++ b/container/agent-runner/src/poll-loop.ts
@@ -2,6 +2,7 @@ import { findByName, getAllDestinations, type DestinationEntry } from './destina
 import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js';
 import { writeMessageOut } from './db/messages-out.js';
 import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js';
+import { getStoredSessionId, setStoredSessionId, clearStoredSessionId } from './db/session-state.js';
 import { formatMessages, extractRouting, categorizeMessage, type RoutingContext } from './formatter.js';
 import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js';
 
@@ -37,9 +38,17 @@ export interface PollLoopConfig {
  * 6. Loop
  */
 export async function runPollLoop(config: PollLoopConfig): Promise<void> {
-  let sessionId: string | undefined;
+  // Resume the SDK session from a prior container run if one was persisted.
+  // The SDK's .jsonl transcripts live in the shared ~/.claude mount, so the
+  // conversation history is already on disk — we just need the session ID
+  // to tell the SDK which one to continue.
+  let sessionId: string | undefined = getStoredSessionId();
   let resumeAt: string | undefined;
 
+  if (sessionId) {
+    log(`Resuming SDK session ${sessionId}`);
+  }
+
   // Clear leftover 'processing' acks from a previous crashed container.
   // This lets the new container re-process those messages.
   clearStaleProcessingAcks();
@@ -104,6 +113,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
         log('Clearing session (resetting sessionId)');
         sessionId = undefined;
         resumeAt = undefined;
+        clearStoredSessionId();
         writeMessageOut({
           id: generateId(),
           kind: 'chat',
@@ -159,10 +169,40 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
     const processingIds = ids.filter((id) => !commandIds.includes(id));
     try {
       const result = await processQuery(query, routing, config, processingIds);
-      if (result.sessionId) sessionId = result.sessionId;
+      if (result.sessionId && result.sessionId !== sessionId) {
+        sessionId = result.sessionId;
+        try {
+          setStoredSessionId(result.sessionId);
+        } catch (persistErr) {
+          // Best-effort: the query itself succeeded, so a failed write must not
+          // surface to the user as a query error or skip the resumeAt update.
+          const reason = persistErr instanceof Error ? persistErr.message : String(persistErr);
+          log(`Failed to persist SDK session ID: ${reason}`);
+        }
+      }
       if (result.resumeAt) resumeAt = result.resumeAt;
     } catch (err) {
-      log(`Query error: ${err instanceof Error ? err.message : String(err)}`);
+      const errMsg = err instanceof Error ? err.message : String(err);
+      log(`Query error: ${errMsg}`);
+
+      // Stale/corrupt session recovery: if the SDK can't find the session
+      // we asked it to resume, clear the stored ID so the next attempt
+      // starts fresh. The transcript .jsonl can go missing after a crash
+      // mid-write, manual deletion, or disk-full.
+      if (sessionId && /no conversation found|ENOENT.*\.jsonl|session.*not found/i.test(errMsg)) {
+        log(`Stale session detected (${sessionId}) — clearing for next retry`);
+        sessionId = undefined;
+        resumeAt = undefined;
+        try {
+          clearStoredSessionId();
+        } catch (clearErr) {
+          // Best-effort: a failure to remove the stored ID must not prevent
+          // the error reply from being written for the user.
+          const reason = clearErr instanceof Error ? clearErr.message : String(clearErr);
+          log(`Failed to clear stored SDK session ID: ${reason}`);
+        }
+      }
+
       // Write error response so the user knows something went wrong
       writeMessageOut({
         id: generateId(),
@@ -170,7 +210,7 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
         platform_id: routing.platformId,
         channel_type: routing.channelType,
         thread_id: routing.threadId,
-        content: JSON.stringify({ text: `Error: ${err instanceof Error ? err.message : String(err)}` }),
+        content: JSON.stringify({ text: `Error: ${errMsg}` }),
       });
     }
 
diff --git a/src/db/schema.ts b/src/db/schema.ts
index 08bc95d34..2c40d6e3b 100644
--- a/src/db/schema.ts
+++ b/src/db/schema.ts
@@ -140,4 +140,13 @@ CREATE TABLE processing_ack (
   status TEXT NOT NULL,
   status_changed TEXT NOT NULL
 );
+
+-- Persistent key/value state owned by the container. Used (among other things)
+-- to store the SDK session ID so the agent's conversation resumes across
+-- container restarts. Cleared by /clear.
+CREATE TABLE session_state (
+  key TEXT PRIMARY KEY,
+  value TEXT NOT NULL,
+  updated_at TEXT NOT NULL
+);
 `;