From 8ef30ad28935f969b47e4d1577a492c286ce985e Mon Sep 17 00:00:00 2001 From: "exe.dev user" Date: Thu, 16 Apr 2026 15:51:37 +0000 Subject: [PATCH] fix(v2): cancel/pause/resume recurring tasks via series_id Recurring tasks spawn a new messages_in row per occurrence. Cancel only matched the completed row the agent remembered, leaving the live next occurrence running. Tag every row in a recurrence chain with the originating task's id (series_id) so cancel/pause/resume can reach any live row in the series. Cancel also clears recurrence to prevent the sweep from cloning a cancelled task. Kind-aware id prefix on recurrences (task- instead of msg-) keeps list_tasks output consistent across occurrences. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/db/schema.ts | 2 + src/db/session-db.test.ts | 201 ++++++++++++++++++++++++++++++++++++++ src/db/session-db.ts | 49 +++++++--- src/host-sweep.ts | 5 +- src/session-manager.ts | 5 +- 5 files changed, 245 insertions(+), 17 deletions(-) create mode 100644 src/db/session-db.test.ts diff --git a/src/db/schema.ts b/src/db/schema.ts index fa5a4054f..2b53ed64c 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -136,12 +136,14 @@ CREATE TABLE IF NOT EXISTS messages_in ( status TEXT DEFAULT 'pending', process_after TEXT, recurrence TEXT, + series_id TEXT, tries INTEGER DEFAULT 0, platform_id TEXT, channel_type TEXT, thread_id TEXT, content TEXT NOT NULL ); +CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id); -- Host tracks delivery outcomes for messages_out IDs. -- Avoids writing to outbound.db (container-owned). diff --git a/src/db/session-db.test.ts b/src/db/session-db.test.ts new file mode 100644 index 000000000..cf544181c --- /dev/null +++ b/src/db/session-db.test.ts @@ -0,0 +1,201 @@ +/** + * Tests for per-session messages_in operations — focused on the series_id + * invariant that lets cancel/pause/resume reach the live next occurrence of + * a recurring task, even after the row the agent remembers has completed + * and been replaced by a follow-up. + */ +import Database from 'better-sqlite3'; +import fs from 'fs'; +import path from 'path'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { + ensureSchema, + openInboundDb, + insertTask, + insertRecurrence, + cancelTask, + pauseTask, + resumeTask, + getCompletedRecurring, + migrateMessagesInTable, + type RecurringMessage, +} from './session-db.js'; + +const TEST_DIR = '/tmp/nanoclaw-session-db-test'; +const DB_PATH = path.join(TEST_DIR, 'inbound.db'); + +function freshDb() { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + ensureSchema(DB_PATH, 'inbound'); + return openInboundDb(DB_PATH); +} + +function insertBasicTask(db: ReturnType, id: string, recurrence: string | null) { + insertTask(db, { + id, + processAfter: new Date().toISOString(), + recurrence, + platformId: null, + channelType: null, + threadId: null, + content: JSON.stringify({ prompt: 'noop' }), + }); +} + +afterEach(() => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); +}); + +describe('insertTask', () => { + it('stamps series_id = id on insert', () => { + const db = freshDb(); + insertBasicTask(db, 'task-1', null); + const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-1') as { series_id: string }; + expect(row.series_id).toBe('task-1'); + db.close(); + }); +}); + +describe('cancelTask / pauseTask / resumeTask series matching', () => { + // Simulates the recurrence chain that used to survive cancellation: + // the original task completes → handleRecurrence spawns a follow-up + // row → agent calls cancel_task(originalId) → historically only hit + // the completed row, leaving the live one running. + function seedRecurringChain(db: ReturnType) { + insertBasicTask(db, 'task-orig', '0 9 * * *'); + // Mark the original as completed (as syncProcessingAcks would do). + db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); + + const msg: RecurringMessage = { + id: 'task-orig', + kind: 'task', + content: JSON.stringify({ prompt: 'noop' }), + recurrence: '0 9 * * *', + process_after: null, + platform_id: null, + channel_type: null, + thread_id: null, + series_id: 'task-orig', + }; + insertRecurrence(db, msg, 'task-next', new Date(Date.now() + 86400000).toISOString()); + } + + it('cancel by original id reaches the live follow-up via series_id', () => { + const db = freshDb(); + seedRecurringChain(db); + + cancelTask(db, 'task-orig'); + + const live = db.prepare("SELECT id, status, recurrence FROM messages_in WHERE status = 'pending'").all(); + expect(live).toHaveLength(0); + + const followUp = db.prepare("SELECT status, recurrence FROM messages_in WHERE id = 'task-next'").get() as { + status: string; + recurrence: string | null; + }; + expect(followUp.status).toBe('completed'); + // Recurrence cleared so the sweep doesn't spawn another clone. + expect(followUp.recurrence).toBeNull(); + db.close(); + }); + + it('cancelled task is not picked up by getCompletedRecurring', () => { + const db = freshDb(); + insertBasicTask(db, 'task-1', '0 9 * * *'); + cancelTask(db, 'task-1'); + + const recurring = getCompletedRecurring(db); + expect(recurring).toHaveLength(0); + db.close(); + }); + + it('pause by original id pauses the live follow-up', () => { + const db = freshDb(); + seedRecurringChain(db); + + pauseTask(db, 'task-orig'); + + const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string }; + expect(followUp.status).toBe('paused'); + db.close(); + }); + + it('resume by original id resumes the live follow-up', () => { + const db = freshDb(); + seedRecurringChain(db); + + db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = 'task-next'").run(); + resumeTask(db, 'task-orig'); + + const followUp = db.prepare("SELECT status FROM messages_in WHERE id = 'task-next'").get() as { status: string }; + expect(followUp.status).toBe('pending'); + db.close(); + }); +}); + +describe('insertRecurrence', () => { + it('copies series_id forward', () => { + const db = freshDb(); + insertBasicTask(db, 'task-orig', '0 9 * * *'); + db.prepare("UPDATE messages_in SET status = 'completed' WHERE id = 'task-orig'").run(); + + const msg: RecurringMessage = { + id: 'task-orig', + kind: 'task', + content: '{}', + recurrence: '0 9 * * *', + process_after: null, + platform_id: null, + channel_type: null, + thread_id: null, + series_id: 'task-orig', + }; + insertRecurrence(db, msg, 'task-next', new Date().toISOString()); + + const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('task-next') as { + series_id: string; + }; + expect(row.series_id).toBe('task-orig'); + db.close(); + }); +}); + +describe('migrateMessagesInTable', () => { + it('backfills series_id = id on legacy rows and is idempotent', () => { + if (fs.existsSync(TEST_DIR)) fs.rmSync(TEST_DIR, { recursive: true }); + fs.mkdirSync(TEST_DIR, { recursive: true }); + + // Build a legacy inbound.db WITHOUT series_id to simulate a pre-fix install. + const db = new Database(DB_PATH); + db.exec(` + CREATE TABLE messages_in ( + id TEXT PRIMARY KEY, + seq INTEGER UNIQUE, + kind TEXT NOT NULL, + timestamp TEXT NOT NULL, + status TEXT DEFAULT 'pending', + process_after TEXT, + recurrence TEXT, + tries INTEGER DEFAULT 0, + platform_id TEXT, + channel_type TEXT, + thread_id TEXT, + content TEXT NOT NULL + ); + `); + db.prepare( + "INSERT INTO messages_in (id, seq, kind, timestamp, status, content) VALUES (?, ?, 'task', datetime('now'), 'pending', '{}')", + ).run('legacy-1', 2); + + migrateMessagesInTable(db); + migrateMessagesInTable(db); // idempotent + + const row = db.prepare('SELECT series_id FROM messages_in WHERE id = ?').get('legacy-1') as { + series_id: string; + }; + expect(row.series_id).toBe('legacy-1'); + db.close(); + }); +}); diff --git a/src/db/session-db.ts b/src/db/session-db.ts index 83271230d..8bb9e2e6f 100644 --- a/src/db/session-db.ts +++ b/src/db/session-db.ts @@ -92,8 +92,8 @@ export function insertMessage( }, ): void { db.prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence) - VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence)`, + `INSERT INTO messages_in (id, seq, kind, timestamp, status, platform_id, channel_type, thread_id, content, process_after, recurrence, series_id) + VALUES (@id, @seq, @kind, @timestamp, 'pending', @platformId, @channelType, @threadId, @content, @processAfter, @recurrence, @id)`, ).run({ ...message, seq: nextEvenSeq(db), @@ -113,30 +113,34 @@ export function insertTask( }, ): void { db.prepare( - `INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content) - VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content)`, + `INSERT INTO messages_in (id, seq, timestamp, status, tries, process_after, recurrence, kind, platform_id, channel_type, thread_id, content, series_id) + VALUES (@id, @seq, datetime('now'), 'pending', 0, @processAfter, @recurrence, 'task', @platformId, @channelType, @threadId, @content, @id)`, ).run({ ...task, seq: nextEvenSeq(db), }); } +// cancel/pause/resume match any live row in the series, not just the exact id. +// Recurring tasks get a new row per occurrence (see handleRecurrence), all +// sharing series_id. Matching by id alone would only hit the completed row +// the agent remembers, missing the live next occurrence. export function cancelTask(db: Database.Database, taskId: string): void { db.prepare( - "UPDATE messages_in SET status = 'completed' WHERE id = ? AND kind = 'task' AND status IN ('pending', 'paused')", - ).run(taskId); + "UPDATE messages_in SET status = 'completed', recurrence = NULL WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status IN ('pending', 'paused')", + ).run(taskId, taskId); } export function pauseTask(db: Database.Database, taskId: string): void { - db.prepare("UPDATE messages_in SET status = 'paused' WHERE id = ? AND kind = 'task' AND status = 'pending'").run( - taskId, - ); + db.prepare( + "UPDATE messages_in SET status = 'paused' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'pending'", + ).run(taskId, taskId); } export function resumeTask(db: Database.Database, taskId: string): void { - db.prepare("UPDATE messages_in SET status = 'pending' WHERE id = ? AND kind = 'task' AND status = 'paused'").run( - taskId, - ); + db.prepare( + "UPDATE messages_in SET status = 'pending' WHERE (id = ? OR series_id = ?) AND kind = 'task' AND status = 'paused'", + ).run(taskId, taskId); } export function countDueMessages(db: Database.Database): number { @@ -180,6 +184,7 @@ export interface RecurringMessage { platform_id: string | null; channel_type: string | null; thread_id: string | null; + series_id: string; } export function getCompletedRecurring(db: Database.Database): RecurringMessage[] { @@ -195,8 +200,8 @@ export function insertRecurrence( nextRun: string | null, ): void { db.prepare( - `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content) - VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?)`, + `INSERT INTO messages_in (id, seq, kind, timestamp, status, process_after, recurrence, platform_id, channel_type, thread_id, content, series_id) + VALUES (?, ?, ?, datetime('now'), 'pending', ?, ?, ?, ?, ?, ?, ?)`, ).run( newId, nextEvenSeq(db), @@ -207,6 +212,7 @@ export function insertRecurrence( msg.channel_type, msg.thread_id, msg.content, + msg.series_id, ); } @@ -296,3 +302,18 @@ export function migrateDeliveredTable(db: Database.Database): void { db.prepare("ALTER TABLE delivered ADD COLUMN status TEXT NOT NULL DEFAULT 'delivered'").run(); } } + +// Adds series_id (groups all occurrences of a recurring task) to pre-existing +// messages_in tables. No-op on fresh installs where the column is in the schema. +// Backfills existing rows so cancel/pause/resume queries can rely on +// series_id IS NOT NULL. +export function migrateMessagesInTable(db: Database.Database): void { + const cols = new Set( + (db.prepare("PRAGMA table_info('messages_in')").all() as Array<{ name: string }>).map((c) => c.name), + ); + if (!cols.has('series_id')) { + db.prepare('ALTER TABLE messages_in ADD COLUMN series_id TEXT').run(); + db.prepare('UPDATE messages_in SET series_id = id WHERE series_id IS NULL').run(); + db.prepare('CREATE INDEX IF NOT EXISTS idx_messages_in_series ON messages_in(series_id)').run(); + } +} diff --git a/src/host-sweep.ts b/src/host-sweep.ts index 8316cc5c0..9e8f6b452 100644 --- a/src/host-sweep.ts +++ b/src/host-sweep.ts @@ -159,12 +159,13 @@ async function handleRecurrence(inDb: Database.Database, session: Session): Prom const { CronExpressionParser } = await import('cron-parser'); const interval = CronExpressionParser.parse(msg.recurrence); const nextRun = interval.next().toISOString(); - const newId = `msg-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const prefix = msg.kind === 'task' ? 'task' : 'msg'; + const newId = `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; insertRecurrence(inDb, msg, newId, nextRun); clearRecurrence(inDb, msg.id); - log.info('Inserted next recurrence', { originalId: msg.id, newId, nextRun }); + log.info('Inserted next recurrence', { originalId: msg.id, newId, seriesId: msg.series_id, nextRun }); } catch (err) { log.error('Failed to compute next recurrence', { messageId: msg.id, recurrence: msg.recurrence, err }); } diff --git a/src/session-manager.ts b/src/session-manager.ts index 4ebbd3fcf..0f8c8a3b2 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -26,6 +26,7 @@ import { upsertSessionRouting, replaceDestinations, insertMessage, + migrateMessagesInTable, type DestinationRow, } from './db/session-db.js'; import { log } from './log.js'; @@ -305,7 +306,9 @@ function extractAttachmentFiles( /** Open the inbound DB for a session (host reads/writes). */ export function openInboundDb(agentGroupId: string, sessionId: string): Database.Database { - return openInboundDbRaw(inboundDbPath(agentGroupId, sessionId)); + const db = openInboundDbRaw(inboundDbPath(agentGroupId, sessionId)); + migrateMessagesInTable(db); + return db; } /** Open the outbound DB for a session (host reads only). */