Files
nanoclaw/src/db/sessions.ts
T
exe.dev user 97868af5a7 fix(delivery): make pending_questions/approvals insert idempotent
createPendingQuestion and createPendingApproval both run before the
adapter delivery call. When delivery fails and the retry loop reinvokes
deliverMessage with the same questionId/approvalId, the second attempt
hit UNIQUE constraint on the pending_questions.question_id (or
pending_approvals.approval_id) and threw — so the retry never reached
the send step, and every subsequent retry failed the same way until
max-attempts marked the message permanently failed.

Switch both inserts to INSERT OR IGNORE. Return bool indicating whether
a new row was actually inserted so delivery.ts can avoid logging
"Pending question created" twice for the same card.

Symptom that surfaced this: a send-layer ValidationError on one attempt
followed by SqliteError on every subsequent attempt, with the user
seeing neither the card nor a follow-up. Seen in conjunction with the
Telegram 64-byte callback_data limit (fixed separately in
#1942/chat-sdk-bridge), but the idempotency gap applies to any
transient delivery failure — rate limits, network blips, adapter 5xx —
and is worth fixing on its own.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 17:05:41 +00:00

211 lines
8.0 KiB
TypeScript

import type { PendingApproval, PendingQuestion, Session } from '../types.js';
import { getDb } from './connection.js';
// ── Sessions ──
export function createSession(session: Session): void {
getDb()
.prepare(
`INSERT INTO sessions (id, agent_group_id, messaging_group_id, thread_id, agent_provider, status, container_status, last_active, created_at)
VALUES (@id, @agent_group_id, @messaging_group_id, @thread_id, @agent_provider, @status, @container_status, @last_active, @created_at)`,
)
.run(session);
}
export function getSession(id: string): Session | undefined {
return getDb().prepare('SELECT * FROM sessions WHERE id = ?').get(id) as Session | undefined;
}
export function findSession(messagingGroupId: string, threadId: string | null): Session | undefined {
if (threadId) {
return getDb()
.prepare('SELECT * FROM sessions WHERE messaging_group_id = ? AND thread_id = ? AND status = ?')
.get(messagingGroupId, threadId, 'active') as Session | undefined;
}
return getDb()
.prepare('SELECT * FROM sessions WHERE messaging_group_id = ? AND thread_id IS NULL AND status = ?')
.get(messagingGroupId, 'active') as Session | undefined;
}
/**
* Session lookup scoped to a specific agent group. Needed when multiple
* agents are wired to the same messaging group + thread (fan-out) — the
* plain `findSession` would return whichever agent's session happened to
* be first and route to the wrong container.
*/
export function findSessionForAgent(
agentGroupId: string,
messagingGroupId: string,
threadId: string | null,
): Session | undefined {
if (threadId) {
return getDb()
.prepare(
"SELECT * FROM sessions WHERE agent_group_id = ? AND messaging_group_id = ? AND thread_id = ? AND status = 'active'",
)
.get(agentGroupId, messagingGroupId, threadId) as Session | undefined;
}
return getDb()
.prepare(
"SELECT * FROM sessions WHERE agent_group_id = ? AND messaging_group_id = ? AND thread_id IS NULL AND status = 'active'",
)
.get(agentGroupId, messagingGroupId) as Session | undefined;
}
/** Find an active session scoped to an agent group (ignoring messaging group). */
export function findSessionByAgentGroup(agentGroupId: string): Session | undefined {
return getDb()
.prepare("SELECT * FROM sessions WHERE agent_group_id = ? AND status = 'active' ORDER BY created_at DESC LIMIT 1")
.get(agentGroupId) as Session | undefined;
}
export function getSessionsByAgentGroup(agentGroupId: string): Session[] {
return getDb().prepare('SELECT * FROM sessions WHERE agent_group_id = ?').all(agentGroupId) as Session[];
}
export function getActiveSessions(): Session[] {
return getDb().prepare("SELECT * FROM sessions WHERE status = 'active'").all() as Session[];
}
export function getRunningSessions(): Session[] {
return getDb().prepare("SELECT * FROM sessions WHERE container_status IN ('running', 'idle')").all() as Session[];
}
export function updateSession(
id: string,
updates: Partial<Pick<Session, 'status' | 'container_status' | 'last_active' | 'agent_provider'>>,
): void {
const fields: string[] = [];
const values: Record<string, unknown> = { id };
for (const [key, value] of Object.entries(updates)) {
if (value !== undefined) {
fields.push(`${key} = @${key}`);
values[key] = value;
}
}
if (fields.length === 0) return;
getDb()
.prepare(`UPDATE sessions SET ${fields.join(', ')} WHERE id = @id`)
.run(values);
}
export function deleteSession(id: string): void {
getDb().prepare('DELETE FROM sessions WHERE id = ?').run(id);
}
// ── Pending Questions ──
/**
* Insert a pending question row. Idempotent: when delivery fails and retries,
* the second attempt calls this with the same question_id — without `OR
* IGNORE` that would throw UNIQUE and prevent the retry from reaching the
* actual send step. Returns true if a new row was inserted.
*/
export function createPendingQuestion(pq: PendingQuestion): boolean {
const result = getDb()
.prepare(
`INSERT OR IGNORE INTO pending_questions (question_id, session_id, message_out_id, platform_id, channel_type, thread_id, title, options_json, created_at)
VALUES (@question_id, @session_id, @message_out_id, @platform_id, @channel_type, @thread_id, @title, @options_json, @created_at)`,
)
.run({
question_id: pq.question_id,
session_id: pq.session_id,
message_out_id: pq.message_out_id,
platform_id: pq.platform_id,
channel_type: pq.channel_type,
thread_id: pq.thread_id,
title: pq.title,
options_json: JSON.stringify(pq.options),
created_at: pq.created_at,
});
return result.changes > 0;
}
export function getPendingQuestion(questionId: string): PendingQuestion | undefined {
const row = getDb().prepare('SELECT * FROM pending_questions WHERE question_id = ?').get(questionId) as
| (Omit<PendingQuestion, 'options'> & { options_json: string })
| undefined;
if (!row) return undefined;
const { options_json, ...rest } = row;
return { ...rest, options: JSON.parse(options_json) };
}
export function deletePendingQuestion(questionId: string): void {
getDb().prepare('DELETE FROM pending_questions WHERE question_id = ?').run(questionId);
}
// ── Pending Approvals ──
/**
* Insert a pending approval row. Idempotent for the same reason as
* createPendingQuestion: delivery retries with the same approval_id must not
* fail on UNIQUE before the send step gets a chance to succeed.
*/
export function createPendingApproval(
pa: Partial<PendingApproval> &
Pick<
PendingApproval,
'approval_id' | 'request_id' | 'action' | 'payload' | 'created_at' | 'title' | 'options_json'
>,
): boolean {
const result = getDb()
.prepare(
`INSERT OR IGNORE INTO pending_approvals
(approval_id, session_id, request_id, action, payload, created_at,
agent_group_id, channel_type, platform_id, platform_message_id, expires_at, status,
title, options_json)
VALUES
(@approval_id, @session_id, @request_id, @action, @payload, @created_at,
@agent_group_id, @channel_type, @platform_id, @platform_message_id, @expires_at, @status,
@title, @options_json)`,
)
.run({
session_id: null,
agent_group_id: null,
channel_type: null,
platform_id: null,
platform_message_id: null,
expires_at: null,
status: 'pending',
...pa,
});
return result.changes > 0;
}
export function getPendingApproval(approvalId: string): PendingApproval | undefined {
return getDb().prepare('SELECT * FROM pending_approvals WHERE approval_id = ?').get(approvalId) as
| PendingApproval
| undefined;
}
export function updatePendingApprovalStatus(approvalId: string, status: PendingApproval['status']): void {
getDb().prepare('UPDATE pending_approvals SET status = ? WHERE approval_id = ?').run(status, approvalId);
}
export function deletePendingApproval(approvalId: string): void {
getDb().prepare('DELETE FROM pending_approvals WHERE approval_id = ?').run(approvalId);
}
export function getPendingApprovalsByAction(action: string): PendingApproval[] {
return getDb().prepare('SELECT * FROM pending_approvals WHERE action = ?').all(action) as PendingApproval[];
}
/**
* Resolve ask_question render metadata (title + normalized options) for any
* card, regardless of whether it was persisted as a pending_question (generic
* ask_user_question) or a pending_approval (self-mod / OneCLI credential).
*/
export function getAskQuestionRender(
id: string,
): { title: string; options: import('../channels/ask-question.js').NormalizedOption[] } | undefined {
const q = getPendingQuestion(id);
if (q) return { title: q.title, options: q.options };
const a = getDb().prepare('SELECT title, options_json FROM pending_approvals WHERE approval_id = ?').get(id) as
| { title: string; options_json: string }
| undefined;
if (!a || !a.title) return undefined;
return { title: a.title, options: JSON.parse(a.options_json) };
}