81 changes: 63 additions & 18 deletions README.md
@@ -14,9 +14,35 @@ Eight Postgres-backed queues, same hardware, same harness. Three
contracts in the lineup — event bus, job queue, visibility-timeout
queue — so the throughput list isn't a single ranking. The
[2026-05-09 sweep](results/2026-05-09-full-sweep/SUMMARY.md) has the
per-cell numbers, chaos behaviour, bloat resistance, and a 6 h soak.

![Sustained throughput vs worker concurrency](results/2026-05-09-full-sweep/plots/throughput_scaling.png)
per-cell numbers, chaos behaviour, and bloat resistance.

![Peak throughput by queue contract](results/2026-05-09-full-sweep/plots/headline_throughput.png)

![Tail latency at each system's peak throughput](results/2026-05-09-full-sweep/plots/latency_at_peak.png)

Headline comparisons from that run:

- **Peak clean throughput:** pgque 39.9 k jobs/s in single-consumer
event-bus mode; awa 14.2 k as the fastest full job queue; pgmq
11.3 k as a visibility-timeout queue before anti-scaling at higher
worker counts.
- **Chaos recovery:** awa, pgque, and river recover from every
scenario. The other five adapters either hit zero or fail to
produce recovery samples in at least one chaos cell.
- **Bloat / pressure:** five adapters time out under at least one
sustained-pressure cell; only awa, oban, and pgque complete all
four pressure scenarios.

| System | Contract | Chaos recovery | Pressure cells | Notable caveat |
|---|---|---:|---:|---|
| awa | job queue | 5/5 | 4/4 | Full job-queue feature surface; fastest job queue in this run. |
| pgque | event/message bus | 5/5 | 4/4 | Single-consumer mode; batched success ack is a different contract. |
| river | job queue | 5/5 | 2/4 | Times out in two sustained-pressure cells. |
| oban | job queue | 4/5 | 4/4 | Handles pressure cells but has lower throughput in this run. |
| pg-boss | job queue | 3/5 | 2/4 | Postgres-level chaos exits the worker; times out in two pressure cells. |
| absurd | job queue | 3/5 | 2/4 | Shutdown timeout under pressure. |
| procrastinate | job queue | 3/5 | 2/4 | Weak repeated-kill recovery; times out in two pressure cells. |
| pgmq | visibility-timeout queue | 3/5 | 2/4 | Anti-scales past 16 workers and has the active-readers cliff. |

## Feature comparison

@@ -29,25 +55,44 @@ distribution.
| | awa | Absurd | pg-boss | pgmq | pgque | Oban | Procrastinate | River |
|---|:-:|:-:|:-:|:-:|:-:|:-:|:-:|:-:|
| **Language / runtime** | Rust + Python | Python | Node.js | Postgres extension (Rust core) | Postgres extension (PL/pgSQL) | Elixir | Python | Go |
| **Postgres extension required** | no | no | no | yes (`pgmq`) | optional (`pg_cron` for `pgque.start()`) | no | no | no |
| **Producer surface — bulk insert** | ✓ | — | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ (COPY) |
| **Postgres extension required** | no | no | no | yes[^pgmq-extension] | optional[^pgque-cron] | no | no | no |
| **Producer surface — bulk insert** | ✓ | — | ✓ | ✓ | ✓ | ✓ | ✓ | ✓[^river-copy] |
| **Storage shape on hot path** | append-only + receipt ring | row-mutating | row-mutating | partitioned archive | append-only + ticker | row-mutating | row-mutating | row-mutating |
| **Priorities** | ✓ (with aging) | — | ✓ | — | — | ✓ | ✓ | ✓ |
| **Retries with backoff** | ✓ | ✓ | ✓ | (visibility timeout) | ✓ | ✓ | ✓ | ✓ |
| **Cron / scheduled jobs** | ✓ | — | ✓ | — | (delayed) | ✓ | ✓ | ✓ |
| **Dead-letter queue** | ✓ (opt-in) | — | (failed-archive) | (archive table) | ✓ | (discarded) | (discarded) | ✓ |
| **Unique jobs / dedup** | ✓ | — | ✓ (singleton key) | — | — | ✓ | ✓ | ✓ |
| **Rate limiting per queue** | ✓ | — | ✓ (throttling) | — | — | ✓ (Pro for global) | (concurrency limit) | ✓ |
| **Callbacks / external waits** | ✓ | (workflow steps) | (event subscription) | — | — | — | — | — |
| **Web UI for ops** | ✓ (`awa serve`) | — | (3rd party: pgboss-dashboard) | — | — | ✓ (Oban Web, Pro) | (3rd party) | ✓ |
| **Priorities** | ✓[^awa-priority-aging] | — | ✓ | — | — | ✓ | ✓ | ✓ |
| **Retries with backoff** | ✓ | ✓ | ✓ | ✓[^pgmq-vt] | ✓ | ✓ | ✓ | ✓ |
| **Cron / scheduled jobs** | ✓ | — | ✓ | — | ✓[^pgque-delayed] | ✓ | ✓ | ✓ |
| **Dead-letter queue** | ✓[^awa-dlq] | — | ✓[^pgboss-failed-archive] | ✓[^pgmq-archive] | ✓ | ✓[^discarded-state] | ✓[^discarded-state] | ✓ |
| **Unique jobs / dedup** | ✓ | — | ✓[^pgboss-singleton] | — | — | ✓ | ✓ | ✓ |
| **Rate limiting per queue** | ✓ | — | ✓[^pgboss-throttling] | — | — | ✓[^oban-pro-rate-limit] | ✓[^procrastinate-concurrency] | ✓ |
| **Callbacks / external waits** | ✓ | ✓[^absurd-workflow-steps] | ✓[^pgboss-events] | — | — | — | — | — |
| **Web UI for ops** | ✓[^awa-serve] | — | —[^pgboss-dashboard] | — | — | —[^oban-web] | —[^procrastinate-third-party-ui] | ✓ |

[^pgmq-extension]: pgmq can also be installed as SQL, but the benchmark and the common packaged distribution use the `pgmq` Postgres extension.
[^pgque-cron]: pgque itself is PL/pgSQL. `pg_cron` is needed for the convenience `pgque.start()` ticker; callers may drive the ticker themselves instead.
[^river-copy]: River's fast bulk path uses the Postgres `COPY` protocol.
[^awa-priority-aging]: awa priorities include aging so lower-priority work is eventually promoted.
[^pgmq-vt]: pgmq is a visibility-timeout queue: redelivery is controlled by the visibility timeout rather than a job-framework retry policy with counted attempts and backoff.
[^pgque-delayed]: pgque supports delayed visibility, but not cron-style periodic scheduling.
[^awa-dlq]: awa DLQ routing is opt-in via `dlq_enabled_by_default` or a per-queue override.
[^pgboss-failed-archive]: pg-boss keeps failed/expired job history rather than exposing a separate DLQ queue abstraction.
[^pgmq-archive]: pgmq archives messages into queue-specific archive tables; that is retention/replay storage rather than a job-framework DLQ policy.
[^discarded-state]: Oban and Procrastinate retain exhausted failures in discarded/failed states rather than moving them to a separate queue table.
[^pgboss-singleton]: pg-boss deduplication is expressed through singleton keys and singleton windows.
[^pgboss-throttling]: pg-boss rate limiting is exposed as throttling.
[^oban-pro-rate-limit]: Oban OSS supports local queue limits; global rate limiting is an Oban Pro feature.
[^procrastinate-concurrency]: Procrastinate can limit concurrency with locks/queueing policy, but does not expose a named per-queue rate-limit primitive.
[^absurd-workflow-steps]: Absurd models external waits as durable workflow steps rather than queue-level callbacks.
[^pgboss-events]: pg-boss exposes job lifecycle events/subscriptions rather than durable external-wait callbacks.
[^awa-serve]: awa includes the `awa serve` ops UI.
[^pgboss-dashboard]: pg-boss has third-party dashboards such as `pgboss-dashboard`, not an official bundled UI.
[^oban-web]: Oban Web is part of Oban Pro.
[^procrastinate-third-party-ui]: Procrastinate has community/third-party admin surfaces rather than a bundled official UI.

Dashes indicate "not provided as a documented feature out of the box",
not "impossible". pgmq / pgque in particular are intentionally minimal
— you build the worker, you choose the lifecycle. "opt-in" on awa's
DLQ row means jobs are routed there only when the queue's
`dlq_enabled_by_default` (or per-queue override) is set. If you spot
something wrong, please open a PR — corrections welcome from the
maintainers of any of the systems listed.
— you build the worker, you choose the lifecycle. If you spot something
wrong, please open a PR — corrections welcome from the maintainers of
any of the systems listed.

## What's in the lineup

11 changes: 0 additions & 11 deletions bench_harness/plots.py
@@ -83,7 +83,6 @@ class PlotSpec:
title: str
filename_stem: str
y_label: str
log_scale: bool = False
use_raw_underlay: bool = False
sum_by_subject: bool = True # aggregate across subject per system
subject_kind: str | None = None # filter to a kind (e.g. "table")
@@ -101,7 +100,6 @@ class PlotSpec:
title="Claim p99 latency",
filename_stem="claim_p99",
y_label="claim p99 latency (ms)",
log_scale=False,
use_raw_underlay=True,
subject_kind="adapter",
sum_by_subject=False,
@@ -110,7 +108,6 @@ class PlotSpec:
title="Producer p99 latency",
filename_stem="producer_p99",
y_label="producer p99 latency (ms)",
log_scale=False,
use_raw_underlay=True,
subject_kind="adapter",
sum_by_subject=False,
@@ -119,7 +116,6 @@ class PlotSpec:
title="Producer call p99 latency",
filename_stem="producer_call_p99",
y_label="producer call p99 latency (ms)",
log_scale=False,
use_raw_underlay=True,
subject_kind="adapter",
sum_by_subject=False,
@@ -128,7 +124,6 @@ class PlotSpec:
title="Subscriber p99 latency",
filename_stem="subscriber_p99",
y_label="subscriber p99 latency (ms)",
log_scale=False,
use_raw_underlay=True,
subject_kind="adapter",
sum_by_subject=False,
@@ -137,7 +132,6 @@ class PlotSpec:
title="End-to-end p99 latency",
filename_stem="end_to_end_p99",
y_label="end-to-end p99 latency (ms)",
log_scale=False,
use_raw_underlay=True,
subject_kind="adapter",
sum_by_subject=False,
@@ -350,7 +344,6 @@ def _setup_axes(
phases: Iterable[Phase],
title: str,
y_label: str,
log_scale: bool,
) -> None:
phase_list = list(phases)
total_s = sum(p.duration_s for p in phase_list) or 1
@@ -384,8 +377,6 @@ def _fmt(x, _pos):
ax.xaxis.set_major_formatter(plt.FuncFormatter(_fmt))
ax.set_xlabel("elapsed time")
ax.set_xlim(0, total_s)
if log_scale:
ax.set_yscale("log")
ax.grid(True, axis="y", linestyle=":", color="#999", alpha=0.4)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
@@ -430,7 +421,6 @@ def render_plot(
phases=phases,
title=spec.title,
y_label=spec.y_label,
log_scale=spec.log_scale,
)
fig.text(
0.5,
@@ -658,7 +648,6 @@ def render_faceted_dead_tuples(
phases=phases,
title=panel_title,
y_label="n_dead_tup",
log_scale=False,
)
subject_series: list[tuple[str, list[tuple[float, float]], float]] = []
subjects = {
Binary file modified results/2026-05-09-full-sweep/plots/throughput_scaling.png
174 changes: 167 additions & 7 deletions results/2026-05-09-full-sweep/scripts/render_plots.py
@@ -2,6 +2,8 @@
"""Render headline plots for the 2026-05-09 full sweep from matrix.csv.

Produces (when source data is present):
- headline_throughput.png — peak clean completion_rate by contract
- latency_at_peak.png — e2e p99 latency at each system's peak cell
- throughput_scaling.png — Phase A peak completion_rate per system across W
- chaos_summary.png — Phase B 8x5 heatmap of recovery/baseline ratio
- bloat_summary.png — Phase C 8x4 heatmap of stress/clean ratio
@@ -39,6 +41,18 @@ def load_matrix():
VT_QUEUES = ["pgmq"]
EVENT_BUSES = ["pgque"]

CONTRACT = {
**{s: "Job queue" for s in JOB_QUEUES},
**{s: "Visibility-timeout queue" for s in VT_QUEUES},
**{s: "Event/message bus" for s in EVENT_BUSES},
}

CONTRACT_COLOR = {
"Job queue": "#4E79A7",
"Visibility-timeout queue": "#59A14F",
"Event/message bus": "#E377C2",
}

LINE_STYLE = {
**{s: "-" for s in JOB_QUEUES},
**{s: ":" for s in VT_QUEUES},
@@ -56,9 +70,7 @@ def fnum(s):


# ── Phase A throughput scaling ───────────────────────────────────────
def plot_throughput_scaling(rows):
# We need clean-phase median completion_rate per (system, worker_count)
# using only Phase A cells.
def phase_a_completion_series(rows):
series = defaultdict(dict) # system -> {workers: rate}
for r in rows:
if r.get("phase") != "A":
@@ -74,6 +86,153 @@ def plot_throughput_scaling(rows):
if rate is None:
continue
series[sys_name][w] = rate
return series


def phase_a_peak_rows(rows):
peaks = {}
for r in rows:
if r.get("phase") != "A":
continue
if r.get("phase_type") != "clean":
continue
rate = fnum(r.get("completion_rate_median"))
if rate is None:
continue
system = r.get("system")
if system not in peaks or rate > peaks[system][0]:
peaks[system] = (rate, r)
return {system: row for system, (_, row) in peaks.items()}


def plot_headline_throughput(rows):
peak_rows = phase_a_peak_rows(rows)
peaks = []
for system, row in peak_rows.items():
rate = fnum(row.get("completion_rate_median"))
worker = int(row.get("worker_count") or 0)
peaks.append((rate, system, worker))
if not peaks:
return

peaks.sort()
systems = [system for _, system, _ in peaks]
values = [rate for rate, _, _ in peaks]
workers = [worker for _, _, worker in peaks]
colors = [CONTRACT_COLOR[CONTRACT[system]] for system in systems]

fig, ax = plt.subplots(figsize=(10, 5.8))
y = np.arange(len(systems))
ax.barh(y, values, color=colors, alpha=0.9)
ax.set_yticks(y)
ax.set_yticklabels([s.replace("pgboss", "pg-boss") for s in systems])
ax.set_xlabel("peak median completion rate (jobs/s)")
ax.set_title("Peak throughput by queue contract")
ax.set_xlim(0, max(values) * 1.16)
ax.grid(True, axis="x", alpha=0.25)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)

for yi, value, worker in zip(y, values, workers, strict=True):
ax.text(
value + max(values) * 0.012,
yi,
f"{value:,.0f} @ 1x{worker}w",
va="center",
fontsize=9,
)

handles = [
plt.Line2D([0], [0], marker="s", linestyle="", markersize=9, color=color, label=label)
for label, color in CONTRACT_COLOR.items()
]
ax.legend(handles=handles, loc="lower right", frameon=True)
ax.text(
0,
-0.16,
"pgque is shown in the benchmark's single-consumer competing-consumers mode; native fan-out is a different contract.",
transform=ax.transAxes,
fontsize=9,
color="#555",
)
fig.tight_layout(rect=[0, 0.04, 1, 1])
fig.savefig(PLOTS / "headline_throughput.png", dpi=140)
plt.close(fig)


def plot_latency_at_peak(rows):
peak_rows = phase_a_peak_rows(rows)
records = []
missing = []
for system, row in peak_rows.items():
rate = fnum(row.get("completion_rate_median"))
latency = fnum(row.get("end_to_end_p99_ms_median"))
worker = int(row.get("worker_count") or 0)
if latency is None:
missing.append(system.replace("pgboss", "pg-boss"))
continue
records.append((rate, system, worker, latency))
if not records:
return

records.sort()
systems = [system for _, system, _, _ in records]
latencies = [latency for _, _, _, latency in records]
rates = [rate for rate, _, _, _ in records]
workers = [worker for _, _, worker, _ in records]
colors = [CONTRACT_COLOR[CONTRACT[system]] for system in systems]

cap_ms = 2_000
clipped = [min(latency, cap_ms) for latency in latencies]
fig, ax = plt.subplots(figsize=(10, 5.4))
y = np.arange(len(systems))
for yi, value, latency, color in zip(y, clipped, latencies, colors, strict=True):
ax.hlines(yi, 0, value, color=color, linewidth=3, alpha=0.75)
marker = ">" if latency > cap_ms else "o"
ax.plot(value, yi, marker=marker, color=color, markersize=8)
ax.set_yticks(y)
ax.set_yticklabels([s.replace("pgboss", "pg-boss") for s in systems])
ax.set_xlabel("median end-to-end p99 latency at peak throughput (ms)")
ax.set_title("Tail latency at each system's peak throughput")
ax.set_xlim(0, cap_ms * 1.12)
ax.grid(True, axis="x", alpha=0.25)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)

for yi, value, latency, rate, worker in zip(
y, clipped, latencies, rates, workers, strict=True
):
label = f"{latency:,.0f} ms, {rate:,.0f}/s @ 1x{worker}w"
if latency > cap_ms:
ax.text(
cap_ms - cap_ms * 0.025,
yi,
label,
va="center",
ha="right",
fontsize=9,
)
else:
ax.text(value + cap_ms * 0.025, yi, label, va="center", fontsize=9)

handles = [
plt.Line2D([0], [0], marker="s", linestyle="", markersize=9, color=color, label=label)
for label, color in CONTRACT_COLOR.items()
]
ax.legend(handles=handles, loc="upper right", frameon=True)
note = "Linear axis clipped at 2s; arrows mark clipped outliers."
if missing:
note += " No end-to-end p99 emitted at peak for: " + ", ".join(sorted(missing)) + "."
ax.text(0, -0.16, note, transform=ax.transAxes, fontsize=9, color="#555")
fig.tight_layout(rect=[0, 0.04, 1, 1])
fig.savefig(PLOTS / "latency_at_peak.png", dpi=140)
plt.close(fig)


def plot_throughput_scaling(rows):
# We need clean-phase median completion_rate per (system, worker_count)
# using only Phase A cells.
series = phase_a_completion_series(rows)
if not series:
return
fig, ax = plt.subplots(figsize=(9, 6))
@@ -84,13 +243,12 @@ def plot_throughput_scaling(rows):
xs = [w for w in ws if w in series[s]]
ys = [series[s][w] for w in xs]
ax.plot(xs, ys, LINE_STYLE.get(s, "-"), marker="o", label=s, linewidth=2)
ax.set_xscale("log", base=2)
ax.set_yscale("log")
ax.set_xticks(ws)
ax.get_xaxis().set_major_formatter(plt.matplotlib.ticker.ScalarFormatter())
ax.set_xlim(0, max(ws) + 12)
ax.set_ylim(bottom=0)
ax.set_xlabel("worker count (1× replica)")
ax.set_ylabel("completion rate (jobs/s, median during clean)")
ax.set_title("Phase A — throughput scaling (depth-target=4000, producer-rate=50000)")
ax.set_title("Clean throughput scaling (depth-target=4000, producer-rate=50000)")
ax.grid(True, which="both", alpha=0.3)
ax.legend(loc="lower right", ncol=2)
fig.tight_layout()
@@ -318,6 +476,8 @@ def plot_soak_dead_tuples():

def main():
rows = load_matrix()
plot_headline_throughput(rows)
plot_latency_at_peak(rows)
plot_throughput_scaling(rows)
plot_chaos_summary(rows)
plot_bloat_summary(rows)
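
The two new plots depend on the same two small pieces of logic: picking each system's highest-rate clean Phase A row, and clipping tail latency to a linear axis with a marker for outliers. The sketch below isolates both so they can be checked without matplotlib. It is a minimal illustration, not the code in this PR: the body of `fnum` is not visible in the diff and is assumed here to return `None` for empty or malformed cells, `clip_latency` is a hypothetical helper name, and the sample rows are invented for illustration rather than taken from `matrix.csv`.

```python
from typing import Optional


def fnum(s) -> Optional[float]:
    """Assumed behaviour: parse a CSV cell, returning None if empty or malformed."""
    try:
        return float(s)
    except (TypeError, ValueError):
        return None


def peak_rows(rows):
    """Return {system: row} for the clean Phase A row with the highest median rate."""
    best = {}  # system -> (rate, row)
    for r in rows:
        if r.get("phase") != "A" or r.get("phase_type") != "clean":
            continue
        rate = fnum(r.get("completion_rate_median"))
        if rate is None:
            continue
        system = r.get("system")
        if system not in best or rate > best[system][0]:
            best[system] = (rate, r)
    return {system: row for system, (_, row) in best.items()}


def clip_latency(latency_ms, cap_ms=2_000):
    """Hypothetical helper: plotted value plus marker; '>' flags a clipped outlier."""
    return min(latency_ms, cap_ms), (">" if latency_ms > cap_ms else "o")


# Invented sample rows, shaped like csv.DictReader output (all values are strings).
ROWS = [
    {"system": "awa", "phase": "A", "phase_type": "clean",
     "worker_count": "16", "completion_rate_median": "100.0"},
    {"system": "awa", "phase": "A", "phase_type": "clean",
     "worker_count": "32", "completion_rate_median": "140.0"},
    {"system": "awa", "phase": "B", "phase_type": "clean",
     "worker_count": "64", "completion_rate_median": "999.0"},  # wrong phase, skipped
    {"system": "pgmq", "phase": "A", "phase_type": "clean",
     "worker_count": "16", "completion_rate_median": ""},       # no rate, skipped
]

PEAKS = peak_rows(ROWS)
```

With these rows, `PEAKS` holds only `awa` (its 32-worker row), because the Phase B row and the row with an empty rate are filtered out. A system with no usable clean Phase A row is simply absent from the result, which is why the plotting functions return early when nothing is left to draw.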