TopK dynamic filter pushdown attempt 2#15770
Conversation
| context | ||
| .transform_up(|node| { | ||
| if node.plan.as_any().downcast_ref::<FilterExec>().is_some() { | ||
| if node.plan.as_any().downcast_ref::<FilterExec>().is_some() || node.plan.as_any().downcast_ref::<SortExec>().is_some() { |
There was a problem hiding this comment.
@berkaysynnada I didn't notice this in the original PR. This seems problematic. IMO doing downcast matching here is a smell that the API needs changing. It limits implementations to a hardcoded list of plans, which defeats the purpose of making DataFusion pluggable / having a dyn ExecutionPlan. The original implementation didn't require this. I think this goes hand-in hand with the revisit parameter. It seems that you were able to get from 3 methods down to 2 by replacing one of them with this downcast matching and the other with the extra recursion via the revisit parameter. It would be great to iterate on this and find a way to avoid the downcast matching.
There was a problem hiding this comment.
Yes, you're right. We can run this pushdown logic on every operator actually, but then it will work in worst-time complexity always. I've shared the solution of removing revisit parameter, and let me open an issue for that. I strongly believe it will be taken and implemented in short time by some people.
To remove these downcasts, I think we can either introduce a new method to the API just returning a boolean saying that "this operator might introduce a filter or not", or try to understand that by the existing API's, maybe with some refactor. Do you have an idea for the latter?
There was a problem hiding this comment.
I propose an API something like this:
trait ExecutionPlan {
fn gather_filters_for_pushdown(
&self,
parent_filters: &[Arc<dyn ExecutionPlan>],
) -> Result<FilterPushdownPlan> {
let unsupported = vec![FilterPushdownSupport::Unsupported; parent_filters.len()];
Ok(
FilterPushdownPlan {
parent_filters_for_children: vec![unsupported; self.children().len()],
self_filters_for_children: vec![vec![]; self.children().len()],
},
)
}
fn propagate_filter_pushdown(
&self,
parent_pushdown_result: Vec<FilterPushdowChildResult>,
_self_filter_pushdown_result: Vec<FilterPushdowChildResult>,
) -> Result<FilterPushdownPropagation> {
Ok(
FilterPushdownPropagation {
parent_filter_result: parent_pushdown_result,
new_node: None,
},
)
}
}
pub struct FilterPushdownPropagation {
parent_filter_result: Vec<FilterPushdowChildResult>,
new_node: Option<Arc<dyn ExecutionPlan>>,
}
#[derive(Debug, Clone, Copy)]
pub enum FilterPushdowChildResult {
Supported,
Unsupported,
}
impl FilterPushdowChildResult {
}
#[derive(Debug, Clone)]
pub enum FilterPushdownSupport {
Supported(Arc<dyn PhysicalExpr>),
Unsupported,
}
#[derive(Debug, Clone)]
pub struct FilterPushdownPlan {
parent_filters_for_children: Vec<Vec<FilterPushdownSupport>>,
self_filters_for_children: Vec<Vec<FilterPushdownSupport>>,
}The optimizer rule will have to do a bit of bookeeping and slicing correctly but this should avoid the need for any downcast matching or retry and minimize clones of plans. And it should do one walk down and up regardless of what ends up happening with the filters.
There was a problem hiding this comment.
Needs fixing of some failing tests, cleanup of the plethora of helper methods I added and a lot of docs but here's the idea: #15801. The points are:
- No downcast matching / hardcoding of implementations
- Only recurses once / no retrying
- Does no cloning / copying for branches that have no changes
- Doesn't insert new operators
|
Pausing this until #15769 is done |
I was able to unblock by wiring up to TestDataSource |
| let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) | ||
| .with_fetch(self.fetch) | ||
| .with_preserve_partitioning(self.preserve_partitioning); | ||
| new_sort.filter = Arc::clone(&self.filter); |
There was a problem hiding this comment.
I missed this for a while and spent an hour trying to figure out why my test was failing. IMO we should have a test that enforces the invariant that ExecutionPlan::with_new_children(Arc::clone(&node), node.children()) == node
| mod cast; | ||
| mod column; | ||
| mod dynamic_filters; | ||
| pub mod dynamic_filters; |
There was a problem hiding this comment.
This bit has me tripped up. I'm not sure where the right place to put dynamic_filters is such that it's public for our internal use in operators but private from the outside world 🤔
|
@Dandandan I believe with this setup we should be able to achieve with a couple LOC in // Apply the filter to the batch before processing
let filter = Arc::clone(&self.filter) as Arc<dyn PhysicalExpr>;
let batch = filter_and_project(&batch, &filter, None, batch.schema_ref())?;
if batch.num_rows() == 0 {
return Ok(());
}( |
I think we probably want to avoid filtering the entire batch, but indeed, if the filter expression is available it will be only a couple LOC! |
| " | ||
| ); | ||
|
|
||
| // Actually apply the optimization to the plan |
There was a problem hiding this comment.
I recognize these diverge a bit from other tests, happy to move them somewhere better....
|
Marking as ready for review despite not having any numbers to substantiate performance improvement (because we need #15769) given that algorithmically and from experience in the previous PR we know this is a big win it might be okay to merge without interlocking PRs. |
|
@adriangb I'll complete reviewing this after merging other open PR's. |
6ec4de1 to
b3431ab
Compare
Thanks for all of the reviews @berkaysynnada. This one is now ready again. |
4e69fac to
bdc341c
Compare
|
I think some tweaks will be needed based on https://github.com/apache/datafusion/pull/15769/files#r2074207291 |
bdc341c to
73b800a
Compare
This is super nice. |
And maybe #16424 will speed up the wide partitions case by stopping those scans early! |
|
I'll also run some profiling on those topk benchmarks to see if there is any further low hanging fruit. |
|
Hm @adriangb another thing I wondered is Perhaps we can compare against the current filter and only update the expression if it is greater / more selective? |
Yeah I think that would be good. |
|
woohoo! |
This reverts commit 6e83cf4.
|
Can someone (@adriangb maybe?) give me a high level summary of what was improved in this PR for the DF 49 blog post? The performance improvement highlighted in #15770 (comment) seem to make it a worthy candidate for highlighting in the performance improvement section :) |
|
Awesome, thanks. Yes, I think a full blog post for this would be amazing but I'd still like to have it in the release blog post with a short summary and maybe a few stats. |
…atch (apache#22852) ## Which issue does this PR close? - Closes apache#22849 - A related cross-partition starvation case is tracked separately in apache#22874 and addressed by an upcoming follow-up PR — see [discussion](apache#22852 (comment)) for details ## Rationale for this change `TopK::insert_batch` short-circuits when the heap's dynamic filter rejects every row in a batch: ```rust if !filter.has_true() { // nothing to filter, so no need to update return Ok(()); } ``` The early-exit check `attempt_early_completion(&batch)` lives later in the same function, gated on `replacements > 0`. So a batch that the filter rejects entirely bypasses the check. The heap's dynamic filter is derived from the heap's worst row (via `update_filter`). A batch whose rows all come from a strictly worse sort prefix is exactly the batch the filter rejects entirely — i.e. the very signal `attempt_early_completion` is designed to detect ("the next batch is past the heap's boundary, we can stop") is what causes the function to short-circuit *before* the check runs. This is a feature-interaction regression between two PRs that were both correct in isolation. The `attempt_early_completion` mechanism was added by apache#15563 (closing apache#15529). At the time, there was no heap-derived dynamic filter on TopK, so the only sensible call site was right after a successful heap insertion. Two months later, apache#15770 added the dynamic-filter pushdown for TopK sorts, introducing the `!filter.has_true()` short-circuit. The two features address different problems and the new short-circuit didn't connect to the existing prefix-completion check — which is how this gap opened up. **Consequence**: on a TopK over an input ordered on the sort prefix, `finished = true` is never set once the heap stabilizes. Since `finished` is the signal `SortExec` uses to stop pulling from its input (via `Poll::Ready(None)` from the TopK stream, which cascades into dropping the source stream), the source keeps being polled long past the point where no further row can improve the heap. The LIMIT optimization effectively degrades to "heap saves memory but reads everything"; sources with cancellable streams (e.g. networked sources) never receive the cancellation signal. ## What changes are included in this PR? Single behavioral change in `datafusion/physical-plan/src/topk/mod.rs`: call `attempt_early_completion(&batch)` immediately before the `return Ok(())` in the `!filter.has_true()` branch. Why this scope, not a broader restructuring: - The existing `attempt_early_completion` call inside `if replacements > 0` is load-bearing for a related case: a batch containing a mix of "still valuable" rows and "past the boundary" rows. The existing `test_try_finish_marks_finished_with_prefix` test covers this case — Batch 2 with `a=[2,3], b=[10,20]` against a heap where `heap.max.a = 2`; the `(2, 10)` row must be inserted before the check on the `(3, 20)` last row triggers. Moving the call earlier would skip the insertion of valuable rows and break that test. - The bug is specifically that the *short-circuit* path doesn't call the check. The fix targets exactly that path. - A related but separate gap is not addressed here: when `filter.has_true() == true` but `replacements == 0` (the filter accepts some rows but `find_new_topk_items` ends up inserting none of them), the existing call inside `if replacements > 0` is also skipped. This requires a divergence between the heap's filter predicate and the row-byte comparison used inside `find_new_topk_items`, which shouldn't normally happen (the filter is derived from the heap's worst row using the same comparator). A deterministic synthetic repro would likely require concurrent heap updates from sibling partitions or boundary-value edge cases (NaN/NULL semantics, type coercion). Happy to send a follow-up if reviewers want it covered; the workload that motivated this fix was the filter-rejection case empirically. ## Are these changes tested? Yes. Added a regression test `test_try_finish_fires_when_filter_rejects_entire_batch`. The assertion target is `topk.finished` — the flag that signals "stop pulling from the source" to upstream consumers (read by `TopKExec::poll_next` to emit `Poll::Ready(None)`). Asserting that the flag transitions on the fully-filter-rejected batch is equivalent to asserting that the source-stopping mechanism activates. - Builds a TopK over a `(a, b)` sort with prefix `a`, k=3. - Inserts a batch that fills the heap with rows from `a ∈ {1, 2}`; `update_filter` tightens the filter to `a < 2 OR (a = 2 AND b < 30)`. - Inserts a second batch with all rows at `a = 3` — filter rejects every row. - Without the fix: `insert_batch` short-circuits, `topk.finished` stays `false`. Test fails. - With the fix: `attempt_early_completion` fires (last-row prefix `a = 3` > heap.max prefix `a = 2`), `topk.finished` becomes `true`. Test passes. The test also asserts the emitted top-K is unchanged from after batch 1, confirming no candidate row was incorrectly excluded by the early bail. All 28 existing `topk::` tests continue to pass (including `test_try_finish_marks_finished_with_prefix`, which exercises the mixed-prefix case). ## Are there any user-facing changes? No public API or output changes. The fix only changes when TopK marks itself `finished = true` — specifically, it now fires `attempt_early_completion` for batches that are entirely rejected by the heap's dynamic filter, where previously it would silently skip the check. Output of TopK is unchanged; only the early-exit behavior improves. --------- Co-authored-by: Gabriel <45515538+gabotechs@users.noreply.github.com>
…atch (apache#22852) ## Which issue does this PR close? - Closes apache#22849 - A related cross-partition starvation case is tracked separately in apache#22874 and addressed by an upcoming follow-up PR — see [discussion](apache#22852 (comment)) for details ## Rationale for this change `TopK::insert_batch` short-circuits when the heap's dynamic filter rejects every row in a batch: ```rust if !filter.has_true() { // nothing to filter, so no need to update return Ok(()); } ``` The early-exit check `attempt_early_completion(&batch)` lives later in the same function, gated on `replacements > 0`. So a batch that the filter rejects entirely bypasses the check. The heap's dynamic filter is derived from the heap's worst row (via `update_filter`). A batch whose rows all come from a strictly worse sort prefix is exactly the batch the filter rejects entirely — i.e. the very signal `attempt_early_completion` is designed to detect ("the next batch is past the heap's boundary, we can stop") is what causes the function to short-circuit *before* the check runs. This is a feature-interaction regression between two PRs that were both correct in isolation. The `attempt_early_completion` mechanism was added by apache#15563 (closing apache#15529). At the time, there was no heap-derived dynamic filter on TopK, so the only sensible call site was right after a successful heap insertion. Two months later, apache#15770 added the dynamic-filter pushdown for TopK sorts, introducing the `!filter.has_true()` short-circuit. The two features address different problems and the new short-circuit didn't connect to the existing prefix-completion check — which is how this gap opened up. **Consequence**: on a TopK over an input ordered on the sort prefix, `finished = true` is never set once the heap stabilizes. Since `finished` is the signal `SortExec` uses to stop pulling from its input (via `Poll::Ready(None)` from the TopK stream, which cascades into dropping the source stream), the source keeps being polled long past the point where no further row can improve the heap. The LIMIT optimization effectively degrades to "heap saves memory but reads everything"; sources with cancellable streams (e.g. networked sources) never receive the cancellation signal. ## What changes are included in this PR? Single behavioral change in `datafusion/physical-plan/src/topk/mod.rs`: call `attempt_early_completion(&batch)` immediately before the `return Ok(())` in the `!filter.has_true()` branch. Why this scope, not a broader restructuring: - The existing `attempt_early_completion` call inside `if replacements > 0` is load-bearing for a related case: a batch containing a mix of "still valuable" rows and "past the boundary" rows. The existing `test_try_finish_marks_finished_with_prefix` test covers this case — Batch 2 with `a=[2,3], b=[10,20]` against a heap where `heap.max.a = 2`; the `(2, 10)` row must be inserted before the check on the `(3, 20)` last row triggers. Moving the call earlier would skip the insertion of valuable rows and break that test. - The bug is specifically that the *short-circuit* path doesn't call the check. The fix targets exactly that path. - A related but separate gap is not addressed here: when `filter.has_true() == true` but `replacements == 0` (the filter accepts some rows but `find_new_topk_items` ends up inserting none of them), the existing call inside `if replacements > 0` is also skipped. This requires a divergence between the heap's filter predicate and the row-byte comparison used inside `find_new_topk_items`, which shouldn't normally happen (the filter is derived from the heap's worst row using the same comparator). A deterministic synthetic repro would likely require concurrent heap updates from sibling partitions or boundary-value edge cases (NaN/NULL semantics, type coercion). Happy to send a follow-up if reviewers want it covered; the workload that motivated this fix was the filter-rejection case empirically. ## Are these changes tested? Yes. Added a regression test `test_try_finish_fires_when_filter_rejects_entire_batch`. The assertion target is `topk.finished` — the flag that signals "stop pulling from the source" to upstream consumers (read by `TopKExec::poll_next` to emit `Poll::Ready(None)`). Asserting that the flag transitions on the fully-filter-rejected batch is equivalent to asserting that the source-stopping mechanism activates. - Builds a TopK over a `(a, b)` sort with prefix `a`, k=3. - Inserts a batch that fills the heap with rows from `a ∈ {1, 2}`; `update_filter` tightens the filter to `a < 2 OR (a = 2 AND b < 30)`. - Inserts a second batch with all rows at `a = 3` — filter rejects every row. - Without the fix: `insert_batch` short-circuits, `topk.finished` stays `false`. Test fails. - With the fix: `attempt_early_completion` fires (last-row prefix `a = 3` > heap.max prefix `a = 2`), `topk.finished` becomes `true`. Test passes. The test also asserts the emitted top-K is unchanged from after batch 1, confirming no candidate row was incorrectly excluded by the early bail. All 28 existing `topk::` tests continue to pass (including `test_try_finish_marks_finished_with_prefix`, which exercises the mixed-prefix case). ## Are there any user-facing changes? No public API or output changes. The fix only changes when TopK marks itself `finished = true` — specifically, it now fires `attempt_early_completion` for batches that are entirely rejected by the heap's dynamic filter, where previously it would silently skip the check. Output of TopK is unchanged; only the early-exit behavior improves. --------- Co-authored-by: Gabriel <45515538+gabotechs@users.noreply.github.com> (cherry picked from commit 6520315)
## Which issue does this PR close? Closes apache#22874. Follow-up to [fix(topk): call attempt_early_completion when filter rejects entire batch]. ## Rationale for this change [TopK dynamic filter pushdown attempt 2] lets `SortExec` tighten scan-side predicates while a TopK heap finds better rows. Once the current TopK threshold is known, scans can skip data that cannot enter the final `ORDER BY ... LIMIT` result. That works well for a single partition. Partitioned `SortExec` has one extra case to handle: - each output partition has its own local `TopK` heap - those local heaps share one `TopKDynamicFilters` instance - one partition can tighten the shared filter before another partition has enough rows to fill its local heap [fix(topk): call attempt_early_completion when filter rejects entire batch] fixed the local case where a heap already has a max row and the dynamic filter rejects a whole batch. This PR fixes the remaining shared-filter case. A lagging partition can now use the shared prefix threshold to stop early even when its local heap is still empty. If there is no shared threshold yet, it falls back to the existing local heap prefix check. The shared prefix check is not treating another partition's threshold as this partition's local heap boundary. It uses the same threshold that already drives the shared dynamic filter. Once a partition's ordered input has moved past that shared prefix threshold, later batches from that partition cannot add rows that survive the shared filter. The local heap still emits the candidates it has already kept; this only stops pulling input that can no longer add candidates. Single-partition behavior is unchanged. ## How the TopK optimizations fit together There are two existing optimizations involved here: - Dynamic filter pushdown: once a `TopK` heap has K rows, its worst kept row becomes a threshold. That threshold tightens a scan-side filter so later data that cannot enter the final `ORDER BY ... LIMIT` result can be skipped. - Prefix early exit: when the input is ordered by a prefix of the requested sort, `TopK` can stop pulling once the last row in a batch is past a known TopK boundary on that shared prefix. For a partition-preserving `SortExec`, those optimizations meet in one shared place. Each output partition has a local `TopK` heap, but all of those local heaps publish into one shared dynamic filter. A partition that fills first can tighten the shared filter for everyone else. Lagging partitions then need to use that same shared prefix threshold to stop pulling once their ordered input has moved past it. This PR makes that composition explicit: the shared filter stays alive until every local `TopK` has emitted, and the shared threshold carries its common-prefix row so lagging partitions can apply the same prefix early-exit check. ## What changes are included in this PR? - Check early completion when a batch passes the dynamic filter but produces zero heap replacements. - Track local TopK emitters so a shared filter completes only after the last emitter has produced output. - Store the shared threshold and its common-prefix row together in `TopKDynamicFilters`. - Check the shared prefix in `attempt_early_completion` before falling back to the local heap prefix. - Add focused TopK and `SortExec` tests for the local zero-replacement path, early completion before local heap fill, equal-prefix non-completion, DESC/null prefix ordering, and shared-filter completion. ## How is this split for review? The commits are ordered so each one has a narrow job: 1. `Check TopK early completion after zero-replacement batches` Handles the local case where a batch passes the dynamic filter, produces zero heap replacements, but still proves later rows cannot enter the TopK. 2. `Complete shared TopK filters after all emitters` Keeps a shared dynamic filter watchable until every local TopK emitter has emitted. This includes the `SortExec` wiring for preserved partitioning. 3. `Use shared TopK prefix thresholds for early exit` Carries the shared threshold's common-prefix row so lagging partitions can stop before their local heap is full. ## Are these changes tested? Correctness is covered by targeted tests for: - the local zero-replacement early-completion path - early completion before local heap fill from a shared prefix threshold - equal-prefix non-completion - DESC and NULLS LAST prefix row ordering - shared-filter completion after all TopK emitters, including preserved `SortExec` partitioning Relevant background: - [perf: Add TopK benchmarks as variation over the `sort_tpch` benchmarks] added the benchmark setup used here. - [perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input (10x speedup on top10 bench)] added common-prefix TopK early termination. - [TopK dynamic filter pushdown attempt 2] added scan-side dynamic filter pushdown and exposed the shared-filter / local-heap interaction. - [fix(topk): call attempt_early_completion when filter rejects entire batch] fixed the local all-filtered-batch case. - This PR fixes the remaining partitioned shared-filter case. Benchmark command: ```bash dfbench sort-tpch --sorted --limit 10 --iterations 5 \ --path /tmp/df-topk-bench-data/tpch_sf1 \ -o /tmp/topk-shared-prefix-followup.json ``` Both sides were rebuilt with fresh isolated `release-nonlto` target directories before running the benchmark. This is not the regular `topk_tpch` script: this case needs `--sorted --limit 10` to exercise the prefix early-exit path. Clean rerun against the PR base commit `7bb6e152b`. Times are milliseconds, using the average of 5 iterations for each row. | scope | PR base | this PR | change | |---|---:|---:|---:| | all sort-tpch queries | 760.59 | 348.84 | -54.1% | | Q8 | 49.70 | 7.66 | -84.6% | | Q9 | 92.13 | 10.27 | -88.8% | | Q10 | 89.66 | 14.35 | -84.0% | The `DataSourceExec` counters show the less noisy part of the result. Q8/Q9/Q10 now emit only the first batch from each partition instead of continuing to drain millions of rows. | query | PR base `DataSourceExec output_rows` | this PR `DataSourceExec output_rows` | PR base `bytes_scanned` | this PR `bytes_scanned` | |---|---:|---:|---:|---:| | Q8 | 3.66M | 81.92K | 56.81M | 15.79M | | Q9 | 3.66M | 81.92K | 75.19M | 20.89M | | Q10 | 3.10M | 81.92K | 110.9M | 34.69M | ## Are there any user-facing changes? No. This is an internal physical execution optimization fix. [perf: Add TopK benchmarks as variation over the `sort_tpch` benchmarks]: apache#15560 [perf: Introduce sort prefix computation for early TopK exit optimization on partially sorted input (10x speedup on top10 bench)]: apache#15563 [TopK dynamic filter pushdown attempt 2]: apache#15770 [fix(topk): call attempt_early_completion when filter rejects entire batch]: apache#22852 --------- Co-authored-by: kosiew <kosiew@gmail.com>

ORDER BY LIMITqueries) #15037