From 2ad3b6f72fde7ab220dd0bcdb37d89233205247c Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 30 Mar 2026 09:31:52 -0700 Subject: [PATCH 1/3] chore: `native_datafusion` to report scan task input metrics --- .../apache/spark/sql/comet/CometExecRDD.scala | 12 ++++ .../spark/sql/comet/CometMetricNode.scala | 11 ++++ .../sql/comet/CometTaskMetricsSuite.scala | 65 +++++++++++++++++++ 3 files changed, 88 insertions(+) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala index c5014818c4..5310b7c84a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala @@ -139,6 +139,18 @@ private[spark] class CometExecRDD( ctx.addTaskCompletionListener[Unit] { _ => it.close() subqueries.foreach(sub => CometScalarSubquery.removeSubquery(it.id, sub)) + + // Propagate native scan metrics (bytes_scanned, output_rows) to Spark's task-level + // inputMetrics so they appear in the Spark UI "Input" column and are reported via + // the listener infrastructure. The native reader bypasses Hadoop's Java FileSystem, + // so thread-local FS statistics are never updated -- we bridge the gap here. + val bytesScannedMetric = nativeMetrics.findMetric("bytes_scanned") + val outputRowsMetric = nativeMetrics.findMetric("output_rows") + if (bytesScannedMetric.isDefined || outputRowsMetric.isDefined) { + val inputMetrics = ctx.taskMetrics().inputMetrics + bytesScannedMetric.foreach(m => inputMetrics.setBytesRead(m.value)) + outputRowsMetric.foreach(m => inputMetrics.setRecordsRead(m.value)) + } } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 8c75df1d45..2867c54a45 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -79,10 +79,21 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM } } + // Called via JNI from `comet_metric_node.rs` def set_all_from_bytes(bytes: Array[Byte]): Unit = { val metricNode = Metric.NativeMetricNode.parseFrom(bytes) set_all(metricNode) } + + /** + * Finds a metric by name in this node or any descendant node. Returns the first match found via + * depth-first search. + */ + def findMetric(name: String): Option[SQLMetric] = { + metrics.get(name).orElse { + children.iterator.map(_.findMetric(name)).collectFirst { case Some(m) => m } + } + } } object CometMetricNode { diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala index 3946aab184..ec77dbc1fc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.comet import scala.collection.mutable +import org.apache.spark.executor.InputMetrics import org.apache.spark.executor.ShuffleReadMetrics import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.SparkListener @@ -30,6 +31,8 @@ import org.apache.spark.sql.comet.execution.shuffle.CometNativeShuffle import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.comet.CometConf + class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -91,4 +94,66 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } } + + test("native_datafusion scan reports task-level input metrics matching Spark") { + withParquetTable((0 until 10000).map(i => (i, (i + 1).toLong)), "tbl") { + // Collect baseline input metrics from vanilla Spark (Comet disabled) + val (sparkBytes, sparkRecords) = collectInputMetrics(CometConf.COMET_ENABLED.key -> "false") + + // Collect input metrics from Comet native_datafusion scan + val (cometBytes, cometRecords) = collectInputMetrics( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) + + // Records must match exactly + assert( + cometRecords == sparkRecords, + s"recordsRead mismatch: comet=$cometRecords, spark=$sparkRecords") + + // Bytes should be in the same ballpark -- both read the same Parquet file(s), + // but the exact byte count can differ due to reader implementation details + // (e.g. footer reads, page headers, buffering granularity). + assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes") + assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes") + val ratio = cometBytes.toDouble / sparkBytes.toDouble + assert( + ratio >= 0.85 && ratio <= 1.15, + s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio") + } + } + + /** + * Runs `SELECT * FROM tbl` with the given SQL config overrides and returns the aggregated + * (bytesRead, recordsRead) across all tasks. + */ + private def collectInputMetrics(confs: (String, String)*): (Long, Long) = { + val inputMetricsList = mutable.ArrayBuffer.empty[InputMetrics] + + val listener = new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + val im = taskEnd.taskMetrics.inputMetrics + inputMetricsList.synchronized { + inputMetricsList += im + } + } + } + + spark.sparkContext.addSparkListener(listener) + try { + // Drain any earlier events + spark.sparkContext.listenerBus.waitUntilEmpty() + + withSQLConf(confs: _*) { + sql("SELECT * FROM tbl").collect() + } + + spark.sparkContext.listenerBus.waitUntilEmpty() + + assert(inputMetricsList.nonEmpty, s"No input metrics found for confs=$confs") + val totalBytes = inputMetricsList.map(_.bytesRead).sum + val totalRecords = inputMetricsList.map(_.recordsRead).sum + (totalBytes, totalRecords) + } finally { + spark.sparkContext.removeSparkListener(listener) + } + } } From e2c6093a3abca023a05df6c5aba850f806da634b Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 30 Mar 2026 09:54:18 -0700 Subject: [PATCH 2/3] chore: `native_datafusion` to report scan task input metrics --- .../org/apache/spark/sql/comet/CometTaskMetricsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala index ec77dbc1fc..59d02512a0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala @@ -116,7 +116,7 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper { assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes") val ratio = cometBytes.toDouble / sparkBytes.toDouble assert( - ratio >= 0.85 && ratio <= 1.15, + ratio >= 0.8 && ratio <= 1.2, s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio") } } From ac6b869680ed29f5212a4abef4e1bed0581320cb Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 30 Mar 2026 16:53:48 -0700 Subject: [PATCH 3/3] chore: `native_datafusion` to report scan task input metrics --- .../apache/spark/sql/comet/CometExecRDD.scala | 17 ++++++----------- .../spark/sql/comet/CometMetricNode.scala | 10 ---------- 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala index 5310b7c84a..c547b43d48 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala @@ -140,17 +140,12 @@ private[spark] class CometExecRDD( it.close() subqueries.foreach(sub => CometScalarSubquery.removeSubquery(it.id, sub)) - // Propagate native scan metrics (bytes_scanned, output_rows) to Spark's task-level - // inputMetrics so they appear in the Spark UI "Input" column and are reported via - // the listener infrastructure. The native reader bypasses Hadoop's Java FileSystem, - // so thread-local FS statistics are never updated -- we bridge the gap here. - val bytesScannedMetric = nativeMetrics.findMetric("bytes_scanned") - val outputRowsMetric = nativeMetrics.findMetric("output_rows") - if (bytesScannedMetric.isDefined || outputRowsMetric.isDefined) { - val inputMetrics = ctx.taskMetrics().inputMetrics - bytesScannedMetric.foreach(m => inputMetrics.setBytesRead(m.value)) - outputRowsMetric.foreach(m => inputMetrics.setRecordsRead(m.value)) - } + nativeMetrics.metrics + .get("bytes_scanned") + .foreach(m => ctx.taskMetrics().inputMetrics.setBytesRead(m.value)) + nativeMetrics.metrics + .get("output_rows") + .foreach(m => ctx.taskMetrics().inputMetrics.setRecordsRead(m.value)) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 2867c54a45..7883775c80 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -84,16 +84,6 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM val metricNode = Metric.NativeMetricNode.parseFrom(bytes) set_all(metricNode) } - - /** - * Finds a metric by name in this node or any descendant node. Returns the first match found via - * depth-first search. - */ - def findMetric(name: String): Option[SQLMetric] = { - metrics.get(name).orElse { - children.iterator.map(_.findMetric(name)).collectFirst { case Some(m) => m } - } - } } object CometMetricNode {