/**
 * Chat SDK StateAdapter backed by SQLite.
 * Persists subscriptions, locks, KV, and lists across restarts.
 *
 * Ported from feat/chat-sdk-integration branch.
 */
import crypto from 'crypto';

import type Database from 'better-sqlite3';
import type { StateAdapter, QueueEntry } from 'chat';

import { getDb } from './db/connection.js';

/** Handle returned to a lock holder; `token` proves ownership on release/extend. */
interface Lock {
  threadId: string;
  token: string;
  /** Expiry as epoch milliseconds (same clock as `Date.now()`). */
  expiresAt: number;
}

/**
 * StateAdapter implementation that stores everything in four SQLite tables:
 * `chat_sdk_kv`, `chat_sdk_subscriptions`, `chat_sdk_locks` and `chat_sdk_lists`.
 *
 * All expiry timestamps are epoch milliseconds taken from `Date.now()`.
 * The table schemas are not defined in this file; the `INSERT OR IGNORE` /
 * `INSERT OR REPLACE` statements below presumably rely on uniqueness
 * constraints on `key` / `thread_id` — verify against the migration.
 */
export class SqliteStateAdapter implements StateAdapter {
  // Assigned in connect(); every other method assumes connect() has run.
  private db!: Database.Database;

  /** Obtains the shared database handle and purges already-expired rows. */
  async connect(): Promise<void> {
    this.db = getDb();
    this.cleanup();
  }

  /** No-op: this adapter does not close the handle it got from `getDb()`. */
  async disconnect(): Promise<void> {}

  // --- Key-value ---

  /**
   * Reads and JSON-decodes the value stored under `key`.
   *
   * @returns The decoded value, or `null` when the key is absent or expired.
   * @throws SyntaxError if the stored text is not valid JSON.
   */
  async get<T = unknown>(key: string): Promise<T | null> {
    // NOTE(review): this sweeps all three tables on every read.
    this.cleanup();

    const row = this.db.prepare('SELECT value, expires_at FROM chat_sdk_kv WHERE key = ?').get(key) as
      | { value: string; expires_at: number | null }
      | undefined;
    if (!row) return null;

    // Belt and braces: the row may have expired between cleanup() and now.
    if (row.expires_at && row.expires_at < Date.now()) {
      this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key);
      return null;
    }

