Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ jobs:
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
org.apache.spark.sql.comet.PlanDataInjectorSuite
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
org.apache.spark.sql.comet.CometScanWithPlanDataSuite
org.apache.spark.sql.comet.util.UtilsSuite
org.apache.comet.objectstore.NativeConfigSuite
org.apache.spark.sql.CometToPrettyStringSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ jobs:
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
org.apache.spark.sql.comet.PlanDataInjectorSuite
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
org.apache.spark.sql.comet.CometScanWithPlanDataSuite
org.apache.spark.sql.comet.util.UtilsSuite
org.apache.comet.objectstore.NativeConfigSuite
org.apache.spark.sql.CometToPrettyStringSuite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ case object CometPlanAdaptiveDynamicPruningFilters
if icebergScan.runtimeFilters.exists(hasCometSAB) =>
logDebug("Converting AQE DPP for CometIcebergNativeScanExec")
convertIcebergScanDPP(icebergScan, plan)
// Comet scans whose DPP filters live in a @transient field (the contrib's
// CometDeltaNativeScanExec). transformExpressions/makeCopy can't rewrite
// them, and a rewritten copy is orphaned when the enclosing native block
// is rebuilt (#3510). The scan's `withDynamicPruningFilters` installs the
// rewrite in place and returns `this`, so it lands on the executing
// instance.
case p: CometScanWithPlanData if p.dynamicPruningFilters.exists(hasCometSAB) =>
logDebug(s"Converting AQE DPP for ${p.getClass.getSimpleName} in place")
p.withDynamicPruningFilters(p.dynamicPruningFilters.map(f => convertFilter(f, plan)))
case p: SparkPlan
if !p.isInstanceOf[CometNativeScanExec]
&& !p.isInstanceOf[CometIcebergNativeScanExec]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ case class CometNativeScanExec(
sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime
extends CometLeafExec
with DataSourceScanExec
with ShimStreamSourceAwareSparkPlan {
with ShimStreamSourceAwareSparkPlan
with CometScanWithPlanData {

override lazy val metadata: Map[String, String] =
if (originalPlan != null) originalPlan.metadata else Map.empty
Expand Down
109 changes: 94 additions & 15 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.jdk.CollectionConverters._

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder}
Expand Down Expand Up @@ -87,14 +88,42 @@ private[comet] trait PlanDataInjector {
/**
* Registry and utilities for injecting per-partition planning data into operator trees.
*/
private[comet] object PlanDataInjector {

// Registry of injectors for different operator types
private val injectors: Seq[PlanDataInjector] = Seq(
IcebergPlanDataInjector,
NativeScanPlanDataInjector
// Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc.
)
private[comet] object PlanDataInjector extends Logging {

// Registry of injectors for different operator types. The contrib/delta integration's
// DeltaPlanDataInjector is appended via one reflective class lookup -- present only when
// the contrib was bundled (i.e. -Pcontrib-delta on the Maven build). Default builds get
// the empty Option and an unmodified injectors list, so there's zero contrib surface at
// runtime on default builds.
private val injectors: Seq[PlanDataInjector] = {
val builtin: Seq[PlanDataInjector] = Seq(IcebergPlanDataInjector, NativeScanPlanDataInjector)
val deltaOpt: Option[PlanDataInjector] =
try {
// Scala compiles `object Foo` into BOTH `Foo.class` (a static-forwarder
// class with no MODULE$ field) AND `Foo$.class` (the module class that
// does have MODULE$). The trailing `$` selects the module class.
// scalastyle:off classforname
val cls = Class.forName("org.apache.spark.sql.comet.DeltaPlanDataInjector$")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would require every other contrib to have to add their own block of code. Consider ServiceLoader (META-INF/services/...PlanDataInjector) so contribs are discoverable without modifying core.

// scalastyle:on classforname
Some(cls.getField("MODULE$").get(null).asInstanceOf[PlanDataInjector])
} catch {
// Default builds (no -Pcontrib-delta) won't have the class -> silent None.
// This is the only EXPECTED miss, so it's the only quiet one.
case _: ClassNotFoundException => None
// The class IS on the classpath but couldn't be bound: missing MODULE$,
// access drift (NoSuchField/IllegalAccess), an initializer/linkage error,
// or a CCE on the PlanDataInjector cast. That's a misbuilt contrib jar, not
// a default build -- warn so it's diagnosable, then still decline so the
// rest of the planner stays alive.
case e: Throwable =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use scala.util.control.NonFatal instead?

logWarning(
"Found org.apache.spark.sql.comet.DeltaPlanDataInjector$ on classpath " +
"but failed to load it; skipping contrib-delta plan-data injection",
e)
None
}
builtin ++ deltaOpt
}

// O(1) lookup by op kind: most operators in any tree don't match any injector, so the per-op
// `for (injector <- injectors if injector.canInject(op))` walk was paying N*M canInject calls
Expand Down Expand Up @@ -718,8 +747,16 @@ abstract class CometNativeExec extends CometExec {
*/
def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = {
plan match {
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
_: CometIcebergNativeScanExec | _: CometCsvNativeScanExec | _: ShuffleQueryStageExec |
// Match `CometLeafExec` first so contrib leaf scans (e.g. the Delta
// contrib's `CometDeltaNativeScanExec`) are recognised as input boundaries
// without requiring a core compile-time reference to the contrib class.
// All built-in leaf scans (`CometNativeScanExec`, `CometIcebergNativeScanExec`,
// `CometCsvNativeScanExec`) also extend `CometLeafExec`, so this is a
// strict superset of the previous enumeration -- it just generalises the
// input-boundary concept from "this fixed list" to "any leaf Comet exec".
case _: CometLeafExec =>
func(plan)
case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec |
_: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec |
_: CometTakeOrderedAndProjectExec | _: CometCoalesceExec | _: ReusedExchangeExec |
_: CometBroadcastExchangeExec | _: BroadcastQueryStageExec |
Expand Down Expand Up @@ -786,11 +823,16 @@ abstract class CometNativeExec extends CometExec {
(Map.empty, Map.empty)
}

case nativeScan: CometNativeScanExec =>
nativeScan.ensureSubqueriesResolved()
(
Map(nativeScan.sourceKey -> nativeScan.commonData),
Map(nativeScan.sourceKey -> nativeScan.perPartitionData))
// Generic path for leaf scans that surface planning data via the
// `CometScanWithPlanData` trait. Catches `CometNativeScanExec` and any contrib
// leaf scan (e.g. the Delta contrib's `CometDeltaNativeScanExec`) without
// requiring core to compile-time reference contrib classes.
case s: CometScanWithPlanData =>
s match {
case leaf: CometLeafExec => leaf.ensureSubqueriesResolved()
case _ => // no DPP lifecycle to drive

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iceberg path handles if (commonData.empty && perPartitionData.empty) by returning (Map.empty, Map.empty) but this path doesn't. An implementation returning an empty array will probably cause an exception downstream. Better to handle it here

}
(Map(s.sourceKey -> s.commonData), Map(s.sourceKey -> s.perPartitionData))

// Broadcast stages are boundaries - don't collect per-partition data from inside them.
// After DPP filtering, broadcast scans may have different partition counts than the
Expand Down Expand Up @@ -901,6 +943,43 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode {
}
}

/**
 * Contract for scan execs that surface planning data: one shared `commonData` block plus
 * per-partition task bytes, both keyed by `sourceKey`. A parent `CometNativeExec` relies on it
 * to find and inject that data once the scan has been fused into a larger native subtree.
 *
 * `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` implement it; without it,
 * [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the parent's
 * native execution receives an empty input. `CometIcebergNativeScanExec` does NOT mix this
 * trait in -- it is served by a dedicated `findAllPlanData` case instead.
 *
 * Implementations are also expected to resolve their own DPP subqueries through
 * `ensureSubqueriesResolved` (overridden from [[CometLeafExec]]) before `commonData` or
 * `perPartitionData` are read.
 */
trait CometScanWithPlanData {

  /** Key under which this scan's common and per-partition data are registered. */
  def sourceKey: String

  /** Planning bytes shared by every partition of this scan. */
  def commonData: Array[Byte]

  /** Planning bytes for the individual partitions of this scan. */
  def perPartitionData: Array[Array[Byte]]

  /**
   * DPP / partition filters that may carry AQE SubqueryAdaptiveBroadcast subqueries needing a
   * rewrite by CometPlanAdaptiveDynamicPruningFilters.
   *
   * Empty by default: scans with dedicated handling (CometNativeScanExec,
   * CometIcebergNativeScanExec) don't use this path.
   */
  def dynamicPruningFilters: Seq[Expression] = Nil

  /**
   * Installs rewritten DPP filters on this scan.
   *
   * Implementers whose filters live in a @transient field (which TreeNode.makeCopy can't carry,
   * #3510) update them via a transient side-channel and return `this`, so that the optimizer
   * rule's rewrite lands on the SAME instance that executes rather than on a copy that gets
   * dropped when the enclosing native block is rebuilt.
   *
   * This default always throws. It is only meant to be invoked for scans that expose
   * `dynamicPruningFilters`, and such scans must override it.
   *
   * @param filters
   *   the rewritten filters to install
   * @return
   *   the plan node that carries the installed filters
   * @throws UnsupportedOperationException
   *   always, unless overridden
   */
  def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan = {
    val scanName = getClass.getSimpleName
    throw new UnsupportedOperationException(
      s"$scanName exposes dynamicPruningFilters but does not override withDynamicPruningFilters")
  }
}

abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode

abstract class CometBinaryExec extends CometNativeExec with BinaryExecNode
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet

import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.catalyst.expressions.Expression

/**
* Unit coverage for the core SPI that lets out-of-tree contrib leaf scans (e.g. the Delta
* contrib's `CometDeltaNativeScanExec`) participate in Comet native planning without core holding
* a compile-time reference to them: the [[CometScanWithPlanData]] trait contract and the
* reflective `DeltaPlanDataInjector` slot in [[PlanDataInjector]]'s registry.
*
* Deliberately does not exercise the per-op injector registry mechanics; that surface is owned by
* `PlanDataInjectorSuite`.
*/
class CometScanWithPlanDataSuite extends AnyFunSuite {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really needs a test that implements a stub/mock CometLeafExec with CometScanWithPlanData wired through findAllPlanData. That would confirm the new code path works independently of the existing scan classes.


/**
 * Bare-bones [[CometScanWithPlanData]] implementation: it supplies only the three required
 * members and leaves both DPP hooks at their trait defaults.
 */
private class StubScan extends CometScanWithPlanData {
  override def commonData: Array[Byte] = Array.emptyByteArray
  override def perPartitionData: Array[Array[Byte]] = Array.empty[Array[Byte]]
  override def sourceKey: String = "stub-key"
}

// Pins the two trait defaults: the filter list is empty, and installing filters fails loudly.
test(
  "CometScanWithPlanData defaults: no DPP filters and withDynamicPruningFilters is unsupported") {
  val stub = new StubScan
  assert(stub.dynamicPruningFilters == Nil, "a scan must expose no DPP filters by default")

  // The throwing default is acceptable only because the DPP rule skips scans whose
  // dynamicPruningFilters stay empty; a scan that does expose filters has to override it.
  val noFilters = Seq.empty[Expression]
  val thrown = intercept[UnsupportedOperationException] {
    stub.withDynamicPruningFilters(noFilters)
  }
  val message = thrown.getMessage
  assert(
    message.contains("withDynamicPruningFilters"),
    s"default failure must name the un-overridden method, got: $message")
}

test("contrib DeltaPlanDataInjector slot is absent in default builds") {
  // PlanDataInjector looks this module class up reflectively and appends it to its registry
  // only when the contrib-delta build bundled it. On a default build the class must be missing,
  // so that lookup ends in a plain ClassNotFoundException (-> None) and the registry holds just
  // the built-in injectors. Pass-through of non-scan operator trees is covered by
  // `PlanDataInjectorSuite`; this test checks only that the optional slot is absent.
  val contribInjectorClass = "org.apache.spark.sql.comet.DeltaPlanDataInjector$"
  val deltaSlotPresent =
    try {
      // scalastyle:off classforname
      Class.forName(contribInjectorClass)
      // scalastyle:on classforname
      true
    } catch {
      // Only the expected miss is handled here; any other failure still propagates.
      case _: ClassNotFoundException => false
    }
  assert(
    !deltaSlotPresent,
    "default (non -Pcontrib-delta) build must not carry the contrib DeltaPlanDataInjector")
}
}
Loading