From 8a06b01646bd12566e320d0c1991ca2ddf9eb510 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Thu, 9 Apr 2026 03:58:35 +0300 Subject: [PATCH] v2: SQLite state adapter, admin commands, compact feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace in-memory Chat SDK state with SqliteStateAdapter — thread subscriptions now persist across restarts - Add migration 002 for chat_sdk_kv, subscriptions, locks, lists tables - Handle /clear in agent-runner (reset sessionId) — SDK has supportsNonInteractive:false for this command - Pass /compact, /context, /cost, /files through to SDK as admin commands - Skip admin commands in follow-up poll so they start fresh queries - Emit compact_boundary events as user-visible feedback messages - Pass NANOCLAW_ADMIN_USER_ID and NANOCLAW_ASSISTANT_NAME to containers Co-Authored-By: Claude Opus 4.6 (1M context) --- container/agent-runner/src/formatter.ts | 2 +- container/agent-runner/src/poll-loop.ts | 44 +++-- .../agent-runner/src/providers/claude.ts | 4 + src/channels/chat-sdk-bridge.ts | 55 +++--- src/container-runner-v2.ts | 12 ++ src/db/db-v2.test.ts | 2 +- src/db/migrations/002-chat-sdk-state.ts | 36 ++++ src/db/migrations/index.ts | 3 +- src/host-sweep.ts | 12 +- src/state-sqlite.ts | 160 ++++++++++++++++++ 10 files changed, 283 insertions(+), 47 deletions(-) create mode 100644 src/db/migrations/002-chat-sdk-state.ts create mode 100644 src/state-sqlite.ts diff --git a/container/agent-runner/src/formatter.ts b/container/agent-runner/src/formatter.ts index 7324f1b6d..8b0b1e82d 100644 --- a/container/agent-runner/src/formatter.ts +++ b/container/agent-runner/src/formatter.ts @@ -9,7 +9,7 @@ import type { MessageInRow } from './db/messages-in.js'; */ export type CommandCategory = 'admin' | 'filtered' | 'passthrough' | 'none'; -const ADMIN_COMMANDS = new Set(['/remote-control', '/clear', '/compact']); +const ADMIN_COMMANDS = new Set(['/remote-control', '/clear', '/compact', '/context', '/cost', '/files']); const FILTERED_COMMANDS = new Set(['/help', '/login', '/logout', '/doctor', '/config']); export interface CommandInfo { diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 474be8b82..21fc8e109 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -81,7 +81,6 @@ export async function runPollLoop(config: PollLoopConfig): Promise { if (cmdInfo.category === 'admin') { if (!adminUserId || cmdInfo.senderId !== adminUserId) { - // Not admin — send error, mark completed log(`Admin command denied: ${cmdInfo.command} from ${cmdInfo.senderId} (msg: ${msg.id})`); writeMessageOut({ id: generateId(), @@ -94,7 +93,24 @@ export async function runPollLoop(config: PollLoopConfig): Promise { commandIds.push(msg.id); continue; } - // Admin user — format as system command + // Handle admin commands directly + if (cmdInfo.command === '/clear') { + log('Clearing session (resetting sessionId)'); + sessionId = undefined; + resumeAt = undefined; + writeMessageOut({ + id: generateId(), + kind: 'chat', + platform_id: routing.platformId, + channel_type: routing.channelType, + thread_id: routing.threadId, + content: JSON.stringify({ text: 'Session cleared.' }), + }); + commandIds.push(msg.id); + continue; + } + + // Other admin commands — pass through to agent normalMessages.push(msg); continue; } @@ -174,25 +190,16 @@ function formatMessagesWithCommands(messages: MessageInRow[]): string { for (const msg of messages) { if (msg.kind === 'chat' || msg.kind === 'chat-sdk') { const cmdInfo = categorizeMessage(msg); - if (cmdInfo.category === 'passthrough') { + if (cmdInfo.category === 'passthrough' || cmdInfo.category === 'admin') { // Flush normal batch first if (normalBatch.length > 0) { parts.push(formatMessages(normalBatch)); normalBatch.length = 0; } - // Pass raw command text (no XML wrapping) + // Pass raw command text (no XML wrapping) — SDK handles it natively parts.push(cmdInfo.text); continue; } - if (cmdInfo.category === 'admin') { - // Format admin command as a system command block - if (normalBatch.length > 0) { - parts.push(formatMessages(normalBatch)); - normalBatch.length = 0; - } - parts.push(`[SYSTEM COMMAND: ${cmdInfo.command}]\n${cmdInfo.text}`); - continue; - } } normalBatch.push(msg); } @@ -218,8 +225,15 @@ async function processQuery(query: AgentQuery, routing: RoutingContext, config: const pollHandle = setInterval(() => { if (done) return; - // Skip system messages — they're responses for MCP tools (e.g., ask_user_question) - const newMessages = getPendingMessages().filter((m) => m.kind !== 'system'); + // Skip system messages (MCP tool responses) and admin commands (need fresh query) + const newMessages = getPendingMessages().filter((m) => { + if (m.kind === 'system') return false; + if (m.kind === 'chat' || m.kind === 'chat-sdk') { + const cmd = categorizeMessage(m); + if (cmd.category === 'admin') return false; + } + return true; + }); if (newMessages.length > 0) { const newIds = newMessages.map((m) => m.id); markProcessing(newIds); diff --git a/container/agent-runner/src/providers/claude.ts b/container/agent-runner/src/providers/claude.ts index e17c5c56d..adfd0e22b 100644 --- a/container/agent-runner/src/providers/claude.ts +++ b/container/agent-runner/src/providers/claude.ts @@ -212,6 +212,10 @@ export class ClaudeProvider implements AgentProvider { yield { type: 'error', message: 'API retry', retryable: true }; } else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'rate_limit_event') { yield { type: 'error', message: 'Rate limit', retryable: false, classification: 'quota' }; + } else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'compact_boundary') { + const meta = (message as { compact_metadata?: { pre_tokens?: number } }).compact_metadata; + const detail = meta?.pre_tokens ? ` (${meta.pre_tokens.toLocaleString()} tokens compacted)` : ''; + yield { type: 'result', text: `Context compacted${detail}.` }; } else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'task_notification') { const tn = message as { summary?: string }; yield { type: 'progress', message: tn.summary || 'Task notification' }; diff --git a/src/channels/chat-sdk-bridge.ts b/src/channels/chat-sdk-bridge.ts index e3f486ba0..5ab9d885d 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -6,10 +6,18 @@ */ import http from 'http'; -import { Chat, Card, CardText, Actions, Button, type Adapter, type ConcurrencyStrategy, type Message as ChatMessage } from 'chat'; -import { createMemoryState } from '@chat-adapter/state-memory'; - +import { + Chat, + Card, + CardText, + Actions, + Button, + type Adapter, + type ConcurrencyStrategy, + type Message as ChatMessage, +} from 'chat'; import { log } from '../log.js'; +import { SqliteStateAdapter } from '../state-sqlite.js'; import type { ChannelAdapter, ChannelSetup, ConversationConfig, InboundMessage } from './adapter.js'; /** Adapter with optional gateway support (e.g., Discord). */ @@ -32,7 +40,7 @@ export interface ChatSdkBridgeConfig { export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter { const { adapter } = config; let chat: Chat; - let state: ReturnType; + let state: SqliteStateAdapter; let setupConfig: ChannelSetup; let conversations: Map; let gatewayAbort: AbortController | null = null; @@ -62,7 +70,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter setupConfig = hostConfig; conversations = buildConversationMap(hostConfig.conversations); - state = createMemoryState(); + state = new SqliteStateAdapter(); chat = new Chat({ adapters: { [adapter.name]: adapter }, @@ -105,14 +113,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter await chat.initialize(); - // Subscribe registered conversations (after initialize connects state) - for (const conv of hostConfig.conversations) { - if (conv.agentGroupId) { - const threadId = adapter.encodeThreadId({ guildId: '', channelId: conv.platformId } as never); - await state.subscribe(threadId); - } - } - // Start Gateway listener for adapters that support it (e.g., Discord) if (adapter.startGatewayListener) { gatewayAbort = new AbortController(); @@ -184,11 +184,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter title: '❓ Question', children: [ CardText(content.question as string), - Actions( - options.map((opt) => - Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }), - ), - ), + Actions(options.map((opt) => Button({ id: `ncq:${questionId}:${opt}`, label: opt, value: opt }))), ], }); await adapter.postMessage(tid, { card, fallbackText: `${content.question}\nOptions: ${options.join(', ')}` }); @@ -229,13 +225,6 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter updateConversations(configs: ConversationConfig[]) { conversations = buildConversationMap(configs); - // Subscribe new conversations - for (const conv of configs) { - if (conv.agentGroupId) { - const threadId = adapter.encodeThreadId({ guildId: '', channelId: conv.platformId } as never); - state.subscribe(threadId).catch(() => {}); - } - } }, }; } @@ -246,7 +235,11 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter * sends ALL raw events (including INTERACTION_CREATE for button clicks) * to the webhookUrl, which we handle here. */ -function startLocalWebhookServer(adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string): Promise { +function startLocalWebhookServer( + adapter: GatewayAdapter, + setupConfig: ChannelSetup, + botToken?: string, +): Promise { return new Promise((resolve) => { const server = http.createServer((req, res) => { const chunks: Buffer[] = []; @@ -275,7 +268,12 @@ function startLocalWebhookServer(adapter: GatewayAdapter, setupConfig: ChannelSe }); } -async function handleForwardedEvent(body: string, adapter: GatewayAdapter, setupConfig: ChannelSetup, botToken?: string): Promise { +async function handleForwardedEvent( + body: string, + adapter: GatewayAdapter, + setupConfig: ChannelSetup, + botToken?: string, +): Promise { let event: { type: string; data: Record }; try { event = JSON.parse(body); @@ -305,7 +303,8 @@ async function handleForwardedEvent(body: string, adapter: GatewayAdapter, setup } // Update the card to show the selected answer and remove buttons - const originalEmbeds = ((interaction.message as Record)?.embeds as Array>) || []; + const originalEmbeds = + ((interaction.message as Record)?.embeds as Array>) || []; const originalDescription = (originalEmbeds[0]?.description as string) || ''; try { await fetch(`https://discord.com/api/v10/interactions/${interactionId}/${interactionToken}/callback`, { diff --git a/src/container-runner-v2.ts b/src/container-runner-v2.ts index c1b0aace2..81bbd5078 100644 --- a/src/container-runner-v2.ts +++ b/src/container-runner-v2.ts @@ -12,6 +12,7 @@ import { OneCLI } from '@onecli-sh/sdk'; import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZONE } from './config.js'; import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js'; import { getAgentGroup } from './db/agent-groups.js'; +import { getMessagingGroup } from './db/messaging-groups.js'; import { log } from './log.js'; import { validateAdditionalMounts } from './mount-security.js'; import { @@ -227,6 +228,17 @@ async function buildContainerArgs( args.push('-e', `AGENT_PROVIDER=${session.agent_provider || agentGroup.agent_provider || 'claude'}`); args.push('-e', `SESSION_DB_PATH=/workspace/session.db`); + // Pass admin user ID and assistant name from messaging group/agent group + if (session.messaging_group_id) { + const mg = getMessagingGroup(session.messaging_group_id); + if (mg?.admin_user_id) { + args.push('-e', `NANOCLAW_ADMIN_USER_ID=${mg.admin_user_id}`); + } + } + if (agentGroup.name) { + args.push('-e', `NANOCLAW_ASSISTANT_NAME=${agentGroup.name}`); + } + // OneCLI gateway const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier }); if (onecliApplied) { diff --git a/src/db/db-v2.test.ts b/src/db/db-v2.test.ts index daa957609..bea933440 100644 --- a/src/db/db-v2.test.ts +++ b/src/db/db-v2.test.ts @@ -62,7 +62,7 @@ describe('migrations', () => { const db = initTestDb(); runMigrations(db); const row = db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number }; - expect(row.v).toBe(1); + expect(row.v).toBe(2); }); }); diff --git a/src/db/migrations/002-chat-sdk-state.ts b/src/db/migrations/002-chat-sdk-state.ts new file mode 100644 index 000000000..0861af440 --- /dev/null +++ b/src/db/migrations/002-chat-sdk-state.ts @@ -0,0 +1,36 @@ +import type Database from 'better-sqlite3'; + +import type { Migration } from './index.js'; + +export const migration002: Migration = { + version: 2, + name: 'chat-sdk-state', + up(db: Database.Database) { + db.exec(` + CREATE TABLE chat_sdk_kv ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + expires_at INTEGER + ); + + CREATE TABLE chat_sdk_subscriptions ( + thread_id TEXT PRIMARY KEY, + subscribed_at TEXT NOT NULL DEFAULT (datetime('now')) + ); + + CREATE TABLE chat_sdk_locks ( + thread_id TEXT PRIMARY KEY, + token TEXT NOT NULL, + expires_at INTEGER NOT NULL + ); + + CREATE TABLE chat_sdk_lists ( + key TEXT NOT NULL, + idx INTEGER NOT NULL, + value TEXT NOT NULL, + expires_at INTEGER, + PRIMARY KEY (key, idx) + ); + `); + }, +}; diff --git a/src/db/migrations/index.ts b/src/db/migrations/index.ts index 54e848c17..114a5211b 100644 --- a/src/db/migrations/index.ts +++ b/src/db/migrations/index.ts @@ -2,6 +2,7 @@ import type Database from 'better-sqlite3'; import { log } from '../../log.js'; import { migration001 } from './001-initial.js'; +import { migration002 } from './002-chat-sdk-state.js'; export interface Migration { version: number; @@ -9,7 +10,7 @@ export interface Migration { up: (db: Database.Database) => void; } -const migrations: Migration[] = [migration001]; +const migrations: Migration[] = [migration001, migration002]; export function runMigrations(db: Database.Database): void { db.exec(` diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 0c8ca41d4..d93d821d9 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -142,7 +142,17 @@ async function sweepSession(session: Session): Promise { db.prepare( `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, - ).run(newId, nextSeq, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content); + ).run( + newId, + nextSeq, + msg.kind, + nextRun, + msg.recurrence, + msg.platform_id, + msg.channel_type, + msg.thread_id, + msg.content, + ); // Remove recurrence from the completed message so it doesn't spawn again db.prepare('UPDATE messages_in SET recurrence = NULL WHERE id = ?').run(msg.id); diff --git a/src/state-sqlite.ts b/src/state-sqlite.ts new file mode 100644 index 000000000..64731a2bd --- /dev/null +++ b/src/state-sqlite.ts @@ -0,0 +1,160 @@ +/** + * Chat SDK StateAdapter backed by SQLite. + * Persists subscriptions, locks, KV, and lists across restarts. + * + * Ported from feat/chat-sdk-integration branch. + */ +import crypto from 'crypto'; + +import type Database from 'better-sqlite3'; +import type { StateAdapter, QueueEntry } from 'chat'; + +import { getDb } from './db/connection.js'; + +interface Lock { + threadId: string; + token: string; + expiresAt: number; +} + +export class SqliteStateAdapter implements StateAdapter { + private db!: Database.Database; + + async connect(): Promise { + this.db = getDb(); + this.cleanup(); + } + + async disconnect(): Promise {} + + // --- Key-value --- + + async get(key: string): Promise { + this.cleanup(); + const row = this.db + .prepare('SELECT value, expires_at FROM chat_sdk_kv WHERE key = ?') + .get(key) as { value: string; expires_at: number | null } | undefined; + if (!row) return null; + if (row.expires_at && row.expires_at < Date.now()) { + this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key); + return null; + } + return JSON.parse(row.value) as T; + } + + async set(key: string, value: T, ttlMs?: number): Promise { + const expiresAt = ttlMs ? Date.now() + ttlMs : null; + this.db.prepare('INSERT OR REPLACE INTO chat_sdk_kv (key, value, expires_at) VALUES (?, ?, ?)').run(key, JSON.stringify(value), expiresAt); + } + + async setIfNotExists(key: string, value: unknown, ttlMs?: number): Promise { + const existing = this.db.prepare('SELECT expires_at FROM chat_sdk_kv WHERE key = ?').get(key) as { expires_at: number | null } | undefined; + if (existing?.expires_at && existing.expires_at < Date.now()) { + this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key); + } + const expiresAt = ttlMs ? Date.now() + ttlMs : null; + const result = this.db.prepare('INSERT OR IGNORE INTO chat_sdk_kv (key, value, expires_at) VALUES (?, ?, ?)').run(key, JSON.stringify(value), expiresAt); + return result.changes > 0; + } + + async delete(key: string): Promise { + this.db.prepare('DELETE FROM chat_sdk_kv WHERE key = ?').run(key); + } + + // --- Subscriptions --- + + async subscribe(threadId: string): Promise { + this.db.prepare('INSERT OR REPLACE INTO chat_sdk_subscriptions (thread_id) VALUES (?)').run(threadId); + } + + async unsubscribe(threadId: string): Promise { + this.db.prepare('DELETE FROM chat_sdk_subscriptions WHERE thread_id = ?').run(threadId); + } + + async isSubscribed(threadId: string): Promise { + const row = this.db.prepare('SELECT 1 FROM chat_sdk_subscriptions WHERE thread_id = ? LIMIT 1').get(threadId); + return !!row; + } + + // --- Locks --- + + async acquireLock(threadId: string, ttlMs: number): Promise { + const now = Date.now(); + const token = crypto.randomUUID(); + const expiresAt = now + ttlMs; + this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ? AND expires_at < ?').run(threadId, now); + const result = this.db.prepare('INSERT OR IGNORE INTO chat_sdk_locks (thread_id, token, expires_at) VALUES (?, ?, ?)').run(threadId, token, expiresAt); + if (result.changes === 0) return null; + return { threadId, token, expiresAt }; + } + + async releaseLock(lock: Lock): Promise { + this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ? AND token = ?').run(lock.threadId, lock.token); + } + + async extendLock(lock: Lock, ttlMs: number): Promise { + const newExpiry = Date.now() + ttlMs; + const result = this.db.prepare('UPDATE chat_sdk_locks SET expires_at = ? WHERE thread_id = ? AND token = ?').run(newExpiry, lock.threadId, lock.token); + if (result.changes > 0) { + lock.expiresAt = newExpiry; + return true; + } + return false; + } + + async forceReleaseLock(threadId: string): Promise { + this.db.prepare('DELETE FROM chat_sdk_locks WHERE thread_id = ?').run(threadId); + } + + // --- Lists --- + + async appendToList(key: string, value: unknown, options?: { maxLength?: number; ttlMs?: number }): Promise { + const expiresAt = options?.ttlMs ? Date.now() + options.ttlMs : null; + const maxRow = this.db.prepare('SELECT MAX(idx) as maxIdx FROM chat_sdk_lists WHERE key = ?').get(key) as { maxIdx: number | null } | undefined; + const nextIdx = (maxRow?.maxIdx ?? -1) + 1; + this.db.prepare('INSERT INTO chat_sdk_lists (key, idx, value, expires_at) VALUES (?, ?, ?, ?)').run(key, nextIdx, JSON.stringify(value), expiresAt); + if (options?.maxLength) { + const cutoff = nextIdx - options.maxLength; + if (cutoff >= 0) { + this.db.prepare('DELETE FROM chat_sdk_lists WHERE key = ? AND idx <= ?').run(key, cutoff); + } + } + } + + async getList(key: string): Promise { + const now = Date.now(); + const rows = this.db.prepare('SELECT value FROM chat_sdk_lists WHERE key = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY idx ASC').all(key, now) as { value: string }[]; + return rows.map((r) => JSON.parse(r.value) as T); + } + + // --- Queue --- + + async enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise { + const key = `queue:${threadId}`; + await this.appendToList(key, entry, { maxLength: maxSize }); + return await this.queueDepth(threadId); + } + + async dequeue(threadId: string): Promise { + const key = `queue:${threadId}`; + const row = this.db.prepare('SELECT idx, value FROM chat_sdk_lists WHERE key = ? ORDER BY idx ASC LIMIT 1').get(key) as { idx: number; value: string } | undefined; + if (!row) return null; + this.db.prepare('DELETE FROM chat_sdk_lists WHERE key = ? AND idx = ?').run(key, row.idx); + return JSON.parse(row.value) as QueueEntry; + } + + async queueDepth(threadId: string): Promise { + const key = `queue:${threadId}`; + const row = this.db.prepare('SELECT COUNT(*) as count FROM chat_sdk_lists WHERE key = ?').get(key) as { count: number }; + return row.count; + } + + // --- Cleanup --- + + private cleanup(): void { + const now = Date.now(); + this.db.prepare('DELETE FROM chat_sdk_kv WHERE expires_at IS NOT NULL AND expires_at < ?').run(now); + this.db.prepare('DELETE FROM chat_sdk_locks WHERE expires_at < ?').run(now); + this.db.prepare('DELETE FROM chat_sdk_lists WHERE expires_at IS NOT NULL AND expires_at < ?').run(now); + } +}