feat(v2): OneCLI 0.3.1 — approvals, credential collection, threaded routing

Three features built on top of @onecli-sh/sdk 0.3.1, landed together because
they share wiring surfaces (session DB schema, delivery dispatcher, Chat SDK
bridge, channel adapter contract).

## OneCLI manual-approval handler

* `src/onecli-approvals.ts` — long-polls OneCLI via the SDK's
  `configureManualApproval`; on each request, delivers an `ask_question` card
  to the admin agent group's first messaging group, persists a
  `pending_approvals` row, and waits on an in-memory Promise resolved by the
  admin's button click or an expiry timer. Expired cards are edited to
  "Expired (...)" and a startup sweep flushes any rows left over from a
  previous process.
* Short 11-byte approval id (`oa-<8 base36>`) instead of the SDK's UUID so the
  Telegram 64-byte `callback_data` limit is respected; the OneCLI UUID stays
  in the persisted payload for audit.
* Migration 003 consolidated: `pending_approvals` now has the OneCLI-aware
  columns from the start (`agent_group_id`, `channel_type`, `platform_id`,
  `platform_message_id`, `expires_at`, `status`), `session_id` relaxed to
  nullable so cross-session approvals fit.
* `handleQuestionResponse` in `src/index.ts` now routes OneCLI approvals
  through `resolveOneCLIApproval` before falling back to the
  session-bound approval path.

## Credential collection from chat

New `trigger_credential_collection` MCP tool — the agent researches a
third-party API, calls the tool with `{name, hostPattern, headerName,
valueFormat, description}`, and blocks until the host reports saved, rejected,
or failed. The credential value never enters the agent's context: the user
submits it into a Chat SDK Modal on the host side, the host writes it to
OneCLI via a thin facade (`src/onecli-secrets.ts` — shells out to
`onecli secrets create`, shape mirrors the SDK we expect upstream), and only
the status string flows back to the container via a system message.

* `src/credentials.ts` — host-side handler: delivers the card to the
  conversation's own channel (not the admin channel — credential collection
  is a user-facing flow, distinct from admin approval), persists a
  `pending_credentials` row, drives the submit → `createSecret` → notify
  pipeline. Falls back gracefully when the channel doesn't support modals.
* `src/db/credentials.ts` + migration 005: `pending_credentials` table.
* `src/channels/chat-sdk-bridge.ts`: renders a `credential_request` card,
  handles the `nccr:` action prefix by opening a Modal with a TextInput,
  registers an `onModalSubmit` handler for the `nccm:` callback prefix.
* `container/agent-runner/src/mcp-tools/credentials.ts`: the blocking MCP
  tool, mirroring the `ask_user_question` polling pattern.
* `container/agent-runner/src/db/messages-in.ts`: `findCredentialResponse`
  helper to pick up the system message the host writes back.

## Threaded adapter routing

The destination layer previously didn't carry thread context, so agent replies
to Discord always landed in the root channel regardless of which thread the
inbound came from.

* `ChannelAdapter.supportsThreads: boolean` — declared by every channel skill
  at `createChatSdkBridge`. Threaded: Discord, Slack, Teams, Google Chat,
  Linear, GitHub, Webex. Non-threaded: Telegram, WhatsApp Cloud, Matrix,
  Resend, iMessage.
* `src/router.ts`: non-threaded adapters strip `threadId` at ingest (threads
  collapse to channel-level sessions). Threaded adapters override the
  wiring's `session_mode` to `'per-thread'` so each thread = a session
  (except `agent-shared`, which is preserved as a cross-channel intent the
  adapter can't know about).
* `session_routing` table in `inbound.db` — single-row default reply routing
  written by the host on every container wake from
  `session.messaging_group_id` + `session.thread_id`. Forward-compat
  `CREATE TABLE IF NOT EXISTS` handles older session DBs lazily.
* `container/agent-runner/src/db/session-routing.ts` — container-side reader.
* `send_message` / `send_file` / `ask_user_question` / `send_card` /
  scheduling tools all default their routing (channel, platform, **and**
  thread) from the session when no explicit `to` is given. Explicit `to`
  uses the destination's channel with `thread_id = null` (cross-destination
  sends start a new conversation elsewhere).
* `poll-loop.ts::sendToDestination` (the final-text single-destination
  shortcut) now inherits `thread_id` from `RoutingContext` too — this was
  the root cause of Discord replies landing in the root channel even after
  `send_message` was wired correctly.

## Related cleanups

* `src/container-runner.ts`: OneCLI agent identifier switched from the lossy
  folder-derived string to `agent_group.id`, making `getAgentGroup(externalId)`
  a trivial reverse lookup for per-agent scoping.
* `wakeContainer` race fix via an in-flight promise map — concurrent wakes
  during the async buildContainerArgs / OneCLI `applyContainerConfig` window
  no longer double-spawn containers against the same session directory.
* `src/db/db-v2.test.ts`: dropped the brittle `expect(row.v).toBe(N)` schema
  version assertion — it had to be bumped on every migration addition.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-11 17:18:21 +03:00
