diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index 8302afdd8..0ed32fda6 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -14,12 +14,10 @@ import { Button, Modal, TextInput, - markdownToPlainText, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage, } from 'chat'; -import { ValidationError } from '@chat-adapter/shared'; import { log } from '../log.js'; import { SqliteStateAdapter } from '../state-sqlite.js'; import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js'; @@ -380,30 +378,12 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter data: f.data, filename: f.filename, })); - try { - if (fileUploads && fileUploads.length > 0) { - const result = await adapter.postMessage(tid, { markdown: text, files: fileUploads }); - return result?.id; - } else { - const result = await adapter.postMessage(tid, { markdown: text }); - return result?.id; - } - } catch (err) { - // Permanent formatting failure (e.g. Telegram MarkdownV2 entity parse error): - // retry once as plain text so the queue isn't blocked forever. - if (err instanceof ValidationError) { - log.warn('Markdown rejected by adapter, retrying as plain text', { - adapter: adapter.name, - err: err.message, - }); - const plain = markdownToPlainText(text); - const result = await adapter.postMessage(tid, plain); - if (fileUploads && fileUploads.length > 0) { - await adapter.postMessage(tid, { markdown: '', files: fileUploads }); - } - return result?.id; - } - throw err; + if (fileUploads && fileUploads.length > 0) { + const result = await adapter.postMessage(tid, { markdown: text, files: fileUploads }); + return result?.id; + } else { + const result = await adapter.postMessage(tid, { markdown: text }); + return result?.id; } } else if (message.files && message.files.length > 0) { // Files only, no text diff --git a/src/db/schema.ts b/src/db/schema.ts index acffa22f5..6f8a803e0 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -78,7 +78,7 @@ CREATE TABLE pending_questions ( /** Host-owned: inbound messages + delivery tracking + destination map. */ export const INBOUND_SCHEMA = ` -CREATE TABLE messages_in ( +CREATE TABLE IF NOT EXISTS messages_in ( id TEXT PRIMARY KEY, seq INTEGER UNIQUE, kind TEXT NOT NULL, @@ -95,7 +95,7 @@ CREATE TABLE messages_in ( -- Host tracks delivery outcomes for messages_out IDs. -- Avoids writing to outbound.db (container-owned). -CREATE TABLE delivered ( +CREATE TABLE IF NOT EXISTS delivered ( message_out_id TEXT PRIMARY KEY, platform_message_id TEXT, status TEXT NOT NULL DEFAULT 'delivered', @@ -106,7 +106,7 @@ CREATE TABLE delivered ( -- Host overwrites on every container wake AND on demand (admin rewires, new child agents, etc.). -- Container queries this live on every lookup, so admin changes take effect -- mid-session without requiring a container restart. -CREATE TABLE destinations ( +CREATE TABLE IF NOT EXISTS destinations ( name TEXT PRIMARY KEY, display_name TEXT, type TEXT NOT NULL, -- 'channel' | 'agent' @@ -120,7 +120,7 @@ CREATE TABLE destinations ( -- and thread_id. Container reads it in send_message / ask_user_question / -- trigger_credential_collection to default the channel/thread of outbound -- messages when the agent doesn't specify an explicit destination. -CREATE TABLE session_routing ( +CREATE TABLE IF NOT EXISTS session_routing ( id INTEGER PRIMARY KEY CHECK (id = 1), channel_type TEXT, platform_id TEXT, @@ -130,7 +130,7 @@ CREATE TABLE session_routing ( /** Container-owned: outbound messages + processing acknowledgments. */ export const OUTBOUND_SCHEMA = ` -CREATE TABLE messages_out ( +CREATE TABLE IF NOT EXISTS messages_out ( id TEXT PRIMARY KEY, seq INTEGER UNIQUE, in_reply_to TEXT, @@ -147,7 +147,7 @@ CREATE TABLE messages_out ( -- Container tracks processing status here instead of updating messages_in. -- Host reads this to know which messages have been processed. -- On container startup, stale 'processing' entries are cleared (crash recovery). -CREATE TABLE processing_ack ( +CREATE TABLE IF NOT EXISTS processing_ack ( message_id TEXT PRIMARY KEY, status TEXT NOT NULL, status_changed TEXT NOT NULL @@ -156,7 +156,7 @@ CREATE TABLE processing_ack ( -- Persistent key/value state owned by the container. Used (among other things) -- to store the SDK session ID so the agent's conversation resumes across -- container restarts. Cleared by /clear. -CREATE TABLE session_state ( +CREATE TABLE IF NOT EXISTS session_state ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL diff --git a/src/db/session-db.ts b/src/db/session-db.ts new file mode 100644 index 000000000..73dc13996 --- /dev/null +++ b/src/db/session-db.ts @@ -0,0 +1,288 @@ +/** + * SQL operations on per-session inbound/outbound DBs. + * + * These are NOT the central app DB — they're the cross-mount SQLite files + * shared between host and container. Callers own the connection lifecycle + * (open-write-close per op). See session-manager.ts header for invariants. + */ +import Database from 'better-sqlite3'; + +import { INBOUND_SCHEMA, OUTBOUND_SCHEMA } from './schema.js'; + +/** Apply the inbound or outbound schema to a DB file. Idempotent. */ +export function ensureSchema(dbPath: string, schema: 'inbound' | 'outbound'): void { + const db = new Database(dbPath); + db.pragma('journal_mode = DELETE'); + db.exec(schema === 'inbound' ? INBOUND_SCHEMA : OUTBOUND_SCHEMA); + db.close(); +} + +/** Open the inbound DB for a session (host reads/writes). */ +export function openInboundDb(dbPath: string): Database.Database { + const db = new Database(dbPath); + db.pragma('journal_mode = DELETE'); + db.pragma('busy_timeout = 5000'); + return db; +} + +/** Open the outbound DB for a session (host reads only). */ +export function openOutboundDb(dbPath: string): Database.Database { + const db = new Database(dbPath, { readonly: true }); + db.pragma('busy_timeout = 5000'); + return db; +} + +export function upsertSessionRouting( + db: Database.Database, + routing: { channel_type: string | null; platform_id: string | null; thread_id: string | null }, +): void { + db.prepare( + `INSERT INTO session_routing (id, channel_type, platform_id, thread_id) + VALUES (1, @channel_type, @platform_id, @thread_id) + ON CONFLICT(id) DO UPDATE SET + channel_type = excluded.channel_type, + platform_id = excluded.platform_id, + thread_id = excluded.thread_id`, + ).run(routing); +} + +export interface DestinationRow { + name: string; + display_name: string | null; + type: 'channel' | 'agent'; + channel_type: string | null; + platform_id: string | null; + agent_group_id: string | null; +} + +export function replaceDestinations(db: Database.Database, entries: DestinationRow[]): void { + const tx = db.transaction((rows: DestinationRow[]) => { + db.prepare('DELETE FROM destinations').run(); + const stmt = db.prepare( + `INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id) + VALUES (@name, @display_name, @type, @channel_type, @platform_id, @agent_group_id)`, + ); + for (const row of rows) stmt.run(row); + }); + tx(entries); +} + +// --------------------------------------------------------------------------- +// messages_in +// --------------------------------------------------------------------------- + +/** Next even seq number for host-owned inbound.db. */ +function nextEvenSeq(db: Database.Database): number { + const maxSeq = (db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; + return maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); +} + +export function insertMessage( + db: Database.Database, + message: { + id: string; + kind: string; + timestamp: string; + platformId: string | null; + channelType: string | null; + threadId: string | null; + content: string; + processAfter: string | null; + recurrence: string | null; + }, +): void { + db.prepare( + `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence) + VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`, + ).run({ + ...message, + seq: nextEvenSeq(db), + }); +} + +export function insertTask( + db: Database.Database, + task: { + id: string; + processAfter: string; + recurrence: string | null; + platformId: string | null; + channelType: string | null; + threadId: string | null; + content: string; + }, +): void { + db.prepare( + `INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content) + VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content)`, + ).run({ + ...task, + seq: nextEvenSeq(db), + }); +} + +export function cancelTask(db: Database.Database, taskId: string): void { + db.prepare( + "UPDATE messages_in SET status = 'completed' WHERE id = ? AND kind = 'task' AND status IN ('pending', 'paused')", + ).run(taskId); +} + +export function pauseTask(db: Database.Database, taskId: string): void { + db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = ? AND kind = 'task' AND status = 'pending'").run( + taskId, + ); +} + +export function resumeTask(db: Database.Database, taskId: string): void { + db.prepare("UPDATE messages_in SET status = 'pending' WHERE id = ? AND kind = 'task' AND status = 'paused'").run( + taskId, + ); +} + +export function countDueMessages(db: Database.Database): number { + return ( + db + .prepare( + `SELECT COUNT(*) as count FROM messages_in + WHERE status = 'pending' + AND (process_after IS NULL OR process_after <= datetime('now'))`, + ) + .get() as { count: number } + ).count; +} + +export function markMessageFailed(db: Database.Database, messageId: string): void { + db.prepare("UPDATE messages_in SET status = 'failed' WHERE id = ?").run(messageId); +} + +export function retryWithBackoff(db: Database.Database, messageId: string, backoffSec: number): void { + db.prepare( + `UPDATE messages_in SET tries = tries + 1, process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`, + ).run(messageId); +} + +export function getMessageForRetry( + db: Database.Database, + messageId: string, + status: string, +): { id: string; tries: number } | undefined { + return db.prepare('SELECT id, tries FROM messages_in WHERE id = ? AND status = ?').get(messageId, status) as + | { id: string; tries: number } + | undefined; +} + +export interface RecurringMessage { + id: string; + kind: string; + content: string; + recurrence: string; + process_after: string | null; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; +} + +export function getCompletedRecurring(db: Database.Database): RecurringMessage[] { + return db + .prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL") + .all() as RecurringMessage[]; +} + +export function insertRecurrence( + db: Database.Database, + msg: RecurringMessage, + newId: string, + nextRun: string | null, +): void { + db.prepare( + `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) + VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, + ).run(newId, nextEvenSeq(db), msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content); +} + +export function clearRecurrence(db: Database.Database, messageId: string): void { + db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(messageId); +} + +export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void { + const completed = outDb + .prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')") + .all() as Array<{ message_id: string }>; + + if (completed.length === 0) return; + + const updateStmt = inDb.prepare("UPDATE messages_in SET status = 'completed' WHERE id = ? AND status != 'completed'"); + inDb.transaction(() => { + for (const { message_id } of completed) { + updateStmt.run(message_id); + } + })(); +} + +export function getStuckProcessingIds(outDb: Database.Database): string[] { + return ( + outDb.prepare("SELECT message_id FROM processing_ack WHERE status = 'processing'").all() as Array<{ + message_id: string; + }> + ).map((r) => r.message_id); +} + +// --------------------------------------------------------------------------- +// messages_out (read-only from host) +// --------------------------------------------------------------------------- + +export interface OutboundMessage { + id: string; + kind: string; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + content: string; +} + +export function getDueOutboundMessages(db: Database.Database): OutboundMessage[] { + return db + .prepare( + `SELECT * FROM messages_out + WHERE (deliver_after IS NULL OR deliver_after <= datetime('now')) + ORDER BY timestamp ASC`, + ) + .all() as OutboundMessage[]; +} + +// --------------------------------------------------------------------------- +// delivered +// --------------------------------------------------------------------------- + +export function getDeliveredIds(db: Database.Database): Set { + return new Set( + (db.prepare('SELECT message_out_id FROM delivered').all() as Array<{ message_out_id: string }>).map( + (r) => r.message_out_id, + ), + ); +} + +export function markDelivered(db: Database.Database, messageOutId: string, platformMessageId: string | null): void { + db.prepare( + "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, ?, 'delivered', datetime('now'))", + ).run(messageOutId, platformMessageId ?? null); +} + +export function markDeliveryFailed(db: Database.Database, messageOutId: string): void { + db.prepare( + "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, NULL, 'failed', datetime('now'))", + ).run(messageOutId); +} + +/** Ensure the delivered table has columns added after initial schema. */ +export function migrateDeliveredTable(db: Database.Database): void { + const cols = new Set( + (db.prepare("PRAGMA table_info('delivered')").all() as Array<{ name: string }>).map((c) => c.name), + ); + if (!cols.has('platform_message_id')) { + db.prepare('ALTER TABLE delivered ADD COLUMN platform_message_id TEXT').run(); + } + if (!cols.has('status')) { + db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run(); + } +} diff --git a/src/delivery.ts b/src/delivery.ts index 368b2f9db..e884efbbf 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -7,7 +7,7 @@ * - Tracks delivery in inbound.db's `delivered` table (host-owned) * - Never writes to outbound.db — preserves single-writer-per-file invariant */ -import Database from 'better-sqlite3'; +import type Database from 'better-sqlite3'; import fs from 'fs'; import path from 'path'; @@ -28,6 +28,17 @@ import { } from './db/agent-groups.js'; import { createDestination, getDestinationByName, hasDestination, normalizeName } from './db/agent-destinations.js'; import { getMessagingGroupByPlatform, getMessagingGroupsByAgentGroup } from './db/messaging-groups.js'; +import { + getDueOutboundMessages, + getDeliveredIds, + markDelivered, + markDeliveryFailed, + migrateDeliveredTable, + insertTask, + cancelTask, + pauseTask, + resumeTask, +} from './db/session-db.js'; import { log } from './log.js'; import { openInboundDb, @@ -215,30 +226,12 @@ async function deliverSessionMessages(session: Session): Promise { try { // Read all due messages from outbound.db (read-only) - const allDue = outDb - .prepare( - `SELECT * FROM messages_out - WHERE (deliver_after IS NULL OR deliver_after <= datetime('now')) - ORDER BY timestamp ASC`, - ) - .all() as Array<{ - id: string; - kind: string; - platform_id: string | null; - channel_type: string | null; - thread_id: string | null; - content: string; - }>; - + const allDue = getDueOutboundMessages(outDb); if (allDue.length === 0) return; // Filter out already-delivered messages using inbound.db's delivered table - const deliveredIds = new Set( - (inDb.prepare('SELECT message_out_id FROM delivered').all() as Array<{ message_out_id: string }>).map( - (r) => r.message_out_id, - ), - ); - const undelivered = allDue.filter((m) => !deliveredIds.has(m.id)); + const delivered = getDeliveredIds(inDb); + const undelivered = allDue.filter((m) => !delivered.has(m.id)); if (undelivered.length === 0) return; // Ensure platform_message_id column exists (migration for existing sessions) @@ -247,11 +240,7 @@ async function deliverSessionMessages(session: Session): Promise { for (const msg of undelivered) { try { const platformMsgId = await deliverMessage(msg, session, inDb); - inDb - .prepare( - "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, ?, 'delivered', datetime('now'))", - ) - .run(msg.id, platformMsgId ?? null); + markDelivered(inDb, msg.id, platformMsgId ?? null); deliveryAttempts.delete(msg.id); resetContainerIdleTimer(session.id); } catch (err) { @@ -264,11 +253,7 @@ async function deliverSessionMessages(session: Session): Promise { attempts, err, }); - inDb - .prepare( - "INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, NULL, 'failed', datetime('now'))", - ) - .run(msg.id); + markDeliveryFailed(inDb, msg.id); deliveryAttempts.delete(msg.id); } else { log.warn('Message delivery failed, will retry', { @@ -428,19 +413,6 @@ async function deliverMessage( return platformMsgId; } -/** Ensure the delivered table has new columns (migration for existing sessions). */ -function migrateDeliveredTable(db: Database.Database): void { - const cols = new Set( - (db.prepare("PRAGMA table_info('delivered')").all() as Array<{ name: string }>).map((c) => c.name), - ); - if (!cols.has('platform_message_id')) { - db.prepare('ALTER TABLE delivered ADD COLUMN platform_message_id TEXT').run(); - } - if (!cols.has('status')) { - db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run(); - } -} - /** * Handle system actions from the container agent. * These are written to messages_out because the container can't write to inbound.db. @@ -462,54 +434,36 @@ async function handleSystemAction( const processAfter = content.processAfter as string; const recurrence = (content.recurrence as string) || null; - // Compute next even seq for host-owned inbound.db - const maxSeq = (inDb.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; - const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); - - inDb - .prepare( - `INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content) - VALUES (@id, @seq, datetime('now'), 'pending', 0, @process_after, @recurrence, 'task', @platform_id, @channel_type, @thread_id, @content)`, - ) - .run({ - id: taskId, - seq: nextSeq, - process_after: processAfter, - recurrence, - platform_id: content.platformId ?? null, - channel_type: content.channelType ?? null, - thread_id: content.threadId ?? null, - content: JSON.stringify({ prompt, script }), - }); + insertTask(inDb, { + id: taskId, + processAfter, + recurrence, + platformId: (content.platformId as string) ?? null, + channelType: (content.channelType as string) ?? null, + threadId: (content.threadId as string) ?? null, + content: JSON.stringify({ prompt, script }), + }); log.info('Scheduled task created', { taskId, processAfter, recurrence }); break; } case 'cancel_task': { const taskId = content.taskId as string; - inDb - .prepare( - "UPDATE messages_in SET status = 'completed' WHERE id = ? AND kind = 'task' AND status IN ('pending', 'paused')", - ) - .run(taskId); + cancelTask(inDb, taskId); log.info('Task cancelled', { taskId }); break; } case 'pause_task': { const taskId = content.taskId as string; - inDb - .prepare("UPDATE messages_in SET status = 'paused' WHERE id = ? AND kind = 'task' AND status = 'pending'") - .run(taskId); + pauseTask(inDb, taskId); log.info('Task paused', { taskId }); break; } case 'resume_task': { const taskId = content.taskId as string; - inDb - .prepare("UPDATE messages_in SET status = 'pending' WHERE id = ? AND kind = 'task' AND status = 'paused'") - .run(taskId); + resumeTask(inDb, taskId); log.info('Task resumed', { taskId }); break; } diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 22583a82d..b1b49f951 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -7,11 +7,22 @@ * - Uses heartbeat file mtime for stale container detection (not DB writes) * - Never writes to outbound.db — preserves single-writer-per-file invariant */ -import Database from 'better-sqlite3'; +import type Database from 'better-sqlite3'; import fs from 'fs'; import { getActiveSessions, updateSession } from './db/sessions.js'; import { getAgentGroup } from './db/agent-groups.js'; +import { + countDueMessages, + syncProcessingAcks, + getStuckProcessingIds, + getMessageForRetry, + markMessageFailed, + retryWithBackoff, + getCompletedRecurring, + insertRecurrence, + clearRecurrence, +} from './db/session-db.js'; import { log } from './log.js'; import { openInboundDb, openOutboundDb, inboundDbPath, outboundDbPath, heartbeatPath } from './session-manager.js'; import { wakeContainer, isContainerRunning } from './container-runner.js'; @@ -77,16 +88,10 @@ async function sweepSession(session: Session): Promise { } // 2. Check for due pending messages → wake container - const dueMessages = inDb - .prepare( - `SELECT COUNT(*) as count FROM messages_in - WHERE status = 'pending' - AND (process_after IS NULL OR process_after <= datetime('now'))`, - ) - .get() as { count: number }; + const dueCount = countDueMessages(inDb); - if (dueMessages.count > 0 && !isContainerRunning(session.id)) { - log.info('Waking container for due messages', { sessionId: session.id, count: dueMessages.count }); + if (dueCount > 0 && !isContainerRunning(session.id)) { + log.info('Waking container for due messages', { sessionId: session.id, count: dueCount }); await wakeContainer(session); } @@ -103,26 +108,6 @@ async function sweepSession(session: Session): Promise { } } -/** - * Sync completed/failed processing_ack entries → messages_in.status. - * Only syncs terminal states — 'processing' is handled by stale detection. - */ -function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void { - const completed = outDb - .prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')") - .all() as Array<{ message_id: string }>; - - if (completed.length === 0) return; - - // Batch-update messages_in status for completed/failed messages - const updateStmt = inDb.prepare("UPDATE messages_in SET status = 'completed' WHERE id = ? AND status != 'completed'"); - inDb.transaction(() => { - for (const { message_id } of completed) { - updateStmt.run(message_id); - } - })(); -} - /** * Detect stale containers using heartbeat file mtime. * If the heartbeat is older than STALE_THRESHOLD and processing_ack has @@ -146,30 +131,20 @@ function detectStaleContainers( if (heartbeatAge < STALE_THRESHOLD_MS) return; // Container is alive // Heartbeat is stale — check for stuck processing entries - const processing = outDb.prepare("SELECT message_id FROM processing_ack WHERE status = 'processing'").all() as Array<{ - message_id: string; - }>; - - if (processing.length === 0) return; - - for (const { message_id } of processing) { - const msg = inDb - .prepare('SELECT id, tries FROM messages_in WHERE id = ? AND status = ?') - .get(message_id, 'pending') as { id: string; tries: number } | undefined; + const processingIds = getStuckProcessingIds(outDb); + if (processingIds.length === 0) return; + for (const messageId of processingIds) { + const msg = getMessageForRetry(inDb, messageId, 'pending'); if (!msg) continue; if (msg.tries >= MAX_TRIES) { - inDb.prepare("UPDATE messages_in SET status = 'failed' WHERE id = ?").run(msg.id); + markMessageFailed(inDb, msg.id); log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id }); } else { const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries); const backoffSec = Math.floor(backoffMs / 1000); - inDb - .prepare( - `UPDATE messages_in SET tries = tries + 1, process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`, - ) - .run(msg.id); + retryWithBackoff(inDb, msg.id, backoffSec); log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs }); } } @@ -177,49 +152,17 @@ function detectStaleContainers( /** Insert next occurrence for completed recurring messages. */ async function handleRecurrence(inDb: Database.Database, session: Session): Promise { - const completedRecurring = inDb - .prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL") - .all() as Array<{ - id: string; - kind: string; - content: string; - recurrence: string; - process_after: string | null; - platform_id: string | null; - channel_type: string | null; - thread_id: string | null; - }>; + const recurring = getCompletedRecurring(inDb); - for (const msg of completedRecurring) { + for (const msg of recurring) { try { const { CronExpressionParser } = await import('cron-parser'); const interval = CronExpressionParser.parse(msg.recurrence); const nextRun = interval.next().toISOString(); const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; - // Host uses even seq numbers - const maxSeq = (inDb.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; - const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); - - inDb - .prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) - VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, - ) - .run( - newId, - nextSeq, - msg.kind, - nextRun, - msg.recurrence, - msg.platform_id, - msg.channel_type, - msg.thread_id, - msg.content, - ); - - // Remove recurrence from the completed message so it doesn't spawn again - inDb.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id); + insertRecurrence(inDb, msg, newId, nextRun); + clearRecurrence(inDb, msg.id); log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun }); } catch (err) { diff --git a/src/session-manager.ts b/src/session-manager.ts index 8f05e286c..4ebbd3fcf 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -10,7 +10,7 @@ * 3. One writer per file — DELETE-mode journal-unlink isn't atomic across * the mount; concurrent writers corrupt the DB. */ -import Database from 'better-sqlite3'; +import type Database from 'better-sqlite3'; import fs from 'fs'; import path from 'path'; @@ -19,8 +19,16 @@ import { getAgentGroup } from './db/agent-groups.js'; import { getDestinations } from './db/agent-destinations.js'; import { getMessagingGroup } from './db/messaging-groups.js'; import { createSession, findSession, findSessionByAgentGroup, getSession, updateSession } from './db/sessions.js'; +import { + ensureSchema, + openInboundDb as openInboundDbRaw, + openOutboundDb as openOutboundDbRaw, + upsertSessionRouting, + replaceDestinations, + insertMessage, + type DestinationRow, +} from './db/session-db.js'; import { log } from './log.js'; -import { INBOUND_SCHEMA, OUTBOUND_SCHEMA } from './db/schema.js'; import type { Session } from './types.js'; /** Root directory for all session data. */ @@ -116,23 +124,8 @@ export function initSessionFolder(agentGroupId: string, sessionId: string): void fs.mkdirSync(dir, { recursive: true }); fs.mkdirSync(path.join(dir, 'outbox'), { recursive: true }); - const inPath = inboundDbPath(agentGroupId, sessionId); - if (!fs.existsSync(inPath)) { - const db = new Database(inPath); - db.pragma('journal_mode = DELETE'); - db.exec(INBOUND_SCHEMA); - db.close(); - log.debug('Inbound DB created', { dbPath: inPath }); - } - - const outPath = outboundDbPath(agentGroupId, sessionId); - if (!fs.existsSync(outPath)) { - const db = new Database(outPath); - db.pragma('journal_mode = DELETE'); - db.exec(OUTBOUND_SCHEMA); - db.close(); - log.debug('Outbound DB created', { dbPath: outPath }); - } + ensureSchema(inboundDbPath(agentGroupId, sessionId), 'inbound'); + ensureSchema(outboundDbPath(agentGroupId, sessionId), 'outbound'); } /** @@ -172,18 +165,9 @@ export function writeSessionRouting(agentGroupId: string, sessionId: string): vo } } - const db = new Database(dbPath); - db.pragma('journal_mode = DELETE'); - db.pragma('busy_timeout = 5000'); + const db = openInboundDb(agentGroupId, sessionId); try { - db.prepare( - `INSERT INTO session_routing (id, channel_type, platform_id, thread_id) - VALUES (1, @channel_type, @platform_id, @thread_id) - ON CONFLICT(id) DO UPDATE SET - channel_type = excluded.channel_type, - platform_id = excluded.platform_id, - thread_id = excluded.thread_id`, - ).run({ + upsertSessionRouting(db, { channel_type: channelType, platform_id: platformId, thread_id: session.thread_id, @@ -199,15 +183,7 @@ export function writeDestinations(agentGroupId: string, sessionId: string): void if (!fs.existsSync(dbPath)) return; const rows = getDestinations(agentGroupId); - type DestRow = { - name: string; - display_name: string | null; - type: 'channel' | 'agent'; - channel_type: string | null; - platform_id: string | null; - agent_group_id: string | null; - }; - const resolved: DestRow[] = []; + const resolved: DestinationRow[] = []; for (const row of rows) { if (row.target_type === 'channel') { @@ -235,19 +211,9 @@ export function writeDestinations(agentGroupId: string, sessionId: string): void } } - const db = new Database(dbPath); - db.pragma('journal_mode = DELETE'); - db.pragma('busy_timeout = 5000'); + const db = openInboundDb(agentGroupId, sessionId); try { - const tx = db.transaction((entries: DestRow[]) => { - db.prepare('DELETE FROM destinations').run(); - const stmt = db.prepare( - `INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id) - VALUES (@name, @display_name, @type, @channel_type, @platform_id, @agent_group_id)`, - ); - for (const e of entries) stmt.run(e); - }); - tx(resolved); + replaceDestinations(db, resolved); } finally { db.close(); } @@ -279,28 +245,10 @@ export function writeSessionMessage( // Extract base64 attachment data, save to inbox, replace with file paths const content = extractAttachmentFiles(agentGroupId, sessionId, message.id, message.content); - const dbPath = inboundDbPath(agentGroupId, sessionId); - const db = new Database(dbPath); - db.pragma('journal_mode = DELETE'); - db.pragma('busy_timeout = 5000'); - + const db = openInboundDb(agentGroupId, sessionId); try { - // Host uses even seq, container uses odd. This is not just collision - // avoidance between the two DB files — the seq is the agent-facing - // message ID returned by send_message and accepted by edit_message / - // add_reaction, and those tools look up by seq across BOTH tables - // (see container/agent-runner/src/db/messages-out.ts:getMessageIdBySeq). - // So the {messages_in.seq, messages_out.seq} namespace MUST be disjoint, - // or the agent's "edit message #5" could resolve to the wrong row. - const maxSeq = (db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m; - const nextSeq = maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2); // next even - - db.prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence) - VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`, - ).run({ + insertMessage(db, { id: message.id, - seq: nextSeq, kind: message.kind, timestamp: message.timestamp, platformId: message.platformId ?? null, @@ -357,19 +305,12 @@ function extractAttachmentFiles( /** Open the inbound DB for a session (host reads/writes). */ export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database { - const dbPath = inboundDbPath(agentGroupId, sessionId); - const db = new Database(dbPath); - db.pragma('journal_mode = DELETE'); - db.pragma('busy_timeout = 5000'); - return db; + return openInboundDbRaw(inboundDbPath(agentGroupId, sessionId)); } /** Open the outbound DB for a session (host reads only). */ export function openOutboundDb(agentGroupId: string, sessionId: string): Database.Database { - const dbPath = outboundDbPath(agentGroupId, sessionId); - const db = new Database(dbPath, { readonly: true }); - db.pragma('busy_timeout = 5000'); - return db; + return openOutboundDbRaw(outboundDbPath(agentGroupId, sessionId)); } /**