From c0eb149a45930fe0700e73ccfea3bd8dcacdefd9 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 15 Mar 2026 14:27:01 -0700 Subject: [PATCH] project_comet_metrics --- dev/ensure-jars-have-correct-contents.sh | 1 + .../apache/comet/ExtendedExplainInfo.scala | 19 ++++++ .../apache/comet/rules/CometExecRule.scala | 7 ++- .../scala/org/apache/spark/CometSource.scala | 62 +++++++++++++++++++ .../main/scala/org/apache/spark/Plugins.scala | 3 + .../org/apache/spark/CometPluginsSuite.scala | 22 ++++++- 6 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/CometSource.scala diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh index 570aeabb2b..084936475d 100755 --- a/dev/ensure-jars-have-correct-contents.sh +++ b/dev/ensure-jars-have-correct-contents.sh @@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$" allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$" allowed_expr+="|^org/apache/spark/CometPlugin.class$" allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$" +allowed_expr+="|^org/apache/spark/CometSource.*$" allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$" allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$" allowed_expr+="|^scala-collection-compat.properties$" diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index f47428e801..d30a1fe788 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -192,6 +192,25 @@ class CometCoverageStats { } } +object CometCoverageStats { + + /** + * Collect coverage stats for a plan; text written to the scratch builder is discarded.
+ */ + def forPlan(plan: SparkPlan): CometCoverageStats = { + val stats = new CometCoverageStats() + val explainInfo = new ExtendedExplainInfo() + explainInfo.generateTreeString( + CometExplainInfo.getActualPlan(plan), + 0, + Seq(), + 0, + new StringBuilder(), + stats) + stats + } +} + object CometExplainInfo { val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo") diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 76e741e3bf..7f89935724 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -21,6 +21,7 @@ package org.apache.comet.rules import scala.collection.mutable.ListBuffer +import org.apache.spark.CometSource import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder} import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero @@ -47,7 +48,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo} +import org.apache.comet.{CometConf, CometCoverageStats, CometExplainInfo, ExtendedExplainInfo} import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST} import org.apache.comet.CometSparkSessionExtensions._ import org.apache.comet.rules.CometExecRule.allExecs @@ -389,6 +390,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { var newPlan = transform(planWithJoinRewritten) + // Publish the transformed plan's coverage stats to the CometSource metrics + val stats = CometCoverageStats.forPlan(newPlan) + CometSource.recordStats(stats) + // if
the plan cannot be run fully natively then explain why (when appropriate // config is enabled) if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { diff --git a/spark/src/main/scala/org/apache/spark/CometSource.scala b/spark/src/main/scala/org/apache/spark/CometSource.scala new file mode 100644 index 0000000000..e243b48d0f --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/CometSource.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark + +import org.apache.spark.metrics.source.Source + +import com.codahale.metrics.{Counter, Gauge, MetricRegistry} + +import org.apache.comet.CometCoverageStats + +/** + * Cumulative planning-time metrics, fed from CometCoverageStats through recordStats: + * - operators.native: sum of stats.cometOperators over all recorded plans + * - operators.spark: sum of stats.sparkOperators over all recorded plans + * - queries.planned: number of recordStats calls, i.e. plans recorded + * - transitions: sum of stats.transitions over all recorded plans + * - acceleration.ratio: native / (native + spark), or 0.0 while both counters are zero + */ +object CometSource extends Source { + override val sourceName: String = "comet" + override val metricRegistry: MetricRegistry = new MetricRegistry() + + val NATIVE_OPERATORS: Counter = + metricRegistry.counter(MetricRegistry.name("operators", "native")) + val SPARK_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "spark")) + val QUERIES_PLANNED: Counter = metricRegistry.counter(MetricRegistry.name("queries", "planned")) + val TRANSITIONS: Counter = metricRegistry.counter(MetricRegistry.name("transitions")) + + metricRegistry.register( + MetricRegistry.name("acceleration", "ratio"), + new Gauge[Double] { + override def getValue: Double = { + val native = NATIVE_OPERATORS.getCount + val total = native + SPARK_OPERATORS.getCount + if (total > 0) native.toDouble / total else 0.0 + } + }) + + def recordStats(stats: CometCoverageStats): Unit = { + NATIVE_OPERATORS.inc(stats.cometOperators) + SPARK_OPERATORS.inc(stats.sparkOperators) + TRANSITIONS.inc(stats.transitions) + QUERIES_PLANNED.inc() + } +} diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala index 2529f08cfb..027acf257d 100644 --- a/spark/src/main/scala/org/apache/spark/Plugins.scala +++ b/spark/src/main/scala/org/apache/spark/Plugins.scala @@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl // register CometSparkSessionExtensions if it isn't already registered
CometDriverPlugin.registerCometSessionExtension(sc.conf) + // Expose the CometSource counters and gauge through the driver's metrics system + sc.env.metricsSystem.registerSource(CometSource) + if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) { val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) { sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key) diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala index c493b22f79..20ed9c1449 100644 --- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala @@ -19,7 +19,9 @@ package org.apache.spark -import org.apache.spark.sql.CometTestBase +import java.io.File + +import org.apache.spark.sql.{CometTestBase, SaveMode} import org.apache.spark.sql.internal.StaticSQLConf class CometPluginsSuite extends CometTestBase { @@ -77,6 +79,24 @@ class CometPluginsSuite extends CometTestBase { } } + test("CometSource metrics are recorded") { + val nativeBefore = CometSource.NATIVE_OPERATORS.getCount + val queriesBefore = CometSource.QUERIES_PLANNED.getCount + + withTempPath { dir => + val path = new File(dir, "test.parquet").toString + spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path) + spark.read.parquet(path).filter("id > 500").collect() + } + + assert( + CometSource.QUERIES_PLANNED.getCount > queriesBefore, + "queries.planned should increment after query") + assert( + CometSource.NATIVE_OPERATORS.getCount > nativeBefore, + "operators.native should increment for native execution") + } + test("Default Comet memory overhead") { val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead") val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")