diff --git a/src/backends/codex.ts b/src/backends/codex.ts index 91ae660..019f7fd 100644 --- a/src/backends/codex.ts +++ b/src/backends/codex.ts @@ -135,6 +135,18 @@ export class CodexBackend implements Backend { resolveCodexAuthPath(), ) + // When MCP passthrough is active, the synthetic CODEX_HOME (merged MCP config + // + copied auth) is the source of truth. Register it as the jail's codex auth + // source so a CONFINED run gets it surfaced inside the jail with CODEX_HOME + // redirected there. The jail applies this only when it actually wraps; on + // docker/fallback paths the host `CODEX_HOME` env below is used unchanged. + if (req.jailSpec && codexHome) { + req.jailSpec.authSources = [ + ...(req.jailSpec.authSources ?? []).filter((s) => s.envVar !== 'CODEX_HOME'), + { source: codexHome.homePath, jailRel: '.codex', envVar: 'CODEX_HOME' }, + ] + } + // Phase-2 host wiring: provision cwd-native profile dimensions (skills/context/ // hooks/subagents/commands) before spawn. MCP stays on the path above. Fail-safe. provisionProfileWorkspace(req, session, 'codex', req.cwd ?? session?.cwd ?? process.cwd()) diff --git a/src/backends/content.ts b/src/backends/content.ts index 14f0e6d..c2953a8 100644 --- a/src/backends/content.ts +++ b/src/backends/content.ts @@ -31,6 +31,24 @@ export function tokensFromChars(chars: number): number { return Math.max(0, Math.ceil(chars / 4)) } +/** + * Total characters the model actually reads for a request — message content PLUS + * assistant tool-call structures (id + name + arguments), which `flattenMessages` + * deliberately omits. Used only for usage estimation, where dropping tool calls + * would systematically undercount tool-heavy turns and make them look cheaper than + * they were. Counts the semantic payload, not JSON framing. + */ +export function estimateMessagesChars(messages: ChatMessage[]): number { + let n = 0 + for (const m of messages) { + n += contentToText(m.content).length + for (const tc of m.tool_calls ?? 
[]) { + n += (tc.id?.length ?? 0) + (tc.function?.name?.length ?? 0) + (tc.function?.arguments?.length ?? 0) + } + } + return n +} + export function collectSystemText(messages: ChatMessage[]): string { return messages .filter((m) => m.role === 'system') diff --git a/src/backends/types.ts b/src/backends/types.ts index 5461501..c682c58 100644 --- a/src/backends/types.ts +++ b/src/backends/types.ts @@ -171,8 +171,12 @@ export interface ChatDelta { tool_calls?: Array<{ id: string; name: string; arguments: string }> /** Terminal reason. Emitted once on the final chunk. */ finish_reason?: 'stop' | 'length' | 'tool_calls' | 'error' | 'timeout' - /** Backend-reported usage. Optional; present on final chunk when known. */ - usage?: { input_tokens?: number; output_tokens?: number } + /** + * Token usage. Optional; present on the final chunk when known. `estimated` + * is set when the bridge derived it from text (~4 chars/token) because the + * backend CLI reported none — consumers price it as approximate, not measured. + */ + usage?: { input_tokens?: number; output_tokens?: number; estimated?: boolean } /** Backend assigned id for this turn. Written to session store. */ internal_session_id?: string /** diff --git a/src/config.ts b/src/config.ts index 8922992..d96ee67 100644 --- a/src/config.ts +++ b/src/config.ts @@ -110,6 +110,31 @@ export interface BackendExecutorConfig { containerConfigDir?: string } +/** Backends that never spawn a CLI on the host (remote HTTP, local proxy, or a + * socket to an already-running daemon), so the host write-jail never applies. */ +const NON_HOST_SPAWN_BACKENDS = new Set(['sandbox', 'passthrough', 'nanoclaw']) + +/** + * Whether any ENABLED backend will spawn a CLI on the host (and therefore be + * subject to the write-jail). True unless every enabled backend is remote/proxy + * or pinned to a docker executor. 
Errs toward true: an unrecognized backend is + * assumed to host-spawn, so the startup jail check fails closed rather than + * booting "healthy" and failing every request at runtime. Covers backends that + * are NOT in `executors` (e.g. ACP hermes/openclaw, factory, amp, forge), which + * default to host spawn. + */ +export function anyBackendSpawnsOnHost( + backends: Iterable, + executors: Record, +): boolean { + for (const name of backends) { + if (NON_HOST_SPAWN_BACKENDS.has(name)) continue + if (executors[name]?.kind === 'docker') continue + return true + } + return false +} + const LOOPBACK = new Set(['127.0.0.1', '::1', 'localhost']) export function loadConfig(env: NodeJS.ProcessEnv = process.env): Config { diff --git a/src/executors/jail-support.ts b/src/executors/jail-support.ts index 660849e..f993d42 100644 --- a/src/executors/jail-support.ts +++ b/src/executors/jail-support.ts @@ -15,6 +15,7 @@ import { selectJailBackend } from '../jail/index.js' import type { JailBackend } from '../jail/index.js' +import { BackendError } from '../backends/types.js' import type { SpawnOpts } from './types.js' export interface JailedCommand { @@ -54,9 +55,12 @@ export async function applyJail( } return { bin, args, env: opts.env } } - throw new Error( + // Typed BackendError (not a plain Error) so the chat wrapper surfaces it as a + // real config error (5xx / typed SSE), not an opaque finish_reason:'error'. + throw new BackendError( `write-jail requested but '${backend.name}' cannot run on this host, refusing to run ` + `unconfined. 
${ENABLE_HINT} Or set BRIDGE_JAIL_FALLBACK=warn to run without confinement.`, + 'not_configured', ) } diff --git a/src/jail/auth-preserve.ts b/src/jail/auth-preserve.ts index ff686a3..a9a9dcf 100644 --- a/src/jail/auth-preserve.ts +++ b/src/jail/auth-preserve.ts @@ -16,7 +16,8 @@ import { existsSync } from 'node:fs' import { cp } from 'node:fs/promises' import { homedir } from 'node:os' -import { join, relative } from 'node:path' +import { join, resolve } from 'node:path' +import type { JailAuthSource } from './types.js' /** * $HOME-relative auth/config paths per REGISTERED backend name. Aliases that @@ -36,6 +37,10 @@ const AUTH_PATHS: Record = { // is active; in the common no-MCP case it reads ~/.codex, which the jail would // otherwise hide. Preserve it here so jailed codex authenticates either way. codex: ['.codex'], + // pi keeps provider registrations / model defaults in ~/.pi/agent (the same + // dir config.ts mounts into pi's docker containers). Without it a jailed pi + // run starts from an empty HOME and loses every persisted provider/default. + pi: ['.pi/agent'], } /** The HOME the spawned CLIs actually read, honoring a cli-bridge-set HOME @@ -44,18 +49,35 @@ function backendHome(): string { return process.env.HOME?.trim() || homedir() } -/** Absolute host auth paths for a backend that actually exist on this host. */ -export function authSourcesFor(backendName: string): string[] { +/** Auth sources for a backend that actually exist on this host, each mapped to + * the jail-relative location the confined CLI reads. */ +export function authSourcesFor(backendName: string): JailAuthSource[] { const home = backendHome() - return (AUTH_PATHS[backendName] ?? []) - .map((rel) => join(home, rel)) - .filter((abs) => existsSync(abs)) -} - -/** The path, inside the jail HOME, where an auth source must appear (its - * location relative to the real HOME the CLI reads). 
*/ -export function jailRelPath(source: string): string { - return relative(backendHome(), source) + const out: JailAuthSource[] = [] + for (const rel of AUTH_PATHS[backendName] ?? []) { + const source = join(home, rel) + // rel is already a POSIX-style jail-relative target ('.claude', '.config/opencode'). + if (existsSync(source)) out.push({ source, jailRel: rel }) + } + if (backendName === 'codex') { + // codex.ts honors $CODEX_HOME (src/backends/codex.ts) and only falls back to + // ~/.codex when it is unset. Mirror that: when CODEX_HOME points elsewhere, + // surface THAT directory at the jail's ~/.codex (where a confined codex with + // HOME=root looks), replacing the default entry rather than copying the wrong + // creds. Without this, a custom-CODEX_HOME install loses its auth in the jail. + const codexHome = process.env.CODEX_HOME?.trim() + if (codexHome) { + const source = resolve(codexHome) + const idx = out.findIndex((e) => e.jailRel === '.codex') + if (idx >= 0) out.splice(idx, 1) + if (existsSync(source)) out.push({ source, jailRel: '.codex' }) + } + // Redirect CODEX_HOME at the in-jail copy so a confined codex reads creds + // there rather than the (read-only) host path. The jail applies this only + // when it actually wraps — docker/fallback runs keep the host CODEX_HOME. + for (const e of out) if (e.jailRel === '.codex') e.envVar = 'CODEX_HOME' + } + return out } /** @@ -65,11 +87,11 @@ export function jailRelPath(source: string): string { * the copied destination paths so the caller can remove them on cleanup — the * jail root is project-local, so copied credentials must NOT linger there. */ -export async function copyAuthIntoJail(root: string, sources: string[] | undefined): Promise { +export async function copyAuthIntoJail(root: string, sources: JailAuthSource[] | undefined): Promise { const copied: string[] = [] - for (const source of sources ?? []) { + for (const { source, jailRel } of sources ?? 
[]) { if (!existsSync(source)) continue - const dest = join(root, jailRelPath(source)) + const dest = join(root, jailRel) await cp(source, dest, { recursive: true, force: true, errorOnExist: false }) copied.push(dest) } diff --git a/src/jail/linux-bwrap.ts b/src/jail/linux-bwrap.ts index 4c2e220..3b05f55 100644 --- a/src/jail/linux-bwrap.ts +++ b/src/jail/linux-bwrap.ts @@ -28,9 +28,8 @@ import { spawnSync } from 'node:child_process' import { accessSync, constants, existsSync } from 'node:fs' import { delimiter, join } from 'node:path' -import { jailRelPath } from './auth-preserve.js' import type { JailBackend, JailSpec, JailWrap } from './types.js' -import { jailEnv, prepareJailHome, resolveJailRoot } from './types.js' +import { ignoreJailRoot, jailEnv, prepareJailHome, resolveJailRoot } from './types.js' const BWRAP_BIN = 'bwrap' @@ -45,6 +44,7 @@ export class LinuxBwrapJail implements JailBackend { async wrap(bin: string, args: string[], spec: JailSpec): Promise { const root = resolveJailRoot(spec.root, spec.projectDir) await prepareJailHome(root) + ignoreJailRoot(spec.projectDir, root) const bwrapArgs = [ '--unshare-user', @@ -74,10 +74,13 @@ export class LinuxBwrapJail implements JailBackend { // Make the backend's host auth readable inside the jail (read-only), // bound AFTER the writable root so these specific subpaths stay read-only. // HOME is the jail root, so ~/.claude etc. resolve to these binds. - for (const source of spec.authSources ?? []) { - if (existsSync(source)) { - bwrapArgs.push('--ro-bind', source, join(root, jailRelPath(source))) - } + for (const { source, jailRel, envVar } of spec.authSources ?? []) { + if (!existsSync(source)) continue + const dest = join(root, jailRel) + bwrapArgs.push('--ro-bind', source, dest) + // Point the backend's env var (e.g. CODEX_HOME) at the in-jail copy. Done + // here, where the jail truly applies, so non-jailed paths are untouched. 
+ if (envVar) bwrapArgs.push('--setenv', envVar, dest) } // Redirect HOME + XDG dirs into the jail so stateful CLIs write inside it. diff --git a/src/jail/macos-seatbelt.ts b/src/jail/macos-seatbelt.ts index 9498307..4cafc70 100644 --- a/src/jail/macos-seatbelt.ts +++ b/src/jail/macos-seatbelt.ts @@ -16,16 +16,29 @@ * removes it (and its temp dir) after the spawn completes. */ -import { accessSync, constants } from 'node:fs' +import { accessSync, constants, existsSync } from 'node:fs' import { mkdir, mkdtemp, realpath, rm, writeFile } from 'node:fs/promises' import { delimiter, join } from 'node:path' import { tmpdir } from 'node:os' import { copyAuthIntoJail } from './auth-preserve.js' import type { JailBackend, JailSpec, JailWrap } from './types.js' -import { jailEnv, prepareJailHome, resolveJailRoot } from './types.js' +import { ignoreJailRoot, jailEnv, prepareJailHome, resolveJailRoot } from './types.js' const SANDBOX_EXEC_BIN = 'sandbox-exec' -const SYSTEM_WRITABLE = ['/private/tmp', '/private/var/folders'] +// Device nodes a normal process writes to (output redirection, RNG, tracing, +// the controlling tty). These are not filesystem locations a confined run can +// persist files to, so allowing them does not weaken the "writes confined to +// the jail root" guarantee. We deliberately do NOT allow the shared temp trees +// (/private/tmp, /private/var/folders): the CLI's temp writes are redirected to +// TMPDIR=<root>/.tmp (jailEnv), which sits inside the writable root. +const DEVICE_WRITABLE = [ + '/dev/null', + '/dev/zero', + '/dev/random', + '/dev/urandom', + '/dev/dtracehelper', + '/dev/tty', +] export class MacosSeatbeltJail implements JailBackend { readonly name = 'seatbelt' @@ -39,48 +52,69 @@ export class MacosSeatbeltJail implements JailBackend { // Create the redirected HOME/XDG dirs under the (canonical) root so the CLI // can write to them; they sit inside `root`, already in the writable set. 
await prepareJailHome(root) + ignoreJailRoot(spec.projectDir, root) // sandbox-exec cannot bind-mount, so copy the backend's host auth into the // jail HOME (writable, under root) — the CLI authenticates as the operator. // The copies are removed in cleanup() so credentials never linger in the // project-local jail root. const copiedAuth = await copyAuthIntoJail(root, spec.authSources) - const writable = [root, ...SYSTEM_WRITABLE] - for (const path of spec.extraWritablePaths ?? []) { - writable.push(await canonicalize(path)) + const removeCopiedAuth = async (): Promise => { + for (const copied of copiedAuth) { + await rm(copied, { recursive: true, force: true }) + } } + // From here on, any failure must remove the copied credentials — otherwise a + // throw before `cleanup` is returned leaves real auth under the repo jail root. + try { + const writable = [root] + for (const path of spec.extraWritablePaths ?? []) { + writable.push(await canonicalize(path)) + } + + // Point any backend env var (e.g. CODEX_HOME) at the in-jail copy. Done + // here, where the jail truly applies, so non-jailed paths are untouched. + const authEnv: Record = {} + for (const { source, jailRel, envVar } of spec.authSources ?? []) { + if (envVar && existsSync(source)) authEnv[envVar] = join(root, jailRel) + } - const profile = buildProfile(writable) - const dir = await mkdtemp(join(tmpdir(), 'cli-bridge-jail-')) - const profilePath = join(dir, 'profile.sb') - await writeFile(profilePath, profile, { mode: 0o600 }) + const profile = buildProfile(writable) + const dir = await mkdtemp(join(tmpdir(), 'cli-bridge-jail-')) + const profilePath = join(dir, 'profile.sb') + await writeFile(profilePath, profile, { mode: 0o600 }) - return { - bin: SANDBOX_EXEC_BIN, - args: ['-f', profilePath, '-D', `HOME=${root}`, '-D', `WORK=${spec.projectDir}`, bin, ...args], - // sandbox-exec does NOT rewrite the child env; -D only parameterizes the - // profile. 
Return the real env so HOME/XDG actually point into the jail. - env: jailEnv(root), - cleanup: async () => { - await rm(dir, { recursive: true, force: true }) - // Remove copied credentials from the project-local jail root. - for (const copied of copiedAuth) { - await rm(copied, { recursive: true, force: true }) - } - }, + return { + bin: SANDBOX_EXEC_BIN, + args: ['-f', profilePath, '-D', `HOME=${root}`, '-D', `WORK=${spec.projectDir}`, bin, ...args], + // sandbox-exec does NOT rewrite the child env; -D only parameterizes the + // profile. Return the real env so HOME/XDG actually point into the jail. + env: { ...jailEnv(root), ...authEnv }, + cleanup: async () => { + await rm(dir, { recursive: true, force: true }) + await removeCopiedAuth() + }, + } + } catch (err) { + await removeCopiedAuth() + throw err } } } function buildProfile(writable: string[]): string { - const allow = writable.map((path) => ` (subpath "${sbplEscape(path)}")`).join('\n') + const allowSubpaths = writable.map((path) => ` (subpath "${sbplEscape(path)}")`).join('\n') + const allowDevices = DEVICE_WRITABLE.map((path) => ` (literal "${sbplEscape(path)}")`).join('\n') return [ '(version 1)', '(allow default)', '', - '; Confine writes to the jail root and explicit writable paths.', + '; Deny all writes, then re-allow only the jail root + explicit writable paths', + '; (subpaths) and standard device nodes (literals). Shared temp trees stay', + '; denied; the CLI writes temp to TMPDIR=/.tmp instead.', '(deny file-write* (subpath "/"))', '(allow file-write*', - allow, + allowSubpaths, + allowDevices, ')', '', ].join('\n') diff --git a/src/jail/types.ts b/src/jail/types.ts index 4378602..0d3a6cd 100644 --- a/src/jail/types.ts +++ b/src/jail/types.ts @@ -12,7 +12,7 @@ * else the NoopJail passes argv through unchanged. 
*/ -import { existsSync, realpathSync } from 'node:fs' +import { appendFileSync, existsSync, mkdirSync, readFileSync, realpathSync, statSync } from 'node:fs' import { mkdir, writeFile } from 'node:fs/promises' import { basename, dirname, isAbsolute, join, relative, resolve, sep } from 'node:path' @@ -27,11 +27,28 @@ export interface JailSpec { extraWritablePaths?: string[] /** Extra absolute paths to expose read-only beyond the host default. */ extraReadablePaths?: string[] - /** Absolute host paths holding the backend CLI's auth/config, made - * available inside the jail at their $HOME-relative location (read-only - * bind on Linux, copy on macOS) so a confined run still authenticates as - * the operator. Populated per backend by {@link authSourcesFor}. */ - authSources?: string[] + /** Host auth/config sources made available inside the jail (read-only bind + * on Linux, copy on macOS) so a confined run still authenticates as the + * operator. Each carries an explicit jail-relative target so a source OUTSIDE + * the operator HOME (e.g. a custom `CODEX_HOME`) still lands at the location + * the confined CLI reads. Populated per backend by {@link authSourcesFor}. */ + authSources?: JailAuthSource[] +} + +/** A host credential/config path and where it must appear inside the jail. */ +export interface JailAuthSource { + /** Absolute host path holding the backend CLI's auth/config. */ + source: string + /** Path relative to the jail root (== jail HOME) where `source` must appear, + * so the confined CLI finds it at the same logical location. Always inside + * the root, even when `source` lives outside the operator HOME. */ + jailRel: string + /** Optional env var the jail must point at this source's IN-JAIL location + * (`<root>/<jailRel>`). Exported ONLY by the jail backend when it actually wraps, + * so e.g. codex's `CODEX_HOME` is redirected into the jail for confined runs + * but left untouched on docker / fallback-passthrough paths that ignore the + * jail. 
*/ + envVar?: string } export interface JailWrap { @@ -117,6 +134,53 @@ export function jailEnv(root: string): Record { } } +/** + * Ensure the jail root is git-ignored from the REPO's perspective. A .gitignore + * placed inside an untracked directory does not make Git ignore that directory + * itself, so we add a rule to the project's `.git/info/exclude` (local + untracked, + * no working-tree change). Best-effort and idempotent; a no-op outside a git repo + * or when the root is not under the repo. A `.git` FILE is followed via `gitdir:`. + */ +export function ignoreJailRoot(projectDir: string, root: string): void { + try { + const found = findGitDir(resolve(projectDir)) + if (!found) return + // .git/info/exclude patterns are anchored at the repo root, so the entry is + // the jail root relative to THAT (handles cwd being a repo subdirectory). + const rel = relative(found.repoRoot, root).split(sep).join('/') + if (!rel || rel.startsWith('..')) return + const entry = `/${rel}/` + const excludeFile = join(found.gitDir, 'info', 'exclude') + const current = existsSync(excludeFile) ? readFileSync(excludeFile, 'utf8') : '' + if (current.split(/\r?\n/).includes(entry)) return + mkdirSync(dirname(excludeFile), { recursive: true }) + appendFileSync(excludeFile, `${current && !current.endsWith('\n') ? '\n' : ''}${entry}\n`) + } catch { + // best-effort: do not fail a jailed run because the ignore rule could not be written + } +} + +/** Find the git dir + repo root for `start`, walking up parents. Handles a `.git` + * directory (normal repo) and a `.git` FILE (`gitdir: <path>` for worktrees / + * submodules). Returns null outside any repo. 
*/ +function findGitDir(start: string): { gitDir: string; repoRoot: string } | null { + let dir = start + for (;;) { + const dotgit = join(dir, '.git') + if (existsSync(dotgit)) { + const st = statSync(dotgit) + if (st.isDirectory()) return { gitDir: dotgit, repoRoot: dir } + if (st.isFile()) { + const m = /^gitdir:\s*(.+)\s*$/m.exec(readFileSync(dotgit, 'utf8')) + if (m && m[1]) return { gitDir: resolve(dir, m[1].trim()), repoRoot: dir } + } + } + const parent = dirname(dir) + if (parent === dir) return null + dir = parent + } +} + /** Create the jail root and the redirected HOME/XDG dirs so a CLI that * expects them to exist does not fail on first write. */ export async function prepareJailHome(root: string): Promise { diff --git a/src/routes/chat-completions.ts b/src/routes/chat-completions.ts index a5e7e6c..e7befb0 100644 --- a/src/routes/chat-completions.ts +++ b/src/routes/chat-completions.ts @@ -23,10 +23,11 @@ import type { ChatDelta, ChatRequest } from '../backends/types.js' import { BackendError } from '../backends/types.js' import { parseMode, ModeNotSupportedError } from '../modes.js' import { collectNonStreaming, deltaToOpenAIChunk, deltaToSseComment, makeChunkMeta } from '../streaming/sse.js' -import { flattenMessages, tokensFromChars } from '../backends/content.js' +import { estimateMessagesChars, tokensFromChars } from '../backends/content.js' import { resolveJailSpec } from '../jail/resolve-spec.js' import { authSourcesFor } from '../jail/auth-preserve.js' import { AdmissionRejectedError, type AdmissionGate, type AdmissionLease } from '../admission.js' +import type { Run, RunRegistry } from '../runs/registry.js' const DEFAULT_SSE_HEARTBEAT_MS = 15_000 @@ -77,6 +78,19 @@ const chatRequestSchema = z.object({ effort: z.enum(['none', 'minimal', 'low', 'medium', 'high', 'xhigh', 'ultracode']).optional(), session_id: z.string().optional(), resume_id: z.string().optional(), // alias for session_id + /** + * Durable-run id. 
Decouples the JOB from this HTTP connection. A + * client disconnect never kills the run; a reconnect/retry that reuses + * the same `run_id` RE-ATTACHES to the same in-flight subprocess + * (idempotent dispatch) instead of cold-starting a second one. + * + * Absent → a fresh run id is minted per request (today's behavior, + * minus the kill-on-disconnect). Also accepted via `X-Run-Id`. + * + * Reconnect replay: send `Last-Event-ID: <seq>` (or `X-Last-Event-Id`) + * with the same run_id to replay only the deltas missed since `seq`. + */ + run_id: z.string().optional(), mode: z.enum(['byob', 'hosted-safe', 'hosted-sandboxed']).optional(), // OpenAI-compatible shape — wire is snake_case, TS is camelCase. We // translate to responseFormat when we build the ChatRequest below. @@ -150,7 +164,7 @@ const chatRequestSchema = z.object({ export function mountChatCompletions( app: Hono, - deps: { registry: BackendRegistry; sessions: SessionStore; admission?: AdmissionGate }, + deps: { registry: BackendRegistry; sessions: SessionStore; runs: RunRegistry; admission?: AdmissionGate }, ): void { app.post('/v1/chat/completions', async (c) => { let raw: unknown @@ -213,7 +227,7 @@ export function mountChatCompletions( // Pull response_format off so it doesn't bleed through the spread // as an unknown extra field — we translate snake_case → camelCase // here to match the ChatRequest type. - const { response_format, agent_profile, cwd, execution, mcp: bodyMcp, ...rest } = parsed.data + const { response_format, agent_profile, cwd, execution, mcp: bodyMcp, run_id: bodyRunId, ...rest } = parsed.data // MCP can arrive in the body OR the `X-Mcp-Config` header. Body // wins on conflict — header is for callers that can't extend the // request body (e.g. 
forwarding through a third-party gateway that @@ -257,8 +271,22 @@ export function mountChatCompletions( req.cwd = session.cwd } - const ac = new AbortController() - c.req.raw.signal.addEventListener('abort', () => ac.abort(), { once: true }) + // Durable-run id: connection-independent job identity. A reconnect or + // retry reusing this id RE-ATTACHES to the same in-flight subprocess. + const runId = bodyRunId ?? c.req.header('x-run-id') ?? crypto.randomUUID() + // Last-Event-ID (standard SSE reconnect header) or the X-Last-Event-Id + // alias: the highest seq the client already saw. Replay starts after it. + const afterSeq = parseLastEventId( + c.req.header('last-event-id') ?? c.req.header('x-last-event-id'), + ) + + // Idempotent dispatch. A known run id re-attaches with zero new work — + // no second subprocess, no second admission slot. Only a genuinely new + // id reaches the setup-and-pump path below. + const existing = deps.runs.get(runId) + if (existing) { + return respondFromRun(c, existing, req, runId, afterSeq, false) + } // Execution router: when the caller asks for `execution: 'sandbox'` // on a host harness (claude/kimi/gemini/codex/...), delegate to the @@ -266,8 +294,12 @@ export function mountChatCompletions( // + prompt + cwd contract is identical — only the execution location // changes. Map the host harness → in-container backend type via // `harnessToSandboxBackendType`. - let source: AsyncIterable + // + // `run.signal` (NOT the HTTP socket) drives the backend's abort + // contract. A client disconnect leaves this signal untouched, so the + // subprocess keeps running; only an explicit cancel aborts it. 
let admissionLease: AdmissionLease | null = null + let makeSource: ((run: Run) => AsyncIterable) | null = null if (req.execution?.kind === 'sandbox' && backend.name !== 'sandbox') { const sandboxBackend = deps.registry.byName('sandbox') if (!sandboxBackend) { @@ -290,7 +322,7 @@ export function mountChatCompletions( sandboxBackendType, }, } - source = sandboxBackend.chat(delegatedReq, session, ac.signal) + makeSource = (run) => sandboxBackend.chat(delegatedReq, session, run.signal) } else { // Host execution: resolve the write-jail spec from execution.jail // (host variant) layered over the BRIDGE_JAIL_* env defaults, using @@ -309,23 +341,40 @@ export function mountChatCompletions( if (req.jailSpec) req.jailSpec.authSources = authSourcesFor(backend.name) if (deps.admission && shouldApplyHostAdmission(backend.name, req)) { try { - admissionLease = await deps.admission.acquire(ac.signal) + // Admission is held by the JOB, not the connection — release it + // when the run finishes, not when the client drops. Acquire is + // cancellable only by an explicit shutdown, so pass no signal. + admissionLease = await deps.admission.acquire() } catch (err) { return admissionErrorResponse(c, err) } } - source = backend.chat(req, session, ac.signal) + makeSource = (run) => backend.chat(req, session, run.signal) } + // Approximate input size once (content + tool-call structures), for backends that + // report no usage. Estimated in wrap; tool calls are included so tool-heavy turns + // are not systematically undercounted. + const promptChars = estimateMessagesChars(req.messages) + // Persist internal session id as it flows in. Returns a new // AsyncIterable so the typed boundary stays clean. - // Typed backend/mode errors re-throw so the outer handler can return - // a real HTTP status or SSE error frame. Unknown errors terminate - // with finish_reason='error' so we do not leak internals. 
- const wrapped: AsyncIterable = { + // Typed backend/mode errors are converted to a terminal error delta + // INSIDE the run buffer (the run owns the stream now — there is no + // outer iterator to re-throw to). The route reader surfaces the right + // HTTP/SSE shape from the buffered finish_reason. + const wrap = (source: AsyncIterable): AsyncIterable => ({ [Symbol.asyncIterator]: async function* () { + let sawUsage = false + let completionChars = 0 try { for await (const delta of source) { + if (delta.usage) sawUsage = true + completionChars += (delta.content?.length ?? 0) + + (delta.tool_calls?.reduce( + (s, tc) => s + (tc.id?.length ?? 0) + (tc.name?.length ?? 0) + (tc.arguments?.length ?? 0), + 0, + ) ?? 0) if (delta.internal_session_id && req.session_id) { deps.sessions.upsert({ externalId: req.session_id, @@ -341,6 +390,20 @@ export function mountChatCompletions( } yield delta } + // Backends whose CLI reports no usage (kimi-code, opencode) would leave + // every reader with zero tokens, indistinguishable from a stub. Estimate + // from the text (~4 chars/token) and emit a usage delta flagged `estimated` + // so cost ledgers approximate spend without mistaking it for measured truth. + // It is buffered in the run, so reconnecting readers receive it too. + if (!sawUsage) { + yield { + usage: { + input_tokens: tokensFromChars(promptChars), + output_tokens: tokensFromChars(completionChars), + estimated: true, + }, + } satisfies ChatDelta + } } catch (err) { if (err instanceof ModeNotSupportedError || err instanceof BackendError) { throw err @@ -348,123 +411,148 @@ export function mountChatCompletions( yield { finish_reason: 'error' } satisfies ChatDelta console.error(`[cli-bridge] backend ${backend.name} failed:`, err) } finally { + // Admission is released when the JOB ends — pump() consumes this + // source to completion regardless of client connection state. admissionLease?.release() } }, - } + }) + + // Register + start the durable run. 
getOrCreate is idempotent: a + // racing duplicate (same run_id arriving twice) re-attaches to the + // first run and never invokes the factory twice. The run pumps the + // source to completion on its own — the client connection below is + // just one of possibly many readers. + const run = deps.runs.getOrCreate(runId, (r) => { + void r.pump(wrap(makeSource!(r))) + }) - // Surface mode in response headers so clients can confirm what actually ran. - c.header('X-Bridge-Mode', req.mode ?? 'byob') + return respondFromRun(c, run, req, runId, afterSeq, true) + }) +} - // Approximate input size once, for backends that report no usage (estimated below). - const promptText = flattenMessages(req.messages, { includeSystem: true }) +/** + * Render a (possibly already-running) durable run to this request. The + * client attaches as a reader from `afterSeq`; a disconnect ends the + * reader but NEVER the run. Streaming and non-streaming both read the + * same buffered, seq-numbered delta log. + */ +async function respondFromRun( + c: Context, + run: Run, + req: ChatRequest, + runId: string, + afterSeq: number, + isFresh: boolean, +): Promise { + // Surface mode + run id so clients can reconnect/cancel by run id. + c.header('X-Bridge-Mode', req.mode ?? 'byob') + c.header('X-Run-Id', runId) - // OpenAI's /v1/chat/completions defaults `stream: false` when the - // field is omitted. cli-bridge previously inverted that (defaulted - // to SSE), which silently broke every off-the-shelf OpenAI SDK - // (ai-sdk, agent-eval's callLlm, openai-node) that POSTs without - // explicit stream. Match OpenAI's contract: only stream when the - // caller asked for it (`stream: true`). - if (req.stream !== true) { - try { - const body = await collectNonStreaming(wrapped, req.model, promptText) - return c.json(body) - } catch (err) { - return errorResponse(c, err) - } + // OpenAI's /v1/chat/completions defaults `stream: false` when the field + // is omitted. 
Only stream when the caller asked for it (`stream: true`); + // otherwise drain the run's buffer to a single completion body. + if (req.stream !== true) { + // A non-streaming response is a single JSON body, so a dispatch-time + // typed error (mode rejected, spawn/config failure — thrown before any + // delta) must become a real HTTP status, not a 200 with an error + // payload. Only the fresh dispatcher does this; a re-attaching client + // (run already known) drains the buffered terminal error instead. The + // gate resolves the moment output starts or the run settles, so a + // healthy long job is never blocked past its first delta. + if (isFresh) { + await run.whenStarted() + const dispatchErr = run.dispatchError() + if (dispatchErr !== undefined) return errorResponse(c, dispatchErr) } + try { + const deltas = mapSeq(run.attach(afterSeq)) + const body = await collectNonStreaming(deltas, req.model) + return c.json(body) + } catch (err) { + return errorResponse(c, err) + } + } - return streamSSE(c, async (stream) => { - const meta = makeChunkMeta(req.model) - const heartbeatMs = resolveSseHeartbeatMs() - let streamClosed = false - const abortStream = (): void => { - streamClosed = true - ac.abort() - } - const writeRaw = async (chunk: string): Promise => { - if (streamClosed || ac.signal.aborted) return false - try { - await stream.write(chunk) - return true - } catch { - abortStream() - return false - } - } - const writeSse = async (data: string): Promise => { - if (streamClosed || ac.signal.aborted) return false - try { - await stream.writeSSE({ data }) - return true - } catch { - abortStream() - return false - } + return streamSSE(c, async (stream) => { + const meta = makeChunkMeta(req.model) + const heartbeatMs = resolveSseHeartbeatMs() + // `clientGone` ends THIS reader on a write failure (socket closed). It + // does NOT cancel the run — that is the whole point of the decoupling. 
+ let clientGone = false + const writeRaw = async (chunk: string): Promise => { + if (clientGone) return false + try { + await stream.write(chunk) + return true + } catch { + clientGone = true + return false } - const heartbeat = setInterval(() => { - void writeRaw(': keepalive\n\n') - }, heartbeatMs) - let sawUsage = false - let completionChars = 0 + } + // SSE `id:` carries the per-run seq so the client's next reconnect can + // send it back as Last-Event-ID and replay exactly what it missed. + const writeSse = async (data: string, id?: number): Promise => { + if (clientGone) return false try { - if (!await writeRaw(': connected\n\n')) return - for await (const delta of wrapped) { - if (ac.signal.aborted) break - if (delta.usage) sawUsage = true - completionChars += (delta.content?.length ?? 0) - + (delta.tool_calls?.reduce((s, tc) => s + (tc.arguments?.length ?? 0), 0) ?? 0) - // Backend-level liveness ping (e.g. kimi/opencode stdout idle): - // render as SSE comment so the consumer (AI SDK, openai-node) - // ignores it per spec instead of trying to route a fake tool - // call. SSE comments also count as transport heartbeats. - const comment = deltaToSseComment(delta) - if (comment) { - if (!await writeRaw(comment)) break - continue - } - const chunk = deltaToOpenAIChunk(delta, meta) - // Metadata-only deltas (e.g. internal_session_id) yield null — - // the route already consumed the metadata above; nothing to - // write to the OpenAI-visible stream. - if (!chunk) continue - // deltaToOpenAIChunk returns a complete "data: …\n\n" line. - // Strip the framing so streamSSE can re-add it. - const payload = chunk.slice('data: '.length).replace(/\n\n$/, '') - if (!await writeSse(payload)) break - } - // Backends that emit no usage (kimi-code, opencode) would leave consumers - // with zero tokens, indistinguishable from a stub. Emit an estimated usage - // chunk (flagged) so cost ledgers can approximate spend. 
- if (!sawUsage && !ac.signal.aborted) { - const prompt_tokens = tokensFromChars(promptText.length) - const completion_tokens = tokensFromChars(completionChars) - await writeSse(JSON.stringify({ - id: meta.id, - object: 'chat.completion.chunk', - created: meta.created, - model: req.model, - choices: [], - usage: { prompt_tokens, completion_tokens, total_tokens: prompt_tokens + completion_tokens, estimated: true }, - })) + await stream.writeSSE(id !== undefined ? { data, id: String(id) } : { data }) + return true + } catch { + clientGone = true + return false + } + } + const heartbeat = setInterval(() => { + void writeRaw(': keepalive\n\n') + }, heartbeatMs) + try { + if (!await writeRaw(': connected\n\n')) return + for await (const { seq, delta } of run.attach(afterSeq)) { + if (clientGone) break + // Backend-level liveness ping (e.g. kimi/opencode stdout idle): + // render as SSE comment so the consumer (AI SDK, openai-node) + // ignores it per spec instead of trying to route a fake tool + // call. SSE comments also count as transport heartbeats. + const comment = deltaToSseComment(delta) + if (comment) { + if (!await writeRaw(comment)) break + continue } - } catch (err) { - if (ac.signal.aborted) return - const type = err instanceof ModeNotSupportedError - ? 'mode_not_supported' - : err instanceof BackendError - ? err.code - : 'server_error' - const message = err instanceof Error ? err.message : String(err) - await writeSse(JSON.stringify({ error: { message, type } })) - } finally { - clearInterval(heartbeat) + const chunk = deltaToOpenAIChunk(delta, meta) + // Metadata-only deltas (e.g. internal_session_id) yield null — + // consumed by the run/session store; nothing to write here. + if (!chunk) continue + // deltaToOpenAIChunk returns a complete "data: …\n\n" line. Strip + // the framing so streamSSE can re-add it (with the seq as id). 
/**
 * Parse a `Last-Event-ID` / `X-Last-Event-Id` reconnect header into a seq.
 *
 * Only a plain run of decimal digits that fits a safe integer is accepted.
 * Anything else — absent, empty, signed, fractional, trailing garbage such as
 * `"12abc"`, or a magnitude beyond `Number.MAX_SAFE_INTEGER` — yields 0, which
 * callers treat as "replay from the start of the buffer".
 *
 * @param value Raw header value, or undefined when the header was not sent.
 * @returns The non-negative seq to resume after, or 0 when unusable.
 */
function parseLastEventId(value: string | undefined): number {
  if (!value) return 0
  const text = value.trim()
  // `Number.parseInt` would silently accept a numeric prefix ("12abc" → 12),
  // so validate the whole token before converting.
  if (!/^\d+$/.test(text)) return 0
  const seq = Number(text)
  // Past 2^53-1 the value is no longer exact and cannot name a real seq.
  return Number.isSafeInteger(seq) ? seq : 0
}
import { Hono } from 'hono'
import type { RunRegistry } from '../runs/registry.js'

/**
 * Register the run admin routes on `app`:
 *
 * - `GET  /v1/runs/:id`        → the run's snapshot, or 404 when the id is unknown.
 * - `POST /v1/runs/:id/cancel` → `{ cancelled: true }` when the registry reports
 *   it cancelled a run, otherwise `{ cancelled: false }` (still HTTP 200).
 *
 * @param app  Hono application to mount the routes on.
 * @param deps Route dependencies; `deps.runs` is looked up on every request.
 */
export function mountRuns(app: Hono, deps: { runs: RunRegistry }): void {
  app.get('/v1/runs/:id', (c) => {
    const runId = c.req.param('id')
    const found = deps.runs.get(runId)
    return found
      ? c.json(found.snapshot())
      : c.json({ error: { message: 'run not found', type: 'not_found_error' } }, 404)
  })

  // POST (not DELETE) — cancelling mutates the run's lifecycle and is the
  // semantic counterpart to dispatch, not a resource deletion.
  app.post('/v1/runs/:id/cancel', (c) => {
    const runId = c.req.param('id')
    if (deps.runs.cancel(runId)) return c.json({ cancelled: true })
    // Already terminal or unknown — idempotent success-ish: nothing to kill.
    return c.json({ cancelled: false }, 200)
  })
}
import type { ChatDelta } from '../backends/types.js'

/** A buffered delta plus its per-run monotonic sequence number. */
export interface SeqDelta {
  seq: number
  delta: ChatDelta
}

/** Lifecycle of a run: `running` until its source ends, fails, or is cancelled. */
export type RunStatus = 'running' | 'done' | 'error' | 'cancelled'

/** Point-in-time view of a run, as returned by `Run.snapshot()`. */
export interface RunSnapshot {
  id: string
  status: RunStatus
  /** Highest seq emitted so far. 0 = nothing buffered yet. */
  lastSeq: number
  startedAt: number
  endedAt: number | null
}

/** A parked caller waiting for the run's next state change. */
interface Waiter {
  resolve: () => void
}

/**
 * A single durable run. Owns the job's abort controller, buffers every
 * delta with a monotonic seq, and fans out to any number of attached
 * readers via a seq cursor.
 */
export class Run {
  readonly id: string
  readonly startedAt = Date.now()
  private readonly buffer: SeqDelta[] = []
  private seq = 0
  private status: RunStatus = 'running'
  private endedAt: number | null = null
  private readonly waiters = new Set<Waiter>()
  private reapTimer: ReturnType<typeof setTimeout> | null = null
  /**
   * Set by `dispose()`, which also clears the buffer. A reader that is
   * still attached must terminate rather than wait forever for deltas
   * that no longer exist, and nothing may be buffered or scheduled after.
   */
  private disposed = false

  /** Aborts the OWNED job. Distinct from any socket signal. */
  private readonly ac = new AbortController()
  /** The in-flight pump promise; set on the first `pump()` call. */
  private settled?: Promise<void>

  /**
   * The error thrown by the source before a single delta was buffered
   * (seq still 0). Exposed via `dispatchError()` so the fresh dispatcher can
   * surface it as a real HTTP status instead of a buffered error delta.
   */
  private setupError: unknown

  /**
   * @param id          Run id (the registry key).
   * @param onReap      Invoked with `id` once the reap delay elapses after the run finishes.
   * @param reapDelayMs How long a finished run's buffer is retained before `onReap` fires.
   */
  constructor(
    id: string,
    private readonly onReap: (id: string) => void,
    private readonly reapDelayMs: number,
  ) {
    this.id = id
  }

  /** The signal a backend's `chat()` consumes — aborted only by `cancel()`. */
  get signal(): AbortSignal {
    return this.ac.signal
  }

  /** Current id, status, highest seq and start/end timestamps. */
  snapshot(): RunSnapshot {
    return {
      id: this.id,
      status: this.status,
      lastSeq: this.seq,
      startedAt: this.startedAt,
      endedAt: this.endedAt,
    }
  }

  /** True once the run has left the `running` state. */
  isTerminal(): boolean {
    return this.status !== 'running'
  }

  /**
   * The dispatch-time error (see `setupError`), or undefined. Only set when
   * the source threw before any delta was buffered and the run was not
   * being cancelled.
   */
  dispatchError(): unknown {
    return this.setupError
  }

  /**
   * Resolves once the run has buffered its first delta, reached a terminal
   * status, or been disposed. Lets the fresh dispatcher detect a dispatch
   * failure WITHOUT waiting for a healthy long-running job to complete.
   */
  async whenStarted(): Promise<void> {
    while (this.seq === 0 && !this.isTerminal() && !this.disposed) {
      await this.waitForChange()
    }
  }

  /**
   * Drive the source into the buffer, exactly once: a second call returns
   * the first call's promise and ignores its argument. The run keeps pulling
   * deltas even with zero attached readers. Marks a terminal status when the
   * source completes or throws, which schedules reaping. Never rejects.
   */
  pump(source: AsyncIterable<ChatDelta>): Promise<void> {
    if (this.settled) return this.settled
    this.settled = (async () => {
      try {
        for await (const delta of source) {
          this.append(delta)
        }
        // A source that yields no terminal finish_reason still ended the
        // stream — record completion so readers stop tailing.
        this.finish('done')
      } catch (err) {
        // A throw while the owned signal is aborted is the cancel surfacing.
        if (this.ac.signal.aborted) {
          this.finish('cancelled')
        } else {
          // An error before any delta (seq 0) is a dispatch-time rejection —
          // record it for `dispatchError()`. The error delta still lands in
          // the buffer so re-attaching readers see a terminal frame.
          if (this.seq === 0) this.setupError = err
          this.append({ finish_reason: 'error' })
          this.finish('error')
          console.error(`[cli-bridge] run ${this.id} failed:`, err)
        }
      }
    })()
    return this.settled
  }

  private append(delta: ChatDelta): void {
    // After dispose the buffer has been cleared and no reader can be served.
    // Buffering again would break the "seq N lives at index N-1" invariant
    // that `attach` relies on, so late deltas are dropped (the source is
    // still drained by `pump`).
    if (this.disposed) return
    this.seq += 1
    this.buffer.push({ seq: this.seq, delta })
    this.wakeAll()
  }

  private finish(status: RunStatus): void {
    if (this.status !== 'running') return
    this.status = status
    this.endedAt = Date.now()
    this.wakeAll()
    // Keep the buffer around briefly so a reconnecting client can still
    // drain the tail (and the final finish_reason). Cancelled runs are
    // reaped on the same delay.
    this.scheduleReap()
  }

  private scheduleReap(): void {
    // A disposed run has already been dropped by its owner. Arming a timer
    // now would later call `onReap(id)` against whatever run has since been
    // registered under the same id.
    if (this.reapTimer || this.disposed) return
    this.reapTimer = setTimeout(() => this.onReap(this.id), this.reapDelayMs)
    this.reapTimer.unref?.()
  }

  /**
   * Explicit cancel: aborts the owned controller. No-op once the run is
   * terminal. The status itself flips to `cancelled` when `pump()` sees the
   * source throw with the signal aborted.
   */
  cancel(): void {
    if (this.status !== 'running') return
    this.ac.abort()
  }

  /** Drop all buffered deltas + timers and release every parked waiter. */
  dispose(): void {
    if (this.reapTimer) {
      clearTimeout(this.reapTimer)
      this.reapTimer = null
    }
    this.disposed = true
    this.buffer.length = 0
    this.wakeAll()
  }

  private wakeAll(): void {
    for (const w of this.waiters) w.resolve()
    this.waiters.clear()
  }

  private waitForChange(): Promise<void> {
    return new Promise<void>((resolve) => {
      const waiter: Waiter = { resolve }
      this.waiters.add(waiter)
    })
  }

  /**
   * Attach a reader. Replays every buffered delta with `seq > afterSeq`
   * (0 = from the start), then tails live deltas until the run reaches a
   * terminal status and the buffer is drained, or the run is disposed.
   *
   * `afterSeq` is the client's `Last-Event-ID`. It is used as a buffer
   * index, so it is normalised first: negative values clamp to 0,
   * fractional values round down, and non-finite values replay from the
   * start. Un-normalised, such a cursor would never match a buffer slot and
   * the reader would park until the run was reaped.
   */
  async *attach(afterSeq = 0): AsyncGenerator<SeqDelta> {
    let cursor = Number.isFinite(afterSeq) ? Math.max(0, Math.floor(afterSeq)) : 0
    while (true) {
      // The buffer is append-only with contiguous seqs starting at 1, so
      // the first un-yielded delta lives at index `cursor` (seq N is at
      // index N-1). Resuming from that index keeps tailing linear in deltas
      // produced rather than re-scanning the whole buffer on every wake.
      for (let i = cursor; i < this.buffer.length; i++) {
        const item = this.buffer[i]
        if (!item) break
        cursor = item.seq
        yield item
      }
      if (this.isTerminal() && cursor >= this.seq) return
      // The run was disposed while we were still attached: the buffer is
      // gone and no further deltas will ever arrive.
      if (this.disposed) return
      await this.waitForChange()
    }
  }
}

export interface RunRegistryOptions {
  /**
   * How long a finished run's buffer survives so a reconnecting client
   * can drain the tail. Default 60s.
   */
  reapDelayMs?: number
}

/**
 * Registry of durable runs, keyed by run id. Idempotent: `getOrCreate`
 * returns the registered run for a known id (retry re-attaches) and only
 * invokes the factory for a genuinely new id.
 */
export class RunRegistry {
  private readonly runs = new Map<string, Run>()
  private readonly reapDelayMs: number

  constructor(opts: RunRegistryOptions = {}) {
    this.reapDelayMs = opts.reapDelayMs ?? 60_000
  }

  /** The registered run for `id`, or undefined when unknown or already reaped. */
  get(id: string): Run | undefined {
    return this.runs.get(id)
  }

  /**
   * Idempotent dispatch. If `id` names a registered run, return it WITHOUT
   * calling `start`. For a new id, create and register the run, then hand
   * it to `start` (which reads `run.signal` and calls `run.pump(source)`).
   *
   * If `start` throws synchronously the run is cancelled, disposed and
   * unregistered before the error is rethrown. Otherwise a run that nothing
   * will ever finish would stay registered as `running`, and every retry
   * with the same id would re-attach to it and wait forever.
   *
   * @throws Whatever `start` throws.
   */
  getOrCreate(id: string, start: (run: Run) => void): Run {
    const existing = this.runs.get(id)
    if (existing) return existing
    const run = new Run(id, (rid) => this.reap(rid), this.reapDelayMs)
    this.runs.set(id, run)
    try {
      start(run)
    } catch (err) {
      if (this.runs.get(id) === run) this.runs.delete(id)
      run.cancel()
      run.dispose()
      throw err
    }
    return run
  }

  /** Explicit cancel by id. Returns true if a still-running run was cancelled. */
  cancel(id: string): boolean {
    const run = this.runs.get(id)
    if (!run || run.isTerminal()) return false
    run.cancel()
    return true
  }

  private reap(id: string): void {
    const run = this.runs.get(id)
    if (!run) return
    this.runs.delete(id)
    run.dispose()
  }

  /** Test/shutdown aid — cancel + drop every run. */
  clear(): void {
    for (const run of this.runs.values()) {
      run.cancel()
      run.dispose()
    }
    this.runs.clear()
  }
}
/** Handle for a held per-port pidfile lock. */
export interface InstanceLock {
  /** Absolute path to the pidfile this lock holds. */
  path: string
  /** Remove the pidfile. Idempotent — safe to call from shutdown + atexit. */
  release(): void
}

/** Thrown when the per-port pidfile could not be claimed for this process. */
export class PortAlreadyBoundError extends Error {
  constructor(
    public readonly port: number,
    public readonly lockPath: string,
    public readonly holderPid: number | null,
  ) {
    super(
      `cli-bridge is already running on port ${port}` +
        (holderPid ? ` (pid ${holderPid})` : '') +
        `. Lockfile ${lockPath} is held by a live process. ` +
        `Stop the other instance or set BRIDGE_PORT to a free port.`,
    )
    this.name = 'PortAlreadyBoundError'
  }
}

/**
 * Acquire the per-port single-instance lock.
 *
 * @param port Port the lock is keyed by; the pidfile is `cli-bridge-<port>.pid`.
 * @param dir  Directory for the pidfile. Defaults to the OS temp dir.
 * @returns A handle whose `release()` removes the pidfile (only while it
 *          still records this process's pid).
 * @throws PortAlreadyBoundError when the recorded holder pid is alive, or
 *         when reclaiming a stale pidfile keeps losing to another process.
 * @throws Any other filesystem error from creating or writing the pidfile.
 */
export function acquireInstanceLock(port: number, dir: string = tmpdir()): InstanceLock {
  const path = join(dir, `cli-bridge-${port}.pid`)
  claim(path, port)

  let released = false
  return {
    path,
    release(): void {
      if (released) return
      released = true
      // Only remove the file if it still carries OUR pid — never delete
      // a lock a successor reclaimed after we were declared dead.
      try {
        if (readHolderPid(path) === process.pid) unlinkSync(path)
      } catch { /* already gone */ }
    },
  }
}

/**
 * Create the pidfile exclusively (`O_CREAT | O_EXCL`) and record our pid.
 *
 * On `EEXIST` the recorded holder is probed: a live holder is a hard
 * refusal. A dead or unreadable holder means the file is stale, so it is
 * unlinked and the exclusive create retried — at most two reclaim attempts
 * (`attempt` 0 and 1) before giving up with `PortAlreadyBoundError`.
 *
 * NOTE: unlink-then-create is not atomic. The stale file is re-read just
 * before unlinking so a lock that a faster reclaimer has already replaced
 * is left alone; this narrows the window between two concurrent reclaimers
 * but does not eliminate it.
 *
 * @param attempt Internal retry counter; callers omit it.
 */
function claim(path: string, port: number, attempt = 0): void {
  let fd: number
  try {
    fd = openSync(path, fsConstants.O_WRONLY | fsConstants.O_CREAT | fsConstants.O_EXCL, 0o644)
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code !== 'EEXIST') throw err
    const holderPid = readHolderPid(path)
    if (holderPid !== null && isAlive(holderPid)) {
      throw new PortAlreadyBoundError(port, path, holderPid)
    }
    // Stale lockfile (holder dead or unreadable).
    if (attempt >= 2) {
      // Lost the reclaim repeatedly — treat the port as bound.
      throw new PortAlreadyBoundError(port, path, holderPid)
    }
    // Unlink only if the file still shows what we judged stale; otherwise
    // someone else has just reclaimed it and the retry below will see them.
    if (readHolderPid(path) === holderPid) {
      try { unlinkSync(path) } catch { /* someone else reclaimed first */ }
    }
    claim(path, port, attempt + 1)
    return
  }
  try {
    writeSync(fd, `${process.pid}\n`)
  } finally {
    // Always release the descriptor, even when the write fails.
    closeSync(fd)
  }
}

/** The positive integer pid recorded in the pidfile, or null when the file is unreadable or malformed. */
function readHolderPid(path: string): number | null {
  try {
    const pid = Number.parseInt(readFileSync(path, 'utf8').trim(), 10)
    return Number.isInteger(pid) && pid > 0 ? pid : null
  } catch {
    return null
  }
}

/** `kill(pid, 0)` probes existence without delivering a signal. */
function isAlive(pid: number): boolean {
  try {
    process.kill(pid, 0)
    return true
  } catch (err) {
    // EPERM = exists but not ours to signal (still alive). ESRCH = gone.
    return (err as NodeJS.ErrnoException).code === 'EPERM'
  }
}
createProfileCatalog(config.sandboxProfilesDir) const admission = new AdmissionGate(config.admission) @@ -233,8 +239,9 @@ export async function buildApp(config: Config): Promise<{ mountHealth(app, { registry, admission }) mountModels(app, { registry, catalog }) mountSessions(app, { sessions }) + mountRuns(app, { runs }) mountProfiles(app, { catalog }) - mountChatCompletions(app, { registry, sessions, admission }) + mountChatCompletions(app, { registry, sessions, runs, admission }) mountCadRender(app) mountImagesGenerate(app) mountMetrics(app) @@ -249,12 +256,14 @@ export async function buildApp(config: Config): Promise<{ '/v1/models', '/v1/chat/completions', '/v1/sessions', + '/v1/runs/:id', + '/v1/runs/:id/cancel', '/cad/render', '/images/generate', ], })) - return { app, sessions, registry, catalog, extras } + return { app, sessions, registry, runs, catalog, extras } } function constantTimeEqual(a: string, b: string): boolean { @@ -266,7 +275,27 @@ function constantTimeEqual(a: string, b: string): boolean { export async function startServer(): Promise { const config = loadConfig() + + // Single-instance guard — refuse to start a second bridge on the same + // BRIDGE_PORT. Acquired BEFORE buildApp opens sessions.sqlite so two + // instances never share that DB. A clean exit here keeps systemd happy: + // Type=simple + Restart=always retries, and a dead predecessor's stale + // lock is reclaimed automatically (see single-instance.ts). + let instanceLock + try { + instanceLock = acquireInstanceLock(config.port) + } catch (err) { + if (err instanceof PortAlreadyBoundError) { + console.error(`[cli-bridge] ${err.message}`) + process.exit(1) + } + throw err + } + const { app, sessions, extras } = await buildApp(config) + // Drop the lock on hard exit too — graceful shutdown also releases, but + // a process.exit() path (fatal error) must not strand the pidfile. 
+ process.once('exit', () => instanceLock.release()) const server = serve({ fetch: app.fetch, hostname: config.host, @@ -285,6 +314,28 @@ export async function startServer(): Promise { console.log(`[cli-bridge] bearer: ${config.bearer ? 'required' : 'none (loopback only)'}`) console.log(`[cli-bridge] host admission: maxActive=${config.admission.maxActive} maxQueue=${config.admission.maxQueue} queueTimeoutMs=${config.admission.queueTimeoutMs}`) console.log(`[cli-bridge] write-jail default: ${config.jailMode}${config.jailMode === 'write-jail' ? ` root=${config.jailRoot ?? '/.agent-home'}` : ''}`) + // Fail fast (don't go ready) if write-jail is the operator floor but no jail + // backend can run here — every host request would otherwise fail closed while + // /health reports ready. Only relevant when some backend actually spawns on the + // host: docker/remote-only deployments never hit the host jail. Honor + // BRIDGE_JAIL_FALLBACK=warn, which runs unconfined-with-warning instead. + const hasHostSpawn = anyBackendSpawnsOnHost(config.backends, config.executors) + if (config.jailMode === 'write-jail' && hasHostSpawn && !selectJailBackend().isAvailable()) { + if (process.env.BRIDGE_JAIL_FALLBACK === 'warn') { + console.warn( + '[cli-bridge] WARNING: BRIDGE_JAIL_MODE=write-jail but no jail backend can run here; ' + + 'requests run UNCONFINED (BRIDGE_JAIL_FALLBACK=warn).', + ) + } else { + console.error( + '[cli-bridge] FATAL: BRIDGE_JAIL_MODE=write-jail but no write-jail backend can run on this ' + + 'host — every host request would fail closed. 
Enable unprivileged user namespaces (Linux: ' + + '`sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0` or `sudo chmod u+s /usr/bin/bwrap`), ' + + 'ensure sandbox-exec exists (macOS), set BRIDGE_JAIL_FALLBACK=warn, or unset BRIDGE_JAIL_MODE.', + ) + process.exit(1) + } + } for (const cfg of Object.values(config.executors)) { if (cfg.kind === 'docker') { console.log(`[cli-bridge] ${cfg.name} executor: docker pool size=${cfg.poolSize} image=${cfg.image}`) @@ -307,6 +358,7 @@ export async function startServer(): Promise { } server.close(() => { sessions.close() + instanceLock.release() process.exit(0) }) setTimeout(() => process.exit(1), 5000).unref() @@ -363,7 +415,10 @@ if (import.meta.url === `file://${process.argv[1]}`) { } function isFatalServerStartupError(err: Error): boolean { + // A second instance racing for the same port is fatal — exit cleanly so + // the operator sees the clear message rather than a survived exception. + if (err instanceof PortAlreadyBoundError) return true const code = (err as NodeJS.ErrnoException).code if (code === 'EADDRINUSE' || code === 'EACCES') return true - return /listen|address already in use/i.test(err.message) + return /listen|address already in use|already running on port/i.test(err.message) } diff --git a/src/streaming/sse.ts b/src/streaming/sse.ts index ebefb84..833906b 100644 --- a/src/streaming/sse.ts +++ b/src/streaming/sse.ts @@ -8,7 +8,6 @@ */ import type { ChatDelta } from '../backends/types.js' -import { tokensFromChars } from '../backends/content.js' export interface ChunkMeta { id: string @@ -56,19 +55,30 @@ export function deltaToOpenAIChunk(delta: ChatDelta, meta: ChunkMeta): string | })) } + // A usage-only delta (no content/tool_calls/finish) is the OpenAI usage trailer: + // it must carry `choices: []`, not an empty choice, or strict clients misparse it + // as assistant output. Every other delta carries the single streaming choice. 
+ const usageOnly = hasUsage && !hasContent && !hasToolCalls && !hasFinish const payload = { id: meta.id, object: 'chat.completion.chunk', created: meta.created, model: meta.model, - choices: [ + choices: usageOnly ? [] : [ { index: 0, delta: choiceDelta, finish_reason: delta.finish_reason ?? null, }, ], - ...(delta.usage ? { usage: { prompt_tokens: delta.usage.input_tokens, completion_tokens: delta.usage.output_tokens } } : {}), + ...(delta.usage ? { + usage: { + prompt_tokens: delta.usage.input_tokens ?? 0, + completion_tokens: delta.usage.output_tokens ?? 0, + total_tokens: (delta.usage.input_tokens ?? 0) + (delta.usage.output_tokens ?? 0), + ...(delta.usage.estimated ? { estimated: true } : {}), + }, + } : {}), } return `data: ${JSON.stringify(payload)}\n\n` } @@ -108,7 +118,6 @@ export function makeChunkMeta(model: string): ChunkMeta { export async function collectNonStreaming( iter: AsyncIterable, model: string, - promptText = '', ): Promise { let content = '' const toolCalls: Array<{ id: string; name: string; arguments: string }> = [] @@ -125,22 +134,16 @@ export async function collectNonStreaming( if (d.usage) usage = d.usage } - // Backends whose CLI emits no usage (kimi-code, opencode) would otherwise - // report zero tokens, indistinguishable from a stub. Estimate from the text so - // cost is approximable; flag `estimated` so consumers never treat it as measured. - const completionChars = content.length + toolCalls.reduce((s, tc) => s + (tc.arguments?.length ?? 0), 0) + // Usage (estimated or measured) is produced upstream in the run source, so + // here we only normalise to the OpenAI shape, preserving the `estimated` flag. const usageOut = usage ? { prompt_tokens: usage.input_tokens ?? 0, completion_tokens: usage.output_tokens ?? 0, total_tokens: (usage.input_tokens ?? 0) + (usage.output_tokens ?? 0), + ...(usage.estimated ? 
{ estimated: true } : {}), } - : { - prompt_tokens: tokensFromChars(promptText.length), - completion_tokens: tokensFromChars(completionChars), - total_tokens: tokensFromChars(promptText.length) + tokensFromChars(completionChars), - estimated: true, - } + : undefined return { id: `chatcmpl-${crypto.randomUUID().replace(/-/g, '').slice(0, 24)}`, @@ -165,6 +168,6 @@ export async function collectNonStreaming( finish_reason: finishReason ?? 'stop', }, ], - usage: usageOut, + ...(usageOut ? { usage: usageOut } : {}), } } diff --git a/tests/execution-router.test.ts b/tests/execution-router.test.ts index e8911e4..9c85b3c 100644 --- a/tests/execution-router.test.ts +++ b/tests/execution-router.test.ts @@ -21,6 +21,7 @@ import { describe, expect, it } from 'vitest' import { mountChatCompletions } from '../src/routes/chat-completions.js' import { BackendRegistry } from '../src/backends/registry.js' import { SessionStore } from '../src/sessions/store.js' +import { RunRegistry } from '../src/runs/registry.js' import type { Backend, BackendHealth, ChatDelta, ChatRequest } from '../src/backends/types.js' import { mkdtempSync, rmSync } from 'node:fs' import { tmpdir } from 'node:os' @@ -47,7 +48,7 @@ function buildApp(backends: Backend[]): { app: Hono; cleanup: () => void } { const registry = new BackendRegistry() for (const b of backends) registry.register(b) const app = new Hono() - mountChatCompletions(app, { registry, sessions }) + mountChatCompletions(app, { registry, sessions, runs: new RunRegistry() }) return { app, cleanup: () => rmSync(dir, { recursive: true, force: true }) } } diff --git a/tests/jail.test.ts b/tests/jail.test.ts index 82a64c8..88846d8 100644 --- a/tests/jail.test.ts +++ b/tests/jail.test.ts @@ -16,7 +16,7 @@ */ import { existsSync } from 'node:fs' -import { mkdtemp, readFile, realpath, rm, symlink } from 'node:fs/promises' +import { mkdir, mkdtemp, readFile, realpath, rm, symlink, writeFile } from 'node:fs/promises' import { homedir, tmpdir } from 
'node:os' import { join, resolve } from 'node:path' import { afterEach, describe, expect, it } from 'vitest' @@ -28,7 +28,10 @@ import { } from '../src/jail/index.js' import { DEFAULT_JAIL_ROOT, resolveJailSpec } from '../src/jail/resolve-spec.js' import { applyJail } from '../src/executors/jail-support.js' -import { authSourcesFor, jailRelPath } from '../src/jail/auth-preserve.js' +import { authSourcesFor } from '../src/jail/auth-preserve.js' +import { ignoreJailRoot } from '../src/jail/types.js' +import { anyBackendSpawnsOnHost } from '../src/config.js' +import type { BackendExecutorConfig } from '../src/config.js' import type { JailBackend } from '../src/jail/index.js' /** Index of the first position where `seq` appears contiguously in `argv`, else -1. */ @@ -119,6 +122,13 @@ describe('MacosSeatbeltJail.wrap', () => { expect(profile).toContain('(deny file-write* (subpath "/"))') expect(profile).toContain('(allow file-write*') + // Regression: shared temp trees must NOT be writable — a confined run could + // otherwise persist files outside the jail root. Temp goes to /.tmp. + expect(profile, 'must not whitelist the per-user temp tree').not.toContain('/private/var/folders') + expect(profile, 'must not whitelist /private/tmp').not.toContain('/private/tmp') + // Standard device nodes stay writable so output redirection / RNG still work. + expect(profile).toContain('(literal "/dev/null")') + // The root is canonicalized (realpath) before embedding in the profile. 
const expectedRoot = await realpath(resolveJailRoot(root, projectDir)) expect(profile).toContain(`(subpath "${expectedRoot}")`) @@ -148,27 +158,96 @@ describe('resolveJailRoot containment', () => { const base = await realpath(await tempProjectDir()) expect(resolveJailRoot('.agent-home', base)).toBe(join(base, '.agent-home')) }) -}) -describe('auth preservation', () => { - it('jailRelPath maps a host auth path to its $HOME-relative location', () => { - expect(jailRelPath(join(homedir(), '.claude'))).toBe('.claude') - expect(jailRelPath(join(homedir(), '.config', 'opencode'))).toBe(join('.config', 'opencode')) + it('ignoreJailRoot adds an anchored, idempotent exclude entry', async () => { + const base = await realpath(await tempProjectDir()) + await mkdir(join(base, '.git', 'info'), { recursive: true }) + ignoreJailRoot(base, join(base, '.agent-home')) + ignoreJailRoot(base, join(base, '.agent-home')) + const exclude = await readFile(join(base, '.git', 'info', 'exclude'), 'utf8') + expect(exclude.match(/^\/\.agent-home\/$/gm)?.length).toBe(1) + }) + + it('ignoreJailRoot finds the repo when cwd is a subdirectory (anchored to repo root)', async () => { + const base = await realpath(await tempProjectDir()) + await mkdir(join(base, '.git', 'info'), { recursive: true }) + const sub = join(base, 'pkg', 'app') + await mkdir(sub, { recursive: true }) + ignoreJailRoot(sub, join(sub, '.agent-home')) + const exclude = await readFile(join(base, '.git', 'info', 'exclude'), 'utf8') + expect(exclude).toContain('/pkg/app/.agent-home/') }) + it('ignoreJailRoot follows a .git FILE (worktree) to the real gitdir', async () => { + const base = await realpath(await tempProjectDir()) + const realGit = join(base, 'realgit') + await mkdir(join(realGit, 'info'), { recursive: true }) + await writeFile(join(base, '.git'), `gitdir: ${realGit}\n`) + ignoreJailRoot(base, join(base, '.agent-home')) + const exclude = await readFile(join(realGit, 'info', 'exclude'), 'utf8') + 
expect(exclude).toContain('/.agent-home/') + }) +}) + +describe('auth preservation', () => { it('authSourcesFor maps registered harness aliases to the same creds, [] for unknown', () => { expect(authSourcesFor('totally-unknown-backend')).toEqual([]) // The server registers 'claude-code'/'claudish'/'kimi-code', not 'claude'/'kimi'. expect(authSourcesFor('claude-code')).toEqual(authSourcesFor('claude')) expect(authSourcesFor('claudish')).toEqual(authSourcesFor('claude')) expect(authSourcesFor('kimi-code')).toEqual(authSourcesFor('kimi')) - for (const p of authSourcesFor('claude-code')) { - expect(existsSync(p), `${p} should exist`).toBe(true) - expect(p.startsWith(homedir())).toBe(true) + for (const { source, jailRel } of authSourcesFor('claude-code')) { + expect(existsSync(source), `${source} should exist`).toBe(true) + expect(source.startsWith(homedir())).toBe(true) + // jailRel must be a relative location strictly inside the jail root. + expect(jailRel.startsWith('/'), `${jailRel} must be relative`).toBe(false) + expect(jailRel.startsWith('..'), `${jailRel} must not escape the root`).toBe(false) + } + // codex must be preserved too (no-MCP jailed codex would otherwise lose ~/.codex), + // and tagged so the jail redirects CODEX_HOME at the in-jail copy. 
+ for (const { source, jailRel, envVar } of authSourcesFor('codex')) { + expect(source.endsWith('.codex')).toBe(true) + expect(jailRel).toBe('.codex') + expect(envVar).toBe('CODEX_HOME') + } + }) + + it('authSourcesFor(codex) honors a custom CODEX_HOME outside HOME, mapped to the jail ~/.codex', async () => { + const ext = await mkdtemp(join(tmpdir(), 'cli-bridge-codexhome-')) + cleanups.push(() => rm(ext, { recursive: true, force: true })) + await writeFile(join(ext, 'auth.json'), '{}') + const prev = process.env.CODEX_HOME + process.env.CODEX_HOME = ext + try { + const codexEntries = authSourcesFor('codex').filter((e) => e.jailRel === '.codex') + // Exactly one .codex entry, pointing at the custom CODEX_HOME (not ~/.codex), + // still placed at the jail's ~/.codex so a confined codex (HOME=root) finds it, + // and tagged with the env var the jail will redirect. + expect(codexEntries).toHaveLength(1) + expect(codexEntries[0]?.source).toBe(resolve(ext)) + expect(codexEntries[0]?.envVar).toBe('CODEX_HOME') + } finally { + if (prev === undefined) delete process.env.CODEX_HOME + else process.env.CODEX_HOME = prev } - // codex must be preserved too (no-MCP jailed codex would otherwise lose ~/.codex). - for (const p of authSourcesFor('codex')) { - expect(p.endsWith('.codex')).toBe(true) + }) + + it('authSourcesFor(pi) preserves ~/.pi/agent so a jailed pi keeps its provider/model config', async () => { + const fakeHome = await mkdtemp(join(tmpdir(), 'cli-bridge-pihome-')) + cleanups.push(() => rm(fakeHome, { recursive: true, force: true })) + await mkdir(join(fakeHome, '.pi', 'agent'), { recursive: true }) + await writeFile(join(fakeHome, '.pi', 'agent', 'config.json'), '{"provider":"x"}') + const prev = process.env.HOME + process.env.HOME = fakeHome + try { + const sources = authSourcesFor('pi') + expect(sources).toHaveLength(1) + // Lands at the jail's ~/.pi/agent, where pi (HOME=root) reads its state. 
+ expect(sources[0]?.jailRel).toBe('.pi/agent') + expect(sources[0]?.source).toBe(join(fakeHome, '.pi', 'agent')) + } finally { + if (prev === undefined) delete process.env.HOME + else process.env.HOME = prev } }) @@ -177,13 +256,50 @@ describe('auth preservation', () => { cleanups.push(() => rm(authDir, { recursive: true, force: true })) const projectDir = await tempProjectDir() const root = join(projectDir, '.agent-home') - const wrap = await new LinuxBwrapJail().wrap('/bin/sh', ['-c', 'x'], { root, projectDir, authSources: [authDir] }) + const wrap = await new LinuxBwrapJail().wrap('/bin/sh', ['-c', 'x'], { + root, + projectDir, + authSources: [{ source: authDir, jailRel: '.claude' }], + }) const expectedRoot = resolveJailRoot(root, projectDir) expect( - seqIndex(wrap.args, '--ro-bind', authDir, join(expectedRoot, jailRelPath(authDir))), + seqIndex(wrap.args, '--ro-bind', authDir, join(expectedRoot, '.claude')), 'auth source ro-bound into the jail HOME', ).toBeGreaterThanOrEqual(0) }) + + it('bwrap redirects an auth env var (CODEX_HOME) at the in-jail copy — only when it wraps', async () => { + const authDir = await mkdtemp(join(homedir(), '.cli-bridge-codexauth-')) + cleanups.push(() => rm(authDir, { recursive: true, force: true })) + const projectDir = await tempProjectDir() + const root = join(projectDir, '.agent-home') + const wrap = await new LinuxBwrapJail().wrap('/bin/sh', ['-c', 'x'], { + root, + projectDir, + authSources: [{ source: authDir, jailRel: '.codex', envVar: 'CODEX_HOME' }], + }) + const expectedRoot = resolveJailRoot(root, projectDir) + expect(seqIndex(wrap.args, '--ro-bind', authDir, join(expectedRoot, '.codex'))).toBeGreaterThanOrEqual(0) + expect( + seqIndex(wrap.args, '--setenv', 'CODEX_HOME', join(expectedRoot, '.codex')), + 'CODEX_HOME redirected to the in-jail copy', + ).toBeGreaterThanOrEqual(0) + }) + + it('seatbelt returns an auth env var (CODEX_HOME) pointing at the in-jail copy', async () => { + const authDir = await 
mkdtemp(join(homedir(), '.cli-bridge-codexauth-')) + cleanups.push(() => rm(authDir, { recursive: true, force: true })) + const projectDir = await tempProjectDir() + const root = join(projectDir, '.agent-home') + const wrap = await new MacosSeatbeltJail().wrap('/bin/sh', ['-c', 'x'], { + root, + projectDir, + authSources: [{ source: authDir, jailRel: '.codex', envVar: 'CODEX_HOME' }], + }) + if (wrap.cleanup) cleanups.push(async () => { await wrap.cleanup?.() }) + const expectedRoot = await realpath(resolveJailRoot(root, projectDir)) + expect(wrap.env?.CODEX_HOME).toBe(join(expectedRoot, '.codex')) + }) }) describe('applyJail fail-closed', () => { @@ -290,3 +406,38 @@ describe('resolveJailSpec', () => { } }) }) + +describe('anyBackendSpawnsOnHost (startup jail fail-fast gate)', () => { + const docker = (name: string): BackendExecutorConfig => ({ name, kind: 'docker' }) + const host = (name: string): BackendExecutorConfig => ({ name, kind: 'host' }) + + it('is true for the default host-CLI backends', () => { + expect(anyBackendSpawnsOnHost(new Set(['claude', 'kimi', 'gemini']), {})).toBe(true) + }) + + it('is true for ACP backends absent from the executor map (hermes/openclaw)', () => { + // Regression: hermes/openclaw forward the jailSpec to the host spawner but are + // not in config.executors, so an executor-only check missed them and let an + // ACP-only write-jail deployment boot "healthy" then fail every request. 
+ expect(anyBackendSpawnsOnHost(new Set(['hermes', 'openclaw']), {})).toBe(true) + expect(anyBackendSpawnsOnHost(new Set(['sandbox', 'passthrough', 'hermes']), {})).toBe(true) + }) + + it('is false when every enabled backend is remote/proxy (no host spawn)', () => { + expect(anyBackendSpawnsOnHost(new Set(['sandbox', 'passthrough', 'nanoclaw']), {})).toBe(false) + }) + + it('is false when the only host-CLI backend is pinned to a docker executor', () => { + expect(anyBackendSpawnsOnHost(new Set(['claude', 'sandbox']), { claude: docker('claude') })).toBe(false) + }) + + it('is true when at least one host-CLI backend keeps a host executor', () => { + expect( + anyBackendSpawnsOnHost(new Set(['claude', 'kimi']), { claude: docker('claude'), kimi: host('kimi') }), + ).toBe(true) + }) + + it('is false for an empty backend set', () => { + expect(anyBackendSpawnsOnHost(new Set(), {})).toBe(false) + }) +}) diff --git a/tests/mcp-passthrough.test.ts b/tests/mcp-passthrough.test.ts index f378523..9408abf 100644 --- a/tests/mcp-passthrough.test.ts +++ b/tests/mcp-passthrough.test.ts @@ -34,6 +34,7 @@ import { join } from 'node:path' import { Hono } from 'hono' import { BackendRegistry } from '../src/backends/registry.js' import { SessionStore } from '../src/sessions/store.js' +import { RunRegistry } from '../src/runs/registry.js' import type { Backend, ChatDelta, ChatRequest } from '../src/backends/types.js' import type { SessionRecord } from '../src/sessions/store.js' import { mountChatCompletions } from '../src/routes/chat-completions.js' @@ -74,7 +75,7 @@ describe('chat-completions route — mcp body field', () => { backend = new CapturingBackend() const registry = new BackendRegistry().register(backend) app = new Hono() - mountChatCompletions(app, { registry, sessions }) + mountChatCompletions(app, { registry, sessions, runs: new RunRegistry() }) }) afterEach(() => { sessions.close() diff --git a/tests/smoke.test.ts b/tests/smoke.test.ts index 911f785..271da9d 100644 --- 
a/tests/smoke.test.ts +++ b/tests/smoke.test.ts @@ -14,6 +14,7 @@ import { join } from 'node:path' import { Hono } from 'hono' import { BackendRegistry } from '../src/backends/registry.js' import { SessionStore } from '../src/sessions/store.js' +import { RunRegistry } from '../src/runs/registry.js' import type { Backend, ChatDelta, ChatRequest } from '../src/backends/types.js' import type { SessionRecord } from '../src/sessions/store.js' import { ClaudeBackend } from '../src/backends/claude.js' @@ -276,7 +277,7 @@ describe('POST /v1/chat/completions', () => { .register(new FakeBackend('claude')) .register(new FakeBackend('claudish')) app = new Hono() - mountChatCompletions(app, { registry, sessions }) + mountChatCompletions(app, { registry, sessions, runs: new RunRegistry() }) }) afterEach(() => { sessions.close() @@ -313,7 +314,7 @@ describe('POST /v1/chat/completions', () => { app = new Hono() mountChatCompletions( app, - { registry: new BackendRegistry().register(new DelayedBackend('slow', 40)), sessions }, + { registry: new BackendRegistry().register(new DelayedBackend('slow', 40)), sessions, runs: new RunRegistry() }, ) try { const res = await app.request('/v1/chat/completions', { @@ -388,6 +389,7 @@ describe('POST /v1/chat/completions', () => { mountChatCompletions(app, { registry: new BackendRegistry().register(backend), sessions, + runs: new RunRegistry(), admission, }) @@ -545,7 +547,7 @@ describe('POST /v1/chat/completions', () => { const kimi = new KimiBackend({ bin: '/nonexistent', timeoutMs: 1000, harness: 'kimi-code' }) const registry = new BackendRegistry().register(kimi) const appLocal = new Hono() - mountChatCompletions(appLocal, { registry, sessions }) + mountChatCompletions(appLocal, { registry, sessions, runs: new RunRegistry() }) const res = await appLocal.request('/v1/chat/completions', { method: 'POST', diff --git a/tests/sse-keepalive.test.ts b/tests/sse-keepalive.test.ts index 63d1f00..dd18675 100644 --- a/tests/sse-keepalive.test.ts +++ 
b/tests/sse-keepalive.test.ts @@ -63,6 +63,21 @@ describe('deltaToOpenAIChunk', () => { function: { name: 'lookup', arguments: '{"q":"x"}' }, }) }) + + it('emits a usage-only delta as an OpenAI usage trailer (choices: []), preserving the estimated flag', () => { + const out = deltaToOpenAIChunk({ usage: { input_tokens: 100, output_tokens: 13, estimated: true } }, meta) + expect(out).not.toBeNull() + const payload = JSON.parse(out!.slice('data: '.length).replace(/\n\n$/, '')) + // Must be an empty choices array, NOT a fake empty choice, or strict clients + // misparse the trailer as assistant output. + expect(payload.choices).toEqual([]) + expect(payload.usage).toMatchObject({ + prompt_tokens: 100, + completion_tokens: 13, + total_tokens: 113, + estimated: true, + }) + }) }) describe('collectNonStreaming', () => {