v2 phase 1: foundation — types, DB layer, logging

Add the v2 data layer: typed interfaces, central DB with migration
runner, per-entity CRUD, and agent-runner session DB operations.

- src/log.ts: concise message-first logging API
- src/types-v2.ts: AgentGroup, MessagingGroup, Session, MessageIn/Out
- src/db/: connection (WAL), migration runner, 001-initial schema,
  CRUD for agent_groups, messaging_groups, sessions, pending_questions
- container/agent-runner/src/db/: session DB connection, messages_in
  reads + status transitions, messages_out writes
- 31 new tests, all 277 tests pass

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-04-08 23:34:09 +03:00
parent 90acff28ad
commit 3f0451b7b0
15 changed files with 1267 additions and 0 deletions
@@ -0,0 +1,55 @@
import Database from 'better-sqlite3';
const SESSION_DB_PATH = '/workspace/session.db';
let _db: Database.Database | null = null;
export function getSessionDb(): Database.Database {
if (!_db) {
_db = new Database(process.env.SESSION_DB_PATH || SESSION_DB_PATH);
_db.pragma('journal_mode = WAL');
_db.pragma('foreign_keys = ON');
}
return _db;
}
/** For tests — opens an in-memory DB with session schema. */
export function initTestSessionDb(): Database.Database {
_db = new Database(':memory:');
_db.pragma('foreign_keys = ON');
_db.exec(`
CREATE TABLE messages_in (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
timestamp TEXT NOT NULL,
status TEXT DEFAULT 'pending',
status_changed TEXT,
process_after TEXT,
recurrence TEXT,
tries INTEGER DEFAULT 0,
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
);
CREATE TABLE messages_out (
id TEXT PRIMARY KEY,
in_reply_to TEXT,
timestamp TEXT NOT NULL,
delivered INTEGER DEFAULT 0,
deliver_after TEXT,
recurrence TEXT,
kind TEXT NOT NULL,
platform_id TEXT,
channel_type TEXT,
thread_id TEXT,
content TEXT NOT NULL
);
`);
return _db;
}
export function closeSessionDb(): void {
_db?.close();
_db = null;
}
+5
View File
@@ -0,0 +1,5 @@
export { getSessionDb, initTestSessionDb, closeSessionDb } from './connection.js';
export { getPendingMessages, markProcessing, markCompleted, markFailed, getMessageIn, findQuestionResponse } from './messages-in.js';
export type { MessageInRow } from './messages-in.js';
export { writeMessageOut, getUndeliveredMessages, markDelivered } from './messages-out.js';
export type { MessageOutRow, WriteMessageOut } from './messages-out.js';
@@ -0,0 +1,65 @@
import { getSessionDb } from './connection.js';
export interface MessageInRow {
id: string;
kind: string;
timestamp: string;
status: string;
status_changed: string | null;
process_after: string | null;
recurrence: string | null;
tries: number;
platform_id: string | null;
channel_type: string | null;
thread_id: string | null;
content: string;
}
/** Fetch all pending messages that are due for processing. */
export function getPendingMessages(): MessageInRow[] {
return getSessionDb()
.prepare(
`SELECT * FROM messages_in
WHERE status = 'pending'
AND (process_after IS NULL OR process_after <= datetime('now'))
ORDER BY timestamp ASC`,
)
.all() as MessageInRow[];
}
/** Mark messages as processing. */
export function markProcessing(ids: string[]): void {
if (ids.length === 0) return;
const db = getSessionDb();
const stmt = db.prepare("UPDATE messages_in SET status = 'processing', status_changed = datetime('now'), tries = tries + 1 WHERE id = ?");
db.transaction(() => {
for (const id of ids) stmt.run(id);
})();
}
/** Mark messages as completed. */
export function markCompleted(ids: string[]): void {
if (ids.length === 0) return;
const db = getSessionDb();
const stmt = db.prepare("UPDATE messages_in SET status = 'completed', status_changed = datetime('now') WHERE id = ?");
db.transaction(() => {
for (const id of ids) stmt.run(id);
})();
}
/** Mark a single message as failed. */
export function markFailed(id: string): void {
getSessionDb().prepare("UPDATE messages_in SET status = 'failed', status_changed = datetime('now') WHERE id = ?").run(id);
}
/** Get a message by ID. */
export function getMessageIn(id: string): MessageInRow | undefined {
return getSessionDb().prepare('SELECT * FROM messages_in WHERE id = ?').get(id) as MessageInRow | undefined;
}
/** Find a pending response to a question (by questionId in content). */
export function findQuestionResponse(questionId: string): MessageInRow | undefined {
return getSessionDb()
.prepare("SELECT * FROM messages_in WHERE status = 'pending' AND content LIKE ?")
.get(`%"questionId":"${questionId}"%`) as MessageInRow | undefined;
}
@@ -0,0 +1,62 @@
import { getSessionDb } from './connection.js';
export interface MessageOutRow {
id: string;
in_reply_to: string | null;
timestamp: string;
delivered: number;
deliver_after: string | null;
recurrence: string | null;
kind: string;
platform_id: string | null;
channel_type: string | null;
thread_id: string | null;
content: string;
}
export interface WriteMessageOut {
id: string;
in_reply_to?: string | null;
deliver_after?: string | null;
recurrence?: string | null;
kind: string;
platform_id?: string | null;
channel_type?: string | null;
thread_id?: string | null;
content: string;
}
/** Write a new outbound message. */
export function writeMessageOut(msg: WriteMessageOut): void {
getSessionDb()
.prepare(
`INSERT INTO messages_out (id, in_reply_to, timestamp, delivered, deliver_after, recurrence, kind, platform_id, channel_type, thread_id, content)
VALUES (@id, @in_reply_to, datetime('now'), 0, @deliver_after, @recurrence, @kind, @platform_id, @channel_type, @thread_id, @content)`,
)
.run({
in_reply_to: null,
deliver_after: null,
recurrence: null,
platform_id: null,
channel_type: null,
thread_id: null,
...msg,
});
}
/** Get undelivered messages (for host polling). */
export function getUndeliveredMessages(): MessageOutRow[] {
return getSessionDb()
.prepare(
`SELECT * FROM messages_out
WHERE delivered = 0
AND (deliver_after IS NULL OR deliver_after <= datetime('now'))
ORDER BY timestamp ASC`,
)
.all() as MessageOutRow[];
}
/** Mark a message as delivered. */
export function markDelivered(id: string): void {
getSessionDb().prepare('UPDATE messages_out SET delivered = 1 WHERE id = ?').run(id);
}