fix(channels): restore channel adapters deleted during v2 sync

Phase 1 boundary sync (5454bae) inadvertently re-applied v2's channel-
adapter deletions (v2 commit 437ba63 "move channel adapters off v2
trunk") to the channels branch. 17 adapter files and their package.json
deps were wiped:

- discord, gchat, github, imessage, linear, matrix, resend, slack,
  teams, telegram + telegram-markdown-sanitize + telegram-pairing,
  webex, whatsapp, whatsapp-cloud
- @chat-adapter/* packages, @whiskeysockets/baileys, @resend/...,
  qrcode, pino, chat-adapter-imessage, @beeper/...

Caught when testing PR #3 — the service had no channels to bind to.

Root cause: the sync merge commit message ("No channel adapter changes
required") was wrong. I checked the registry surface but not file
presence. Providers had the same failure mode during its sync, but
there it surfaced immediately via a test import; channels has no test
that imports adapter files directly, so it slipped through.

Fix: restore src/channels/*.ts and the matching package.json /
pnpm-lock.yaml entries from 0d75ca2 (last pre-sync commit). Tests pass
(198/198 vs 137/137 pre-restore — the restored telegram-pairing and
markdown-sanitize tests are back).

Going forward: channel/provider branches that carry files v2 has
deleted need `git checkout origin/<branch> -- <paths>` applied after
any v2 sync merge that touches those paths, or a merge strategy that
ignores deletions under the branch-owned directory.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-18 15:38:59 +03:00
parent 5454bae426
commit 303a5c7100
19 changed files with 5845 additions and 11 deletions
+18 -1
View File
@@ -22,10 +22,27 @@
"test:watch": "vitest"
},
"dependencies": {
"@beeper/chat-adapter-matrix": "^0.2.0",
"@bitbasti/chat-adapter-webex": "^0.1.0",
"@chat-adapter/discord": "^4.24.0",
"@chat-adapter/gchat": "^4.24.0",
"@chat-adapter/github": "^4.24.0",
"@chat-adapter/linear": "^4.26.0",
"@chat-adapter/slack": "^4.24.0",
"@chat-adapter/state-memory": "^4.24.0",
"@chat-adapter/teams": "^4.24.0",
"@chat-adapter/telegram": "^4.24.0",
"@chat-adapter/whatsapp": "^4.24.0",
"@onecli-sh/sdk": "^0.3.1",
"@resend/chat-sdk-adapter": "^0.1.1",
"@types/qrcode": "^1.5.6",
"@whiskeysockets/baileys": "^6.17.16",
"better-sqlite3": "11.10.0",
"chat": "^4.24.0",
"cron-parser": "5.5.0"
"chat-adapter-imessage": "^0.1.1",
"cron-parser": "5.5.0",
"pino": "^9.6.0",
"qrcode": "^1.5.4"
},
"devDependencies": {
"@eslint/js": "^9.35.0",
+3881 -10
View File
File diff suppressed because it is too large Load Diff
+38
View File
@@ -0,0 +1,38 @@
/**
* Discord channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createDiscordAdapter } from '@chat-adapter/discord';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
if (!raw.referenced_message) return null;
const reply = raw.referenced_message;
return {
text: reply.content || '',
sender: reply.author?.global_name || reply.author?.username || 'Unknown',
};
}
registerChannelAdapter('discord', {
factory: () => {
const env = readEnvFile(['DISCORD_BOT_TOKEN', 'DISCORD_PUBLIC_KEY', 'DISCORD_APPLICATION_ID']);
if (!env.DISCORD_BOT_TOKEN) return null;
const discordAdapter = createDiscordAdapter({
botToken: env.DISCORD_BOT_TOKEN,
publicKey: env.DISCORD_PUBLIC_KEY,
applicationId: env.DISCORD_APPLICATION_ID,
});
return createChatSdkBridge({
adapter: discordAdapter,
concurrency: 'concurrent',
botToken: env.DISCORD_BOT_TOKEN,
extractReplyContext,
supportsThreads: true,
});
},
});
+20
View File
@@ -0,0 +1,20 @@
/**
* Google Chat channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createGoogleChatAdapter } from '@chat-adapter/gchat';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('gchat', {
factory: () => {
const env = readEnvFile(['GCHAT_CREDENTIALS']);
if (!env.GCHAT_CREDENTIALS) return null;
const gchatAdapter = createGoogleChatAdapter({
credentials: JSON.parse(env.GCHAT_CREDENTIALS),
});
return createChatSdkBridge({ adapter: gchatAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
+22
View File
@@ -0,0 +1,22 @@
/**
* GitHub channel adapter (v2) — uses Chat SDK bridge.
* PR comment threads as conversations.
* Self-registers on import.
*/
import { createGitHubAdapter } from '@chat-adapter/github';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('github', {
factory: () => {
const env = readEnvFile(['GITHUB_TOKEN', 'GITHUB_WEBHOOK_SECRET']);
if (!env.GITHUB_TOKEN) return null;
const githubAdapter = createGitHubAdapter({
token: env.GITHUB_TOKEN,
webhookSecret: env.GITHUB_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: githubAdapter, concurrency: 'queue', supportsThreads: true });
},
});
+29
View File
@@ -0,0 +1,29 @@
/**
* iMessage channel adapter (v2) — uses Chat SDK bridge.
* Supports local mode (macOS Full Disk Access) and remote mode (Photon API).
* Self-registers on import.
*/
import { createiMessageAdapter } from 'chat-adapter-imessage';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('imessage', {
factory: () => {
const env = readEnvFile(['IMESSAGE_ENABLED', 'IMESSAGE_LOCAL', 'IMESSAGE_SERVER_URL', 'IMESSAGE_API_KEY']);
const isLocal = env.IMESSAGE_LOCAL !== 'false';
if (isLocal && !env.IMESSAGE_ENABLED) return null;
if (!isLocal && !env.IMESSAGE_SERVER_URL) return null;
const rawAdapter = createiMessageAdapter({
local: isLocal,
serverUrl: env.IMESSAGE_SERVER_URL,
apiKey: env.IMESSAGE_API_KEY,
});
// Polyfill channelIdFromThreadId (community adapter doesn't implement it)
const imessageAdapter = Object.assign(rawAdapter, {
channelIdFromThreadId: (threadId: string) => threadId,
});
return createChatSdkBridge({ adapter: imessageAdapter, concurrency: 'concurrent', supportsThreads: false });
},
});
+22
View File
@@ -0,0 +1,22 @@
/**
* Linear channel adapter (v2) — uses Chat SDK bridge.
* Issue comment threads as conversations.
* Self-registers on import.
*/
import { createLinearAdapter } from '@chat-adapter/linear';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('linear', {
factory: () => {
const env = readEnvFile(['LINEAR_API_KEY', 'LINEAR_WEBHOOK_SECRET']);
if (!env.LINEAR_API_KEY) return null;
const linearAdapter = createLinearAdapter({
apiKey: env.LINEAR_API_KEY,
webhookSecret: env.LINEAR_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: linearAdapter, concurrency: 'queue', supportsThreads: true });
},
});
+23
View File
@@ -0,0 +1,23 @@
/**
* Matrix channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createMatrixAdapter } from '@beeper/chat-adapter-matrix';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('matrix', {
factory: () => {
const env = readEnvFile(['MATRIX_BASE_URL', 'MATRIX_ACCESS_TOKEN', 'MATRIX_USER_ID', 'MATRIX_BOT_USERNAME']);
if (!env.MATRIX_BASE_URL) return null;
// Matrix adapter reads from process.env directly
process.env.MATRIX_BASE_URL = env.MATRIX_BASE_URL;
if (env.MATRIX_ACCESS_TOKEN) process.env.MATRIX_ACCESS_TOKEN = env.MATRIX_ACCESS_TOKEN;
if (env.MATRIX_USER_ID) process.env.MATRIX_USER_ID = env.MATRIX_USER_ID;
if (env.MATRIX_BOT_USERNAME) process.env.MATRIX_BOT_USERNAME = env.MATRIX_BOT_USERNAME;
const matrixAdapter = createMatrixAdapter();
return createChatSdkBridge({ adapter: matrixAdapter, concurrency: 'concurrent', supportsThreads: false });
},
});
+23
View File
@@ -0,0 +1,23 @@
/**
* Resend (email) channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createResendAdapter } from '@resend/chat-sdk-adapter';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('resend', {
factory: () => {
const env = readEnvFile(['RESEND_API_KEY', 'RESEND_FROM_ADDRESS', 'RESEND_FROM_NAME', 'RESEND_WEBHOOK_SECRET']);
if (!env.RESEND_API_KEY) return null;
const resendAdapter = createResendAdapter({
apiKey: env.RESEND_API_KEY,
fromAddress: env.RESEND_FROM_ADDRESS,
fromName: env.RESEND_FROM_NAME,
webhookSecret: env.RESEND_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: resendAdapter, concurrency: 'queue', supportsThreads: false });
},
});
+21
View File
@@ -0,0 +1,21 @@
/**
* Slack channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createSlackAdapter } from '@chat-adapter/slack';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('slack', {
factory: () => {
const env = readEnvFile(['SLACK_BOT_TOKEN', 'SLACK_SIGNING_SECRET']);
if (!env.SLACK_BOT_TOKEN) return null;
const slackAdapter = createSlackAdapter({
botToken: env.SLACK_BOT_TOKEN,
signingSecret: env.SLACK_SIGNING_SECRET,
});
return createChatSdkBridge({ adapter: slackAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
+23
View File
@@ -0,0 +1,23 @@
/**
* Microsoft Teams channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createTeamsAdapter } from '@chat-adapter/teams';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('teams', {
factory: () => {
const env = readEnvFile(['TEAMS_APP_ID', 'TEAMS_APP_PASSWORD', 'TEAMS_APP_TENANT_ID', 'TEAMS_APP_TYPE']);
if (!env.TEAMS_APP_ID) return null;
const teamsAdapter = createTeamsAdapter({
appId: env.TEAMS_APP_ID,
appPassword: env.TEAMS_APP_PASSWORD,
appType: (env.TEAMS_APP_TYPE as 'SingleTenant' | 'MultiTenant') || undefined,
appTenantId: env.TEAMS_APP_TENANT_ID || undefined,
});
return createChatSdkBridge({ adapter: teamsAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
@@ -0,0 +1,68 @@
import { describe, it, expect } from 'vitest';
import { sanitizeTelegramLegacyMarkdown } from './telegram-markdown-sanitize.js';
describe('sanitizeTelegramLegacyMarkdown', () => {
it('downgrades CommonMark **bold** to legacy *bold*', () => {
expect(sanitizeTelegramLegacyMarkdown('**Host path**')).toBe('*Host path*');
});
it('downgrades CommonMark __bold__ to legacy _italic_', () => {
expect(sanitizeTelegramLegacyMarkdown('__label__')).toBe('_label_');
});
it('leaves balanced legacy *bold* and _italic_ alone', () => {
expect(sanitizeTelegramLegacyMarkdown('a *b* c _d_ e')).toBe('a *b* c _d_ e');
});
it('preserves inline code spans untouched', () => {
const input = 'see `file_name.py` and `**not bold**` here';
expect(sanitizeTelegramLegacyMarkdown(input)).toBe(input);
});
it('preserves fenced code blocks untouched', () => {
const input = '```\nfoo_bar **baz**\n```';
expect(sanitizeTelegramLegacyMarkdown(input)).toBe(input);
});
it('strips formatting chars on odd delimiter count (unbalanced *)', () => {
expect(sanitizeTelegramLegacyMarkdown('a * b *c*')).toBe('a b c');
});
it('strips formatting chars on odd delimiter count (unbalanced _)', () => {
expect(sanitizeTelegramLegacyMarkdown('file_name has _one italic_')).toBe('filename has one italic');
});
it('strips brackets when unbalanced', () => {
expect(sanitizeTelegramLegacyMarkdown('see [docs here')).toBe('see docs here');
});
it('leaves matched brackets (e.g. links) alone when counts balance', () => {
const input = 'see [docs](https://example.com) for more';
expect(sanitizeTelegramLegacyMarkdown(input)).toBe(input);
});
it('fixes the real failing message', () => {
const input =
'Sure! What do you want to mount, and where should it appear inside the container?\n\n' +
'- **Host path** (on your machine): e.g. `~/projects/webapp`\n' +
'- **Container path**: e.g. `workspace/webapp`\n' +
'- **Read-only or read-write?**';
const out = sanitizeTelegramLegacyMarkdown(input);
expect(out).not.toContain('**');
expect(out).toContain('*Host path*');
expect(out).toContain('`~/projects/webapp`');
expect((out.match(/\*/g) ?? []).length % 2).toBe(0);
});
it('is a no-op on empty string', () => {
expect(sanitizeTelegramLegacyMarkdown('')).toBe('');
});
it('replaces dash list bullets with • so the adapter does not re-emit `*` markers', () => {
expect(sanitizeTelegramLegacyMarkdown('- one\n- two')).toBe('• one\n• two');
});
it('preserves indented list structure', () => {
expect(sanitizeTelegramLegacyMarkdown(' - nested')).toBe(' • nested');
});
});
@@ -0,0 +1,50 @@
/**
* Sanitize outbound text for Telegram's legacy `Markdown` parse mode.
*
* WORKAROUND: The @chat-adapter/telegram adapter hardcodes parse_mode=Markdown
* (legacy) but its converter emits CommonMark. Messages with `**bold**`, odd
* delimiter counts, or malformed links are rejected by Telegram and dropped
* after retries. Remove this once upstream ships real mode-aware conversion
* (vercel/chat PR #367 adds the knob; a follow-up is needed for the converter).
*/
const CODE_PATTERN = /```[\s\S]*?```|`[^`\n]*`/g;
const PLACEHOLDER_PREFIX = '\x00CODE';
const PLACEHOLDER_SUFFIX = '\x00';
export function sanitizeTelegramLegacyMarkdown(input: string): string {
if (!input) return input;
const codeSegments: string[] = [];
let text = input.replace(CODE_PATTERN, (m) => {
codeSegments.push(m);
return `${PLACEHOLDER_PREFIX}${codeSegments.length - 1}${PLACEHOLDER_SUFFIX}`;
});
// The adapter re-parses and re-stringifies markdown before sending, which
// rewrites `- item` list bullets into `* item` — injecting unbalanced
// asterisks that Telegram's legacy Markdown parser then rejects. Replace
// list bullets with a plain Unicode bullet so the adapter treats the line
// as prose.
text = text.replace(/^(\s*)[-+]\s+/gm, '$1• ');
text = text.replace(/\*\*([^*\n]+?)\*\*/g, '*$1*');
text = text.replace(/__([^_\n]+?)__/g, '_$1_');
const starCount = (text.match(/\*/g) ?? []).length;
const underCount = (text.match(/_/g) ?? []).length;
if (starCount % 2 !== 0 || underCount % 2 !== 0) {
text = text.replace(/[*_]/g, '');
}
const openBrackets = (text.match(/\[/g) ?? []).length;
const closeBrackets = (text.match(/\]/g) ?? []).length;
if (openBrackets !== closeBrackets) {
text = text.replace(/[[\]]/g, '');
}
return text.replace(
new RegExp(`${PLACEHOLDER_PREFIX}(\\d+)${PLACEHOLDER_SUFFIX}`, 'g'),
(_, i) => codeSegments[Number(i)],
);
}
+248
View File
@@ -0,0 +1,248 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import fs from 'fs';
import path from 'path';
import os from 'os';
vi.mock('../log.js', () => ({ log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() } }));
import {
createPairing,
tryConsume,
getStatus,
getPairing,
waitForPairing,
extractCode,
extractAddressedText,
_setStorePathForTest,
_resetForTest,
} from './telegram-pairing.js';
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'tg-pair-'));
_setStorePathForTest(path.join(tmpDir, 'pairings.json'));
});
afterEach(() => {
_resetForTest();
_setStorePathForTest(null);
fs.rmSync(tmpDir, { recursive: true, force: true });
});
describe('extractAddressedText', () => {
it('strips @botname prefix', () => {
expect(extractAddressedText('@nanobot 1234', 'nanobot')).toBe('1234');
});
it('is case-insensitive', () => {
expect(extractAddressedText('@NanoBot hello', 'nanobot')).toBe('hello');
});
it('returns null when not addressed', () => {
expect(extractAddressedText('hello 1234', 'nanobot')).toBeNull();
});
it('returns null when address is mid-text', () => {
expect(extractAddressedText('hi @nanobot 1234', 'nanobot')).toBeNull();
});
});
describe('extractCode', () => {
it('accepts a bare 4-digit code', () => {
expect(extractCode('0349', 'nanobot')).toBe('0349');
});
it('accepts 4-digit code after @botname', () => {
expect(extractCode('@nanobot 0042', 'nanobot')).toBe('0042');
});
it('rejects non-4-digit numbers', () => {
expect(extractCode('@nanobot 12345', 'nanobot')).toBeNull();
expect(extractCode('@nanobot 12', 'nanobot')).toBeNull();
expect(extractCode('12345', 'nanobot')).toBeNull();
});
it('rejects loose matches with surrounding text', () => {
expect(extractCode('my pin is 0349', 'nanobot')).toBeNull();
expect(extractCode('0349 thanks', 'nanobot')).toBeNull();
});
});
describe('createPairing', () => {
it('generates a 4-digit code', async () => {
const r = await createPairing('main');
expect(r.code).toMatch(/^\d{4}$/);
expect(r.status).toBe('pending');
});
it('does not collide with active codes', async () => {
const codes = new Set<string>();
for (let i = 0; i < 20; i++) {
const r = await createPairing('main');
expect(codes.has(r.code)).toBe(false);
codes.add(r.code);
}
});
});
describe('tryConsume', () => {
it('matches and marks consumed', async () => {
const r = await createPairing('main');
const consumed = await tryConsume({
text: `@nanobot ${r.code}`,
botUsername: 'nanobot',
platformId: 'telegram:123',
isGroup: false,
adminUserId: 'u1',
});
expect(consumed).not.toBeNull();
expect(consumed!.status).toBe('consumed');
expect(consumed!.consumed?.platformId).toBe('telegram:123');
expect(consumed!.consumed?.adminUserId).toBe('u1');
expect(getStatus(r.code)).toBe('consumed');
});
it('returns null on no match (silent drop)', async () => {
await createPairing('main');
const out = await tryConsume({
text: '@nanobot 9999',
botUsername: 'nanobot',
platformId: 'x',
isGroup: false,
});
expect(out).toBeNull();
});
it('matches a bare code without @botname addressing', async () => {
const r = await createPairing('main');
const out = await tryConsume({
text: r.code,
botUsername: 'nanobot',
platformId: 'x',
isGroup: false,
});
expect(out).not.toBeNull();
expect(out!.status).toBe('consumed');
});
it('cannot be consumed twice', async () => {
const r = await createPairing('main');
await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
const second = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
expect(second).toBeNull();
});
it('cannot consume an invalidated pairing', async () => {
const r = await createPairing('main');
// Invalidate by sending a wrong code
await tryConsume({ text: '9999', botUsername: 'b', platformId: 'p', isGroup: false });
const out = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
expect(out).toBeNull();
expect(getStatus(r.code)).toBe('invalidated');
});
});
describe('getStatus', () => {
it('returns unknown for missing codes', () => {
expect(getStatus('0000')).toBe('unknown');
});
});
describe('waitForPairing', () => {
it('resolves when consumed', async () => {
const r = await createPairing('main');
const p = waitForPairing(r.code, { pollMs: 50 });
setTimeout(() => {
tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'tg:1', isGroup: true, name: 'Group' });
}, 100);
const consumed = await p;
expect(consumed.status).toBe('consumed');
expect(consumed.consumed?.name).toBe('Group');
});
it('rejects on invalidation', async () => {
const r = await createPairing('main');
const waiter = waitForPairing(r.code, { pollMs: 30 });
setTimeout(() => {
tryConsume({ text: '0000', botUsername: 'b', platformId: 'tg:1', isGroup: false });
}, 60);
await expect(waiter).rejects.toThrow(/invalidated/);
});
});
describe('replace-by-default', () => {
it('supersedes an existing pending pairing with the same intent', async () => {
const first = await createPairing('main');
const second = await createPairing('main');
expect(getStatus(first.code)).toBe('invalidated');
expect(getStatus(second.code)).toBe('pending');
});
it('does not supersede pairings with a different intent', async () => {
const a = await createPairing({ kind: 'wire-to', folder: 'work' });
const b = await createPairing({ kind: 'wire-to', folder: 'side' });
expect(getStatus(a.code)).toBe('pending');
expect(getStatus(b.code)).toBe('pending');
});
it('causes waitForPairing on the old code to reject as invalidated', async () => {
const first = await createPairing('main');
const waiter = waitForPairing(first.code, { pollMs: 30 });
await new Promise((r) => setTimeout(r, 50));
await createPairing('main');
await expect(waiter).rejects.toThrow(/invalidated/);
});
});
describe('attempt tracking', () => {
it('fires onAttempt for a wrong code, invalidates the pairing, and rejects the waiter', async () => {
const r = await createPairing('main');
const attempts: string[] = [];
const waiter = waitForPairing(r.code, {
pollMs: 30,
onAttempt: (a) => attempts.push(a.candidate),
});
setTimeout(() => {
tryConsume({ text: '9999', botUsername: 'b', platformId: 'tg:1', isGroup: false });
}, 60);
await expect(waiter).rejects.toThrow(/invalidated by wrong code \(9999\)/);
expect(attempts).toEqual(['9999']);
expect(getStatus(r.code)).toBe('invalidated');
});
it('a correct code consumes without firing onAttempt', async () => {
const r = await createPairing('main');
const attempts: string[] = [];
const waiter = waitForPairing(r.code, {
pollMs: 30,
onAttempt: (a) => attempts.push(a.candidate),
});
setTimeout(() => {
tryConsume({ text: r.code, botUsername: 'b', platformId: 'tg:1', isGroup: false });
}, 60);
const consumed = await waiter;
expect(consumed.status).toBe('consumed');
expect(attempts).toEqual([]);
});
it('ignores non-code messages and keeps the pairing pending', async () => {
const r = await createPairing('main');
await tryConsume({ text: 'hello there', botUsername: 'b', platformId: 'p', isGroup: false });
const after = getPairing(r.code);
expect(after?.status).toBe('pending');
expect(after?.attempts ?? []).toHaveLength(0);
});
it('a second code attempt after invalidation does not match', async () => {
const r = await createPairing('main');
await tryConsume({ text: '9999', botUsername: 'b', platformId: 'p', isGroup: false });
const retry = await tryConsume({ text: r.code, botUsername: 'b', platformId: 'p', isGroup: false });
expect(retry).toBeNull();
});
});
describe('intent passthrough', () => {
it('preserves wire-to and new-agent intents', async () => {
const a = await createPairing({ kind: 'wire-to', folder: 'work' });
const b = await createPairing({ kind: 'new-agent', folder: 'side' });
const ca = await tryConsume({ text: `@b ${a.code}`, botUsername: 'b', platformId: 'p1', isGroup: true });
const cb = await tryConsume({ text: `@b ${b.code}`, botUsername: 'b', platformId: 'p2', isGroup: true });
expect(ca!.intent).toEqual({ kind: 'wire-to', folder: 'work' });
expect(cb!.intent).toEqual({ kind: 'new-agent', folder: 'side' });
});
});
+339
View File
@@ -0,0 +1,339 @@
/**
* Telegram pairing — proves the operator owns the chat they're registering.
*
* BotFather hands out tokens with no user binding, so anyone who guesses the
* bot's username can DM it. Pairing closes that gap: setup creates a one-time
* 4-digit code and the operator echoes it back from the chat they want to
* register. The message must be exactly the 4 digits (optionally prefixed by
* `@botname ` for groups with privacy ON) — arbitrary messages that happen to
* contain a 4-digit number do NOT match. The inbound interceptor in
* telegram.ts matches the code, records the chat, upserts the paired user,
* and (if no owner exists yet) promotes them to owner — all before the
* message ever reaches the router.
*
* Storage is a JSON file at data/telegram-pairings.json — single-process,
* read-modify-write under an in-process mutex.
*/
import fs from 'fs';
import path from 'path';
import { DATA_DIR } from '../config.js';
import { log } from '../log.js';
export type PairingIntent = 'main' | { kind: 'wire-to'; folder: string } | { kind: 'new-agent'; folder: string };
export type PairingStatus = 'pending' | 'consumed' | 'invalidated' | 'unknown';
export interface ConsumedDetails {
platformId: string;
isGroup: boolean;
name: string | null;
adminUserId: string | null;
consumedAt: string;
}
export interface PairingAttempt {
candidate: string;
platformId: string;
at: string;
matched: boolean;
}
export interface PairingRecord {
code: string;
intent: PairingIntent;
createdAt: string;
status: Exclude<PairingStatus, 'unknown'>;
consumed?: ConsumedDetails;
/** Recent pairing attempts observed while this record was pending. Capped. */
attempts?: PairingAttempt[];
}
const MAX_ATTEMPTS_PER_RECORD = 10;
function intentEquals(a: PairingIntent, b: PairingIntent): boolean {
if (a === 'main' || b === 'main') return a === b;
return a.kind === b.kind && a.folder === b.folder;
}
interface Store {
pairings: PairingRecord[];
}
/** Pairing codes do not expire — they are consumed on match or invalidated by wrong guesses. */
const FILE_NAME = 'telegram-pairings.json';
let storePathOverride: string | null = null;
export function _setStorePathForTest(p: string | null): void {
storePathOverride = p;
}
function storePath(): string {
return storePathOverride ?? path.join(DATA_DIR, FILE_NAME);
}
let mutex: Promise<unknown> = Promise.resolve();
function withLock<T>(fn: () => Promise<T> | T): Promise<T> {
const next = mutex.then(() => fn());
mutex = next.catch(() => {});
return next;
}
function readStore(): Store {
try {
const raw = fs.readFileSync(storePath(), 'utf8');
const parsed = JSON.parse(raw) as Store;
if (!Array.isArray(parsed.pairings)) return { pairings: [] };
return parsed;
} catch {
return { pairings: [] };
}
}
function writeStore(store: Store): void {
const p = storePath();
fs.mkdirSync(path.dirname(p), { recursive: true });
const tmp = `${p}.tmp`;
fs.writeFileSync(tmp, JSON.stringify(store, null, 2));
fs.renameSync(tmp, p);
}
/** Clean up old consumed/invalidated records (keep last 50). */
function sweep(store: Store): boolean {
if (store.pairings.length <= 50) return false;
store.pairings = store.pairings.slice(-50);
return true;
}
function generateCode(active: Set<string>): string {
// 4-digit numeric, zero-padded. 10k space, fine for one-at-a-time intents.
for (let i = 0; i < 50; i++) {
const code = Math.floor(Math.random() * 10000)
.toString()
.padStart(4, '0');
if (!active.has(code)) return code;
}
throw new Error('Could not allocate a free pairing code (too many active).');
}
export async function createPairing(intent: PairingIntent): Promise<PairingRecord> {
return withLock(() => {
const store = readStore();
sweep(store);
// Replace-by-default: a new pairing for an intent supersedes any existing
// pending pairing for the same intent. Old waitForPairing calls observe
// `invalidated` and exit on their own.
for (const r of store.pairings) {
if (r.status === 'pending' && intentEquals(r.intent, intent)) {
r.status = 'invalidated';
log.info('Pairing superseded by new request', { code: r.code, intent });
}
}
const active = new Set(store.pairings.filter((r) => r.status === 'pending').map((r) => r.code));
const record: PairingRecord = {
code: generateCode(active),
intent,
createdAt: new Date().toISOString(),
status: 'pending',
};
store.pairings.push(record);
writeStore(store);
log.info('Pairing created', { code: record.code, intent });
return record;
});
}
export interface ConsumeInput {
text: string;
botUsername: string;
platformId: string;
isGroup: boolean;
name?: string | null;
adminUserId?: string | null;
}
/** Strip leading @botname and return the trimmed remainder, or null if not addressed. */
export function extractAddressedText(text: string, botUsername: string): string | null {
const trimmed = text.trim();
const re = new RegExp(`^@${botUsername.replace(/[.*+?^${}()|[\\]\\\\]/g, '\\$&')}\\b`, 'i');
const m = trimmed.match(re);
if (!m) return null;
return trimmed.slice(m[0].length).trim();
}
/**
* Extract a pairing code from an inbound message. The message must be exactly
* 4 digits (optionally prefixed by `@botname `) — loose matches like
* "my pin is 1234" are rejected to avoid false positives from chatter.
*/
export function extractCode(text: string, botUsername: string): string | null {
const addressed = extractAddressedText(text, botUsername);
const candidate = (addressed !== null ? addressed : text).trim();
const m = candidate.match(/^(\d{4})$/);
return m ? m[1] : null;
}
/**
* Try to match an inbound message against a pending pairing. On match,
* marks the pairing consumed atomically and returns the record. Returns
* null on no match or expiry (silent drop).
*/
export async function tryConsume(input: ConsumeInput): Promise<PairingRecord | null> {
const code = extractCode(input.text, input.botUsername);
if (!code) return null;
return withLock(() => {
const store = readStore();
const now = Date.now();
sweep(store);
const record = store.pairings.find((r) => r.code === code && r.status === 'pending');
if (!record) {
// Miss: record the attempt on every currently-pending record so each
// waitForPairing caller can surface it as user feedback.
const attempt: PairingAttempt = {
candidate: code,
platformId: input.platformId,
at: new Date(now).toISOString(),
matched: false,
};
let recorded = false;
for (const r of store.pairings) {
if (r.status !== 'pending') continue;
r.attempts = [...(r.attempts ?? []), attempt].slice(-MAX_ATTEMPTS_PER_RECORD);
// One attempt per code. A wrong guess invalidates the pairing
// immediately — pair-telegram observes the `invalidated` signal and
// auto-issues a fresh code (up to a retry cap).
r.status = 'invalidated';
recorded = true;
}
writeStore(store);
if (recorded) {
log.info('Pairing invalidated by wrong attempt', { candidate: code, platformId: input.platformId });
}
return null;
}
record.status = 'consumed';
record.consumed = {
platformId: input.platformId,
isGroup: input.isGroup,
name: input.name ?? null,
adminUserId: input.adminUserId ?? null,
consumedAt: new Date(now).toISOString(),
};
record.attempts = [
...(record.attempts ?? []),
{ candidate: code, platformId: input.platformId, at: new Date(now).toISOString(), matched: true },
].slice(-MAX_ATTEMPTS_PER_RECORD);
writeStore(store);
log.info('Pairing consumed', { code, platformId: input.platformId, intent: record.intent });
return record;
});
}
export function getStatus(code: string): PairingStatus {
const store = readStore();
sweep(store);
const r = store.pairings.find((p) => p.code === code);
if (!r) return 'unknown';
return r.status;
}
export function getPairing(code: string): PairingRecord | null {
const store = readStore();
sweep(store);
return store.pairings.find((p) => p.code === code) ?? null;
}
export interface WaitForPairingOptions {
/** Polling interval as a fallback when fs.watch misses an event. */
pollMs?: number;
/** Fires once per new attempt recorded against this pairing (misses only). */
onAttempt?: (attempt: PairingAttempt) => void;
}
/**
* Resolve when the pairing is consumed; reject when it is invalidated
* (wrong code guess). Waits indefinitely — codes do not expire.
* Uses fs.watch as the primary signal with a slow poll fallback.
*/
export async function waitForPairing(code: string, opts: WaitForPairingOptions = {}): Promise<PairingRecord> {
const pollMs = opts.pollMs ?? 1000;
const initial = getPairing(code);
if (!initial) throw new Error(`Unknown pairing code: ${code}`);
return new Promise<PairingRecord>((resolve, reject) => {
let watcher: fs.FSWatcher | null = null;
let interval: NodeJS.Timeout | null = null;
let settled = false;
const cleanup = () => {
settled = true;
if (watcher)
try {
watcher.close();
} catch {
/* ignore */
}
if (interval) clearInterval(interval);
};
let seenAttempts = 0;
const check = () => {
if (settled) return;
const r = getPairing(code);
if (!r) {
cleanup();
reject(new Error(`Pairing ${code} disappeared`));
return;
}
// Surface any new miss attempts since the last tick. Only fire for
// misses — matches are signaled by `status === 'consumed'` below.
if (opts.onAttempt && r.attempts) {
for (let i = seenAttempts; i < r.attempts.length; i++) {
const a = r.attempts[i];
if (!a.matched) {
try {
opts.onAttempt(a);
} catch {
/* ignore */
}
}
}
seenAttempts = r.attempts.length;
}
if (r.status === 'consumed') {
cleanup();
resolve(r);
return;
}
if (r.status === 'invalidated') {
cleanup();
const lastMiss = r.attempts
?.slice()
.reverse()
.find((a) => !a.matched);
reject(new Error(`Pairing ${code} invalidated by wrong code${lastMiss ? ` (${lastMiss.candidate})` : ''}`));
return;
}
};
try {
const dir = path.dirname(storePath());
fs.mkdirSync(dir, { recursive: true });
watcher = fs.watch(dir, (_event, fname) => {
if (!fname || fname.toString().startsWith(path.basename(storePath()))) check();
});
} catch {
// fs.watch unsupported — poll-only is fine
}
interval = setInterval(check, pollMs);
check();
});
}
/** Test helper — wipe the store. */
export function _resetForTest(): void {
try {
fs.unlinkSync(storePath());
} catch {
// ignore
}
}
+229
View File
@@ -0,0 +1,229 @@
/**
* Telegram channel adapter (v2) — uses Chat SDK bridge, with a pairing
* interceptor wrapped around onInbound to verify chat ownership before
* registration. See telegram-pairing.ts for the why.
*/
import { createTelegramAdapter } from '@chat-adapter/telegram';
import { readEnvFile } from '../env.js';
import { log } from '../log.js';
import { createMessagingGroup, getMessagingGroupByPlatform, updateMessagingGroup } from '../db/messaging-groups.js';
import { grantRole, hasAnyOwner } from '../db/user-roles.js';
import { upsertUser } from '../db/users.js';
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
import { sanitizeTelegramLegacyMarkdown } from './telegram-markdown-sanitize.js';
import { registerChannelAdapter } from './channel-registry.js';
import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js';
import { tryConsume } from './telegram-pairing.js';
/**
* Retry a one-shot operation that can fail on transient network errors at
* cold-start (DNS hiccups, brief upstream outages). Exponential backoff capped
* at 5 attempts — if the network is truly down we surface it instead of
* hanging the service indefinitely.
*/
async function withRetry<T>(fn: () => Promise<T>, label: string, maxAttempts = 5): Promise<T> {
let lastErr: unknown;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (err) {
lastErr = err;
if (attempt === maxAttempts) break;
const delay = Math.min(16000, 1000 * 2 ** (attempt - 1));
log.warn('Telegram setup failed, retrying', { label, attempt, delayMs: delay, err });
await new Promise((r) => setTimeout(r, delay));
}
}
throw lastErr;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
if (!raw.reply_to_message) return null;
const reply = raw.reply_to_message;
return {
text: reply.text || reply.caption || '',
sender: reply.from?.first_name || reply.from?.username || 'Unknown',
};
}
/** Look up the bot username via Telegram getMe. Cached after first call. */
async function fetchBotUsername(token: string): Promise<string | null> {
try {
const res = await fetch(`https://api.telegram.org/bot${token}/getMe`);
const json = (await res.json()) as { ok: boolean; result?: { username?: string } };
return json.ok ? (json.result?.username ?? null) : null;
} catch (err) {
log.warn('Telegram getMe failed', { err });
return null;
}
}
function isGroupPlatformId(platformId: string): boolean {
// platformId is "telegram:<chatId>". Negative chat IDs are groups/channels.
const id = platformId.split(':').pop() ?? '';
return id.startsWith('-');
}
interface InboundFields {
text: string;
authorUserId: string | null;
}
function readInboundFields(message: InboundMessage): InboundFields {
if (message.kind !== 'chat-sdk' || !message.content || typeof message.content !== 'object') {
return { text: '', authorUserId: null };
}
const c = message.content as { text?: string; author?: { userId?: string } };
return { text: c.text ?? '', authorUserId: c.author?.userId ?? null };
}
/**
* Build an onInbound interceptor that consumes pairing codes before they
* reach the router. On match: records the chat + its paired user, promotes
* the user to owner if the instance has no owner yet, and short-circuits.
* On miss: forwards to the host.
*/
/**
* Send a one-shot confirmation back to the paired chat. Best-effort — failures
* are logged but never propagated, so a Telegram outage can't undo a successful
* pairing or trigger the interceptor's fail-open path.
*/
async function sendPairingConfirmation(token: string, platformId: string): Promise<void> {
const chatId = platformId.split(':').slice(1).join(':');
if (!chatId) return;
try {
const res = await fetch(`https://api.telegram.org/bot${token}/sendMessage`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({
chat_id: chatId,
text: "Pairing success! I'm spinning up the agent now, you'll get a message from them shortly.",
}),
});
if (!res.ok) {
log.warn('Telegram pairing confirmation non-OK', { status: res.status });
}
} catch (err) {
log.warn('Telegram pairing confirmation failed', { err });
}
}
function createPairingInterceptor(
botUsernamePromise: Promise<string | null>,
hostOnInbound: ChannelSetup['onInbound'],
token: string,
): ChannelSetup['onInbound'] {
return async (platformId, threadId, message) => {
try {
const botUsername = await botUsernamePromise;
if (!botUsername) {
hostOnInbound(platformId, threadId, message);
return;
}
const { text, authorUserId } = readInboundFields(message);
if (!text) {
hostOnInbound(platformId, threadId, message);
return;
}
const consumed = await tryConsume({
text,
botUsername,
platformId,
isGroup: isGroupPlatformId(platformId),
adminUserId: authorUserId,
});
if (!consumed) {
hostOnInbound(platformId, threadId, message);
return;
}
// Pairing matched — record the chat and short-circuit so the
// code-bearing message never reaches an agent. Privilege is now a
// property of the paired user, not the chat: upsert the user, and if
// this instance has no owner yet, promote them to owner.
const existing = getMessagingGroupByPlatform('telegram', platformId);
if (existing) {
updateMessagingGroup(existing.id, {
is_group: consumed.consumed!.isGroup ? 1 : 0,
});
} else {
createMessagingGroup({
id: `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
channel_type: 'telegram',
platform_id: platformId,
name: consumed.consumed!.name,
is_group: consumed.consumed!.isGroup ? 1 : 0,
unknown_sender_policy: 'strict',
created_at: new Date().toISOString(),
});
}
const pairedUserId = `telegram:${consumed.consumed!.adminUserId}`;
upsertUser({
id: pairedUserId,
kind: 'telegram',
display_name: null,
created_at: new Date().toISOString(),
});
let promotedToOwner = false;
if (!hasAnyOwner()) {
grantRole({
user_id: pairedUserId,
role: 'owner',
agent_group_id: null,
granted_by: null,
granted_at: new Date().toISOString(),
});
promotedToOwner = true;
}
log.info('Telegram pairing accepted — chat registered', {
platformId,
pairedUser: pairedUserId,
promotedToOwner,
intent: consumed.intent,
});
await sendPairingConfirmation(token, platformId);
} catch (err) {
log.error('Telegram pairing interceptor error', { err });
// Fail open: pass through so a pairing bug doesn't break normal traffic.
hostOnInbound(platformId, threadId, message);
}
};
}
registerChannelAdapter('telegram', {
factory: () => {
const env = readEnvFile(['TELEGRAM_BOT_TOKEN']);
if (!env.TELEGRAM_BOT_TOKEN) return null;
const token = env.TELEGRAM_BOT_TOKEN;
const telegramAdapter = createTelegramAdapter({
botToken: token,
mode: 'polling',
});
const bridge = createChatSdkBridge({
adapter: telegramAdapter,
concurrency: 'concurrent',
extractReplyContext,
supportsThreads: false,
transformOutboundText: sanitizeTelegramLegacyMarkdown,
});
const botUsernamePromise = fetchBotUsername(token);
const wrapped: ChannelAdapter = {
...bridge,
async setup(hostConfig: ChannelSetup) {
const intercepted: ChannelSetup = {
...hostConfig,
onInbound: createPairingInterceptor(botUsernamePromise, hostConfig.onInbound, token),
};
return withRetry(() => bridge.setup(intercepted), 'bridge.setup');
},
};
return wrapped;
},
});
+21
View File
@@ -0,0 +1,21 @@
/**
* Webex channel adapter (v2) — uses Chat SDK bridge.
* Self-registers on import.
*/
import { createWebexAdapter } from '@bitbasti/chat-adapter-webex';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('webex', {
factory: () => {
const env = readEnvFile(['WEBEX_BOT_TOKEN', 'WEBEX_WEBHOOK_SECRET']);
if (!env.WEBEX_BOT_TOKEN) return null;
const webexAdapter = createWebexAdapter({
botToken: env.WEBEX_BOT_TOKEN,
webhookSecret: env.WEBEX_WEBHOOK_SECRET,
});
return createChatSdkBridge({ adapter: webexAdapter, concurrency: 'concurrent', supportsThreads: true });
},
});
+29
View File
@@ -0,0 +1,29 @@
/**
* WhatsApp Cloud API channel adapter (v2) — uses Chat SDK bridge.
* Uses the official Meta WhatsApp Business Cloud API (not Baileys).
* Self-registers on import.
*/
import { createWhatsAppAdapter } from '@chat-adapter/whatsapp';
import { readEnvFile } from '../env.js';
import { createChatSdkBridge } from './chat-sdk-bridge.js';
import { registerChannelAdapter } from './channel-registry.js';
registerChannelAdapter('whatsapp-cloud', {
factory: () => {
const env = readEnvFile([
'WHATSAPP_ACCESS_TOKEN',
'WHATSAPP_PHONE_NUMBER_ID',
'WHATSAPP_APP_SECRET',
'WHATSAPP_VERIFY_TOKEN',
]);
if (!env.WHATSAPP_ACCESS_TOKEN) return null;
const whatsappAdapter = createWhatsAppAdapter({
accessToken: env.WHATSAPP_ACCESS_TOKEN,
phoneNumberId: env.WHATSAPP_PHONE_NUMBER_ID,
appSecret: env.WHATSAPP_APP_SECRET,
verifyToken: env.WHATSAPP_VERIFY_TOKEN,
});
return createChatSdkBridge({ adapter: whatsappAdapter, concurrency: 'concurrent', supportsThreads: false });
},
});
+741
View File
@@ -0,0 +1,741 @@
/**
* WhatsApp channel adapter (v2) — native Baileys v6 implementation.
*
* Implements ChannelAdapter directly (no Chat SDK bridge) using
* @whiskeysockets/baileys v6 (stable). Ports proven v1 infrastructure:
* getMessage fallback, outgoing queue, group metadata cache, LID mapping,
* reconnection with backoff.
*
* Auth credentials persist in store/auth/. On first run:
* - If WHATSAPP_PHONE_NUMBER is set → pairing code (printed to log)
* - Otherwise → QR code (printed to log)
* Subsequent restarts reuse the saved session automatically.
*/
import fs from 'fs';
import path from 'path';
import pino from 'pino';
import {
makeWASocket,
Browsers,
DisconnectReason,
fetchLatestWaWebVersion,
downloadMediaMessage,
makeCacheableSignalKeyStore,
normalizeMessageContent,
useMultiFileAuthState,
} from '@whiskeysockets/baileys';
import type { GroupMetadata, WAMessageKey, WAMessage, WASocket } from '@whiskeysockets/baileys';
import { ASSISTANT_HAS_OWN_NUMBER, ASSISTANT_NAME, DATA_DIR } from '../config.js';
import { readEnvFile } from '../env.js';
import { log } from '../log.js';
import { registerChannelAdapter } from './channel-registry.js';
import { normalizeOptions, type NormalizedOption } from './ask-question.js';
import type {
ChannelAdapter,
ChannelSetup,
ConversationConfig,
ConversationInfo,
InboundMessage,
OutboundMessage,
} from './adapter.js';
// Baileys v6 bug: getPlatformId sends charCode (49) instead of enum value (1).
// Fixed in Baileys 7.x but not backported. Without this, pairing codes fail with
// "couldn't link device" because WhatsApp receives an invalid platform ID.
// Must use createRequire — ESM `import *` creates a read-only namespace.
// proto is not available as a named ESM export — use createRequire (same as v1)
import { createRequire } from 'module';
const _require = createRequire(import.meta.url);
const { proto } = _require('@whiskeysockets/baileys') as { proto: any };
try {
const _generics = _require('@whiskeysockets/baileys/lib/Utils/generics') as Record<string, unknown>;
_generics.getPlatformId = (browser: string): string => {
const platformType =
proto.DeviceProps.PlatformType[browser.toUpperCase() as keyof typeof proto.DeviceProps.PlatformType];
return platformType ? platformType.toString() : '1';
};
} catch {
// If CJS require fails (Node version mismatch), pairing codes may not work
// but QR auth will still function fine.
log.warn('Could not patch getPlatformId — pairing code auth may fail');
}
const baileysLogger = pino({ level: 'silent' });
const AUTH_DIR = path.join(process.cwd(), 'store', 'auth');
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24h
const GROUP_METADATA_CACHE_TTL_MS = 60_000; // 1 min for outbound sends
const SENT_MESSAGE_CACHE_MAX = 256;
const RECONNECT_DELAY_MS = 5000;
const PENDING_QUESTIONS_MAX = 64;
/** Normalize an option label to a slash command: "Approve" → "/approve" */
function optionToCommand(option: string): string {
return '/' + option.toLowerCase().replace(/\s+/g, '-');
}
// --- Markdown → WhatsApp formatting ---
interface TextSegment {
content: string;
isProtected: boolean;
}
/** Split text into code-block-protected and unprotected regions. */
function splitProtectedRegions(text: string): TextSegment[] {
const segments: TextSegment[] = [];
const codeBlockRegex = /```[\s\S]*?```|`[^`\n]+`/g;
let lastIndex = 0;
let match: RegExpExecArray | null;
while ((match = codeBlockRegex.exec(text)) !== null) {
if (match.index > lastIndex) {
segments.push({ content: text.slice(lastIndex, match.index), isProtected: false });
}
segments.push({ content: match[0], isProtected: true });
lastIndex = match.index + match[0].length;
}
if (lastIndex < text.length) {
segments.push({ content: text.slice(lastIndex), isProtected: false });
}
return segments;
}
/** Apply WhatsApp-native formatting to an unprotected text segment. */
function transformForWhatsApp(text: string): string {
// Order matters: italic before bold to avoid **bold** → *bold* → _bold_
// 1. Italic: *text* (not **) → _text_
text = text.replace(/(?<!\*)\*(?=[^\s*])([^*\n]+?)(?<=[^\s*])\*(?!\*)/g, '_$1_');
// 2. Bold: **text** → *text*
text = text.replace(/\*\*(?=[^\s*])([^*]+?)(?<=[^\s*])\*\*/g, '*$1*');
// 3. Headings: ## Title → *Title*
text = text.replace(/^#{1,6}\s+(.+)$/gm, '*$1*');
// 4. Links: [text](url) → text (url)
text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '$1 ($2)');
// 5. Horizontal rules: --- / *** / ___ → stripped
text = text.replace(/^(-{3,}|\*{3,}|_{3,})$/gm, '');
return text;
}
/** Convert Claude's markdown to WhatsApp-native formatting. */
function formatWhatsApp(text: string): string {
const segments = splitProtectedRegions(text);
return segments.map(({ content, isProtected }) => (isProtected ? content : transformForWhatsApp(content))).join('');
}
/** Map file extension to Baileys media message type. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function buildMediaMessage(data: Buffer, filename: string, ext: string, caption?: string): any {
const imageExts = ['.jpg', '.jpeg', '.png', '.gif', '.webp'];
const videoExts = ['.mp4', '.mov', '.avi', '.mkv'];
const audioExts = ['.mp3', '.ogg', '.m4a', '.wav', '.aac', '.opus'];
if (imageExts.includes(ext)) {
return { image: data, caption, mimetype: `image/${ext.slice(1) === 'jpg' ? 'jpeg' : ext.slice(1)}` };
}
if (videoExts.includes(ext)) {
return { video: data, caption, mimetype: `video/${ext.slice(1)}` };
}
if (audioExts.includes(ext)) {
return { audio: data, mimetype: `audio/${ext.slice(1) === 'mp3' ? 'mpeg' : ext.slice(1)}` };
}
// Default: send as document
return { document: data, fileName: filename, caption, mimetype: 'application/octet-stream' };
}
registerChannelAdapter('whatsapp', {
factory: () => {
const env = readEnvFile(['WHATSAPP_PHONE_NUMBER', 'WHATSAPP_ENABLED']);
const phoneNumber = env.WHATSAPP_PHONE_NUMBER;
const authDir = AUTH_DIR;
// Skip if no existing auth, no phone number for pairing, and not explicitly enabled (QR mode)
const hasAuth = fs.existsSync(path.join(authDir, 'creds.json'));
if (!hasAuth && !phoneNumber && !env.WHATSAPP_ENABLED) return null;
fs.mkdirSync(authDir, { recursive: true });
// State
let sock: WASocket;
let connected = false;
let setupConfig: ChannelSetup;
let conversations: Map<string, ConversationConfig>;
// LID → phone JID mapping (WhatsApp's new ID system)
const lidToPhoneMap: Record<string, string> = {};
let botLidUser: string | undefined;
// Outgoing queue for messages sent while disconnected
const outgoingQueue: Array<{ jid: string; text: string }> = [];
let flushing = false;
// Sent message cache for retry/re-encrypt requests
const sentMessageCache = new Map<string, any>();
// Group metadata cache with TTL
const groupMetadataCache = new Map<string, { metadata: GroupMetadata; expiresAt: number }>();
// Pending questions: chatJid → { questionId, options }
// User replies with /approve, /reject, etc. to answer
const pendingQuestions = new Map<
string,
{
questionId: string;
options: NormalizedOption[];
}
>();
// Group sync tracking
let lastGroupSync = 0;
let groupSyncTimerStarted = false;
// First-connect promise
let resolveFirstOpen: (() => void) | undefined;
let rejectFirstOpen: ((err: Error) => void) | undefined;
// Pairing code file for the setup skill to poll
const pairingCodeFile = path.join(process.cwd(), 'store', 'pairing-code.txt');
// --- Helpers ---
function buildConversationMap(configs: ConversationConfig[]): Map<string, ConversationConfig> {
const map = new Map<string, ConversationConfig>();
for (const conv of configs) map.set(conv.platformId, conv);
return map;
}
function setLidPhoneMapping(lidUser: string, phoneJid: string): void {
if (lidToPhoneMap[lidUser] === phoneJid) return;
lidToPhoneMap[lidUser] = phoneJid;
// Cached group metadata depends on participant IDs — invalidate
groupMetadataCache.clear();
}
async function translateJid(jid: string): Promise<string> {
if (!jid.endsWith('@lid')) return jid;
const lidUser = jid.split('@')[0].split(':')[0];
const cached = lidToPhoneMap[lidUser];
if (cached) return cached;
// Query Baileys' signal repository
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const pn = await (sock.signalRepository as any)?.lidMapping?.getPNForLID(jid);
if (pn) {
const phoneJid = `${pn.split('@')[0].split(':')[0]}@s.whatsapp.net`;
setLidPhoneMapping(lidUser, phoneJid);
log.info('Translated LID to phone JID', { lidJid: jid, phoneJid });
return phoneJid;
}
} catch (err) {
log.debug('Failed to resolve LID via signalRepository', { jid, err });
}
return jid;
}
async function getNormalizedGroupMetadata(jid: string): Promise<GroupMetadata | undefined> {
if (!jid.endsWith('@g.us')) return undefined;
const cached = groupMetadataCache.get(jid);
if (cached && cached.expiresAt > Date.now()) return cached.metadata;
const metadata = await sock.groupMetadata(jid);
const participants = await Promise.all(
metadata.participants.map(async (p) => ({
...p,
id: await translateJid(p.id),
})),
);
const normalized = { ...metadata, participants };
groupMetadataCache.set(jid, {
metadata: normalized,
expiresAt: Date.now() + GROUP_METADATA_CACHE_TTL_MS,
});
return normalized;
}
async function syncGroupMetadata(force = false): Promise<void> {
if (!force && lastGroupSync && Date.now() - lastGroupSync < GROUP_SYNC_INTERVAL_MS) {
return;
}
try {
log.info('Syncing group metadata from WhatsApp...');
const groups = await sock.groupFetchAllParticipating();
let count = 0;
for (const [jid, metadata] of Object.entries(groups)) {
if (metadata.subject) {
setupConfig.onMetadata(jid, metadata.subject, true);
count++;
}
}
lastGroupSync = Date.now();
log.info('Group metadata synced', { count });
} catch (err) {
log.error('Failed to sync group metadata', { err });
}
}
async function flushOutgoingQueue(): Promise<void> {
if (flushing || outgoingQueue.length === 0) return;
flushing = true;
try {
log.info('Flushing outgoing message queue', { count: outgoingQueue.length });
while (outgoingQueue.length > 0) {
const item = outgoingQueue.shift()!;
const sent = await sock.sendMessage(item.jid, { text: item.text });
if (sent?.key?.id && sent.message) {
sentMessageCache.set(sent.key.id, sent.message);
}
}
} finally {
flushing = false;
}
}
/** Download media from an inbound message, save to /workspace/attachments/. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async function downloadInboundMedia(
msg: WAMessage,
normalized: any,
): Promise<Array<{ type: string; name: string; localPath: string }>> {
const mediaTypes: Array<{ key: string; type: string; ext: string }> = [
{ key: 'imageMessage', type: 'image', ext: '.jpg' },
{ key: 'videoMessage', type: 'video', ext: '.mp4' },
{ key: 'audioMessage', type: 'audio', ext: '.ogg' },
{ key: 'documentMessage', type: 'document', ext: '' },
];
const results: Array<{ type: string; name: string; localPath: string }> = [];
for (const { key, type, ext } of mediaTypes) {
if (!normalized[key]) continue;
try {
const buffer = await downloadMediaMessage(msg, 'buffer', {});
const docFilename = normalized[key].fileName;
const filename = docFilename || `${type}-${Date.now()}${ext}`;
const attachDir = path.join(DATA_DIR, 'attachments');
fs.mkdirSync(attachDir, { recursive: true });
const filePath = path.join(attachDir, filename);
fs.writeFileSync(filePath, buffer);
results.push({ type, name: filename, localPath: `attachments/${filename}` });
log.info('Media downloaded', { type, filename });
} catch (err) {
log.warn('Failed to download media', { type, err });
}
}
return results;
}
async function sendRawMessage(jid: string, text: string): Promise<string | undefined> {
if (!connected) {
outgoingQueue.push({ jid, text });
log.info('WA disconnected, message queued', { jid, queueSize: outgoingQueue.length });
return;
}
try {
const sent = await sock.sendMessage(jid, { text });
if (sent?.key?.id && sent.message) {
sentMessageCache.set(sent.key.id, sent.message);
if (sentMessageCache.size > SENT_MESSAGE_CACHE_MAX) {
const oldest = sentMessageCache.keys().next().value!;
sentMessageCache.delete(oldest);
}
}
return sent?.key?.id ?? undefined;
} catch (err) {
outgoingQueue.push({ jid, text });
log.warn('Failed to send, message queued', { jid, err, queueSize: outgoingQueue.length });
return undefined;
}
}
// --- Socket creation ---
async function connectSocket(): Promise<void> {
const { state, saveCreds } = await useMultiFileAuthState(authDir);
const { version } = await fetchLatestWaWebVersion({}).catch((err) => {
log.warn('Failed to fetch latest WA Web version, using default', { err });
return { version: undefined };
});
sock = makeWASocket({
version,
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, baileysLogger),
},
printQRInTerminal: false,
logger: baileysLogger,
browser: Browsers.macOS('Chrome'),
cachedGroupMetadata: async (jid: string) => getNormalizedGroupMetadata(jid),
getMessage: async (key: WAMessageKey) => {
// Check in-memory cache first (recently sent messages)
const cached = sentMessageCache.get(key.id || '');
if (cached) return cached;
// Return empty message to prevent indefinite "waiting for this message"
return proto.Message.fromObject({});
},
});
// Request pairing code if phone number is set and not yet registered
if (phoneNumber && !state.creds.registered) {
setTimeout(async () => {
try {
const code = await sock.requestPairingCode(phoneNumber);
log.info(`WhatsApp pairing code: ${code}`);
log.info('Enter in WhatsApp > Linked Devices > Link with phone number');
fs.writeFileSync(pairingCodeFile, code, 'utf-8');
} catch (err) {
log.error('Failed to request pairing code', { err });
}
}, 3000);
}
sock.ev.on('connection.update', (update) => {
const { connection, lastDisconnect, qr } = update;
if (qr && !phoneNumber) {
// QR code auth — print to terminal
(async () => {
try {
const QRCode = await import('qrcode');
const qrText = await QRCode.toString(qr, { type: 'terminal' });
log.info('WhatsApp QR code — scan with WhatsApp > Linked Devices:\n' + qrText);
} catch {
log.info('WhatsApp QR code (raw)', { qr });
}
})();
}
if (connection === 'close') {
connected = false;
const reason = (lastDisconnect?.error as { output?: { statusCode?: number } })?.output?.statusCode;
const shouldReconnect = reason !== DisconnectReason.loggedOut;
log.info('WhatsApp connection closed', { reason, shouldReconnect });
if (shouldReconnect) {
log.info('Reconnecting...');
connectSocket().catch((err) => {
log.error('Failed to reconnect, retrying in 5s', { err });
setTimeout(() => {
connectSocket().catch((err2) => {
log.error('Reconnection retry failed', { err: err2 });
});
}, RECONNECT_DELAY_MS);
});
} else {
log.info('WhatsApp logged out');
if (rejectFirstOpen) {
rejectFirstOpen(new Error('WhatsApp logged out'));
rejectFirstOpen = undefined;
resolveFirstOpen = undefined;
}
}
} else if (connection === 'open') {
connected = true;
log.info('Connected to WhatsApp');
// Clean up pairing code file after successful connection
try {
if (fs.existsSync(pairingCodeFile)) fs.unlinkSync(pairingCodeFile);
} catch {
/* ignore */
}
// Announce availability for presence updates
sock.sendPresenceUpdate('available').catch((err) => {
log.warn('Failed to send presence update', { err });
});
// Build LID → phone mapping from auth state
if (sock.user) {
const phoneUser = sock.user.id.split(':')[0];
const lidUser = sock.user.lid?.split(':')[0];
if (lidUser && phoneUser) {
setLidPhoneMapping(lidUser, `${phoneUser}@s.whatsapp.net`);
botLidUser = lidUser;
}
}
// Flush queued messages
flushOutgoingQueue().catch((err) => log.error('Failed to flush outgoing queue', { err }));
// Group sync
syncGroupMetadata().catch((err) => log.error('Initial group sync failed', { err }));
if (!groupSyncTimerStarted) {
groupSyncTimerStarted = true;
setInterval(() => {
syncGroupMetadata().catch((err) => log.error('Periodic group sync failed', { err }));
}, GROUP_SYNC_INTERVAL_MS);
}
// Signal first open
if (resolveFirstOpen) {
resolveFirstOpen();
resolveFirstOpen = undefined;
rejectFirstOpen = undefined;
}
}
});
sock.ev.on('creds.update', saveCreds);
// Phone number sharing events — update LID mapping
sock.ev.on('chats.phoneNumberShare', ({ lid, jid }) => {
const lidUser = lid?.split('@')[0].split(':')[0];
if (lidUser && jid) setLidPhoneMapping(lidUser, jid);
});
// Inbound messages
sock.ev.on('messages.upsert', async ({ messages }) => {
for (const msg of messages) {
try {
if (!msg.message) continue;
const normalized = normalizeMessageContent(msg.message);
if (!normalized) continue;
const rawJid = msg.key.remoteJid;
if (!rawJid || rawJid === 'status@broadcast') continue;
// Translate LID → phone JID
let chatJid = await translateJid(rawJid);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if (chatJid.endsWith('@lid') && (msg.key as any).senderPn) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const pn = (msg.key as any).senderPn as string;
const phoneJid = pn.includes('@') ? pn : `${pn}@s.whatsapp.net`;
setLidPhoneMapping(rawJid.split('@')[0].split(':')[0], phoneJid);
chatJid = phoneJid;
}
const timestamp = new Date(Number(msg.messageTimestamp) * 1000).toISOString();
const isGroup = chatJid.endsWith('@g.us');
// Notify metadata for group discovery
setupConfig.onMetadata(chatJid, undefined, isGroup);
// Only forward messages for registered conversations
if (!conversations.has(chatJid)) continue;
let content =
normalized.conversation ||
normalized.extendedTextMessage?.text ||
normalized.imageMessage?.caption ||
normalized.videoMessage?.caption ||
'';
// Normalize bot LID mention → assistant name for trigger matching
if (botLidUser && content.includes(`@${botLidUser}`)) {
content = content.replace(`@${botLidUser}`, `@${ASSISTANT_NAME}`);
}
// Download media attachments (images, video, audio, documents)
const attachments = await downloadInboundMedia(msg, normalized);
// Skip empty protocol messages (no text and no attachments)
if (!content && attachments.length === 0) continue;
const sender = msg.key.participant || msg.key.remoteJid || '';
const senderName = msg.pushName || sender.split('@')[0];
const fromMe = msg.key.fromMe || false;
// Filter bot's own messages to prevent echo loops.
// fromMe is always true for messages sent from this linked device,
// regardless of ASSISTANT_HAS_OWN_NUMBER mode.
if (fromMe) continue;
const isBotMessage = ASSISTANT_HAS_OWN_NUMBER ? false : content.startsWith(`${ASSISTANT_NAME}:`);
// Check if this reply answers a pending question via slash command
const pending = pendingQuestions.get(chatJid);
if (pending && content.startsWith('/')) {
const cmd = content.trim().toLowerCase();
const matched = pending.options.find((o) => optionToCommand(o.label) === cmd);
if (matched) {
const voterName = msg.pushName || sender.split('@')[0];
setupConfig.onAction(pending.questionId, matched.value, sender);
pendingQuestions.delete(chatJid);
await sendRawMessage(chatJid, `${matched.selectedLabel} by ${voterName}`);
log.info('Question answered', {
questionId: pending.questionId,
value: matched.value,
voterName,
});
continue; // Don't forward this reply to the agent
}
}
const inbound: InboundMessage = {
id: msg.key.id || `wa-${Date.now()}`,
kind: 'chat',
content: {
text: content,
sender,
senderName,
...(attachments.length > 0 && { attachments }),
fromMe,
isBotMessage,
isGroup,
chatJid,
},
timestamp,
};
// WhatsApp doesn't use threads — threadId is null
setupConfig.onInbound(chatJid, null, inbound);
} catch (err) {
log.error('Error processing incoming WhatsApp message', {
err,
remoteJid: msg.key?.remoteJid,
});
}
}
});
}
// --- ChannelAdapter implementation ---
const adapter: ChannelAdapter = {
name: 'whatsapp',
channelType: 'whatsapp',
supportsThreads: false,
async setup(hostConfig: ChannelSetup) {
setupConfig = hostConfig;
conversations = buildConversationMap(hostConfig.conversations);
// Connect and wait for first open
await new Promise<void>((resolve, reject) => {
resolveFirstOpen = resolve;
rejectFirstOpen = reject;
connectSocket().catch(reject);
});
log.info('WhatsApp adapter initialized');
},
async deliver(
platformId: string,
_threadId: string | null,
message: OutboundMessage,
): Promise<string | undefined> {
const content = message.content as Record<string, unknown>;
// Ask question → text with slash command replies
if (content.type === 'ask_question' && content.questionId && content.options) {
const questionId = content.questionId as string;
const title = content.title as string;
const question = content.question as string;
if (!title) {
log.error('ask_question missing required title — skipping delivery', { questionId });
return;
}
const options: NormalizedOption[] = normalizeOptions(content.options as never);
const optionLines = options.map((o) => ` ${optionToCommand(o.label)}`).join('\n');
const text = `*${title}*\n\n${question}\n\nReply with:\n${optionLines}`;
const msgId = await sendRawMessage(platformId, text);
if (msgId) {
pendingQuestions.set(platformId, { questionId, options });
if (pendingQuestions.size > PENDING_QUESTIONS_MAX) {
const oldest = pendingQuestions.keys().next().value!;
pendingQuestions.delete(oldest);
}
}
return msgId;
}
// Reaction → emoji on a message
if (content.operation === 'reaction' && content.messageId && content.emoji) {
try {
await sock.sendMessage(platformId, {
react: {
text: content.emoji as string,
key: { remoteJid: platformId, id: content.messageId as string, fromMe: false },
},
});
} catch (err) {
log.debug('Failed to send reaction', { platformId, err });
}
return;
}
// Normal message (with optional file attachments)
const text = (content.markdown as string) || (content.text as string);
const hasFiles = message.files && message.files.length > 0;
if (!text && !hasFiles) return;
// Send file attachments (first file gets the caption, rest are captionless)
if (hasFiles) {
let captionUsed = false;
for (const file of message.files!) {
try {
const ext = path.extname(file.filename).toLowerCase();
const caption = !captionUsed ? text : undefined;
const mediaMsg = buildMediaMessage(file.data, file.filename, ext, caption);
const sent = await sock.sendMessage(platformId, mediaMsg);
if (sent?.key?.id && sent.message) {
sentMessageCache.set(sent.key.id, sent.message);
}
if (caption) captionUsed = true;
} catch (err) {
log.error('Failed to send file', { platformId, filename: file.filename, err });
}
}
if (captionUsed) return; // Text was sent as caption
}
if (text) {
const formatted = formatWhatsApp(text);
const prefixed = ASSISTANT_HAS_OWN_NUMBER ? formatted : `${ASSISTANT_NAME}: ${formatted}`;
return sendRawMessage(platformId, prefixed);
}
},
async setTyping(platformId: string) {
try {
await sock.sendPresenceUpdate('composing', platformId);
} catch (err) {
log.debug('Failed to update typing status', { jid: platformId, err });
}
},
async teardown() {
connected = false;
sock?.end(undefined);
log.info('WhatsApp adapter shut down');
},
isConnected() {
return connected;
},
async syncConversations(): Promise<ConversationInfo[]> {
try {
const groups = await sock.groupFetchAllParticipating();
return Object.entries(groups)
.filter(([, m]) => m.subject)
.map(([jid, m]) => ({
platformId: jid,
name: m.subject,
isGroup: true,
}));
} catch (err) {
log.error('Failed to sync WhatsApp conversations', { err });
return [];
}
},
updateConversations(configs: ConversationConfig[]) {
conversations = buildConversationMap(configs);
},
};
return adapter;
},
});