Files
nanoclaw/src/channels/telegram.ts
T
exe.dev user 8ef26d323f fix(v2/telegram): await pairing interceptor work to serialize DB commits
The Telegram pairing interceptor fired DB writes (createMessagingGroup,
upsertUser, grantRole) and the pairing-success confirmation inside an
unawaited `void (async () => {...})()`. Recent changes (0d3326a user
privilege model, c483860 pairing confirmation) widened the work done
inside this closure to include an extra two DB writes and a Telegram
API round-trip, making the race between match and commit reproducible
— a paired message could appear "lost" until a second send.

Change onInbound to optionally return a Promise, await it in the
chat-sdk-bridge dispatch callbacks, and make the pairing interceptor
async so its DB writes + confirmation send complete before the handler
resolves.

Note: the upstream @chat-adapter/telegram SDK itself does not await
processUpdate in its polling loop, so the adapter's getUpdates offset
still advances before our handler resolves. A true restart-safe fix
needs a corresponding change in chat-adapter.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 12:16:49 +00:00

230 lines
8.0 KiB
TypeScript

/**
* Telegram channel adapter (v2) — uses Chat SDK bridge, with a pairing
* interceptor wrapped around onInbound to verify chat ownership before
* registration. See telegram-pairing.ts for the why.
*/
import { createTelegramAdapter } from '@chat-adapter/telegram';
import { readEnvFile } from '../env.js';
import { log } from '../log.js';
import { createMessagingGroup, getMessagingGroupByPlatform, updateMessagingGroup } from '../db/messaging-groups.js';
import { grantRole, hasAnyOwner } from '../db/user-roles.js';
import { upsertUser } from '../db/users.js';
import { createChatSdkBridge, type ReplyContext } from './chat-sdk-bridge.js';
import { sanitizeTelegramLegacyMarkdown } from './telegram-markdown-sanitize.js';
import { registerChannelAdapter } from './channel-registry.js';
import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js';
import { tryConsume } from './telegram-pairing.js';
/**
* Retry a one-shot operation that can fail on transient network errors at
* cold-start (DNS hiccups, brief upstream outages). Exponential backoff capped
* at 5 attempts — if the network is truly down we surface it instead of
* hanging the service indefinitely.
*/
async function withRetry<T>(fn: () => Promise<T>, label: string, maxAttempts = 5): Promise<T> {
let lastErr: unknown;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (err) {
lastErr = err;
if (attempt === maxAttempts) break;
const delay = Math.min(16000, 1000 * 2 ** (attempt - 1));
log.warn('Telegram setup failed, retrying', { label, attempt, delayMs: delay, err });
await new Promise((r) => setTimeout(r, delay));
}
}
throw lastErr;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function extractReplyContext(raw: Record<string, any>): ReplyContext | null {
if (!raw.reply_to_message) return null;
const reply = raw.reply_to_message;
return {
text: reply.text || reply.caption || '',
sender: reply.from?.first_name || reply.from?.username || 'Unknown',
};
}
/** Look up the bot username via Telegram getMe. Cached after first call. */
async function fetchBotUsername(token: string): Promise<string | null> {
try {
const res = await fetch(`https://api.telegram.org/bot${token}/getMe`);
const json = (await res.json()) as { ok: boolean; result?: { username?: string } };
return json.ok ? (json.result?.username ?? null) : null;
} catch (err) {
log.warn('Telegram getMe failed', { err });
return null;
}
}
function isGroupPlatformId(platformId: string): boolean {
// platformId is "telegram:<chatId>". Negative chat IDs are groups/channels.
const id = platformId.split(':').pop() ?? '';
return id.startsWith('-');
}
interface InboundFields {
text: string;
authorUserId: string | null;
}
function readInboundFields(message: InboundMessage): InboundFields {
if (message.kind !== 'chat-sdk' || !message.content || typeof message.content !== 'object') {
return { text: '', authorUserId: null };
}
const c = message.content as { text?: string; author?: { userId?: string } };
return { text: c.text ?? '', authorUserId: c.author?.userId ?? null };
}
/**
* Build an onInbound interceptor that consumes pairing codes before they
* reach the router. On match: records the chat + its paired user, promotes
* the user to owner if the instance has no owner yet, and short-circuits.
* On miss: forwards to the host.
*/
/**
* Send a one-shot confirmation back to the paired chat. Best-effort — failures
* are logged but never propagated, so a Telegram outage can't undo a successful
* pairing or trigger the interceptor's fail-open path.
*/
async function sendPairingConfirmation(token: string, platformId: string): Promise<void> {
const chatId = platformId.split(':').slice(1).join(':');
if (!chatId) return;
try {
const res = await fetch(`https://api.telegram.org/bot${token}/sendMessage`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({
chat_id: chatId,
text: "Pairing success! I'm spinning up the agent now, you'll get a message from them shortly.",
}),
});
if (!res.ok) {
log.warn('Telegram pairing confirmation non-OK', { status: res.status });
}
} catch (err) {
log.warn('Telegram pairing confirmation failed', { err });
}
}
function createPairingInterceptor(
botUsernamePromise: Promise<string | null>,
hostOnInbound: ChannelSetup['onInbound'],
token: string,
): ChannelSetup['onInbound'] {
return async (platformId, threadId, message) => {
try {
const botUsername = await botUsernamePromise;
if (!botUsername) {
hostOnInbound(platformId, threadId, message);
return;
}
const { text, authorUserId } = readInboundFields(message);
if (!text) {
hostOnInbound(platformId, threadId, message);
return;
}
const consumed = await tryConsume({
text,
botUsername,
platformId,
isGroup: isGroupPlatformId(platformId),
adminUserId: authorUserId,
});
if (!consumed) {
hostOnInbound(platformId, threadId, message);
return;
}
// Pairing matched — record the chat and short-circuit so the
// code-bearing message never reaches an agent. Privilege is now a
// property of the paired user, not the chat: upsert the user, and if
// this instance has no owner yet, promote them to owner.
const existing = getMessagingGroupByPlatform('telegram', platformId);
if (existing) {
updateMessagingGroup(existing.id, {
is_group: consumed.consumed!.isGroup ? 1 : 0,
});
} else {
createMessagingGroup({
id: `mg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
channel_type: 'telegram',
platform_id: platformId,
name: consumed.consumed!.name,
is_group: consumed.consumed!.isGroup ? 1 : 0,
unknown_sender_policy: 'strict',
created_at: new Date().toISOString(),
});
}
const pairedUserId = `telegram:${consumed.consumed!.adminUserId}`;
upsertUser({
id: pairedUserId,
kind: 'telegram',
display_name: null,
created_at: new Date().toISOString(),
});
let promotedToOwner = false;
if (!hasAnyOwner()) {
grantRole({
user_id: pairedUserId,
role: 'owner',
agent_group_id: null,
granted_by: null,
granted_at: new Date().toISOString(),
});
promotedToOwner = true;
}
log.info('Telegram pairing accepted — chat registered', {
platformId,
pairedUser: pairedUserId,
promotedToOwner,
intent: consumed.intent,
});
await sendPairingConfirmation(token, platformId);
} catch (err) {
log.error('Telegram pairing interceptor error', { err });
// Fail open: pass through so a pairing bug doesn't break normal traffic.
hostOnInbound(platformId, threadId, message);
}
};
}
registerChannelAdapter('telegram', {
factory: () => {
const env = readEnvFile(['TELEGRAM_BOT_TOKEN']);
if (!env.TELEGRAM_BOT_TOKEN) return null;
const token = env.TELEGRAM_BOT_TOKEN;
const telegramAdapter = createTelegramAdapter({
botToken: token,
mode: 'polling',
});
const bridge = createChatSdkBridge({
adapter: telegramAdapter,
concurrency: 'concurrent',
extractReplyContext,
supportsThreads: false,
transformOutboundText: sanitizeTelegramLegacyMarkdown,
});
const botUsernamePromise = fetchBotUsername(token);
const wrapped: ChannelAdapter = {
...bridge,
async setup(hostConfig: ChannelSetup) {
const intercepted: ChannelSetup = {
...hostConfig,
onInbound: createPairingInterceptor(botUsernamePromise, hostConfig.onInbound, token),
};
return withRetry(() => bridge.setup(intercepted), 'bridge.setup');
},
};
return wrapped;
},
});