diff --git a/src/delivery.test.ts b/src/delivery.test.ts
new file mode 100644
index 000000000..ca5b0303d
--- /dev/null
+++ b/src/delivery.test.ts
@@ -0,0 +1,161 @@
+/**
+ * Delivery race tests.
+ *
+ * The active poll (1s, running sessions) and the sweep poll (60s, all
+ * active sessions) both call deliverSessionMessages. A running session
+ * sits in both result sets, so the two timer chains can race on the same
+ * outbound row — read-undelivered → call channel API → markDelivered. The
+ * INSERT OR IGNORE in markDelivered makes the DB write idempotent, but
+ * the channel API has already fired twice → user sees the message twice.
+ */
+import Database from 'better-sqlite3';
+import fs from 'fs';
+import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
+
+// Swap the container runner for inert stubs.
+vi.mock('./container-runner.js', () => ({
+  wakeContainer: vi.fn().mockResolvedValue(undefined),
+  resetContainerIdleTimer: vi.fn(),
+  isContainerRunning: vi.fn().mockReturnValue(false),
+  killContainer: vi.fn(),
+  buildAgentGroupImage: vi.fn().mockResolvedValue(undefined),
+}));
+
+// Redirect DATA_DIR to a scratch directory. vi.mock calls are hoisted above
+// module-level constants, so the factory cannot reference TEST_DIR and the
+// path literal has to be repeated here.
+vi.mock('./config.js', async () => {
+  const actual = await vi.importActual('./config.js');
+  return { ...actual, DATA_DIR: '/tmp/nanoclaw-test-delivery' };
+});
+
+const TEST_DIR = '/tmp/nanoclaw-test-delivery';
+
+import {
+  initTestDb,
+  closeDb,
+  runMigrations,
+  createAgentGroup,
+  createMessagingGroup,
+} from './db/index.js';
+import { resolveSession, outboundDbPath } from './session-manager.js';
+import { deliverSessionMessages, setDeliveryAdapter } from './delivery.js';
+
+/** Returns the current time as an ISO 8601 string. */
+function now(): string {
+  return new Date().toISOString();
+}
+
+/** Creates the `ag-1` agent group and the `mg-1` Telegram messaging group used as fixtures. */
+function seedAgentAndChannel(): void {
+  createAgentGroup({
+    id: 'ag-1',
+    name: 'Test Agent',
+    folder: 'test-agent',
+    agent_provider: null,
+    created_at: now(),
+  });
+  createMessagingGroup({
+    id: 'mg-1',
+    channel_type: 'telegram',
+    platform_id: 'telegram:123',
+    name: 'Test Chat',
+    is_group: 0,
+    unknown_sender_policy: 'public',
+    created_at: now(),
+  });
+}
+
+function insertOutbound(agentGroupId: string, sessionId: string, msgId: string): void {
+  const db = new Database(outboundDbPath(agentGroupId, sessionId));
+  db.prepare(
+    `INSERT INTO messages_out (id, timestamp, kind, platform_id, channel_type, content)
+     VALUES (?, datetime('now'), 'chat', 'telegram:123', 'telegram', ?)`,
+  ).run(msgId, JSON.stringify({ text: 'hello' }));
+  db.close();
+}
+
+beforeEach(() => {
+  if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
+  fs.mkdirSync(TEST_DIR, { recursive: true });
+  const db = initTestDb();
+  runMigrations(db);
+});
+
+afterEach(() => {
+  closeDb();
+  if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true });
+});
+
+describe('deliverSessionMessages — concurrent invocations', () => {
+  it('delivers a message exactly once when active and sweep polls overlap', async () => {
+    seedAgentAndChannel();
+    const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
+    insertOutbound('ag-1', session.id, 'out-1');
+
+    const calls: string[] = [];
+    setDeliveryAdapter({
+      async deliver(_channelType, _platformId, _threadId, _kind, content) {
+        calls.push(content);
+        // Hold long enough that the second concurrent caller can race the
+        // read-undelivered → markDelivered window.
+        await new Promise((r) => setTimeout(r, 100));
+        return 'plat-msg-1';
+      },
+    });
+
+    // Two concurrent calls — simulating active (1s) and sweep (60s) polls
+    // hitting the same running session at the same moment.
+    await Promise.all([deliverSessionMessages(session), deliverSessionMessages(session)]);
+
+    expect(calls).toHaveLength(1);
+  });
+
+  it('still delivers on a subsequent call after the first finishes', async () => {
+    seedAgentAndChannel();
+    const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
+    insertOutbound('ag-1', session.id, 'out-first');
+
+    const calls: string[] = [];
+    setDeliveryAdapter({
+      async deliver(_channelType, _platformId, _threadId, _kind, content) {
+        calls.push(content);
+        return 'plat-msg-id';
+      },
+    });
+
+    await deliverSessionMessages(session);
+    expect(calls).toHaveLength(1);
+
+    // Insert a second outbound message and deliver again — the lock from
+    // the first call must have been released.
+    insertOutbound('ag-1', session.id, 'out-second');
+    await deliverSessionMessages(session);
+    expect(calls).toHaveLength(2);
+  });
+
+  it('does not re-deliver when retried after a successful send (cleanup-after-send safety)', async () => {
+    // If something post-send throws (e.g. outbox cleanup), the message has
+    // still landed on the user's screen — the catch path must not trigger
+    // a re-send. We simulate by having the adapter succeed on the first
+    // call and recording how many times it's invoked across two attempts.
+    seedAgentAndChannel();
+    const { session } = resolveSession('ag-1', 'mg-1', null, 'shared');
+    insertOutbound('ag-1', session.id, 'out-once');
+
+    let callCount = 0;
+    setDeliveryAdapter({
+      async deliver() {
+        callCount++;
+        return 'plat-msg-id';
+      },
+    });
+
+    await deliverSessionMessages(session);
+    // Re-invoke — should be idempotent because the message is now in the
+    // delivered table; the channel adapter must not be called again.
+    await deliverSessionMessages(session);
+
+    expect(callCount).toBe(1);
+  });
+});
diff --git a/src/delivery.ts b/src/delivery.ts
index 12559c2ab..df1f992b8 100644
--- a/src/delivery.ts
+++ b/src/delivery.ts
@@ -60,6 +60,21 @@ const MAX_DELIVERY_ATTEMPTS = 3;
 /** Track delivery attempt counts. Resets on process restart (gives failed messages a fresh chance). */
 const deliveryAttempts = new Map();
 
