3708: feat: expose comet metrics through Spark's external monitoring system #48
martin-augment wants to merge 1 commit into main from
Conversation
Walkthrough

This pull request introduces metrics collection capabilities for Comet integration with Spark.
Summary of Changes (Gemini Code Assist)

I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances Comet's observability by integrating a dedicated metrics source.

Highlights
Code Review: feat: expose comet metrics through Spark's external monitoring system

Overall this is a well-structured feature that hooks nicely into Spark's existing metrics infrastructure. The implementation is clean and the test coverage is a good start. I have several issues to raise, ranging from a significant correctness bug to minor style points.

Critical Issue: Double-counting with AQE and dual rule application

File:
This means stats can be recorded more than once for the same query. The derived acceleration.ratio gauge is skewed as a result. A fix should track whether stats have already been recorded for a given plan root (e.g., tag the plan, or only record in one of the two injection sites), or move the recording to a different hook that fires exactly once per user-visible query.

Moderate Issue:
| Severity | Issue |
|---|---|
| Critical | Double-counting due to dual injection of CometExecRule (AQE + preColumnarTransitions) |
| Moderate | Redundant tree traversal when COMET_EXPLAIN_FALLBACK_ENABLED=true |
| Moderate | Unbounded counters with no documentation about cumulative vs. rate semantics |
| Minor | Inconsistent type annotations on public Counter fields |
| Minor | StringBuilder allocated and discarded in forPlan |
| Minor | Missing comment explaining why CometSource lives in org.apache.spark package |
| Minor | Test does not cover SPARK_OPERATORS, TRANSITIONS, or the acceleration.ratio gauge |
The critical double-counting issue should be resolved before merging. The remaining items are improvements but not blockers.
Code Review
This pull request introduces Comet metrics through Spark's external monitoring system. It includes changes to the build script to allow the new CometSource class, adds the CometSource class itself, modifies the CometExecRule to record coverage stats, and updates the CometDriverPlugin to register the Comet metrics source. A new test case is also added to verify that CometSource metrics are recorded.
```scala
val stats = CometCoverageStats.forPlan(newPlan)
CometSource.recordStats(stats)
```
It's important to ensure that CometSource.recordStats(stats) is called only when newPlan is actually a Comet plan. Otherwise, the metrics might be misleading, as they would include Spark operators as well.
Suggested change:

```diff
-val stats = CometCoverageStats.forPlan(newPlan)
-CometSource.recordStats(stats)
+if (newPlan.isInstanceOf[CometPlan]) {
+  val stats = CometCoverageStats.forPlan(newPlan)
+  CometSource.recordStats(stats)
+}
```
```shell
allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
allowed_expr+="|^org/apache/spark/CometPlugin.class$"
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
allowed_expr+="|^org/apache/spark/CometSource.*$"
```
The regex `CometSource.*$` allows any character after `CometSource`, but it seems like only the `.class` extension is expected. It is better to be more specific to avoid unexpected inclusions. Note that if `CometSource` is a Scala object or defines inner classes, the compiler also emits files such as `CometSource$.class`, so any narrower pattern must still allow those.
Suggested change:

```diff
-allowed_expr+="|^org/apache/spark/CometSource.*$"
+allowed_expr+="|^org/apache/spark/CometSource.class$"
```
🤖 Augment PR Summary

Summary: Wires Comet plan-coverage/acceleration stats into Spark's metrics system so external monitoring can track Comet usage.

Changes:
```scala
var newPlan = transform(planWithJoinRewritten)

// Record coverage stats for metrics
val stats = CometCoverageStats.forPlan(newPlan)
```
Since CometExecRule is injected both as a QueryStagePrepRule and as a columnar preColumnarTransitions rule, this recordStats call may run multiple times per query/stage and inflate queries.planned/operator counters. Consider guarding so stats are recorded once per final plan (or only when the rule actually changes the plan).
Severity: medium
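One way to implement the suggested once-per-plan guard is an identity-keyed record-once marker. This is only a sketch; `CometStatsGuard`, its map, and the usage snippet are assumptions for illustration, not existing Comet APIs:

```scala
import java.util.{Collections, IdentityHashMap}

object CometStatsGuard {
  // Identity-keyed map so structurally equal but distinct plan objects are
  // still tracked separately (assumption: one recording per final plan object
  // is the desired semantics). Note: entries are never evicted in this sketch.
  private val recorded =
    Collections.synchronizedMap(new IdentityHashMap[AnyRef, java.lang.Boolean]())

  /** Returns true the first time it sees `plan`, false on later calls. */
  def shouldRecord(plan: AnyRef): Boolean =
    recorded.put(plan, java.lang.Boolean.TRUE) == null
}

// Hypothetical usage inside CometExecRule:
// if (CometStatsGuard.shouldRecord(newPlan)) {
//   CometSource.recordStats(CometCoverageStats.forPlan(newPlan))
// }
```

A weak-reference variant would be needed in production so retired plans can be garbage-collected; the sketch keeps the idea minimal.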
```scala
0,
Seq(),
0,
new StringBuilder(),
```
Despite the Scaladoc, this still builds the full tree output into a StringBuilder via generateTreeString, which could allocate a large buffer on every planning pass. If the intent is stats-only, consider a traversal that updates CometCoverageStats without appending to a string.
Severity: medium
```scala
/**
 * Exposes following metrics (hooked from CometCoverageStats)
 * - operators.native: Total operators executed natively
```
These metrics are derived from CometCoverageStats over the planned SparkPlan, so phrasing like “operators executed natively” / “fell back” reads like runtime execution and may be misleading. Consider clarifying the doc/metric semantics (planned operators vs executed operators).
Severity: low
```scala
  }
}

test("CometSource metrics are recorded") {
```
This test only checks the in-process CometSource counters, so it can pass even if CometSource isn’t actually registered with sc.env.metricsSystem (i.e., external monitoring can’t see it). Consider asserting the metrics registry/source contains the expected names to validate the exposure path.
Severity: low
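To also validate the exposure path, the suite could assert against Spark's MetricsSystem directly. A sketch, assuming the source registers under the name "CometSource", that the suite lives in the org.apache.spark package (getSourcesByName is private[spark]), and that "operators.native" is one of the registry keys:

```scala
test("CometSource is registered with the metrics system") {
  // Assumption: the source name and metric key below match the real
  // CometSource implementation; verify against the actual code.
  val sources = sc.env.metricsSystem.getSourcesByName("CometSource")
  assert(sources.nonEmpty, "CometSource not registered with the MetricsSystem")
  val names = sources.head.metricRegistry.getNames
  assert(names.contains("operators.native"))
}
```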
🧹 Nitpick comments (1)

spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala (1)

Lines 200-209: Avoid building an unused explain string in the stats-only path. Lines 203-209 compute coverage by calling generateTreeString, but the generated text is discarded. A stats-only traversal would remove the unnecessary allocation/work.

♻️ Refactor sketch
```diff
 object CometCoverageStats {
   /**
    * Compute coverage stats for a plan without generating explain string.
    */
   def forPlan(plan: SparkPlan): CometCoverageStats = {
     val stats = new CometCoverageStats()
     val explainInfo = new ExtendedExplainInfo()
-    explainInfo.generateTreeString(
-      CometExplainInfo.getActualPlan(plan),
-      0,
-      Seq(),
-      0,
-      new StringBuilder(),
-      stats)
+    explainInfo.collectCoverageStats(
+      CometExplainInfo.getActualPlan(plan),
+      stats)
     stats
   }
 }
```

```scala
// Add in ExtendedExplainInfo (stats-only traversal, no string appends)
private[comet] def collectCoverageStats(
    node: TreeNode[_],
    planStats: CometCoverageStats): Unit = {
  // same node classification logic as generateTreeString(...)
  // recurse through innerChildren and children
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala` around lines 200 - 209, The forPlan method builds a full explain string by calling ExtendedExplainInfo.generateTreeString but only needs coverage stats, causing unnecessary allocation; add a stats-only traversal method (e.g., ExtendedExplainInfo.collectCoverageStats(node: TreeNode[_], planStats: CometCoverageStats)) that replicates the node classification/recurse logic from generateTreeString without creating or appending to a StringBuilder, and change forPlan to call CometExplainInfo.getActualPlan(plan) then collectCoverageStats on that node to populate CometCoverageStats instead of generateTreeString; ensure you reference the same classification helpers currently used in generateTreeString so behavior is unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 05daa1a7-cda2-443b-9008-38f3b23ed423
📒 Files selected for processing (6)
- dev/ensure-jars-have-correct-contents.sh
- spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
- spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
- spark/src/main/scala/org/apache/spark/CometSource.scala
- spark/src/main/scala/org/apache/spark/Plugins.scala
- spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
```scala
// Record coverage stats for metrics
val stats = CometCoverageStats.forPlan(newPlan)
CometSource.recordStats(stats)
```
Metrics inflated by multiple rule invocations with AQE
Medium Severity
CometExecRule is registered both via injectColumnar (as preColumnarTransitions) and via injectQueryStagePrepRule. With AQE enabled (the default), apply is invoked multiple times per query, once from each injection point during re-optimization cycles. Each invocation calls CometSource.recordStats, inflating all counters including QUERIES_PLANNED, NATIVE_OPERATORS, and SPARK_OPERATORS. The acceleration.ratio gauge may also be skewed, since operator counts from different plan stages have different compositions.
```scala
// Record coverage stats for metrics
val stats = CometCoverageStats.forPlan(newPlan)
CometSource.recordStats(stats)
```
Stats captured before temporary placeholder nodes are removed
Low Severity
CometCoverageStats.forPlan(newPlan) is called before CometScanWrapper and CometSinkPlaceHolder are removed from the plan. Both extend CometPlan, so they are counted as cometOperators. When removed, CometScanWrapper is replaced by its wrapped Spark originalPlan (which was never counted), and CometSinkPlaceHolder is replaced by its already-counted child. This inflates cometOperators and deflates sparkOperators relative to the final plan.
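Under the assumption that placeholder removal happens later in the same rule, one low-risk fix is to record stats only after the placeholders are stripped. This is a sketch; `removePlaceholders` is a hypothetical name standing in for whatever transform currently drops CometScanWrapper and CometSinkPlaceHolder:

```scala
// Sketch (hypothetical helper name): count operators on the final plan,
// after CometScanWrapper / CometSinkPlaceHolder have been replaced, so
// cometOperators and sparkOperators reflect what will actually run.
val finalPlan = removePlaceholders(newPlan)
val stats = CometCoverageStats.forPlan(finalPlan)
CometSource.recordStats(stats)
```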

