diff --git a/scripts/test-v2-agent.ts b/scripts/test-v2-agent.ts new file mode 100644 index 000000000..0e8c020ab --- /dev/null +++ b/scripts/test-v2-agent.ts @@ -0,0 +1,106 @@ +/** + * Quick integration test: create a session DB, insert a message, + * run the v2 poll loop with the Claude provider, verify output. + * + * Usage: npx tsx scripts/test-v2-agent.ts + */ +import Database from 'better-sqlite3'; +import fs from 'fs'; + +const TEST_DIR = '/tmp/nanoclaw-v2-test'; +const DB_PATH = `${TEST_DIR}/session.db`; + +// Clean up +if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); +fs.mkdirSync(TEST_DIR, { recursive: true }); + +// Create session DB +const db = new Database(DB_PATH); +db.pragma('journal_mode = WAL'); +db.exec(` + CREATE TABLE messages_in ( + id TEXT PRIMARY KEY, kind TEXT NOT NULL, timestamp TEXT NOT NULL, + status TEXT DEFAULT 'pending', status_changed TEXT, process_after TEXT, + recurrence TEXT, tries INTEGER DEFAULT 0, platform_id TEXT, + channel_type TEXT, thread_id TEXT, content TEXT NOT NULL + ); + CREATE TABLE messages_out ( + id TEXT PRIMARY KEY, in_reply_to TEXT, timestamp TEXT NOT NULL, + delivered INTEGER DEFAULT 0, deliver_after TEXT, recurrence TEXT, + kind TEXT NOT NULL, platform_id TEXT, channel_type TEXT, + thread_id TEXT, content TEXT NOT NULL + ); +`); + +// Insert test message +db.prepare(`INSERT INTO messages_in (id, kind, timestamp, status, content) VALUES (?, 'chat', datetime('now'), 'pending', ?)`).run( + 'test-1', + JSON.stringify({ sender: 'Gavriel', text: 'Say "Hello from v2!" and nothing else. Do not use any tools.' }), +); +console.log('✓ Session DB created with test message'); +db.close(); + +// Set env and run the poll loop +process.env.SESSION_DB_PATH = DB_PATH; +process.env.AGENT_PROVIDER = 'claude'; + +const { getSessionDb, closeSessionDb } = await import('../container/agent-runner/src/db/connection.js'); +const { getUndeliveredMessages } = await import('../container/agent-runner/src/db/messages-out.js'); +const { getPendingMessages } = await import('../container/agent-runner/src/db/messages-in.js'); +const { createProvider } = await import('../container/agent-runner/src/providers/factory.js'); +const { runPollLoop } = await import('../container/agent-runner/src/poll-loop.js'); + +const provider = createProvider('claude'); + +console.log('✓ Claude provider created'); +console.log('⏳ Starting poll loop (will timeout after 60s)...'); + +// Run with timeout +const timeout = setTimeout(() => { + console.log('\n✗ Timed out after 60s'); + printResults(); + process.exit(1); +}, 60_000); + +// Poll for results in parallel +const resultChecker = setInterval(() => { + try { + const out = getUndeliveredMessages(); + if (out.length > 0) { + clearTimeout(timeout); + clearInterval(resultChecker); + console.log('\n✓ Got response!'); + printResults(); + process.exit(0); + } + } catch { + // DB might be locked, retry + } +}, 500); + +function printResults() { + const db2 = new Database(DB_PATH, { readonly: true }); + const inRows = db2.prepare('SELECT * FROM messages_in').all() as Array>; + const outRows = db2.prepare('SELECT * FROM messages_out').all() as Array>; + console.log('\n--- messages_in ---'); + for (const r of inRows) { + console.log(` [${r.id}] status=${r.status} kind=${r.kind} content=${r.content}`); + } + console.log('\n--- messages_out ---'); + for (const r of outRows) { + console.log(` [${r.id}] kind=${r.kind} content=${r.content}`); + } + db2.close(); +} + +// Start the poll loop (runs forever, we exit from the checker above) +try { + await runPollLoop({ + provider, + cwd: TEST_DIR, + mcpServers: {}, + env: { ...process.env }, + }); +} catch (err) { + // Expected — we force exit +} diff --git a/src/container-runner-v2.ts b/src/container-runner-v2.ts new file mode 100644 index 000000000..79c49c89a --- /dev/null +++ b/src/container-runner-v2.ts @@ -0,0 +1,240 @@ +/** + * Container Runner v2 + * Spawns agent containers with session folder + agent group folder mounts. + * The container runs the v2 agent-runner which polls the session DB. + */ +import { ChildProcess, spawn } from 'child_process'; +import fs from 'fs'; +import path from 'path'; + +import { OneCLI } from '@onecli-sh/sdk'; + +import { CONTAINER_IMAGE, DATA_DIR, GROUPS_DIR, IDLE_TIMEOUT, ONECLI_URL, TIMEZONE } from './config.js'; +import { CONTAINER_RUNTIME_BIN, hostGatewayArgs, readonlyMountArgs, stopContainer } from './container-runtime.js'; +import { getAgentGroup } from './db/agent-groups.js'; +import { log } from './log.js'; +import { validateAdditionalMounts } from './mount-security.js'; +import { markContainerIdle, markContainerRunning, markContainerStopped, sessionDbPath, sessionDir } from './session-manager.js'; +import type { AgentGroup, Session } from './types-v2.js'; + +const onecli = new OneCLI({ url: ONECLI_URL }); + +interface VolumeMount { + hostPath: string; + containerPath: string; + readonly: boolean; +} + +/** Active containers tracked by session ID. */ +const activeContainers = new Map(); + +export function getActiveContainerCount(): number { + return activeContainers.size; +} + +export function isContainerRunning(sessionId: string): boolean { + return activeContainers.has(sessionId); +} + +/** + * Wake up a container for a session. If already running, no-op. + * The container runs the v2 agent-runner which polls the session DB. + */ +export async function wakeContainer(session: Session): Promise { + if (activeContainers.has(session.id)) { + log.debug('Container already running', { sessionId: session.id }); + return; + } + + const agentGroup = getAgentGroup(session.agent_group_id); + if (!agentGroup) { + log.error('Agent group not found', { agentGroupId: session.agent_group_id }); + return; + } + + const mounts = buildMounts(agentGroup, session); + const containerName = `nanoclaw-v2-${agentGroup.folder}-${Date.now()}`; + const agentIdentifier = agentGroup.is_admin ? undefined : agentGroup.folder.toLowerCase().replace(/_/g, '-'); + const args = await buildContainerArgs(mounts, containerName, session, agentGroup, agentIdentifier); + + log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName }); + + const container = spawn(CONTAINER_RUNTIME_BIN, args, { stdio: ['ignore', 'pipe', 'pipe'] }); + + activeContainers.set(session.id, { process: container, containerName }); + markContainerRunning(session.id); + + // Log stderr + container.stderr?.on('data', (data) => { + for (const line of data.toString().trim().split('\n')) { + if (line) log.debug(line, { container: agentGroup.folder }); + } + }); + + // stdout is unused in v2 (all IO is via session DB) + container.stdout?.on('data', () => {}); + + // Idle timeout: kill container after IDLE_TIMEOUT of no activity + let idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT); + + const resetIdle = () => { + clearTimeout(idleTimer); + idleTimer = setTimeout(() => killContainer(session.id, 'idle timeout'), IDLE_TIMEOUT); + }; + + // Reset idle timer when the host detects new messages_out (called by delivery.ts) + const entry = activeContainers.get(session.id); + if (entry) { + (entry as { resetIdle?: () => void }).resetIdle = resetIdle; + } + + container.on('close', (code) => { + clearTimeout(idleTimer); + activeContainers.delete(session.id); + markContainerStopped(session.id); + log.info('Container exited', { sessionId: session.id, code, containerName }); + }); + + container.on('error', (err) => { + clearTimeout(idleTimer); + activeContainers.delete(session.id); + markContainerStopped(session.id); + log.error('Container spawn error', { sessionId: session.id, err }); + }); +} + +/** Reset the idle timer for a session's container (called when messages_out are delivered). */ +export function resetContainerIdleTimer(sessionId: string): void { + const entry = activeContainers.get(sessionId) as { resetIdle?: () => void } | undefined; + entry?.resetIdle?.(); +} + +/** Kill a container for a session. */ +export function killContainer(sessionId: string, reason: string): void { + const entry = activeContainers.get(sessionId); + if (!entry) return; + + log.info('Killing container', { sessionId, reason, containerName: entry.containerName }); + try { + stopContainer(entry.containerName); + } catch { + entry.process.kill('SIGKILL'); + } +} + +function buildMounts(agentGroup: AgentGroup, session: Session): VolumeMount[] { + const mounts: VolumeMount[] = []; + const projectRoot = process.cwd(); + const sessDir = sessionDir(agentGroup.id, session.id); + const groupDir = path.resolve(GROUPS_DIR, agentGroup.folder); + + // Session folder at /workspace (contains session.db, outbox/, .claude/) + mounts.push({ hostPath: sessDir, containerPath: '/workspace', readonly: false }); + + // Agent group folder at /workspace/agent + fs.mkdirSync(groupDir, { recursive: true }); + mounts.push({ hostPath: groupDir, containerPath: '/workspace/agent', readonly: false }); + + // Global memory directory + const globalDir = path.join(GROUPS_DIR, 'global'); + if (fs.existsSync(globalDir)) { + mounts.push({ hostPath: globalDir, containerPath: '/workspace/global', readonly: !agentGroup.is_admin }); + } + + // Claude sessions directory (per agent group, shared across sessions) + const claudeDir = path.join(DATA_DIR, 'v2-sessions', agentGroup.id, '.claude-shared'); + fs.mkdirSync(claudeDir, { recursive: true }); + const settingsFile = path.join(claudeDir, 'settings.json'); + if (!fs.existsSync(settingsFile)) { + fs.writeFileSync(settingsFile, JSON.stringify({ + env: { + CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS: '1', + CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1', + CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0', + }, + }, null, 2) + '\n'); + } + + // Sync container skills + const skillsSrc = path.join(projectRoot, 'container', 'skills'); + const skillsDst = path.join(claudeDir, 'skills'); + if (fs.existsSync(skillsSrc)) { + for (const skillDir of fs.readdirSync(skillsSrc)) { + const srcDir = path.join(skillsSrc, skillDir); + if (fs.statSync(srcDir).isDirectory()) { + fs.cpSync(srcDir, path.join(skillsDst, skillDir), { recursive: true }); + } + } + } + mounts.push({ hostPath: claudeDir, containerPath: '/home/node/.claude', readonly: false }); + + // Agent-runner source (per agent group, recompiled on container startup) + const agentRunnerSrc = path.join(projectRoot, 'container', 'agent-runner', 'src'); + const groupRunnerDir = path.join(DATA_DIR, 'v2-sessions', agentGroup.id, 'agent-runner-src'); + if (fs.existsSync(agentRunnerSrc)) { + const srcIndex = path.join(agentRunnerSrc, 'index-v2.ts'); + const cachedIndex = path.join(groupRunnerDir, 'index-v2.ts'); + const needsCopy = !fs.existsSync(groupRunnerDir) || !fs.existsSync(cachedIndex) || fs.statSync(srcIndex).mtimeMs > fs.statSync(cachedIndex).mtimeMs; + if (needsCopy) { + fs.cpSync(agentRunnerSrc, groupRunnerDir, { recursive: true }); + } + } + mounts.push({ hostPath: groupRunnerDir, containerPath: '/app/src', readonly: false }); + + // Admin: mount project root read-only + if (agentGroup.is_admin) { + mounts.push({ hostPath: projectRoot, containerPath: '/workspace/project', readonly: true }); + const envFile = path.join(projectRoot, '.env'); + if (fs.existsSync(envFile)) { + mounts.push({ hostPath: '/dev/null', containerPath: '/workspace/project/.env', readonly: true }); + } + } + + // Additional mounts from container config + const containerConfig = agentGroup.container_config ? JSON.parse(agentGroup.container_config) : {}; + if (containerConfig.additionalMounts) { + const validated = validateAdditionalMounts(containerConfig.additionalMounts, agentGroup.name, !!agentGroup.is_admin); + mounts.push(...validated); + } + + return mounts; +} + +async function buildContainerArgs(mounts: VolumeMount[], containerName: string, session: Session, agentGroup: AgentGroup, agentIdentifier?: string): Promise { + const args: string[] = ['run', '--rm', '--name', containerName]; + + // Environment + args.push('-e', `TZ=${TIMEZONE}`); + args.push('-e', `AGENT_PROVIDER=${session.agent_provider || agentGroup.agent_provider || 'claude'}`); + args.push('-e', `SESSION_DB_PATH=/workspace/session.db`); + + // OneCLI gateway + const onecliApplied = await onecli.applyContainerConfig(args, { addHostMapping: false, agent: agentIdentifier }); + if (onecliApplied) { + log.debug('OneCLI gateway applied', { containerName }); + } + + // Host gateway + args.push(...hostGatewayArgs()); + + // User mapping + const hostUid = process.getuid?.(); + const hostGid = process.getgid?.(); + if (hostUid != null && hostUid !== 0 && hostUid !== 1000) { + args.push('--user', `${hostUid}:${hostGid}`); + args.push('-e', 'HOME=/home/node'); + } + + // Volume mounts + for (const mount of mounts) { + if (mount.readonly) { + args.push(...readonlyMountArgs(mount.hostPath, mount.containerPath)); + } else { + args.push('-v', `${mount.hostPath}:${mount.containerPath}`); + } + } + + args.push(CONTAINER_IMAGE); + + return args; +} diff --git a/src/delivery.ts b/src/delivery.ts new file mode 100644 index 000000000..ea52e747f --- /dev/null +++ b/src/delivery.ts @@ -0,0 +1,156 @@ +/** + * Outbound message delivery. + * Polls active session DBs for undelivered messages_out, delivers through channel adapters. + */ +import Database from 'better-sqlite3'; + +import { getRunningSessions, getActiveSessions } from './db/sessions.js'; +import { getAgentGroup } from './db/agent-groups.js'; +import { log } from './log.js'; +import { openSessionDb, sessionDbPath } from './session-manager.js'; +import { resetContainerIdleTimer } from './container-runner-v2.js'; +import type { Session } from './types-v2.js'; + +const ACTIVE_POLL_MS = 1000; +const SWEEP_POLL_MS = 60_000; + +export interface ChannelDeliveryAdapter { + deliver(channelType: string, platformId: string, threadId: string | null, kind: string, content: string): Promise; + setTyping?(channelType: string, platformId: string, threadId: string | null): Promise; +} + +let deliveryAdapter: ChannelDeliveryAdapter | null = null; +let activePolling = false; +let sweepPolling = false; + +export function setDeliveryAdapter(adapter: ChannelDeliveryAdapter): void { + deliveryAdapter = adapter; +} + +/** Start the active container poll loop (~1s). */ +export function startActiveDeliveryPoll(): void { + if (activePolling) return; + activePolling = true; + pollActive(); +} + +/** Start the sweep poll loop (~60s). */ +export function startSweepDeliveryPoll(): void { + if (sweepPolling) return; + sweepPolling = true; + pollSweep(); +} + +async function pollActive(): Promise { + if (!activePolling) return; + + try { + const sessions = getRunningSessions(); + for (const session of sessions) { + await deliverSessionMessages(session); + } + } catch (err) { + log.error('Active delivery poll error', { err }); + } + + setTimeout(pollActive, ACTIVE_POLL_MS); +} + +async function pollSweep(): Promise { + if (!sweepPolling) return; + + try { + const sessions = getActiveSessions(); + for (const session of sessions) { + await deliverSessionMessages(session); + } + } catch (err) { + log.error('Sweep delivery poll error', { err }); + } + + setTimeout(pollSweep, SWEEP_POLL_MS); +} + +async function deliverSessionMessages(session: Session): Promise { + const agentGroup = getAgentGroup(session.agent_group_id); + if (!agentGroup) return; + + let db: Database.Database; + try { + db = openSessionDb(agentGroup.id, session.id); + } catch { + return; // Session DB might not exist yet + } + + try { + const undelivered = db + .prepare( + `SELECT * FROM messages_out + WHERE delivered = 0 + AND (deliver_after IS NULL OR deliver_after <= datetime('now')) + ORDER BY timestamp ASC`, + ) + .all() as Array<{ + id: string; + kind: string; + platform_id: string | null; + channel_type: string | null; + thread_id: string | null; + content: string; + }>; + + if (undelivered.length === 0) return; + + for (const msg of undelivered) { + try { + await deliverMessage(msg, session); + db.prepare('UPDATE messages_out SET delivered = 1 WHERE id = ?').run(msg.id); + resetContainerIdleTimer(session.id); + } catch (err) { + log.error('Failed to deliver message', { messageId: msg.id, sessionId: session.id, err }); + } + } + } finally { + db.close(); + } +} + +async function deliverMessage( + msg: { id: string; kind: string; platform_id: string | null; channel_type: string | null; thread_id: string | null; content: string }, + session: Session, +): Promise { + if (!deliveryAdapter) { + log.warn('No delivery adapter configured, dropping message', { id: msg.id }); + return; + } + + const content = JSON.parse(msg.content); + + // System actions — handle internally + if (msg.kind === 'system') { + log.info('System action from agent', { sessionId: session.id, action: content.action }); + // TODO: handle system actions (register_group, reset_session, etc.) + return; + } + + // Agent-to-agent — route to target session + if (msg.channel_type === 'agent') { + log.info('Agent-to-agent message', { from: session.id, target: msg.platform_id }); + // TODO: route to target agent's session DB + return; + } + + // Channel delivery + if (!msg.channel_type || !msg.platform_id) { + log.warn('Message missing routing fields', { id: msg.id }); + return; + } + + await deliveryAdapter.deliver(msg.channel_type, msg.platform_id, msg.thread_id, msg.kind, msg.content); + log.info('Message delivered', { id: msg.id, channelType: msg.channel_type, platformId: msg.platform_id }); +} + +export function stopDeliveryPolls(): void { + activePolling = false; + sweepPolling = false; +} diff --git a/src/host-sweep.ts b/src/host-sweep.ts new file mode 100644 index 000000000..431f04a7f --- /dev/null +++ b/src/host-sweep.ts @@ -0,0 +1,131 @@ +/** + * Host sweep — periodic maintenance of all session DBs. + * + * - Wake containers for sessions with due messages (process_after) + * - Detect stale processing messages (container crash) → reset with backoff + * - Insert next occurrence for recurring messages + * - Kill idle containers past timeout + */ +import Database from 'better-sqlite3'; +import fs from 'fs'; + +import { getActiveSessions, updateSession } from './db/sessions.js'; +import { getAgentGroup } from './db/agent-groups.js'; +import { log } from './log.js'; +import { openSessionDb, sessionDbPath } from './session-manager.js'; +import { wakeContainer, isContainerRunning } from './container-runner-v2.js'; +import type { Session } from './types-v2.js'; + +const SWEEP_INTERVAL_MS = 60_000; +const STALE_THRESHOLD_MS = 10 * 60 * 1000; // 10 minutes +const MAX_TRIES = 5; +const BACKOFF_BASE_MS = 5000; + +let running = false; + +export function startHostSweep(): void { + if (running) return; + running = true; + sweep(); +} + +export function stopHostSweep(): void { + running = false; +} + +async function sweep(): Promise { + if (!running) return; + + try { + const sessions = getActiveSessions(); + for (const session of sessions) { + await sweepSession(session); + } + } catch (err) { + log.error('Host sweep error', { err }); + } + + setTimeout(sweep, SWEEP_INTERVAL_MS); +} + +async function sweepSession(session: Session): Promise { + const agentGroup = getAgentGroup(session.agent_group_id); + if (!agentGroup) return; + + const dbPath = sessionDbPath(agentGroup.id, session.id); + if (!fs.existsSync(dbPath)) return; + + let db: Database.Database; + try { + db = new Database(dbPath); + db.pragma('journal_mode = WAL'); + } catch { + return; + } + + try { + // 1. Check for due pending messages → wake container + const dueMessages = db + .prepare( + `SELECT COUNT(*) as count FROM messages_in + WHERE status = 'pending' + AND (process_after IS NULL OR process_after <= datetime('now'))`, + ) + .get() as { count: number }; + + if (dueMessages.count > 0 && !isContainerRunning(session.id)) { + log.info('Waking container for due messages', { sessionId: session.id, count: dueMessages.count }); + await wakeContainer(session); + } + + // 2. Detect stale processing messages + const staleMessages = db + .prepare( + `SELECT id, tries FROM messages_in + WHERE status = 'processing' + AND status_changed < datetime('now', '-${Math.floor(STALE_THRESHOLD_MS / 1000)} seconds')`, + ) + .all() as Array<{ id: string; tries: number }>; + + for (const msg of staleMessages) { + if (msg.tries >= MAX_TRIES) { + db.prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(msg.id); + log.warn('Message marked as failed after max retries', { messageId: msg.id, sessionId: session.id }); + } else { + const backoffMs = BACKOFF_BASE_MS * Math.pow(2, msg.tries); + const backoffSec = Math.floor(backoffMs / 1000); + db.prepare(`UPDATE messages_in SET status = 'pending', status_changed = datetime('now'), process_after = datetime('now', '+${backoffSec} seconds') WHERE id = ?`).run(msg.id); + log.info('Reset stale message with backoff', { messageId: msg.id, tries: msg.tries, backoffMs }); + } + } + + // 3. Handle recurrence for completed messages + const completedRecurring = db + .prepare("SELECT * FROM messages_in WHERE status = 'completed' AND recurrence IS NOT NULL") + .all() as Array<{ id: string; kind: string; content: string; recurrence: string; process_after: string | null; platform_id: string | null; channel_type: string | null; thread_id: string | null }>; + + for (const msg of completedRecurring) { + try { + // Dynamic import to avoid loading cron-parser at module level + const { CronExpressionParser } = await import('cron-parser'); + const interval = CronExpressionParser.parse(msg.recurrence); + const nextRun = interval.next().toISOString(); + const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + + db.prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) + VALUES (?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, + ).run(newId, msg.kind, nextRun, msg.recurrence, msg.platform_id, msg.channel_type, msg.thread_id, msg.content); + + // Remove recurrence from the completed message so it doesn't spawn again + db.prepare("UPDATE messages_in SET recurrence = NULL WHERE id = ?").run(msg.id); + + log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun }); + } catch (err) { + log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err }); + } + } + } finally { + db.close(); + } +} diff --git a/src/index-v2.ts b/src/index-v2.ts new file mode 100644 index 000000000..07da575b6 --- /dev/null +++ b/src/index-v2.ts @@ -0,0 +1,49 @@ +/** + * NanoClaw v2 — main entry point. + * + * Thin orchestrator: init DB, run migrations, start delivery polls, start sweep. + * Channel adapters are started separately (Phase 4). + */ +import path from 'path'; + +import { DATA_DIR } from './config.js'; +import { initDb } from './db/connection.js'; +import { runMigrations } from './db/migrations/index.js'; +import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js'; +import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter } from './delivery.js'; +import { startHostSweep } from './host-sweep.js'; +import { log } from './log.js'; + +async function main(): Promise { + log.info('NanoClaw v2 starting'); + + // 1. Init central DB + const dbPath = path.join(DATA_DIR, 'v2.db'); + const db = initDb(dbPath); + runMigrations(db); + log.info('Central DB ready', { path: dbPath }); + + // 2. Container runtime + ensureContainerRuntimeRunning(); + cleanupOrphans(); + + // 3. Channel adapters (Phase 4 — placeholder) + // TODO: init channel adapters, set up delivery adapter + // setDeliveryAdapter({ deliver: async (...) => { ... } }); + + // 4. Start delivery polls + startActiveDeliveryPoll(); + startSweepDeliveryPoll(); + log.info('Delivery polls started'); + + // 5. Start host sweep + startHostSweep(); + log.info('Host sweep started'); + + log.info('NanoClaw v2 running'); +} + +main().catch((err) => { + log.fatal('Startup failed', { err }); + process.exit(1); +}); diff --git a/src/router-v2.ts b/src/router-v2.ts new file mode 100644 index 000000000..ee08d5e03 --- /dev/null +++ b/src/router-v2.ts @@ -0,0 +1,99 @@ +/** + * Inbound message routing for v2. + * + * Channel adapter event → resolve messaging group → resolve agent group + * → resolve/create session → write messages_in → wake container + */ +import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js'; +import { log } from './log.js'; +import { resolveSession, writeSessionMessage } from './session-manager.js'; +import { wakeContainer } from './container-runner-v2.js'; +import { getSession } from './db/sessions.js'; +import type { MessagingGroupAgent } from './types-v2.js'; + +function generateId(): string { + return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +export interface InboundEvent { + channelType: string; + platformId: string; + threadId: string | null; + message: { + id: string; + kind: 'chat' | 'chat-sdk'; + content: string; // JSON blob + timestamp: string; + }; +} + +/** + * Route an inbound message from a channel adapter to the correct session. + * Creates messaging group + session if they don't exist yet. + */ +export async function routeInbound(event: InboundEvent): Promise { + // 1. Resolve messaging group + let mg = getMessagingGroupByPlatform(event.channelType, event.platformId); + + if (!mg) { + // Auto-create messaging group (adapter already decided to forward this) + const mgId = `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + mg = { + id: mgId, + channel_type: event.channelType, + platform_id: event.platformId, + name: null, + is_group: 0, + admin_user_id: null, + created_at: new Date().toISOString(), + }; + createMessagingGroup(mg); + log.info('Auto-created messaging group', { id: mgId, channelType: event.channelType, platformId: event.platformId }); + } + + // 2. Resolve agent group via messaging_group_agents + const agents = getMessagingGroupAgents(mg.id); + if (agents.length === 0) { + log.warn('No agent groups configured for messaging group', { messagingGroupId: mg.id, platformId: event.platformId }); + return; + } + + // Pick the best matching agent (highest priority, trigger matching in future) + const match = pickAgent(agents, event); + if (!match) { + log.debug('No agent matched for message', { messagingGroupId: mg.id }); + return; + } + + // 3. Resolve or create session + const { session, created } = resolveSession(match.agent_group_id, mg.id, event.threadId, match.session_mode); + + // 4. Write message to session DB + writeSessionMessage(session.agent_group_id, session.id, { + id: event.message.id || generateId(), + kind: event.message.kind, + timestamp: event.message.timestamp, + platformId: event.platformId, + channelType: event.channelType, + threadId: event.threadId, + content: event.message.content, + }); + + log.info('Message routed', { sessionId: session.id, agentGroup: match.agent_group_id, kind: event.message.kind, created }); + + // 5. Wake container + const freshSession = getSession(session.id); + if (freshSession) { + await wakeContainer(freshSession); + } +} + +/** + * Pick the matching agent for an inbound event. + * Currently: highest priority agent. Future: trigger rule matching. + */ +function pickAgent(agents: MessagingGroupAgent[], _event: InboundEvent): MessagingGroupAgent | null { + // Agents are already ordered by priority DESC from the DB query + // TODO: apply trigger_rules matching (pattern, mentionOnly, etc.) + return agents[0] ?? null; +} diff --git a/src/session-manager.ts b/src/session-manager.ts new file mode 100644 index 000000000..ae075770b --- /dev/null +++ b/src/session-manager.ts @@ -0,0 +1,145 @@ +/** + * Session lifecycle management. + * Creates session folders + DBs, writes messages, manages container status. + */ +import Database from 'better-sqlite3'; +import fs from 'fs'; +import path from 'path'; + +import { DATA_DIR } from './config.js'; +import { createSession, findSession, getSession, updateSession } from './db/sessions.js'; +import { log } from './log.js'; +import { SESSION_SCHEMA } from './db/schema.js'; +import type { Session } from './types-v2.js'; + +/** Root directory for all session data. */ +export function sessionsBaseDir(): string { + return path.join(DATA_DIR, 'v2-sessions'); +} + +/** Directory for a specific session: sessions/{agent_group_id}/{session_id}/ */ +export function sessionDir(agentGroupId: string, sessionId: string): string { + return path.join(sessionsBaseDir(), agentGroupId, sessionId); +} + +/** Path to a session's SQLite DB. */ +export function sessionDbPath(agentGroupId: string, sessionId: string): string { + return path.join(sessionDir(agentGroupId, sessionId), 'session.db'); +} + +function generateId(): string { + return `sess-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +/** + * Find or create a session for a messaging group + thread. + * Returns the session and whether it was newly created. + */ +export function resolveSession(agentGroupId: string, messagingGroupId: string, threadId: string | null, sessionMode: 'shared' | 'per-thread'): { session: Session; created: boolean } { + // For shared mode, look for any active session with this messaging group (threadId ignored) + // For per-thread mode, look for an active session with this specific thread + const lookupThreadId = sessionMode === 'shared' ? null : threadId; + const existing = findSession(messagingGroupId, lookupThreadId); + + if (existing) { + return { session: existing, created: false }; + } + + // Create new session + const id = generateId(); + const session: Session = { + id, + agent_group_id: agentGroupId, + messaging_group_id: messagingGroupId, + thread_id: lookupThreadId, + agent_provider: null, + status: 'active', + container_status: 'stopped', + last_active: null, + created_at: new Date().toISOString(), + }; + + createSession(session); + initSessionFolder(agentGroupId, id); + log.info('Session created', { id, agentGroupId, messagingGroupId, threadId: lookupThreadId }); + + return { session, created: true }; +} + +/** Create the session folder and initialize the session DB. */ +export function initSessionFolder(agentGroupId: string, sessionId: string): void { + const dir = sessionDir(agentGroupId, sessionId); + fs.mkdirSync(dir, { recursive: true }); + fs.mkdirSync(path.join(dir, 'outbox'), { recursive: true }); + + const dbPath = sessionDbPath(agentGroupId, sessionId); + if (!fs.existsSync(dbPath)) { + const db = new Database(dbPath); + db.pragma('journal_mode = WAL'); + db.exec(SESSION_SCHEMA); + db.close(); + log.debug('Session DB created', { dbPath }); + } +} + +/** Write a message to a session's messages_in table. */ +export function writeSessionMessage(agentGroupId: string, sessionId: string, message: { + id: string; + kind: string; + timestamp: string; + platformId?: string | null; + channelType?: string | null; + threadId?: string | null; + content: string; + processAfter?: string | null; + recurrence?: string | null; +}): void { + const dbPath = sessionDbPath(agentGroupId, sessionId); + const db = new Database(dbPath); + db.pragma('journal_mode = WAL'); + + try { + db.prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence) + VALUES (@id, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`, + ).run({ + id: message.id, + kind: message.kind, + timestamp: message.timestamp, + platformId: message.platformId ?? null, + channelType: message.channelType ?? null, + threadId: message.threadId ?? null, + content: message.content, + processAfter: message.processAfter ?? null, + recurrence: message.recurrence ?? null, + }); + } finally { + db.close(); + } + + // Update last_active + updateSession(sessionId, { last_active: new Date().toISOString() }); +} + +/** Open a session DB for reading (e.g., polling messages_out). */ +export function openSessionDb(agentGroupId: string, sessionId: string): Database.Database { + const dbPath = sessionDbPath(agentGroupId, sessionId); + const db = new Database(dbPath); + db.pragma('journal_mode = WAL'); + return db; +} + +/** Mark a container as running for a session. */ +export function markContainerRunning(sessionId: string): void { + updateSession(sessionId, { container_status: 'running', last_active: new Date().toISOString() }); +} + +/** Mark a container as idle for a session. */ +export function markContainerIdle(sessionId: string): void { + updateSession(sessionId, { container_status: 'idle' }); +} + +/** Mark a container as stopped for a session. */ +export function markContainerStopped(sessionId: string): void { + updateSession(sessionId, { container_status: 'stopped' }); +}