diff --git a/container/agent-runner/src/formatter.ts b/container/agent-runner/src/formatter.ts new file mode 100644 index 000000000..f3bb5a81c --- /dev/null +++ b/container/agent-runner/src/formatter.ts @@ -0,0 +1,126 @@ +import type { MessageInRow } from './db/messages-in.js'; + +/** + * Routing context extracted from messages_in rows. + * Copied to messages_out by default so responses go back to the sender. + */ +export interface RoutingContext { + platformId: string | null; + channelType: string | null; + threadId: string | null; + inReplyTo: string | null; +} + +/** + * Extract routing context from a batch of messages. + * Uses the first message's routing fields. + */ +export function extractRouting(messages: MessageInRow[]): RoutingContext { + const first = messages[0]; + return { + platformId: first?.platform_id ?? null, + channelType: first?.channel_type ?? null, + threadId: first?.thread_id ?? null, + inReplyTo: first?.id ?? null, + }; +} + +/** + * Format a batch of messages_in rows into a prompt string. + * Strips routing fields — the agent never sees platform_id, channel_type, thread_id. + */ +export function formatMessages(messages: MessageInRow[]): string { + if (messages.length === 0) return ''; + + // Group by kind + const chatMessages = messages.filter((m) => m.kind === 'chat' || m.kind === 'chat-sdk'); + const taskMessages = messages.filter((m) => m.kind === 'task'); + const webhookMessages = messages.filter((m) => m.kind === 'webhook'); + const systemMessages = messages.filter((m) => m.kind === 'system'); + + const parts: string[] = []; + + if (chatMessages.length > 0) { + parts.push(formatChatMessages(chatMessages)); + } + if (taskMessages.length > 0) { + parts.push(...taskMessages.map(formatTaskMessage)); + } + if (webhookMessages.length > 0) { + parts.push(...webhookMessages.map(formatWebhookMessage)); + } + if (systemMessages.length > 0) { + parts.push(...systemMessages.map(formatSystemMessage)); + } + + return parts.join('\n\n'); +} + +function formatChatMessages(messages: MessageInRow[]): string { + if (messages.length === 1) { + return formatSingleChat(messages[0]); + } + + const lines = ['']; + for (const msg of messages) { + const content = parseContent(msg.content); + const sender = content.sender || content.author?.fullName || content.author?.userName || 'Unknown'; + const time = formatTime(msg.timestamp); + const text = content.text || ''; + lines.push(`${escapeXml(text)}`); + } + lines.push(''); + return lines.join('\n'); +} + +function formatSingleChat(msg: MessageInRow): string { + const content = parseContent(msg.content); + const sender = content.sender || content.author?.fullName || content.author?.userName || 'Unknown'; + const time = formatTime(msg.timestamp); + const text = content.text || ''; + return `${escapeXml(text)}`; +} + +function formatTaskMessage(msg: MessageInRow): string { + const content = parseContent(msg.content); + const parts = ['[SCHEDULED TASK]']; + if (content.scriptOutput) { + parts.push('', 'Script output:', JSON.stringify(content.scriptOutput, null, 2)); + } + parts.push('', 'Instructions:', content.prompt || ''); + return parts.join('\n'); +} + +function formatWebhookMessage(msg: MessageInRow): string { + const content = parseContent(msg.content); + const source = content.source || 'unknown'; + const event = content.event || 'unknown'; + return `[WEBHOOK: ${source}/${event}]\n\n${JSON.stringify(content.payload || content, null, 2)}`; +} + +function formatSystemMessage(msg: MessageInRow): string { + const content = parseContent(msg.content); + return `[SYSTEM RESPONSE]\n\nAction: ${content.action || 'unknown'}\nStatus: ${content.status || 'unknown'}\nResult: ${JSON.stringify(content.result || null)}`; +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function parseContent(json: string): any { + try { + return JSON.parse(json); + } catch { + return { text: json }; + } +} + +function formatTime(timestamp: string): string { + try { + const d = new Date(timestamp); + return `${d.getHours().toString().padStart(2, '0')}:${d.getMinutes().toString().padStart(2, '0')}`; + } catch { + return timestamp; + } +} + +function escapeXml(str: string): string { + return str.replace(/&/g, '&').replace(//g, '>').replace(/"/g, '"'); +} diff --git a/container/agent-runner/src/index-v2.ts b/container/agent-runner/src/index-v2.ts new file mode 100644 index 000000000..1005e568a --- /dev/null +++ b/container/agent-runner/src/index-v2.ts @@ -0,0 +1,96 @@ +/** + * NanoClaw Agent Runner v2 + * + * Runs inside a container. All IO goes through the session DB. + * No stdin, no stdout markers, no IPC files. + * + * Config: + * - SESSION_DB_PATH: path to session SQLite DB (default: /workspace/session.db) + * - AGENT_PROVIDER: 'claude' | 'mock' (default: claude) + * - NANOCLAW_ASSISTANT_NAME: assistant name for transcript archiving + * - NANOCLAW_ADMIN_USER_ID: admin user ID for permission checks + * + * Mount structure: + * /workspace/ + * session.db ← session SQLite DB + * outbox/ ← outbound files + * agent/ ← agent group folder (CLAUDE.md, skills, working files) + * .claude/ ← Claude SDK session data + */ + +import fs from 'fs'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +import { createProvider, type ProviderName } from './providers/factory.js'; +import { runPollLoop } from './poll-loop.js'; + +function log(msg: string): void { + console.error(`[agent-runner] ${msg}`); +} + +const CWD = '/workspace/agent'; +const GLOBAL_CLAUDE_MD = '/workspace/global/CLAUDE.md'; + +async function main(): Promise { + const providerName = (process.env.AGENT_PROVIDER || 'claude') as ProviderName; + const assistantName = process.env.NANOCLAW_ASSISTANT_NAME; + + log(`Starting v2 agent-runner (provider: ${providerName})`); + + const provider = createProvider(providerName, { assistantName }); + + // Load global CLAUDE.md as additional system context + let systemPrompt: string | undefined; + if (fs.existsSync(GLOBAL_CLAUDE_MD)) { + systemPrompt = fs.readFileSync(GLOBAL_CLAUDE_MD, 'utf-8'); + log('Loaded global CLAUDE.md'); + } + + // Discover additional directories mounted at /workspace/extra/* + const additionalDirectories: string[] = []; + const extraBase = '/workspace/extra'; + if (fs.existsSync(extraBase)) { + for (const entry of fs.readdirSync(extraBase)) { + const fullPath = path.join(extraBase, entry); + if (fs.statSync(fullPath).isDirectory()) { + additionalDirectories.push(fullPath); + } + } + if (additionalDirectories.length > 0) { + log(`Additional directories: ${additionalDirectories.join(', ')}`); + } + } + + // MCP server path + const __dirname = path.dirname(fileURLToPath(import.meta.url)); + const mcpServerPath = path.join(__dirname, 'mcp-tools.js'); + + // SDK env + const env: Record = { + ...process.env, + CLAUDE_CODE_AUTO_COMPACT_WINDOW: '165000', + }; + + await runPollLoop({ + provider, + cwd: CWD, + mcpServers: { + nanoclaw: { + command: 'node', + args: [mcpServerPath], + env: { + SESSION_DB_PATH: process.env.SESSION_DB_PATH || '/workspace/session.db', + }, + }, + }, + systemPrompt, + env, + additionalDirectories: additionalDirectories.length > 0 ? additionalDirectories : undefined, + }); +} + +main().catch((err) => { + log(`Fatal error: ${err instanceof Error ? err.message : String(err)}`); + process.exit(1); +}); diff --git a/container/agent-runner/src/mcp-tools.ts b/container/agent-runner/src/mcp-tools.ts new file mode 100644 index 000000000..e56d6a86e --- /dev/null +++ b/container/agent-runner/src/mcp-tools.ts @@ -0,0 +1,81 @@ +import { Server } from '@modelcontextprotocol/sdk/server/index.js'; +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js'; + +import { writeMessageOut } from './db/messages-out.js'; + +function log(msg: string): void { + console.error(`[mcp-tools] ${msg}`); +} + +function generateId(): string { + return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +/** + * Start the MCP server with NanoClaw tools. + * Reads the session DB path from SESSION_DB_PATH env var. + * Routing context is passed via env vars from the poll loop. + */ +export async function startMcpServer(): Promise { + const server = new Server({ name: 'nanoclaw', version: '2.0.0' }, { capabilities: { tools: {} } }); + + server.setRequestHandler(ListToolsRequestSchema, async () => ({ + tools: [ + { + name: 'send_message', + description: 'Send a chat message to the current conversation or a specified destination.', + inputSchema: { + type: 'object' as const, + properties: { + text: { type: 'string', description: 'Message content' }, + channel: { type: 'string', description: 'Target channel type (default: reply to origin)' }, + platformId: { type: 'string', description: 'Target platform ID' }, + threadId: { type: 'string', description: 'Target thread ID' }, + }, + required: ['text'], + }, + }, + ], + })); + + server.setRequestHandler(CallToolRequestSchema, async (request) => { + const { name, arguments: args } = request.params; + + if (name === 'send_message') { + const text = args?.text as string; + if (!text) { + return { content: [{ type: 'text', text: 'Error: text is required' }] }; + } + + const id = generateId(); + const platformId = (args?.platformId as string) || process.env.NANOCLAW_PLATFORM_ID || null; + const channelType = (args?.channel as string) || process.env.NANOCLAW_CHANNEL_TYPE || null; + const threadId = (args?.threadId as string) || process.env.NANOCLAW_THREAD_ID || null; + + writeMessageOut({ + id, + kind: 'chat', + platform_id: platformId, + channel_type: channelType, + thread_id: threadId, + content: JSON.stringify({ text }), + }); + + log(`send_message: ${id} → ${channelType || 'default'}/${platformId || 'default'}`); + return { content: [{ type: 'text', text: `Message sent (id: ${id})` }] }; + } + + return { content: [{ type: 'text', text: `Unknown tool: ${name}` }] }; + }); + + const transport = new StdioServerTransport(); + await server.connect(transport); + log('MCP server started'); +} + +// Run as standalone process +startMcpServer().catch((err) => { + log(`MCP server error: ${err instanceof Error ? err.message : String(err)}`); + process.exit(1); +}); diff --git a/container/agent-runner/src/poll-loop.test.ts b/container/agent-runner/src/poll-loop.test.ts new file mode 100644 index 000000000..7cc3074cb --- /dev/null +++ b/container/agent-runner/src/poll-loop.test.ts @@ -0,0 +1,210 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; + +import { initTestSessionDb, closeSessionDb, getSessionDb } from './db/connection.js'; +import { getPendingMessages, markCompleted } from './db/messages-in.js'; +import { getUndeliveredMessages } from './db/messages-out.js'; +import { formatMessages, extractRouting } from './formatter.js'; +import { MockProvider } from './providers/mock.js'; + +beforeEach(() => { + initTestSessionDb(); +}); + +afterEach(() => { + closeSessionDb(); +}); + +function insertMessage(id: string, kind: string, content: object, opts?: { processAfter?: string }) { + getSessionDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, process_after, content) + VALUES (?, ?, datetime('now'), 'pending', ?, ?)`, + ) + .run(id, kind, opts?.processAfter ?? null, JSON.stringify(content)); +} + +describe('formatter', () => { + it('should format a single chat message', () => { + insertMessage('m1', 'chat', { sender: 'John', text: 'Hello world' }); + const messages = getPendingMessages(); + const prompt = formatMessages(messages); + expect(prompt).toContain('sender="John"'); + expect(prompt).toContain('Hello world'); + }); + + it('should format multiple chat messages as XML block', () => { + insertMessage('m1', 'chat', { sender: 'John', text: 'Hello' }); + insertMessage('m2', 'chat', { sender: 'Jane', text: 'Hi there' }); + const messages = getPendingMessages(); + const prompt = formatMessages(messages); + expect(prompt).toContain(''); + expect(prompt).toContain(''); + expect(prompt).toContain('sender="John"'); + expect(prompt).toContain('sender="Jane"'); + }); + + it('should format task messages', () => { + insertMessage('m1', 'task', { prompt: 'Review open PRs' }); + const messages = getPendingMessages(); + const prompt = formatMessages(messages); + expect(prompt).toContain('[SCHEDULED TASK]'); + expect(prompt).toContain('Review open PRs'); + }); + + it('should format webhook messages', () => { + insertMessage('m1', 'webhook', { source: 'github', event: 'push', payload: { ref: 'main' } }); + const messages = getPendingMessages(); + const prompt = formatMessages(messages); + expect(prompt).toContain('[WEBHOOK: github/push]'); + }); + + it('should format system messages', () => { + insertMessage('m1', 'system', { action: 'register_group', status: 'success', result: { id: 'ag-1' } }); + const messages = getPendingMessages(); + const prompt = formatMessages(messages); + expect(prompt).toContain('[SYSTEM RESPONSE]'); + expect(prompt).toContain('register_group'); + }); + + it('should handle mixed kinds', () => { + insertMessage('m1', 'chat', { sender: 'John', text: 'Hello' }); + insertMessage('m2', 'system', { action: 'test', status: 'ok', result: null }); + const messages = getPendingMessages(); + const prompt = formatMessages(messages); + expect(prompt).toContain('sender="John"'); + expect(prompt).toContain('[SYSTEM RESPONSE]'); + }); + + it('should escape XML in content', () => { + insertMessage('m1', 'chat', { sender: 'A y && z' }); + const messages = getPendingMessages(); + const prompt = formatMessages(messages); + expect(prompt).toContain('A<B'); + expect(prompt).toContain('x > y && z'); + }); +}); + +describe('routing', () => { + it('should extract routing from messages', () => { + getSessionDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, platform_id, channel_type, thread_id, content) + VALUES ('m1', 'chat', datetime('now'), 'pending', 'chan-123', 'discord', 'thread-456', '{"text":"hi"}')`, + ) + .run(); + + const messages = getPendingMessages(); + const routing = extractRouting(messages); + expect(routing.platformId).toBe('chan-123'); + expect(routing.channelType).toBe('discord'); + expect(routing.threadId).toBe('thread-456'); + expect(routing.inReplyTo).toBe('m1'); + }); +}); + +describe('mock provider', () => { + it('should produce init + result events', async () => { + const provider = new MockProvider((prompt) => `Echo: ${prompt}`); + const query = provider.query({ + prompt: 'Hello', + cwd: '/tmp', + mcpServers: {}, + env: {}, + }); + + const events: Array<{ type: string }> = []; + // End the stream after initial response + setTimeout(() => query.end(), 50); + + for await (const event of query.events) { + events.push(event); + } + + expect(events.length).toBeGreaterThanOrEqual(2); + expect(events[0].type).toBe('init'); + expect(events[1].type).toBe('result'); + expect((events[1] as { text: string }).text).toBe('Echo: Hello'); + }); + + it('should handle push() during active query', async () => { + const provider = new MockProvider((prompt) => `Re: ${prompt}`); + const query = provider.query({ + prompt: 'First', + cwd: '/tmp', + mcpServers: {}, + env: {}, + }); + + const events: Array<{ type: string; text?: string }> = []; + + // Push a follow-up after a short delay, then end + setTimeout(() => query.push('Second'), 30); + setTimeout(() => query.end(), 60); + + for await (const event of query.events) { + events.push(event); + } + + const results = events.filter((e) => e.type === 'result'); + expect(results).toHaveLength(2); + expect(results[0].text).toBe('Re: First'); + expect(results[1].text).toBe('Re: Second'); + }); +}); + +describe('end-to-end with mock provider', () => { + it('should read messages_in, process with mock provider, write messages_out', async () => { + // Insert a chat message + insertMessage('m1', 'chat', { sender: 'User', text: 'What is 2+2?' }); + + // Read and process + const messages = getPendingMessages(); + expect(messages).toHaveLength(1); + + const routing = extractRouting(messages); + const prompt = formatMessages(messages); + + // Create mock provider and run query + const provider = new MockProvider(() => 'The answer is 4'); + const query = provider.query({ + prompt, + cwd: '/tmp', + mcpServers: {}, + env: {}, + }); + + // Process events — simulate what poll-loop does + const { markProcessing } = await import('./db/messages-in.js'); + const { writeMessageOut } = await import('./db/messages-out.js'); + + markProcessing(['m1']); + + setTimeout(() => query.end(), 50); + + for await (const event of query.events) { + if (event.type === 'result' && event.text) { + writeMessageOut({ + id: `out-${Date.now()}`, + in_reply_to: routing.inReplyTo, + kind: 'chat', + platform_id: routing.platformId, + channel_type: routing.channelType, + thread_id: routing.threadId, + content: JSON.stringify({ text: event.text }), + }); + } + } + + markCompleted(['m1']); + + // Verify: message was processed + const processed = getPendingMessages(); + expect(processed).toHaveLength(0); + + // Verify: response was written + const outMessages = getUndeliveredMessages(); + expect(outMessages).toHaveLength(1); + expect(JSON.parse(outMessages[0].content).text).toBe('The answer is 4'); + expect(outMessages[0].in_reply_to).toBe('m1'); + }); +}); diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts new file mode 100644 index 000000000..e2712a581 --- /dev/null +++ b/container/agent-runner/src/poll-loop.ts @@ -0,0 +1,162 @@ +import { getPendingMessages, markProcessing, markCompleted } from './db/messages-in.js'; +import { writeMessageOut } from './db/messages-out.js'; +import { formatMessages, extractRouting, type RoutingContext } from './formatter.js'; +import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent } from './providers/types.js'; + +const POLL_INTERVAL_MS = 1000; +const ACTIVE_POLL_INTERVAL_MS = 500; + +function log(msg: string): void { + console.error(`[poll-loop] ${msg}`); +} + +function generateId(): string { + return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +export interface PollLoopConfig { + provider: AgentProvider; + cwd: string; + mcpServers: Record; + systemPrompt?: string; + env: Record; + additionalDirectories?: string[]; +} + +/** + * Main poll loop. Runs indefinitely until the process is killed. + * + * 1. Poll messages_in for pending rows + * 2. Format into prompt, call provider.query() + * 3. While query active: continue polling, push new messages via provider.push() + * 4. On result: write messages_out + * 5. Mark messages completed + * 6. Loop + */ +export async function runPollLoop(config: PollLoopConfig): Promise { + let sessionId: string | undefined; + let resumeAt: string | undefined; + + while (true) { + const messages = getPendingMessages(); + + if (messages.length === 0) { + await sleep(POLL_INTERVAL_MS); + continue; + } + + const ids = messages.map((m) => m.id); + markProcessing(ids); + + const routing = extractRouting(messages); + const prompt = formatMessages(messages); + + log(`Processing ${messages.length} message(s), kinds: ${[...new Set(messages.map((m) => m.kind))].join(',')}`); + + // Set routing context as env vars for MCP tools + setRoutingEnv(routing, config.env); + + const query = config.provider.query({ + prompt, + sessionId, + resumeAt, + cwd: config.cwd, + mcpServers: config.mcpServers, + systemPrompt: config.systemPrompt, + env: config.env, + additionalDirectories: config.additionalDirectories, + }); + + // Process the query while concurrently polling for new messages + const result = await processQuery(query, routing, config); + + if (result.sessionId) sessionId = result.sessionId; + if (result.resumeAt) resumeAt = result.resumeAt; + + markCompleted(ids); + log(`Completed ${ids.length} message(s)`); + } +} + +interface QueryResult { + sessionId?: string; + resumeAt?: string; +} + +async function processQuery(query: AgentQuery, routing: RoutingContext, config: PollLoopConfig): Promise { + let querySessionId: string | undefined; + let done = false; + + // Concurrent polling: push new messages into the active query + const pollHandle = setInterval(() => { + if (done) return; + const newMessages = getPendingMessages(); + if (newMessages.length === 0) return; + + const newIds = newMessages.map((m) => m.id); + markProcessing(newIds); + + const prompt = formatMessages(newMessages); + log(`Pushing ${newMessages.length} follow-up message(s) into active query`); + query.push(prompt); + + // Update routing env for MCP tools with latest message context + const newRouting = extractRouting(newMessages); + setRoutingEnv(newRouting, config.env); + + // Mark these completed immediately (they've been pushed to the provider) + markCompleted(newIds); + }, ACTIVE_POLL_INTERVAL_MS); + + try { + for await (const event of query.events) { + handleEvent(event, routing); + + if (event.type === 'init') { + querySessionId = event.sessionId; + } else if (event.type === 'result' && event.text) { + writeMessageOut({ + id: generateId(), + in_reply_to: routing.inReplyTo, + kind: routing.channelType ? 'chat' : 'chat', + platform_id: routing.platformId, + channel_type: routing.channelType, + thread_id: routing.threadId, + content: JSON.stringify({ text: event.text }), + }); + } + } + } finally { + done = true; + clearInterval(pollHandle); + } + + return { sessionId: querySessionId }; +} + +function handleEvent(event: ProviderEvent, _routing: RoutingContext): void { + switch (event.type) { + case 'init': + log(`Session: ${event.sessionId}`); + break; + case 'result': + log(`Result: ${event.text ? event.text.slice(0, 200) : '(empty)'}`); + break; + case 'error': + log(`Error: ${event.message} (retryable: ${event.retryable}${event.classification ? `, ${event.classification}` : ''})`); + break; + case 'progress': + log(`Progress: ${event.message}`); + break; + } +} + +function setRoutingEnv(routing: RoutingContext, env: Record): void { + env.NANOCLAW_PLATFORM_ID = routing.platformId ?? undefined; + env.NANOCLAW_CHANNEL_TYPE = routing.channelType ?? undefined; + env.NANOCLAW_THREAD_ID = routing.threadId ?? undefined; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/container/agent-runner/src/providers/claude.ts b/container/agent-runner/src/providers/claude.ts new file mode 100644 index 000000000..c25ff377b --- /dev/null +++ b/container/agent-runner/src/providers/claude.ts @@ -0,0 +1,231 @@ +import fs from 'fs'; +import path from 'path'; + +import { query as sdkQuery, type HookCallback, type PreCompactHookInput } from '@anthropic-ai/claude-agent-sdk'; + +import type { AgentProvider, AgentQuery, ProviderEvent, QueryInput } from './types.js'; + +function log(msg: string): void { + console.error(`[claude-provider] ${msg}`); +} + +// Tool allowlist for NanoClaw agent containers +const TOOL_ALLOWLIST = [ + 'Bash', + 'Read', + 'Write', + 'Edit', + 'Glob', + 'Grep', + 'WebSearch', + 'WebFetch', + 'Task', + 'TaskOutput', + 'TaskStop', + 'TeamCreate', + 'TeamDelete', + 'SendMessage', + 'TodoWrite', + 'ToolSearch', + 'Skill', + 'NotebookEdit', + 'mcp__nanoclaw__*', +]; + +interface SDKUserMessage { + type: 'user'; + message: { role: 'user'; content: string }; + parent_tool_use_id: null; + session_id: string; +} + +/** + * Push-based async iterable for streaming user messages to the Claude SDK. + */ +class MessageStream { + private queue: SDKUserMessage[] = []; + private waiting: (() => void) | null = null; + private done = false; + + push(text: string): void { + this.queue.push({ + type: 'user', + message: { role: 'user', content: text }, + parent_tool_use_id: null, + session_id: '', + }); + this.waiting?.(); + } + + end(): void { + this.done = true; + this.waiting?.(); + } + + async *[Symbol.asyncIterator](): AsyncGenerator { + while (true) { + while (this.queue.length > 0) { + yield this.queue.shift()!; + } + if (this.done) return; + await new Promise((r) => { + this.waiting = r; + }); + this.waiting = null; + } + } +} + +// ── Transcript archiving (PreCompact hook) ── + +interface ParsedMessage { + role: 'user' | 'assistant'; + content: string; +} + +function parseTranscript(content: string): ParsedMessage[] { + const messages: ParsedMessage[] = []; + for (const line of content.split('\n')) { + if (!line.trim()) continue; + try { + const entry = JSON.parse(line); + if (entry.type === 'user' && entry.message?.content) { + const text = typeof entry.message.content === 'string' ? entry.message.content : entry.message.content.map((c: { text?: string }) => c.text || '').join(''); + if (text) messages.push({ role: 'user', content: text }); + } else if (entry.type === 'assistant' && entry.message?.content) { + const textParts = entry.message.content.filter((c: { type: string }) => c.type === 'text').map((c: { text: string }) => c.text); + const text = textParts.join(''); + if (text) messages.push({ role: 'assistant', content: text }); + } + } catch { + /* skip unparseable lines */ + } + } + return messages; +} + +function formatTranscriptMarkdown(messages: ParsedMessage[], title?: string | null, assistantName?: string): string { + const now = new Date(); + const dateStr = now.toLocaleString('en-US', { month: 'short', day: 'numeric', hour: 'numeric', minute: '2-digit', hour12: true }); + const lines = [`# ${title || 'Conversation'}`, '', `Archived: ${dateStr}`, '', '---', '']; + for (const msg of messages) { + const sender = msg.role === 'user' ? 'User' : assistantName || 'Assistant'; + const content = msg.content.length > 2000 ? msg.content.slice(0, 2000) + '...' : msg.content; + lines.push(`**${sender}**: ${content}`, ''); + } + return lines.join('\n'); +} + +function createPreCompactHook(assistantName?: string): HookCallback { + return async (input) => { + const preCompact = input as PreCompactHookInput; + const { transcript_path: transcriptPath, session_id: sessionId } = preCompact; + + if (!transcriptPath || !fs.existsSync(transcriptPath)) { + log('No transcript found for archiving'); + return {}; + } + + try { + const content = fs.readFileSync(transcriptPath, 'utf-8'); + const messages = parseTranscript(content); + if (messages.length === 0) return {}; + + // Try to get summary from sessions index + let summary: string | undefined; + const indexPath = path.join(path.dirname(transcriptPath), 'sessions-index.json'); + if (fs.existsSync(indexPath)) { + try { + const index = JSON.parse(fs.readFileSync(indexPath, 'utf-8')); + summary = index.entries?.find((e: { sessionId: string; summary?: string }) => e.sessionId === sessionId)?.summary; + } catch { + /* ignore */ + } + } + + const name = summary + ? summary.toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/^-+|-+$/g, '').slice(0, 50) + : `conversation-${new Date().getHours().toString().padStart(2, '0')}${new Date().getMinutes().toString().padStart(2, '0')}`; + + const conversationsDir = '/workspace/agent/conversations'; + fs.mkdirSync(conversationsDir, { recursive: true }); + const filename = `${new Date().toISOString().split('T')[0]}-${name}.md`; + fs.writeFileSync(path.join(conversationsDir, filename), formatTranscriptMarkdown(messages, summary, assistantName)); + log(`Archived conversation to ${filename}`); + } catch (err) { + log(`Failed to archive transcript: ${err instanceof Error ? err.message : String(err)}`); + } + return {}; + }; +} + +// ── Provider ── + +export class ClaudeProvider implements AgentProvider { + private assistantName?: string; + + constructor(opts?: { assistantName?: string }) { + this.assistantName = opts?.assistantName; + } + + query(input: QueryInput): AgentQuery { + const stream = new MessageStream(); + stream.push(input.prompt); + + const sdkResult = sdkQuery({ + prompt: stream, + options: { + cwd: input.cwd, + additionalDirectories: input.additionalDirectories, + resume: input.sessionId, + resumeSessionAt: input.resumeAt, + systemPrompt: input.systemPrompt ? { type: 'preset' as const, preset: 'claude_code' as const, append: input.systemPrompt } : undefined, + allowedTools: TOOL_ALLOWLIST, + env: input.env, + permissionMode: 'bypassPermissions', + allowDangerouslySkipPermissions: true, + settingSources: ['project', 'user'], + mcpServers: input.mcpServers, + hooks: { + PreCompact: [{ hooks: [createPreCompactHook(this.assistantName)] }], + }, + }, + }); + + let aborted = false; + + async function* translateEvents(): AsyncGenerator { + let messageCount = 0; + for await (const message of sdkResult) { + if (aborted) return; + messageCount++; + + if (message.type === 'system' && message.subtype === 'init') { + yield { type: 'init', sessionId: message.session_id }; + } else if (message.type === 'result') { + const text = 'result' in message ? (message as { result?: string }).result ?? null : null; + yield { type: 'result', text }; + } else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'api_retry') { + yield { type: 'error', message: 'API retry', retryable: true }; + } else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'rate_limit_event') { + yield { type: 'error', message: 'Rate limit', retryable: false, classification: 'quota' }; + } else if (message.type === 'system' && (message as { subtype?: string }).subtype === 'task_notification') { + const tn = message as { summary?: string }; + yield { type: 'progress', message: tn.summary || 'Task notification' }; + } + // All other message types are logged but not emitted + } + log(`Query completed after ${messageCount} SDK messages`); + } + + return { + push: (msg) => stream.push(msg), + end: () => stream.end(), + events: translateEvents(), + abort: () => { + aborted = true; + stream.end(); + }, + }; + } +} diff --git a/container/agent-runner/src/providers/factory.ts b/container/agent-runner/src/providers/factory.ts new file mode 100644 index 000000000..077fd0888 --- /dev/null +++ b/container/agent-runner/src/providers/factory.ts @@ -0,0 +1,16 @@ +import type { AgentProvider } from './types.js'; +import { ClaudeProvider } from './claude.js'; +import { MockProvider } from './mock.js'; + +export type ProviderName = 'claude' | 'mock'; + +export function createProvider(name: ProviderName, opts?: { assistantName?: string }): AgentProvider { + switch (name) { + case 'claude': + return new ClaudeProvider(opts); + case 'mock': + return new MockProvider(); + default: + throw new Error(`Unknown provider: ${name}`); + } +} diff --git a/container/agent-runner/src/providers/mock.ts b/container/agent-runner/src/providers/mock.ts new file mode 100644 index 000000000..ed5cad138 --- /dev/null +++ b/container/agent-runner/src/providers/mock.ts @@ -0,0 +1,66 @@ +import type { AgentProvider, AgentQuery, ProviderEvent, QueryInput } from './types.js'; + +/** + * Mock provider for testing. Returns canned responses. + * Supports push() — queued messages produce additional results. + */ +export class MockProvider implements AgentProvider { + private responseFactory: (prompt: string) => string; + + constructor(responseFactory?: (prompt: string) => string) { + this.responseFactory = responseFactory ?? ((prompt) => `Mock response to: ${prompt.slice(0, 100)}`); + } + + query(input: QueryInput): AgentQuery { + const pending: string[] = []; + let waiting: (() => void) | null = null; + let ended = false; + let aborted = false; + const responseFactory = this.responseFactory; + + const events: AsyncIterable = { + async *[Symbol.asyncIterator]() { + yield { type: 'init', sessionId: `mock-session-${Date.now()}` }; + + // Process initial prompt + yield { type: 'result', text: responseFactory(input.prompt) }; + + // Process any pushed follow-ups + while (!ended && !aborted) { + if (pending.length > 0) { + const msg = pending.shift()!; + yield { type: 'result', text: responseFactory(msg) }; + continue; + } + // Wait for push() or end() + await new Promise((resolve) => { + waiting = resolve; + }); + waiting = null; + } + + // Drain remaining + while (pending.length > 0) { + const msg = pending.shift()!; + yield { type: 'result', text: responseFactory(msg) }; + } + }, + }; + + return { + push(message: string) { + pending.push(message); + waiting?.(); + }, + end() { + ended = true; + waiting?.(); + }, + events, + abort() { + aborted = true; + waiting?.(); + }, + }; + } +} diff --git a/container/agent-runner/src/providers/types.ts b/container/agent-runner/src/providers/types.ts new file mode 100644 index 000000000..6e43f3b3a --- /dev/null +++ b/container/agent-runner/src/providers/types.ts @@ -0,0 +1,56 @@ +export interface AgentProvider { + /** Start a new query. Returns a handle for streaming input and output. */ + query(input: QueryInput): AgentQuery; +} + +export interface QueryInput { + /** Initial prompt (already formatted by agent-runner). */ + prompt: string; + + /** Session ID to resume, if any. */ + sessionId?: string; + + /** Resume from a specific point in the session (provider-specific). */ + resumeAt?: string; + + /** Working directory inside the container. */ + cwd: string; + + /** MCP server configurations. */ + mcpServers: Record; + + /** System prompt / developer instructions. */ + systemPrompt?: string; + + /** Environment variables for the SDK process. */ + env: Record; + + /** Additional directories the agent can access. */ + additionalDirectories?: string[]; +} + +export interface McpServerConfig { + command: string; + args: string[]; + env: Record; +} + +export interface AgentQuery { + /** Push a follow-up message into the active query. */ + push(message: string): void; + + /** Signal that no more input will be sent. */ + end(): void; + + /** Output event stream. */ + events: AsyncIterable; + + /** Force-stop the query. */ + abort(): void; +} + +export type ProviderEvent = + | { type: 'init'; sessionId: string } + | { type: 'result'; text: string | null } + | { type: 'error'; message: string; retryable: boolean; classification?: string } + | { type: 'progress'; message: string }; diff --git a/vitest.config.ts b/vitest.config.ts index a456d1cc3..d78e79521 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -2,6 +2,6 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ test: { - include: ['src/**/*.test.ts', 'setup/**/*.test.ts'], + include: ['src/**/*.test.ts', 'setup/**/*.test.ts', 'container/agent-runner/src/**/*.test.ts'], }, });