Merge branch 'main' into fix/agent-runner-drop-messages-envelope

This commit is contained in:
gavrielc
2026-05-22 23:05:40 +03:00
committed by GitHub
18 changed files with 859 additions and 94 deletions
+13
View File
@@ -58,6 +58,19 @@ export async function runPollLoop(config: PollLoopConfig): Promise<void> {
// a Codex thread id never gets handed to Claude or vice versa.
let continuation: string | undefined = migrateLegacyContinuation(config.providerName);
// Before resuming, drop a session whose on-disk transcript has grown too
// large/old to cold-resume within the host's idle ceiling. Without this a
// long-lived hub keeps trying to reload an ever-growing .jsonl, hangs the
// first turn, and gets killed before it can reply (then repeats forever).
if (continuation) {
const rotateReason = config.provider.maybeRotateContinuation?.(continuation, config.cwd);
if (rotateReason) {
log(`Rotating session — ${rotateReason}; starting fresh`);
clearContinuation(config.providerName);
continuation = undefined;
}
}
if (continuation) {
log(`Resuming agent session ${continuation}`);
}
@@ -0,0 +1,89 @@
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
import fs from 'fs';
import os from 'os';
import path from 'path';
import { ClaudeProvider } from './claude.js';
// maybeRotateContinuation guards the cold-resume failure mode: a long-lived
// session whose on-disk transcript has grown so large (or old) that the SDK
// can't reload it before the host's idle ceiling kills the container.
let tmp: string;
let prevHome: string | undefined;
let prevConv: string | undefined;
let prevBytes: string | undefined;
let prevDays: string | undefined;
const PROJECT_DIR = '-workspace-agent';
const CWD = '/workspace/agent';
function writeTranscript(sessionId: string, bytes: number, firstTs?: string): string {
const dir = path.join(tmp, '.claude', 'projects', PROJECT_DIR);
fs.mkdirSync(dir, { recursive: true });
const p = path.join(dir, `${sessionId}.jsonl`);
const first =
JSON.stringify({
type: 'user',
timestamp: firstTs ?? new Date().toISOString(),
message: { role: 'user', content: 'hello' },
}) + '\n';
const filler = 'x'.repeat(Math.max(0, bytes - first.length));
fs.writeFileSync(p, first + filler);
return p;
}
beforeEach(() => {
tmp = fs.mkdtempSync(path.join(os.tmpdir(), 'claude-rotate-'));
prevHome = process.env.HOME;
prevConv = process.env.NANOCLAW_CONVERSATIONS_DIR;
prevBytes = process.env.CLAUDE_TRANSCRIPT_ROTATE_BYTES;
prevDays = process.env.CLAUDE_TRANSCRIPT_ROTATE_AGE_DAYS;
process.env.HOME = tmp;
delete process.env.CLAUDE_CONFIG_DIR;
process.env.NANOCLAW_CONVERSATIONS_DIR = path.join(tmp, 'conversations');
});
afterEach(() => {
const restore = (k: string, v: string | undefined) => (v === undefined ? delete process.env[k] : (process.env[k] = v));
restore('HOME', prevHome);
restore('NANOCLAW_CONVERSATIONS_DIR', prevConv);
restore('CLAUDE_TRANSCRIPT_ROTATE_BYTES', prevBytes);
restore('CLAUDE_TRANSCRIPT_ROTATE_AGE_DAYS', prevDays);
fs.rmSync(tmp, { recursive: true, force: true });
});
describe('ClaudeProvider.maybeRotateContinuation', () => {
it('keeps a small, recent transcript (returns null, leaves file in place)', () => {
process.env.CLAUDE_TRANSCRIPT_ROTATE_BYTES = String(1024 * 1024);
const p = writeTranscript('sess-small', 4096);
const provider = new ClaudeProvider();
expect(provider.maybeRotateContinuation('sess-small', CWD)).toBeNull();
expect(fs.existsSync(p)).toBe(true);
});
it('rotates an oversized transcript (returns reason, moves the .jsonl aside)', () => {
process.env.CLAUDE_TRANSCRIPT_ROTATE_BYTES = String(64 * 1024);
const p = writeTranscript('sess-big', 200 * 1024);
const provider = new ClaudeProvider();
const reason = provider.maybeRotateContinuation('sess-big', CWD);
expect(reason).toContain('MB');
expect(fs.existsSync(p)).toBe(false); // original moved out of the resume path
const dir = path.dirname(p);
expect(fs.readdirSync(dir).some((f) => f.startsWith('sess-big.jsonl.rotated-'))).toBe(true);
});
it('rotates an aged transcript even when small', () => {
process.env.CLAUDE_TRANSCRIPT_ROTATE_BYTES = String(1024 * 1024);
process.env.CLAUDE_TRANSCRIPT_ROTATE_AGE_DAYS = '7';
const old = new Date(Date.now() - 10 * 86400_000).toISOString();
writeTranscript('sess-old', 2048, old);
const provider = new ClaudeProvider();
expect(provider.maybeRotateContinuation('sess-old', CWD)).toContain('d');
});
it('returns null for an unknown session id', () => {
const provider = new ClaudeProvider();
expect(provider.maybeRotateContinuation('does-not-exist', CWD)).toBeNull();
});
});
+145 -36
View File
@@ -1,4 +1,5 @@
import fs from 'fs';
import os from 'os';
import path from 'path';
import { query as sdkQuery, type HookCallback, type PreCompactHookInput } from '@anthropic-ai/claude-agent-sdk';
@@ -188,49 +189,122 @@ const postToolUseHook: HookCallback = async () => {
return { continue: true };
};
/**
* Read a Claude transcript .jsonl, render a markdown summary, and drop it into
* the agent's `conversations/` folder so context survives a compaction or a
* session rotation. Best-effort: returns false (and logs) on any failure.
*/
function archiveTranscriptFile(transcriptPath: string | undefined, sessionId: string | undefined, assistantName?: string): boolean {
if (!transcriptPath || !fs.existsSync(transcriptPath)) {
log('No transcript found for archiving');
return false;
}
try {
const content = fs.readFileSync(transcriptPath, 'utf-8');
const messages = parseTranscript(content);
if (messages.length === 0) return false;
// Try to get summary from sessions index
let summary: string | undefined;
const indexPath = path.join(path.dirname(transcriptPath), 'sessions-index.json');
if (fs.existsSync(indexPath)) {
try {
const index = JSON.parse(fs.readFileSync(indexPath, 'utf-8'));
summary = index.entries?.find((e: { sessionId: string; summary?: string }) => e.sessionId === sessionId)?.summary;
} catch {
/* ignore */
}
}
const name = summary
? summary.toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/^-+|-+$/g, '').slice(0, 50)
: `conversation-${new Date().getHours().toString().padStart(2, '0')}${new Date().getMinutes().toString().padStart(2, '0')}`;
const conversationsDir = process.env.NANOCLAW_CONVERSATIONS_DIR || '/workspace/agent/conversations';
fs.mkdirSync(conversationsDir, { recursive: true });
const filename = `${new Date().toISOString().split('T')[0]}-${name}.md`;
fs.writeFileSync(path.join(conversationsDir, filename), formatTranscriptMarkdown(messages, summary, assistantName));
log(`Archived conversation to ${filename}`);
return true;
} catch (err) {
log(`Failed to archive transcript: ${err instanceof Error ? err.message : String(err)}`);
return false;
}
}
function createPreCompactHook(assistantName?: string): HookCallback {
return async (input) => {
const preCompact = input as PreCompactHookInput;
const { transcript_path: transcriptPath, session_id: sessionId } = preCompact;
if (!transcriptPath || !fs.existsSync(transcriptPath)) {
log('No transcript found for archiving');
return {};
}
try {
const content = fs.readFileSync(transcriptPath, 'utf-8');
const messages = parseTranscript(content);
if (messages.length === 0) return {};
// Try to get summary from sessions index
let summary: string | undefined;
const indexPath = path.join(path.dirname(transcriptPath), 'sessions-index.json');
if (fs.existsSync(indexPath)) {
try {
const index = JSON.parse(fs.readFileSync(indexPath, 'utf-8'));
summary = index.entries?.find((e: { sessionId: string; summary?: string }) => e.sessionId === sessionId)?.summary;
} catch {
/* ignore */
}
}
const name = summary
? summary.toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/^-+|-+$/g, '').slice(0, 50)
: `conversation-${new Date().getHours().toString().padStart(2, '0')}${new Date().getMinutes().toString().padStart(2, '0')}`;
const conversationsDir = '/workspace/agent/conversations';
fs.mkdirSync(conversationsDir, { recursive: true });
const filename = `${new Date().toISOString().split('T')[0]}-${name}.md`;
fs.writeFileSync(path.join(conversationsDir, filename), formatTranscriptMarkdown(messages, summary, assistantName));
log(`Archived conversation to ${filename}`);
} catch (err) {
log(`Failed to archive transcript: ${err instanceof Error ? err.message : String(err)}`);
}
archiveTranscriptFile(preCompact.transcript_path, preCompact.session_id, assistantName);
return {};
};
}
// ── Continuation rotation (cold-resume guard) ──
/**
* Resume cost is dominated by transcript size. Past this many bytes a fresh
* cold container can't reload the .jsonl before the host's 30-min idle ceiling
* fires, so the session is dropped and started clean. Operator-overridable.
*/
function transcriptRotateBytes(): number {
return Number(process.env.CLAUDE_TRANSCRIPT_ROTATE_BYTES) || 12 * 1024 * 1024;
}
/**
* Secondary age trigger, measured from the transcript's first entry. 0 (or a
* non-positive value) disables the age check; size alone then governs.
*/
function transcriptRotateAgeMs(): number {
const days = Number(process.env.CLAUDE_TRANSCRIPT_ROTATE_AGE_DAYS);
return Number.isFinite(days) && days > 0 ? days * 86_400_000 : 14 * 86_400_000;
}
function claudeProjectsDir(): string {
const base = process.env.CLAUDE_CONFIG_DIR || path.join(process.env.HOME || os.homedir(), '.claude');
return path.join(base, 'projects');
}
/**
* Locate the .jsonl backing a session id. The SDK names project dirs by a
* mangled cwd; rather than reproduce that convention we scan project dirs for
* `<sessionId>.jsonl` (session ids are UUIDs, so this is unambiguous).
*/
function findTranscriptPath(sessionId: string): string | null {
const projects = claudeProjectsDir();
let dirs: string[];
try {
dirs = fs.readdirSync(projects);
} catch {
return null;
}
for (const dir of dirs) {
const candidate = path.join(projects, dir, `${sessionId}.jsonl`);
if (fs.existsSync(candidate)) return candidate;
}
return null;
}
/** Epoch-ms of the first transcript entry, or null if unreadable. */
function transcriptStartMs(transcriptPath: string): number | null {
try {
const fd = fs.openSync(transcriptPath, 'r');
try {
const buf = Buffer.alloc(4096);
const n = fs.readSync(fd, buf, 0, buf.length, 0);
const firstLine = buf.toString('utf-8', 0, n).split('\n', 1)[0];
const ts = JSON.parse(firstLine)?.timestamp;
const ms = ts ? Date.parse(ts) : NaN;
return Number.isNaN(ms) ? null : ms;
} finally {
fs.closeSync(fd);
}
} catch {
return null;
}
}
// ── Provider ──
/**
@@ -277,6 +351,41 @@ export class ClaudeProvider implements AgentProvider {
return STALE_SESSION_RE.test(msg);
}
maybeRotateContinuation(continuation: string): string | null {
const transcriptPath = findTranscriptPath(continuation);
if (!transcriptPath) return null;
let size: number;
try {
size = fs.statSync(transcriptPath).size;
} catch {
return null;
}
const maxBytes = transcriptRotateBytes();
const startMs = transcriptStartMs(transcriptPath);
const ageMs = startMs === null ? 0 : Date.now() - startMs;
const maxAgeMs = transcriptRotateAgeMs();
let reason: string | null = null;
if (size > maxBytes) {
reason = `transcript ${(size / 1_048_576).toFixed(1)}MB > ${(maxBytes / 1_048_576).toFixed(0)}MB cap`;
} else if (startMs !== null && ageMs > maxAgeMs) {
reason = `transcript ${(ageMs / 86_400_000).toFixed(1)}d old > ${(maxAgeMs / 86_400_000).toFixed(0)}d cap`;
}
if (!reason) return null;
// Preserve a readable summary, then move the heavy .jsonl out of the
// resume path so the SDK starts a fresh session and the disk is reclaimed.
archiveTranscriptFile(transcriptPath, continuation, this.assistantName);
try {
fs.renameSync(transcriptPath, `${transcriptPath}.rotated-${Date.now()}`);
} catch (err) {
log(`Failed to move rotated transcript aside: ${err instanceof Error ? err.message : String(err)}`);
}
return reason;
}
query(input: QueryInput): AgentQuery {
const stream = new MessageStream();
stream.push(input.prompt);
@@ -14,6 +14,21 @@ export interface AgentProvider {
* (missing transcript, unknown session, etc.) and should be cleared.
*/
isSessionInvalid(err: unknown): boolean;
/**
* Optional pre-resume maintenance. Given the stored continuation token,
* decide whether its backing transcript has grown too large or too old to
* resume cheaply. Return a non-null reason string to tell the caller to drop
* the continuation and start a fresh session (the provider archives any
* recoverable summary first); return null to keep resuming.
*
* Guards the cold-resume failure mode: a long-lived hub session accumulates
* days of history — including base64 image blocks the agent Read — and the
* SDK reloads the whole .jsonl on every resume. Past a threshold the first
* turn alone can exceed the host's idle ceiling, so the container is killed
* before it ever replies. Providers without an on-disk transcript omit this.
*/
maybeRotateContinuation?(continuation: string, cwd: string): string | null;
}
/**