    // The stored JSON is trusted to have the shape the caller asked for.
    return JSON.parse(row.value) as T;
  }

  /**
   * Stores `value` (JSON-encoded) under `key`, replacing any existing row.
   *
   * @param ttlMs Lifetime in milliseconds. Omitted or `0` means no expiry.
   */
  async set<T = unknown>(key: string, value: T, ttlMs?: number): Promise<void> {
    const expiresAt = ttlMs ? Date.now() + ttlMs : null;
    this.db
      .prepare('INSERT OR REPLACE INTO chat_sdk_kv (key, value, expires_at) VALUES (?, ?, ?)')
      .run(key, JSON.stringify(value), expiresAt);
  }

  /**
   * Stores `value` under `key` only if no live row exists for that key.
   *
   * @param ttlMs Lifetime in milliseconds. Omitted or `0` means no expiry.
   * @returns `true` when the row was inserted, `false` when the key was taken.
   */
  async setIfNotExists(key: string, value: unknown, ttlMs?: number): Promise<boolean> {
    const existing = this.db.prepare('SELECT expires_at FROM chat_sdk_kv WHERE key = ?').get(key) as
      | { expires_at: number | null }
      | undefined;

    // An expired row must not block the insert, so drop it first.
    if (existing?.expires_at && existing.expires_at < Date.now()) {
      this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key);
    }

    const expiresAt = ttlMs ? Date.now() + ttlMs : null;
    const result = this.db
      .prepare('INSERT OR IGNORE INTO chat_sdk_kv (key, value, expires_at) VALUES (?, ?, ?)')
      .run(key, JSON.stringify(value), expiresAt);
    return result.changes > 0;
  }

  /** Removes the row for `key`; does nothing if it is absent. */
  async delete(key: string): Promise<void> {
    this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key);
  }

  // --- Subscriptions ---

  /** Records a subscription for `threadId`. */
  async subscribe(threadId: string): Promise<void> {
    this.db.prepare('INSERT OR REPLACE INTO chat_sdk_subscriptions (thread_id) VALUES (?)').run(threadId);
  }

  /** Removes the subscription for `threadId`, if any. */
  async unsubscribe(threadId: string): Promise<void> {
    this.db.prepare('DELETE FROM chat_sdk_subscriptions WHERE thread_id = ?').run(threadId);
  }

  /** @returns `true` when a subscription row exists for `threadId`. */
  async isSubscribed(threadId: string): Promise<boolean> {
    const row = this.db.prepare('SELECT 1 FROM chat_sdk_subscriptions WHERE thread_id = ? LIMIT 1').get(threadId);
    return !!row;
  }

  // --- Locks ---

  /**
   * Tries to take the lock for `threadId`.
   *
   * @param ttlMs Lifetime of the lock in milliseconds.
   * @returns The lock handle, or `null` when a live lock is already held.
   */
  async acquireLock(threadId: string, ttlMs: number): Promise<Lock | null> {
    const now = Date.now();
    const token = crypto.randomUUID();
    const expiresAt = now + ttlMs;

    // Evict an expired holder so the INSERT OR IGNORE below can succeed.
    this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ? AND expires_at < ?').run(threadId, now);

    const result = this.db
      .prepare('INSERT OR IGNORE INTO chat_sdk_locks (thread_id, token, expires_at) VALUES (?, ?, ?)')
      .run(threadId, token, expiresAt);
    if (result.changes === 0) return null;

    return { threadId, token, expiresAt };
  }

  /** Releases `lock`; a no-op if the stored token no longer matches. */
  async releaseLock(lock: Lock): Promise<void> {
    this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ? AND token = ?').run(lock.threadId, lock.token);
  }

  /**
   * Pushes the expiry of a lock that is still held and still live.
   *
   * On success `lock.expiresAt` is updated in place.
   *
   * @param ttlMs New lifetime in milliseconds, counted from now.
   * @returns `true` when extended; `false` when the token does not match or
   *   the lock has already expired.
   */
  async extendLock(lock: Lock, ttlMs: number): Promise<boolean> {
    const now = Date.now();
    const newExpiry = now + ttlMs;

    // `expires_at >= now` mirrors acquireLock(), which treats `expires_at < now`
    // as expired; without it a lapsed lock could be revived before it is purged.
    const result = this.db
      .prepare('UPDATE chat_sdk_locks SET expires_at = ? WHERE thread_id = ? AND token = ? AND expires_at >= ?')
      .run(newExpiry, lock.threadId, lock.token, now);

    if (result.changes > 0) {
      lock.expiresAt = newExpiry;
      return true;
    }
    return false;
  }

  /** Deletes the lock for `threadId` regardless of who holds it. */
  async forceReleaseLock(threadId: string): Promise<void> {
    this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ?').run(threadId);
  }

  // --- Lists ---

  /**
   * Appends `value` (JSON-encoded) to the list stored under `key`.
   *
   * @param options.maxLength When set (and non-zero), rows whose index is more
   *   than `maxLength - 1` behind the new row are deleted.
   * @param options.ttlMs Lifetime of this entry in milliseconds. The expiry is
   *   stored on the new row only; existing rows keep their own.
   */
  async appendToList(key: string, value: unknown, options?: { maxLength?: number; ttlMs?: number }): Promise<void> {
    const expiresAt = options?.ttlMs ? Date.now() + options.ttlMs : null;

    // MAX(idx) is NULL for an empty list, so the first entry gets index 0.
    const maxRow = this.db.prepare('SELECT MAX(idx) as maxIdx FROM chat_sdk_lists WHERE key = ?').get(key) as
      | { maxIdx: number | null }
      | undefined;
    const nextIdx = (maxRow?.maxIdx ?? -1) + 1;

    this.db
      .prepare('INSERT INTO chat_sdk_lists (key, idx, value, expires_at) VALUES (?, ?, ?, ?)')
      .run(key, nextIdx, JSON.stringify(value), expiresAt);

    if (options?.maxLength) {
      // Keep indices (cutoff, nextIdx], i.e. at most maxLength rows.
      const cutoff = nextIdx - options.maxLength;
      if (cutoff >= 0) {
        this.db.prepare('DELETE FROM chat_sdk_lists WHERE key = ? AND idx <= ?').run(key, cutoff);
      }
    }
  }

  /**
   * Returns the unexpired entries of the list under `key`, oldest first.
   *
   * @throws SyntaxError if a stored entry is not valid JSON.
   */
  async getList<T = unknown>(key: string): Promise<T[]> {
    const now = Date.now();
    const rows = this.db
      .prepare(
        'SELECT value FROM chat_sdk_lists WHERE key = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY idx ASC',
      )
      .all(key, now) as { value: string }[];
    return rows.map((r) => JSON.parse(r.value) as T);
  }

  // --- Queue ---

  /**
   * Appends `entry` to the queue of `threadId`, stored as the list
   * `queue:<threadId>` and trimmed to `maxSize`.
   *
   * @returns The queue depth after the append.
   */
  async enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise<number> {
    const key = `queue:${threadId}`;
    await this.appendToList(key, entry, { maxLength: maxSize });
    return await this.queueDepth(threadId);
  }

  /**
   * Removes and returns the entry with the lowest index in the queue.
   *
   * @returns The entry, or `null` when the queue is empty.
   */
  async dequeue(threadId: string): Promise<QueueEntry | null> {
    const key = `queue:${threadId}`;
    const row = this.db
      .prepare('SELECT idx, value FROM chat_sdk_lists WHERE key = ? ORDER BY idx ASC LIMIT 1')
      .get(key) as { idx: number; value: string } | undefined;
    if (!row) return null;

    this.db.prepare('DELETE FROM chat_sdk_lists WHERE key = ? AND idx = ?').run(key, row.idx);
    return JSON.parse(row.value) as QueueEntry;
  }

  /** @returns Number of rows currently stored for the queue of `threadId`. */
  async queueDepth(threadId: string): Promise<number> {
    const key = `queue:${threadId}`;
    const row = this.db.prepare('SELECT COUNT(*) as count FROM chat_sdk_lists WHERE key = ?').get(key) as {
      count: number;
    };
    return row.count;
  }

  // --- Cleanup ---

  /** Deletes every expired row from the kv, locks and lists tables. */
  private cleanup(): void {
    const now = Date.now();
    this.db.prepare('DELETE FROM chat_sdk_kv WHERE expires_at IS NOT NULL AND expires_at < ?').run(now);
    this.db.prepare('DELETE FROM chat_sdk_locks WHERE expires_at < ?').run(now);
    this.db.prepare('DELETE FROM chat_sdk_lists WHERE expires_at IS NOT NULL AND expires_at < ?').run(now);
  }
}