From fc48c0cfeb986fc6c8a21d697c51be48059a547f Mon Sep 17 00:00:00 2001 From: Scott Schenkein Date: Sat, 13 Jun 2026 09:09:36 -0400 Subject: [PATCH] feat: core SPI for contrib leaf scans (CometScanWithPlanData) Introduce a small extension contract so out-of-tree Comet contrib leaf scans (Delta, and future Hudi/etc.) can participate in native planning without core holding a compile-time reference to them -- mirroring the Iceberg-precedent of keeping the data-source-specific code at the edge. What this adds: - `trait CometScanWithPlanData` (`sourceKey` / `commonData` / `perPartitionData`, plus optional `dynamicPruningFilters` / `withDynamicPruningFilters` for scans whose DPP filters live in a @transient field). `CometNativeScanExec` now mixes it in. - `CometNativeExec.foreachUntilCometInput` matches `case _: CometLeafExec` (a strict superset of the previous fixed scan enumeration -- all built-in leaf scans already extend `CometLeafExec`), so any leaf Comet exec is recognised as an input boundary. - `PlanDataInjector.findAllPlanData` collects per-partition planning data via the trait instead of a hardcoded `CometNativeScanExec` match. - `PlanDataInjector`'s registry gains one reflective `DeltaPlanDataInjector$` slot, appended only when a contrib bundled it (`-Pcontrib-delta`). Default builds get a `ClassNotFoundException` -> `None` and an unchanged injectors list, so there is zero contrib surface at runtime. - `CometPlanAdaptiveDynamicPruningFilters` rewrites AQE DPP filters in place for trait scans whose filters can't survive `makeCopy` (#3510). Inert by construction: with no contrib on the classpath this is behavior- preserving (the leaf match is a superset; the trait match catches the same `CometNativeScanExec`; the reflective slot resolves to nothing). Tests: `CometScanWithPlanDataSuite` (trait-contract defaults + reflective-slot graceful absence). Verified `CometJoinSuite` (native scan fusion / DPP) stays green. First unit of the Delta-contrib PR split (tracking: #4366). --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + ...metPlanAdaptiveDynamicPruningFilters.scala | 9 ++ .../spark/sql/comet/CometNativeScanExec.scala | 3 +- .../apache/spark/sql/comet/operators.scala | 109 +++++++++++++++--- .../comet/CometScanWithPlanDataSuite.scala | 78 +++++++++++++ 6 files changed, 185 insertions(+), 16 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/comet/CometScanWithPlanDataSuite.scala diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 174eae89c6..5aa9241614 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -358,6 +358,7 @@ jobs: org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.spark.sql.comet.PlanDataInjectorSuite org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite + org.apache.spark.sql.comet.CometScanWithPlanDataSuite org.apache.spark.sql.comet.util.UtilsSuite org.apache.comet.objectstore.NativeConfigSuite org.apache.spark.sql.CometToPrettyStringSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 2294e00363..b499ac2c87 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -174,6 +174,7 @@ jobs: org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite org.apache.spark.sql.comet.PlanDataInjectorSuite org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite + org.apache.spark.sql.comet.CometScanWithPlanDataSuite org.apache.spark.sql.comet.util.UtilsSuite org.apache.comet.objectstore.NativeConfigSuite org.apache.spark.sql.CometToPrettyStringSuite diff --git a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala index 217f8bc314..8876a28e05 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala @@ -83,6 +83,15 @@ case object CometPlanAdaptiveDynamicPruningFilters if icebergScan.runtimeFilters.exists(hasCometSAB) => logDebug("Converting AQE DPP for CometIcebergNativeScanExec") convertIcebergScanDPP(icebergScan, plan) + // Comet scans whose DPP filters live in a @transient field (the contrib's + // CometDeltaNativeScanExec). transformExpressions/makeCopy can't rewrite + // them, and a rewritten copy is orphaned when the enclosing native block + // is rebuilt (#3510). The scan's `withDynamicPruningFilters` installs the + // rewrite in place and returns `this`, so it lands on the executing + // instance. + case p: CometScanWithPlanData if p.dynamicPruningFilters.exists(hasCometSAB) => + logDebug(s"Converting AQE DPP for ${p.getClass.getSimpleName} in place") + p.withDynamicPruningFilters(p.dynamicPruningFilters.map(f => convertFilter(f, plan))) case p: SparkPlan if !p.isInstanceOf[CometNativeScanExec] && !p.isInstanceOf[CometIcebergNativeScanExec] diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index 0ce8547563..f620652214 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -71,7 +71,8 @@ case class CometNativeScanExec( sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime extends CometLeafExec with DataSourceScanExec - with ShimStreamSourceAwareSparkPlan { + with ShimStreamSourceAwareSparkPlan + with CometScanWithPlanData { override lazy val metadata: Map[String, String] = if (originalPlan != null) originalPlan.metadata else Map.empty diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index af9e1df8a3..f74617ed28 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -27,6 +27,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{Partition, TaskContext} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder} @@ -87,14 +88,42 @@ private[comet] trait PlanDataInjector { /** * Registry and utilities for injecting per-partition planning data into operator trees. */ -private[comet] object PlanDataInjector { - - // Registry of injectors for different operator types - private val injectors: Seq[PlanDataInjector] = Seq( - IcebergPlanDataInjector, - NativeScanPlanDataInjector - // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc. - ) +private[comet] object PlanDataInjector extends Logging { + + // Registry of injectors for different operator types. The contrib/delta integration's + // DeltaPlanDataInjector is appended via one reflective class lookup -- present only when + // the contrib was bundled (i.e. -Pcontrib-delta on the Maven build). Default builds get + // the empty Option and an unmodified injectors list, so there's zero contrib surface at + // runtime on default builds. + private val injectors: Seq[PlanDataInjector] = { + val builtin: Seq[PlanDataInjector] = Seq(IcebergPlanDataInjector, NativeScanPlanDataInjector) + val deltaOpt: Option[PlanDataInjector] = + try { + // Scala compiles `object Foo` into BOTH `Foo.class` (a static-forwarder + // class with no MODULE$ field) AND `Foo$.class` (the module class that + // does have MODULE$). The trailing `$` selects the module class. + // scalastyle:off classforname + val cls = Class.forName("org.apache.spark.sql.comet.DeltaPlanDataInjector$") + // scalastyle:on classforname + Some(cls.getField("MODULE$").get(null).asInstanceOf[PlanDataInjector]) + } catch { + // Default builds (no -Pcontrib-delta) won't have the class -> silent None. + // This is the only EXPECTED miss, so it's the only quiet one. + case _: ClassNotFoundException => None + // The class IS on the classpath but couldn't be bound: missing MODULE$, + // access drift (NoSuchField/IllegalAccess), an initializer/linkage error, + // or a CCE on the PlanDataInjector cast. That's a misbuilt contrib jar, not + // a default build -- warn so it's diagnosable, then still decline so the + // rest of the planner stays alive. + case e: Throwable => + logWarning( + "Found org.apache.spark.sql.comet.DeltaPlanDataInjector$ on classpath " + + "but failed to load it; skipping contrib-delta plan-data injection", + e) + None + } + builtin ++ deltaOpt + } // O(1) lookup by op kind: most operators in any tree don't match any injector, so the per-op // `for (injector <- injectors if injector.canInject(op))` walk was paying N*M canInject calls @@ -718,8 +747,16 @@ abstract class CometNativeExec extends CometExec { */ def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = { plan match { - case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec | - _: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _: ShuffleQueryStageExec | + // Match `CometLeafExec` first so contrib leaf scans (e.g. the Delta + // contrib's `CometDeltaNativeScanExec`) are recognised as input boundaries + // without requiring a core compile-time reference to the contrib class. + // All built-in leaf scans (`CometNativeScanExec`, `CometIcebergNativeScanExec`, + // `CometCsvNativeScanExec`) also extend `CometLeafExec`, so this is a + // strict superset of the previous enumeration -- it just generalises the + // input-boundary concept from "this fixed list" to "any leaf Comet exec". + case _: CometLeafExec => + func(plan) + case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec | _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | _: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec | _: CometBroadcastExchangeExec | _: BroadcastQueryStageExec | @@ -786,11 +823,16 @@ abstract class CometNativeExec extends CometExec { (Map.empty, Map.empty) } - case nativeScan: CometNativeScanExec => - nativeScan.ensureSubqueriesResolved() - ( - Map(nativeScan.sourceKey -> nativeScan.commonData), - Map(nativeScan.sourceKey -> nativeScan.perPartitionData)) + // Generic path for leaf scans that surface planning data via the + // `CometScanWithPlanData` trait. Catches `CometNativeScanExec` and any contrib + // leaf scan (e.g. the Delta contrib's `CometDeltaNativeScanExec`) without + // requiring core to compile-time reference contrib classes. + case s: CometScanWithPlanData => + s match { + case leaf: CometLeafExec => leaf.ensureSubqueriesResolved() + case _ => // no DPP lifecycle to drive + } + (Map(s.sourceKey -> s.commonData), Map(s.sourceKey -> s.perPartitionData)) // Broadcast stages are boundaries - don't collect per-partition data from inside them. // After DPP filtering, broadcast scans may have different partition counts than the @@ -901,6 +943,43 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode { } } +/** + * Marker trait for scan execs that surface planning data (a `commonData` block + per-partition + * task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the + * data when the scan is fused into a larger native subtree. + * + * Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without + * it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the + * parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use + * this trait; it has a dedicated `findAllPlanData` case.) + * + * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved` + * (overridden from [[CometLeafExec]]) before `commonData`/`perPartitionData` are read. + */ +trait CometScanWithPlanData { + def sourceKey: String + def commonData: Array[Byte] + def perPartitionData: Array[Array[Byte]] + + // DPP / partition filters that may carry AQE SubqueryAdaptiveBroadcast + // subqueries needing rewrite by CometPlanAdaptiveDynamicPruningFilters. + // Default empty: scans with dedicated handling (CometNativeScanExec, + // CometIcebergNativeScanExec) don't use this path. + def dynamicPruningFilters: Seq[Expression] = Nil + + // Install rewritten DPP filters on this scan. Implementers whose filters live + // in a @transient field (which TreeNode.makeCopy can't carry, #3510) update + // them via a transient side-channel and return `this` -- so the optimizer + // rule's rewrite lands on the SAME instance that executes, instead of a copy + // that gets dropped when the enclosing native block is rebuilt. Only called + // when `dynamicPruningFilters` is non-empty, so the default is never reached + // for scans that leave it empty. + def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan = + throw new UnsupportedOperationException( + s"${getClass.getSimpleName} exposes dynamicPruningFilters but does not " + + "override withDynamicPruningFilters") +} + abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode abstract class CometBinaryExec extends CometNativeExec with BinaryExecNode diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometScanWithPlanDataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometScanWithPlanDataSuite.scala new file mode 100644 index 0000000000..f845713713 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometScanWithPlanDataSuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.spark.sql.catalyst.expressions.Expression + +/** + * Unit coverage for the core SPI that lets out-of-tree contrib leaf scans (e.g. the Delta + * contrib's `CometDeltaNativeScanExec`) participate in Comet native planning without core holding + * a compile-time reference to them: the [[CometScanWithPlanData]] trait contract and the + * reflective `DeltaPlanDataInjector` slot in [[PlanDataInjector]]'s registry. + * + * Deliberately does not exercise the per-op injector registry mechanics; that surface is owned by + * `PlanDataInjectorSuite`. + */ +class CometScanWithPlanDataSuite extends AnyFunSuite { + + /** Minimal trait implementation that opts into none of the optional DPP behaviour. */ + private class StubScan extends CometScanWithPlanData { + override def sourceKey: String = "stub-key" + override def commonData: Array[Byte] = Array.emptyByteArray + override def perPartitionData: Array[Array[Byte]] = Array.empty + } + + test( + "CometScanWithPlanData defaults: no DPP filters and withDynamicPruningFilters is unsupported") { + val scan = new StubScan + assert(scan.dynamicPruningFilters == Nil, "a scan must expose no DPP filters by default") + // The default exists only so the DPP rule never calls it for scans that leave + // dynamicPruningFilters empty; if a scan does expose filters it must override this. + val ex = intercept[UnsupportedOperationException] { + scan.withDynamicPruningFilters(Seq.empty[Expression]) + } + assert( + ex.getMessage.contains("withDynamicPruningFilters"), + s"default failure must name the un-overridden method, got: ${ex.getMessage}") + } + + test("contrib DeltaPlanDataInjector slot is absent in default builds") { + // PlanDataInjector appends a reflective `DeltaPlanDataInjector$` slot to its registry only + // when the contrib-delta build bundled it. A default build must not contain the class, so the + // reflective lookup degrades to a plain ClassNotFoundException (-> None) and the registry keeps + // only its built-in injectors. (That the registry still passes non-scan operator trees through + // unchanged is covered by `PlanDataInjectorSuite`, which exercises the post-registry behaviour + // directly; this suite asserts only the contract A.1 adds -- the slot's optional presence.) + val deltaSlotAbsent = + try { + // scalastyle:off classforname + Class.forName("org.apache.spark.sql.comet.DeltaPlanDataInjector$") + // scalastyle:on classforname + false + } catch { + case _: ClassNotFoundException => true + } + assert( + deltaSlotAbsent, + "default (non -Pcontrib-delta) build must not carry the contrib DeltaPlanDataInjector") + } +}