1 change: 1 addition & 0 deletions dev/ensure-jars-have-correct-contents.sh
@@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
allowed_expr+="|^org/apache/spark/CometPlugin.class$"
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
allowed_expr+="|^org/apache/spark/CometSource.*$"
Severity: medium

The regex `CometSource.*$` allows any characters after `CometSource`, but only the `.class` file appears to be intended. A more specific pattern avoids pulling unexpected entries into the jar.

Suggested change
allowed_expr+="|^org/apache/spark/CometSource.*$"
allowed_expr+="|^org/apache/spark/CometSource.class$"

allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
allowed_expr+="|^scala-collection-compat.properties$"
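The effect of tightening the pattern can be illustrated outside the script (a standalone sketch; `CometSourceUnrelated.class` is a hypothetical jar entry, not one from this build):

```scala
// Sketch: the broad pattern admits any entry under the CometSource prefix,
// while the narrow one accepts only the class file itself.
val broad    = "^org/apache/spark/CometSource.*$"
val narrow   = "^org/apache/spark/CometSource.class$"
val intended = "org/apache/spark/CometSource.class"
val stray    = "org/apache/spark/CometSourceUnrelated.class" // hypothetical entry

assert(intended.matches(broad))  // both patterns accept the intended file
assert(intended.matches(narrow))
assert(stray.matches(broad))     // the broad pattern also lets the stray entry through
assert(!stray.matches(narrow))   // the narrow pattern rejects it
```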
19 changes: 19 additions & 0 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -192,6 +192,25 @@ class CometCoverageStats {
}
}

object CometCoverageStats {

/**
* Compute coverage stats for a plan without generating explain string.
*/
def forPlan(plan: SparkPlan): CometCoverageStats = {
val stats = new CometCoverageStats()
val explainInfo = new ExtendedExplainInfo()
explainInfo.generateTreeString(
CometExplainInfo.getActualPlan(plan),
0,
Seq(),
0,
new StringBuilder(),
Despite the Scaladoc, this still builds the full tree output into a StringBuilder via generateTreeString, which could allocate a large buffer on every planning pass. If the intent is stats-only, consider a traversal that updates CometCoverageStats without appending to a string.

Severity: medium


stats)
stats
}
}

object CometExplainInfo {
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

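A string-free version of the traversal suggested in the comment above could look roughly like this (a self-contained sketch with stand-in types; the real change would walk the SparkPlan and update CometCoverageStats directly):

```scala
// Stand-ins for the project's types; illustrative only.
final class CoverageStats {
  var cometOperators = 0L
  var sparkOperators = 0L
  var transitions = 0L
}

sealed trait Node { def isComet: Boolean; def children: Seq[Node] }
final case class Op(isComet: Boolean, children: Seq[Node] = Nil) extends Node

// Update counters during a plain tree walk, with no StringBuilder involved.
// A transition is counted when a Comet node sits under a Spark parent
// (an assumption about the semantics, for illustration).
def tally(node: Node, stats: CoverageStats, parentIsComet: Option[Boolean] = None): Unit = {
  if (node.isComet) stats.cometOperators += 1 else stats.sparkOperators += 1
  if (node.isComet && parentIsComet.contains(false)) stats.transitions += 1
  node.children.foreach(tally(_, stats, Some(node.isComet)))
}

val stats = new CoverageStats
tally(Op(isComet = true, Seq(Op(isComet = false, Seq(Op(isComet = true))))), stats)
// stats now holds cometOperators = 2, sparkOperators = 1, transitions = 1
```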
@@ -21,6 +21,7 @@ package org.apache.comet.rules

import scala.collection.mutable.ListBuffer

import org.apache.spark.CometSource
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder}
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
@@ -47,7 +48,7 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.{CometConf, CometCoverageStats, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.rules.CometExecRule.allExecs
@@ -389,6 +390,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {

var newPlan = transform(planWithJoinRewritten)

// Record coverage stats for metrics
val stats = CometCoverageStats.forPlan(newPlan)
Since CometExecRule is injected both as a QueryStagePrepRule and as a columnar preColumnarTransitions rule, this recordStats call may run multiple times per query/stage and inflate queries.planned/operator counters. Consider guarding so stats are recorded once per final plan (or only when the rule actually changes the plan).

Severity: medium


CometSource.recordStats(stats)
Comment on lines +394 to +395
Severity: high

It's important to ensure that CometSource.recordStats(stats) is called only when newPlan is actually a Comet plan. Otherwise, the metrics might be misleading, as they would include Spark operators as well.

Suggested change
val stats = CometCoverageStats.forPlan(newPlan)
CometSource.recordStats(stats)
if (newPlan.isInstanceOf[CometPlan]) {
val stats = CometCoverageStats.forPlan(newPlan)
CometSource.recordStats(stats)
}

Metrics inflated by multiple rule invocations with AQE

Medium Severity

CometExecRule is registered both via injectColumnar (as preColumnarTransitions) and via injectQueryStagePrepRule. With AQE enabled (the default), _apply is invoked multiple times per query — once from each injection point during re-optimization cycles. Each invocation calls CometSource.recordStats, inflating all counters including QUERIES_PLANNED, NATIVE_OPERATORS, and SPARK_OPERATORS. The acceleration.ratio gauge may also be skewed since operator counts from different plan stages have different compositions.


Stats captured before temporary placeholder nodes are removed

Low Severity

CometCoverageStats.forPlan(newPlan) is called before CometScanWrapper and CometSinkPlaceHolder are removed from the plan. Both extend CometPlan, so they are counted as cometOperators. When removed, CometScanWrapper is replaced by its wrapped Spark originalPlan (which was never counted), and CometSinkPlaceHolder is replaced by its already-counted child. This inflates cometOperators and deflates sparkOperators relative to the final plan.

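Both AQE-related comments point at the same remedy: record stats at most once per plan. A minimal sketch of the idea (the identity map is a stand-in for what would likely be a TreeNodeTag on the plan in Spark; all names here are hypothetical):

```scala
import java.util.IdentityHashMap

// Sketch: run the recording side effect at most once per plan instance,
// so repeated rule invocations under AQE do not inflate the counters.
object OncePerPlan {
  private val seen = new IdentityHashMap[AnyRef, java.lang.Boolean]()
  def recordOnce(plan: AnyRef)(record: => Unit): Unit = synchronized {
    if (!seen.containsKey(plan)) {
      seen.put(plan, java.lang.Boolean.TRUE)
      record
    }
  }
}

var recorded = 0
val plan = new AnyRef
OncePerPlan.recordOnce(plan) { recorded += 1 }
OncePerPlan.recordOnce(plan) { recorded += 1 } // second invocation is a no-op
// recorded == 1
```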


// if the plan cannot be run fully natively then explain why (when appropriate
// config is enabled)
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
62 changes: 62 additions & 0 deletions spark/src/main/scala/org/apache/spark/CometSource.scala
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark

import org.apache.spark.metrics.source.Source

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

import org.apache.comet.CometCoverageStats

/**
* Exposes the following metrics (derived from CometCoverageStats):
* - operators.native: Total operators executed natively
These metrics are derived from CometCoverageStats over the planned SparkPlan, so phrasing like “operators executed natively” / “fell back” reads like runtime execution and may be misleading. Consider clarifying the doc/metric semantics (planned operators vs executed operators).

Severity: low


* - operators.spark: Total operators that fell back to Spark
* - queries.planned: Total queries processed
* - transitions: Total Spark-to-Comet transitions
* - acceleration.ratio: native / (native + spark)
*/
object CometSource extends Source {
override val sourceName = "comet"
override val metricRegistry = new MetricRegistry()

val NATIVE_OPERATORS: Counter =
metricRegistry.counter(MetricRegistry.name("operators", "native"))
val SPARK_OPERATORS = metricRegistry.counter(MetricRegistry.name("operators", "spark"))
val QUERIES_PLANNED = metricRegistry.counter(MetricRegistry.name("queries", "planned"))
val TRANSITIONS = metricRegistry.counter(MetricRegistry.name("transitions"))

metricRegistry.register(
MetricRegistry.name("acceleration", "ratio"),
new Gauge[Double] {
override def getValue: Double = {
val native = NATIVE_OPERATORS.getCount
val total = native + SPARK_OPERATORS.getCount
if (total > 0) native.toDouble / total else 0.0
}
})

def recordStats(stats: CometCoverageStats): Unit = {
NATIVE_OPERATORS.inc(stats.cometOperators)
SPARK_OPERATORS.inc(stats.sparkOperators)
TRANSITIONS.inc(stats.transitions)
QUERIES_PLANNED.inc()
}
}
3 changes: 3 additions & 0 deletions spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
// register CometSparkSessionExtensions if it isn't already registered
CometDriverPlugin.registerCometSessionExtension(sc.conf)

// Register Comet metrics source
sc.env.metricsSystem.registerSource(CometSource)

if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
22 changes: 21 additions & 1 deletion spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -19,7 +19,9 @@

package org.apache.spark

import org.apache.spark.sql.CometTestBase
import java.io.File

import org.apache.spark.sql.{CometTestBase, SaveMode}
import org.apache.spark.sql.internal.StaticSQLConf

class CometPluginsSuite extends CometTestBase {
@@ -77,6 +79,24 @@ class CometPluginsSuite extends CometTestBase {
}
}

test("CometSource metrics are recorded") {
This test only checks the in-process CometSource counters, so it can pass even if CometSource isn’t actually registered with sc.env.metricsSystem (i.e., external monitoring can’t see it). Consider asserting the metrics registry/source contains the expected names to validate the exposure path.

Severity: low


val nativeBefore = CometSource.NATIVE_OPERATORS.getCount
val queriesBefore = CometSource.QUERIES_PLANNED.getCount

withTempPath { dir =>
val path = new File(dir, "test.parquet").toString
spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path)
spark.read.parquet(path).filter("id > 500").collect()
}

assert(
CometSource.QUERIES_PLANNED.getCount > queriesBefore,
"queries.planned should increment after query")
assert(
CometSource.NATIVE_OPERATORS.getCount > nativeBefore,
"operators.native should increment for native execution")
}
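The exposure-path assertion suggested in the comment above could be sketched as follows (hedged: the dotted metric names follow the CometSource doc in this PR, and the registry contents are shown as a plain string set rather than a live MetricRegistry):

```scala
// Sketch: verify the expected metric names are present in whatever the
// registry reports, instead of trusting only the in-process counters.
val expected = Set(
  "operators.native",
  "operators.spark",
  "queries.planned",
  "transitions",
  "acceleration.ratio")

// In a real test this would come from CometSource.metricRegistry.getNames
// (a SortedSet[String]); a stand-in keeps the sketch self-contained.
val registered: Set[String] = expected + "some.other.metric"

assert(expected.subsetOf(registered), s"missing: ${expected.diff(registered)}")
```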

test("Default Comet memory overhead") {
val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")