diff --git a/absurd-bench/adapter.json b/absurd-bench/adapter.json index 847e8b1..891fdd2 100644 --- a/absurd-bench/adapter.json +++ b/absurd-bench/adapter.json @@ -6,5 +6,6 @@ "absurd.t_long_horizon_bench" ], "event_indexes": [], - "extensions": [] + "extensions": [], + "shutdown_grace_s": 20 } diff --git a/absurd-bench/main.py b/absurd-bench/main.py index d1cda11..f13c386 100644 --- a/absurd-bench/main.py +++ b/absurd-bench/main.py @@ -13,6 +13,7 @@ from time import monotonic from absurd_sdk import AsyncAbsurd +import psycopg from psycopg import AsyncConnection, sql from psycopg.rows import dict_row @@ -276,8 +277,42 @@ async def scenario_long_horizon() -> None: current_total_backlog = 0 current_producer_target_rate = float(producer_rate) - producer_conn = await connect() - depth_conn = await connect() + # Mirrors pgque-bench's ReconnectingConn so the producer/depth tasks + # transparently survive chaos cells (postgres-restart, pg_terminate_backend). + # Without this absurd's 0% recovery in chaos_postgres_restart_absurd / + # chaos_pg_backend_kill_absurd in the 2026-05-09 sweep (audit_absurd.md §6). + class ReconnectingConn: + __slots__ = ("_conn",) + + def __init__(self, conn: AsyncConnection) -> None: + self._conn = conn + + @property + def conn(self) -> AsyncConnection: + return self._conn + + def cursor(self): + return self._conn.cursor() + + async def close(self) -> None: + try: + await self._conn.close() + except Exception: + pass + + async def reconnect(self) -> None: + await self.close() + for _ in range(50): + if shutdown.is_set(): + return + try: + self._conn = await connect() + return + except Exception: + await asyncio.sleep(0.2) + + producer_conn = ReconnectingConn(await connect()) + depth_conn = ReconnectingConn(await connect()) worker_app = build_app( QUEUE_NAME, loop=loop, @@ -324,7 +359,11 @@ async def producer() -> None: ) started = monotonic() - await enqueue_batch(producer_conn, QUEUE_NAME, batch) + try: + await enqueue_batch(producer_conn.conn, QUEUE_NAME, batch) + except (psycopg.OperationalError, psycopg.errors.ConnectionDoesNotExist): + await producer_conn.reconnect() + continue elapsed_ms = (monotonic() - started) * 1000 sample_ts = loop.time() per_job_ms = elapsed_ms / max(len(batch), 1) @@ -346,7 +385,11 @@ async def depth_task() -> None: await asyncio.sleep(0.25) return while not shutdown.is_set(): - counts = await count_by_state(depth_conn, QUEUE_NAME) + try: + counts = await count_by_state(depth_conn.conn, QUEUE_NAME) + except (psycopg.OperationalError, psycopg.errors.ConnectionDoesNotExist): + await depth_conn.reconnect() + continue current_queue_depth = counts.get("pending", 0) current_running_depth = counts.get("running", 0) current_retryable_depth = counts.get("failed", 0) @@ -432,10 +475,19 @@ async def sampler() -> None: ) await asyncio.sleep(sample_every_s) + # CLAIM_TIMEOUT_SECS — exposes AsyncAbsurd.start_worker(claim_timeout=…), + # which the SDK defaults to 120s. That default holds a row-level lock for + # 120s after a SIGKILL'd replica, blocking other replicas from claiming + # those rows until the lease expires — direct cause of the rc=137 cluster + # in 2026-05-09 idle_in_tx / event_burst Phase C cells (audit_absurd.md + # §1, §3). 10s is short enough to clear within a chaos-recovery window + # and long enough to absorb a worker hiccup. + claim_timeout_secs = env_int("CLAIM_TIMEOUT_SECS", 10) worker_task = asyncio.create_task( worker_app.start_worker( concurrency=worker_count, poll_interval=POLL_INTERVAL_SECS, + claim_timeout=claim_timeout_secs, ) ) tasks = [ diff --git a/bench_harness/adapters.py b/bench_harness/adapters.py index 9ad294d..d32500e 100644 --- a/bench_harness/adapters.py +++ b/bench_harness/adapters.py @@ -42,6 +42,13 @@ class AdapterManifest: extensions: list[str] family: str = "" display_name: str = "" + # Per-adapter SIGTERM grace window before the harness escalates to + # SIGKILL. Default 10s — long enough for a clean Postgres connection + # close, short enough that a wedged adapter fails the cell rather than + # eating the wrapper script's 15-min ceiling. Adapters with documented + # slow-drain shutdown paths (e.g. pgmq archive, absurd fail-task-run) + # can override via `adapter.json -> shutdown_grace_s`. + shutdown_grace_s: float = 10.0 def __post_init__(self) -> None: # Sensible defaults: standalone systems are their own family and @@ -64,6 +71,7 @@ def load(cls, bench_dir: Path) -> "AdapterManifest": extensions=list(data.get("extensions", [])), family=data.get("family", "") or "", display_name=data.get("display_name", "") or "", + shutdown_grace_s=float(data.get("shutdown_grace_s", 10.0)), ) diff --git a/bench_harness/orchestrator.py b/bench_harness/orchestrator.py index 54d8e89..e04d6fc 100644 --- a/bench_harness/orchestrator.py +++ b/bench_harness/orchestrator.py @@ -815,7 +815,10 @@ def run_one_system( daemon.join(timeout=5.0) if wait_sampler is not None: wait_sampler.stop() - pool.stop_all() + # Per-adapter SIGTERM grace before the pool escalates to SIGKILL. + # See AdapterManifest.shutdown_grace_s — defaults to 10s, adapters + # with a documented slow-drain shutdown path can override. + pool.stop_all(timeout_s=manifest.shutdown_grace_s) shutil.rmtree(control_dir, ignore_errors=True) # Representative descriptor for manifest inclusion. Prefer replica 0's diff --git a/bench_harness/replica_pool.py b/bench_harness/replica_pool.py index 44a6903..6d59acc 100644 --- a/bench_harness/replica_pool.py +++ b/bench_harness/replica_pool.py @@ -230,13 +230,53 @@ def restart_worker( def stop_all(self, *, timeout_s: float = 10.0) -> None: """Stop every RUNNING replica. Best-effort terminal teardown used at the end of run_one_system — idempotent and safe to call even - when some replicas were already KILLED or CRASHED mid-run.""" - for slot in self._slots: - if slot.state is ReplicaState.RUNNING: - self._terminate_slot( - slot, signal_type=_signal.SIGTERM, timeout_s=timeout_s - ) - slot.state = ReplicaState.STOPPED + when some replicas were already KILLED or CRASHED mid-run. + + Replicas are signalled in parallel and then waited on against a + single shared deadline rather than sequentially. Sequential + teardown would multiply wall-time by replica count: a pgmq cell + with `--replicas 32` and a 30 s adapter grace would burn 16 min + in shutdown alone, blowing past the wrapper script's per-cell + ceiling. Parallel signalling caps wall-time at roughly + ``timeout_s`` regardless of replica count. + + Emits per-replica shutdown breadcrumbs to stderr so post-mortem + analysis of a wrapper-timed-out cell can tell whether shutdown + wedged in the adapter vs. somewhere else. The breadcrumb + survives in `logs/.log` even after the wrapper SIGKILLs + the driver.""" + import time as _time + + targets: list[ReplicaSlot] = [ + slot for slot in self._slots if slot.state is ReplicaState.RUNNING + ] + if not targets: + return + + deadline = _time.monotonic() + timeout_s + + # Phase 1: send SIGTERM and close stdin to every running replica + # in parallel, without blocking on any individual exit. + for slot in targets: + print( + f"[{self.system}] replica {slot.instance_id} stop_all: " + f"sending SIGTERM, grace={timeout_s:.1f}s (shared deadline)", + file=sys.stderr, + ) + self._signal_slot(slot, _signal.SIGTERM) + + # Phase 2: wait for each replica against the shared deadline. + # Any slot that hasn't exited by `deadline` is SIGKILLed. + for slot in targets: + t0 = _time.monotonic() + self._reap_slot(slot, deadline=deadline) + elapsed = _time.monotonic() - t0 + print( + f"[{self.system}] replica {slot.instance_id} stop_all: " + f"exited in {elapsed:.2f}s", + file=sys.stderr, + ) + slot.state = ReplicaState.STOPPED # ── Health watch ─────────────────────────────────────────────────── @@ -266,13 +306,12 @@ def _check_id(self, instance_id: int) -> None: f"(capacity={self.capacity})" ) - def _terminate_slot( - self, - slot: ReplicaSlot, - *, - signal_type: int, - timeout_s: float, - ) -> None: + def _signal_slot(self, slot: ReplicaSlot, signal_type: int) -> None: + """Send `signal_type` to the slot's process and close stdin. + + Non-blocking: returns as soon as the signal has been delivered; + the caller is responsible for waiting on exit. Used by + ``stop_all`` to fan signals out across replicas in parallel.""" proc = slot.process if slot.stop_event is not None: slot.stop_event.set() @@ -291,13 +330,29 @@ def _terminate_slot( # Already gone between our poll() check and the signal — # not a fault, just racy teardown. pass + + def _reap_slot( + self, + slot: ReplicaSlot, + *, + deadline: float, + ) -> None: + """Wait for the slot's process to exit, escalating to SIGKILL if + the absolute monotonic ``deadline`` is reached. + + Sharing one deadline across replicas (as ``stop_all`` does) lets + teardown finish in O(grace) wall-time instead of O(N × grace).""" + import time as _time + + proc = slot.process + if proc is not None and proc.poll() is None: + remaining = max(0.0, deadline - _time.monotonic()) try: - proc.wait(timeout=timeout_s) + proc.wait(timeout=remaining) except subprocess.TimeoutExpired: - # Escalate if graceful window expired. print( f"[{self.system}] replica {slot.instance_id} did not exit " - f"within {timeout_s}s on {signal_type}; escalating to SIGKILL", + f"within shared grace; escalating to SIGKILL", file=sys.stderr, ) proc.kill() @@ -308,3 +363,18 @@ def _terminate_slot( slot.process = None slot.tailer = None slot.stop_event = None + + def _terminate_slot( + self, + slot: ReplicaSlot, + *, + signal_type: int, + timeout_s: float, + ) -> None: + """Synchronous send-then-wait. Used by single-slot lifecycle + operations (``stop_worker`` / ``kill_worker``). ``stop_all`` + uses ``_signal_slot`` + ``_reap_slot`` directly to fan-out.""" + import time as _time + + self._signal_slot(slot, signal_type) + self._reap_slot(slot, deadline=_time.monotonic() + timeout_s) diff --git a/oban-bench/lib/oban_bench/long_horizon.ex b/oban-bench/lib/oban_bench/long_horizon.ex index 8a88832..056d285 100644 --- a/oban-bench/lib/oban_bench/long_horizon.ex +++ b/oban-bench/lib/oban_bench/long_horizon.ex @@ -42,8 +42,10 @@ defmodule ObanBench.LongHorizon do # Batch size for Oban's documented bulk-insert API # (`Oban.insert_all/2`: https://hexdocs.pm/oban/Oban.html#insert_all/2 — # accepts a list of changesets, issues a single multi-row INSERT). - # Default 1 keeps existing row-by-row behaviour. - producer_batch_max = max(1, env_int("PRODUCER_BATCH_MAX", 1)) + # Default 128 to align with the other adapters (pgmq, pgque, awa, + # pgboss); pre-2026-05-09 default of 1 forced row-at-a-time inserts + # and was the largest factor in oban's flat ~280 jobs/s ceiling. + producer_batch_max = max(1, env_int("PRODUCER_BATCH_MAX", 128)) producer_batch_ms = max(1, env_int("PRODUCER_BATCH_MS", 10)) db_name = diff --git a/pgboss-bench/main.js b/pgboss-bench/main.js index 8a18e97..edeeb2b 100644 --- a/pgboss-bench/main.js +++ b/pgboss-bench/main.js @@ -231,55 +231,79 @@ async function scenarioLongHorizon() { } ); + // Catch connection-loss errors from any boss.* call and let the + // task loop continue. Without this, a single FATAL 57P0x from + // chaos_postgres_restart / chaos_pg_backend_kill crashed the whole + // process at rc=1 (audit_pgboss.md §4) — pg-boss's pg-pool will + // reconnect on its own; we just need to not propagate the rejection. + const isConnectionLoss = (err) => { + if (!err) return false; + const code = err.code || (err.cause && err.cause.code); + if (code === "57P01" || code === "57P02" || code === "57P03" || code === "ECONNRESET" || code === "ECONNREFUSED") { + return true; + } + const msg = String(err.message || err); + return /connection|terminat|shutdown|ECONN/i.test(msg); + }; + const producerTask = (async () => { let nextAt = nowMonoMs(); while (!shuttingDown) { - const targetRate = readProducerRate(producerRate); - currentProducerTargetRate = targetRate; - - let batchCount = 0; - if (producerMode === "depth-target") { - const stats = await boss.getQueueStats(QUEUE_NAME); - queueDepth = stats.queuedCount; - batchCount = Math.max(0, Math.min(producerBatchMax, targetDepth - queueDepth)); - if (batchCount === 0) { - await sleep(producerBatchMs); - continue; + try { + const targetRate = readProducerRate(producerRate); + currentProducerTargetRate = targetRate; + + let batchCount = 0; + if (producerMode === "depth-target") { + const stats = await boss.getQueueStats(QUEUE_NAME); + queueDepth = stats.queuedCount; + batchCount = Math.max(0, Math.min(producerBatchMax, targetDepth - queueDepth)); + if (batchCount === 0) { + await sleep(producerBatchMs); + continue; + } + } else { + const now = nowMonoMs(); + const credit = Math.max(0, ((now - nextAt) * targetRate) / 1000 + 1); + batchCount = Math.max(1, Math.min(producerBatchMax, Math.floor(credit))); } - } else { - const now = nowMonoMs(); - const credit = Math.max(0, ((now - nextAt) * targetRate) / 1000 + 1); - batchCount = Math.max(1, Math.min(producerBatchMax, Math.floor(credit))); - } - - const jobs = []; - for (let i = 0; i < batchCount; i += 1) { - seq += 1; - jobs.push({ - data: { - seq, - enqueued_at_ms: Date.now(), - payload_padding: payloadPadding, - }, - }); - } - const started = nowMonoMs(); - await boss.insert(QUEUE_NAME, jobs); - const elapsed = nowMonoMs() - started; - const perJobLatency = elapsed / Math.max(jobs.length, 1); - const sampleTs = nowMonoMs(); - for (let i = 0; i < jobs.length; i += 1) { - producerLatencies.push(sampleTs, perJobLatency); - } - enqueued += jobs.length; + const jobs = []; + for (let i = 0; i < batchCount; i += 1) { + seq += 1; + jobs.push({ + data: { + seq, + enqueued_at_ms: Date.now(), + payload_padding: payloadPadding, + }, + }); + } - if (producerMode === "fixed") { - nextAt += Math.round((jobs.length * 1000) / Math.max(targetRate, 1)); - const sleepFor = Math.max(0, nextAt - nowMonoMs()); - if (sleepFor > 0) { - await sleep(Math.min(sleepFor, producerBatchMs)); + const started = nowMonoMs(); + await boss.insert(QUEUE_NAME, jobs); + const elapsed = nowMonoMs() - started; + const perJobLatency = elapsed / Math.max(jobs.length, 1); + const sampleTs = nowMonoMs(); + for (let i = 0; i < jobs.length; i += 1) { + producerLatencies.push(sampleTs, perJobLatency); } + enqueued += jobs.length; + + if (producerMode === "fixed") { + nextAt += Math.round((jobs.length * 1000) / Math.max(targetRate, 1)); + const sleepFor = Math.max(0, nextAt - nowMonoMs()); + if (sleepFor > 0) { + await sleep(Math.min(sleepFor, producerBatchMs)); + } + } + } catch (err) { + if (isConnectionLoss(err)) { + console.error("[pgboss] producer connection lost; backing off 200ms", err.message || err); + await sleep(200); + continue; + } + throw err; } } })(); @@ -295,8 +319,16 @@ async function scenarioLongHorizon() { return; } while (!shuttingDown) { - const stats = await boss.getQueueStats(QUEUE_NAME); - queueDepth = stats.queuedCount; + try { + const stats = await boss.getQueueStats(QUEUE_NAME); + queueDepth = stats.queuedCount; + } catch (err) { + if (isConnectionLoss(err)) { + await sleep(200); + continue; + } + throw err; + } await sleep(250); } })(); @@ -307,6 +339,7 @@ async function scenarioLongHorizon() { let lastCompleted = completed; while (!shuttingDown) { + try { const sampleTs = nowIso(); const monoNow = nowMonoMs(); const producer = producerLatencies.percentiles(DEFAULT_SAMPLE_WINDOW_S * 1000, monoNow); @@ -352,6 +385,13 @@ async function scenarioLongHorizon() { window_s: windowS, }); } + } catch (err) { + if (isConnectionLoss(err)) { + await sleep(200); + continue; + } + throw err; + } await sleep(sampleEveryS * 1000); } diff --git a/pgmq-bench/adapter.json b/pgmq-bench/adapter.json index 66cd41f..b842384 100644 --- a/pgmq-bench/adapter.json +++ b/pgmq-bench/adapter.json @@ -9,5 +9,6 @@ "event_indexes": [], "extensions": [ "pgmq" - ] + ], + "shutdown_grace_s": 30 } diff --git a/pgmq-bench/main.py b/pgmq-bench/main.py index 23c77af..1a49b18 100644 --- a/pgmq-bench/main.py +++ b/pgmq-bench/main.py @@ -152,9 +152,14 @@ async def scenario_long_horizon() -> None: producer_batch_max = env_int("PRODUCER_BATCH_MAX", 128) poll_interval_ms = env_int("POLL_INTERVAL_MS", 50) visibility_timeout_s = env_int("VISIBILITY_TIMEOUT_S", 30) + # Floor at 8 to keep the W>=64 path off pgmq.read(qty=1), which + # serialises the readers on FOR UPDATE and was the direct cause + # of pgmq's W=16 → W=256 throughput cliff in the 2026-05-09 sweep + # (audit_pgmq.md §4). The default formula stays — operators can + # still pin a smaller batch via CONSUMER_BATCH_SIZE. consumer_batch_size = env_int( "CONSUMER_BATCH_SIZE", - max(1, min(64, round((producer_rate * 1.25) / max(worker_count * (1000 / max(poll_interval_ms, 1)), 1)))), + max(8, min(64, round((producer_rate * 1.25) / max(worker_count * (1000 / max(poll_interval_ms, 1)), 1)))), ) db_name = database_url().rsplit("/", 1)[-1] @@ -204,8 +209,51 @@ async def scenario_long_horizon() -> None: except NotImplementedError: signal.signal(sig, lambda *_a: shutdown.set()) - producer_conn = await aconnect() - depth_conn = await aconnect() + # Mirrors pgque-bench's ReconnectingConn so per-task loops can + # transparently reconnect when chaos kills the underlying socket + # (postgres-restart, pg_terminate_backend). Without this, the + # consumer loop swallowed OperationalError and continued on a + # stale handle — the direct cause of the 0% recovery in + # chaos_postgres_restart_pgmq / chaos_pg_backend_kill_pgmq cells + # in the 2026-05-09 sweep (audit_pgmq.md §7). + class ReconnectingConn: + __slots__ = ("_conn",) + + def __init__(self, conn: psycopg.AsyncConnection) -> None: + self._conn = conn + + @property + def conn(self) -> psycopg.AsyncConnection: + return self._conn + + def cursor(self): + return self._conn.cursor() + + async def close(self) -> None: + try: + await self._conn.close() + except Exception: + pass + + async def reconnect(self) -> None: + await self.close() + for _ in range(50): + if shutdown.is_set(): + return + try: + self._conn = await aconnect() + return + except Exception: + await asyncio.sleep(0.2) + + producer_conn = ReconnectingConn(await aconnect()) + depth_conn = ReconnectingConn(await aconnect()) + + # Track in-flight claimed message ids per consumer worker so the + # shutdown path can archive any still held when SIGTERM arrives. + # Without this the recovery phase of long-running stress cells + # blocked on stale visibility-timeout entries — see audit_pgmq.md §6. + in_flight: dict[int, list[int]] = {} async def producer() -> None: nonlocal enqueued, current_producer_target_rate, current_queue_depth @@ -216,7 +264,11 @@ async def producer() -> None: current_producer_target_rate = float(target_rate) if producer_mode == "depth-target": - current_queue_depth = await queue_depth(producer_conn) + try: + current_queue_depth = await queue_depth(producer_conn.conn) + except (psycopg.OperationalError, psycopg.errors.ConnectionDoesNotExist): + await producer_conn.reconnect() + continue remaining = max(0, target_depth - current_queue_depth) batch_count = min(producer_batch_max, remaining) if batch_count <= 0: @@ -240,12 +292,16 @@ async def producer() -> None: ) started = time.monotonic() - async with producer_conn.cursor() as cur: - await cur.execute( - "SELECT * FROM pgmq.send_batch(%s, %s)", - (QUEUE_NAME, batch), - ) - await cur.fetchall() + try: + async with producer_conn.cursor() as cur: + await cur.execute( + "SELECT * FROM pgmq.send_batch(%s, %s)", + (QUEUE_NAME, batch), + ) + await cur.fetchall() + except (psycopg.OperationalError, psycopg.errors.ConnectionDoesNotExist): + await producer_conn.reconnect() + continue elapsed_ms = (time.monotonic() - started) * 1000 sample_ts = loop.time() per_msg_ms = elapsed_ms / max(len(batch), 1) @@ -257,17 +313,22 @@ async def producer() -> None: next_t += len(batch) / max(target_rate, 1) await asyncio.sleep(max(0.0, min(producer_batch_ms / 1000.0, next_t - loop.time()))) - async def consumer_task() -> None: + async def consumer_task(worker_idx: int) -> None: nonlocal completed - conn = await aconnect() + conn = ReconnectingConn(await aconnect()) + in_flight[worker_idx] = [] try: while not shutdown.is_set(): - async with conn.cursor() as cur: - await cur.execute( - "SELECT * FROM pgmq.read(queue_name => %s, vt => %s, qty => %s)", - (QUEUE_NAME, visibility_timeout_s, consumer_batch_size), - ) - rows = await cur.fetchall() + try: + async with conn.cursor() as cur: + await cur.execute( + "SELECT * FROM pgmq.read(queue_name => %s, vt => %s, qty => %s)", + (QUEUE_NAME, visibility_timeout_s, consumer_batch_size), + ) + rows = await cur.fetchall() + except (psycopg.OperationalError, psycopg.errors.ConnectionDoesNotExist): + await conn.reconnect() + continue if not rows: await asyncio.sleep(poll_interval_ms / 1000.0) continue @@ -280,16 +341,27 @@ async def consumer_task() -> None: (loop.time(), float(started_at_ms - int(message["enqueued_at_ms"]))) ) + msg_ids = [int(row["msg_id"]) for row in rows] + # Mark claimed; cleared once archive() lands. + in_flight[worker_idx] = msg_ids + if work_ms > 0: await asyncio.sleep((work_ms * len(rows)) / 1000.0) - msg_ids = [int(row["msg_id"]) for row in rows] - async with conn.cursor() as cur: - await cur.execute( - "SELECT pgmq.archive(%s, %s)", - (QUEUE_NAME, msg_ids), - ) - await cur.fetchall() + try: + async with conn.cursor() as cur: + await cur.execute( + "SELECT pgmq.archive(%s, %s)", + (QUEUE_NAME, msg_ids), + ) + await cur.fetchall() + except (psycopg.OperationalError, psycopg.errors.ConnectionDoesNotExist): + # Archive failed — leave msg_ids in `in_flight` so the + # shutdown drain path can retry, and reconnect for + # subsequent reads. + await conn.reconnect() + continue + in_flight[worker_idx] = [] completed_at_ms = int(time.time() * 1000) for row in rows: @@ -312,7 +384,11 @@ async def depth_task() -> None: await asyncio.sleep(0.25) return while not shutdown.is_set(): - current_queue_depth = await queue_depth(depth_conn) + try: + current_queue_depth = await queue_depth(depth_conn.conn) + except (psycopg.OperationalError, psycopg.errors.ConnectionDoesNotExist): + await depth_conn.reconnect() + continue await asyncio.sleep(0.25) async def sampler() -> None: @@ -377,12 +453,38 @@ async def sampler() -> None: asyncio.create_task(depth_task()), asyncio.create_task(sampler()), ] - tasks.extend(asyncio.create_task(consumer_task()) for _ in range(worker_count)) + tasks.extend( + asyncio.create_task(consumer_task(idx)) for idx in range(worker_count) + ) await shutdown.wait() for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) + + # Drain any in-flight claimed message ids by archiving them, so + # the next phase doesn't block on stale visibility-timeout entries. + # Best-effort with a single fresh connection — failures are logged + # to stderr; the process is going down anyway. + leftover = sorted({mid for ids in in_flight.values() for mid in ids}) + if leftover: + try: + drain_conn = await aconnect() + try: + async with drain_conn.cursor() as cur: + await cur.execute( + "SELECT pgmq.archive(%s, %s)", + (QUEUE_NAME, leftover), + ) + await cur.fetchall() + finally: + await drain_conn.close() + except Exception as exc: # noqa: BLE001 + print( + f"[pgmq] shutdown drain failed for {len(leftover)} msgs: {exc}", + flush=True, + ) + await producer_conn.close() await depth_conn.close() diff --git a/river-bench/main.go b/river-bench/main.go index cec2902..0687e0e 100644 --- a/river-bench/main.go +++ b/river-bench/main.go @@ -286,10 +286,11 @@ func scenarioWorkerThroughput(ctx context.Context, pool *pgxpool.Pool, jobCount Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: workerCount}, }, - Workers: workers, - JobTimeout: -1, - FetchCooldown: 50 * time.Millisecond, - FetchPollInterval: 50 * time.Millisecond, + Workers: workers, + JobTimeout: -1, + FetchCooldown: 50 * time.Millisecond, + FetchPollInterval: 50 * time.Millisecond, + RescueStuckJobsAfter: time.Duration(envInt("RESCUE_AFTER_SECS", 30)) * time.Second, }) if err != nil { log.Fatalf("Failed to create worker client: %v", err) @@ -334,10 +335,11 @@ func scenarioPickupLatency(ctx context.Context, pool *pgxpool.Pool, iterations i Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: workerCount}, }, - Workers: workers, - JobTimeout: -1, - FetchCooldown: 50 * time.Millisecond, - FetchPollInterval: 50 * time.Millisecond, + Workers: workers, + JobTimeout: -1, + FetchCooldown: 50 * time.Millisecond, + FetchPollInterval: 50 * time.Millisecond, + RescueStuckJobsAfter: time.Duration(envInt("RESCUE_AFTER_SECS", 30)) * time.Second, }) if err != nil { log.Fatalf("Failed to create client: %v", err) @@ -531,8 +533,10 @@ func runLongHorizon(ctx context.Context, pool *pgxpool.Pool, workerCount int) { // Batch size for River's documented bulk-insert API // (Client.InsertManyFast: https://pkg.go.dev/github.com/riverqueue/river#Client.InsertManyFast, // docs: https://riverqueue.com/docs/batch-job-insertion). Uses Postgres - // COPY under the hood. Default 1 keeps existing row-by-row behaviour. - producerBatchMax := envInt("PRODUCER_BATCH_MAX", 1) + // COPY under the hood. Default 128 to align with the other adapters; + // pre-2026-05-09 default of 1 forced row-at-a-time inserts and was a + // likely contributor to river's ~500 jobs/s ceiling on long_horizon. + producerBatchMax := envInt("PRODUCER_BATCH_MAX", 128) if producerBatchMax < 1 { producerBatchMax = 1 } @@ -578,10 +582,11 @@ func runLongHorizon(ctx context.Context, pool *pgxpool.Pool, workerCount int) { Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: workerCount}, }, - Workers: workers, - JobTimeout: -1, - FetchCooldown: 50 * time.Millisecond, - FetchPollInterval: 50 * time.Millisecond, + Workers: workers, + JobTimeout: -1, + FetchCooldown: 50 * time.Millisecond, + FetchPollInterval: 50 * time.Millisecond, + RescueStuckJobsAfter: time.Duration(envInt("RESCUE_AFTER_SECS", 30)) * time.Second, }) if err != nil { log.Fatalf("long_horizon: failed to create client: %v", err) diff --git a/tests/test_harness_smoke.py b/tests/test_harness_smoke.py index b72f712..a6a9156 100644 --- a/tests/test_harness_smoke.py +++ b/tests/test_harness_smoke.py @@ -799,6 +799,53 @@ def test_pool_teardown_closes_stdin_before_waiting(): stdin.close.assert_called_once() +def test_pool_stop_all_signals_in_parallel_before_waiting(): + """``stop_all`` must fan SIGTERM out to every replica before + starting to wait on any single one. Sequential signal-then-wait + would multiply teardown wall-time by replica count, blowing past + the wrapper script's per-cell ceiling on multi-replica pgmq runs + (audit_pgmq.md / Codex review of PR #27).""" + launch, _, procs = _fake_launch_fn() + pool = ReplicaPool(system="awa", capacity=4, launch_fn=launch) + pool.start_all() + + events: list[tuple[str, int]] = [] + for idx, proc in enumerate(procs): + # _fake_launch_fn's default send_signal flips poll to 0 after + # firing. Wrap it so we can record ordering — the flip behaviour + # still runs so wait() short-circuits naturally. + def make_signal_recorder(idx: int, proc: MagicMock): + def _signal(_sig: int): + events.append(("signal", idx)) + proc.returncode = 0 + proc.poll.configure_mock(return_value=0) + + return _signal + + proc.send_signal = MagicMock(side_effect=make_signal_recorder(idx, proc)) + proc.wait = MagicMock( + side_effect=lambda *a, idx=idx, **kw: events.append(("wait", idx)) or 0 + ) + + pool.stop_all() + + # All four SIGTERMs must land before any wait. The fan-out ordering + # is what gives ``stop_all`` its O(grace) wall-time guarantee: a + # sequential signal-then-wait would interleave them and multiply + # teardown time by replica count. + signal_count = sum(1 for kind, _ in events if kind == "signal") + assert signal_count == 4 + first_wait_idx = next( + (i for i, (kind, _) in enumerate(events) if kind == "wait"), + len(events), + ) + # Either no wait was needed (procs reaped immediately) or every + # wait happened after every signal. + assert all(events[i][0] == "signal" for i in range(first_wait_idx)), ( + f"signals must precede the first wait; got {events}" + ) + + # ── kill-worker / start-worker parsing + hooks ──────────────────────────