Files
nanoclaw/container/agent-runner/src/db/connection.ts
T
gavrielc b59216c299 fix(v2): persist SDK session ID across container restarts
The v2 poll loop held the session ID in a local variable, so every
container restart started a fresh SDK session even though the .jsonl
transcript was still sitting in the shared .claude mount. Store it in
outbound.db (container-owned, already per channel/thread), seed the
loop on startup, clear on /clear, and recover from stale-session
errors the same way v1 did.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 01:17:42 +03:00

163 lines
5.0 KiB
TypeScript

/**
* Two-DB connection layer.
*
* The session uses two SQLite files to eliminate write contention across
* the host-container mount boundary:
*
* inbound.db — host writes new messages here; container opens READ-ONLY
* outbound.db — container writes responses + acks here; host opens read-only
*
* Each file has exactly one writer, so no cross-process lock contention.
*/
import Database from 'better-sqlite3';
import fs from 'fs';
const DEFAULT_INBOUND_PATH = '/workspace/inbound.db';
const DEFAULT_OUTBOUND_PATH = '/workspace/outbound.db';
const DEFAULT_HEARTBEAT_PATH = '/workspace/.heartbeat';
let _inbound: Database.Database | null = null;
let _outbound: Database.Database | null = null;
let _heartbeatPath: string = DEFAULT_HEARTBEAT_PATH;
/** Inbound DB — container opens read-only (host is the sole writer). */
export function getInboundDb(): Database.Database {
if (!_inbound) {
const dbPath = process.env.SESSION_INBOUND_DB_PATH || DEFAULT_INBOUND_PATH;
_inbound = new Database(dbPath, { readonly: true });
_inbound.pragma('busy_timeout = 5000');
}
return _inbound;
}
/** Outbound DB — container owns this file (sole writer). */
export function getOutboundDb(): Database.Database {
if (!_outbound) {
const dbPath = process.env.SESSION_OUTBOUND_DB_PATH || DEFAULT_OUTBOUND_PATH;
_outbound = new Database(dbPath);
_outbound.pragma('journal_mode = DELETE');
_outbound.pragma('busy_timeout = 5000');
_outbound.pragma('foreign_keys = ON');
// Lightweight forward-compat: session_state was added after the initial
// v2 schema, so older session DBs don't have it. Create it on demand
// instead of requiring a formal migration pass.
_outbound.exec(`
CREATE TABLE IF NOT EXISTS session_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
`);
}
return _outbound;
}
/**
* Touch the heartbeat file — replaces the old touchProcessing() DB writes.
* The host checks this file's mtime for stale container detection.
* A file touch is cheaper and avoids cross-boundary DB write contention.
*/
export function touchHeartbeat(): void {
const p = process.env.SESSION_HEARTBEAT_PATH || _heartbeatPath;
const now = new Date();
try {
fs.utimesSync(p, now, now);
} catch {
try {
fs.writeFileSync(p, '');
} catch {
// Silently ignore — parent dir may not exist (e.g., in-memory test DBs)
}
}
}
/**
* Clear stale processing_ack entries on container startup.
* If the previous container crashed, 'processing' entries are leftover.
* Clearing them lets the new container re-process those messages.
*/
export function clearStaleProcessingAcks(): void {
getOutboundDb().prepare("DELETE FROM processing_ack WHERE status = 'processing'").run();
}
/** For tests — creates in-memory DBs with the session schemas. */
export function initTestSessionDb(): { inbound: Database.Database; outbound: Database.Database } {
_inbound = new Database(':memory:');
_inbound.pragma('foreign_keys = ON');
_inbound.exec(`
CREATE TABLE messages_in (
id TEXT PRIMARY KEY,
seq INTEGER UNIQUE,
kind TEXT NOT NULL,
timestamp TEXT NOT NULL,
status TEXT DEFAULT 'pending',
process_after TEXT,
recurrence TEXT,
tries INTEGER DEFAULT 0,
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
);
CREATE TABLE delivered (
message_out_id TEXT PRIMARY KEY,
platform_message_id TEXT,
status TEXT NOT NULL DEFAULT 'delivered',
delivered_at TEXT NOT NULL
);
CREATE TABLE destinations (
name TEXT PRIMARY KEY,
display_name TEXT,
type TEXT NOT NULL,
channel_type TEXT,
platform_id TEXT,
agent_group_id TEXT
);
`);
_outbound = new Database(':memory:');
_outbound.pragma('foreign_keys = ON');
_outbound.exec(`
CREATE TABLE messages_out (
id TEXT PRIMARY KEY,
seq INTEGER UNIQUE,
in_reply_to TEXT,
timestamp TEXT NOT NULL,
deliver_after TEXT,
recurrence TEXT,
kind TEXT NOT NULL,
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
);
CREATE TABLE processing_ack (
message_id TEXT PRIMARY KEY,
status TEXT NOT NULL,
status_changed TEXT NOT NULL
);
CREATE TABLE session_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
`);
return { inbound: _inbound, outbound: _outbound };
}
export function closeSessionDb(): void {
_inbound?.close();
_inbound = null;
_outbound?.close();
_outbound = null;
}
/**
* @deprecated Use getInboundDb() / getOutboundDb() instead.
* Kept for backward compatibility during migration.
*/
export function getSessionDb(): Database.Database {
return getInboundDb();
}