Files
nanoclaw/src/db/session-db.ts
T
exe.dev user 209061f54f fix(sweep): wake before reset + idempotent retry for orphan claims
When a container exits with an unresolved processing_ack claim, the
sweep's crashed-container cleanup would reset the matching inbound
message with tries++ and a future process_after. dueCount then dropped
to 0, so the wake step never fired — and the next sweep tick found the
same orphan claim, bumped tries again, and pushed process_after further
out. The message reached MAX_TRIES and was marked failed without any
container ever being spawned.

Two changes:

1. Reorder sweep so the wake step runs before crashed-container
   cleanup. A fresh container clears orphan 'processing' rows on its
   own startup (container/agent-runner/src/db/connection.ts), so once
   we get it running the claim resolves itself.

2. Make resetStuckProcessingRows idempotent: if a message already has
   process_after set to a future time, skip the retry bump. The wake
   path will pick it up when the backoff elapses. Requires returning
   process_after from getMessageForRetry.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 15:12:16 +00:00

288 lines
10 KiB
TypeScript

/**
* SQL operations on per-session inbound/outbound DBs.
*
* These are NOT the central app DB — they're the cross-mount SQLite files
* shared between host and container. Callers own the connection lifecycle
* (open-write-close per op). See session-manager.ts header for invariants.
*/
import Database from 'better-sqlite3';
import { INBOUND_SCHEMA, OUTBOUND_SCHEMA } from './schema.js';
/** Apply the inbound or outbound schema to a DB file. Idempotent. */
export function ensureSchema(dbPath: string, schema: 'inbound' | 'outbound'): void {
const db = new Database(dbPath);
db.pragma('journal_mode = DELETE');
db.exec(schema === 'inbound' ? INBOUND_SCHEMA : OUTBOUND_SCHEMA);
db.close();
}
/** Open the inbound DB for a session (host reads/writes). */
export function openInboundDb(dbPath: string): Database.Database {
const db = new Database(dbPath);
db.pragma('journal_mode = DELETE');
db.pragma('busy_timeout = 5000');
return db;
}
/** Open the outbound DB for a session (host reads only). */
export function openOutboundDb(dbPath: string): Database.Database {
const db = new Database(dbPath, { readonly: true });
db.pragma('busy_timeout = 5000');
return db;
}
export function upsertSessionRouting(
db: Database.Database,
routing: { channel_type: string | null; platform_id: string | null; thread_id: string | null },
): void {
db.prepare(
`INSERT INTO session_routing (id, channel_type, platform_id, thread_id)
VALUES (1, @channel_type, @platform_id, @thread_id)
ON CONFLICT(id) DO UPDATE SET
channel_type = excluded.channel_type,
platform_id = excluded.platform_id,
thread_id = excluded.thread_id`,
).run(routing);
}
export interface DestinationRow {
name: string;
display_name: string | null;
type: 'channel' | 'agent';
channel_type: string | null;
platform_id: string | null;
agent_group_id: string | null;
}
export function replaceDestinations(db: Database.Database, entries: DestinationRow[]): void {
const tx = db.transaction((rows: DestinationRow[]) => {
db.prepare('DELETE FROM destinations').run();
const stmt = db.prepare(
`INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id)
VALUES (@name, @display_name, @type, @channel_type, @platform_id, @agent_group_id)`,
);
for (const row of rows) stmt.run(row);
});
tx(entries);
}
// ---------------------------------------------------------------------------
// messages_in
// ---------------------------------------------------------------------------
/**
* Next even seq number for host-owned inbound.db.
*
* Exported so the scheduling module's task helpers can maintain the
* host-writes-even-seq invariant without duplicating the logic. Not part of
* the general public API — imported by `src/modules/scheduling/db.ts` only.
*/
export function nextEvenSeq(db: Database.Database): number {
const maxSeq = (db.prepare('SELECT COALESCE(MAX(seq), 0) AS m FROM messages_in').get() as { m: number }).m;
return maxSeq < 2 ? 2 : maxSeq + 2 - (maxSeq % 2);
}
export function insertMessage(
db: Database.Database,
message: {
id: string;
kind: string;
timestamp: string;
platformId: string | null;
channelType: string | null;
threadId: string | null;
content: string;
processAfter: string | null;
recurrence: string | null;
/**
* 1 = wake the agent (default); 0 = accumulate as context only.
* Host countDueMessages gates on this; container reads everything.
*/
trigger?: 0 | 1;
},
): void {
db.prepare(
`INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id, trigger)
VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id, @trigger)`,
).run({
...message,
trigger: message.trigger ?? 1,
seq: nextEvenSeq(db),
});
}
export function countDueMessages(db: Database.Database): number {
return (
db
.prepare(
`SELECT COUNT(*) as count FROM messages_in
WHERE status = 'pending'
AND trigger = 1
AND (process_after IS NULL OR datetime(process_after) <= datetime('now'))`,
)
.get() as { count: number }
).count;
}
export function markMessageFailed(db: Database.Database, messageId: string): void {
db.prepare("UPDATE messages_in SET status = 'failed' WHERE id = ?").run(messageId);
}
export function retryWithBackoff(db: Database.Database, messageId: string, backoffSec: number): void {
db.prepare(
`UPDATE messages_in SET tries = tries + 1, process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`,
).run(messageId);
}
export function getMessageForRetry(
db: Database.Database,
messageId: string,
status: string,
): { id: string; tries: number; processAfter: string | null } | undefined {
return db
.prepare('SELECT id, tries, process_after as processAfter FROM messages_in WHERE id = ? AND status = ?')
.get(messageId, status) as { id: string; tries: number; processAfter: string | null } | undefined;
}
export function syncProcessingAcks(inDb: Database.Database, outDb: Database.Database): void {
const completed = outDb
.prepare("SELECT message_id FROM processing_ack WHERE status IN ('completed', 'failed')")
.all() as Array<{ message_id: string }>;
if (completed.length === 0) return;
const updateStmt = inDb.prepare("UPDATE messages_in SET status = 'completed' WHERE id = ? AND status != 'completed'");
inDb.transaction(() => {
for (const { message_id } of completed) {
updateStmt.run(message_id);
}
})();
}
export function getStuckProcessingIds(outDb: Database.Database): string[] {
return (
outDb.prepare("SELECT message_id FROM processing_ack WHERE status = 'processing'").all() as Array<{
message_id: string;
}>
).map((r) => r.message_id);
}
export interface ProcessingClaim {
message_id: string;
status_changed: string;
}
/** Return processing_ack rows still in 'processing' with their claim timestamps. */
export function getProcessingClaims(outDb: Database.Database): ProcessingClaim[] {
return outDb
.prepare("SELECT message_id, status_changed FROM processing_ack WHERE status = 'processing'")
.all() as ProcessingClaim[];
}
export interface ContainerState {
current_tool: string | null;
tool_declared_timeout_ms: number | null;
tool_started_at: string | null;
}
/**
* Read the container's current tool-in-flight state, if any. Returns null
* when either the table doesn't exist yet (older session DB) or no tool is
* active. Host sweep reads this to widen stuck-detection tolerance while
* Bash is running with a long declared timeout.
*/
export function getContainerState(outDb: Database.Database): ContainerState | null {
try {
const row = outDb
.prepare(
`SELECT current_tool, tool_declared_timeout_ms, tool_started_at
FROM container_state WHERE id = 1`,
)
.get() as ContainerState | undefined;
return row ?? null;
} catch {
// Table not present on older session DBs — treat as "no tool in flight".
return null;
}
}
// ---------------------------------------------------------------------------
// messages_out (read-only from host)
// ---------------------------------------------------------------------------
export interface OutboundMessage {
id: string;
kind: string;
platform_id: string | null;
channel_type: string | null;
thread_id: string | null;
content: string;
}
export function getDueOutboundMessages(db: Database.Database): OutboundMessage[] {
return db
.prepare(
`SELECT * FROM messages_out
WHERE (deliver_after IS NULL OR deliver_after <= datetime('now'))
ORDER BY timestamp ASC`,
)
.all() as OutboundMessage[];
}
// ---------------------------------------------------------------------------
// delivered
// ---------------------------------------------------------------------------
export function getDeliveredIds(db: Database.Database): Set<string> {
return new Set(
(db.prepare('SELECT message_out_id FROM delivered').all() as Array<{ message_out_id: string }>).map(
(r) => r.message_out_id,
),
);
}
export function markDelivered(db: Database.Database, messageOutId: string, platformMessageId: string | null): void {
db.prepare(
"INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, ?, 'delivered', datetime('now'))",
).run(messageOutId, platformMessageId ?? null);
}
export function markDeliveryFailed(db: Database.Database, messageOutId: string): void {
db.prepare(
"INSERT OR IGNORE INTO delivered (message_out_id, platform_message_id, status, delivered_at) VALUES (?, NULL, 'failed', datetime('now'))",
).run(messageOutId);
}
/** Ensure the delivered table has columns added after initial schema. */
export function migrateDeliveredTable(db: Database.Database): void {
const cols = new Set(
(db.prepare("PRAGMA table_info('delivered')").all() as Array<{ name: string }>).map((c) => c.name),
);
if (!cols.has('platform_message_id')) {
db.prepare('ALTER TABLE delivered ADD COLUMN platform_message_id TEXT').run();
}
if (!cols.has('status')) {
db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run();
}
}
// Adds columns added to messages_in after the initial v2 schema to
// pre-existing session DBs. No-op on fresh installs where the columns are
// in the baseline schema. Backfills existing rows so invariants hold.
export function migrateMessagesInTable(db: Database.Database): void {
const cols = new Set(
(db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name),
);
if (!cols.has('series_id')) {
db.prepare('ALTER TABLE messages_in ADD COLUMN series_id TEXT').run();
db.prepare('UPDATE messages_in SET series_id = id WHERE series_id IS NULL').run();
db.prepare('CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id)').run();
}
if (!cols.has('trigger')) {
// All pre-existing rows got written with the old "every inbound wakes
// the agent" semantics, so backfill 1 and default 1 for new inserts.
db.prepare('ALTER TABLE messages_in ADD COLUMN trigger INTEGER NOT NULL DEFAULT 1').run();
}
}