+/**
+ * Sessions whose outbound queue is currently being drained.
+ *
+ * The active poll (1s, running sessions) and the sweep poll (60s, all
+ * active sessions) both call deliverSessionMessages, and a running session
+ * is in *both* result sets. Without this guard, the two timer chains can
+ * race on the same outbound row: both read it as undelivered, both call
+ * the channel adapter, both markDelivered (idempotent in the DB via
+ * INSERT OR IGNORE — but the user has already seen the message twice).
+ *
+ * Skipping (vs. queueing) is correct: any message left over when the
+ * second caller skips will be picked up on the next poll tick (~1s).
+ */
+const inflightDeliveries = new Set<string>();
+
 export interface ChannelDeliveryAdapter {
   deliver(
     channelType: string,
@@ -365,7 +380,29 @@ async function pollSweep(): Promise<void> {
   setTimeout(pollSweep, SWEEP_POLL_MS);
 }
 
-async function deliverSessionMessages(session: Session): Promise<void> {
+/**
+ * Drains a session's outbound queue, allowing at most one drain per session
+ * at a time.
+ *
+ * If a drain for the same session is already in flight, this call returns
+ * immediately without delivering anything. The in-flight marker is removed
+ * in `finally`, so a drain that throws does not block later calls.
+ */
+export async function deliverSessionMessages(session: Session): Promise<void> {
+  // Reject re-entry from a concurrent poll on the same session — see the
+  // comment on inflightDeliveries above.
+  if (inflightDeliveries.has(session.id)) return;
+  inflightDeliveries.add(session.id);
+
+  try {
+    await drainSession(session);
+  } finally {
+    inflightDeliveries.delete(session.id);
+  }
+}
+
+/** Does the actual draining; deliverSessionMessages calls this while holding the in-flight marker. */
+async function drainSession(session: Session): Promise<void> {
   const agentGroup = getAgentGroup(session.agent_group_id);
   if (!agentGroup) return;
 
@@ -594,9 +631,16 @@ async function deliverMessage(
     fileCount: files?.length,
   });
 
-  // Clean up outbox directory after successful delivery
+  // Clean up outbox best-effort — the message is already on the user's
+  // screen, so a cleanup failure must NOT propagate. If it did, the
+  // caller would treat the whole delivery as failed, retry on the next
+  // poll, and the user would see the message twice.
   if (fs.existsSync(outboxDir)) {
-    fs.rmSync(outboxDir, { recursive: true, force: true });
+    try {
+      fs.rmSync(outboxDir, { recursive: true, force: true });
+    } catch (err) {
+      log.warn('Outbox cleanup failed (message already delivered)', { messageId: msg.id, err });
+    }
   }
 
   return platformMsgId;