From b591d7ce96323599e697a5de73600a1bd5d78311 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Fri, 10 Apr 2026 16:45:53 +0300 Subject: [PATCH] refactor: move destinations from JSON file into inbound.db MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-session destination map was being written as a sidecar JSON file (/workspace/.nanoclaw-destinations.json) — inconsistent with the rest of v2, where all host↔container IO goes through inbound.db / outbound.db. Move it into a `destinations` table in INBOUND_SCHEMA. The host writes it before every container wake AND on demand (e.g. after create_agent) so the creator sees the new child destination mid-session without a restart. The container queries the table live on every lookup — no cache, no staleness window. - src/db/schema.ts: add `destinations` table to INBOUND_SCHEMA. - src/session-manager.ts: writeDestinationsFile → writeDestinations, writes via DELETE + INSERT inside a transaction. - src/delivery.ts: create_agent handler calls writeDestinations on the creator's session after inserting the new destination rows. - container/agent-runner/src/destinations.ts: queries inbound.db directly in every findByName/getAllDestinations/findByRouting call. No more cache. No setDestinationsForTest (obsolete). No fs import. - container/agent-runner/src/index.ts and mcp-tools/index.ts: remove loadDestinations() calls — no longer needed. - Test helper initTestSessionDb creates the destinations table. Integration test inserts a row directly instead of mocking the cache. No backwards compatibility: sessions predating the schema update must be recreated. This is fine on the v2 branch. Co-Authored-By: Claude Opus 4.6 (1M context) --- container/agent-runner/src/db/connection.ts | 8 ++ container/agent-runner/src/destinations.ts | 84 +++++++++++-------- container/agent-runner/src/index.ts | 5 +- .../agent-runner/src/integration.test.ts | 19 ++--- container/agent-runner/src/mcp-tools/index.ts | 5 -- src/container-runner.ts | 6 +- src/db/schema.ts | 15 +++- src/delivery.ts | 5 ++ src/session-manager.ts | 64 ++++++++++---- 9 files changed, 132 insertions(+), 79 deletions(-) diff --git a/container/agent-runner/src/db/connection.ts b/container/agent-runner/src/db/connection.ts index 08775310b..954ebbcf8 100644 --- a/container/agent-runner/src/db/connection.ts +++ b/container/agent-runner/src/db/connection.ts @@ -95,6 +95,14 @@ export function initTestSessionDb(): { inbound: Database.Database; outbound: Dat status TEXT NOT NULL DEFAULT 'delivered', delivered_at TEXT NOT NULL ); + CREATE TABLE destinations ( + name TEXT PRIMARY KEY, + display_name TEXT, + type TEXT NOT NULL, + channel_type TEXT, + platform_id TEXT, + agent_group_id TEXT + ); `); _outbound = new Database(':memory:'); diff --git a/container/agent-runner/src/destinations.ts b/container/agent-runner/src/destinations.ts index 57f151d59..d525cf133 100644 --- a/container/agent-runner/src/destinations.ts +++ b/container/agent-runner/src/destinations.ts @@ -1,11 +1,16 @@ /** - * Destination map loaded at container startup from - * /workspace/.nanoclaw-destinations.json (written by the host on wake). + * Destination map — lives in inbound.db's `destinations` table. * - * The map is BOTH the routing table and the ACL — if a name/target - * isn't in here, the agent can't reach it. + * The host writes this table before every container wake AND on demand + * (e.g. when a new child agent is created mid-session). The container + * queries the table live on every lookup, so admin changes take effect + * immediately — no restart required. + * + * This table is BOTH the routing map and the container-visible ACL. + * The host re-validates on the delivery side against the central DB, + * so even if this table is stale the host's enforcement is authoritative. */ -import fs from 'fs'; +import { getInboundDb } from './db/connection.js'; export interface DestinationEntry { name: string; @@ -16,36 +21,34 @@ export interface DestinationEntry { agentGroupId?: string; } -const DEST_FILE = '/workspace/.nanoclaw-destinations.json'; +interface DestRow { + name: string; + display_name: string | null; + type: 'channel' | 'agent'; + channel_type: string | null; + platform_id: string | null; + agent_group_id: string | null; +} -let cache: DestinationEntry[] = []; - -export function loadDestinations(): void { - try { - if (!fs.existsSync(DEST_FILE)) { - cache = []; - return; - } - const raw = fs.readFileSync(DEST_FILE, 'utf-8'); - const parsed = JSON.parse(raw) as { destinations?: DestinationEntry[] }; - cache = Array.isArray(parsed.destinations) ? parsed.destinations : []; - } catch (err) { - console.error(`[destinations] Failed to load: ${err instanceof Error ? err.message : String(err)}`); - cache = []; - } +function rowToEntry(row: DestRow): DestinationEntry { + return { + name: row.name, + displayName: row.display_name ?? row.name, + type: row.type, + channelType: row.channel_type ?? undefined, + platformId: row.platform_id ?? undefined, + agentGroupId: row.agent_group_id ?? undefined, + }; } export function getAllDestinations(): DestinationEntry[] { - return cache; -} - -/** Test-only: inject destinations without touching the filesystem. */ -export function setDestinationsForTest(destinations: DestinationEntry[]): void { - cache = destinations; + const rows = getInboundDb().prepare('SELECT * FROM destinations ORDER BY name').all() as DestRow[]; + return rows.map(rowToEntry); } export function findByName(name: string): DestinationEntry | undefined { - return cache.find((d) => d.name === name); + const row = getInboundDb().prepare('SELECT * FROM destinations WHERE name = ?').get(name) as DestRow | undefined; + return row ? rowToEntry(row) : undefined; } /** @@ -57,15 +60,23 @@ export function findByRouting( platformId: string | null | undefined, ): DestinationEntry | undefined { if (!channelType || !platformId) return undefined; - if (channelType === 'agent') { - return cache.find((d) => d.type === 'agent' && d.agentGroupId === platformId); - } - return cache.find((d) => d.type === 'channel' && d.channelType === channelType && d.platformId === platformId); + const db = getInboundDb(); + const row = + channelType === 'agent' + ? (db + .prepare("SELECT * FROM destinations WHERE type = 'agent' AND agent_group_id = ?") + .get(platformId) as DestRow | undefined) + : (db + .prepare("SELECT * FROM destinations WHERE type = 'channel' AND channel_type = ? AND platform_id = ?") + .get(channelType, platformId) as DestRow | undefined); + return row ? rowToEntry(row) : undefined; } /** Generate the system-prompt addendum describing destinations and syntax. */ export function buildSystemPromptAddendum(): string { - if (cache.length === 0) { + const all = getAllDestinations(); + + if (all.length === 0) { return [ '## Sending messages', '', @@ -74,9 +85,8 @@ export function buildSystemPromptAddendum(): string { } // Single-destination shortcut: the agent just writes its response normally. - // No wrapping needed. This preserves the simple case (one user, one channel). - if (cache.length === 1) { - const d = cache[0]; + if (all.length === 1) { + const d = all[0]; const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : ''; return [ '## Sending messages', @@ -90,7 +100,7 @@ export function buildSystemPromptAddendum(): string { } const lines = ['## Sending messages', '', 'You can send messages to the following destinations:', '']; - for (const d of cache) { + for (const d of all) { const label = d.displayName && d.displayName !== d.name ? ` (${d.displayName})` : ''; lines.push(`- \`${d.name}\`${label}`); } diff --git a/container/agent-runner/src/index.ts b/container/agent-runner/src/index.ts index 8bada5b05..6692d3311 100644 --- a/container/agent-runner/src/index.ts +++ b/container/agent-runner/src/index.ts @@ -26,7 +26,7 @@ import fs from 'fs'; import path from 'path'; import { fileURLToPath } from 'url'; -import { buildSystemPromptAddendum, loadDestinations } from './destinations.js'; +import { buildSystemPromptAddendum } from './destinations.js'; import { createProvider, type ProviderName } from './providers/factory.js'; import { runPollLoop } from './poll-loop.js'; @@ -45,9 +45,6 @@ async function main(): Promise { const provider = createProvider(providerName, { assistantName }); - // Load destination map (written by host on every wake) - loadDestinations(); - // Load global CLAUDE.md as additional system context, then append destinations addendum let systemPrompt: string | undefined; if (fs.existsSync(GLOBAL_CLAUDE_MD)) { diff --git a/container/agent-runner/src/integration.test.ts b/container/agent-runner/src/integration.test.ts index 90aae2b5e..d30f324d3 100644 --- a/container/agent-runner/src/integration.test.ts +++ b/container/agent-runner/src/integration.test.ts @@ -1,7 +1,6 @@ import { describe, it, expect, beforeEach, afterEach } from 'vitest'; import { initTestSessionDb, closeSessionDb, getInboundDb, getOutboundDb } from './db/connection.js'; -import { setDestinationsForTest } from './destinations.js'; import { getUndeliveredMessages } from './db/messages-out.js'; import { getPendingMessages } from './db/messages-in.js'; import { MockProvider } from './providers/mock.js'; @@ -9,21 +8,17 @@ import { runPollLoop } from './poll-loop.js'; beforeEach(() => { initTestSessionDb(); - // Provide a test destination map so output parsing can resolve "discord-test" → routing - setDestinationsForTest([ - { - name: 'discord-test', - displayName: 'Discord Test', - type: 'channel', - channelType: 'discord', - platformId: 'chan-1', - }, - ]); + // Seed a destination so output parsing can resolve "discord-test" → routing + getInboundDb() + .prepare( + `INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id) + VALUES ('discord-test', 'Discord Test', 'channel', 'discord', 'chan-1', NULL)`, + ) + .run(); }); afterEach(() => { closeSessionDb(); - setDestinationsForTest([]); }); function insertMessage(id: string, content: object, opts?: { platformId?: string; channelType?: string; threadId?: string }) { diff --git a/container/agent-runner/src/mcp-tools/index.ts b/container/agent-runner/src/mcp-tools/index.ts index b0116284d..b1e7bbd82 100644 --- a/container/agent-runner/src/mcp-tools/index.ts +++ b/container/agent-runner/src/mcp-tools/index.ts @@ -9,7 +9,6 @@ import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js'; -import { loadDestinations } from '../destinations.js'; import type { McpToolDefinition } from './types.js'; import { coreTools } from './core.js'; import { schedulingTools } from './scheduling.js'; @@ -21,10 +20,6 @@ function log(msg: string): void { console.error(`[mcp-tools] ${msg}`); } -// Load the destination map — this process is spawned fresh for each container -// wake, so the map file is always fresh (written by the host before spawn). -loadDestinations(); - // Only admin agents get the create_agent tool. Non-admins never see it in the // listTools response; the host also re-checks permission on receive as defense // in depth (see delivery.ts create_agent handler). diff --git a/src/container-runner.ts b/src/container-runner.ts index ac4d2cf14..9881ca202 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -20,7 +20,7 @@ import { markContainerRunning, markContainerStopped, sessionDir, - writeDestinationsFile, + writeDestinations, } from './session-manager.js'; import type { AgentGroup, Session } from './types.js'; @@ -59,8 +59,8 @@ export async function wakeContainer(session: Session): Promise { return; } - // Refresh the destination map file so any admin changes take effect on wake - writeDestinationsFile(agentGroup.id, session.id); + // Refresh the destination map so any admin changes take effect on wake + writeDestinations(agentGroup.id, session.id); const mounts = buildMounts(agentGroup, session); const containerName = `nanoclaw-v2-${agentGroup.folder}-${Date.now()}`; diff --git a/src/db/schema.ts b/src/db/schema.ts index d2ed36ade..08bc95d34 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -76,7 +76,7 @@ CREATE TABLE pending_questions ( * outbound.db — container writes, host reads (read-only open) */ -/** Host-owned: inbound messages + delivery tracking. */ +/** Host-owned: inbound messages + delivery tracking + destination map. */ export const INBOUND_SCHEMA = ` CREATE TABLE messages_in ( id TEXT PRIMARY KEY, @@ -101,6 +101,19 @@ CREATE TABLE delivered ( status TEXT NOT NULL DEFAULT 'delivered', delivered_at TEXT NOT NULL ); + +-- Destination map for this session's agent. +-- Host overwrites on every container wake AND on demand (admin rewires, new child agents, etc.). +-- Container queries this live on every lookup, so admin changes take effect +-- mid-session without requiring a container restart. +CREATE TABLE destinations ( + name TEXT PRIMARY KEY, + display_name TEXT, + type TEXT NOT NULL, -- 'channel' | 'agent' + channel_type TEXT, -- for type='channel' + platform_id TEXT, -- for type='channel' + agent_group_id TEXT -- for type='agent' +); `; /** Container-owned: outbound messages + processing acknowledgments. */ diff --git a/src/delivery.ts b/src/delivery.ts index 2c44941f1..4d60715e8 100644 --- a/src/delivery.ts +++ b/src/delivery.ts @@ -35,6 +35,7 @@ import { sessionDir, inboundDbPath, resolveSession, + writeDestinations, writeSessionMessage, writeSystemResponse, } from './session-manager.js'; @@ -611,6 +612,10 @@ async function handleSystemAction( created_at: now, }); + // Refresh the creator's destination map so the new child appears + // immediately on the next query — no restart needed. + writeDestinations(session.agent_group_id, session.id); + // Fire-and-forget notification back to the creator notifyAgent( session, diff --git a/src/session-manager.ts b/src/session-manager.ts index 1bd61be0d..3267871f2 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -132,43 +132,73 @@ export function initSessionFolder(agentGroupId: string, sessionId: string): void } /** - * Write the destination map file into the session folder. - * Called before every container wake so admin changes take effect on next start. - * The container loads this at startup to know what destinations exist. + * Write the session's destination map into its inbound.db `destinations` table. + * + * Called before every container wake so admin changes take effect on next start — + * but the container also re-queries on demand, so mid-session admin changes + * (e.g. spawning a new child agent) can also call this to push the new map + * without restarting the container. + * + * Uses DELETE + INSERT in a transaction for a clean overwrite. */ -export function writeDestinationsFile(agentGroupId: string, sessionId: string): void { - const dir = sessionDir(agentGroupId, sessionId); - if (!fs.existsSync(dir)) return; +export function writeDestinations(agentGroupId: string, sessionId: string): void { + const dbPath = inboundDbPath(agentGroupId, sessionId); + if (!fs.existsSync(dbPath)) return; const rows = getDestinations(agentGroupId); - const destinations: Array> = []; + type DestRow = { + name: string; + display_name: string | null; + type: 'channel' | 'agent'; + channel_type: string | null; + platform_id: string | null; + agent_group_id: string | null; + }; + const resolved: DestRow[] = []; for (const row of rows) { if (row.target_type === 'channel') { const mg = getMessagingGroup(row.target_id); if (!mg) continue; - destinations.push({ + resolved.push({ name: row.local_name, - displayName: mg.name ?? row.local_name, + display_name: mg.name ?? row.local_name, type: 'channel', - channelType: mg.channel_type, - platformId: mg.platform_id, + channel_type: mg.channel_type, + platform_id: mg.platform_id, + agent_group_id: null, }); } else if (row.target_type === 'agent') { const ag = getAgentGroup(row.target_id); if (!ag) continue; - destinations.push({ + resolved.push({ name: row.local_name, - displayName: ag.name, + display_name: ag.name, type: 'agent', - agentGroupId: ag.id, + channel_type: null, + platform_id: null, + agent_group_id: ag.id, }); } } - const filePath = path.join(dir, '.nanoclaw-destinations.json'); - fs.writeFileSync(filePath, JSON.stringify({ destinations }, null, 2)); - log.debug('Destination map written', { sessionId, count: destinations.length }); + const db = new Database(dbPath); + db.pragma('journal_mode = DELETE'); + db.pragma('busy_timeout = 5000'); + try { + const tx = db.transaction((entries: DestRow[]) => { + db.prepare('DELETE FROM destinations').run(); + const stmt = db.prepare( + `INSERT INTO destinations (name, display_name, type, channel_type, platform_id, agent_group_id) + VALUES (@name, @display_name, @type, @channel_type, @platform_id, @agent_group_id)`, + ); + for (const e of entries) stmt.run(e); + }); + tx(resolved); + } finally { + db.close(); + } + log.debug('Destination map written', { sessionId, count: resolved.length }); } /** Write a message to a session's inbound DB (messages_in). Host-only. */