mirror of
https://github.com/qwibitai/nanoclaw.git
synced 2026-06-04 10:14:47 +08:00
fix(channels): restore channel adapters deleted during v2 sync
Phase 1 boundary sync (5454bae) inadvertently re-applied v2's channel- adapter deletions (v2 commit437ba63"move channel adapters off v2 trunk") to the channels branch. 17 adapter files and their package.json deps were wiped: - discord, gchat, github, imessage, linear, matrix, resend, slack, teams, telegram + telegram-markdown-sanitize + telegram-pairing, webex, whatsapp, whatsapp-cloud - @chat-adapter/* packages, @whiskeysockets/baileys, @resend/..., qrcode, pino, chat-adapter-imessage, @beeper/... Caught when testing PR #3 — the service had no channels to bind to. Root cause: the sync merge commit message ("No channel adapter changes required") was wrong. I checked the registry surface but not file presence. Providers had the same failure mode during its sync, but there it surfaced immediately via a test import; channels has no test that imports adapter files directly, so it slipped through. Fix: restore src/channels/*.ts and the matching package.json / pnpm-lock.yaml entries from0d75ca2(last pre-sync commit). Tests pass (198/198 vs 137/137 pre-restore — the restored telegram-pairing and markdown-sanitize tests are back). Going forward: channel/provider branches that carry files v2 has deleted need `git checkout origin/<branch> -- <paths>` applied after any v2 sync merge that touches those paths, or a merge strategy that ignores deletions under the branch-owned directory. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+18
-1
@@ -22,10 +22,27 @@
|
||||
"test:watch": "vitest"
|
||||
},
|
||||
"dependencies": {
|
||||
"@beeper/chat-adapter-matrix": "^0.2.0",
|
||||
"@bitbasti/chat-adapter-webex": "^0.1.0",
|
||||
"@chat-adapter/discord": "^4.24.0",
|
||||
"@chat-adapter/gchat": "^4.24.0",
|
||||
"@chat-adapter/github": "^4.24.0",
|
||||
"@chat-adapter/linear": "^4.26.0",
|
||||
"@chat-adapter/slack": "^4.24.0",
|
||||
"@chat-adapter/state-memory": "^4.24.0",
|
||||
"@chat-adapter/teams": "^4.24.0",
|
||||
"@chat-adapter/telegram": "^4.24.0",
|
||||
"@chat-adapter/whatsapp": "^4.24.0",
|
||||
"@onecli-sh/sdk": "^0.3.1",
|
||||
"@resend/chat-sdk-adapter": "^0.1.1",
|
||||
"@types/qrcode": "^1.5.6",
|
||||
"@whiskeysockets/baileys": "^6.17.16",
|
||||
"better-sqlite3": "11.10.0",
|
||||
"chat": "^4.24.0",
|
||||
"cron-parser": "5.5.0"
|
||||
"chat-adapter-imessage": "^0.1.1",
|
||||
"cron-parser": "5.5.0",
|
||||
"pino": "^9.6.0",
|
||||
"qrcode": "^1.5.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@eslint/js": "^9.35.0",
|
||||
|
||||
Generated
+3881
-10
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Discord channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createDiscordAdapter } from '@chat-adapter/discord';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
|
||||
if (!raw.referenced_message) return null;
|
||||
const reply = raw.referenced_message;
|
||||
return {
|
||||
text: reply.content || '',
|
||||
sender: reply.author?.global_name || reply.author?.username || 'Unknown',
|
||||
};
|
||||
}
|
||||
|
||||
registerChannelAdapter('discord', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['DISCORD_BOT_TOKEN', 'DISCORD_PUBLIC_KEY', 'DISCORD_APPLICATION_ID']);
|
||||
if (!env.DISCORD_BOT_TOKEN) return null;
|
||||
const discordAdapter = createDiscordAdapter({
|
||||
botToken: env.DISCORD_BOT_TOKEN,
|
||||
publicKey: env.DISCORD_PUBLIC_KEY,
|
||||
applicationId: env.DISCORD_APPLICATION_ID,
|
||||
});
|
||||
return createChatSdkBridge({
|
||||
adapter: discordAdapter,
|
||||
concurrency: 'concurrent',
|
||||
botToken: env.DISCORD_BOT_TOKEN,
|
||||
extractReplyContext,
|
||||
supportsThreads: true,
|
||||
});
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,20 @@
|
||||
/**
|
||||
* Google Chat channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createGoogleChatAdapter } from '@chat-adapter/gchat';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('gchat', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['GCHAT_CREDENTIALS']);
|
||||
if (!env.GCHAT_CREDENTIALS) return null;
|
||||
const gchatAdapter = createGoogleChatAdapter({
|
||||
credentials: JSON.parse(env.GCHAT_CREDENTIALS),
|
||||
});
|
||||
return createChatSdkBridge({ adapter: gchatAdapter, concurrency: 'concurrent', supportsThreads: true });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,22 @@
|
||||
/**
|
||||
* GitHub channel adapter (v2) — uses Chat SDK bridge.
|
||||
* PR comment threads as conversations.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createGitHubAdapter } from '@chat-adapter/github';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('github', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['GITHUB_TOKEN', 'GITHUB_WEBHOOK_SECRET']);
|
||||
if (!env.GITHUB_TOKEN) return null;
|
||||
const githubAdapter = createGitHubAdapter({
|
||||
token: env.GITHUB_TOKEN,
|
||||
webhookSecret: env.GITHUB_WEBHOOK_SECRET,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: githubAdapter, concurrency: 'queue', supportsThreads: true });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,29 @@
|
||||
/**
|
||||
* iMessage channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Supports local mode (macOS Full Disk Access) and remote mode (Photon API).
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createiMessageAdapter } from 'chat-adapter-imessage';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('imessage', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['IMESSAGE_ENABLED', 'IMESSAGE_LOCAL', 'IMESSAGE_SERVER_URL', 'IMESSAGE_API_KEY']);
|
||||
const isLocal = env.IMESSAGE_LOCAL !== 'false';
|
||||
if (isLocal && !env.IMESSAGE_ENABLED) return null;
|
||||
if (!isLocal && !env.IMESSAGE_SERVER_URL) return null;
|
||||
const rawAdapter = createiMessageAdapter({
|
||||
local: isLocal,
|
||||
serverUrl: env.IMESSAGE_SERVER_URL,
|
||||
apiKey: env.IMESSAGE_API_KEY,
|
||||
});
|
||||
// Polyfill channelIdFromThreadId (community adapter doesn't implement it)
|
||||
const imessageAdapter = Object.assign(rawAdapter, {
|
||||
channelIdFromThreadId: (threadId: string) => threadId,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: imessageAdapter, concurrency: 'concurrent', supportsThreads: false });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Linear channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Issue comment threads as conversations.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createLinearAdapter } from '@chat-adapter/linear';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('linear', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['LINEAR_API_KEY', 'LINEAR_WEBHOOK_SECRET']);
|
||||
if (!env.LINEAR_API_KEY) return null;
|
||||
const linearAdapter = createLinearAdapter({
|
||||
apiKey: env.LINEAR_API_KEY,
|
||||
webhookSecret: env.LINEAR_WEBHOOK_SECRET,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: linearAdapter, concurrency: 'queue', supportsThreads: true });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Matrix channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createMatrixAdapter } from '@beeper/chat-adapter-matrix';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('matrix', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['MATRIX_BASE_URL', 'MATRIX_ACCESS_TOKEN', 'MATRIX_USER_ID', 'MATRIX_BOT_USERNAME']);
|
||||
if (!env.MATRIX_BASE_URL) return null;
|
||||
// Matrix adapter reads from process.env directly
|
||||
process.env.MATRIX_BASE_URL = env.MATRIX_BASE_URL;
|
||||
if (env.MATRIX_ACCESS_TOKEN) process.env.MATRIX_ACCESS_TOKEN = env.MATRIX_ACCESS_TOKEN;
|
||||
if (env.MATRIX_USER_ID) process.env.MATRIX_USER_ID = env.MATRIX_USER_ID;
|
||||
if (env.MATRIX_BOT_USERNAME) process.env.MATRIX_BOT_USERNAME = env.MATRIX_BOT_USERNAME;
|
||||
const matrixAdapter = createMatrixAdapter();
|
||||
return createChatSdkBridge({ adapter: matrixAdapter, concurrency: 'concurrent', supportsThreads: false });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Resend (email) channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createResendAdapter } from '@resend/chat-sdk-adapter';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('resend', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['RESEND_API_KEY', 'RESEND_FROM_ADDRESS', 'RESEND_FROM_NAME', 'RESEND_WEBHOOK_SECRET']);
|
||||
if (!env.RESEND_API_KEY) return null;
|
||||
const resendAdapter = createResendAdapter({
|
||||
apiKey: env.RESEND_API_KEY,
|
||||
fromAddress: env.RESEND_FROM_ADDRESS,
|
||||
fromName: env.RESEND_FROM_NAME,
|
||||
webhookSecret: env.RESEND_WEBHOOK_SECRET,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: resendAdapter, concurrency: 'queue', supportsThreads: false });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,21 @@
|
||||
/**
|
||||
* Slack channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createSlackAdapter } from '@chat-adapter/slack';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('slack', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['SLACK_BOT_TOKEN', 'SLACK_SIGNING_SECRET']);
|
||||
if (!env.SLACK_BOT_TOKEN) return null;
|
||||
const slackAdapter = createSlackAdapter({
|
||||
botToken: env.SLACK_BOT_TOKEN,
|
||||
signingSecret: env.SLACK_SIGNING_SECRET,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: slackAdapter, concurrency: 'concurrent', supportsThreads: true });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Microsoft Teams channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createTeamsAdapter } from '@chat-adapter/teams';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('teams', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['TEAMS_APP_ID', 'TEAMS_APP_PASSWORD', 'TEAMS_APP_TENANT_ID', 'TEAMS_APP_TYPE']);
|
||||
if (!env.TEAMS_APP_ID) return null;
|
||||
const teamsAdapter = createTeamsAdapter({
|
||||
appId: env.TEAMS_APP_ID,
|
||||
appPassword: env.TEAMS_APP_PASSWORD,
|
||||
appType: (env.TEAMS_APP_TYPE as 'SingleTenant' | 'MultiTenant') || undefined,
|
||||
appTenantId: env.TEAMS_APP_TENANT_ID || undefined,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: teamsAdapter, concurrency: 'concurrent', supportsThreads: true });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,68 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { sanitizeTelegramLegacyMarkdown } from './telegram-markdown-sanitize.js';
|
||||
|
||||
describe('sanitizeTelegramLegacyMarkdown', () => {
|
||||
it('downgrades CommonMark **bold** to legacy *bold*', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('**Host path**')).toBe('*Host path*');
|
||||
});
|
||||
|
||||
it('downgrades CommonMark __bold__ to legacy _italic_', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('__label__')).toBe('_label_');
|
||||
});
|
||||
|
||||
it('leaves balanced legacy *bold* and _italic_ alone', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('a *b* c _d_ e')).toBe('a *b* c _d_ e');
|
||||
});
|
||||
|
||||
it('preserves inline code spans untouched', () => {
|
||||
const input = 'see `file_name.py` and `**not bold**` here';
|
||||
expect(sanitizeTelegramLegacyMarkdown(input)).toBe(input);
|
||||
});
|
||||
|
||||
it('preserves fenced code blocks untouched', () => {
|
||||
const input = '```\nfoo_bar **baz**\n```';
|
||||
expect(sanitizeTelegramLegacyMarkdown(input)).toBe(input);
|
||||
});
|
||||
|
||||
it('strips formatting chars on odd delimiter count (unbalanced *)', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('a * b *c*')).toBe('a b c');
|
||||
});
|
||||
|
||||
it('strips formatting chars on odd delimiter count (unbalanced _)', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('file_name has _one italic_')).toBe('filename has one italic');
|
||||
});
|
||||
|
||||
it('strips brackets when unbalanced', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('see [docs here')).toBe('see docs here');
|
||||
});
|
||||
|
||||
it('leaves matched brackets (e.g. links) alone when counts balance', () => {
|
||||
const input = 'see [docs](https://example.com) for more';
|
||||
expect(sanitizeTelegramLegacyMarkdown(input)).toBe(input);
|
||||
});
|
||||
|
||||
it('fixes the real failing message', () => {
|
||||
const input =
|
||||
'Sure! What do you want to mount, and where should it appear inside the container?\n\n' +
|
||||
'- **Host path** (on your machine): e.g. `~/projects/webapp`\n' +
|
||||
'- **Container path**: e.g. `workspace/webapp`\n' +
|
||||
'- **Read-only or read-write?**';
|
||||
const out = sanitizeTelegramLegacyMarkdown(input);
|
||||
expect(out).not.toContain('**');
|
||||
expect(out).toContain('*Host path*');
|
||||
expect(out).toContain('`~/projects/webapp`');
|
||||
expect((out.match(/\*/g) ?? []).length % 2).toBe(0);
|
||||
});
|
||||
|
||||
it('is a no-op on empty string', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('')).toBe('');
|
||||
});
|
||||
|
||||
it('replaces dash list bullets with • so the adapter does not re-emit `*` markers', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown('- one\n- two')).toBe('• one\n• two');
|
||||
});
|
||||
|
||||
it('preserves indented list structure', () => {
|
||||
expect(sanitizeTelegramLegacyMarkdown(' - nested')).toBe(' • nested');
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,50 @@
|
||||
/**
|
||||
* Sanitize outbound text for Telegram's legacy `Markdown` parse mode.
|
||||
*
|
||||
* WORKAROUND: The @chat-adapter/telegram adapter hardcodes parse_mode=Markdown
|
||||
* (legacy) but its converter emits CommonMark. Messages with `**bold**`, odd
|
||||
* delimiter counts, or malformed links are rejected by Telegram and dropped
|
||||
* after retries. Remove this once upstream ships real mode-aware conversion
|
||||
* (vercel/chat PR #367 adds the knob; a follow-up is needed for the converter).
|
||||
*/
|
||||
|
||||
const CODE_PATTERN = /```[\s\S]*?```|`[^`\n]*`/g;
|
||||
const PLACEHOLDER_PREFIX = '\x00CODE';
|
||||
const PLACEHOLDER_SUFFIX = '\x00';
|
||||
|
||||
export function sanitizeTelegramLegacyMarkdown(input: string): string {
|
||||
if (!input) return input;
|
||||
|
||||
const codeSegments: string[] = [];
|
||||
let text = input.replace(CODE_PATTERN, (m) => {
|
||||
codeSegments.push(m);
|
||||
return `${PLACEHOLDER_PREFIX}${codeSegments.length - 1}${PLACEHOLDER_SUFFIX}`;
|
||||
});
|
||||
|
||||
// The adapter re-parses and re-stringifies markdown before sending, which
|
||||
// rewrites `- item` list bullets into `* item` — injecting unbalanced
|
||||
// asterisks that Telegram's legacy Markdown parser then rejects. Replace
|
||||
// list bullets with a plain Unicode bullet so the adapter treats the line
|
||||
// as prose.
|
||||
text = text.replace(/^(\s*)[-+]\s+/gm, '$1• ');
|
||||
|
||||
text = text.replace(/\*\*([^*\n]+?)\*\*/g, '*$1*');
|
||||
text = text.replace(/__([^_\n]+?)__/g, '_$1_');
|
||||
|
||||
const starCount = (text.match(/\*/g) ?? []).length;
|
||||
const underCount = (text.match(/_/g) ?? []).length;
|
||||
if (starCount % 2 !== 0 || underCount % 2 !== 0) {
|
||||
text = text.replace(/[*_]/g, '');
|
||||
}
|
||||
|
||||
const openBrackets = (text.match(/\[/g) ?? []).length;
|
||||
const closeBrackets = (text.match(/\]/g) ?? []).length;
|
||||
if (openBrackets !== closeBrackets) {
|
||||
text = text.replace(/[[\]]/g, '');
|
||||
}
|
||||
|
||||
return text.replace(
|
||||
new RegExp(`${PLACEHOLDER_PREFIX}(\\d+)${PLACEHOLDER_SUFFIX}`, 'g'),
|
||||
(_, i) => codeSegments[Number(i)],
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,248 @@
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import os from 'os';
|
||||
|
||||
vi.mock('../log.js', () => ({ log: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() } }));
|
||||
|
||||
import {
|
||||
createPairing,
|
||||
tryConsume,
|
||||
getStatus,
|
||||
getPairing,
|
||||
waitForPairing,
|
||||
extractCode,
|
||||
extractAddressedText,
|
||||
_setStorePathForTest,
|
||||
_resetForTest,
|
||||
} from './telegram-pairing.js';
|
||||
|
||||
let tmpDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'tg-pair-'));
|
||||
_setStorePathForTest(path.join(tmpDir, 'pairings.json'));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
_resetForTest();
|
||||
_setStorePathForTest(null);
|
||||
fs.rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
describe('extractAddressedText', () => {
|
||||
it('strips @botname prefix', () => {
|
||||
expect(extractAddressedText('@nanobot 1234', 'nanobot')).toBe('1234');
|
||||
});
|
||||
it('is case-insensitive', () => {
|
||||
expect(extractAddressedText('@NanoBot hello', 'nanobot')).toBe('hello');
|
||||
});
|
||||
it('returns null when not addressed', () => {
|
||||
expect(extractAddressedText('hello 1234', 'nanobot')).toBeNull();
|
||||
});
|
||||
it('returns null when address is mid-text', () => {
|
||||
expect(extractAddressedText('hi @nanobot 1234', 'nanobot')).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('extractCode', () => {
|
||||
it('accepts a bare 4-digit code', () => {
|
||||
expect(extractCode('0349', 'nanobot')).toBe('0349');
|
||||
});
|
||||
it('accepts 4-digit code after @botname', () => {
|
||||
expect(extractCode('@nanobot 0042', 'nanobot')).toBe('0042');
|
||||
});
|
||||
it('rejects non-4-digit numbers', () => {
|
||||
expect(extractCode('@nanobot 12345', 'nanobot')).toBeNull();
|
||||
expect(extractCode('@nanobot 12', 'nanobot')).toBeNull();
|
||||
expect(extractCode('12345', 'nanobot')).toBeNull();
|
||||
});
|
||||
it('rejects loose matches with surrounding text', () => {
|
||||
expect(extractCode('my pin is 0349', 'nanobot')).toBeNull();
|
||||
expect(extractCode('0349 thanks', 'nanobot')).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('createPairing', () => {
|
||||
it('generates a 4-digit code', async () => {
|
||||
const r = await createPairing('main');
|
||||
expect(r.code).toMatch(/^\d{4}$/);
|
||||
expect(r.status).toBe('pending');
|
||||
});
|
||||
|
||||
it('does not collide with active codes', async () => {
|
||||
const codes = new Set<string>();
|
||||
for (let i = 0; i < 20; i++) {
|
||||
const r = await createPairing('main');
|
||||
expect(codes.has(r.code)).toBe(false);
|
||||
codes.add(r.code);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('tryConsume', () => {
|
||||
it('matches and marks consumed', async () => {
|
||||
const r = await createPairing('main');
|
||||
const consumed = await tryConsume({
|
||||
text: `@nanobot ${r.code}`,
|
||||
botUsername: 'nanobot',
|
||||
platformId: 'telegram:123',
|
||||
isGroup: false,
|
||||
adminUserId: 'u1',
|
||||
});
|
||||
expect(consumed).not.toBeNull();
|
||||
expect(consumed!.status).toBe('consumed');
|
||||
expect(consumed!.consumed?.platformId).toBe('telegram:123');
|
||||
expect(consumed!.consumed?.adminUserId).toBe('u1');
|
||||
expect(getStatus(r.code)).toBe('consumed');
|
||||
});
|
||||
|
||||
it('returns null on no match (silent drop)', async () => {
|
||||
await createPairing('main');
|
||||
const out = await tryConsume({
|
||||
text: '@nanobot 9999',
|
||||
botUsername: 'nanobot',
|
||||
platformId: 'x',
|
||||
isGroup: false,
|
||||
});
|
||||
expect(out).toBeNull();
|
||||
});
|
||||
|
||||
it('matches a bare code without @botname addressing', async () => {
|
||||
const r = await createPairing('main');
|
||||
const out = await tryConsume({
|
||||
text: r.code,
|
||||
botUsername: 'nanobot',
|
||||
platformId: 'x',
|
||||
isGroup: false,
|
||||
});
|
||||
expect(out).not.toBeNull();
|
||||
expect(out!.status).toBe('consumed');
|
||||
});
|
||||
|
||||
it('cannot be consumed twice', async () => {
|
||||
const r = await createPairing('main');
|
||||
await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
|
||||
const second = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
|
||||
expect(second).toBeNull();
|
||||
});
|
||||
|
||||
it('cannot consume an invalidated pairing', async () => {
|
||||
const r = await createPairing('main');
|
||||
// Invalidate by sending a wrong code
|
||||
await tryConsume({ text: '9999', botUsername: 'b', platformId: 'p', isGroup: false });
|
||||
const out = await tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'p', isGroup: false });
|
||||
expect(out).toBeNull();
|
||||
expect(getStatus(r.code)).toBe('invalidated');
|
||||
});
|
||||
});
|
||||
|
||||
describe('getStatus', () => {
|
||||
it('returns unknown for missing codes', () => {
|
||||
expect(getStatus('0000')).toBe('unknown');
|
||||
});
|
||||
});
|
||||
|
||||
describe('waitForPairing', () => {
|
||||
it('resolves when consumed', async () => {
|
||||
const r = await createPairing('main');
|
||||
const p = waitForPairing(r.code, { pollMs: 50 });
|
||||
setTimeout(() => {
|
||||
tryConsume({ text: `@b ${r.code}`, botUsername: 'b', platformId: 'tg:1', isGroup: true, name: 'Group' });
|
||||
}, 100);
|
||||
const consumed = await p;
|
||||
expect(consumed.status).toBe('consumed');
|
||||
expect(consumed.consumed?.name).toBe('Group');
|
||||
});
|
||||
|
||||
it('rejects on invalidation', async () => {
|
||||
const r = await createPairing('main');
|
||||
const waiter = waitForPairing(r.code, { pollMs: 30 });
|
||||
setTimeout(() => {
|
||||
tryConsume({ text: '0000', botUsername: 'b', platformId: 'tg:1', isGroup: false });
|
||||
}, 60);
|
||||
await expect(waiter).rejects.toThrow(/invalidated/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('replace-by-default', () => {
|
||||
it('supersedes an existing pending pairing with the same intent', async () => {
|
||||
const first = await createPairing('main');
|
||||
const second = await createPairing('main');
|
||||
expect(getStatus(first.code)).toBe('invalidated');
|
||||
expect(getStatus(second.code)).toBe('pending');
|
||||
});
|
||||
|
||||
it('does not supersede pairings with a different intent', async () => {
|
||||
const a = await createPairing({ kind: 'wire-to', folder: 'work' });
|
||||
const b = await createPairing({ kind: 'wire-to', folder: 'side' });
|
||||
expect(getStatus(a.code)).toBe('pending');
|
||||
expect(getStatus(b.code)).toBe('pending');
|
||||
});
|
||||
|
||||
it('causes waitForPairing on the old code to reject as invalidated', async () => {
|
||||
const first = await createPairing('main');
|
||||
const waiter = waitForPairing(first.code, { pollMs: 30 });
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
await createPairing('main');
|
||||
await expect(waiter).rejects.toThrow(/invalidated/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('attempt tracking', () => {
|
||||
it('fires onAttempt for a wrong code, invalidates the pairing, and rejects the waiter', async () => {
|
||||
const r = await createPairing('main');
|
||||
const attempts: string[] = [];
|
||||
const waiter = waitForPairing(r.code, {
|
||||
pollMs: 30,
|
||||
onAttempt: (a) => attempts.push(a.candidate),
|
||||
});
|
||||
setTimeout(() => {
|
||||
tryConsume({ text: '9999', botUsername: 'b', platformId: 'tg:1', isGroup: false });
|
||||
}, 60);
|
||||
await expect(waiter).rejects.toThrow(/invalidated by wrong code \(9999\)/);
|
||||
expect(attempts).toEqual(['9999']);
|
||||
expect(getStatus(r.code)).toBe('invalidated');
|
||||
});
|
||||
|
||||
it('a correct code consumes without firing onAttempt', async () => {
|
||||
const r = await createPairing('main');
|
||||
const attempts: string[] = [];
|
||||
const waiter = waitForPairing(r.code, {
|
||||
pollMs: 30,
|
||||
onAttempt: (a) => attempts.push(a.candidate),
|
||||
});
|
||||
setTimeout(() => {
|
||||
tryConsume({ text: r.code, botUsername: 'b', platformId: 'tg:1', isGroup: false });
|
||||
}, 60);
|
||||
const consumed = await waiter;
|
||||
expect(consumed.status).toBe('consumed');
|
||||
expect(attempts).toEqual([]);
|
||||
});
|
||||
|
||||
it('ignores non-code messages and keeps the pairing pending', async () => {
|
||||
const r = await createPairing('main');
|
||||
await tryConsume({ text: 'hello there', botUsername: 'b', platformId: 'p', isGroup: false });
|
||||
const after = getPairing(r.code);
|
||||
expect(after?.status).toBe('pending');
|
||||
expect(after?.attempts ?? []).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('a second code attempt after invalidation does not match', async () => {
|
||||
const r = await createPairing('main');
|
||||
await tryConsume({ text: '9999', botUsername: 'b', platformId: 'p', isGroup: false });
|
||||
const retry = await tryConsume({ text: r.code, botUsername: 'b', platformId: 'p', isGroup: false });
|
||||
expect(retry).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('intent passthrough', () => {
|
||||
it('preserves wire-to and new-agent intents', async () => {
|
||||
const a = await createPairing({ kind: 'wire-to', folder: 'work' });
|
||||
const b = await createPairing({ kind: 'new-agent', folder: 'side' });
|
||||
const ca = await tryConsume({ text: `@b ${a.code}`, botUsername: 'b', platformId: 'p1', isGroup: true });
|
||||
const cb = await tryConsume({ text: `@b ${b.code}`, botUsername: 'b', platformId: 'p2', isGroup: true });
|
||||
expect(ca!.intent).toEqual({ kind: 'wire-to', folder: 'work' });
|
||||
expect(cb!.intent).toEqual({ kind: 'new-agent', folder: 'side' });
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,339 @@
|
||||
/**
|
||||
* Telegram pairing — proves the operator owns the chat they're registering.
|
||||
*
|
||||
* BotFather hands out tokens with no user binding, so anyone who guesses the
|
||||
* bot's username can DM it. Pairing closes that gap: setup creates a one-time
|
||||
* 4-digit code and the operator echoes it back from the chat they want to
|
||||
* register. The message must be exactly the 4 digits (optionally prefixed by
|
||||
* `@botname ` for groups with privacy ON) — arbitrary messages that happen to
|
||||
* contain a 4-digit number do NOT match. The inbound interceptor in
|
||||
* telegram.ts matches the code, records the chat, upserts the paired user,
|
||||
* and (if no owner exists yet) promotes them to owner — all before the
|
||||
* message ever reaches the router.
|
||||
*
|
||||
* Storage is a JSON file at data/telegram-pairings.json — single-process,
|
||||
* read-modify-write under an in-process mutex.
|
||||
*/
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { DATA_DIR } from '../config.js';
|
||||
import { log } from '../log.js';
|
||||
|
||||
export type PairingIntent = 'main' | { kind: 'wire-to'; folder: string } | { kind: 'new-agent'; folder: string };
|
||||
export type PairingStatus = 'pending' | 'consumed' | 'invalidated' | 'unknown';
|
||||
|
||||
export interface ConsumedDetails {
|
||||
platformId: string;
|
||||
isGroup: boolean;
|
||||
name: string | null;
|
||||
adminUserId: string | null;
|
||||
consumedAt: string;
|
||||
}
|
||||
|
||||
export interface PairingAttempt {
|
||||
candidate: string;
|
||||
platformId: string;
|
||||
at: string;
|
||||
matched: boolean;
|
||||
}
|
||||
|
||||
export interface PairingRecord {
|
||||
code: string;
|
||||
intent: PairingIntent;
|
||||
createdAt: string;
|
||||
status: Exclude<PairingStatus, 'unknown'>;
|
||||
consumed?: ConsumedDetails;
|
||||
/** Recent pairing attempts observed while this record was pending. Capped. */
|
||||
attempts?: PairingAttempt[];
|
||||
}
|
||||
|
||||
const MAX_ATTEMPTS_PER_RECORD = 10;
|
||||
|
||||
function intentEquals(a: PairingIntent, b: PairingIntent): boolean {
|
||||
if (a === 'main' || b === 'main') return a === b;
|
||||
return a.kind === b.kind && a.folder === b.folder;
|
||||
}
|
||||
|
||||
interface Store {
|
||||
pairings: PairingRecord[];
|
||||
}
|
||||
|
||||
/** Pairing codes do not expire — they are consumed on match or invalidated by wrong guesses. */
|
||||
const FILE_NAME = 'telegram-pairings.json';
|
||||
|
||||
let storePathOverride: string | null = null;
|
||||
export function _setStorePathForTest(p: string | null): void {
|
||||
storePathOverride = p;
|
||||
}
|
||||
|
||||
function storePath(): string {
|
||||
return storePathOverride ?? path.join(DATA_DIR, FILE_NAME);
|
||||
}
|
||||
|
||||
let mutex: Promise<unknown> = Promise.resolve();
|
||||
function withLock<T>(fn: () => Promise<T> | T): Promise<T> {
|
||||
const next = mutex.then(() => fn());
|
||||
mutex = next.catch(() => {});
|
||||
return next;
|
||||
}
|
||||
|
||||
function readStore(): Store {
|
||||
try {
|
||||
const raw = fs.readFileSync(storePath(), 'utf8');
|
||||
const parsed = JSON.parse(raw) as Store;
|
||||
if (!Array.isArray(parsed.pairings)) return { pairings: [] };
|
||||
return parsed;
|
||||
} catch {
|
||||
return { pairings: [] };
|
||||
}
|
||||
}
|
||||
|
||||
function writeStore(store: Store): void {
|
||||
const p = storePath();
|
||||
fs.mkdirSync(path.dirname(p), { recursive: true });
|
||||
const tmp = `${p}.tmp`;
|
||||
fs.writeFileSync(tmp, JSON.stringify(store, null, 2));
|
||||
fs.renameSync(tmp, p);
|
||||
}
|
||||
|
||||
/** Clean up old consumed/invalidated records (keep last 50). */
|
||||
function sweep(store: Store): boolean {
|
||||
if (store.pairings.length <= 50) return false;
|
||||
store.pairings = store.pairings.slice(-50);
|
||||
return true;
|
||||
}
|
||||
|
||||
function generateCode(active: Set<string>): string {
|
||||
// 4-digit numeric, zero-padded. 10k space, fine for one-at-a-time intents.
|
||||
for (let i = 0; i < 50; i++) {
|
||||
const code = Math.floor(Math.random() * 10000)
|
||||
.toString()
|
||||
.padStart(4, '0');
|
||||
if (!active.has(code)) return code;
|
||||
}
|
||||
throw new Error('Could not allocate a free pairing code (too many active).');
|
||||
}
|
||||
|
||||
export async function createPairing(intent: PairingIntent): Promise<PairingRecord> {
|
||||
return withLock(() => {
|
||||
const store = readStore();
|
||||
sweep(store);
|
||||
// Replace-by-default: a new pairing for an intent supersedes any existing
|
||||
// pending pairing for the same intent. Old waitForPairing calls observe
|
||||
// `invalidated` and exit on their own.
|
||||
for (const r of store.pairings) {
|
||||
if (r.status === 'pending' && intentEquals(r.intent, intent)) {
|
||||
r.status = 'invalidated';
|
||||
log.info('Pairing superseded by new request', { code: r.code, intent });
|
||||
}
|
||||
}
|
||||
const active = new Set(store.pairings.filter((r) => r.status === 'pending').map((r) => r.code));
|
||||
const record: PairingRecord = {
|
||||
code: generateCode(active),
|
||||
intent,
|
||||
createdAt: new Date().toISOString(),
|
||||
status: 'pending',
|
||||
};
|
||||
store.pairings.push(record);
|
||||
writeStore(store);
|
||||
log.info('Pairing created', { code: record.code, intent });
|
||||
return record;
|
||||
});
|
||||
}
|
||||
|
||||
export interface ConsumeInput {
|
||||
text: string;
|
||||
botUsername: string;
|
||||
platformId: string;
|
||||
isGroup: boolean;
|
||||
name?: string | null;
|
||||
adminUserId?: string | null;
|
||||
}
|
||||
|
||||
/** Strip leading @botname and return the trimmed remainder, or null if not addressed. */
|
||||
export function extractAddressedText(text: string, botUsername: string): string | null {
|
||||
const trimmed = text.trim();
|
||||
const re = new RegExp(`^@${botUsername.replace(/[.*+?^${}()|[\\]\\\\]/g, '\\$&')}\\b`, 'i');
|
||||
const m = trimmed.match(re);
|
||||
if (!m) return null;
|
||||
return trimmed.slice(m[0].length).trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract a pairing code from an inbound message. The message must be exactly
|
||||
* 4 digits (optionally prefixed by `@botname `) — loose matches like
|
||||
* "my pin is 1234" are rejected to avoid false positives from chatter.
|
||||
*/
|
||||
export function extractCode(text: string, botUsername: string): string | null {
|
||||
const addressed = extractAddressedText(text, botUsername);
|
||||
const candidate = (addressed !== null ? addressed : text).trim();
|
||||
const m = candidate.match(/^(\d{4})$/);
|
||||
return m ? m[1] : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to match an inbound message against a pending pairing. On match,
|
||||
* marks the pairing consumed atomically and returns the record. Returns
|
||||
* null on no match or expiry (silent drop).
|
||||
*/
|
||||
export async function tryConsume(input: ConsumeInput): Promise<PairingRecord | null> {
|
||||
const code = extractCode(input.text, input.botUsername);
|
||||
if (!code) return null;
|
||||
return withLock(() => {
|
||||
const store = readStore();
|
||||
const now = Date.now();
|
||||
sweep(store);
|
||||
const record = store.pairings.find((r) => r.code === code && r.status === 'pending');
|
||||
if (!record) {
|
||||
// Miss: record the attempt on every currently-pending record so each
|
||||
// waitForPairing caller can surface it as user feedback.
|
||||
const attempt: PairingAttempt = {
|
||||
candidate: code,
|
||||
platformId: input.platformId,
|
||||
at: new Date(now).toISOString(),
|
||||
matched: false,
|
||||
};
|
||||
let recorded = false;
|
||||
for (const r of store.pairings) {
|
||||
if (r.status !== 'pending') continue;
|
||||
r.attempts = [...(r.attempts ?? []), attempt].slice(-MAX_ATTEMPTS_PER_RECORD);
|
||||
// One attempt per code. A wrong guess invalidates the pairing
|
||||
// immediately — pair-telegram observes the `invalidated` signal and
|
||||
// auto-issues a fresh code (up to a retry cap).
|
||||
r.status = 'invalidated';
|
||||
recorded = true;
|
||||
}
|
||||
writeStore(store);
|
||||
if (recorded) {
|
||||
log.info('Pairing invalidated by wrong attempt', { candidate: code, platformId: input.platformId });
|
||||
}
|
||||
return null;
|
||||
}
|
||||
record.status = 'consumed';
|
||||
record.consumed = {
|
||||
platformId: input.platformId,
|
||||
isGroup: input.isGroup,
|
||||
name: input.name ?? null,
|
||||
adminUserId: input.adminUserId ?? null,
|
||||
consumedAt: new Date(now).toISOString(),
|
||||
};
|
||||
record.attempts = [
|
||||
...(record.attempts ?? []),
|
||||
{ candidate: code, platformId: input.platformId, at: new Date(now).toISOString(), matched: true },
|
||||
].slice(-MAX_ATTEMPTS_PER_RECORD);
|
||||
writeStore(store);
|
||||
log.info('Pairing consumed', { code, platformId: input.platformId, intent: record.intent });
|
||||
return record;
|
||||
});
|
||||
}
|
||||
|
||||
export function getStatus(code: string): PairingStatus {
|
||||
const store = readStore();
|
||||
sweep(store);
|
||||
const r = store.pairings.find((p) => p.code === code);
|
||||
if (!r) return 'unknown';
|
||||
return r.status;
|
||||
}
|
||||
|
||||
export function getPairing(code: string): PairingRecord | null {
|
||||
const store = readStore();
|
||||
sweep(store);
|
||||
return store.pairings.find((p) => p.code === code) ?? null;
|
||||
}
|
||||
|
||||
export interface WaitForPairingOptions {
|
||||
/** Polling interval as a fallback when fs.watch misses an event. */
|
||||
pollMs?: number;
|
||||
/** Fires once per new attempt recorded against this pairing (misses only). */
|
||||
onAttempt?: (attempt: PairingAttempt) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve when the pairing is consumed; reject when it is invalidated
|
||||
* (wrong code guess). Waits indefinitely — codes do not expire.
|
||||
* Uses fs.watch as the primary signal with a slow poll fallback.
|
||||
*/
|
||||
export async function waitForPairing(code: string, opts: WaitForPairingOptions = {}): Promise<PairingRecord> {
|
||||
const pollMs = opts.pollMs ?? 1000;
|
||||
const initial = getPairing(code);
|
||||
if (!initial) throw new Error(`Unknown pairing code: ${code}`);
|
||||
|
||||
return new Promise<PairingRecord>((resolve, reject) => {
|
||||
let watcher: fs.FSWatcher | null = null;
|
||||
let interval: NodeJS.Timeout | null = null;
|
||||
let settled = false;
|
||||
|
||||
const cleanup = () => {
|
||||
settled = true;
|
||||
if (watcher)
|
||||
try {
|
||||
watcher.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
if (interval) clearInterval(interval);
|
||||
};
|
||||
|
||||
let seenAttempts = 0;
|
||||
const check = () => {
|
||||
if (settled) return;
|
||||
const r = getPairing(code);
|
||||
if (!r) {
|
||||
cleanup();
|
||||
reject(new Error(`Pairing ${code} disappeared`));
|
||||
return;
|
||||
}
|
||||
// Surface any new miss attempts since the last tick. Only fire for
|
||||
// misses — matches are signaled by `status === 'consumed'` below.
|
||||
if (opts.onAttempt && r.attempts) {
|
||||
for (let i = seenAttempts; i < r.attempts.length; i++) {
|
||||
const a = r.attempts[i];
|
||||
if (!a.matched) {
|
||||
try {
|
||||
opts.onAttempt(a);
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
seenAttempts = r.attempts.length;
|
||||
}
|
||||
if (r.status === 'consumed') {
|
||||
cleanup();
|
||||
resolve(r);
|
||||
return;
|
||||
}
|
||||
if (r.status === 'invalidated') {
|
||||
cleanup();
|
||||
const lastMiss = r.attempts
|
||||
?.slice()
|
||||
.reverse()
|
||||
.find((a) => !a.matched);
|
||||
reject(new Error(`Pairing ${code} invalidated by wrong code${lastMiss ? ` (${lastMiss.candidate})` : ''}`));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
const dir = path.dirname(storePath());
|
||||
fs.mkdirSync(dir, { recursive: true });
|
||||
watcher = fs.watch(dir, (_event, fname) => {
|
||||
if (!fname || fname.toString().startsWith(path.basename(storePath()))) check();
|
||||
});
|
||||
} catch {
|
||||
// fs.watch unsupported — poll-only is fine
|
||||
}
|
||||
interval = setInterval(check, pollMs);
|
||||
check();
|
||||
});
|
||||
}
|
||||
|
||||
/** Test helper — wipe the store. */
|
||||
export function _resetForTest(): void {
|
||||
try {
|
||||
fs.unlinkSync(storePath());
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,229 @@
|
||||
/**
|
||||
* Telegram channel adapter (v2) — uses Chat SDK bridge, with a pairing
|
||||
* interceptor wrapped around onInbound to verify chat ownership before
|
||||
* registration. See telegram-pairing.ts for the why.
|
||||
*/
|
||||
import { createTelegramAdapter } from '@chat-adapter/telegram';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { log } from '../log.js';
|
||||
import { createMessagingGroup, getMessagingGroupByPlatform, updateMessagingGroup } from '../db/messaging-groups.js';
|
||||
import { grantRole, hasAnyOwner } from '../db/user-roles.js';
|
||||
import { upsertUser } from '../db/users.js';
|
||||
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
|
||||
import { sanitizeTelegramLegacyMarkdown } from './telegram-markdown-sanitize.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js';
|
||||
import { tryConsume } from './telegram-pairing.js';
|
||||
|
||||
/**
|
||||
* Retry a one-shot operation that can fail on transient network errors at
|
||||
* cold-start (DNS hiccups, brief upstream outages). Exponential backoff capped
|
||||
* at 5 attempts — if the network is truly down we surface it instead of
|
||||
* hanging the service indefinitely.
|
||||
*/
|
||||
async function withRetry<T>(fn: () => Promise<T>, label: string, maxAttempts = 5): Promise<T> {
|
||||
let lastErr: unknown;
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
if (attempt === maxAttempts) break;
|
||||
const delay = Math.min(16000, 1000 * 2 ** (attempt - 1));
|
||||
log.warn('Telegram setup failed, retrying', { label, attempt, delayMs: delay, err });
|
||||
await new Promise((r) => setTimeout(r, delay));
|
||||
}
|
||||
}
|
||||
throw lastErr;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
|
||||
if (!raw.reply_to_message) return null;
|
||||
const reply = raw.reply_to_message;
|
||||
return {
|
||||
text: reply.text || reply.caption || '',
|
||||
sender: reply.from?.first_name || reply.from?.username || 'Unknown',
|
||||
};
|
||||
}
|
||||
|
||||
/** Look up the bot username via Telegram getMe. Cached after first call. */
|
||||
async function fetchBotUsername(token: string): Promise<string | null> {
|
||||
try {
|
||||
const res = await fetch(`https://api.telegram.org/bot${token}/getMe`);
|
||||
const json = (await res.json()) as { ok: boolean; result?: { username?: string } };
|
||||
return json.ok ? (json.result?.username ?? null) : null;
|
||||
} catch (err) {
|
||||
log.warn('Telegram getMe failed', { err });
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function isGroupPlatformId(platformId: string): boolean {
|
||||
// platformId is "telegram:<chatId>". Negative chat IDs are groups/channels.
|
||||
const id = platformId.split(':').pop() ?? '';
|
||||
return id.startsWith('-');
|
||||
}
|
||||
|
||||
interface InboundFields {
|
||||
text: string;
|
||||
authorUserId: string | null;
|
||||
}
|
||||
|
||||
function readInboundFields(message: InboundMessage): InboundFields {
|
||||
if (message.kind !== 'chat-sdk' || !message.content || typeof message.content !== 'object') {
|
||||
return { text: '', authorUserId: null };
|
||||
}
|
||||
const c = message.content as { text?: string; author?: { userId?: string } };
|
||||
return { text: c.text ?? '', authorUserId: c.author?.userId ?? null };
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an onInbound interceptor that consumes pairing codes before they
|
||||
* reach the router. On match: records the chat + its paired user, promotes
|
||||
* the user to owner if the instance has no owner yet, and short-circuits.
|
||||
* On miss: forwards to the host.
|
||||
*/
|
||||
/**
|
||||
* Send a one-shot confirmation back to the paired chat. Best-effort — failures
|
||||
* are logged but never propagated, so a Telegram outage can't undo a successful
|
||||
* pairing or trigger the interceptor's fail-open path.
|
||||
*/
|
||||
async function sendPairingConfirmation(token: string, platformId: string): Promise<void> {
|
||||
const chatId = platformId.split(':').slice(1).join(':');
|
||||
if (!chatId) return;
|
||||
try {
|
||||
const res = await fetch(`https://api.telegram.org/bot${token}/sendMessage`, {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
chat_id: chatId,
|
||||
text: "Pairing success! I'm spinning up the agent now, you'll get a message from them shortly.",
|
||||
}),
|
||||
});
|
||||
if (!res.ok) {
|
||||
log.warn('Telegram pairing confirmation non-OK', { status: res.status });
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn('Telegram pairing confirmation failed', { err });
|
||||
}
|
||||
}
|
||||
|
||||
function createPairingInterceptor(
|
||||
botUsernamePromise: Promise<string | null>,
|
||||
hostOnInbound: ChannelSetup['onInbound'],
|
||||
token: string,
|
||||
): ChannelSetup['onInbound'] {
|
||||
return async (platformId, threadId, message) => {
|
||||
try {
|
||||
const botUsername = await botUsernamePromise;
|
||||
if (!botUsername) {
|
||||
hostOnInbound(platformId, threadId, message);
|
||||
return;
|
||||
}
|
||||
const { text, authorUserId } = readInboundFields(message);
|
||||
if (!text) {
|
||||
hostOnInbound(platformId, threadId, message);
|
||||
return;
|
||||
}
|
||||
const consumed = await tryConsume({
|
||||
text,
|
||||
botUsername,
|
||||
platformId,
|
||||
isGroup: isGroupPlatformId(platformId),
|
||||
adminUserId: authorUserId,
|
||||
});
|
||||
if (!consumed) {
|
||||
hostOnInbound(platformId, threadId, message);
|
||||
return;
|
||||
}
|
||||
// Pairing matched — record the chat and short-circuit so the
|
||||
// code-bearing message never reaches an agent. Privilege is now a
|
||||
// property of the paired user, not the chat: upsert the user, and if
|
||||
// this instance has no owner yet, promote them to owner.
|
||||
const existing = getMessagingGroupByPlatform('telegram', platformId);
|
||||
if (existing) {
|
||||
updateMessagingGroup(existing.id, {
|
||||
is_group: consumed.consumed!.isGroup ? 1 : 0,
|
||||
});
|
||||
} else {
|
||||
createMessagingGroup({
|
||||
id: `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
|
||||
channel_type: 'telegram',
|
||||
platform_id: platformId,
|
||||
name: consumed.consumed!.name,
|
||||
is_group: consumed.consumed!.isGroup ? 1 : 0,
|
||||
unknown_sender_policy: 'strict',
|
||||
created_at: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
const pairedUserId = `telegram:${consumed.consumed!.adminUserId}`;
|
||||
upsertUser({
|
||||
id: pairedUserId,
|
||||
kind: 'telegram',
|
||||
display_name: null,
|
||||
created_at: new Date().toISOString(),
|
||||
});
|
||||
|
||||
let promotedToOwner = false;
|
||||
if (!hasAnyOwner()) {
|
||||
grantRole({
|
||||
user_id: pairedUserId,
|
||||
role: 'owner',
|
||||
agent_group_id: null,
|
||||
granted_by: null,
|
||||
granted_at: new Date().toISOString(),
|
||||
});
|
||||
promotedToOwner = true;
|
||||
}
|
||||
|
||||
log.info('Telegram pairing accepted — chat registered', {
|
||||
platformId,
|
||||
pairedUser: pairedUserId,
|
||||
promotedToOwner,
|
||||
intent: consumed.intent,
|
||||
});
|
||||
|
||||
await sendPairingConfirmation(token, platformId);
|
||||
} catch (err) {
|
||||
log.error('Telegram pairing interceptor error', { err });
|
||||
// Fail open: pass through so a pairing bug doesn't break normal traffic.
|
||||
hostOnInbound(platformId, threadId, message);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
registerChannelAdapter('telegram', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['TELEGRAM_BOT_TOKEN']);
|
||||
if (!env.TELEGRAM_BOT_TOKEN) return null;
|
||||
const token = env.TELEGRAM_BOT_TOKEN;
|
||||
const telegramAdapter = createTelegramAdapter({
|
||||
botToken: token,
|
||||
mode: 'polling',
|
||||
});
|
||||
const bridge = createChatSdkBridge({
|
||||
adapter: telegramAdapter,
|
||||
concurrency: 'concurrent',
|
||||
extractReplyContext,
|
||||
supportsThreads: false,
|
||||
transformOutboundText: sanitizeTelegramLegacyMarkdown,
|
||||
});
|
||||
|
||||
const botUsernamePromise = fetchBotUsername(token);
|
||||
|
||||
const wrapped: ChannelAdapter = {
|
||||
...bridge,
|
||||
async setup(hostConfig: ChannelSetup) {
|
||||
const intercepted: ChannelSetup = {
|
||||
...hostConfig,
|
||||
onInbound: createPairingInterceptor(botUsernamePromise, hostConfig.onInbound, token),
|
||||
};
|
||||
return withRetry(() => bridge.setup(intercepted), 'bridge.setup');
|
||||
},
|
||||
};
|
||||
return wrapped;
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,21 @@
|
||||
/**
|
||||
* Webex channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createWebexAdapter } from '@bitbasti/chat-adapter-webex';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('webex', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['WEBEX_BOT_TOKEN', 'WEBEX_WEBHOOK_SECRET']);
|
||||
if (!env.WEBEX_BOT_TOKEN) return null;
|
||||
const webexAdapter = createWebexAdapter({
|
||||
botToken: env.WEBEX_BOT_TOKEN,
|
||||
webhookSecret: env.WEBEX_WEBHOOK_SECRET,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: webexAdapter, concurrency: 'concurrent', supportsThreads: true });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,29 @@
|
||||
/**
|
||||
* WhatsApp Cloud API channel adapter (v2) — uses Chat SDK bridge.
|
||||
* Uses the official Meta WhatsApp Business Cloud API (not Baileys).
|
||||
* Self-registers on import.
|
||||
*/
|
||||
import { createWhatsAppAdapter } from '@chat-adapter/whatsapp';
|
||||
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { createChatSdkBridge } from './chat-sdk-bridge.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
|
||||
registerChannelAdapter('whatsapp-cloud', {
|
||||
factory: () => {
|
||||
const env = readEnvFile([
|
||||
'WHATSAPP_ACCESS_TOKEN',
|
||||
'WHATSAPP_PHONE_NUMBER_ID',
|
||||
'WHATSAPP_APP_SECRET',
|
||||
'WHATSAPP_VERIFY_TOKEN',
|
||||
]);
|
||||
if (!env.WHATSAPP_ACCESS_TOKEN) return null;
|
||||
const whatsappAdapter = createWhatsAppAdapter({
|
||||
accessToken: env.WHATSAPP_ACCESS_TOKEN,
|
||||
phoneNumberId: env.WHATSAPP_PHONE_NUMBER_ID,
|
||||
appSecret: env.WHATSAPP_APP_SECRET,
|
||||
verifyToken: env.WHATSAPP_VERIFY_TOKEN,
|
||||
});
|
||||
return createChatSdkBridge({ adapter: whatsappAdapter, concurrency: 'concurrent', supportsThreads: false });
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,741 @@
|
||||
/**
|
||||
* WhatsApp channel adapter (v2) — native Baileys v6 implementation.
|
||||
*
|
||||
* Implements ChannelAdapter directly (no Chat SDK bridge) using
|
||||
* @whiskeysockets/baileys v6 (stable). Ports proven v1 infrastructure:
|
||||
* getMessage fallback, outgoing queue, group metadata cache, LID mapping,
|
||||
* reconnection with backoff.
|
||||
*
|
||||
* Auth credentials persist in store/auth/. On first run:
|
||||
* - If WHATSAPP_PHONE_NUMBER is set → pairing code (printed to log)
|
||||
* - Otherwise → QR code (printed to log)
|
||||
* Subsequent restarts reuse the saved session automatically.
|
||||
*/
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import pino from 'pino';
|
||||
|
||||
import {
|
||||
makeWASocket,
|
||||
Browsers,
|
||||
DisconnectReason,
|
||||
fetchLatestWaWebVersion,
|
||||
downloadMediaMessage,
|
||||
makeCacheableSignalKeyStore,
|
||||
normalizeMessageContent,
|
||||
useMultiFileAuthState,
|
||||
} from '@whiskeysockets/baileys';
|
||||
import type { GroupMetadata, WAMessageKey, WAMessage, WASocket } from '@whiskeysockets/baileys';
|
||||
|
||||
import { ASSISTANT_HAS_OWN_NUMBER, ASSISTANT_NAME, DATA_DIR } from '../config.js';
|
||||
import { readEnvFile } from '../env.js';
|
||||
import { log } from '../log.js';
|
||||
import { registerChannelAdapter } from './channel-registry.js';
|
||||
import { normalizeOptions, type NormalizedOption } from './ask-question.js';
|
||||
import type {
|
||||
ChannelAdapter,
|
||||
ChannelSetup,
|
||||
ConversationConfig,
|
||||
ConversationInfo,
|
||||
InboundMessage,
|
||||
OutboundMessage,
|
||||
} from './adapter.js';
|
||||
|
||||
// Baileys v6 bug: getPlatformId sends charCode (49) instead of enum value (1).
|
||||
// Fixed in Baileys 7.x but not backported. Without this, pairing codes fail with
|
||||
// "couldn't link device" because WhatsApp receives an invalid platform ID.
|
||||
// Must use createRequire — ESM `import *` creates a read-only namespace.
|
||||
// proto is not available as a named ESM export — use createRequire (same as v1)
|
||||
import { createRequire } from 'module';
|
||||
const _require = createRequire(import.meta.url);
|
||||
const { proto } = _require('@whiskeysockets/baileys') as { proto: any };
|
||||
try {
|
||||
const _generics = _require('@whiskeysockets/baileys/lib/Utils/generics') as Record<string, unknown>;
|
||||
_generics.getPlatformId = (browser: string): string => {
|
||||
const platformType =
|
||||
proto.DeviceProps.PlatformType[browser.toUpperCase() as keyof typeof proto.DeviceProps.PlatformType];
|
||||
return platformType ? platformType.toString() : '1';
|
||||
};
|
||||
} catch {
|
||||
// If CJS require fails (Node version mismatch), pairing codes may not work
|
||||
// but QR auth will still function fine.
|
||||
log.warn('Could not patch getPlatformId — pairing code auth may fail');
|
||||
}
|
||||
|
||||
const baileysLogger = pino({ level: 'silent' });
|
||||
|
||||
const AUTH_DIR = path.join(process.cwd(), 'store', 'auth');
|
||||
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24h
|
||||
const GROUP_METADATA_CACHE_TTL_MS = 60_000; // 1 min for outbound sends
|
||||
const SENT_MESSAGE_CACHE_MAX = 256;
|
||||
const RECONNECT_DELAY_MS = 5000;
|
||||
const PENDING_QUESTIONS_MAX = 64;
|
||||
|
||||
/** Normalize an option label to a slash command: "Approve" → "/approve" */
|
||||
function optionToCommand(option: string): string {
|
||||
return '/' + option.toLowerCase().replace(/\s+/g, '-');
|
||||
}
|
||||
|
||||
// --- Markdown → WhatsApp formatting ---
|
||||
|
||||
interface TextSegment {
|
||||
content: string;
|
||||
isProtected: boolean;
|
||||
}
|
||||
|
||||
/** Split text into code-block-protected and unprotected regions. */
|
||||
function splitProtectedRegions(text: string): TextSegment[] {
|
||||
const segments: TextSegment[] = [];
|
||||
const codeBlockRegex = /```[\s\S]*?```|`[^`\n]+`/g;
|
||||
let lastIndex = 0;
|
||||
let match: RegExpExecArray | null;
|
||||
|
||||
while ((match = codeBlockRegex.exec(text)) !== null) {
|
||||
if (match.index > lastIndex) {
|
||||
segments.push({ content: text.slice(lastIndex, match.index), isProtected: false });
|
||||
}
|
||||
segments.push({ content: match[0], isProtected: true });
|
||||
lastIndex = match.index + match[0].length;
|
||||
}
|
||||
|
||||
if (lastIndex < text.length) {
|
||||
segments.push({ content: text.slice(lastIndex), isProtected: false });
|
||||
}
|
||||
|
||||
return segments;
|
||||
}
|
||||
|
||||
/** Apply WhatsApp-native formatting to an unprotected text segment. */
|
||||
function transformForWhatsApp(text: string): string {
|
||||
// Order matters: italic before bold to avoid **bold** → *bold* → _bold_
|
||||
// 1. Italic: *text* (not **) → _text_
|
||||
text = text.replace(/(?<!\*)\*(?=[^\s*])([^*\n]+?)(?<=[^\s*])\*(?!\*)/g, '_$1_');
|
||||
// 2. Bold: **text** → *text*
|
||||
text = text.replace(/\*\*(?=[^\s*])([^*]+?)(?<=[^\s*])\*\*/g, '*$1*');
|
||||
// 3. Headings: ## Title → *Title*
|
||||
text = text.replace(/^#{1,6}\s+(.+)$/gm, '*$1*');
|
||||
// 4. Links: [text](url) → text (url)
|
||||
text = text.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '$1 ($2)');
|
||||
// 5. Horizontal rules: --- / *** / ___ → stripped
|
||||
text = text.replace(/^(-{3,}|\*{3,}|_{3,})$/gm, '');
|
||||
return text;
|
||||
}
|
||||
|
||||
/** Convert Claude's markdown to WhatsApp-native formatting. */
|
||||
function formatWhatsApp(text: string): string {
|
||||
const segments = splitProtectedRegions(text);
|
||||
return segments.map(({ content, isProtected }) => (isProtected ? content : transformForWhatsApp(content))).join('');
|
||||
}
|
||||
|
||||
/** Map file extension to Baileys media message type. */
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
function buildMediaMessage(data: Buffer, filename: string, ext: string, caption?: string): any {
|
||||
const imageExts = ['.jpg', '.jpeg', '.png', '.gif', '.webp'];
|
||||
const videoExts = ['.mp4', '.mov', '.avi', '.mkv'];
|
||||
const audioExts = ['.mp3', '.ogg', '.m4a', '.wav', '.aac', '.opus'];
|
||||
|
||||
if (imageExts.includes(ext)) {
|
||||
return { image: data, caption, mimetype: `image/${ext.slice(1) === 'jpg' ? 'jpeg' : ext.slice(1)}` };
|
||||
}
|
||||
if (videoExts.includes(ext)) {
|
||||
return { video: data, caption, mimetype: `video/${ext.slice(1)}` };
|
||||
}
|
||||
if (audioExts.includes(ext)) {
|
||||
return { audio: data, mimetype: `audio/${ext.slice(1) === 'mp3' ? 'mpeg' : ext.slice(1)}` };
|
||||
}
|
||||
// Default: send as document
|
||||
return { document: data, fileName: filename, caption, mimetype: 'application/octet-stream' };
|
||||
}
|
||||
|
||||
registerChannelAdapter('whatsapp', {
|
||||
factory: () => {
|
||||
const env = readEnvFile(['WHATSAPP_PHONE_NUMBER', 'WHATSAPP_ENABLED']);
|
||||
const phoneNumber = env.WHATSAPP_PHONE_NUMBER;
|
||||
const authDir = AUTH_DIR;
|
||||
|
||||
// Skip if no existing auth, no phone number for pairing, and not explicitly enabled (QR mode)
|
||||
const hasAuth = fs.existsSync(path.join(authDir, 'creds.json'));
|
||||
if (!hasAuth && !phoneNumber && !env.WHATSAPP_ENABLED) return null;
|
||||
|
||||
fs.mkdirSync(authDir, { recursive: true });
|
||||
|
||||
// State
|
||||
let sock: WASocket;
|
||||
let connected = false;
|
||||
let setupConfig: ChannelSetup;
|
||||
let conversations: Map<string, ConversationConfig>;
|
||||
|
||||
// LID → phone JID mapping (WhatsApp's new ID system)
|
||||
const lidToPhoneMap: Record<string, string> = {};
|
||||
let botLidUser: string | undefined;
|
||||
|
||||
// Outgoing queue for messages sent while disconnected
|
||||
const outgoingQueue: Array<{ jid: string; text: string }> = [];
|
||||
let flushing = false;
|
||||
|
||||
// Sent message cache for retry/re-encrypt requests
|
||||
const sentMessageCache = new Map<string, any>();
|
||||
|
||||
// Group metadata cache with TTL
|
||||
const groupMetadataCache = new Map<string, { metadata: GroupMetadata; expiresAt: number }>();
|
||||
|
||||
// Pending questions: chatJid → { questionId, options }
|
||||
// User replies with /approve, /reject, etc. to answer
|
||||
const pendingQuestions = new Map<
|
||||
string,
|
||||
{
|
||||
questionId: string;
|
||||
options: NormalizedOption[];
|
||||
}
|
||||
>();
|
||||
|
||||
// Group sync tracking
|
||||
let lastGroupSync = 0;
|
||||
let groupSyncTimerStarted = false;
|
||||
|
||||
// First-connect promise
|
||||
let resolveFirstOpen: (() => void) | undefined;
|
||||
let rejectFirstOpen: ((err: Error) => void) | undefined;
|
||||
|
||||
// Pairing code file for the setup skill to poll
|
||||
const pairingCodeFile = path.join(process.cwd(), 'store', 'pairing-code.txt');
|
||||
|
||||
// --- Helpers ---
|
||||
|
||||
function buildConversationMap(configs: ConversationConfig[]): Map<string, ConversationConfig> {
|
||||
const map = new Map<string, ConversationConfig>();
|
||||
for (const conv of configs) map.set(conv.platformId, conv);
|
||||
return map;
|
||||
}
|
||||
|
||||
function setLidPhoneMapping(lidUser: string, phoneJid: string): void {
|
||||
if (lidToPhoneMap[lidUser] === phoneJid) return;
|
||||
lidToPhoneMap[lidUser] = phoneJid;
|
||||
// Cached group metadata depends on participant IDs — invalidate
|
||||
groupMetadataCache.clear();
|
||||
}
|
||||
|
||||
async function translateJid(jid: string): Promise<string> {
|
||||
if (!jid.endsWith('@lid')) return jid;
|
||||
const lidUser = jid.split('@')[0].split(':')[0];
|
||||
|
||||
const cached = lidToPhoneMap[lidUser];
|
||||
if (cached) return cached;
|
||||
|
||||
// Query Baileys' signal repository
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const pn = await (sock.signalRepository as any)?.lidMapping?.getPNForLID(jid);
|
||||
if (pn) {
|
||||
const phoneJid = `${pn.split('@')[0].split(':')[0]}@s.whatsapp.net`;
|
||||
setLidPhoneMapping(lidUser, phoneJid);
|
||||
log.info('Translated LID to phone JID', { lidJid: jid, phoneJid });
|
||||
return phoneJid;
|
||||
}
|
||||
} catch (err) {
|
||||
log.debug('Failed to resolve LID via signalRepository', { jid, err });
|
||||
}
|
||||
|
||||
return jid;
|
||||
}
|
||||
|
||||
async function getNormalizedGroupMetadata(jid: string): Promise<GroupMetadata | undefined> {
|
||||
if (!jid.endsWith('@g.us')) return undefined;
|
||||
|
||||
const cached = groupMetadataCache.get(jid);
|
||||
if (cached && cached.expiresAt > Date.now()) return cached.metadata;
|
||||
|
||||
const metadata = await sock.groupMetadata(jid);
|
||||
const participants = await Promise.all(
|
||||
metadata.participants.map(async (p) => ({
|
||||
...p,
|
||||
id: await translateJid(p.id),
|
||||
})),
|
||||
);
|
||||
const normalized = { ...metadata, participants };
|
||||
groupMetadataCache.set(jid, {
|
||||
metadata: normalized,
|
||||
expiresAt: Date.now() + GROUP_METADATA_CACHE_TTL_MS,
|
||||
});
|
||||
return normalized;
|
||||
}
|
||||
|
||||
async function syncGroupMetadata(force = false): Promise<void> {
|
||||
if (!force && lastGroupSync && Date.now() - lastGroupSync < GROUP_SYNC_INTERVAL_MS) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info('Syncing group metadata from WhatsApp...');
|
||||
const groups = await sock.groupFetchAllParticipating();
|
||||
let count = 0;
|
||||
for (const [jid, metadata] of Object.entries(groups)) {
|
||||
if (metadata.subject) {
|
||||
setupConfig.onMetadata(jid, metadata.subject, true);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
lastGroupSync = Date.now();
|
||||
log.info('Group metadata synced', { count });
|
||||
} catch (err) {
|
||||
log.error('Failed to sync group metadata', { err });
|
||||
}
|
||||
}
|
||||
|
||||
async function flushOutgoingQueue(): Promise<void> {
|
||||
if (flushing || outgoingQueue.length === 0) return;
|
||||
flushing = true;
|
||||
try {
|
||||
log.info('Flushing outgoing message queue', { count: outgoingQueue.length });
|
||||
while (outgoingQueue.length > 0) {
|
||||
const item = outgoingQueue.shift()!;
|
||||
const sent = await sock.sendMessage(item.jid, { text: item.text });
|
||||
if (sent?.key?.id && sent.message) {
|
||||
sentMessageCache.set(sent.key.id, sent.message);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
flushing = false;
|
||||
}
|
||||
}
|
||||
|
||||
/** Download media from an inbound message, save to /workspace/attachments/. */
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
async function downloadInboundMedia(
|
||||
msg: WAMessage,
|
||||
normalized: any,
|
||||
): Promise<Array<{ type: string; name: string; localPath: string }>> {
|
||||
const mediaTypes: Array<{ key: string; type: string; ext: string }> = [
|
||||
{ key: 'imageMessage', type: 'image', ext: '.jpg' },
|
||||
{ key: 'videoMessage', type: 'video', ext: '.mp4' },
|
||||
{ key: 'audioMessage', type: 'audio', ext: '.ogg' },
|
||||
{ key: 'documentMessage', type: 'document', ext: '' },
|
||||
];
|
||||
const results: Array<{ type: string; name: string; localPath: string }> = [];
|
||||
for (const { key, type, ext } of mediaTypes) {
|
||||
if (!normalized[key]) continue;
|
||||
try {
|
||||
const buffer = await downloadMediaMessage(msg, 'buffer', {});
|
||||
const docFilename = normalized[key].fileName;
|
||||
const filename = docFilename || `${type}-${Date.now()}${ext}`;
|
||||
const attachDir = path.join(DATA_DIR, 'attachments');
|
||||
fs.mkdirSync(attachDir, { recursive: true });
|
||||
const filePath = path.join(attachDir, filename);
|
||||
fs.writeFileSync(filePath, buffer);
|
||||
results.push({ type, name: filename, localPath: `attachments/${filename}` });
|
||||
log.info('Media downloaded', { type, filename });
|
||||
} catch (err) {
|
||||
log.warn('Failed to download media', { type, err });
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
async function sendRawMessage(jid: string, text: string): Promise<string | undefined> {
|
||||
if (!connected) {
|
||||
outgoingQueue.push({ jid, text });
|
||||
log.info('WA disconnected, message queued', { jid, queueSize: outgoingQueue.length });
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const sent = await sock.sendMessage(jid, { text });
|
||||
if (sent?.key?.id && sent.message) {
|
||||
sentMessageCache.set(sent.key.id, sent.message);
|
||||
if (sentMessageCache.size > SENT_MESSAGE_CACHE_MAX) {
|
||||
const oldest = sentMessageCache.keys().next().value!;
|
||||
sentMessageCache.delete(oldest);
|
||||
}
|
||||
}
|
||||
return sent?.key?.id ?? undefined;
|
||||
} catch (err) {
|
||||
outgoingQueue.push({ jid, text });
|
||||
log.warn('Failed to send, message queued', { jid, err, queueSize: outgoingQueue.length });
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
// --- Socket creation ---
|
||||
|
||||
async function connectSocket(): Promise<void> {
|
||||
const { state, saveCreds } = await useMultiFileAuthState(authDir);
|
||||
|
||||
const { version } = await fetchLatestWaWebVersion({}).catch((err) => {
|
||||
log.warn('Failed to fetch latest WA Web version, using default', { err });
|
||||
return { version: undefined };
|
||||
});
|
||||
|
||||
sock = makeWASocket({
|
||||
version,
|
||||
auth: {
|
||||
creds: state.creds,
|
||||
keys: makeCacheableSignalKeyStore(state.keys, baileysLogger),
|
||||
},
|
||||
printQRInTerminal: false,
|
||||
logger: baileysLogger,
|
||||
browser: Browsers.macOS('Chrome'),
|
||||
cachedGroupMetadata: async (jid: string) => getNormalizedGroupMetadata(jid),
|
||||
getMessage: async (key: WAMessageKey) => {
|
||||
// Check in-memory cache first (recently sent messages)
|
||||
const cached = sentMessageCache.get(key.id || '');
|
||||
if (cached) return cached;
|
||||
// Return empty message to prevent indefinite "waiting for this message"
|
||||
return proto.Message.fromObject({});
|
||||
},
|
||||
});
|
||||
|
||||
// Request pairing code if phone number is set and not yet registered
|
||||
if (phoneNumber && !state.creds.registered) {
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
const code = await sock.requestPairingCode(phoneNumber);
|
||||
log.info(`WhatsApp pairing code: ${code}`);
|
||||
log.info('Enter in WhatsApp > Linked Devices > Link with phone number');
|
||||
fs.writeFileSync(pairingCodeFile, code, 'utf-8');
|
||||
} catch (err) {
|
||||
log.error('Failed to request pairing code', { err });
|
||||
}
|
||||
}, 3000);
|
||||
}
|
||||
|
||||
sock.ev.on('connection.update', (update) => {
|
||||
const { connection, lastDisconnect, qr } = update;
|
||||
|
||||
if (qr && !phoneNumber) {
|
||||
// QR code auth — print to terminal
|
||||
(async () => {
|
||||
try {
|
||||
const QRCode = await import('qrcode');
|
||||
const qrText = await QRCode.toString(qr, { type: 'terminal' });
|
||||
log.info('WhatsApp QR code — scan with WhatsApp > Linked Devices:\n' + qrText);
|
||||
} catch {
|
||||
log.info('WhatsApp QR code (raw)', { qr });
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
if (connection === 'close') {
|
||||
connected = false;
|
||||
const reason = (lastDisconnect?.error as { output?: { statusCode?: number } })?.output?.statusCode;
|
||||
const shouldReconnect = reason !== DisconnectReason.loggedOut;
|
||||
|
||||
log.info('WhatsApp connection closed', { reason, shouldReconnect });
|
||||
|
||||
if (shouldReconnect) {
|
||||
log.info('Reconnecting...');
|
||||
connectSocket().catch((err) => {
|
||||
log.error('Failed to reconnect, retrying in 5s', { err });
|
||||
setTimeout(() => {
|
||||
connectSocket().catch((err2) => {
|
||||
log.error('Reconnection retry failed', { err: err2 });
|
||||
});
|
||||
}, RECONNECT_DELAY_MS);
|
||||
});
|
||||
} else {
|
||||
log.info('WhatsApp logged out');
|
||||
if (rejectFirstOpen) {
|
||||
rejectFirstOpen(new Error('WhatsApp logged out'));
|
||||
rejectFirstOpen = undefined;
|
||||
resolveFirstOpen = undefined;
|
||||
}
|
||||
}
|
||||
} else if (connection === 'open') {
|
||||
connected = true;
|
||||
log.info('Connected to WhatsApp');
|
||||
|
||||
// Clean up pairing code file after successful connection
|
||||
try {
|
||||
if (fs.existsSync(pairingCodeFile)) fs.unlinkSync(pairingCodeFile);
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
|
||||
// Announce availability for presence updates
|
||||
sock.sendPresenceUpdate('available').catch((err) => {
|
||||
log.warn('Failed to send presence update', { err });
|
||||
});
|
||||
|
||||
// Build LID → phone mapping from auth state
|
||||
if (sock.user) {
|
||||
const phoneUser = sock.user.id.split(':')[0];
|
||||
const lidUser = sock.user.lid?.split(':')[0];
|
||||
if (lidUser && phoneUser) {
|
||||
setLidPhoneMapping(lidUser, `${phoneUser}@s.whatsapp.net`);
|
||||
botLidUser = lidUser;
|
||||
}
|
||||
}
|
||||
|
||||
// Flush queued messages
|
||||
flushOutgoingQueue().catch((err) => log.error('Failed to flush outgoing queue', { err }));
|
||||
|
||||
// Group sync
|
||||
syncGroupMetadata().catch((err) => log.error('Initial group sync failed', { err }));
|
||||
if (!groupSyncTimerStarted) {
|
||||
groupSyncTimerStarted = true;
|
||||
setInterval(() => {
|
||||
syncGroupMetadata().catch((err) => log.error('Periodic group sync failed', { err }));
|
||||
}, GROUP_SYNC_INTERVAL_MS);
|
||||
}
|
||||
|
||||
// Signal first open
|
||||
if (resolveFirstOpen) {
|
||||
resolveFirstOpen();
|
||||
resolveFirstOpen = undefined;
|
||||
rejectFirstOpen = undefined;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
sock.ev.on('creds.update', saveCreds);
|
||||
|
||||
// Phone number sharing events — update LID mapping
|
||||
sock.ev.on('chats.phoneNumberShare', ({ lid, jid }) => {
|
||||
const lidUser = lid?.split('@')[0].split(':')[0];
|
||||
if (lidUser && jid) setLidPhoneMapping(lidUser, jid);
|
||||
});
|
||||
|
||||
// Inbound messages
|
||||
sock.ev.on('messages.upsert', async ({ messages }) => {
|
||||
for (const msg of messages) {
|
||||
try {
|
||||
if (!msg.message) continue;
|
||||
const normalized = normalizeMessageContent(msg.message);
|
||||
if (!normalized) continue;
|
||||
const rawJid = msg.key.remoteJid;
|
||||
if (!rawJid || rawJid === 'status@broadcast') continue;
|
||||
|
||||
// Translate LID → phone JID
|
||||
let chatJid = await translateJid(rawJid);
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
if (chatJid.endsWith('@lid') && (msg.key as any).senderPn) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const pn = (msg.key as any).senderPn as string;
|
||||
const phoneJid = pn.includes('@') ? pn : `${pn}@s.whatsapp.net`;
|
||||
setLidPhoneMapping(rawJid.split('@')[0].split(':')[0], phoneJid);
|
||||
chatJid = phoneJid;
|
||||
}
|
||||
|
||||
const timestamp = new Date(Number(msg.messageTimestamp) * 1000).toISOString();
|
||||
const isGroup = chatJid.endsWith('@g.us');
|
||||
|
||||
// Notify metadata for group discovery
|
||||
setupConfig.onMetadata(chatJid, undefined, isGroup);
|
||||
|
||||
// Only forward messages for registered conversations
|
||||
if (!conversations.has(chatJid)) continue;
|
||||
|
||||
let content =
|
||||
normalized.conversation ||
|
||||
normalized.extendedTextMessage?.text ||
|
||||
normalized.imageMessage?.caption ||
|
||||
normalized.videoMessage?.caption ||
|
||||
'';
|
||||
|
||||
// Normalize bot LID mention → assistant name for trigger matching
|
||||
if (botLidUser && content.includes(`@${botLidUser}`)) {
|
||||
content = content.replace(`@${botLidUser}`, `@${ASSISTANT_NAME}`);
|
||||
}
|
||||
|
||||
// Download media attachments (images, video, audio, documents)
|
||||
const attachments = await downloadInboundMedia(msg, normalized);
|
||||
|
||||
// Skip empty protocol messages (no text and no attachments)
|
||||
if (!content && attachments.length === 0) continue;
|
||||
|
||||
const sender = msg.key.participant || msg.key.remoteJid || '';
|
||||
const senderName = msg.pushName || sender.split('@')[0];
|
||||
const fromMe = msg.key.fromMe || false;
|
||||
// Filter bot's own messages to prevent echo loops.
|
||||
// fromMe is always true for messages sent from this linked device,
|
||||
// regardless of ASSISTANT_HAS_OWN_NUMBER mode.
|
||||
if (fromMe) continue;
|
||||
|
||||
const isBotMessage = ASSISTANT_HAS_OWN_NUMBER ? false : content.startsWith(`${ASSISTANT_NAME}:`);
|
||||
|
||||
// Check if this reply answers a pending question via slash command
|
||||
const pending = pendingQuestions.get(chatJid);
|
||||
if (pending && content.startsWith('/')) {
|
||||
const cmd = content.trim().toLowerCase();
|
||||
const matched = pending.options.find((o) => optionToCommand(o.label) === cmd);
|
||||
if (matched) {
|
||||
const voterName = msg.pushName || sender.split('@')[0];
|
||||
setupConfig.onAction(pending.questionId, matched.value, sender);
|
||||
pendingQuestions.delete(chatJid);
|
||||
await sendRawMessage(chatJid, `${matched.selectedLabel} by ${voterName}`);
|
||||
log.info('Question answered', {
|
||||
questionId: pending.questionId,
|
||||
value: matched.value,
|
||||
voterName,
|
||||
});
|
||||
continue; // Don't forward this reply to the agent
|
||||
}
|
||||
}
|
||||
|
||||
const inbound: InboundMessage = {
|
||||
id: msg.key.id || `wa-${Date.now()}`,
|
||||
kind: 'chat',
|
||||
content: {
|
||||
text: content,
|
||||
sender,
|
||||
senderName,
|
||||
...(attachments.length > 0 && { attachments }),
|
||||
fromMe,
|
||||
isBotMessage,
|
||||
isGroup,
|
||||
chatJid,
|
||||
},
|
||||
timestamp,
|
||||
};
|
||||
|
||||
// WhatsApp doesn't use threads — threadId is null
|
||||
setupConfig.onInbound(chatJid, null, inbound);
|
||||
} catch (err) {
|
||||
log.error('Error processing incoming WhatsApp message', {
|
||||
err,
|
||||
remoteJid: msg.key?.remoteJid,
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// --- ChannelAdapter implementation ---
|
||||
|
||||
const adapter: ChannelAdapter = {
|
||||
name: 'whatsapp',
|
||||
channelType: 'whatsapp',
|
||||
supportsThreads: false,
|
||||
|
||||
async setup(hostConfig: ChannelSetup) {
|
||||
setupConfig = hostConfig;
|
||||
conversations = buildConversationMap(hostConfig.conversations);
|
||||
|
||||
// Connect and wait for first open
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
resolveFirstOpen = resolve;
|
||||
rejectFirstOpen = reject;
|
||||
connectSocket().catch(reject);
|
||||
});
|
||||
|
||||
log.info('WhatsApp adapter initialized');
|
||||
},
|
||||
|
||||
async deliver(
|
||||
platformId: string,
|
||||
_threadId: string | null,
|
||||
message: OutboundMessage,
|
||||
): Promise<string | undefined> {
|
||||
const content = message.content as Record<string, unknown>;
|
||||
|
||||
// Ask question → text with slash command replies
|
||||
if (content.type === 'ask_question' && content.questionId && content.options) {
|
||||
const questionId = content.questionId as string;
|
||||
const title = content.title as string;
|
||||
const question = content.question as string;
|
||||
if (!title) {
|
||||
log.error('ask_question missing required title — skipping delivery', { questionId });
|
||||
return;
|
||||
}
|
||||
const options: NormalizedOption[] = normalizeOptions(content.options as never);
|
||||
|
||||
const optionLines = options.map((o) => ` ${optionToCommand(o.label)}`).join('\n');
|
||||
const text = `*${title}*\n\n${question}\n\nReply with:\n${optionLines}`;
|
||||
const msgId = await sendRawMessage(platformId, text);
|
||||
if (msgId) {
|
||||
pendingQuestions.set(platformId, { questionId, options });
|
||||
if (pendingQuestions.size > PENDING_QUESTIONS_MAX) {
|
||||
const oldest = pendingQuestions.keys().next().value!;
|
||||
pendingQuestions.delete(oldest);
|
||||
}
|
||||
}
|
||||
return msgId;
|
||||
}
|
||||
|
||||
// Reaction → emoji on a message
|
||||
if (content.operation === 'reaction' && content.messageId && content.emoji) {
|
||||
try {
|
||||
await sock.sendMessage(platformId, {
|
||||
react: {
|
||||
text: content.emoji as string,
|
||||
key: { remoteJid: platformId, id: content.messageId as string, fromMe: false },
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
log.debug('Failed to send reaction', { platformId, err });
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Normal message (with optional file attachments)
|
||||
const text = (content.markdown as string) || (content.text as string);
|
||||
const hasFiles = message.files && message.files.length > 0;
|
||||
|
||||
if (!text && !hasFiles) return;
|
||||
|
||||
// Send file attachments (first file gets the caption, rest are captionless)
|
||||
if (hasFiles) {
|
||||
let captionUsed = false;
|
||||
for (const file of message.files!) {
|
||||
try {
|
||||
const ext = path.extname(file.filename).toLowerCase();
|
||||
const caption = !captionUsed ? text : undefined;
|
||||
const mediaMsg = buildMediaMessage(file.data, file.filename, ext, caption);
|
||||
const sent = await sock.sendMessage(platformId, mediaMsg);
|
||||
if (sent?.key?.id && sent.message) {
|
||||
sentMessageCache.set(sent.key.id, sent.message);
|
||||
}
|
||||
if (caption) captionUsed = true;
|
||||
} catch (err) {
|
||||
log.error('Failed to send file', { platformId, filename: file.filename, err });
|
||||
}
|
||||
}
|
||||
if (captionUsed) return; // Text was sent as caption
|
||||
}
|
||||
|
||||
if (text) {
|
||||
const formatted = formatWhatsApp(text);
|
||||
const prefixed = ASSISTANT_HAS_OWN_NUMBER ? formatted : `${ASSISTANT_NAME}: ${formatted}`;
|
||||
return sendRawMessage(platformId, prefixed);
|
||||
}
|
||||
},
|
||||
|
||||
async setTyping(platformId: string) {
|
||||
try {
|
||||
await sock.sendPresenceUpdate('composing', platformId);
|
||||
} catch (err) {
|
||||
log.debug('Failed to update typing status', { jid: platformId, err });
|
||||
}
|
||||
},
|
||||
|
||||
async teardown() {
|
||||
connected = false;
|
||||
sock?.end(undefined);
|
||||
log.info('WhatsApp adapter shut down');
|
||||
},
|
||||
|
||||
isConnected() {
|
||||
return connected;
|
||||
},
|
||||
|
||||
async syncConversations(): Promise<ConversationInfo[]> {
|
||||
try {
|
||||
const groups = await sock.groupFetchAllParticipating();
|
||||
return Object.entries(groups)
|
||||
.filter(([, m]) => m.subject)
|
||||
.map(([jid, m]) => ({
|
||||
platformId: jid,
|
||||
name: m.subject,
|
||||
isGroup: true,
|
||||
}));
|
||||
} catch (err) {
|
||||
log.error('Failed to sync WhatsApp conversations', { err });
|
||||
return [];
|
||||
}
|
||||
},
|
||||
|
||||
updateConversations(configs: ConversationConfig[]) {
|
||||
conversations = buildConversationMap(configs);
|
||||
},
|
||||
};
|
||||
|
||||
return adapter;
|
||||
},
|
||||
});
|
||||
Reference in New Issue
Block a user