feat: per-merge-region streaming engine — multi-RG inputs + outputs (PR-6c.2) #6410
g-talbot wants to merge 8 commits into
Conversation
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 339d0eb6f7
…PR-6c.2)

Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region loop. Unlocks multi-RG metric-aligned input support and produces multi-RG output naturally — one output row group per merge region (typically one per metric_name when `rg_partition_prefix_len == 1`).

## Architecture

Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge region has AT MOST one row group per input. That single invariant unlocks the restructure:

1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read each RG's metric_name min stat (must equal max — verifies metric-alignment). Group RGs across inputs by prefix_key. Sort regions by prefix_key. For `prefix_len == 0` (single-RG inputs only, validated earlier), one region covers everything.
2. Assign regions to output files by cumulative row count. Caller's `num_outputs` is preserved as the upper bound. Each output file gets a contiguous slice of the region list, so output files have non-overlapping key ranges.
3. Per-region processing: for each region, advance contributing inputs' decoders through their RGs (drain sort cols of that RG, then stream body cols via the existing page-bounded BodyColOutputPageAssembler). Each region becomes one output RG in the current writer; when the assignment moves to a new output file, close the previous writer and open a new one.

The streaming body-col mechanism from PR-6b.2 (`arrow::compute::interleave` + `handle.block_on`-driven decoder) is unchanged; it just runs over smaller row ranges (one region instead of one whole output).

## Single-RG-input restriction lifted

PR-6b.2's check that rejected any multi-RG input is replaced with: reject only `prefix_len == 0` AND multi-RG (those still need PR-5's LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now accepted natively.

## Per-output schema simplification

PR-6b.2 optimised the per-output schema based on per-output sort col data (drop all-null cols, re-dict-encode low-cardinality strings). With per-region streaming we don't know each region's content until we drain it, so PR-6c.2 declares the writer's schema as the full union schema and leaves output strings as Utf8. Per-output dict re-encoding can be reintroduced later by tracking cardinality during the streaming pass.

## Tests

- All 9 PR-6b.2 tests still pass (single-RG input regression — behaviour preserved).
- New test_multi_rg_metric_aligned_input_produces_multi_rg_output: feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 = cpu.usage, RG 1 = memory.used); the streaming engine accepts it and produces a 2-RG output (one RG per metric_name region).
- Renamed test_multi_rg_input_rejected → test_legacy_multi_rg_input_rejected to reflect the new rejection scope (only prefix_len == 0 multi-RG is rejected; metric-aligned is accepted).

10/10 streaming tests pass. Clippy, doc, machete, fmt all clean.

## Follow-ups deferred

1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained M:N control when callers want more outputs than regions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
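A minimal sketch of the region-to-output assignment in step 2, assuming hypothetical field names and a simplified signature (the real `assign_regions_to_output_files` works over richer region state):

```rust
/// Simplified shape of a merge region; the real struct also carries
/// per-input contribution state (hypothetical field set).
struct Region {
    prefix_key: Vec<u8>, // composite sort-prefix key shared by every RG in the region
    total_rows: usize,   // rows summed across the contributing row groups
}

/// Assign regions to output files by cumulative row count, with the caller's
/// `num_outputs` as an upper bound. Regions arrive sorted by prefix_key and
/// each output takes a contiguous slice, so output key ranges never overlap.
fn assign_regions_to_output_files(
    regions: &[Region],
    num_outputs: usize,
) -> Vec<std::ops::Range<usize>> {
    let total_rows: usize = regions.iter().map(|r| r.total_rows).sum();
    let num_outputs = num_outputs.clamp(1, regions.len().max(1));
    let target_per_output = total_rows.div_ceil(num_outputs);

    let mut assignments = Vec::new();
    let (mut start, mut rows_in_current) = (0usize, 0usize);
    for (idx, region) in regions.iter().enumerate() {
        rows_in_current += region.total_rows;
        // Close the current output once it reaches its share, unless it is the
        // last allowed output (which absorbs the remainder).
        if rows_in_current >= target_per_output && assignments.len() + 1 < num_outputs {
            assignments.push(start..idx + 1);
            start = idx + 1;
            rows_in_current = 0;
        }
    }
    if start < regions.len() {
        assignments.push(start..regions.len());
    }
    assignments
}
```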
@codex review

💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d93f8e6c65
Two P1 bugs flagged by Codex on PR-6c.2 (#6410):

1. **Duplicate input row groups silently dropped.** When one input contained two RGs with the same composite prefix key, `process_region` overwrote `sort_col_batches[input_idx]` while `Region::total_rows` still counted both — losing rows and misaligning the body-col / sort-col mapping. Now enforce at-most-one-RG-per-input-per-prefix as a strong invariant at three sites: the merge read path (`extract_regions_from_metadata`), the streaming merge output finalize, and the indexing writer (`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`). The new `assert_unique_rg_prefix_keys` helper is shared.

2. **Byte-array prefix encoding broke lex order across lengths.** The 4-byte length prefix made `"b"` sort before `"aa"`, violating the declared ASC order. Switched to byte-stuffed escape encoding (`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves single-column lex order AND retains unambiguous concatenation for composite keys (the terminator is the smallest 2-byte sequence under escaping, so shorter values still sort before longer ones with the same prefix).

Tests:

- `test_byte_array_prefix_preserves_lex_order_across_lengths` — `"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer, null-byte escaping preserves order.
- `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` — end-to-end bail with clear error.
- `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned` + the `write_to_file` and single-RG positive counterparts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
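The byte-stuffed encoding in fix 2 is small enough to sketch. The function name follows the commit; the exact implementation may differ, but the ordering properties asserted in the new lex-order test fall out directly:

```rust
/// Byte-stuffed escape encoding for one byte-array prefix value (sketch).
/// 0x00 inside the value is escaped as 0x00 0x01; the value is terminated by
/// 0x00 0x00. The terminator sorts below every escaped or literal byte, so
/// "aa" < "b" and a shorter value sorts before a longer value sharing its prefix.
fn encode_byte_array_prefix(value: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(value.len() + 2);
    for &b in value {
        if b == 0x00 {
            out.extend_from_slice(&[0x00, 0x01]); // escape embedded null bytes
        } else {
            out.push(b);
        }
    }
    out.extend_from_slice(&[0x00, 0x00]); // terminator
    out
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn preserves_lex_order_across_lengths() {
        assert!(encode_byte_array_prefix(b"aa") < encode_byte_array_prefix(b"b"));
        assert!(encode_byte_array_prefix(b"") < encode_byte_array_prefix(b"a"));
        assert!(encode_byte_array_prefix(b"ab") < encode_byte_array_prefix(b"abc"));
        assert!(encode_byte_array_prefix(&[0x00]) < encode_byte_array_prefix(&[0x00, 0x01]));
    }
}
```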
@codex review

💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 05dfba44c3
    return Ok(vec![Region {
        prefix_key: Vec::new(),
        contributing,
    }]);
Preserve num_outputs on the prefix_len=0 path
When the inputs have rg_partition_prefix_len == 0 (the normal single-row-group/legacy path), this branch always returns exactly one region. The outer merge loop now assigns files only at region boundaries, so num_outputs > 1 can never produce more than one output even when the merged data contains many sorted_series split points; the previous implementation used compute_output_boundaries to split the merge order within this case. Compactions of ordinary single-RG inputs requesting multiple outputs will silently create one oversized output file.
Fixed in c85bb8e — taking the broader approach we discussed offline: the prefix_len=0 path now synthesizes prefix-aligned regions during the merge rather than restoring the old compute_output_boundaries split.
Specifically: for rg_partition_prefix_len == 0 inputs, the engine drains all sort cols upfront, computes the global k-way merge order, and walks runs to find first-sort-col (e.g. metric_name) transitions. Each transition opens a synthesized region with per-input row ranges; the output writer materialises one parquet RG per region. The output is therefore prefix-aligned and declares rg_partition_prefix_len = 1 so future compactions take the fast streaming path.
Multi-output splitting falls out for free — assign_regions_to_output_files distributes the synthesized regions across files balancing row count. Regression test test_prefix_len_zero_multi_output_splits_at_prefix_transitions covers the exact num_outputs > 1 → multiple files case you flagged (6 metric_names × 50 rows + num_outputs = 3 produces 3 outputs, each declaring prefix_len = 1). Companion test test_prefix_len_zero_single_output_is_prefix_aligned_multi_rg covers num_outputs = 1 producing multi-RG output.
Plumbing: Region::contributing carries per-input start_row/num_rows so a single RG can be sliced across adjacent synthesized regions; InputDecoderState switched to per-parquet-col page cache + cursor (a HashMap keyed by col_idx) so reads of a later col that pull leftover earlier-col pages into the stream still cache them for the next synthesized region.
Followup to my previous reply: the synthesis-at-metric_name approach was wrong on two counts (caught in review): the engine was declaring prefix_len = 1 on output regardless of input, and the synthesized boundaries couldn't honor num_outputs > 1 for a giant single-metric input (no metric_name transitions to split at).
Replaced in b642371 with a sub-region splitter that breaks at sorted_series transitions whenever a region's row count would push the current output past total_rows / num_outputs, never inside a single sorted_series run. Output rg_partition_prefix_len now inherits from InputMetadata unchanged — the engine no longer declares a prefix it can't unconditionally guarantee.
New tests:
- `test_prefix_len_zero_multi_output_splits_at_sorted_series` — original P2 case (6 metrics × 50 rows + `num_outputs = 3` → 3 balanced outputs).
- `test_prefix_len_zero_giant_single_metric_splits_into_multiple_outputs` — 1 metric × 200 rows + `num_outputs = 2` → 2 outputs split at sorted_series within the one metric (the case the previous fix missed).
- `test_prefix_len_zero_single_output_is_single_rg` — no split needed, one output, one RG, prefix KV absent.
…regions

Codex P2 on PR-6c.2 (#6410): for `rg_partition_prefix_len == 0` inputs, `extract_regions_from_metadata` returned a single region spanning all inputs, and the region-to-output assigner could only split work at region boundaries. `num_outputs > 1` therefore silently produced one oversized file. The previous (non-streaming) engine sidestepped this with `compute_output_boundaries`, splitting the merge order at `sorted_series` transitions.

Rather than restore the legacy split, unify the two paths: for prefix_len=0 inputs the engine now drains all sort cols upfront, computes the global k-way merge order, and walks runs to find first-sort-col (e.g. `metric_name`) transitions. Each transition opens a new synthesized region carrying per-input row ranges; the output writer materialises one parquet row group per region. The output is therefore prefix-aligned (each RG carries a single first-sort-col value) and the writer advertises `rg_partition_prefix_len = 1` so future compactions take the fast streaming path.

Multi-output splitting falls out for free: the existing `assign_regions_to_output_files` distributes synthesized regions across files balancing row count.

Mechanical pieces:

- `Region::contributing` becomes `Vec<RegionContribution>` with `start_row` per input, so a single RG can be sliced across multiple adjacent regions.
- `InputDecoderState` switches from one-active-col page cache + cursor to per-parquet-col HashMaps. `advance_decoder_to_row` now stores pages under their actual `col_idx`, so reads of a later col that pull leftover earlier-col pages into the stream keep them cached for the next synthesized region to consume.
- `set_body_col_cursor` replaces `reset_body_col_state`: it positions the cursor and drops only pages strictly below it, preserving rows that still belong to future regions.
- `process_region` accepts an optional `prefetched_sort_batches` argument. When supplied (the synthesized path) it slices the pre-drained sort batches by each contribution's row range; when `None` (the existing prefix_len>0 path) it drains a fresh whole-RG batch from the decoder, same as before.

Tests:

- `test_prefix_len_zero_multi_output_splits_at_prefix_transitions` — 6 metric_names × 50 rows + `num_outputs = 3` produces 3 output files; each declares `rg_partition_prefix_len = 1`.
- `test_prefix_len_zero_single_output_is_prefix_aligned_multi_rg` — 3 metric_names + `num_outputs = 1` produces one file with 3 RGs, declaring `rg_partition_prefix_len = 1`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
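The mechanical pieces are easier to see as types. A hypothetical sketch with simplified fields (the real structs hold decoder handles and page payloads; names follow the commit message):

```rust
use std::collections::HashMap;

/// One input's contribution to a region; a single RG can be sliced across
/// adjacent regions via (start_row, num_rows).
struct RegionContribution {
    input_idx: usize,
    start_row: usize,
    num_rows: usize,
}

/// A decoded page span; payload elided in this sketch.
struct CachedPage {
    first_row: usize,
    num_rows: usize,
}

/// Per-input decoder state with one page cache + cursor per parquet column
/// (keyed by col_idx), so earlier-col pages pulled into the stream while
/// reading a later col stay cached for the next region.
struct InputDecoderState {
    page_cache: HashMap<usize, Vec<CachedPage>>, // col_idx -> decoded pages
    cursor: HashMap<usize, usize>,               // col_idx -> next row to emit
}

impl InputDecoderState {
    /// `set_body_col_cursor` semantics: position the cursor and drop only pages
    /// strictly below it, preserving rows that still belong to future regions.
    fn set_body_col_cursor(&mut self, col_idx: usize, row: usize) {
        self.cursor.insert(col_idx, row);
        if let Some(pages) = self.page_cache.get_mut(&col_idx) {
            pages.retain(|p| p.first_row + p.num_rows > row);
        }
    }
}
```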
…sizing prefix

Per review feedback: the prior commit incorrectly conflated "fix the prefix_len=0 multi-output regression" with "make every legacy output prefix-aligned at the engine layer." Two issues with that approach:

1. The engine was declaring `rg_partition_prefix_len = 1` on outputs regardless of what the inputs claimed. That's a property the engine guessed and forced; it should come from the inputs (or from a future legacy-converter layer that translates legacy inputs into prefix-aligned form before the engine sees them).
2. Synthesizing regions at first-sort-col transitions doesn't honor `num_outputs` when there are no such transitions — a giant single metric with `prefix_len = 0` would still collapse into one output even with `num_outputs = 3`.

New strategy: keep the per-col page cache and `RegionContribution` row-range plumbing — both are still needed — but replace the metric_name-transition synthesis with a sub-region splitter that breaks at `sorted_series` transitions whenever a region's row count would push the current output past `target_per_output = total_rows / num_outputs`. Splitting happens at run boundaries only — never inside a `sorted_series` run — so a single huge run lands in one output regardless of size. The output's `rg_partition_prefix_len` is inherited from `InputMetadata` unchanged.

Main-loop shape: for each top-level region the engine first asks "would this region fit in the current output's remaining budget?" If yes, `process_region` drains internally as before (per-region memory bound preserved). If not, pre-drain the region's sort cols, compute its merge order, call `split_region_at_sorted_series` to get sub-regions, then process each sub-region with the prefetched batches.

Between top-level regions we now `reset_all_body_col_state` on every input — different RGs have overlapping row-index spaces and their cached pages would collide. Sub-regions of one top-level region share an RG and keep the cache; this is what lets an earlier col's stream-tail get cached during a later col's read and remain available for the next sub-region.

Tests rewritten:

- `test_prefix_len_zero_multi_output_splits_at_sorted_series` — 6 metrics × 50 rows, `num_outputs = 3` → 3 outputs balanced near 100 rows each. Asserts the output's `rg_partition_prefix_len` KV is absent (inherits input's 0).
- `test_prefix_len_zero_giant_single_metric_splits_into_multiple_outputs` — covers the giant-metric case: 1 metric × 200 rows, `num_outputs = 2` → 2 outputs, splitting at `sorted_series` transitions inside the single metric.
- `test_prefix_len_zero_single_output_is_single_rg` — `num_outputs = 1` produces a single output with a single RG and no prefix KV.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
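A minimal sketch of the splitting rule over a simplified input: `run_lengths[i]` is the row count of the i-th consecutive `sorted_series` run in the region's merge order, and `budget` stands in for the current output's remaining row budget. The real `split_region_at_sorted_series` operates on merge-order indices and per-input contributions, so treat this as an illustration of the boundary rule only:

```rust
/// Break a region into sub-regions at sorted_series run boundaries, never
/// inside a run, whenever adding the next run would exceed the budget.
/// A single huge run therefore still lands in one sub-region.
fn split_region_at_sorted_series(run_lengths: &[usize], budget: usize) -> Vec<std::ops::Range<usize>> {
    let mut sub_regions = Vec::new();
    let (mut start_row, mut row) = (0usize, 0usize);
    for &run in run_lengths {
        // Split BEFORE this run if it would push the current sub-region past
        // the budget — but only if the sub-region already holds something.
        if row > start_row && (row - start_row) + run > budget {
            sub_regions.push(start_row..row);
            start_row = row;
        }
        row += run;
    }
    if row > start_row {
        sub_regions.push(start_row..row);
    }
    sub_regions
}
```

With six 50-row runs and a budget of 100, this yields three 100-row sub-regions — the shape exercised by `test_prefix_len_zero_multi_output_splits_at_sorted_series`.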
The legacy adapter previously consolidated multi-RG legacy inputs into a single oversized row group and left `rg_partition_prefix_len` at the original's value (typically `0`). The streaming merge engine then sent these single-RG/prefix=0 inputs through the new sub-region splitting path — correct, but it forfeits the prefix-aware fast path for outputs derived from legacy inputs and gives up the row-group pruning that prefix alignment enables.

After consolidating, the adapter now slices the resulting record batch at first-sort-col transitions (typically `metric_name`) and emits one parquet row group per slice, stamping the re-encoded file with `qh.rg_partition_prefix_len = 1`. The merge engine then reads it through the prefix-aware fast path: one region per metric_name, with the existing duplicate-prefix invariant on read validating uniqueness.

Fallback: if the original file has no `qh.sort_fields` KV, the sort-fields string fails to parse, the first column can't be resolved in the arrow schema, or the consolidated batch is empty, the adapter reverts to a single-RG re-encode without claiming any prefix alignment. That input still works — the engine's prefix_len=0 sub-region splitting path picks it up. This keeps the adapter robust for files written by very early versions of the indexer that may pre-date the standard KV layout.

Implementation: `reencode_prefix_aligned` replaces `reencode_as_single_row_group` and dispatches either to the new multi-RG writer or to the legacy single-RG writer, depending on whether the first sort col is resolvable. `RowConverter` handles the prefix-value equality check uniformly across dictionary, utf8, and primitive types. The KV injection helper replaces (rather than appends) any existing `qh.rg_partition_prefix_len` so re-runs and files mistakenly carrying a stale value still land at the freshly synthesized prefix.

Tests:

- `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg` — 3 metrics × 40 rows, multi-RG input → 3 prefix-aligned output RGs and `qh.rg_partition_prefix_len = 1` KV.
- `test_legacy_input_single_metric_yields_one_rg_with_prefix_kv` — one metric → one RG, prefix KV still stamped (vacuously aligned).
- `test_legacy_input_without_sort_fields_falls_back_to_single_rg` — fallback path preserved when sort-fields KV is missing.
- All existing tests pass unchanged (they use empty KVs or unparseable sort-fields strings, both of which exercise the fallback path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
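The KV injection behaviour is small enough to sketch with the parquet crate's `KeyValue` type; the helper name here is hypothetical:

```rust
use parquet::file::metadata::KeyValue;

const PREFIX_LEN_KEY: &str = "qh.rg_partition_prefix_len";

/// Replace (rather than append) any existing `qh.rg_partition_prefix_len`
/// entry, so re-runs and files carrying a stale value land at the freshly
/// synthesized prefix.
fn stamp_rg_partition_prefix_len(kv: &mut Vec<KeyValue>, prefix_len: u32) {
    kv.retain(|entry| entry.key != PREFIX_LEN_KEY);
    kv.push(KeyValue {
        key: PREFIX_LEN_KEY.to_string(),
        value: Some(prefix_len.to_string()),
    });
}
```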
…e-prefix support

`LegacyInputAdapter::try_open` now takes `target_prefix_len: u32`, chosen by the caller to match the merge plan's consensus prefix length. The adapter slices the consolidated batch at every transition of the first N sort columns (composite key, via `RowConverter` over all N fields) and emits one output row group per slice, stamping the output with `qh.rg_partition_prefix_len = target_prefix_len`. With `target_prefix_len = 0` the adapter takes the original single-RG passthrough path with no prefix-alignment claim.

A sort column that is named in `qh.sort_fields` but missing from the file's arrow schema is treated as implicitly null at every row per SS-3. A constantly-null column trivially satisfies alignment on that column (null == null) and contributes no transitions, so the split boundaries are driven by the columns that are present. This matches the merge engine's compaction-time treatment of missing columns and keeps a legacy file with an evolved schema usable as a prefix-aligned input.

`PrefixUnresolvable` now fires only where the file doesn't advertise enough sort *names* to honor the request:

- `qh.sort_fields` absent or unparseable
- `qh.sort_fields` declares fewer sort columns than `target_prefix_len`

A column missing from the arrow schema no longer counts as unresolvable; the adapter materialises a `NullArray` of the batch's length in that slot and proceeds.

Tests:

- `test_target_prefix_len_zero_passes_through_as_single_rg` — explicit N=0 fallback, no prefix KV stamped.
- `test_target_prefix_len_two_splits_by_metric_and_service` — composite prefix (`metric_name`, `service`) → 4 RGs, KV declares prefix_len=2.
- `test_target_prefix_len_one_without_sort_fields_returns_unresolvable` — no `qh.sort_fields` KV → `PrefixUnresolvable`.
- `test_target_prefix_len_exceeds_declared_sort_cols_returns_unresolvable` — sort schema declares 2 cols, caller asks for 3 → `PrefixUnresolvable`.
- `test_missing_prefix_col_treated_as_null_satisfies_alignment` — sort schema declares `metric_name|env|-timestamp_secs` but `env` is absent from the arrow schema → no error, only metric_name transitions split RGs, KV still stamps prefix_len=2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
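A sketch of the slice computation under these rules, using arrow's `RowConverter`. The function name matches the helper referenced later in this stack, but the signature and the all-null fallback type are simplified assumptions:

```rust
use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};

/// Split the (already sorted) consolidated batch so each returned row range
/// holds a single composite value over the first N sort columns.
fn compute_prefix_value_slices(
    batch: &RecordBatch,
    prefix_field_names: &[&str],
) -> Result<Vec<std::ops::Range<usize>>, ArrowError> {
    // Resolve each prefix column; a column named in qh.sort_fields but absent
    // from the file's schema is treated as all-null (SS-3).
    let mut fields: Vec<SortField> = Vec::with_capacity(prefix_field_names.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(prefix_field_names.len());
    for &name in prefix_field_names {
        match batch.schema().index_of(name) {
            Ok(idx) => {
                let col = batch.column(idx).clone();
                fields.push(SortField::new(col.data_type().clone()));
                columns.push(col);
            }
            Err(_) => {
                // The adapter materialises a NullArray here; an all-null Utf8
                // column behaves the same for transition detection in this sketch.
                fields.push(SortField::new(DataType::Utf8));
                columns.push(new_null_array(&DataType::Utf8, batch.num_rows()));
            }
        }
    }

    // RowConverter yields a byte-comparable composite key, uniform across
    // dictionary, utf8, and primitive sort columns.
    let converter = RowConverter::new(fields)?;
    let rows = converter.convert_columns(&columns)?;

    // Open a new slice at every composite-key transition.
    let mut slices = Vec::new();
    let mut start = 0usize;
    for i in 1..batch.num_rows() {
        if rows.row(i) != rows.row(i - 1) {
            slices.push(start..i);
            start = i;
        }
    }
    if batch.num_rows() > 0 {
        slices.push(start..batch.num_rows());
    }
    Ok(slices)
}
```

Because a synthesized all-null column never changes value, it contributes no transitions — exactly the SS-3 behaviour the missing-column test asserts.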
…KV stamp

The streaming merge engine produces sort-prefix-aligned multi-RG output and stamps `qh.rg_partition_prefix_len = input_meta.rg_partition_prefix_len` in the file's KV (verified by `assert_unique_rg_prefix_keys` before close). `merge_parquet_split_metadata` then ran afterwards and unconditionally demoted the value to 0 whenever `output.num_row_groups > 1` — breaking CS-1 (the metastore must mirror the on-disk KV) for every multi-RG streaming-engine output. Aligned splits got tagged 0 in the metastore on every merge and leaked out of the prefix-aligned compaction bucket on the next pass.

Carry the value the writer actually stamped via a new `MergeOutputFile.output_rg_partition_prefix_len` field, then propagate it as-is in metadata aggregation. Both engines populate the field:

- Legacy `merge/writer.rs` reports its demoted value (row-count-driven RG boundaries can't honor prefix alignment, so it stamps 0 on multi-RG).
- Streaming `merge/streaming/output.rs` reports the inputs' prefix unchanged (it splits at prefix transitions and the writer verifies).

CS-1 holds by construction — same source of truth, no re-derivation.

Tests:

- `test_output_prefix_len_demoted_when_multi_rg` → renamed to `test_output_prefix_len_carries_writers_value_when_demoted`; now asserts that the metastore mirrors the writer's reported value.
- New `test_output_prefix_len_preserved_on_multi_rg_streaming_engine` asserts that a multi-RG streaming output (writer reports prefix_len=2) keeps the prefix in the metastore — the regression case for F1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
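A minimal illustration of the contract; the field name comes from this commit, everything else is simplified:

```rust
/// Sketch: the carried field on each merge output (the real struct has more members).
struct MergeOutputFile {
    num_row_groups: usize,
    /// Exactly the `qh.rg_partition_prefix_len` value the engine's writer
    /// stamped into this file's KV metadata.
    output_rg_partition_prefix_len: u32,
}

/// CS-1: the metastore mirrors the on-disk KV rather than re-deriving it.
fn metastore_prefix_len(file: &MergeOutputFile) -> u32 {
    // Previously (broken), the value was re-derived here and any multi-RG
    // output was demoted to 0. Now the writer's reported value is propagated
    // as-is, so multi-RG prefix-aligned outputs keep their prefix.
    file.output_rg_partition_prefix_len
}
```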
…onger test verifiers

Three adversarial-review findings on the prefix/RG machinery, bundled because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.** The adapter walked rows in physical order and emitted one RG per prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`) produced a 3-RG file where two RGs shared prefix `A`, violating PA-3. The streaming merge engine would later reject it mid-merge — but only after a quietly-bad file had been built. Now `compute_prefix_value_slices` tracks each slice's composite prefix-value bytes and bails with `LegacyAdapterError::InputNotSorted` on duplicates, surfacing the SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while wiring F2's chunk-level verifier into the SS-3 test).** The adapter implements SS-3 correctly (missing-from-schema → synthesized NullArray during slice computation, file stamps `prefix_len = N`). The streaming engine's reader did not: `find_prefix_parquet_col_indices` hard-required every named prefix column to be physically present, so a file the adapter produced from an SS-3 input was unreadable by the merge engine. Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>` and `extract_rg_composite_prefix_key` emits a constant null marker (`encode_byte_array_prefix(&[])`) for None slots. The column contributes no cross-RG ordering signal (constant everywhere), so region boundaries are driven entirely by the present columns. Both halves of SS-3 now agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort column and others don't — uses `[0x00, 0x00]` for the null contribution, which sorts BEFORE non-null per the encoded-empty-string convention. That weakly violates SS-2 (nulls sort last). Single-file SS-3 is correct because every RG in such a file contributes the same constant. If cross-file SS-3 becomes a production scenario, the encoding needs a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming tests.** Tests asserting `num_row_groups == N` + KV stamped to N would have passed even with an off-by-one in slice-boundary detection or column-content scrambling. The verifier reads chunk-level statistics directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness) on the composite key. Wired into six tests:

- streaming engine: `test_streaming_merge_with_prefix_len_two`, `test_multi_rg_metric_aligned_input_produces_multi_rg_output`, `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`, `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`, `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on single-RG files — they still go through PA-1 because an unsorted single-RG file CAN have `min != max` on a prefix column.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
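A simplified sketch of the verifier's logic, over pre-extracted per-row-group prefix statistics (the real `assert_unique_rg_prefix_keys` reads parquet chunk-level statistics directly and uses the crate's error type):

```rust
use std::collections::HashSet;

/// `rg_prefix_stats[i]` holds row group i's (min, max) composite prefix-key
/// bytes, taken from column chunk statistics.
/// PA-1: within a row group, the prefix columns must be constant (min == max).
/// PA-3: across row groups, composite prefix keys must be unique.
/// Note: single-RG files are NOT skipped — an unsorted single-RG file can
/// still violate PA-1.
fn assert_unique_rg_prefix_keys(rg_prefix_stats: &[(Vec<u8>, Vec<u8>)]) -> Result<(), String> {
    let mut seen: HashSet<Vec<u8>> = HashSet::new();
    for (rg_idx, (min_key, max_key)) in rg_prefix_stats.iter().enumerate() {
        if min_key != max_key {
            return Err(format!("row group {rg_idx}: prefix columns not constant (min != max)"));
        }
        if !seen.insert(min_key.clone()) {
            return Err(format!("row group {rg_idx}: duplicate composite prefix key"));
        }
    }
    Ok(())
}
```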
Summary
Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region loop. Unlocks multi-RG metric-aligned input support and produces multi-RG output naturally — one output row group per merge region (typically one per `metric_name` when `rg_partition_prefix_len == 1`).

Architectural insight

Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge region has AT MOST one row group per input. That single invariant unlocks the restructure: phase 0 no longer has to span all RGs upfront (which would force either column-chunk buffering or a second body GET); it drains one RG's sort cols per input per region as we go.

Pipeline

1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read each RG's `metric_name` column-chunk min stat (must equal max — verifies metric-alignment). Group RGs across inputs by prefix_key. Sort regions by prefix_key. For `prefix_len == 0` (single-RG inputs only, validated earlier), one region covers everything — behaviour matches PR-6b.2 for today's single-RG inputs.
2. Assign regions to output files by cumulative row count. Caller's `num_outputs` is preserved. Each output file gets a contiguous slice of the region list, so output files have non-overlapping key ranges.
3. Per-region processing: for each region in order, advance contributing inputs' decoders through their RGs (drain sort cols of that RG, then stream body cols via the existing page-bounded `BodyColOutputPageAssembler`). Each region becomes one output RG in the current writer; when the assignment moves to a new output file, close the previous writer and open a new one.

The streaming body-col mechanism from PR-6b.2 (`arrow::compute::interleave` + `handle.block_on`-driven decoder) is unchanged; it just runs over smaller row ranges (one region instead of one whole output).

Single-RG-input restriction lifted

PR-6b.2's check that rejected any multi-RG input is replaced with: reject only `prefix_len == 0` AND multi-RG (those still need PR-5's `LegacyMultiRGAdapter`). Multi-RG metric-aligned inputs are now accepted natively.

Per-output schema simplification

PR-6b.2 optimised the per-output schema based on per-output sort col data (drop all-null cols, re-dict-encode low-cardinality strings). With per-region streaming we don't know each region's content until we drain it, so PR-6c.2 declares the writer's schema as the full union schema and leaves output strings as Utf8. Per-output dict re-encoding can be reintroduced later by tracking cardinality during the streaming pass.

Tests

- `test_multi_rg_metric_aligned_input_produces_multi_rg_output`: feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 = cpu.usage, RG 1 = memory.used); the streaming engine accepts it and produces a 2-RG output (one RG per metric_name region).
- `test_multi_rg_input_rejected` → `test_legacy_multi_rg_input_rejected` to reflect the new rejection scope (only `prefix_len == 0` multi-RG is rejected; metric-aligned is accepted).

10/10 streaming tests pass. 442/442 crate tests pass. Clippy, doc, machete, fmt all clean.
What changed
Added:

- `Region` struct + `extract_regions_from_metadata` + `assign_regions_to_output_files` (region pre-computation; reads parquet column chunk statistics).
- `OutputAccumulator` — gathers per-output static metadata (row_keys, zonemap, metric_names, time_range, service names) across the regions assigned to each output file. Computed at `finalize_output` time.
- `open_output_writer_for_streaming`, `process_region`, `build_sorting_columns_from_schema`, `finalize_output`.

Removed (PR-6b.2 multi-output-parallel helpers — no longer needed):

- `write_streaming_outputs`, `write_all_columns`, `write_sort_col_for_all_outputs`, `write_body_col_for_all_outputs`, `build_parent_union_schema`, `writer_states_index_view`.
- `OutputWriterStorage` struct, `PerOutputStatic`, `build_per_output_static`, `derive_output_schema`, `open_output_writer`, `finalize_output_writer`.

Kept:

- `BodyColOutputPageAssembler` and its iterator (the page-bounded body col fan-out — still the streaming core).
- `collect_service_names_from_page`, `build_full_union_schema_from_arrow_schemas`.

Follow-ups deferred

1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained M:N control when callers want more outputs than regions.
Stack
Base: `gtt/streaming-merge-engine-merger` (PR-6b.2 #6409).
PR-7 (TBD) wires `ParquetMergeExecutor` to the streaming engine and deletes `ParquetMergeSplitDownloader`.