
3842: chore: native_datafusion to report scan task input metrics #51

Open

martin-augment wants to merge 3 commits into main from pr-3842-2026-03-31-05-59-23

Conversation

@martin-augment
Owner

3842: To review by AI

@coderabbitai

coderabbitai bot commented Mar 31, 2026

Walkthrough

This change integrates native Comet execution metrics into Spark's task metrics system. It modifies CometExecRDD.compute to report bytes_scanned and output_rows metrics to Spark's task metrics upon task completion. A new public method set_all_from_bytes is added to CometMetricNode to deserialize native metric nodes from byte arrays for JNI communication. A test suite is introduced to verify that task-level input metrics (bytes read and records read) are properly reported and consistent between Spark and Comet execution modes.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request implements the reporting of native task-level input metrics, specifically bytes read and records read, from Comet to Spark. While this is a valuable addition, the current implementation only retrieves metrics from the root node of the native metrics tree. This leads to incorrect reporting in queries involving filters or projections where the scan node is not the root. Feedback suggests traversing the metrics tree to aggregate these values and adding more comprehensive test cases, such as queries with filters, to verify metric accuracy in non-trivial execution plans.

Comment on lines +143 to +148
nativeMetrics.metrics
  .get("bytes_scanned")
  .foreach(m => ctx.taskMetrics().inputMetrics.setBytesRead(m.value))
nativeMetrics.metrics
  .get("output_rows")
  .foreach(m => ctx.taskMetrics().inputMetrics.setRecordsRead(m.value))

medium

The current implementation only retrieves metrics from the root node of the nativeMetrics tree. In most Comet execution plans (e.g., Scan -> Filter -> Project), the root node will be an operator like Project or Filter, which does not contain the bytes_scanned metric. As a result, bytesRead will be reported as 0 in the Spark UI for these queries. Furthermore, output_rows at the root node reflects the final result count, which may not match the number of records read from the source if a filter was applied.

To correctly report task-level input metrics, you should traverse the nativeMetrics tree and aggregate bytes_scanned and output_rows from all nodes that represent a scan (typically identified by the presence of the bytes_scanned metric). Consider adding a recursive helper method to CometMetricNode to perform this aggregation.
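The aggregation the reviewer describes could look roughly like the self-contained Scala sketch below; `MetricNode` and its fields are illustrative stand-ins for `CometMetricNode`, not the actual API.

```scala
// Illustrative stand-in for CometMetricNode (field names are assumptions).
case class MetricNode(metrics: Map[String, Long], children: Seq[MetricNode]) {
  // Recursively sum `name` over all nodes that look like scans,
  // identified by the presence of the "bytes_scanned" metric.
  def aggregateScanMetric(name: String): Long = {
    val own =
      if (metrics.contains("bytes_scanned")) metrics.getOrElse(name, 0L)
      else 0L
    own + children.map(_.aggregateScanMetric(name)).sum
  }
}
```

With this shape, a Project -> Filter -> Scan tree would surface the scan node's bytes_scanned and output_rows regardless of which operator sits at the root.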

Owner Author


value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Being executed in CometExecRDD this metrics collection is executed for all kinds of nodes, not just for the Scan ones. It would be better to move this logic to CometNativeScanExec#doExecuteColumnar(). This way it will collect only the Scan related metrics.

}
}

test("native_datafusion scan reports task-level input metrics matching Spark") {

medium

This test case only covers a simple scan where the scan node is the root of the native plan. To ensure that input metrics are correctly reported in more realistic scenarios, consider adding a test case that includes a filter (e.g., sql("SELECT * FROM tbl WHERE _1 > 5000")). This will verify that metrics are correctly aggregated even when the scan node is not the root of the execution tree.
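A hedged sketch of the suggested variant follows; `withParquetTable` and the surrounding structure mirror the existing test and are assumed, not verified against the suite.

```scala
// Sketch only: helper names mirror the existing test and are assumptions.
test("native_datafusion scan reports input metrics with a filter above the scan") {
  withParquetTable((0 until 10000).map(i => (i, i.toString)), "tbl") {
    // Scan is no longer the root of the native plan once the filter is added.
    val df = sql("SELECT * FROM tbl WHERE _1 > 5000")
    df.collect()
    // Expectation: recordsRead reflects rows read from storage (10000),
    // not the filter's output (4999 rows for values 5001..9999).
  }
}
```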

Owner Author


value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Using a more complex SQL query (e.g. with a Filter) will expose the problem that the collection of the metrics is done on all CometExecRDDs. It should be done only for the Scan nodes, i.e. the ones created by CometNativeScanExec#doExecuteColumnar().

@claude

claude bot commented Mar 31, 2026

Review: native_datafusion scan task input metrics

Thanks for this! Propagating bytes_scanned and output_rows into Spark's inputMetrics at task-completion time is a clean approach, and the test structure (listener + listenerBus.waitUntilEmpty()) follows the existing shuffle metrics test nicely. The JNI comment on set_all_from_bytes is a helpful addition.

I have a few observations worth thinking through.


Potential recordsRead issue in multi-operator plans

The task completion listener in CometExecRDD.compute() runs for every CometExecRDD, not only ones created by CometNativeScanExec. For a query with a Comet operator above the scan (e.g. SELECT * FROM tbl WHERE complex_expr(col)), the entire pipeline gets compiled into a single CometExecRDD via CometExec.doExecuteColumnar(). In that case:

  • The root nativeMetrics is the filter/sort operator's metric node.
  • nativeMetrics.metrics.get("bytes_scanned") returns None → no-op (correct).
  • nativeMetrics.metrics.get("output_rows") returns Some(...) because CometExec uses baselineMetrics, which includes output_rows, so setRecordsRead is called with the operator's output rows (filtered/sorted rows), not the scan's rows.

For recordsRead, Spark's semantics are "records read from storage", so using the filter's output count would under-report. Would it be worth guarding this with something like a flag or by scoping the metric propagation to CometNativeScanExec.doExecuteColumnar() directly rather than inside CometExecRDD.compute()?
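The scoping option could be sketched as below, under the assumption that the scan operator can attach its own completion listener; names and visibility are not verified against the codebase.

```scala
import org.apache.spark.TaskContext

// Sketch: attach the listener only where the native scan's partitions are
// computed, so non-scan CometExecRDDs never touch inputMetrics.
def attachScanMetricsListener(
    ctx: TaskContext,
    scanMetrics: => Map[String, Long]): Unit = {
  ctx.addTaskCompletionListener[Unit] { _ =>
    val m = scanMetrics // evaluated after the native plan has finished
    m.get("bytes_scanned").foreach(ctx.taskMetrics().inputMetrics.setBytesRead)
    m.get("output_rows").foreach(ctx.taskMetrics().inputMetrics.setRecordsRead)
  }
}
```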


Test coverage for the multi-operator case

The test covers SELECT * FROM tbl (pure scan, no operators above). It might be worth adding a variant with a predicate that isn't pushed all the way into the Parquet row-group filter, to verify the recordsRead value is still meaningful in that case (or at least doesn't become misleading).


Byte tolerance comment

The 20% tolerance window (ratio >= 0.8 && ratio <= 1.2) is reasonable, but a short note in the comment about what can cause the divergence (e.g. DataFusion reads column chunks differently from Spark's row-group reader, footer re-reads, buffering) would help future readers understand why a loose bound is intentional rather than accidental.
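The causes listed above could live directly next to the bound; a self-contained sketch of the check with the rationale inlined:

```scala
// Sketch of the tolerance check with the divergence causes documented inline.
def bytesWithinTolerance(cometBytes: Long, sparkBytes: Long): Boolean = {
  // Exact equality is not expected: DataFusion reads column chunks
  // differently from Spark's row-group reader, and footer re-reads or
  // buffering can shift the totals. The 20% window only flags gross drift.
  val ratio = cometBytes.toDouble / sparkBytes.toDouble
  ratio >= 0.8 && ratio <= 1.2
}
```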


Minor: setBytesRead vs incBytesRead

setBytesRead overwrites any previously accumulated value. For the current architecture this is fine because bytes_scanned only appears in the root metrics of a scan-only CometExecRDD. Worth a brief comment like // scan is always the sole source of bytesRead for this task so it's clear the overwrite is intentional.


Overall this is a straightforward and useful change for the scan-only case. The main thing I'd want to resolve before merging is whether the output_rows path in multi-operator pipelines can produce a misleading recordsRead.


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala (1)

104-106: Make Comet enablement explicit in the native-scan run.

The second metrics collection currently depends on ambient default for CometConf.COMET_ENABLED. Setting it explicitly makes this test deterministic across suite/config changes.

✅ Proposed tweak
-      val (cometBytes, cometRecords) = collectInputMetrics(
-        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION)
+      val (cometBytes, cometRecords) = collectInputMetrics(
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION)
spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala (1)

143-148: Extract metric-name literals to shared constants.

Using inline "bytes_scanned" / "output_rows" here is easy to drift over time. Please centralize these keys (e.g., in CometMetricNode/CometExecRDD companion object) and reference constants at call sites.

♻️ Proposed refactor
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala
@@
-        nativeMetrics.metrics
-          .get("bytes_scanned")
+        nativeMetrics.metrics
+          .get(CometExecRDD.NativeBytesScannedMetric)
           .foreach(m => ctx.taskMetrics().inputMetrics.setBytesRead(m.value))
         nativeMetrics.metrics
-          .get("output_rows")
+          .get(CometExecRDD.NativeOutputRowsMetric)
           .foreach(m => ctx.taskMetrics().inputMetrics.setRecordsRead(m.value))
@@
 object CometExecRDD {
+  private val NativeBytesScannedMetric = "bytes_scanned"
+  private val NativeOutputRowsMetric = "output_rows"


📥 Commits

Reviewing files that changed from the base of the PR and between 7c5eb20 and ac6b869.

📒 Files selected for processing (3)
  • spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala
  • spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
  • spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala

@augmentcode

augmentcode bot commented Mar 31, 2026

🤖 Augment PR Summary

Summary: This PR wires native DataFusion scan metrics into Spark task-level InputMetrics so Spark can report bytes/records read for native scans.

Changes:

  • Populate TaskContext.taskMetrics.inputMetrics from native metrics (bytes_scanned, output_rows) on task completion.
  • Document that CometMetricNode.set_all_from_bytes is invoked via JNI from Rust.
  • Add a new test that compares Comet’s native scan InputMetrics against vanilla Spark and validates bytes/records consistency.

Technical Notes: Metrics are pulled from the native CometMetricNode after iterator/plan teardown so the final per-task values are available to Spark’s listener/UI metrics pipeline.



@augmentcode augmentcode bot left a comment


Review completed. 1 suggestion posted.


  .foreach(m => ctx.taskMetrics().inputMetrics.setBytesRead(m.value))
nativeMetrics.metrics
  .get("output_rows")
  .foreach(m => ctx.taskMetrics().inputMetrics.setRecordsRead(m.value))

output_rows is part of the baseline native metrics for many non-scan operators, so setting taskMetrics.inputMetrics.recordsRead from it here can misreport or overwrite Spark’s input metrics for non-scan stages. Consider gating this update so it only applies when the native metrics actually represent scan input (e.g., tied to scan-specific metrics like bytes_scanned).
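The gating idea can be expressed as a small self-contained helper; the setter signatures here are simplified stand-ins for Spark's InputMetrics.

```scala
// Only propagate input metrics when the node carries scan-specific metrics.
def propagateInputMetrics(
    metrics: Map[String, Long],
    setBytesRead: Long => Unit,
    setRecordsRead: Long => Unit): Unit = {
  metrics.get("bytes_scanned").foreach { bytes =>
    setBytesRead(bytes)
    // output_rows is trusted as "records read" only on a scan node.
    metrics.get("output_rows").foreach(setRecordsRead)
  }
}
```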

Severity: medium


Owner Author


value:useful; category:bug; feedback: The Augment AI reviewer is correct! Being executed in CometExecRDD this metrics collection is executed for all kinds of nodes, not just for the Scan ones. It would be better to move this logic to CometNativeScanExec#doExecuteColumnar(). This way it will collect only the Scan related metrics.

@martin-augment
Owner Author

104-106: Make Comet enablement explicit in the native-scan run.

The second metrics collection currently depends on ambient default for CometConf.COMET_ENABLED. Setting it explicitly makes this test deterministic across suite/config changes.

value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! The test works fine because it uses the default value of COMET_ENABLED=true. It would be good to list this config explicitly to make it obvious and more stable.

@martin-augment
Owner Author

Potential recordsRead issue in multi-operator plans

The task completion listener in CometExecRDD.compute() runs for every CometExecRDD, not only ones created by CometNativeScanExec. For a query with a Comet operator above the scan (e.g. SELECT * FROM tbl WHERE complex_expr(col)), the entire pipeline gets compiled into a single CometExecRDD via CometExec.doExecuteColumnar(). In that case:

  • The root nativeMetrics is the filter/sort operator's metric node.
  • nativeMetrics.metrics.get("bytes_scanned") returns None → no-op (correct).
  • nativeMetrics.metrics.get("output_rows") returns Some(...) because CometExec uses baselineMetrics, which includes output_rows, so setRecordsRead is called with the operator's output rows (filtered/sorted rows), not the scan's rows.

For recordsRead, Spark's semantics are "records read from storage", so using the filter's output count would under-report. Would it be worth guarding this with something like a flag or by scoping the metric propagation to CometNativeScanExec.doExecuteColumnar() directly rather than inside CometExecRDD.compute()?

value:useful; category:bug; feedback: The Claude AI reviewer is correct! Being executed in CometExecRDD this metrics collection is executed for all kinds of nodes, not just for the Scan ones. It would be better to move this logic to CometNativeScanExec#doExecuteColumnar(). This way it will collect only the Scan related metrics.
