-
Notifications
You must be signed in to change notification settings - Fork 335
feat: core SPI for contrib leaf scans (CometScanWithPlanData) [Delta contrib split, part 1] #4700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ import scala.jdk.CollectionConverters._ | |
|
|
||
| import org.apache.spark.{Partition, TaskContext} | ||
| import org.apache.spark.broadcast.Broadcast | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder} | ||
|
|
@@ -87,14 +88,42 @@ private[comet] trait PlanDataInjector { | |
| /** | ||
| * Registry and utilities for injecting per-partition planning data into operator trees. | ||
| */ | ||
| private[comet] object PlanDataInjector { | ||
|
|
||
| // Registry of injectors for different operator types | ||
| private val injectors: Seq[PlanDataInjector] = Seq( | ||
| IcebergPlanDataInjector, | ||
| NativeScanPlanDataInjector | ||
| // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc. | ||
| ) | ||
| private[comet] object PlanDataInjector extends Logging { | ||
|
|
||
| // Registry of injectors for different operator types. The contrib/delta integration's | ||
| // DeltaPlanDataInjector is appended via one reflective class lookup -- present only when | ||
| // the contrib was bundled (i.e. -Pcontrib-delta on the Maven build). Default builds get | ||
| // the empty Option and an unmodified injectors list, so there's zero contrib surface at | ||
| // runtime on default builds. | ||
| private val injectors: Seq[PlanDataInjector] = { | ||
| val builtin: Seq[PlanDataInjector] = Seq(IcebergPlanDataInjector, NativeScanPlanDataInjector) | ||
| val deltaOpt: Option[PlanDataInjector] = | ||
| try { | ||
| // Scala compiles `object Foo` into BOTH `Foo.class` (a static-forwarder | ||
| // class with no MODULE$ field) AND `Foo$.class` (the module class that | ||
| // does have MODULE$). The trailing `$` selects the module class. | ||
| // scalastyle:off classforname | ||
| val cls = Class.forName("org.apache.spark.sql.comet.DeltaPlanDataInjector$") | ||
| // scalastyle:on classforname | ||
| Some(cls.getField("MODULE$").get(null).asInstanceOf[PlanDataInjector]) | ||
| } catch { | ||
| // Default builds (no -Pcontrib-delta) won't have the class -> silent None. | ||
| // This is the only EXPECTED miss, so it's the only quiet one. | ||
| case _: ClassNotFoundException => None | ||
| // The class IS on the classpath but couldn't be bound: missing MODULE$, | ||
| // access drift (NoSuchField/IllegalAccess), an initializer/linkage error, | ||
| // or a CCE on the PlanDataInjector cast. That's a misbuilt contrib jar, not | ||
| // a default build -- warn so it's diagnosable, then still decline so the | ||
| // rest of the planner stays alive. | ||
| case e: Throwable => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe use |
||
| logWarning( | ||
| "Found org.apache.spark.sql.comet.DeltaPlanDataInjector$ on classpath " + | ||
| "but failed to load it; skipping contrib-delta plan-data injection", | ||
| e) | ||
| None | ||
| } | ||
| builtin ++ deltaOpt | ||
| } | ||
|
|
||
| // O(1) lookup by op kind: most operators in any tree don't match any injector, so the per-op | ||
| // `for (injector <- injectors if injector.canInject(op))` walk was paying N*M canInject calls | ||
|
|
@@ -718,8 +747,16 @@ abstract class CometNativeExec extends CometExec { | |
| */ | ||
| def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = { | ||
| plan match { | ||
| case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec | | ||
| _: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _: ShuffleQueryStageExec | | ||
| // Match `CometLeafExec` first so contrib leaf scans (e.g. the Delta | ||
| // contrib's `CometDeltaNativeScanExec`) are recognised as input boundaries | ||
| // without requiring a core compile-time reference to the contrib class. | ||
| // All built-in leaf scans (`CometNativeScanExec`, `CometIcebergNativeScanExec`, | ||
| // `CometCsvNativeScanExec`) also extend `CometLeafExec`, so this is a | ||
| // strict superset of the previous enumeration -- it just generalises the | ||
| // input-boundary concept from "this fixed list" to "any leaf Comet exec". | ||
| case _: CometLeafExec => | ||
| func(plan) | ||
| case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec | | ||
| _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | | ||
| _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec | | ||
| _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | | ||
|
|
@@ -786,11 +823,16 @@ abstract class CometNativeExec extends CometExec { | |
| (Map.empty, Map.empty) | ||
| } | ||
|
|
||
| case nativeScan: CometNativeScanExec => | ||
| nativeScan.ensureSubqueriesResolved() | ||
| ( | ||
| Map(nativeScan.sourceKey -> nativeScan.commonData), | ||
| Map(nativeScan.sourceKey -> nativeScan.perPartitionData)) | ||
| // Generic path for leaf scans that surface planning data via the | ||
| // `CometScanWithPlanData` trait. Catches `CometNativeScanExec` and any contrib | ||
| // leaf scan (e.g. the Delta contrib's `CometDeltaNativeScanExec`) without | ||
| // requiring core to compile-time reference contrib classes. | ||
| case s: CometScanWithPlanData => | ||
| s match { | ||
| case leaf: CometLeafExec => leaf.ensureSubqueriesResolved() | ||
| case _ => // no DPP lifecycle to drive | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The iceberg path handles |
||
| } | ||
| (Map(s.sourceKey -> s.commonData), Map(s.sourceKey -> s.perPartitionData)) | ||
|
|
||
| // Broadcast stages are boundaries - don't collect per-partition data from inside them. | ||
| // After DPP filtering, broadcast scans may have different partition counts than the | ||
|
|
@@ -901,6 +943,43 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Marker trait for scan execs that surface planning data (a `commonData` block + per-partition | ||
| * task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the | ||
| * data when the scan is fused into a larger native subtree. | ||
| * | ||
| * Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without | ||
| * it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the | ||
| * parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use | ||
| * this trait; it has a dedicated `findAllPlanData` case.) | ||
| * | ||
| * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved` | ||
| * (overridden from [[CometLeafExec]]) before `commonData`/`perPartitionData` are read. | ||
| */ | ||
| trait CometScanWithPlanData { | ||
| def sourceKey: String | ||
| def commonData: Array[Byte] | ||
| def perPartitionData: Array[Array[Byte]] | ||
|
|
||
| // DPP / partition filters that may carry AQE SubqueryAdaptiveBroadcast | ||
| // subqueries needing rewrite by CometPlanAdaptiveDynamicPruningFilters. | ||
| // Default empty: scans with dedicated handling (CometNativeScanExec, | ||
| // CometIcebergNativeScanExec) don't use this path. | ||
| def dynamicPruningFilters: Seq[Expression] = Nil | ||
|
|
||
| // Install rewritten DPP filters on this scan. Implementers whose filters live | ||
| // in a @transient field (which TreeNode.makeCopy can't carry, #3510) update | ||
| // them via a transient side-channel and return `this` -- so the optimizer | ||
| // rule's rewrite lands on the SAME instance that executes, instead of a copy | ||
| // that gets dropped when the enclosing native block is rebuilt. Only called | ||
| // when `dynamicPruningFilters` is non-empty, so the default is never reached | ||
| // for scans that leave it empty. | ||
| def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan = | ||
| throw new UnsupportedOperationException( | ||
| s"${getClass.getSimpleName} exposes dynamicPruningFilters but does not " + | ||
| "override withDynamicPruningFilters") | ||
| } | ||
|
|
||
| abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode | ||
|
|
||
| abstract class CometBinaryExec extends CometNativeExec with BinaryExecNode | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.comet | ||
|
|
||
| import org.scalatest.funsuite.AnyFunSuite | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.Expression | ||
|
|
||
| /** | ||
| * Unit coverage for the core SPI that lets out-of-tree contrib leaf scans (e.g. the Delta | ||
| * contrib's `CometDeltaNativeScanExec`) participate in Comet native planning without core holding | ||
| * a compile-time reference to them: the [[CometScanWithPlanData]] trait contract and the | ||
| * reflective `DeltaPlanDataInjector` slot in [[PlanDataInjector]]'s registry. | ||
| * | ||
| * Deliberately does not exercise the per-op injector registry mechanics; that surface is owned by | ||
| * `PlanDataInjectorSuite`. | ||
| */ | ||
| class CometScanWithPlanDataSuite extends AnyFunSuite { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This really needs a test that implements a stub/mock |
||
|
|
||
| /** Minimal trait implementation that opts into none of the optional DPP behaviour. */ | ||
| private class StubScan extends CometScanWithPlanData { | ||
| override def sourceKey: String = "stub-key" | ||
| override def commonData: Array[Byte] = Array.emptyByteArray | ||
| override def perPartitionData: Array[Array[Byte]] = Array.empty | ||
| } | ||
|
|
||
| test( | ||
| "CometScanWithPlanData defaults: no DPP filters and withDynamicPruningFilters is unsupported") { | ||
| val scan = new StubScan | ||
| assert(scan.dynamicPruningFilters == Nil, "a scan must expose no DPP filters by default") | ||
| // The default exists only so the DPP rule never calls it for scans that leave | ||
| // dynamicPruningFilters empty; if a scan does expose filters it must override this. | ||
| val ex = intercept[UnsupportedOperationException] { | ||
| scan.withDynamicPruningFilters(Seq.empty[Expression]) | ||
| } | ||
| assert( | ||
| ex.getMessage.contains("withDynamicPruningFilters"), | ||
| s"default failure must name the un-overridden method, got: ${ex.getMessage}") | ||
| } | ||
|
|
||
| test("contrib DeltaPlanDataInjector slot is absent in default builds") { | ||
| // PlanDataInjector appends a reflective `DeltaPlanDataInjector$` slot to its registry only | ||
| // when the contrib-delta build bundled it. A default build must not contain the class, so the | ||
| // reflective lookup degrades to a plain ClassNotFoundException (-> None) and the registry keeps | ||
| // only its built-in injectors. (That the registry still passes non-scan operator trees through | ||
| // unchanged is covered by `PlanDataInjectorSuite`, which exercises the post-registry behaviour | ||
| // directly; this suite asserts only the contract A.1 adds -- the slot's optional presence.) | ||
| val deltaSlotAbsent = | ||
| try { | ||
| // scalastyle:off classforname | ||
| Class.forName("org.apache.spark.sql.comet.DeltaPlanDataInjector$") | ||
| // scalastyle:on classforname | ||
| false | ||
| } catch { | ||
| case _: ClassNotFoundException => true | ||
| } | ||
| assert( | ||
| deltaSlotAbsent, | ||
| "default (non -Pcontrib-delta) build must not carry the contrib DeltaPlanDataInjector") | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would require every other contrib to have to add their own block of code. Consider ServiceLoader (META-INF/services/...PlanDataInjector) so contribs are discoverable without modifying core.