diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 46f4a7033..31f2fb2f0 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -1,31 +1,86 @@ +/** + * Two-DB connection layer. + * + * The session uses two SQLite files to eliminate write contention across + * the host-container mount boundary: + * + * inbound.db — host writes new messages here; container opens READ-ONLY + * outbound.db — container writes responses + acks here; host opens read-only + * + * Each file has exactly one writer, so no cross-process lock contention. + */ import Database from 'better-sqlite3'; +import fs from 'fs'; -const SESSION_DB_PATH = '/workspace/session.db'; +const DEFAULT_INBOUND_PATH = '/workspace/inbound.db'; +const DEFAULT_OUTBOUND_PATH = '/workspace/outbound.db'; +const DEFAULT_HEARTBEAT_PATH = '/workspace/.heartbeat'; -let _db: Database.Database | null = null; +let _inbound: Database.Database | null = null; +let _outbound: Database.Database | null = null; +let _heartbeatPath: string = DEFAULT_HEARTBEAT_PATH; -export function getSessionDb(): Database.Database { - if (!_db) { - _db = new Database(process.env.SESSION_DB_PATH || SESSION_DB_PATH); - _db.pragma('journal_mode = DELETE'); - _db.pragma('busy_timeout = 5000'); - _db.pragma('foreign_keys = ON'); +/** Inbound DB — container opens read-only (host is the sole writer). */ +export function getInboundDb(): Database.Database { + if (!_inbound) { + const dbPath = process.env.SESSION_INBOUND_DB_PATH || DEFAULT_INBOUND_PATH; + _inbound = new Database(dbPath, { readonly: true }); + _inbound.pragma('busy_timeout = 5000'); } - return _db; + return _inbound; } -/** For tests — opens an in-memory DB with session schema. */ -export function initTestSessionDb(): Database.Database { - _db = new Database(':memory:'); - _db.pragma('foreign_keys = ON'); - _db.exec(` +/** Outbound DB — container owns this file (sole writer). 
*/ +export function getOutboundDb(): Database.Database { + if (!_outbound) { + const dbPath = process.env.SESSION_OUTBOUND_DB_PATH || DEFAULT_OUTBOUND_PATH; + _outbound = new Database(dbPath); + _outbound.pragma('journal_mode = DELETE'); + _outbound.pragma('busy_timeout = 5000'); + _outbound.pragma('foreign_keys = ON'); + } + return _outbound; +} + +/** + * Touch the heartbeat file — replaces the old touchProcessing() DB writes. + * The host checks this file's mtime for stale container detection. + * A file touch is cheaper and avoids cross-boundary DB write contention. + */ +export function touchHeartbeat(): void { + const p = process.env.SESSION_HEARTBEAT_PATH || _heartbeatPath; + const now = new Date(); + try { + fs.utimesSync(p, now, now); + } catch { + try { + fs.writeFileSync(p, ''); + } catch { + // Silently ignore — parent dir may not exist (e.g., in-memory test DBs) + } + } +} + +/** + * Clear stale processing_ack entries on container startup. + * If the previous container crashed, 'processing' entries are leftover. + * Clearing them lets the new container re-process those messages. + */ +export function clearStaleProcessingAcks(): void { + getOutboundDb().prepare("DELETE FROM processing_ack WHERE status = 'processing'").run(); +} + +/** For tests — creates in-memory DBs with the session schemas. 
*/ +export function initTestSessionDb(): { inbound: Database.Database; outbound: Database.Database } { + _inbound = new Database(':memory:'); + _inbound.pragma('foreign_keys = ON'); + _inbound.exec(` CREATE TABLE messages_in ( id TEXT PRIMARY KEY, seq INTEGER UNIQUE, kind TEXT NOT NULL, timestamp TEXT NOT NULL, status TEXT DEFAULT 'pending', - status_changed TEXT, process_after TEXT, recurrence TEXT, tries INTEGER DEFAULT 0, @@ -34,12 +89,20 @@ export function initTestSessionDb(): Database.Database { thread_id TEXT, content TEXT NOT NULL ); + CREATE TABLE delivered ( + message_out_id TEXT PRIMARY KEY, + delivered_at TEXT NOT NULL + ); + `); + + _outbound = new Database(':memory:'); + _outbound.pragma('foreign_keys = ON'); + _outbound.exec(` CREATE TABLE messages_out ( id TEXT PRIMARY KEY, seq INTEGER UNIQUE, in_reply_to TEXT, timestamp TEXT NOT NULL, - delivered INTEGER DEFAULT 0, deliver_after TEXT, recurrence TEXT, kind TEXT NOT NULL, @@ -48,11 +111,27 @@ export function initTestSessionDb(): Database.Database { thread_id TEXT, content TEXT NOT NULL ); + CREATE TABLE processing_ack ( + message_id TEXT PRIMARY KEY, + status TEXT NOT NULL, + status_changed TEXT NOT NULL + ); `); - return _db; + + return { inbound: _inbound, outbound: _outbound }; } export function closeSessionDb(): void { - _db?.close(); - _db = null; + _inbound?.close(); + _inbound = null; + _outbound?.close(); + _outbound = null; +} + +/** + * @deprecated Use getInboundDb() / getOutboundDb() instead. + * Kept for backward compatibility during migration; it now returns the inbound handle, which is opened read-only outside tests, so callers that still write through it will fail. 
+ */ +export function getSessionDb(): Database.Database { + return getInboundDb(); } diff --git a/container/agent-runner/src/db/index.ts b/container/agent-runner/src/db/index.ts index 63c00d32f..cbd0e7e45 100644 --- a/container/agent-runner/src/db/index.ts +++ b/container/agent-runner/src/db/index.ts @@ -1,5 +1,13 @@ -export { getSessionDb, initTestSessionDb, closeSessionDb } from './connection.js'; +export { + getInboundDb, + getOutboundDb, + getSessionDb, + initTestSessionDb, + closeSessionDb, + touchHeartbeat, + clearStaleProcessingAcks, +} from './connection.js'; export { getPendingMessages, markProcessing, markCompleted, markFailed, getMessageIn, findQuestionResponse } from './messages-in.js'; export type { MessageInRow } from './messages-in.js'; -export { writeMessageOut, getUndeliveredMessages, markDelivered } from './messages-out.js'; +export { writeMessageOut, getUndeliveredMessages } from './messages-out.js'; export type { MessageOutRow, WriteMessageOut } from './messages-out.js'; diff --git a/container/agent-runner/src/db/messages-in.ts b/container/agent-runner/src/db/messages-in.ts index 579eb1531..fe2a222c1 100644 --- a/container/agent-runner/src/db/messages-in.ts +++ b/container/agent-runner/src/db/messages-in.ts @@ -1,4 +1,13 @@ -import { getSessionDb } from './connection.js'; +/** + * Inbound message operations (container side). + * + * Reads from inbound.db (host-owned, opened read-only). + * Writes processing status to processing_ack in outbound.db (container-owned). + * + * The container never writes to inbound.db — all status tracking goes through + * processing_ack. The host reads processing_ack to sync message lifecycle. 
+ */ +import { getInboundDb, getOutboundDb } from './connection.js'; export interface MessageInRow { id: string; @@ -6,7 +15,6 @@ export interface MessageInRow { kind: string; timestamp: string; status: string; - status_changed: string | null; process_after: string | null; recurrence: string | null; tries: number; @@ -16,9 +24,16 @@ export interface MessageInRow { content: string; } -/** Fetch all pending messages that are due for processing. */ +/** + * Fetch pending messages that are due for processing. + * Reads from inbound.db (read-only), filters against processing_ack in outbound.db + * to skip messages already picked up by this or a previous container run. + */ export function getPendingMessages(): MessageInRow[] { - return getSessionDb() + const inbound = getInboundDb(); + const outbound = getOutboundDb(); + + const pending = inbound .prepare( `SELECT * FROM messages_in WHERE status = 'pending' @@ -26,49 +41,74 @@ export function getPendingMessages(): MessageInRow[] { ORDER BY timestamp ASC`, ) .all() as MessageInRow[]; + + if (pending.length === 0) return []; + + // Filter out messages already acknowledged in outbound.db + const ackedIds = new Set( + (outbound.prepare('SELECT message_id FROM processing_ack').all() as Array<{ message_id: string }>).map( + (r) => r.message_id, + ), + ); + + return pending.filter((m) => !ackedIds.has(m.id)); } -/** Mark messages as processing. */ +/** Mark messages as processing — writes to processing_ack in outbound.db. 
*/ export function markProcessing(ids: string[]): void { if (ids.length === 0) return; - const db = getSessionDb(); - const stmt = db.prepare("UPDATE messages_in SET status = 'processing', status_changed = datetime('now'), tries = tries + 1 WHERE id = ?"); + const db = getOutboundDb(); + const stmt = db.prepare( + "INSERT OR REPLACE INTO processing_ack (message_id, status, status_changed) VALUES (?, 'processing', datetime('now'))", + ); db.transaction(() => { for (const id of ids) stmt.run(id); })(); } -/** Mark messages as completed. */ +/** Mark messages as completed — updates processing_ack in outbound.db. */ export function markCompleted(ids: string[]): void { if (ids.length === 0) return; - const db = getSessionDb(); - const stmt = db.prepare("UPDATE messages_in SET status = 'completed', status_changed = datetime('now') WHERE id = ?"); + const db = getOutboundDb(); + const stmt = db.prepare( + "INSERT OR REPLACE INTO processing_ack (message_id, status, status_changed) VALUES (?, 'completed', datetime('now'))", + ); db.transaction(() => { for (const id of ids) stmt.run(id); })(); } -/** Update status_changed on processing messages (heartbeat for host idle detection). */ -export function touchProcessing(ids: string[]): void { - if (ids.length === 0) return; - const db = getSessionDb(); - const stmt = db.prepare("UPDATE messages_in SET status_changed = datetime('now') WHERE id = ? AND status = 'processing'"); - for (const id of ids) stmt.run(id); -} - -/** Mark a single message as failed. */ +/** Mark a single message as failed — writes to processing_ack in outbound.db. */ export function markFailed(id: string): void { - getSessionDb().prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(id); + getOutboundDb() + .prepare( + "INSERT OR REPLACE INTO processing_ack (message_id, status, status_changed) VALUES (?, 'failed', datetime('now'))", + ) + .run(id); } -/** Get a message by ID. 
*/ +/** Get a message by ID (read from inbound.db). */ export function getMessageIn(id: string): MessageInRow | undefined { - return getSessionDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined; + return getInboundDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined; } -/** Find a pending response to a question (by questionId in content). */ +/** + * Find a pending response to a question (by questionId in content). + * Reads from inbound.db, checks processing_ack to skip already-handled responses. + */ export function findQuestionResponse(questionId: string): MessageInRow | undefined { - return getSessionDb() + const inbound = getInboundDb(); + const outbound = getOutboundDb(); + + const response = inbound .prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?") .get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined; + + if (!response) return undefined; + + // Check it hasn't been acked already + const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id); + if (acked) return undefined; + + return response; } diff --git a/container/agent-runner/src/db/messages-out.ts b/container/agent-runner/src/db/messages-out.ts index df6ebef56..55e078c09 100644 --- a/container/agent-runner/src/db/messages-out.ts +++ b/container/agent-runner/src/db/messages-out.ts @@ -1,11 +1,16 @@ -import { getSessionDb } from './connection.js'; +/** + * Outbound message operations (container side). + * + * Writes to outbound.db (container-owned). + * The host polls this DB (read-only) for undelivered messages. 
+ */ +import { getInboundDb, getOutboundDb } from './connection.js'; export interface MessageOutRow { id: string; seq: number | null; in_reply_to: string | null; timestamp: string; - delivered: number; deliver_after: string | null; recurrence: string | null; kind: string; @@ -27,59 +32,63 @@ export interface WriteMessageOut { content: string; } -/** Write a new outbound message, auto-assigning a seq number. */ +/** + * Write a new outbound message, auto-assigning an odd seq number. + * Container uses odd seq (1, 3, 5...), host uses even (2, 4, 6...) — + * this prevents seq collisions without cross-DB coordination. + */ export function writeMessageOut(msg: WriteMessageOut): number { - const db = getSessionDb(); - const nextSeq = ( - db - .prepare( - `SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM ( - SELECT seq FROM messages_in WHERE seq IS NOT NULL - UNION ALL - SELECT seq FROM messages_out WHERE seq IS NOT NULL - )`, - ) - .get() as { next: number } - ).next; + const outbound = getOutboundDb(); + const inbound = getInboundDb(); - db.prepare( - `INSERT INTO messages_out (id, seq, in_reply_to, timestamp, delivered, deliver_after, recurrence, kind, platform_id, channel_type, thread_id, content) - VALUES (@id, @seq, @in_reply_to, datetime('now'), 0, @deliver_after, @recurrence, @kind, @platform_id, @channel_type, @thread_id, @content)`, - ).run({ - in_reply_to: null, - deliver_after: null, - recurrence: null, - platform_id: null, - channel_type: null, - thread_id: null, - ...msg, - seq: nextSeq, - }); + // Read max seq from both DBs to maintain global ordering. + // Safe: each side only reads the other DB, never writes to it. + const maxOut = (outbound.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_out').get() as { m: number }).m; + const maxIn = (inbound.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; + const max = Math.max(maxOut, maxIn); + const nextSeq = max % 2 === 0 ? 
max + 1 : max + 2; // next odd + + outbound + .prepare( + `INSERT INTO messages_out (id, seq, in_reply_to, timestamp, deliver_after, recurrence, kind, platform_id, channel_type, thread_id, content) + VALUES (@id, @seq, @in_reply_to, datetime('now'), @deliver_after, @recurrence, @kind, @platform_id, @channel_type, @thread_id, @content)`, + ) + .run({ + in_reply_to: null, + deliver_after: null, + recurrence: null, + platform_id: null, + channel_type: null, + thread_id: null, + ...msg, + seq: nextSeq, + }); return nextSeq; } -/** Look up a message's platform ID by seq number. */ +/** + * Look up a message's ID (the `id` column) by seq number. + * Searches both inbound and outbound DBs since seq spans both. + */ export function getMessageIdBySeq(seq: number): string | null { - const inRow = getSessionDb().prepare('SELECT id FROM messages_in WHERE seq = ?').get(seq) as { id: string } | undefined; + const inRow = getInboundDb().prepare('SELECT id FROM messages_in WHERE seq = ?').get(seq) as + | { id: string } + | undefined; if (inRow) return inRow.id; - const outRow = getSessionDb().prepare('SELECT id FROM messages_out WHERE seq = ?').get(seq) as { id: string } | undefined; + const outRow = getOutboundDb().prepare('SELECT id FROM messages_out WHERE seq = ?').get(seq) as + | { id: string } + | undefined; return outRow?.id ?? null; } -/** Get undelivered messages (for host polling). */ +/** Get outbound messages that are due (deliver_after unset or past), oldest first — reads from outbound.db. Delivery state is no longer stored in messages_out, so already-delivered rows are returned too. */ export function getUndeliveredMessages(): MessageOutRow[] { - return getSessionDb() + return getOutboundDb() .prepare( `SELECT * FROM messages_out - WHERE delivered = 0 - AND (deliver_after IS NULL OR deliver_after <= datetime('now')) + WHERE (deliver_after IS NULL OR deliver_after <= datetime('now')) ORDER BY timestamp ASC`, ) .all() as MessageOutRow[]; } - -/** Mark a message as delivered. 
*/ -export function markDelivered(id: string): void { - getSessionDb().prepare('UPDATE messages_out SET delivered = 1 WHERE id = ?').run(id); -} diff --git a/container/agent-runner/src/index.ts b/container/agent-runner/src/index.ts index db6523abb..8f91e6e83 100644 --- a/container/agent-runner/src/index.ts +++ b/container/agent-runner/src/index.ts @@ -5,14 +5,18 @@ * No stdin, no stdout markers, no IPC files. * * Config: - * - SESSION_DB_PATH: path to session SQLite DB (default: /workspace/session.db) + * - SESSION_INBOUND_DB_PATH: path to host-owned inbound DB (default: /workspace/inbound.db) + * - SESSION_OUTBOUND_DB_PATH: path to container-owned outbound DB (default: /workspace/outbound.db) + * - SESSION_HEARTBEAT_PATH: heartbeat file path (default: /workspace/.heartbeat) * - AGENT_PROVIDER: 'claude' | 'mock' (default: claude) * - NANOCLAW_ASSISTANT_NAME: assistant name for transcript archiving * - NANOCLAW_ADMIN_USER_ID: admin user ID for permission checks * * Mount structure: * /workspace/ - * session.db ← session SQLite DB + * inbound.db ← host-owned session DB (container reads only) + * outbound.db ← container-owned session DB + * .heartbeat ← container touches for liveness detection * outbox/ ← outbound files * agent/ ← agent group folder (CLAUDE.md, skills, working files) * .claude/ ← Claude SDK session data @@ -80,7 +84,9 @@ async function main(): Promise { command: 'node', args: [mcpServerPath], env: { - SESSION_DB_PATH: process.env.SESSION_DB_PATH || '/workspace/session.db', + SESSION_INBOUND_DB_PATH: process.env.SESSION_INBOUND_DB_PATH || '/workspace/inbound.db', + SESSION_OUTBOUND_DB_PATH: process.env.SESSION_OUTBOUND_DB_PATH || '/workspace/outbound.db', + SESSION_HEARTBEAT_PATH: process.env.SESSION_HEARTBEAT_PATH || '/workspace/.heartbeat', }, }, }, diff --git a/container/agent-runner/src/integration.test.ts b/container/agent-runner/src/integration.test.ts index 63c07b7e8..ae76e87b6 100644 --- a/container/agent-runner/src/integration.test.ts +++ 
b/container/agent-runner/src/integration.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, beforeEach, afterEach } from 'vitest'; -import { initTestSessionDb, closeSessionDb, getSessionDb } from './db/connection.js'; +import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js'; import { getUndeliveredMessages } from './db/messages-out.js'; import { getPendingMessages } from './db/messages-in.js'; import { MockProvider } from './providers/mock.js'; @@ -15,7 +15,7 @@ afterEach(() => { }); function insertMessage(id: string, content: object, opts?: { platformId?: string; channelType?: string; threadId?: string }) { - getSessionDb() + getInboundDb() .prepare( `INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, thread_id, content) VALUES (?, 'chat', datetime('now'), 'pending', ?, ?, ?, ?)`, @@ -25,20 +25,16 @@ function insertMessage(id: string, content: object, opts?: { platformId?: string describe('poll loop integration', () => { it('should pick up a message, process it, and write a response', async () => { - // Insert a message before starting the loop insertMessage('m1', { sender: 'Alice', text: 'What is the meaning of life?' 
}, { platformId: 'chan-1', channelType: 'discord', threadId: 'thread-1' }); const provider = new MockProvider(() => '42'); - // Run the poll loop in background, abort after it processes const controller = new AbortController(); const loopPromise = runPollLoopWithTimeout(provider, controller.signal, 2000); - // Wait for processing await waitFor(() => getUndeliveredMessages().length > 0, 2000); controller.abort(); - // Verify const out = getUndeliveredMessages(); expect(out).toHaveLength(1); expect(JSON.parse(out[0].content).text).toBe('42'); @@ -47,11 +43,11 @@ describe('poll loop integration', () => { expect(out[0].thread_id).toBe('thread-1'); expect(out[0].in_reply_to).toBe('m1'); - // Input message should be completed + // Input message should be acked (not pending) const pending = getPendingMessages(); expect(pending).toHaveLength(0); - await loopPromise.catch(() => {}); // swallow abort + await loopPromise.catch(() => {}); }); it('should process multiple messages in a batch', async () => { diff --git a/container/agent-runner/src/mcp-tools/interactive.ts b/container/agent-runner/src/mcp-tools/interactive.ts index dbd6ad646..f726876dc 100644 --- a/container/agent-runner/src/mcp-tools/interactive.ts +++ b/container/agent-runner/src/mcp-tools/interactive.ts @@ -4,7 +4,7 @@ * ask_user_question is a blocking tool call — it writes a messages_out row * with a question card, then polls messages_in for the response. 
*/ -import { getSessionDb } from '../db/connection.js'; +import { findQuestionResponse, markCompleted } from '../db/messages-in.js'; import { writeMessageOut } from '../db/messages-out.js'; import type { McpToolDefinition } from './types.js'; @@ -64,7 +64,7 @@ export const askUserQuestion: McpToolDefinition = { const questionId = generateId(); const r = routing(); - // Write question card to messages_out + // Write question card to outbound.db writeMessageOut({ id: questionId, kind: 'chat-sdk', @@ -81,19 +81,15 @@ export const askUserQuestion: McpToolDefinition = { log(`ask_user_question: ${questionId} → "${question}" [${options.join(', ')}]`); - // Poll for response in messages_in + // Poll for response in inbound.db (host writes the response there) const deadline = Date.now() + timeout; while (Date.now() < deadline) { - const response = getSessionDb() - .prepare("SELECT content FROM messages_in WHERE kind = 'system' AND content LIKE ? AND status = 'pending' LIMIT 1") - .get(`%"questionId":"${questionId}"%`) as { content: string } | undefined; + const response = findQuestionResponse(questionId); if (response) { const parsed = JSON.parse(response.content); - // Mark the response as completed so the poll loop doesn't pick it up - getSessionDb() - .prepare("UPDATE messages_in SET status = 'completed', status_changed = datetime('now') WHERE kind = 'system' AND content LIKE ?") - .run(`%"questionId":"${questionId}"%`); + // Mark the response as completed via processing_ack (outbound.db) + markCompleted([response.id]); log(`ask_user_question response: ${questionId} → ${parsed.selectedOption}`); return ok(parsed.selectedOption); diff --git a/container/agent-runner/src/mcp-tools/scheduling.ts b/container/agent-runner/src/mcp-tools/scheduling.ts index 3f3d0d0ec..be3b576d5 100644 --- a/container/agent-runner/src/mcp-tools/scheduling.ts +++ b/container/agent-runner/src/mcp-tools/scheduling.ts @@ -1,10 +1,12 @@ /** * Scheduling MCP tools: schedule_task, list_tasks, 
cancel_task, pause_task, resume_task. * - * Tasks are messages_in rows with process_after timestamps and optional recurrence. - * The host sweep detects due tasks and wakes the container. + * With the two-DB split, the container cannot write to inbound.db (host-owned). + * Scheduling operations are sent as system actions via messages_out — the host + * reads them during delivery and applies the changes to inbound.db. */ -import { getSessionDb } from '../db/connection.js'; +import { getInboundDb } from '../db/connection.js'; +import { writeMessageOut } from '../db/messages-out.js'; import type { McpToolDefinition } from './types.js'; function log(msg: string): void { @@ -57,22 +59,22 @@ export const scheduleTask: McpToolDefinition = { const recurrence = (args.recurrence as string) || null; const script = (args.script as string) || null; - const content = JSON.stringify({ prompt, script }); - - getSessionDb() - .prepare( - `INSERT INTO messages_in (id, timestamp, status, status_changed, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content) - VALUES (@id, datetime('now'), 'pending', datetime('now'), 0, @process_after, @recurrence, 'task', @platform_id, @channel_type, @thread_id, @content)`, - ) - .run({ - id, - process_after: processAfter, + // Write as a system action — host will insert into inbound.db + writeMessageOut({ + id, + kind: 'system', + platform_id: r.platform_id, + channel_type: r.channel_type, + thread_id: r.thread_id, + content: JSON.stringify({ + action: 'schedule_task', + taskId: id, + prompt, + script, + processAfter, recurrence, - platform_id: r.platform_id, - channel_type: r.channel_type, - thread_id: r.thread_id, - content, - }); + }), + }); log(`schedule_task: ${id} at ${processAfter}${recurrence ? ` (recurring: ${recurrence})` : ''}`); return ok(`Task scheduled (id: ${id}, runs at: ${processAfter}${recurrence ? 
`, recurrence: ${recurrence}` : ''})`); @@ -92,13 +94,14 @@ export const listTasks: McpToolDefinition = { }, async handler(args) { const status = args.status as string | undefined; + const db = getInboundDb(); let rows; if (status) { - rows = getSessionDb() + rows = db .prepare("SELECT id, status, process_after, recurrence, content FROM messages_in WHERE kind = 'task' AND status = ? ORDER BY process_after ASC") .all(status); } else { - rows = getSessionDb() + rows = db .prepare("SELECT id, status, process_after, recurrence, content FROM messages_in WHERE kind = 'task' AND status NOT IN ('completed') ORDER BY process_after ASC") .all(); } @@ -131,14 +134,15 @@ export const cancelTask: McpToolDefinition = { const taskId = args.taskId as string; if (!taskId) return err('taskId is required'); - const result = getSessionDb() - .prepare("UPDATE messages_in SET status = 'completed', status_changed = datetime('now') WHERE id = ? AND kind = 'task' AND status IN ('pending', 'paused')") - .run(taskId); - - if (result.changes === 0) return err(`Task not found or not cancellable: ${taskId}`); + // Write as a system action — host will update inbound.db + writeMessageOut({ + id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'system', + content: JSON.stringify({ action: 'cancel_task', taskId }), + }); log(`cancel_task: ${taskId}`); - return ok(`Task cancelled: ${taskId}`); + return ok(`Task cancellation requested: ${taskId}`); }, }; @@ -158,14 +162,14 @@ export const pauseTask: McpToolDefinition = { const taskId = args.taskId as string; if (!taskId) return err('taskId is required'); - const result = getSessionDb() - .prepare("UPDATE messages_in SET status = 'paused', status_changed = datetime('now') WHERE id = ? 
AND kind = 'task' AND status = 'pending'") - .run(taskId); - - if (result.changes === 0) return err(`Task not found or not pausable: ${taskId}`); + writeMessageOut({ + id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'system', + content: JSON.stringify({ action: 'pause_task', taskId }), + }); log(`pause_task: ${taskId}`); - return ok(`Task paused: ${taskId}`); + return ok(`Task pause requested: ${taskId}`); }, }; @@ -185,14 +189,14 @@ export const resumeTask: McpToolDefinition = { const taskId = args.taskId as string; if (!taskId) return err('taskId is required'); - const result = getSessionDb() - .prepare("UPDATE messages_in SET status = 'pending', status_changed = datetime('now') WHERE id = ? AND kind = 'task' AND status = 'paused'") - .run(taskId); - - if (result.changes === 0) return err(`Task not found or not paused: ${taskId}`); + writeMessageOut({ + id: `sys-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + kind: 'system', + content: JSON.stringify({ action: 'resume_task', taskId }), + }); log(`resume_task: ${taskId}`); - return ok(`Task resumed: ${taskId}`); + return ok(`Task resume requested: ${taskId}`); }, }; diff --git a/container/agent-runner/src/poll-loop.test.ts b/container/agent-runner/src/poll-loop.test.ts index 03fc0c7af..718be534e 100644 --- a/container/agent-runner/src/poll-loop.test.ts +++ b/container/agent-runner/src/poll-loop.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, beforeEach, afterEach } from 'vitest'; -import { initTestSessionDb, closeSessionDb, getSessionDb } from './db/connection.js'; +import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js'; import { getPendingMessages, markCompleted } from './db/messages-in.js'; import { getUndeliveredMessages } from './db/messages-out.js'; import { formatMessages, extractRouting } from './formatter.js'; @@ -15,7 +15,7 @@ afterEach(() => { }); function insertMessage(id: string, kind: string, content: object, 
opts?: { processAfter?: string }) { - getSessionDb() + getInboundDb() .prepare( `INSERT INTO messages_in (id, kind, timestamp, status, process_after, content) VALUES (?, ?, datetime('now'), 'pending', ?, ?)`, @@ -86,7 +86,7 @@ describe('formatter', () => { describe('routing', () => { it('should extract routing from messages', () => { - getSessionDb() + getInboundDb() .prepare( `INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, thread_id, content) VALUES ('m1', 'chat', datetime('now'), 'pending', 'chan-123', 'discord', 'thread-456', '{"text":"hi"}')`, @@ -113,7 +113,6 @@ describe('mock provider', () => { }); const events: Array<{ type: string }> = []; - // End the stream after initial response setTimeout(() => query.end(), 50); for await (const event of query.events) { @@ -138,7 +137,6 @@ describe('mock provider', () => { const events: Array<{ type: string; text?: string }> = []; - // Push a follow-up after a short delay, then end setTimeout(() => query.push('Second'), 30); setTimeout(() => query.end(), 60); @@ -155,7 +153,7 @@ describe('mock provider', () => { describe('end-to-end with mock provider', () => { it('should read messages_in, process with mock provider, write messages_out', async () => { - // Insert a chat message + // Insert a chat message into inbound DB insertMessage('m1', 'chat', { sender: 'User', text: 'What is 2+2?' 
}); // Read and process @@ -198,11 +196,11 @@ describe('end-to-end with mock provider', () => { markCompleted(['m1']); - // Verify: message was processed + // Verify: message was processed (not pending, acked in processing_ack) const processed = getPendingMessages(); expect(processed).toHaveLength(0); - // Verify: response was written + // Verify: response was written to outbound DB const outMessages = getUndeliveredMessages(); expect(outMessages).toHaveLength(1); expect(JSON.parse(outMessages[0].content).text).toBe('The answer is 4'); diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 21fc8e109..149083ead 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -1,5 +1,6 @@ -import { getPendingMessages, markProcessing, markCompleted, touchProcessing, type MessageInRow } from './db/messages-in.js'; +import { getPendingMessages, markProcessing, markCompleted, type MessageInRow } from './db/messages-in.js'; import { writeMessageOut } from './db/messages-out.js'; +import { touchHeartbeat, clearStaleProcessingAcks } from './db/connection.js'; import { formatMessages, extractRouting, categorizeMessage, type RoutingContext } from './formatter.js'; import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js'; @@ -38,6 +39,10 @@ export async function runPollLoop(config: PollLoopConfig): Promise { let sessionId: string | undefined; let resumeAt: string | undefined; + // Clear leftover 'processing' acks from a previous crashed container. + // This lets the new container re-process those messages. 
+ clearStaleProcessingAcks(); + let pollCount = 0; while (true) { // Skip system messages — they're responses for MCP tools (e.g., ask_user_question) @@ -260,7 +265,7 @@ async function processQuery(query: AgentQuery, routing: RoutingContext, config: for await (const event of query.events) { lastEventTime = Date.now(); handleEvent(event, routing); - touchProcessing(processingIds); + touchHeartbeat(); if (event.type === 'init') { querySessionId = event.sessionId; diff --git a/container/agent-runner/tsconfig.json b/container/agent-runner/tsconfig.json index d71b5ffff..008fdc990 100644 --- a/container/agent-runner/tsconfig.json +++ b/container/agent-runner/tsconfig.json @@ -11,5 +11,5 @@ "declaration": true }, "include": ["src/**/*"], - "exclude": ["node_modules", "dist", "src/**/*.test.ts"] + "exclude": ["node_modules", "dist", "src/**/*.test.ts", "src/v1/**/*"] } diff --git a/scripts/test-v2-host.ts b/scripts/test-v2-host.ts index d047d5f15..9ebe297de 100644 --- a/scripts/test-v2-host.ts +++ b/scripts/test-v2-host.ts @@ -2,9 +2,9 @@ * Real end-to-end test of v2: host router → Docker container → agent-runner → delivery. * * 1. Init central DB with agent group + messaging group + wiring - * 2. Route an inbound message (creates session, writes messages_in, spawns container) - * 3. Container runs v2 agent-runner, polls session DB, queries Claude - * 4. Poll session DB for messages_out response + * 2. Route an inbound message (creates session, writes inbound.db, spawns container) + * 3. Container runs v2 agent-runner, polls inbound.db, queries Claude, writes outbound.db + * 4. 
Poll outbound.db for messages_out response * * Usage: npx tsx scripts/test-v2-host.ts */ @@ -71,7 +71,7 @@ console.log('\n=== Step 2: Route inbound message ==='); import { routeInbound } from '../src/router.js'; import { findSession } from '../src/db/sessions.js'; -import { sessionDbPath } from '../src/session-manager.js'; +import { inboundDbPath, outboundDbPath } from '../src/session-manager.js'; await routeInbound({ channelType: 'test', @@ -96,8 +96,10 @@ if (!session) { console.log(`✓ Session: ${session.id}`); console.log(`✓ Container status: ${session.container_status}`); -const sessDbPath = sessionDbPath('ag-e2e', session.id); -console.log(`✓ Session DB: ${sessDbPath}`); +const inDbPath = inboundDbPath('ag-e2e', session.id); +const outDbPath = outboundDbPath('ag-e2e', session.id); +console.log(`✓ Inbound DB: ${inDbPath}`); +console.log(`✓ Outbound DB: ${outDbPath}`); // --- Step 3: Wait for response --- console.log('\n=== Step 3: Waiting for Claude response... ==='); @@ -107,7 +109,7 @@ const TIMEOUT_MS = 120_000; const checkForResponse = (): boolean => { try { - const db = new Database(sessDbPath, { readonly: true }); + const db = new Database(outDbPath, { readonly: true }); const out = db.prepare('SELECT * FROM messages_out').all() as Array>; db.close(); return out.length > 0; @@ -147,22 +149,36 @@ process.exit(0); function printState() { try { - const db = new Database(sessDbPath, { readonly: true }); - const inRows = db.prepare('SELECT * FROM messages_in').all() as Array>; - const outRows = db.prepare('SELECT * FROM messages_out').all() as Array>; - db.close(); + const inDb = new Database(inDbPath, { readonly: true }); + const inRows = inDb.prepare('SELECT * FROM messages_in').all() as Array>; + inDb.close(); - console.log('\nmessages_in:'); + console.log('\nmessages_in (inbound.db):'); for (const r of inRows) { console.log(` [${r.id}] status=${r.status} kind=${r.kind}`); } - console.log('\nmessages_out:'); + } catch (err) { + console.log(` (could not read 
inbound DB: ${err})`); + } + + try { + const outDb = new Database(outDbPath, { readonly: true }); + const outRows = outDb.prepare('SELECT * FROM messages_out').all() as Array>; + const ackRows = outDb.prepare('SELECT * FROM processing_ack').all() as Array>; + outDb.close(); + + console.log('\nmessages_out (outbound.db):'); for (const r of outRows) { const content = JSON.parse(r.content as string); console.log(` [${r.id}] kind=${r.kind}`); console.log(` → ${content.text}`); } + + console.log('\nprocessing_ack (outbound.db):'); + for (const r of ackRows) { + console.log(` [${r.message_id}] status=${r.status} changed=${r.status_changed}`); + } } catch (err) { - console.log(` (could not read session DB: ${err})`); + console.log(` (could not read outbound DB: ${err})`); } } diff --git a/src/channels/channel-registry.test.ts b/src/channels/channel-registry.test.ts index d5d0fa023..2fc183b67 100644 --- a/src/channels/channel-registry.test.ts +++ b/src/channels/channel-registry.test.ts @@ -162,7 +162,7 @@ describe('channel + router integration', () => { it('should route inbound message from adapter to session DB', async () => { const { routeInbound } = await import('../router.js'); const { findSession } = await import('../db/sessions.js'); - const { sessionDbPath } = await import('../session-manager.js'); + const { inboundDbPath } = await import('../session-manager.js'); // Simulate what the adapter bridge does: stringify content, call routeInbound const inboundContent = { sender: 'TestUser', senderId: 'u1', text: 'Hello from adapter', isFromMe: false }; @@ -183,7 +183,7 @@ describe('channel + router integration', () => { const session = findSession('mg-1', null); expect(session).toBeDefined(); - const dbPath = sessionDbPath('ag-1', session!.id); + const dbPath = inboundDbPath('ag-1', session!.id); const db = new Database(dbPath); const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{ id: string; content: string }>; db.close(); diff --git 
a/src/container-runner.ts b/src/container-runner.ts index cdbfadc3d..c3dce4d0c 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -19,7 +19,6 @@ import { markContainerIdle, markContainerRunning, markContainerStopped, - sessionDbPath, sessionDir, } from './session-manager.js'; import type { AgentGroup, Session } from './types.js'; @@ -135,7 +134,7 @@ function buildMounts(agentGroup: AgentGroup, session: Session): VolumeMount[] { const sessDir = sessionDir(agentGroup.id, session.id); const groupDir = path.resolve(GROUPS_DIR, agentGroup.folder); - // Session folder at /workspace (contains session.db, outbox/, .claude/) + // Session folder at /workspace (contains inbound.db, outbound.db, outbox/, .claude/) mounts.push({ hostPath: sessDir, containerPath: '/workspace', readonly: false }); // Agent group folder at /workspace/agent @@ -226,7 +225,10 @@ async function buildContainerArgs( // Environment args.push('-e', `TZ=${TIMEZONE}`); args.push('-e', `AGENT_PROVIDER=${session.agent_provider || agentGroup.agent_provider || 'claude'}`); - args.push('-e', `SESSION_DB_PATH=/workspace/session.db`); + // Two-DB split: container reads inbound.db, writes outbound.db + args.push('-e', 'SESSION_INBOUND_DB_PATH=/workspace/inbound.db'); + args.push('-e', 'SESSION_OUTBOUND_DB_PATH=/workspace/outbound.db'); + args.push('-e', 'SESSION_HEARTBEAT_PATH=/workspace/.heartbeat'); // Pass admin user ID and assistant name from messaging group/agent group if (session.messaging_group_id) { @@ -239,10 +241,22 @@ async function buildContainerArgs( args.push('-e', `NANOCLAW_ASSISTANT_NAME=${agentGroup.name}`); } - // OneCLI gateway - const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier }); - if (onecliApplied) { - log.debug('OneCLI gateway applied', { containerName }); + // OneCLI gateway — injects HTTPS_PROXY + certs so container API calls + // are routed through the agent vault for credential injection. 
+ // Must ensureAgent first for non-admin groups, otherwise applyContainerConfig + // rejects the unknown agent identifier and returns false. + try { + if (agentIdentifier) { + await onecli.ensureAgent({ name: agentGroup.name, identifier: agentIdentifier }); + } + const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier }); + if (onecliApplied) { + log.info('OneCLI gateway applied', { containerName }); + } else { + log.warn('OneCLI gateway not applied — container will have no credentials', { containerName }); + } + } catch (err) { + log.warn('OneCLI gateway error — container will have no credentials', { containerName, err }); } // Host gateway diff --git a/src/db/schema.ts b/src/db/schema.ts index bf8ff192c..b54210d56 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -69,16 +69,21 @@ CREATE TABLE pending_questions ( `; /** - * Session DB schema — created fresh by the host for each session. + * Session DB schemas — split into two files so each has exactly one writer. + * This eliminates SQLite write contention across the host-container mount boundary. + * + * inbound.db — host writes, container reads (read-only mount or open read-only) + * outbound.db — container writes, host reads (read-only open) */ -export const SESSION_SCHEMA = ` + +/** Host-owned: inbound messages + delivery tracking. */ +export const INBOUND_SCHEMA = ` CREATE TABLE messages_in ( id TEXT PRIMARY KEY, seq INTEGER UNIQUE, kind TEXT NOT NULL, timestamp TEXT NOT NULL, status TEXT DEFAULT 'pending', - status_changed TEXT, process_after TEXT, recurrence TEXT, tries INTEGER DEFAULT 0, @@ -88,12 +93,21 @@ CREATE TABLE messages_in ( content TEXT NOT NULL ); +-- Host tracks which messages_out IDs have been delivered. +-- Avoids writing to outbound.db (container-owned). +CREATE TABLE delivered ( + message_out_id TEXT PRIMARY KEY, + delivered_at TEXT NOT NULL +); +`; + +/** Container-owned: outbound messages + processing acknowledgments. 
*/ +export const OUTBOUND_SCHEMA = ` CREATE TABLE messages_out ( id TEXT PRIMARY KEY, seq INTEGER UNIQUE, in_reply_to TEXT, timestamp TEXT NOT NULL, - delivered INTEGER DEFAULT 0, deliver_after TEXT, recurrence TEXT, kind TEXT NOT NULL, @@ -102,4 +116,13 @@ CREATE TABLE messages_out ( thread_id TEXT, content TEXT NOT NULL ); + +-- Container tracks processing status here instead of updating messages_in. +-- Host reads this to know which messages have been processed. +-- On container startup, stale 'processing' entries are cleared (crash recovery). +CREATE TABLE processing_ack ( + message_id TEXT PRIMARY KEY, + status TEXT NOT NULL, + status_changed TEXT NOT NULL +); `; diff --git a/src/delivery.ts b/src/delivery.ts index 4a020f8d8..d74df751f 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -1,6 +1,11 @@ /** * Outbound message delivery. - * Polls active session DBs for undelivered messages_out, delivers through channel adapters. + * Polls session outbound DBs for undelivered messages, delivers through channel adapters. 
+ * + * Two-DB architecture: + * - Reads messages_out from outbound.db (container-owned, opened read-only) + * - Tracks delivery in inbound.db's `delivered` table (host-owned) + * - Never writes to outbound.db — preserves single-writer-per-file invariant */ import Database from 'better-sqlite3'; import fs from 'fs'; @@ -9,7 +14,7 @@ import path from 'path'; import { getRunningSessions, getActiveSessions, createPendingQuestion } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { log } from './log.js'; -import { openSessionDb, sessionDir } from './session-manager.js'; +import { openInboundDb, openOutboundDb, sessionDir, inboundDbPath } from './session-manager.js'; import { resetContainerIdleTimer } from './container-runner.js'; import type { OutboundFile } from './channels/adapter.js'; import type { Session } from './types.js'; @@ -85,19 +90,21 @@ async function deliverSessionMessages(session: Session): Promise { const agentGroup = getAgentGroup(session.agent_group_id); if (!agentGroup) return; - let db: Database.Database; + let outDb: Database.Database; + let inDb: Database.Database; try { - db = openSessionDb(agentGroup.id, session.id); + outDb = openOutboundDb(agentGroup.id, session.id); + inDb = openInboundDb(agentGroup.id, session.id); } catch { - return; // Session DB might not exist yet + return; // DBs might not exist yet } try { - const undelivered = db + // Read all due messages from outbound.db (read-only) + const allDue = outDb .prepare( `SELECT * FROM messages_out - WHERE delivered = 0 - AND (deliver_after IS NULL OR deliver_after <= datetime('now')) + WHERE (deliver_after IS NULL OR deliver_after <= datetime('now')) ORDER BY timestamp ASC`, ) .all() as Array<{ @@ -109,19 +116,32 @@ async function deliverSessionMessages(session: Session): Promise { content: string; }>; + if (allDue.length === 0) return; + + // Filter out already-delivered messages using inbound.db's delivered table + const deliveredIds = new Set( + 
(inDb.prepare('SELECT message_out_id FROM delivered').all() as Array<{ message_out_id: string }>).map( + (r) => r.message_out_id, + ), + ); + const undelivered = allDue.filter((m) => !deliveredIds.has(m.id)); if (undelivered.length === 0) return; for (const msg of undelivered) { try { - await deliverMessage(msg, session); - db.prepare('UPDATE messages_out SET delivered = 1 WHERE id = ?').run(msg.id); + await deliverMessage(msg, session, inDb); + // Track delivery in inbound.db (host-owned) — not outbound.db + inDb.prepare("INSERT OR IGNORE INTO delivered (message_out_id, delivered_at) VALUES (?, datetime('now'))").run( + msg.id, + ); resetContainerIdleTimer(session.id); } catch (err) { log.error('Failed to deliver message', { messageId: msg.id, sessionId: session.id, err }); } } } finally { - db.close(); + outDb.close(); + inDb.close(); } } @@ -135,6 +155,7 @@ async function deliverMessage( content: string; }, session: Session, + inDb: Database.Database, ): Promise { if (!deliveryAdapter) { log.warn('No delivery adapter configured, dropping message', { id: msg.id }); @@ -143,10 +164,9 @@ async function deliverMessage( const content = JSON.parse(msg.content); - // System actions — handle internally + // System actions — handle internally (schedule_task, cancel_task, etc.) if (msg.kind === 'system') { - log.info('System action from agent', { sessionId: session.id, action: content.action }); - // TODO: handle system actions (register_group, reset_session, etc.) + await handleSystemAction(content, session, inDb); return; } @@ -207,6 +227,84 @@ async function deliverMessage( } } +/** + * Handle system actions from the container agent. + * These are written to messages_out because the container can't write to inbound.db. + * The host applies them to inbound.db here. 
+ */ +async function handleSystemAction( + content: Record, + session: Session, + inDb: Database.Database, +): Promise { + const action = content.action as string; + log.info('System action from agent', { sessionId: session.id, action }); + + switch (action) { + case 'schedule_task': { + const taskId = content.taskId as string; + const prompt = content.prompt as string; + const script = content.script as string | null; + const processAfter = content.processAfter as string; + const recurrence = (content.recurrence as string) || null; + + // Compute next even seq for host-owned inbound.db + const maxSeq = ( + inDb.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number } + ).m; + const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); + + inDb + .prepare( + `INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content) + VALUES (@id, @seq, datetime('now'), 'pending', 0, @process_after, @recurrence, 'task', @platform_id, @channel_type, @thread_id, @content)`, + ) + .run({ + id: taskId, + seq: nextSeq, + process_after: processAfter, + recurrence, + platform_id: content.platformId ?? null, + channel_type: content.channelType ?? null, + thread_id: content.threadId ?? null, + content: JSON.stringify({ prompt, script }), + }); + log.info('Scheduled task created', { taskId, processAfter, recurrence }); + break; + } + + case 'cancel_task': { + const taskId = content.taskId as string; + inDb + .prepare("UPDATE messages_in SET status = 'completed' WHERE id = ? AND kind = 'task' AND status IN ('pending', 'paused')") + .run(taskId); + log.info('Task cancelled', { taskId }); + break; + } + + case 'pause_task': { + const taskId = content.taskId as string; + inDb + .prepare("UPDATE messages_in SET status = 'paused' WHERE id = ? 
AND kind = 'task' AND status = 'pending'") + .run(taskId); + log.info('Task paused', { taskId }); + break; + } + + case 'resume_task': { + const taskId = content.taskId as string; + inDb + .prepare("UPDATE messages_in SET status = 'pending' WHERE id = ? AND kind = 'task' AND status = 'paused'") + .run(taskId); + log.info('Task resumed', { taskId }); + break; + } + + default: + log.warn('Unknown system action', { action }); + } +} + export function stopDeliveryPolls(): void { activePolling = false; sweepPolling = false; diff --git a/src/host-core.test.ts b/src/host-core.test.ts index 03ddd98b2..9dc711e4c 100644 --- a/src/host-core.test.ts +++ b/src/host-core.test.ts @@ -21,7 +21,8 @@ import { writeSessionMessage, initSessionFolder, sessionDir, - sessionDbPath, + inboundDbPath, + outboundDbPath, sessionsBaseDir, } from './session-manager.js'; import { getSession, findSession } from './db/sessions.js'; @@ -84,22 +85,29 @@ describe('session manager', () => { }); }); - it('should create session folder and DB', () => { + it('should create session folder and both DBs', () => { initSessionFolder('ag-1', 'sess-test'); const dir = sessionDir('ag-1', 'sess-test'); expect(fs.existsSync(dir)).toBe(true); expect(fs.existsSync(path.join(dir, 'outbox'))).toBe(true); - const dbPath = sessionDbPath('ag-1', 'sess-test'); - expect(fs.existsSync(dbPath)).toBe(true); + // Verify inbound.db + const inPath = inboundDbPath('ag-1', 'sess-test'); + expect(fs.existsSync(inPath)).toBe(true); + const inDb = new Database(inPath); + const inTables = inDb.prepare("SELECT name FROM sqlite_master WHERE type='table'").all() as Array<{ name: string }>; + expect(inTables.map((t) => t.name)).toContain('messages_in'); + expect(inTables.map((t) => t.name)).toContain('delivered'); + inDb.close(); - // Verify session DB has the right tables - const db = new Database(dbPath); - const tables = db.prepare("SELECT name FROM sqlite_master WHERE type='table'").all() as Array<{ name: string }>; - const tableNames 
= tables.map((t) => t.name); - expect(tableNames).toContain('messages_in'); - expect(tableNames).toContain('messages_out'); - db.close(); + // Verify outbound.db + const outPath = outboundDbPath('ag-1', 'sess-test'); + expect(fs.existsSync(outPath)).toBe(true); + const outDb = new Database(outPath); + const outTables = outDb.prepare("SELECT name FROM sqlite_master WHERE type='table'").all() as Array<{ name: string }>; + expect(outTables.map((t) => t.name)).toContain('messages_out'); + expect(outTables.map((t) => t.name)).toContain('processing_ack'); + outDb.close(); }); it('should resolve to existing session (shared mode)', () => { @@ -124,7 +132,7 @@ describe('session manager', () => { expect(s2.id).toBe(s1.id); }); - it('should write message to session DB', () => { + it('should write message to inbound DB', () => { const { session } = resolveSession('ag-1', 'mg-1', null, 'shared'); writeSessionMessage('ag-1', session.id, { @@ -137,8 +145,8 @@ describe('session manager', () => { content: JSON.stringify({ sender: 'User', text: 'Hello' }), }); - // Read from the session DB - const dbPath = sessionDbPath('ag-1', session.id); + // Read from the inbound DB + const dbPath = inboundDbPath('ag-1', session.id); const db = new Database(dbPath); const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{ id: string; @@ -223,8 +231,8 @@ describe('router', () => { const session = findSession('mg-1', null); expect(session).toBeDefined(); - // Verify message was written to session DB - const dbPath = sessionDbPath('ag-1', session!.id); + // Verify message was written to inbound DB + const dbPath = inboundDbPath('ag-1', session!.id); const db = new Database(dbPath); const rows = db.prepare('SELECT * FROM messages_in').all() as Array<{ id: string; content: string }>; db.close(); @@ -239,8 +247,6 @@ describe('router', () => { it('should auto-create messaging group for unknown platform', async () => { const { routeInbound } = await import('./router.js'); - // This platform 
ID isn't registered — but since there's no agent configured for it, - // it should create the messaging group but not route (no agents configured) const event: InboundEvent = { channelType: 'slack', platformId: 'C-NEW-CHANNEL', @@ -255,7 +261,6 @@ describe('router', () => { await routeInbound(event); - // Messaging group should be created const { getMessagingGroupByPlatform } = await import('./db/messaging-groups.js'); const mg = getMessagingGroupByPlatform('slack', 'C-NEW-CHANNEL'); expect(mg).toBeDefined(); @@ -285,7 +290,7 @@ describe('router', () => { // Both should be in the same session const session = findSession('mg-1', null); - const dbPath = sessionDbPath('ag-1', session!.id); + const dbPath = inboundDbPath('ag-1', session!.id); const db = new Database(dbPath); const rows = db.prepare('SELECT * FROM messages_in ORDER BY timestamp').all(); db.close(); @@ -295,7 +300,7 @@ describe('router', () => { }); describe('delivery', () => { - it('should detect undelivered messages in session DB', () => { + it('should detect undelivered messages in outbound DB', () => { createAgentGroup({ id: 'ag-1', name: 'Agent', @@ -317,16 +322,15 @@ describe('delivery', () => { const { session } = resolveSession('ag-1', 'mg-test', null, 'shared'); - // Write a response to the session DB (simulating what the agent-runner does) - const dbPath = sessionDbPath('ag-1', session.id); + // Write a response to the outbound DB (simulating what the agent-runner does) + const dbPath = outboundDbPath('ag-1', session.id); const db = new Database(dbPath); - db.pragma('journal_mode = WAL'); db.prepare( - `INSERT INTO messages_out (id, timestamp, delivered, kind, platform_id, channel_type, content) - VALUES ('out-1', datetime('now'), 0, 'chat', 'chan-123', 'discord', ?)`, + `INSERT INTO messages_out (id, timestamp, kind, platform_id, channel_type, content) + VALUES ('out-1', datetime('now'), 'chat', 'chan-123', 'discord', ?)`, ).run(JSON.stringify({ text: 'Agent response' })); - const undelivered 
= db.prepare('SELECT * FROM messages_out WHERE delivered = 0').all() as Array<{ + const undelivered = db.prepare('SELECT * FROM messages_out').all() as Array<{ id: string; content: string; }>; diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 26a926fba..5bd877e22 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -1,10 +1,11 @@ /** * Host sweep — periodic maintenance of all session DBs. * - * - Wake containers for sessions with due messages (process_after) - * - Detect stale processing messages (container crash) → reset with backoff - * - Insert next occurrence for recurring messages - * - Kill idle containers past timeout + * Two-DB architecture: + * - Reads processing_ack from outbound.db to sync message status + * - Writes to inbound.db (host-owned) for status updates and recurrence + * - Uses heartbeat file mtime for stale container detection (not DB writes) + * - Never writes to outbound.db — preserves single-writer-per-file invariant */ import Database from 'better-sqlite3'; import fs from 'fs'; @@ -12,7 +13,7 @@ import fs from 'fs'; import { getActiveSessions, updateSession } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; import { log } from './log.js'; -import { openSessionDb, sessionDbPath } from './session-manager.js'; +import { openInboundDb, openOutboundDb, inboundDbPath, outboundDbPath, heartbeatPath } from './session-manager.js'; import { wakeContainer, isContainerRunning } from './container-runner.js'; import type { Session } from './types.js'; @@ -52,21 +53,31 @@ async function sweepSession(session: Session): Promise { const agentGroup = getAgentGroup(session.agent_group_id); if (!agentGroup) return; - const dbPath = sessionDbPath(agentGroup.id, session.id); - if (!fs.existsSync(dbPath)) return; + const inPath = inboundDbPath(agentGroup.id, session.id); + if (!fs.existsSync(inPath)) return; - let db: Database.Database; + let inDb: Database.Database; + let outDb: Database.Database | null = null; try { - 
db = new Database(dbPath); - db.pragma('journal_mode = DELETE'); - db.pragma('busy_timeout = 5000'); + inDb = openInboundDb(agentGroup.id, session.id); } catch { return; } try { - // 1. Check for due pending messages → wake container - const dueMessages = db + outDb = openOutboundDb(agentGroup.id, session.id); + } catch { + // outbound.db might not exist yet (container hasn't started) + } + + try { + // 1. Sync processing_ack → messages_in status + if (outDb) { + syncProcessingAcks(inDb, outDb); + } + + // 2. Check for due pending messages → wake container + const dueMessages = inDb .prepare( `SELECT COUNT(*) as count FROM messages_in WHERE status = 'pending' @@ -79,90 +90,134 @@ async function sweepSession(session: Session): Promise { await wakeContainer(session); } - // 2. Detect stale processing messages - const staleMessages = db - .prepare( - `SELECT id, tries FROM messages_in - WHERE status = 'processing' - AND status_changed < datetime('now', '-${Math.floor(STALE_THRESHOLD_MS / 1000)} seconds')`, - ) - .all() as Array<{ id: string; tries: number }>; - - for (const msg of staleMessages) { - if (msg.tries >= MAX_TRIES) { - db.prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run( - msg.id, - ); - log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id }); - } else { - const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries); - const backoffSec = Math.floor(backoffMs / 1000); - db.prepare( - `UPDATE messages_in SET status = 'pending', status_changed = datetime('now'), process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`, - ).run(msg.id); - log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs }); - } + // 3. Detect stale containers via heartbeat file + if (outDb) { + detectStaleContainers(inDb, outDb, session, agentGroup.id); } - // 3. 
Handle recurrence for completed messages - const completedRecurring = db - .prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL") - .all() as Array<{ - id: string; - kind: string; - content: string; - recurrence: string; - process_after: string | null; - platform_id: string | null; - channel_type: string | null; - thread_id: string | null; - }>; - - for (const msg of completedRecurring) { - try { - // Dynamic import to avoid loading cron-parser at module level - const { CronExpressionParser } = await import('cron-parser'); - const interval = CronExpressionParser.parse(msg.recurrence); - const nextRun = interval.next().toISOString(); - const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; - - // Compute next seq from both tables (same pattern as session-manager.ts) - const nextSeq = ( - db - .prepare( - `SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM ( - SELECT seq FROM messages_in WHERE seq IS NOT NULL - UNION ALL - SELECT seq FROM messages_out WHERE seq IS NOT NULL - )`, - ) - .get() as { next: number } - ).next; - - db.prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) - VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, - ).run( - newId, - nextSeq, - msg.kind, - nextRun, - msg.recurrence, - msg.platform_id, - msg.channel_type, - msg.thread_id, - msg.content, - ); - - // Remove recurrence from the completed message so it doesn't spawn again - db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id); - - log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun }); - } catch (err) { - log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err }); - } - } + // 4. 
Handle recurrence for completed messages (awaited: inDb is closed in finally) + await handleRecurrence(inDb, session); } finally { - db.close(); + inDb.close(); + outDb?.close(); + } +} + +/** + * Sync completed/failed processing_ack entries → messages_in.status. + * Only syncs terminal states — 'processing' is handled by stale detection. + */ +function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void { + const completed = outDb + .prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')") + .all() as Array<{ message_id: string }>; + + if (completed.length === 0) return; + + // Batch-update messages_in status for completed/failed messages + const updateStmt = inDb.prepare( + "UPDATE messages_in SET status = 'completed' WHERE id = ? AND status != 'completed'", + ); + inDb.transaction(() => { + for (const { message_id } of completed) { + updateStmt.run(message_id); + } + })(); +} + +/** + * Detect stale containers using heartbeat file mtime. + * If the heartbeat is older than STALE_THRESHOLD and processing_ack has + * 'processing' entries, the container likely crashed — reset with backoff. + */ +function detectStaleContainers( + inDb: Database.Database, + outDb: Database.Database, + session: Session, + agentGroupId: string, +): void { + const hbPath = heartbeatPath(agentGroupId, session.id); + let heartbeatAge = Infinity; + try { + const stat = fs.statSync(hbPath); + heartbeatAge = Date.now() - stat.mtimeMs; + } catch { + // No heartbeat file — container may never have started, or it's very old + } + + if (heartbeatAge < STALE_THRESHOLD_MS) return; // Container is alive + + // Heartbeat is stale — check for stuck processing entries + const processing = outDb + .prepare("SELECT message_id FROM processing_ack WHERE status = 'processing'") + .all() as Array<{ message_id: string }>; + + if (processing.length === 0) return; + + for (const { message_id } of processing) { + const msg = inDb + .prepare('SELECT id, tries FROM messages_in WHERE id = ?
AND status = ?') + .get(message_id, 'pending') as { id: string; tries: number } | undefined; + + if (!msg) continue; + + if (msg.tries >= MAX_TRIES) { + inDb.prepare("UPDATE messages_in SET status = 'failed' WHERE id = ?").run(msg.id); + log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id }); + } else { + const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries); + const backoffSec = Math.floor(backoffMs / 1000); + inDb + .prepare( + `UPDATE messages_in SET tries = tries + 1, process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`, + ) + .run(msg.id); + log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs }); + } + } +} + +/** Insert next occurrence for completed recurring messages. */ +async function handleRecurrence(inDb: Database.Database, session: Session): Promise { + const completedRecurring = inDb + .prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL") + .all() as Array<{ + id: string; + kind: string; + content: string; + recurrence: string; + process_after: string | null; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + }>; + + for (const msg of completedRecurring) { + try { + const { CronExpressionParser } = await import('cron-parser'); + const interval = CronExpressionParser.parse(msg.recurrence); + const nextRun = interval.next().toISOString(); + const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + + // Host uses even seq numbers + const maxSeq = ( + inDb.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number } + ).m; + const nextSeq = maxSeq < 2 ? 
2 : maxSeq + 2 - (maxSeq % 2); + + inDb + .prepare( + `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) + VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, + ) + .run(newId, nextSeq, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content); + + // Remove recurrence from the completed message so it doesn't spawn again + inDb.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id); + + log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun }); + } catch (err) { + log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err }); + } } } diff --git a/src/session-manager.ts b/src/session-manager.ts index 64e192238..f24f62084 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -1,6 +1,10 @@ /** * Session lifecycle management. * Creates session folders + DBs, writes messages, manages container status. + * + * Two-DB architecture: each session has inbound.db (host-owned) and outbound.db + * (container-owned). This eliminates SQLite write contention across the + * host-container mount boundary — each file has exactly one writer. */ import Database from 'better-sqlite3'; import fs from 'fs'; @@ -9,7 +13,7 @@ import path from 'path'; import { DATA_DIR } from './config.js'; import { createSession, findSession, getSession, updateSession } from './db/sessions.js'; import { log } from './log.js'; -import { SESSION_SCHEMA } from './db/schema.js'; +import { INBOUND_SCHEMA, OUTBOUND_SCHEMA } from './db/schema.js'; import type { Session } from './types.js'; /** Root directory for all session data. */ @@ -22,9 +26,27 @@ export function sessionDir(agentGroupId: string, sessionId: string): string { return path.join(sessionsBaseDir(), agentGroupId, sessionId); } -/** Path to a session's SQLite DB. 
*/ +/** Path to the host-owned inbound DB (messages_in + delivered). */ +export function inboundDbPath(agentGroupId: string, sessionId: string): string { + return path.join(sessionDir(agentGroupId, sessionId), 'inbound.db'); +} + +/** Path to the container-owned outbound DB (messages_out + processing_ack). */ +export function outboundDbPath(agentGroupId: string, sessionId: string): string { + return path.join(sessionDir(agentGroupId, sessionId), 'outbound.db'); +} + +/** Path to the container heartbeat file (touched instead of DB writes). */ +export function heartbeatPath(agentGroupId: string, sessionId: string): string { + return path.join(sessionDir(agentGroupId, sessionId), '.heartbeat'); +} + +/** + * @deprecated Use inboundDbPath / outboundDbPath instead. + * Kept temporarily for test compatibility during migration. + */ export function sessionDbPath(agentGroupId: string, sessionId: string): string { - return path.join(sessionDir(agentGroupId, sessionId), 'session.db'); + return inboundDbPath(agentGroupId, sessionId); } function generateId(): string { @@ -41,8 +63,6 @@ export function resolveSession( threadId: string | null, sessionMode: 'shared' | 'per-thread', ): { session: Session; created: boolean } { - // For shared mode, look for any active session with this messaging group (threadId ignored) - // For per-thread mode, look for an active session with this specific thread const lookupThreadId = sessionMode === 'shared' ? null : threadId; const existing = findSession(messagingGroupId, lookupThreadId); @@ -50,7 +70,6 @@ export function resolveSession( return { session: existing, created: false }; } - // Create new session const id = generateId(); const session: Session = { id, @@ -71,23 +90,32 @@ export function resolveSession( return { session, created: true }; } -/** Create the session folder and initialize the session DB. */ +/** Create the session folder and initialize both DBs. 
*/ export function initSessionFolder(agentGroupId: string, sessionId: string): void { const dir = sessionDir(agentGroupId, sessionId); fs.mkdirSync(dir, { recursive: true }); fs.mkdirSync(path.join(dir, 'outbox'), { recursive: true }); - const dbPath = sessionDbPath(agentGroupId, sessionId); - if (!fs.existsSync(dbPath)) { - const db = new Database(dbPath); + const inPath = inboundDbPath(agentGroupId, sessionId); + if (!fs.existsSync(inPath)) { + const db = new Database(inPath); db.pragma('journal_mode = DELETE'); - db.exec(SESSION_SCHEMA); + db.exec(INBOUND_SCHEMA); db.close(); - log.debug('Session DB created', { dbPath }); + log.debug('Inbound DB created', { dbPath: inPath }); + } + + const outPath = outboundDbPath(agentGroupId, sessionId); + if (!fs.existsSync(outPath)) { + const db = new Database(outPath); + db.pragma('journal_mode = DELETE'); + db.exec(OUTBOUND_SCHEMA); + db.close(); + log.debug('Outbound DB created', { dbPath: outPath }); } } -/** Write a message to a session's messages_in table. */ +/** Write a message to a session's inbound DB (messages_in). Host-only. */ export function writeSessionMessage( agentGroupId: string, sessionId: string, @@ -103,22 +131,19 @@ export function writeSessionMessage( recurrence?: string | null; }, ): void { - const dbPath = sessionDbPath(agentGroupId, sessionId); + const dbPath = inboundDbPath(agentGroupId, sessionId); const db = new Database(dbPath); db.pragma('journal_mode = DELETE'); + db.pragma('busy_timeout = 5000'); try { - const nextSeq = ( - db - .prepare( - `SELECT COALESCE(MAX(seq), 0) + 1 AS next FROM ( - SELECT seq FROM messages_in WHERE seq IS NOT NULL - UNION ALL - SELECT seq FROM messages_out WHERE seq IS NOT NULL - )`, - ) - .get() as { next: number } - ).next; + // Host uses even seq numbers, container uses odd — prevents collisions + // across the two-DB boundary without cross-DB coordination. 
+ const maxSeq = ( + db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number } + ).m; + const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); // next even + db.prepare( `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence) VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`, @@ -138,18 +163,33 @@ export function writeSessionMessage( db.close(); } - // Update last_active updateSession(sessionId, { last_active: new Date().toISOString() }); } -/** Open a session DB for reading (e.g., polling messages_out). */ -export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database { - const dbPath = sessionDbPath(agentGroupId, sessionId); +/** Open the inbound DB for a session (host reads/writes). */ +export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database { + const dbPath = inboundDbPath(agentGroupId, sessionId); const db = new Database(dbPath); db.pragma('journal_mode = DELETE'); + db.pragma('busy_timeout = 5000'); return db; } +/** Open the outbound DB for a session (host reads only). */ +export function openOutboundDb(agentGroupId: string, sessionId: string): Database.Database { + const dbPath = outboundDbPath(agentGroupId, sessionId); + const db = new Database(dbPath, { readonly: true }); + db.pragma('busy_timeout = 5000'); + return db; +} + +/** + * @deprecated Use openInboundDb / openOutboundDb instead. + */ +export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database { + return openInboundDb(agentGroupId, sessionId); +} + /** Mark a container as running for a session. */ export function markContainerRunning(sessionId: string): void { updateSession(sessionId, { container_status: 'running', last_active: new Date().toISOString() });