parent 9dc8bc5d99
commit e92b245399
43 changed files with 1391 additions and 70 deletions
+9 -1
View File
@@ -7,7 +7,15 @@ export {
touchHeartbeat,
clearStaleProcessingAcks,
} from './connection.js';
export { getPendingMessages, markProcessing, markCompleted, markFailed, getMessageIn, findQuestionResponse } from './messages-in.js';
export {
getPendingMessages,
markProcessing,
markCompleted,
markFailed,
getMessageIn,
findQuestionResponse,
findCredentialResponse,
} from './messages-in.js';
export type { MessageInRow } from './messages-in.js';
export { writeMessageOut, getUndeliveredMessages } from './messages-out.js';
export type { MessageOutRow, WriteMessageOut } from './messages-out.js';
@@ -112,3 +112,20 @@ export function findQuestionResponse(questionId: string): MessageInRow | undefin
return response;
}
/** Find a pending credential_response system message for a given credential id. */
export function findCredentialResponse(credentialId: string): MessageInRow | undefined {
const inbound = getInboundDb();
const outbound = getOutboundDb();
const response = inbound
.prepare("SELECT * FROM messages_in WHERE status = 'pending' AND kind = 'system' AND content LIKE ?")
.get(`%"credentialId":"${credentialId}"%`) as MessageInRow | undefined;
if (!response) return undefined;
const acked = outbound.prepare('SELECT 1 FROM processing_ack WHERE message_id = ?').get(response.id);
if (acked) return undefined;
return response;
}
@@ -0,0 +1,30 @@
/**
* Default reply routing for this session — written by the host on every
* container wake (see src/session-manager.ts `writeSessionRouting`).
*
* Read by the MCP tools as the default destination for outbound messages
* when the agent doesn't specify an explicit `to`. This is what makes
* "agent replies in the thread it's currently in" work: the router strips
* or preserves thread_id based on the adapter's thread support, and we
* just read the fixed routing the host committed for this session.
*/
import { getInboundDb } from './connection.js';
export interface SessionRouting {
channel_type: string | null;
platform_id: string | null;
thread_id: string | null;
}
export function getSessionRouting(): SessionRouting {
const db = getInboundDb();
try {
const row = db
.prepare('SELECT channel_type, platform_id, thread_id FROM session_routing WHERE id = 1')
.get() as SessionRouting | undefined;
if (row) return row;
} catch {
// Table may not exist on an older session DB — fall through to defaults
}
return { channel_type: null, platform_id: null, thread_id: null };
}
+35 -12
View File
@@ -11,6 +11,7 @@ import path from 'path';
import { findByName, getAllDestinations } from '../destinations.js';
import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js';
import { getSessionRouting } from '../db/session-routing.js';
import type { McpToolDefinition } from './types.js';
function log(msg: string): void {
@@ -37,14 +38,31 @@ function destinationList(): string {
/**
* Resolve a destination name to routing fields.
* If `to` is omitted and the agent has exactly one destination, that one is used.
* With multiple destinations, omitting `to` is an error.
*
* If `to` is omitted, use the session's default reply routing (channel +
* thread the conversation is in) — the agent replies in place.
*
* If `to` is specified, look up the named destination; thread_id is null
* because a cross-destination send starts a new conversation elsewhere.
*/
function resolveRouting(
to: string | undefined,
): { channel_type: string; platform_id: string; resolvedName: string } | { error: string } {
let name = to;
if (!name) {
):
| { channel_type: string; platform_id: string; thread_id: string | null; resolvedName: string }
| { error: string } {
if (!to) {
// Default: reply to whatever thread/channel this session is bound to.
const session = getSessionRouting();
if (session.channel_type && session.platform_id) {
return {
channel_type: session.channel_type,
platform_id: session.platform_id,
thread_id: session.thread_id,
resolvedName: '(current conversation)',
};
}
// No session routing (e.g., agent-shared or internal-only agent) —
// fall back to the legacy single-destination shortcut.
const all = getAllDestinations();
if (all.length === 0) return { error: 'No destinations configured.' };
if (all.length > 1) {
@@ -52,14 +70,19 @@ function resolveRouting(
error: `You have multiple destinations — specify "to". Options: ${all.map((d) => d.name).join(', ')}`,
};
}
name = all[0].name;
to = all[0].name;
}
const dest = findByName(name);
if (!dest) return { error: `Unknown destination "${name}". Known: ${destinationList()}` };
const dest = findByName(to);
if (!dest) return { error: `Unknown destination "${to}". Known: ${destinationList()}` };
if (dest.type === 'channel') {
return { channel_type: dest.channelType!, platform_id: dest.platformId!, resolvedName: name };
return {
channel_type: dest.channelType!,
platform_id: dest.platformId!,
thread_id: null,
resolvedName: to,
};
}
return { channel_type: 'agent', platform_id: dest.agentGroupId!, resolvedName: name };
return { channel_type: 'agent', platform_id: dest.agentGroupId!, thread_id: null, resolvedName: to };
}
export const sendMessage: McpToolDefinition = {
@@ -89,7 +112,7 @@ export const sendMessage: McpToolDefinition = {
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,
thread_id: null,
thread_id: routing.thread_id,
content: JSON.stringify({ text }),
});
@@ -135,7 +158,7 @@ export const sendFile: McpToolDefinition = {
kind: 'chat',
platform_id: routing.platform_id,
channel_type: routing.channel_type,
thread_id: null,
thread_id: routing.thread_id,
content: JSON.stringify({ text: (args.text as string) || '', files: [filename] }),
});
@@ -0,0 +1,132 @@
/**
* Credential collection MCP tool.
*
* trigger_credential_collection sends a card to the user and blocks until the
* host reports back whether the credential was saved, rejected, or failed.
* The credential value NEVER enters agent context — the user submits it into
* a modal whose value is consumed entirely on the host side, and the host
* only writes back a status string.
*/
import { findCredentialResponse, markCompleted } from '../db/messages-in.js';
import { writeMessageOut } from '../db/messages-out.js';
import type { McpToolDefinition } from './types.js';
function log(msg: string): void {
console.error(`[mcp-tools] ${msg}`);
}
function generateId(): string {
return `cred-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}
function ok(text: string) {
return { content: [{ type: 'text' as const, text }] };
}
function err(text: string) {
return { content: [{ type: 'text' as const, text: `Error: ${text}` }], isError: true };
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export const triggerCredentialCollection: McpToolDefinition = {
tool: {
name: 'trigger_credential_collection',
description:
'Collect a credential (API key, token, etc.) from the user for a third-party service. Research the service first so you can pass the correct host pattern, header name, and value format. A card is sent to the user with a button that opens a secure input modal — the value is inserted directly into OneCLI and never enters your context. Blocks until the user saves, rejects, or the request fails.',
inputSchema: {
type: 'object' as const,
properties: {
name: {
type: 'string',
description: 'Display name for the secret (e.g. "Resend API Key").',
},
type: {
type: 'string',
enum: ['generic', 'anthropic'],
description: "Secret type. Use 'generic' for most third-party APIs; 'anthropic' is reserved for Anthropic API keys.",
},
hostPattern: {
type: 'string',
description: 'Host pattern to match (e.g. "api.resend.com"). Used by OneCLI to know when to inject this credential.',
},
pathPattern: {
type: 'string',
description: 'Optional path pattern to match (e.g. "/v1/*").',
},
headerName: {
type: 'string',
description: 'Header name to inject the credential into (e.g. "Authorization"). Required for generic type.',
},
valueFormat: {
type: 'string',
description: 'Value format template. Use {value} as the placeholder. Example: "Bearer {value}". Defaults to "{value}".',
},
description: {
type: 'string',
description: 'User-facing explanation shown on the card and in the input modal.',
},
timeout: {
type: 'number',
description: 'Timeout in seconds (default: 600).',
},
},
required: ['name', 'hostPattern'],
},
},
async handler(args) {
const name = args.name as string;
const type = ((args.type as string) || 'generic') as 'generic' | 'anthropic';
const hostPattern = args.hostPattern as string;
const pathPattern = (args.pathPattern as string) || '';
const headerName = (args.headerName as string) || '';
const valueFormat = (args.valueFormat as string) || '';
const description = (args.description as string) || '';
const timeoutMs = ((args.timeout as number) || 600) * 1000;
if (!name || !hostPattern) return err('name and hostPattern are required');
const credentialId = generateId();
writeMessageOut({
id: credentialId,
kind: 'system',
content: JSON.stringify({
action: 'request_credential',
credentialId,
name,
type,
hostPattern,
pathPattern,
headerName,
valueFormat,
description,
}),
});
log(`trigger_credential_collection: ${credentialId}${name} (${hostPattern})`);
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const response = findCredentialResponse(credentialId);
if (response) {
const parsed = JSON.parse(response.content) as {
status: 'saved' | 'rejected' | 'failed';
detail?: string;
};
markCompleted([response.id]);
log(`trigger_credential_collection result: ${credentialId}${parsed.status}`);
if (parsed.status === 'saved') return ok(parsed.detail || 'Credential saved.');
if (parsed.status === 'rejected') return err(parsed.detail || 'Credential request rejected.');
return err(parsed.detail || 'Credential request failed.');
}
await sleep(1000);
}
log(`trigger_credential_collection timeout: ${credentialId}`);
return err(`Credential request timed out after ${timeoutMs / 1000}s`);
},
};
export const credentialTools: McpToolDefinition[] = [triggerCredentialCollection];
@@ -15,6 +15,7 @@ import { schedulingTools } from './scheduling.js';
import { interactiveTools } from './interactive.js';
import { agentTools } from './agents.js';
import { selfModTools } from './self-mod.js';
import { credentialTools } from './credentials.js';
function log(msg: string): void {
console.error(`[mcp-tools] ${msg}`);
@@ -32,6 +33,7 @@ const allTools: McpToolDefinition[] = [
...interactiveTools,
...conditionalAgentTools,
...selfModTools,
...credentialTools,
];
const toolMap = new Map<string, McpToolDefinition>();
@@ -6,6 +6,7 @@
*/
import { findQuestionResponse, markCompleted } from '../db/messages-in.js';
import { writeMessageOut } from '../db/messages-out.js';
import { getSessionRouting } from '../db/session-routing.js';
import type { McpToolDefinition } from './types.js';
function log(msg: string): void {
@@ -17,11 +18,7 @@ function generateId(): string {
}
function routing() {
return {
platform_id: process.env.NANOCLAW_PLATFORM_ID || null,
channel_type: process.env.NANOCLAW_CHANNEL_TYPE || null,
thread_id: process.env.NANOCLAW_THREAD_ID || null,
};
return getSessionRouting();
}
function ok(text: string) {
@@ -7,6 +7,7 @@
*/
import { getInboundDb } from '../db/connection.js';
import { writeMessageOut } from '../db/messages-out.js';
import { getSessionRouting } from '../db/session-routing.js';
import type { McpToolDefinition } from './types.js';
function log(msg: string): void {
@@ -18,11 +19,7 @@ function generateId(): string {
}
function routing() {
return {
platform_id: process.env.NANOCLAW_PLATFORM_ID || null,
channel_type: process.env.NANOCLAW_CHANNEL_TYPE || null,
thread_id: process.env.NANOCLAW_THREAD_ID || null,
};
return getSessionRouting();
}
function ok(text: string) {
+4 -1
View File
@@ -387,13 +387,16 @@ function dispatchResultText(text: string, routing: RoutingContext): void {
function sendToDestination(dest: DestinationEntry, body: string, routing: RoutingContext): void {
const platformId = dest.type === 'channel' ? dest.platformId! : dest.agentGroupId!;
const channelType = dest.type === 'channel' ? dest.channelType! : 'agent';
// Inherit thread_id from the inbound routing context so replies land in the
// same thread the conversation is in. For non-threaded adapters the router
// strips thread_id at ingest, so this will already be null.
writeMessageOut({
id: generateId(),
in_reply_to: routing.inReplyTo,
kind: 'chat',
platform_id: platformId,
channel_type: channelType,
thread_id: null,
thread_id: routing.threadId,
content: JSON.stringify({ text: body }),
});
}
+5 -4
View File
@@ -19,7 +19,7 @@
"@chat-adapter/teams": "^4.24.0",
"@chat-adapter/telegram": "^4.24.0",
"@chat-adapter/whatsapp": "^4.24.0",
"@onecli-sh/sdk": "^0.2.0",
"@onecli-sh/sdk": "^0.3.1",
"@resend/chat-sdk-adapter": "^0.1.1",
"better-sqlite3": "11.10.0",
"chat": "^4.24.0",
@@ -1881,9 +1881,10 @@
}
},
"node_modules/@onecli-sh/sdk": {
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/@onecli-sh/sdk/-/sdk-0.2.0.tgz",
"integrity": "sha512-u7PqWROEvTV9f0ADVkjigTrd2AZn3klbPrv7GGpeRHIJpjAxJUdlWqxr5kiGt6qTDKL8t3nq76xr4X2pxTiyBg==",
"version": "0.3.1",
"resolved": "https://registry.npmjs.org/@onecli-sh/sdk/-/sdk-0.3.1.tgz",
"integrity": "sha512-oMSa4DUCVS52vec41nFOg3XdCBTbMVEZdCFCsaUd9sRXVorCPWd3VyZq4giXsmk4g09DA/zLjsnrY7l6G94Ulg==",
"license": "MIT",
"engines": {
"node": ">=20"
}
+1 -1
View File
@@ -32,7 +32,7 @@
"@chat-adapter/teams": "^4.24.0",
"@chat-adapter/telegram": "^4.24.0",
"@chat-adapter/whatsapp": "^4.24.0",
"@onecli-sh/sdk": "^0.2.0",
"@onecli-sh/sdk": "^0.3.1",
"@resend/chat-sdk-adapter": "^0.1.1",
"better-sqlite3": "11.10.0",
"chat": "^4.24.0",
+18
View File
@@ -27,6 +27,12 @@ export interface ChannelSetup {
/** Called when a user clicks a button/action in a card (e.g., ask_user_question response). */
onAction(questionId: string, selectedOption: string, userId: string): void;
/** Credential collection hooks — used by chat-sdk-bridge to route the modal flow. */
getCredentialForModal?(credentialId: string): { name: string; description: string | null; hostPattern: string } | null;
onCredentialReject?(credentialId: string): void;
onCredentialSubmit?(credentialId: string, value: string): void;
onCredentialChannelUnsupported?(credentialId: string): void;
}
/** Inbound message from adapter to host. */
@@ -62,6 +68,18 @@ export interface ChannelAdapter {
name: string;
channelType: string;
/**
* Whether this adapter models conversations as threads.
*
* true — adapter's platform uses threads as the primary conversation unit
* (Discord, Slack, Linear, GitHub). One thread = one session; the
* agent replies into the originating thread.
* false — adapter's platform treats the channel itself as the conversation
* (Telegram, WhatsApp, iMessage). Thread ids are stripped at the
* router; agent replies go to the channel.
*/
supportsThreads: boolean;
// Lifecycle
setup(config: ChannelSetup): Promise<void>;
teardown(): Promise<void>;
+1
View File
@@ -39,6 +39,7 @@ function createMockAdapter(
return {
name: channelType,
channelType,
supportsThreads: false,
delivered,
inbound,
+110 -1
View File
@@ -12,6 +12,8 @@ import {
CardText,
Actions,
Button,
Modal,
TextInput,
type Adapter,
type ConcurrencyStrategy,
type Message as ChatMessage,
@@ -47,6 +49,13 @@ export interface ChatSdkBridgeConfig {
botToken?: string;
/** Platform-specific reply context extraction. */
extractReplyContext?: ReplyContextExtractor;
/**
* Whether this platform uses threads as the primary conversation unit.
* See `ChannelAdapter.supportsThreads`. Declared by the calling channel
* skill, not inferred, because some platforms (Discord) can be used either
* way and the default depends on installation style.
*/
supportsThreads: boolean;
}
export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter {
@@ -116,6 +125,7 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
return {
name: adapter.name,
channelType: adapter.name,
supportsThreads: config.supportsThreads,
async setup(hostConfig: ChannelSetup) {
setupConfig = hostConfig;
@@ -151,8 +161,75 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
await thread.subscribe();
});
// Handle button clicks (ask_user_question responses)
// Handle button clicks (ask_user_question, credential card)
chat.onAction(async (event) => {
// Credential card actions: nccr:<credentialId>:<enter|reject>
if (event.actionId.startsWith('nccr:')) {
const [, credentialId, subAction] = event.actionId.split(':');
if (!credentialId || !subAction) return;
if (subAction === 'reject') {
try {
await adapter.editMessage(event.threadId, event.messageId, {
markdown: `🔑 Credential request\n\n❌ Rejected`,
});
} catch (err) {
log.warn('Failed to update credential card after reject', { err });
}
setupConfig.onCredentialReject?.(credentialId);
return;
}
if (subAction === 'enter') {
const pending = setupConfig.getCredentialForModal?.(credentialId);
if (!pending) {
log.warn('Credential card clicked but row not pending', { credentialId });
return;
}
try {
const modalChildren = [
CardText(
pending.description ??
`Enter the value for ${pending.name} (host: ${pending.hostPattern}).`,
),
TextInput({
id: 'value',
label: pending.name,
placeholder: 'Paste your credential value',
}),
];
// Modal children include a text element for context; the SDK
// accepts TextElement in ModalChild so this is valid.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const modal = Modal({
callbackId: `nccm:${credentialId}`,
title: 'Enter credential',
submitLabel: 'Save',
// eslint-disable-next-line @typescript-eslint/no-explicit-any
children: modalChildren as any,
});
const result = await event.openModal(modal);
if (!result) {
log.warn('openModal returned undefined — channel unsupported', { credentialId });
setupConfig.onCredentialChannelUnsupported?.(credentialId);
try {
await adapter.editMessage(event.threadId, event.messageId, {
markdown: `🔑 Credential request\n\n⚠️ This channel does not support modals.`,
});
} catch {
// best effort
}
}
} catch (err) {
log.error('Failed to open credential modal', { credentialId, err });
setupConfig.onCredentialChannelUnsupported?.(credentialId);
}
return;
}
return;
}
if (!event.actionId.startsWith('ncq:')) return;
const parts = event.actionId.split(':');
if (parts.length < 3) return;
@@ -173,6 +250,18 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
setupConfig.onAction(questionId, selectedOption, userId);
});
// Modal submissions for credential collection
chat.onModalSubmit(async (event) => {
if (!event.callbackId.startsWith('nccm:')) return;
const credentialId = event.callbackId.slice('nccm:'.length);
const value = event.values?.value ?? '';
if (!value) {
log.warn('Credential modal submitted with empty value', { credentialId });
return;
}
setupConfig.onCredentialSubmit?.(credentialId, value);
});
await chat.initialize();
// Start Gateway listener for adapters that support it (e.g., Discord)
@@ -259,6 +348,26 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter
return result?.id;
}
// Credential request card — buttons open a modal for secure input
if (content.type === 'credential_request' && content.credentialId) {
const credentialId = content.credentialId as string;
const card = Card({
title: '🔑 Credential request',
children: [
CardText(content.question as string),
Actions([
Button({ id: `nccr:${credentialId}:enter`, label: 'Enter credential', value: 'enter' }),
Button({ id: `nccr:${credentialId}:reject`, label: 'Reject', value: 'reject' }),
]),
],
});
const result = await adapter.postMessage(tid, {
card,
fallbackText: `Credential request — open in a channel that supports modals.`,
});
return result?.id;
}
// Normal message
const text = (content.markdown as string) || (content.text as string);
if (text) {
+1
View File
@@ -32,6 +32,7 @@ registerChannelAdapter('discord', {
concurrency: 'concurrent',
botToken: env.DISCORD_BOT_TOKEN,
extractReplyContext,
supportsThreads: true,
});
},
});
+1 -1
View File
@@ -15,6 +15,6 @@ registerChannelAdapter('gchat', {
const gchatAdapter = createGoogleChatAdapter({
credentials: JSON.parse(env.GCHAT_CREDENTIALS),
});
return createChatSdkBridge({ adapter: gchatAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: gchatAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
+1 -1
View File
@@ -17,6 +17,6 @@ registerChannelAdapter('github', {
token: env.GITHUB_TOKEN,
webhookSecret: env.GITHUB_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: githubAdapter, concurrency: 'queue' });
return createChatSdkBridge({ adapter: githubAdapter, concurrency: 'queue', supportsThreads: true });
},
});
+1 -1
View File
@@ -24,6 +24,6 @@ registerChannelAdapter('imessage', {
const imessageAdapter = Object.assign(rawAdapter, {
channelIdFromThreadId: (threadId: string) => threadId,
});
return createChatSdkBridge({ adapter: imessageAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: imessageAdapter, concurrency: 'concurrent', supportsThreads: false });
},
});
+1 -1
View File
@@ -17,6 +17,6 @@ registerChannelAdapter('linear', {
apiKey: env.LINEAR_API_KEY,
webhookSecret: env.LINEAR_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: linearAdapter, concurrency: 'queue' });
return createChatSdkBridge({ adapter: linearAdapter, concurrency: 'queue', supportsThreads: true });
},
});
+1 -1
View File
@@ -18,6 +18,6 @@ registerChannelAdapter('matrix', {
if (env.MATRIX_USER_ID) process.env.MATRIX_USER_ID = env.MATRIX_USER_ID;
if (env.MATRIX_BOT_USERNAME) process.env.MATRIX_BOT_USERNAME = env.MATRIX_BOT_USERNAME;
const matrixAdapter = createMatrixAdapter();
return createChatSdkBridge({ adapter: matrixAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: matrixAdapter, concurrency: 'concurrent', supportsThreads: false });
},
});
+1 -1
View File
@@ -18,6 +18,6 @@ registerChannelAdapter('resend', {
fromName: env.RESEND_FROM_NAME,
webhookSecret: env.RESEND_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: resendAdapter, concurrency: 'queue' });
return createChatSdkBridge({ adapter: resendAdapter, concurrency: 'queue', supportsThreads: false });
},
});
+1 -1
View File
@@ -16,6 +16,6 @@ registerChannelAdapter('slack', {
botToken: env.SLACK_BOT_TOKEN,
signingSecret: env.SLACK_SIGNING_SECRET,
});
return createChatSdkBridge({ adapter: slackAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: slackAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
+1 -1
View File
@@ -16,6 +16,6 @@ registerChannelAdapter('teams', {
appId: env.TEAMS_APP_ID,
appPassword: env.TEAMS_APP_PASSWORD,
});
return createChatSdkBridge({ adapter: teamsAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: teamsAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
+6 -1
View File
@@ -26,6 +26,11 @@ registerChannelAdapter('telegram', {
botToken: env.TELEGRAM_BOT_TOKEN,
mode: 'polling',
});
return createChatSdkBridge({ adapter: telegramAdapter, concurrency: 'concurrent', extractReplyContext });
return createChatSdkBridge({
adapter: telegramAdapter,
concurrency: 'concurrent',
extractReplyContext,
supportsThreads: false,
});
},
});
+1 -1
View File
@@ -16,6 +16,6 @@ registerChannelAdapter('webex', {
botToken: env.WEBEX_BOT_TOKEN,
webhookSecret: env.WEBEX_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: webexAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: webexAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
+1 -1
View File
@@ -24,6 +24,6 @@ registerChannelAdapter('whatsapp-cloud', {
appSecret: env.WHATSAPP_APP_SECRET,
verifyToken: env.WHATSAPP_VERIFY_TOKEN,
});
return createChatSdkBridge({ adapter: whatsappAdapter, concurrency: 'concurrent' });
return createChatSdkBridge({ adapter: whatsappAdapter, concurrency: 'concurrent', supportsThreads: false });
},
});
+36 -5
View File
@@ -21,6 +21,7 @@ import {
markContainerStopped,
sessionDir,
writeDestinations,
writeSessionRouting,
} from './session-manager.js';
import type { AgentGroup, Session } from './types.js';
@@ -35,6 +36,16 @@ interface VolumeMount {
/** Active containers tracked by session ID. */
const activeContainers = new Map<string, { process: ChildProcess; containerName: string }>();
/**
* In-flight wake promises, keyed by session id. Deduplicates concurrent
* `wakeContainer` calls while the first spawn is still mid-setup (async
* buildContainerArgs, OneCLI gateway apply, etc.) — otherwise a second
* wake in that window passes the `activeContainers.has` check and spawns
* a duplicate container against the same session directory, producing
* racy double-replies.
*/
const wakePromises = new Map<string, Promise<void>>();
export function getActiveContainerCount(): number {
return activeContainers.size;
}
@@ -44,27 +55,47 @@ export function isContainerRunning(sessionId: string): boolean {
}
/**
* Wake up a container for a session. If already running, no-op.
* Wake up a container for a session. If already running or mid-spawn, no-op
* (the in-flight wake promise is reused).
*
* The container runs the v2 agent-runner which polls the session DB.
*/
export async function wakeContainer(session: Session): Promise<void> {
export function wakeContainer(session: Session): Promise<void> {
if (activeContainers.has(session.id)) {
log.debug('Container already running', { sessionId: session.id });
return;
return Promise.resolve();
}
const existing = wakePromises.get(session.id);
if (existing) {
log.debug('Container wake already in-flight — joining existing promise', { sessionId: session.id });
return existing;
}
const promise = spawnContainer(session).finally(() => {
wakePromises.delete(session.id);
});
wakePromises.set(session.id, promise);
return promise;
}
async function spawnContainer(session: Session): Promise<void> {
const agentGroup = getAgentGroup(session.agent_group_id);
if (!agentGroup) {
log.error('Agent group not found', { agentGroupId: session.agent_group_id });
return;
}
// Refresh the destination map so any admin changes take effect on wake
// Refresh the destination map and default reply routing so any admin
// changes take effect on wake.
writeDestinations(agentGroup.id, session.id);
writeSessionRouting(agentGroup.id, session.id);
const mounts = buildMounts(agentGroup, session);
const containerName = `nanoclaw-v2-${agentGroup.folder}-${Date.now()}`;
const agentIdentifier = agentGroup.is_admin ? undefined : agentGroup.folder.toLowerCase().replace(/_/g, '-');
// OneCLI agent identifier is the agent group id. The admin group uses OneCLI's
// default agent (undefined), so unscoped credentials apply. Non-admin groups
// use their stable ag-xxx id, which is reversible via getAgentGroup() for
// approval-request routing.
const agentIdentifier = agentGroup.is_admin ? undefined : agentGroup.id;
const args = await buildContainerArgs(mounts, containerName, session, agentGroup, agentIdentifier);
log.info('Spawning container', { sessionId: session.id, agentGroup: agentGroup.name, containerName });
+312
View File
@@ -0,0 +1,312 @@
/**
* Credential collection flow.
*
* Agent calls `trigger_credential_collection` — container writes a system
* action `request_credential` into outbound.db. This module:
*
* 1. Delivers an `[Enter credential] [Reject]` card to the admin channel.
* 2. On "Enter credential" click, the Chat SDK bridge opens a modal with a
* TextInput, captures the user's value in `onModalSubmit`, and calls
* `handleCredentialSubmit()` here.
* 3. We insert the secret into OneCLI and write a system chat message into
* the agent's session DB so the blocking MCP tool call returns.
* 4. The credential value never enters any session DB or log line.
*/
import {
createPendingCredential,
deletePendingCredential,
getPendingCredential as getPendingCredentialRow,
updatePendingCredentialMessageId,
updatePendingCredentialStatus,
} from './db/credentials.js';
import { getMessagingGroup } from './db/messaging-groups.js';
import type { ChannelDeliveryAdapter } from './delivery.js';
import { log } from './log.js';
import { createSecret, OneCLISecretError } from './onecli-secrets.js';
import { writeSessionMessage } from './session-manager.js';
import type { PendingCredential, Session } from './types.js';
import { wakeContainer } from './container-runner.js';
let adapterRef: ChannelDeliveryAdapter | null = null;
export function setCredentialDeliveryAdapter(adapter: ChannelDeliveryAdapter): void {
adapterRef = adapter;
}
/** Handle a `request_credential` system action from a container. */
export async function handleCredentialRequest(
content: Record<string, unknown>,
session: Session,
): Promise<void> {
if (!adapterRef) {
notifyAgentCredentialResult(session, content.credentialId as string, 'failed', 'delivery adapter not ready');
return;
}
const credentialId = (content.credentialId as string) || '';
const name = (content.name as string) || '';
const type = ((content.type as string) || 'generic') as 'generic' | 'anthropic';
const hostPattern = (content.hostPattern as string) || '';
const pathPattern = (content.pathPattern as string) || null;
const headerName = (content.headerName as string) || null;
const valueFormat = (content.valueFormat as string) || null;
const description = (content.description as string) || null;
if (!credentialId || !name || !hostPattern) {
notifyAgentCredentialResult(
session,
credentialId,
'failed',
'name and hostPattern are required',
);
return;
}
// Deliver the credential card to the channel where the conversation is
// happening — not the admin channel. The user triggered this request by
// chatting with the agent, so the response surface is their chat channel.
if (!session.messaging_group_id) {
notifyAgentCredentialResult(
session,
credentialId,
'failed',
'session has no messaging group — cannot deliver credential card',
);
return;
}
const mg = getMessagingGroup(session.messaging_group_id);
if (!mg) {
notifyAgentCredentialResult(session, credentialId, 'failed', 'messaging group not found');
return;
}
createPendingCredential({
id: credentialId,
agent_group_id: session.agent_group_id,
session_id: session.id,
name,
type,
host_pattern: hostPattern,
path_pattern: pathPattern,
header_name: headerName,
value_format: valueFormat,
description,
channel_type: mg.channel_type,
platform_id: mg.platform_id,
platform_message_id: null,
status: 'pending',
created_at: new Date().toISOString(),
});
const question = buildCardText({
name,
hostPattern,
headerName,
valueFormat,
description,
});
let platformMessageId: string | undefined;
try {
platformMessageId = await adapterRef.deliver(
mg.channel_type,
mg.platform_id,
session.thread_id,
'chat-sdk',
JSON.stringify({
type: 'credential_request',
credentialId,
question,
}),
);
} catch (err) {
log.error('Failed to deliver credential request card', { credentialId, err });
updatePendingCredentialStatus(credentialId, 'failed');
notifyAgentCredentialResult(session, credentialId, 'failed', 'could not deliver card');
return;
}
if (platformMessageId) {
updatePendingCredentialMessageId(credentialId, platformMessageId);
}
log.info('Credential request delivered', { credentialId, name, hostPattern });
}
/** Called by chat-sdk-bridge to fetch metadata for building the modal. */
export function getCredentialForModal(
credentialId: string,
): { name: string; description: string | null; hostPattern: string } | null {
const row = getPendingCredentialRow(credentialId);
if (!row || row.status !== 'pending') return null;
return { name: row.name, description: row.description, hostPattern: row.host_pattern };
}
/** Admin clicked "Reject" on the card (or cancelled the modal). */
export async function handleCredentialReject(credentialId: string): Promise<void> {
const row = getPendingCredentialRow(credentialId);
if (!row) return;
updatePendingCredentialStatus(credentialId, 'rejected');
if (row.session_id) {
await notifyAgentSessionResult(
row.agent_group_id,
row.session_id,
credentialId,
'rejected',
`Credential request for ${row.name} was rejected by admin.`,
);
}
deletePendingCredential(credentialId);
log.info('Credential request rejected', { credentialId });
}
/**
* Admin submitted the modal with a credential value.
* The value is held only long enough to call OneCLI and is then dropped.
*/
export async function handleCredentialSubmit(credentialId: string, value: string): Promise<void> {
const row = getPendingCredentialRow(credentialId);
if (!row) {
log.warn('Credential submit for unknown id', { credentialId });
return;
}
if (row.status !== 'pending') {
log.warn('Credential submit for non-pending row', { credentialId, status: row.status });
return;
}
updatePendingCredentialStatus(credentialId, 'submitted');
try {
await createSecret({
name: row.name,
type: row.type,
value,
hostPattern: row.host_pattern,
pathPattern: row.path_pattern ?? undefined,
headerName: row.header_name ?? undefined,
valueFormat: row.value_format ?? undefined,
agentId: row.agent_group_id, // honored once OneCLI SDK adds scoping
});
} catch (err) {
const reason = err instanceof OneCLISecretError ? err.message : String(err);
log.error('Failed to create OneCLI secret', { credentialId, reason });
updatePendingCredentialStatus(credentialId, 'failed');
if (row.session_id) {
await notifyAgentSessionResult(
row.agent_group_id,
row.session_id,
credentialId,
'failed',
`Credential save failed: ${reason}`,
);
}
deletePendingCredential(credentialId);
return;
}
updatePendingCredentialStatus(credentialId, 'saved');
log.info('Credential saved', { credentialId, name: row.name, hostPattern: row.host_pattern });
if (row.session_id) {
await notifyAgentSessionResult(
row.agent_group_id,
row.session_id,
credentialId,
'saved',
`Credential "${row.name}" saved (host pattern: ${row.host_pattern}).`,
);
}
deletePendingCredential(credentialId);
}
/**
* Fallback for inbound channels that don't support modals — the bridge calls
* this when `event.openModal()` is unavailable or returned undefined.
*/
export async function handleCredentialChannelUnsupported(credentialId: string): Promise<void> {
const row = getPendingCredentialRow(credentialId);
if (!row) return;
updatePendingCredentialStatus(credentialId, 'failed');
if (row.session_id) {
await notifyAgentSessionResult(
row.agent_group_id,
row.session_id,
credentialId,
'failed',
`This channel doesn't support credential collection modals. Use Slack, Discord, Teams, or Google Chat.`,
);
}
deletePendingCredential(credentialId);
}
function notifyAgentCredentialResult(
session: Session,
credentialId: string,
status: 'saved' | 'rejected' | 'failed',
detail: string,
): void {
writeSessionMessage(session.agent_group_id, session.id, {
id: `cred-${credentialId}-${Date.now()}`,
kind: 'system',
timestamp: new Date().toISOString(),
platformId: session.agent_group_id,
channelType: 'agent',
threadId: null,
content: JSON.stringify({
type: 'credential_response',
credentialId,
status,
detail,
}),
});
}
async function notifyAgentSessionResult(
agentGroupId: string,
sessionId: string,
credentialId: string,
status: 'saved' | 'rejected' | 'failed',
detail: string,
): Promise<void> {
writeSessionMessage(agentGroupId, sessionId, {
id: `cred-${credentialId}-${Date.now()}`,
kind: 'system',
timestamp: new Date().toISOString(),
platformId: agentGroupId,
channelType: 'agent',
threadId: null,
content: JSON.stringify({
type: 'credential_response',
credentialId,
status,
detail,
}),
});
const { getSession } = await import('./db/sessions.js');
const session = getSession(sessionId);
if (session) await wakeContainer(session);
}
function buildCardText(opts: {
name: string;
hostPattern: string;
headerName: string | null;
valueFormat: string | null;
description: string | null;
}): string {
const lines = [
`🔑 Credential request: ${opts.name}`,
'',
`Host: \`${opts.hostPattern}\``,
];
if (opts.headerName) lines.push(`Header: \`${opts.headerName}\``);
if (opts.valueFormat) lines.push(`Format: \`${opts.valueFormat}\``);
if (opts.description) lines.push('', opts.description);
lines.push('', 'Click Enter credential to provide the value, or Reject to decline.');
return lines.join('\n');
}
+33
View File
@@ -0,0 +1,33 @@
import type { PendingCredential, PendingCredentialStatus } from '../types.js';
import { getDb } from './connection.js';
export function createPendingCredential(c: PendingCredential): void {
getDb()
.prepare(
`INSERT INTO pending_credentials
(id, agent_group_id, session_id, name, type, host_pattern, path_pattern,
header_name, value_format, description, channel_type, platform_id,
platform_message_id, status, created_at)
VALUES
(@id, @agent_group_id, @session_id, @name, @type, @host_pattern, @path_pattern,
@header_name, @value_format, @description, @channel_type, @platform_id,
@platform_message_id, @status, @created_at)`,
)
.run(c);
}
export function getPendingCredential(id: string): PendingCredential | undefined {
return getDb().prepare('SELECT * FROM pending_credentials WHERE id = ?').get(id) as PendingCredential | undefined;
}
export function updatePendingCredentialStatus(id: string, status: PendingCredentialStatus): void {
getDb().prepare('UPDATE pending_credentials SET status = ? WHERE id = ?').run(status, id);
}
export function updatePendingCredentialMessageId(id: string, platformMessageId: string): void {
getDb().prepare('UPDATE pending_credentials SET platform_message_id = ? WHERE id = ?').run(platformMessageId, id);
}
export function deletePendingCredential(id: string): void {
getDb().prepare('DELETE FROM pending_credentials WHERE id = ?').run(id);
}
-6
View File
@@ -58,12 +58,6 @@ describe('migrations', () => {
runMigrations(db);
});
it('should track schema version', () => {
const db = initTestDb();
runMigrations(db);
const row = db.prepare('SELECT MAX(version) as v FROM schema_version').get() as { v: number };
expect(row.v).toBe(4);
});
});
// ── Agent Groups ──
+12
View File
@@ -36,4 +36,16 @@ export {
createPendingQuestion,
getPendingQuestion,
deletePendingQuestion,
createPendingApproval,
getPendingApproval,
updatePendingApprovalStatus,
deletePendingApproval,
getPendingApprovalsByAction,
} from './sessions.js';
export {
createPendingCredential,
getPendingCredential,
updatePendingCredentialStatus,
updatePendingCredentialMessageId,
deletePendingCredential,
} from './credentials.js';
+27 -6
View File
@@ -1,18 +1,39 @@
import type { Migration } from './index.js';
/**
* `pending_approvals` table — host-side records for any approval-requiring
* request. Used by:
* - install_packages / request_rebuild / add_mcp_server (session-bound,
* `session_id` set, status stays at default 'pending' until handled)
* - OneCLI credential approvals from the SDK `configureManualApproval`
* callback (session_id may be null, action='onecli_credential').
*
* The OneCLI-specific columns (`agent_group_id`, `channel_type`, `platform_id`,
* `platform_message_id`, `expires_at`, `status`) let the host edit the admin
* card when a request expires and sweep stale rows on startup.
*/
export const migration003: Migration = {
version: 3,
name: 'pending-approvals',
up(db) {
db.exec(`
CREATE TABLE pending_approvals (
approval_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL REFERENCES sessions(id),
request_id TEXT NOT NULL,
action TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TEXT NOT NULL
approval_id TEXT PRIMARY KEY,
session_id TEXT REFERENCES sessions(id),
request_id TEXT NOT NULL,
action TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TEXT NOT NULL,
agent_group_id TEXT REFERENCES agent_groups(id),
channel_type TEXT,
platform_id TEXT,
platform_message_id TEXT,
expires_at TEXT,
status TEXT NOT NULL DEFAULT 'pending'
);
CREATE INDEX idx_pending_approvals_action_status
ON pending_approvals(action, status);
`);
},
};
@@ -0,0 +1,34 @@
import type { Migration } from './index.js';
/**
* `pending_credentials` — backs the trigger_credential_collection flow.
* One row per in-flight credential request; status transitions
* pending → submitted → saved | rejected | failed.
*/
export const migration005: Migration = {
version: 5,
name: 'pending-credentials',
up(db) {
db.exec(`
CREATE TABLE pending_credentials (
id TEXT PRIMARY KEY,
agent_group_id TEXT NOT NULL REFERENCES agent_groups(id),
session_id TEXT REFERENCES sessions(id),
name TEXT NOT NULL,
type TEXT NOT NULL,
host_pattern TEXT NOT NULL,
path_pattern TEXT,
header_name TEXT,
value_format TEXT,
description TEXT,
channel_type TEXT NOT NULL,
platform_id TEXT NOT NULL,
platform_message_id TEXT,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT NOT NULL
);
CREATE INDEX idx_pending_credentials_status ON pending_credentials(status);
`);
},
};
+2 -1
View File
@@ -5,6 +5,7 @@ import { migration001 } from './001-initial.js';
import { migration002 } from './002-chat-sdk-state.js';
import { migration003 } from './003-pending-approvals.js';
import { migration004 } from './004-agent-destinations.js';
import { migration005 } from './005-pending-credentials.js';
export interface Migration {
version: number;
@@ -12,7 +13,7 @@ export interface Migration {
up: (db: Database.Database) => void;
}
const migrations: Migration[] = [migration001, migration002, migration003, migration004];
const migrations: Migration[] = [migration001, migration002, migration003, migration004, migration005];
export function runMigrations(db: Database.Database): void {
db.exec(`
+12
View File
@@ -114,6 +114,18 @@ CREATE TABLE destinations (
platform_id TEXT, -- for type='channel'
agent_group_id TEXT -- for type='agent'
);
-- Default reply routing for this session. Single-row table (id=1).
-- Host overwrites on every container wake from the session's messaging_group
-- and thread_id. Container reads it in send_message / ask_user_question /
-- trigger_credential_collection to default the channel/thread of outbound
-- messages when the agent doesn't specify an explicit destination.
CREATE TABLE session_routing (
id INTEGER PRIMARY KEY CHECK (id = 1),
channel_type TEXT,
platform_id TEXT,
thread_id TEXT
);
`;
/** Container-owned: outbound messages + processing acknowledgments. */
+25 -4
View File
@@ -93,13 +93,26 @@ export function deletePendingQuestion(questionId: string): void {
// ── Pending Approvals ──
export function createPendingApproval(pa: PendingApproval): void {
export function createPendingApproval(pa: Partial<PendingApproval> & Pick<PendingApproval, 'approval_id' | 'request_id' | 'action' | 'payload' | 'created_at'>): void {
getDb()
.prepare(
`INSERT INTO pending_approvals (approval_id, session_id, request_id, action, payload, created_at)
VALUES (@approval_id, @session_id, @request_id, @action, @payload, @created_at)`,
`INSERT INTO pending_approvals
(approval_id, session_id, request_id, action, payload, created_at,
agent_group_id, channel_type, platform_id, platform_message_id, expires_at, status)
VALUES
(@approval_id, @session_id, @request_id, @action, @payload, @created_at,
@agent_group_id, @channel_type, @platform_id, @platform_message_id, @expires_at, @status)`,
)
.run(pa);
.run({
session_id: null,
agent_group_id: null,
channel_type: null,
platform_id: null,
platform_message_id: null,
expires_at: null,
status: 'pending',
...pa,
});
}
export function getPendingApproval(approvalId: string): PendingApproval | undefined {
@@ -108,6 +121,14 @@ export function getPendingApproval(approvalId: string): PendingApproval | undefi
| undefined;
}
export function updatePendingApprovalStatus(approvalId: string, status: PendingApproval['status']): void {
getDb().prepare('UPDATE pending_approvals SET status = ? WHERE approval_id = ?').run(status, approvalId);
}
export function deletePendingApproval(approvalId: string): void {
getDb().prepare('DELETE FROM pending_approvals WHERE approval_id = ?').run(approvalId);
}
export function getPendingApprovalsByAction(action: string): PendingApproval[] {
return getDb().prepare('SELECT * FROM pending_approvals WHERE action = ?').all(action) as PendingApproval[];
}
+6
View File
@@ -720,6 +720,12 @@ async function handleSystemAction(
break;
}
case 'request_credential': {
const { handleCredentialRequest } = await import('./credentials.js');
await handleCredentialRequest(content, session);
break;
}
default:
log.warn('Unknown system action', { action });
}
+61 -4
View File
@@ -13,6 +13,19 @@ import { getMessagingGroupsByChannel, getMessagingGroupAgents } from './db/messa
import { ensureContainerRuntimeRunning, cleanupOrphans } from './container-runtime.js';
import { startActiveDeliveryPoll, startSweepDeliveryPoll, setDeliveryAdapter, stopDeliveryPolls } from './delivery.js';
import { startHostSweep, stopHostSweep } from './host-sweep.js';
import {
ONECLI_ACTION,
resolveOneCLIApproval,
startOneCLIApprovalHandler,
stopOneCLIApprovalHandler,
} from './onecli-approvals.js';
import {
getCredentialForModal,
handleCredentialChannelUnsupported,
handleCredentialReject,
handleCredentialSubmit,
setCredentialDeliveryAdapter,
} from './credentials.js';
import { routeInbound } from './router.js';
import {
getPendingQuestion,
@@ -79,12 +92,35 @@ async function main(): Promise<void> {
log.error('Failed to handle question response', { questionId, err });
});
},
getCredentialForModal,
onCredentialReject(credentialId) {
handleCredentialReject(credentialId).catch((err) =>
log.error('Failed to handle credential reject', { credentialId, err }),
);
},
onCredentialSubmit(credentialId, value) {
handleCredentialSubmit(credentialId, value).catch((err) =>
log.error('Failed to handle credential submit', { credentialId, err }),
);
},
onCredentialChannelUnsupported(credentialId) {
handleCredentialChannelUnsupported(credentialId).catch((err) =>
log.error('Failed to handle credential channel-unsupported', { credentialId, err }),
);
},
};
});
// 4. Delivery adapter bridge — dispatches to channel adapters
setDeliveryAdapter({
async deliver(channelType, platformId, threadId, kind, content, files) {
const deliveryAdapter = {
async deliver(
channelType: string,
platformId: string,
threadId: string | null,
kind: string,
content: string,
files?: import('./channels/adapter.js').OutboundFile[],
): Promise<string | undefined> {
const adapter = getChannelAdapter(channelType);
if (!adapter) {
log.warn('No adapter for channel type', { channelType });
@@ -92,11 +128,13 @@ async function main(): Promise<void> {
}
return adapter.deliver(platformId, threadId, { kind, content: JSON.parse(content), files });
},
async setTyping(channelType, platformId, threadId) {
async setTyping(channelType: string, platformId: string, threadId: string | null): Promise<void> {
const adapter = getChannelAdapter(channelType);
await adapter?.setTyping?.(platformId, threadId);
},
});
};
setDeliveryAdapter(deliveryAdapter);
setCredentialDeliveryAdapter(deliveryAdapter);
// 5. Start delivery polls
startActiveDeliveryPoll();
@@ -107,6 +145,9 @@ async function main(): Promise<void> {
startHostSweep();
log.info('Host sweep started');
// 7. Start OneCLI manual-approval handler
startOneCLIApprovalHandler(deliveryAdapter);
log.info('NanoClaw v2 running');
}
@@ -134,9 +175,20 @@ function buildConversationConfigs(channelType: string): ConversationConfig[] {
/** Handle a user's response to an ask_user_question card or an approval card. */
async function handleQuestionResponse(questionId: string, selectedOption: string, userId: string): Promise<void> {
// OneCLI credential approvals — resolved via in-memory Promise, not session DB
if (resolveOneCLIApproval(questionId, selectedOption)) {
return;
}
// Check if this is a pending approval (install_packages, request_rebuild)
const approval = getPendingApproval(questionId);
if (approval) {
if (approval.action === ONECLI_ACTION) {
// Row exists but the in-memory resolver is gone (timer fired or process
// was in a weird state). Nothing to do — just drop the row.
deletePendingApproval(questionId);
return;
}
await handleApprovalResponse(approval, selectedOption, userId);
return;
}
@@ -188,6 +240,10 @@ async function handleApprovalResponse(
selectedOption: string,
userId: string,
): Promise<void> {
if (!approval.session_id) {
deletePendingApproval(approval.approval_id);
return;
}
const session = getSession(approval.session_id);
if (!session) {
deletePendingApproval(approval.approval_id);
@@ -262,6 +318,7 @@ async function handleApprovalResponse(
/** Graceful shutdown. */
async function shutdown(signal: string): Promise<void> {
log.info('Shutdown signal received', { signal });
stopOneCLIApprovalHandler();
stopDeliveryPolls();
stopHostSweep();
await teardownChannelAdapters();
+252
View File
@@ -0,0 +1,252 @@
/**
* OneCLI manual-approval handler.
*
* When the OneCLI gateway intercepts a credentialed request that needs human
* approval, it holds the HTTP connection open and fires our `configureManualApproval`
* callback. We:
* 1. Deliver an ask_question card to the admin channel (same routing as
* `requestApproval()` — global admin agent group's first messaging group).
* 2. Persist a `pending_approvals` row (action='onecli_credential') so we can
* edit the card on expiry and sweep stale rows at startup.
* 3. Wait on an in-memory Promise: resolved by the admin click
* (`resolveOneCLIApproval`) or by a local expiry timer.
* 4. On expiry, edit the card to "Expired" and return 'deny' — the gateway's
* HTTP side will have already closed, but we need to release the Promise
* so the SDK callback returns cleanly.
*
* Startup sweep edits any leftover cards from a previous process to
* "Expired (host restarted)" and drops the rows.
*/
import { OneCLI, type ApprovalRequest, type ManualApprovalHandle } from '@onecli-sh/sdk';
import { ONECLI_URL } from './config.js';
import { getAdminAgentGroup, getAgentGroup } from './db/agent-groups.js';
import { getMessagingGroupsByAgentGroup } from './db/messaging-groups.js';
import {
createPendingApproval,
deletePendingApproval,
getPendingApprovalsByAction,
updatePendingApprovalStatus,
} from './db/sessions.js';
import type { ChannelDeliveryAdapter } from './delivery.js';
import { log } from './log.js';
import type { PendingApproval } from './types.js';
export const ONECLI_ACTION = 'onecli_credential';
type Decision = 'approve' | 'deny';
const onecli = new OneCLI({ url: ONECLI_URL });
interface PendingState {
resolve: (decision: Decision) => void;
timer: NodeJS.Timeout;
}
const pending = new Map<string, PendingState>();
let handle: ManualApprovalHandle | null = null;
let adapterRef: ChannelDeliveryAdapter | null = null;
/**
* Generate a short approval id for card buttons.
*
* OneCLI's native request.id is a UUID (36 bytes). When we put it into a card
* button's action id as `ncq:<uuid>:Approve`, Chat SDK's Telegram adapter then
* serializes both `id` and `value` into the Telegram `callback_data` field,
* which has a hard 64-byte limit. UUIDs push past that limit.
*
* Instead we generate a 10-byte id (`oa-` + 8 base36 chars) for the card, and
* keep the OneCLI request.id in the persisted payload for audit. The pending
* map, DB row, and button callback all use this short id; click handling
* looks up the short id and resolves the Promise that was waiting on it.
*/
function shortApprovalId(): string {
return `oa-${Math.random().toString(36).slice(2, 10)}`;
}
/** Called from the main `handleQuestionResponse` path when a card button is clicked. */
export function resolveOneCLIApproval(approvalId: string, selectedOption: string): boolean {
const state = pending.get(approvalId);
if (!state) return false;
pending.delete(approvalId);
clearTimeout(state.timer);
const decision: Decision = selectedOption === 'Approve' ? 'approve' : 'deny';
updatePendingApprovalStatus(approvalId, decision === 'approve' ? 'approved' : 'rejected');
// Card is auto-edited to "✅ <option>" by chat-sdk-bridge's onAction handler,
// so we don't need to deliver an edit here.
deletePendingApproval(approvalId);
state.resolve(decision);
log.info('OneCLI approval resolved', { approvalId, decision });
return true;
}
export function startOneCLIApprovalHandler(deliveryAdapter: ChannelDeliveryAdapter): void {
if (handle) return;
adapterRef = deliveryAdapter;
// Sweep any rows left over from a previous process.
sweepStaleApprovals().catch((err) => log.error('OneCLI approval sweep failed', { err }));
handle = onecli.configureManualApproval(async (request: ApprovalRequest): Promise<Decision> => {
try {
return await handleRequest(request);
} catch (err) {
log.error('OneCLI approval handler errored', { id: request.id, err });
return 'deny';
}
});
log.info('OneCLI approval handler started');
}
export function stopOneCLIApprovalHandler(): void {
handle?.stop();
handle = null;
for (const state of pending.values()) {
clearTimeout(state.timer);
}
pending.clear();
adapterRef = null;
}
async function handleRequest(request: ApprovalRequest): Promise<Decision> {
if (!adapterRef) return 'deny';
// Same routing as requestApproval(): global admin agent group's first messaging group.
// Per-group routing is a follow-up (see admin-model refactor in docs/v2-checklist.md).
const adminGroup = getAdminAgentGroup();
const adminMGs = adminGroup ? getMessagingGroupsByAgentGroup(adminGroup.id) : [];
if (adminMGs.length === 0) {
log.warn('OneCLI approval auto-denied: no admin channel configured', {
id: request.id,
host: request.host,
agent: request.agent.externalId,
});
return 'deny';
}
const adminChannel = adminMGs[0];
// Resolve the originating agent group (for logging / future per-group routing).
const originGroup = request.agent.externalId ? getAgentGroup(request.agent.externalId) : adminGroup;
const agentGroupId = originGroup?.id ?? null;
// Use a short id for the card/button so Chat SDK's Telegram adapter can
// fit everything inside the 64-byte callback_data limit. The OneCLI
// request.id stays in the payload for audit.
const approvalId = shortApprovalId();
const question = buildQuestion(request, originGroup?.name ?? request.agent.name);
let platformMessageId: string | undefined;
try {
platformMessageId = await adapterRef.deliver(
adminChannel.channel_type,
adminChannel.platform_id,
null,
'chat-sdk',
JSON.stringify({
type: 'ask_question',
questionId: approvalId,
question,
options: ['Approve', 'Reject'],
}),
);
} catch (err) {
log.error('Failed to deliver OneCLI approval card', { approvalId, oneCliRequestId: request.id, err });
return 'deny';
}
createPendingApproval({
approval_id: approvalId,
session_id: null,
request_id: request.id,
action: ONECLI_ACTION,
payload: JSON.stringify({
oneCliRequestId: request.id,
method: request.method,
host: request.host,
path: request.path,
bodyPreview: request.bodyPreview,
agent: request.agent,
}),
created_at: new Date().toISOString(),
agent_group_id: agentGroupId,
channel_type: adminChannel.channel_type,
platform_id: adminChannel.platform_id,
platform_message_id: platformMessageId ?? null,
expires_at: request.expiresAt,
status: 'pending',
});
// Expiry timer fires just before the gateway's own TTL so our decision lands
// in time to be recorded, even though the HTTP side will already be closing.
const expiresAtMs = new Date(request.expiresAt).getTime();
const timeoutMs = Math.max(1000, expiresAtMs - Date.now() - 1000);
return new Promise<Decision>((resolve) => {
const timer = setTimeout(() => {
if (!pending.has(approvalId)) return;
pending.delete(approvalId);
expireApproval(approvalId, 'no response').catch((err) =>
log.error('Failed to mark OneCLI approval expired', { approvalId, err }),
);
resolve('deny');
}, timeoutMs);
pending.set(approvalId, { resolve, timer });
});
}
async function expireApproval(approvalId: string, reason: string): Promise<void> {
const rows = getPendingApprovalsByAction(ONECLI_ACTION).filter((r) => r.approval_id === approvalId);
const row = rows[0];
if (!row) return;
updatePendingApprovalStatus(approvalId, 'expired');
await editCardExpired(row, reason);
deletePendingApproval(approvalId);
log.info('OneCLI approval expired', { approvalId, reason });
}
async function editCardExpired(row: PendingApproval, reason: string): Promise<void> {
if (!adapterRef || !row.platform_message_id || !row.channel_type || !row.platform_id) return;
try {
await adapterRef.deliver(
row.channel_type,
row.platform_id,
null,
'chat-sdk',
JSON.stringify({
operation: 'edit',
messageId: row.platform_message_id,
text: `Expired (${reason})`,
}),
);
} catch (err) {
log.warn('Failed to edit expired OneCLI approval card', { approvalId: row.approval_id, err });
}
}
async function sweepStaleApprovals(): Promise<void> {
const rows = getPendingApprovalsByAction(ONECLI_ACTION);
if (rows.length === 0) return;
log.info('Sweeping stale OneCLI approvals from previous process', { count: rows.length });
for (const row of rows) {
await editCardExpired(row, 'host restarted');
deletePendingApproval(row.approval_id);
}
}
function buildQuestion(request: ApprovalRequest, agentName: string): string {
const lines = [
'Credential access request',
`Agent: ${agentName}`,
'```',
`${request.method} ${request.host}${request.path}`,
'```',
];
if (request.bodyPreview) {
lines.push('Body:', '```', request.bodyPreview, '```');
}
return lines.join('\n');
}
+84
View File
@@ -0,0 +1,84 @@
/**
* OneCLI secrets facade.
*
* @onecli-sh/sdk 0.3.1 does not yet expose secret management. This module wraps
* the `onecli secrets create` CLI so the rest of the codebase can call
* `createSecret(...)` with the same shape we expect the SDK to ship with.
*
* When the SDK adds secret management, replace the body of `createSecret()`
* with the SDK call and delete the CLI plumbing below. Nothing else in
* NanoClaw should need to change — the public types here mirror the
* anticipated SDK surface.
*/
import { execFile } from 'child_process';
export interface CreateSecretInput {
name: string;
type: 'generic' | 'anthropic';
value: string;
hostPattern: string;
pathPattern?: string;
headerName?: string;
valueFormat?: string;
/**
* Agent scoping. Not supported by current OneCLI CLI — included here so
* callers can pass it today and it becomes live when the SDK adds scoping.
*/
agentId?: string;
}
export interface CreateSecretResponse {
id: string;
name: string;
hostPattern: string;
}
export class OneCLISecretError extends Error {
constructor(message: string) {
super(message);
this.name = 'OneCLISecretError';
}
}
export async function createSecret(input: CreateSecretInput): Promise<CreateSecretResponse> {
const payload: Record<string, unknown> = {
name: input.name,
type: input.type,
value: input.value,
hostPattern: input.hostPattern,
};
if (input.pathPattern) payload.pathPattern = input.pathPattern;
if (input.headerName || input.valueFormat) {
payload.injectionConfig = {
...(input.headerName && { headerName: input.headerName }),
...(input.valueFormat && { valueFormat: input.valueFormat }),
};
}
const stdout = await runOnecli(['secrets', 'create', '--json', JSON.stringify(payload)]);
let parsed: unknown;
try {
parsed = JSON.parse(stdout);
} catch {
throw new OneCLISecretError(`onecli returned non-JSON: ${stdout.slice(0, 200)}`);
}
const result = parsed as { id?: string; name?: string; hostPattern?: string; error?: string };
if (result.error) throw new OneCLISecretError(result.error);
return {
id: result.id ?? '',
name: result.name ?? input.name,
hostPattern: result.hostPattern ?? input.hostPattern,
};
}
function runOnecli(args: string[]): Promise<string> {
return new Promise((resolve, reject) => {
execFile('onecli', args, { timeout: 15_000 }, (error, stdout, stderr) => {
if (error) {
reject(new OneCLISecretError(stderr || error.message));
return;
}
resolve(stdout);
});
});
}
+22 -2
View File
@@ -4,6 +4,7 @@
* Channel adapter event → resolve messaging group → resolve agent group
* → resolve/create session → write messages_in → wake container
*/
import { getChannelAdapter } from './channels/channel-registry.js';
import { getMessagingGroupByPlatform, createMessagingGroup, getMessagingGroupAgents } from './db/messaging-groups.js';
import { triggerTyping } from './delivery.js';
import { log } from './log.js';
@@ -33,6 +34,15 @@ export interface InboundEvent {
* Creates messaging group + session if they don't exist yet.
*/
export async function routeInbound(event: InboundEvent): Promise<void> {
// 0. Apply the adapter's thread policy. Non-threaded adapters (Telegram,
// WhatsApp, iMessage, email) collapse threads to the channel — the
// agent always replies to the main channel regardless of where the
// inbound came from.
const adapter = getChannelAdapter(event.channelType);
if (adapter && !adapter.supportsThreads) {
event = { ...event, threadId: null };
}
// 1. Resolve messaging group
let mg = getMessagingGroupByPlatform(event.channelType, event.platformId);
@@ -79,8 +89,18 @@ export async function routeInbound(event: InboundEvent): Promise<void> {
return;
}
// 3. Resolve or create session
const { session, created } = resolveSession(match.agent_group_id, mg.id, event.threadId, match.session_mode);
// 3. Resolve or create session.
//
// Adapter thread policy overrides the wiring's session_mode: if the adapter
// is threaded, each thread gets its own session regardless of what the
// wiring says, because "thread = session" is the first-class model for
// threaded platforms. Agent-shared is preserved because it expresses a
// cross-channel intent the adapter can't know about.
let effectiveSessionMode = match.session_mode;
if (adapter && adapter.supportsThreads && effectiveSessionMode !== 'agent-shared') {
effectiveSessionMode = 'per-thread';
}
const { session, created } = resolveSession(match.agent_group_id, mg.id, event.threadId, effectiveSessionMode);
// 4. Write message to session DB
writeSessionMessage(session.agent_group_id, session.id, {
+59
View File
@@ -141,6 +141,65 @@ export function initSessionFolder(agentGroupId: string, sessionId: string): void
*
* Uses DELETE + INSERT in a transaction for a clean overwrite.
*/
/**
* Write the default reply routing for a session into its inbound.db.
*
* The container reads this as the default (channel_type, platform_id, thread_id)
* for outbound messages when the agent doesn't specify an explicit destination.
* Derived from session.messaging_group_id → messaging_groups row + session.thread_id.
*
* Called on every container wake alongside writeDestinations() so the latest
* routing is always in place, including after admin rewiring.
*/
export function writeSessionRouting(agentGroupId: string, sessionId: string): void {
const dbPath = inboundDbPath(agentGroupId, sessionId);
if (!fs.existsSync(dbPath)) return;
const session = getSession(sessionId);
if (!session) return;
let channelType: string | null = null;
let platformId: string | null = null;
if (session.messaging_group_id) {
const mg = getMessagingGroup(session.messaging_group_id);
if (mg) {
channelType = mg.channel_type;
platformId = mg.platform_id;
}
}
const db = new Database(dbPath);
db.pragma('journal_mode = DELETE');
db.pragma('busy_timeout = 5000');
try {
// Lightweight forward-compat: create the table for older session DBs
// that predate this column.
db.exec(`
CREATE TABLE IF NOT EXISTS session_routing (
id INTEGER PRIMARY KEY CHECK (id = 1),
channel_type TEXT,
platform_id TEXT,
thread_id TEXT
);
`);
db.prepare(
`INSERT INTO session_routing (id, channel_type, platform_id, thread_id)
VALUES (1, @channel_type, @platform_id, @thread_id)
ON CONFLICT(id) DO UPDATE SET
channel_type = excluded.channel_type,
platform_id = excluded.platform_id,
thread_id = excluded.thread_id`,
).run({
channel_type: channelType,
platform_id: platformId,
thread_id: session.thread_id,
});
} finally {
db.close();
}
log.debug('Session routing written', { sessionId, channelType, platformId, threadId: session.thread_id });
}
export function writeDestinations(agentGroupId: string, sessionId: string): void {
const dbPath = inboundDbPath(agentGroupId, sessionId);
if (!fs.existsSync(dbPath)) return;
+29 -1
View File
@@ -93,11 +93,39 @@ export interface PendingQuestion {
export interface PendingApproval {
approval_id: string;
session_id: string;
session_id: string | null;
request_id: string;
action: string;
payload: string; // JSON
created_at: string;
agent_group_id: string | null;
channel_type: string | null;
platform_id: string | null;
platform_message_id: string | null;
expires_at: string | null;
status: 'pending' | 'approved' | 'rejected' | 'expired';
}
// ── Pending credentials (central DB) ──
export type PendingCredentialStatus = 'pending' | 'submitted' | 'saved' | 'rejected' | 'failed';
export interface PendingCredential {
id: string;
agent_group_id: string;
session_id: string | null;
name: string;
type: 'generic' | 'anthropic';
host_pattern: string;
path_pattern: string | null;
header_name: string | null;
value_format: string | null;
description: string | null;
channel_type: string;
platform_id: string;
platform_message_id: string | null;
status: PendingCredentialStatus;
created_at: string;
}
// ── Agent destinations (central DB) ──