mirror of
https://github.com/qwibitai/nanoclaw.git
synced 2026-06-04 10:14:47 +08:00
16b9499532
Replaces the opaque trigger_rules JSON + response_scope enum on
messaging_group_agents with four explicit orthogonal columns:
- engage_mode: 'pattern' | 'mention' | 'mention-sticky'
- engage_pattern: regex source; required when engage_mode='pattern';
  '.' is the "always" sentinel
- sender_scope: 'all' | 'known'
- ignored_message_policy: 'drop' | 'accumulate'
Inbound routing becomes a fan-out — every wired agent is evaluated
independently. A match gets its own session + container wake. A miss
with accumulate keeps the message as context-only (trigger=0) in that
agent's session, so when the agent does eventually engage it sees the
prior chatter.
## Schema
- Migration 010 (`engage-modes`): adds the 4 new columns, backfills
from trigger_rules.pattern + requiresTrigger + response_scope, drops
the legacy columns.
- messages_in gains `trigger INTEGER NOT NULL DEFAULT 1` (session DB
schema + `migrateMessagesInTable` forward-compat).
- countDueMessages gates waking on `trigger = 1`.
## Routing
- `pickAgent` (returns one) → loop over all wired agents. Per agent:
evaluate engage_mode; run access gate + sender-scope gate; on full
match → resolveSession + writeSessionMessage(trigger=1) + wake. On
miss with accumulate → writeSessionMessage(trigger=0), no wake. On
miss with drop → skip.
- New `findSessionForAgent(agentGroupId, mgId, threadId)` scopes
session lookup by agent so fan-out doesn't cross sessions.
- `messageIdForAgent` namespaces inbound message ids by agent_group_id
so PRIMARY KEY doesn't collide across per-agent session DBs.
## Adapter layer
- `ConversationConfig` replaces `triggerPattern` + `requiresTrigger`
with `engageMode` + `engagePattern`.
- Chat SDK bridge stores `Map<platformId, ConversationConfig[]>` (multi-
agent per conversation) and applies union gating pre-onInbound:
* onSubscribedMessage: engage if any wiring keeps firing in
subscribed state (mention-sticky or pattern)
* onNewMention: engage on mention; only subscribes the thread if
at least one wiring is `mention-sticky`
* onDirectMessage: engage per mode; sticky follows same rule
- Bridge no longer unconditionally calls `thread.subscribe()`.
## Sender scope
- Permissions module registers a second hook `setSenderScopeGate` that
runs per-wiring after the existing access gate. `sender_scope='known'`
requires canAccessAgentGroup(); `'all'` is a no-op. Not installed →
no-op everywhere (default allow).
## Container side
- Host passes `NANOCLAW_MAX_MESSAGES_PER_PROMPT` (reuses existing
MAX_MESSAGES_PER_PROMPT config; was dead code from v1).
- `getPendingMessages` queries `ORDER BY seq DESC LIMIT N`, reverses to
chronological order for the prompt — accumulated context rides along
with trigger rows up to the cap.
- `MessageInRow` gains `trigger: number` so the container can tell them
apart in downstream code (container still processes both; only the
host uses `trigger=0` for don't-wake).
## Defaults (per ACTION-ITEMS item 1 decision)
- DM (is_group=0): `engage_mode='pattern'`, `engage_pattern='.'` (always)
- Threaded group: `engage_mode='mention-sticky'` (seed-discord)
- Non-threaded group / CLI: pattern '.' in bootstrap scripts
## Tests
- src/host-core.test.ts: 3 new cases — fan-out (2 agents, 2 sessions,
2 wakes), accumulate (trigger=0 + no wake), drop (no session created).
- Existing 10 host-core tests still pass.
- Migration 010 runs on an empty DB in 0-row path — verified.
Closes: ACTION-ITEMS items 1, 4.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
266 lines
8.6 KiB
TypeScript
266 lines
8.6 KiB
TypeScript
/**
 * End-to-end test of the v2 channel adapter pipeline:
 *
 *   Mock adapter → onInbound → router → session DB → Docker container →
 *   agent-runner → Claude → messages_out → delivery → mock adapter.deliver()
 *
 * Usage: pnpm exec tsx scripts/test-v2-channel-e2e.ts
 */
|
|
import Database from 'better-sqlite3';
|
|
import fs from 'fs';
|
|
import path from 'path';
|
|
|
|
// Scratch directory for this run. It is wiped and recreated on every start so
// the run never sees files left over from a previous one. `force: true` makes
// the removal a no-op when the directory does not exist yet, so no separate
// existence check is needed.
const TEST_DIR = '/tmp/nanoclaw-v2-channel-e2e';
fs.rmSync(TEST_DIR, { recursive: true, force: true });
fs.mkdirSync(TEST_DIR, { recursive: true });

// --- Step 1: Init central DB ---
console.log('\n=== Step 1: Init central DB ===');
|
|
|
|
import { initDb } from '../src/db/connection.js';
|
|
import { runMigrations } from '../src/db/migrations/index.js';
|
|
import { createAgentGroup } from '../src/db/agent-groups.js';
|
|
import { createMessagingGroup, createMessagingGroupAgent } from '../src/db/messaging-groups.js';
|
|
|
|
// Open the central DB file inside the scratch directory and run migrations on it.
const centralDbFile = path.join(TEST_DIR, 'v2.db');
const centralDb = initDb(centralDbFile);
runMigrations(centralDb);

// Create the agent's folder under <cwd>/groups and give it a CLAUDE.md.
// The folder name matches the `folder` of the agent group created below.
const groupsDir = path.resolve(process.cwd(), 'groups');
const testGroupDir = path.join(groupsDir, 'test-channel-e2e');
const agentInstructions = '# Test Agent\nYou are a test agent. Be brief.\n';
fs.mkdirSync(testGroupDir, { recursive: true });
fs.writeFileSync(path.join(testGroupDir, 'CLAUDE.md'), agentInstructions);
|
|
|
|
// Seed the central DB: one agent group, one mock messaging group, and the
// wiring row that connects the two.
const isoNow = () => new Date().toISOString();

createAgentGroup({
  id: 'ag-chan',
  name: 'Channel E2E Agent',
  folder: 'test-channel-e2e',
  agent_provider: 'claude',
  created_at: isoNow(),
});

createMessagingGroup({
  id: 'mg-chan',
  channel_type: 'mock',
  platform_id: 'mock-channel-1',
  name: 'Mock Channel',
  is_group: 0,
  unknown_sender_policy: 'public',
  created_at: isoNow(),
});

createMessagingGroupAgent({
  id: 'mga-chan',
  messaging_group_id: 'mg-chan',
  agent_group_id: 'ag-chan',
  engage_mode: 'pattern',
  // '.' is the "always" sentinel pattern.
  engage_pattern: '.',
  sender_scope: 'all',
  ignored_message_policy: 'drop',
  session_mode: 'shared',
  priority: 0,
  created_at: isoNow(),
});

console.log('✓ Central DB initialized');

// --- Step 2: Set up mock channel adapter + delivery ---
console.log('\n=== Step 2: Set up mock channel adapter & delivery ===');
|
|
|
|
import { routeInbound } from '../src/router.js';
|
|
import { setDeliveryAdapter, startActiveDeliveryPoll, stopDeliveryPolls } from '../src/delivery.js';
|
|
import { getChannelAdapter, registerChannelAdapter, initChannelAdapters } from '../src/channels/channel-registry.js';
|
|
import { findSession } from '../src/db/sessions.js';
|
|
import { sessionDbPath } from '../src/session-manager.js';
|
|
import type { ChannelAdapter, ChannelSetup, OutboundMessage } from '../src/channels/adapter.js';
|
|
|
|
// Track delivered messages
|
|
// Everything the mock adapter has been asked to deliver, in arrival order.
type DeliveredMessage = { platformId: string; threadId: string | null; message: OutboundMessage };
const deliveredMessages: Array<DeliveredMessage> = [];

// Wall-clock time of the most recent delivery (0 = nothing delivered yet) and
// of script start; both are read by the wait loop further down.
let lastDeliveryTime = 0;
const startTime = Date.now();

/**
 * In-memory channel adapter: records each delivery and logs a short preview
 * instead of talking to a real platform.
 */
const mockAdapter: ChannelAdapter = {
  name: 'mock',
  channelType: 'mock',

  setup: async (config: ChannelSetup) => {
    const count = config.conversations.length;
    console.log(` ✓ Mock adapter setup with ${count} conversations`);
  },

  deliver: async (platformId, threadId, message) => {
    deliveredMessages.push({ platformId, threadId, message });
    lastDeliveryTime = Date.now();

    const secondsSinceStart = Math.floor((Date.now() - startTime) / 1000);
    const payload = message.content as Record<string, unknown>;
    // Preview is capped at 120 characters; a missing/empty text prints as ''.
    const preview = ((payload.text as string) || '').slice(0, 120);
    console.log(` ✓ [${secondsSinceStart}s] Delivered #${deliveredMessages.length}: ${preview}...`);
  },

  setTyping: async () => {},
  teardown: async () => {},
  isConnected: () => true,
};
|
|
|
|
// Register mock adapter
|
|
// Make the mock adapter available under the 'mock' channel type.
registerChannelAdapter('mock', { factory: () => mockAdapter });

// Initialise adapters. The setup object is built inline here: a single
// conversation mirroring the wiring row seeded above, plus an onInbound
// callback that forwards platform messages to the router.
await initChannelAdapters((adapter) => {
  return {
    conversations: [
      {
        platformId: 'mock-channel-1',
        agentGroupId: 'ag-chan',
        engageMode: 'pattern',
        engagePattern: '.',
        sessionMode: 'shared',
      },
    ],

    onInbound(platformId, threadId, message) {
      const { id, kind, content, timestamp } = message;
      const inbound = {
        channelType: adapter.channelType,
        platformId,
        threadId,
        // The router receives the content as a JSON string.
        message: { id, kind, content: JSON.stringify(content), timestamp },
      };
      // Fire-and-forget: routing failures are logged, not propagated.
      routeInbound(inbound).catch((err) => console.error('Route error:', err));
    },

    onMetadata() {},
  };
});
|
|
|
|
// Set up delivery adapter bridge
|
|
// Bridge the delivery module to channel adapters: look up the adapter for the
// channel type and hand it the parsed content. Unknown channel types are
// skipped without error.
setDeliveryAdapter({
  async deliver(channelType, platformId, threadId, kind, content) {
    const target = getChannelAdapter(channelType);
    if (target) {
      const outbound: OutboundMessage = { kind, content: JSON.parse(content) };
      await target.deliver(platformId, threadId, outbound);
    }
  },
});

// Start delivery polling
startActiveDeliveryPoll();
console.log('✓ Mock adapter & delivery configured');
|
|
|
|
// --- Step 3: Simulate inbound message through adapter ---
console.log('\n=== Step 3: Simulate inbound message ===');

// Call routeInbound directly with the same payload shape the onInbound
// callback produces (content serialised as a JSON string), standing in for a
// real platform message. The prompt asks the agent for three separate
// send_message calls so that several deliveries flow through the pipeline.
await routeInbound({
  channelType: 'mock',
  platformId: 'mock-channel-1',
  threadId: null,
  message: {
    id: 'msg-chan-1',
    kind: 'chat',
    content: JSON.stringify({
      sender: 'Gavriel',
      text: 'Call the send_message tool 3 times: text="Update 1", text="Update 2", text="Update 3". Make each call separately. After all 3, say "Done".',
    }),
    timestamp: new Date().toISOString(),
  },
});
|
|
|
|
// Routing should have created a session for the messaging group; without one
// there is nothing to wait for, so clean up and fail.
const session = findSession('mg-chan', null);
if (!session) {
  console.log('✗ No session created!');
  cleanup();
  process.exit(1);
}

const { id: sessionId, container_status: containerStatus } = session;
console.log(`✓ Session: ${sessionId}`);
console.log(`✓ Container status: ${containerStatus}`);
|
|
|
|
import { execSync } from 'child_process';
|
|
/**
 * Print the logs of every container whose name matches the
 * `nanoclaw-v2-test-channel` filter.
 *
 * Best-effort diagnostics: failures are reported on stdout and never thrown,
 * and a failure to read one container's logs does not stop the others.
 */
const checkContainerLogs = () => {
  let names;
  try {
    names = execSync('docker ps -a --filter name=nanoclaw-v2-test-channel --format "{{.Names}}"')
      .toString()
      .split('\n')
      .map((line) => line.trim())
      .filter(Boolean);
  } catch (err) {
    console.log(` (could not list containers: ${err})`);
    return;
  }

  for (const name of names) {
    console.log(`\nContainer logs (${name}):`);
    try {
      // 2>&1 folds the container's stderr into the captured output.
      console.log(execSync(`docker logs ${name} 2>&1`).toString());
    } catch (err) {
      console.log(` (could not read logs for ${name}: ${err})`);
    }
  }
};
|
|
|
|
const sessDbPath = sessionDbPath('ag-chan', session.id);
console.log(`✓ Session DB: ${sessDbPath}`);

// --- Step 4: Wait for delivery through mock adapter ---
console.log('\n=== Step 4: Waiting for delivery through mock adapter... ===');
const TIMEOUT_MS = 300_000;

// Poll once per second. Finish when at least one delivery has happened and no
// new one has arrived for 30s. Give up, dump diagnostics and exit 1 once
// TIMEOUT_MS has passed since script start.
while (!(lastDeliveryTime > 0 && Date.now() - lastDeliveryTime > 30_000)) {
  if (Date.now() - startTime > TIMEOUT_MS) {
    console.log(`\n✗ Timed out after ${TIMEOUT_MS / 1000}s`);

    // Check session DB directly
    try {
      const sessionDb = new Database(sessDbPath, { readonly: true });
      const pending = sessionDb.prepare('SELECT * FROM messages_out').all();
      console.log(` messages_out rows: ${pending.length}`);
      if (pending.length > 0) {
        console.log(' (messages exist but delivery failed)');
      }
      sessionDb.close();
    } catch {
      /* ignore */
    }

    checkContainerLogs();
    cleanup();
    process.exit(1);
  }

  // Progress tick whenever the elapsed whole seconds is a multiple of 10.
  const waitedSeconds = Math.floor((Date.now() - startTime) / 1000);
  if (waitedSeconds > 0 && waitedSeconds % 10 === 0) {
    process.stdout.write(` ${waitedSeconds}s...`);
  }

  await new Promise<void>((wake) => {
    setTimeout(wake, 1000);
  });
}
|
|
|
|
// --- Step 5: Print results ---
|
|
// Print what ended up in the session DB and what reached the mock adapter,
// then clean up and exit successfully.
type Row = Record<string, unknown>;

console.log('\n\n=== Results ===');

console.log('\nSession DB:');
try {
  const sessionDb = new Database(sessDbPath, { readonly: true });
  const inbound = sessionDb.prepare('SELECT * FROM messages_in').all() as Array<Row>;
  const outbound = sessionDb.prepare('SELECT * FROM messages_out').all() as Array<Row>;
  sessionDb.close();

  console.log(` messages_in: ${inbound.length} row(s)`);
  inbound.forEach((row) => {
    console.log(` [${row.id}] status=${row.status} kind=${row.kind}`);
  });

  console.log(` messages_out: ${outbound.length} row(s)`);
  outbound.forEach((row) => {
    // Parse first: a row with unparseable content aborts the listing via the catch below.
    const parsed = JSON.parse(row.content as string);
    console.log(` [${row.id}] kind=${row.kind} delivered=${row.delivered}`);
    console.log(` → ${parsed.text}`);
  });
} catch (err) {
  console.log(` (could not read session DB: ${err})`);
}

console.log('\nDelivered through mock adapter:');
deliveredMessages.forEach(({ platformId, message }) => {
  const payload = message.content as Record<string, unknown>;
  console.log(` → [${platformId}] ${payload.text}`);
});

console.log('\n✓ Full channel adapter pipeline verified!');

cleanup();
process.exit(0);
|
|
|
|
/**
 * Stop the delivery polls and remove the test agent's folder under groups/.
 *
 * Kept as a function declaration: it is called from earlier points in this
 * script and relies on hoisting.
 */
function cleanup() {
  stopDeliveryPolls();
  const removeOptions = { recursive: true, force: true };
  fs.rmSync(testGroupDir, removeOptions);
}
|