From 6fd8143bcc85e9ac722c2cc78e0541aea6d520a5 Mon Sep 17 00:00:00 2001 From: noroshi Date: Thu, 5 Mar 2026 03:08:13 +0000 Subject: [PATCH] [GLUTEN-8063][VL] Export cpuNanos metric for InputIteratorTransformer Export Velox's cpuWallTiming.cpuNanos through the full metrics pipeline (C++ -> JNI -> Java -> Scala) and display it as a new metric on InputIteratorTransformer. This addresses the confusing high wall time reported in #8063 and #10618 by providing a breakdown: wallNanos shows the wall time (including time spent waiting for upstream data), while cpuNanos shows the exclusive CPU time (actual work done by the operator itself). --- .../src/main/java/org/apache/gluten/metrics/Metrics.java | 4 ++++ .../main/java/org/apache/gluten/metrics/OperatorMetrics.java | 3 +++ .../org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala | 3 ++- .../apache/gluten/metrics/InputIteratorMetricsUpdater.scala | 1 + cpp/core/jni/JniWrapper.cc | 3 ++- cpp/core/utils/Metrics.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 2 ++ 7 files changed, 15 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index 3d23dc94db97..5b32bb191a5e 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -29,6 +29,7 @@ public class Metrics implements IMetrics { public long[] outputBytes; public long[] cpuCount; public long[] wallNanos; + public long[] cpuNanos; public long[] scanTime; public long[] peakMemoryBytes; public long[] numMemoryAllocations; @@ -79,6 +80,7 @@ public Metrics( long[] outputBytes, long[] cpuCount, long[] wallNanos, + long[] cpuNanos, long veloxToArrow, long[] peakMemoryBytes, long[] numMemoryAllocations, @@ -122,6 +124,7 @@ public Metrics( this.outputBytes = outputBytes; this.cpuCount = cpuCount; this.wallNanos = wallNanos; + this.cpuNanos = cpuNanos; 
this.scanTime = scanTime; this.singleMetric.veloxToArrow = veloxToArrow; this.peakMemoryBytes = peakMemoryBytes; @@ -174,6 +177,7 @@ public OperatorMetrics getOperatorMetrics(int index) { outputBytes[index], cpuCount[index], wallNanos[index], + cpuNanos[index], peakMemoryBytes[index], numMemoryAllocations[index], spilledInputBytes[index], diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 10563e507e9b..136fbdfd5332 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -27,6 +27,7 @@ public class OperatorMetrics implements IOperatorMetrics { public long outputBytes; public long cpuCount; public long wallNanos; + public long cpuNanos; public long scanTime; public long peakMemoryBytes; public long numMemoryAllocations; @@ -73,6 +74,7 @@ public OperatorMetrics( long outputBytes, long cpuCount, long wallNanos, + long cpuNanos, long peakMemoryBytes, long numMemoryAllocations, long spilledInputBytes, @@ -114,6 +116,7 @@ public OperatorMetrics( this.outputBytes = outputBytes; this.cpuCount = cpuCount; this.wallNanos = wallNanos; + this.cpuNanos = cpuNanos; this.scanTime = scanTime; this.peakMemoryBytes = peakMemoryBytes; this.numMemoryAllocations = numMemoryAllocations; diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 29d882c3d3db..4a0bcd210864 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -75,7 +75,8 @@ class VeloxMetricsApi extends MetricsApi with Logging { Map( "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"), - 
"wallNanos" -> wallNanosMetric + "wallNanos" -> wallNanosMetric, + "cpuNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "cpu time") ) ++ outputMetrics } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala index 79f643264501..8d77bca63b04 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala @@ -25,6 +25,7 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric], forBroad val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] metrics("cpuCount") += operatorMetrics.cpuCount metrics("wallNanos") += operatorMetrics.wallNanos + metrics("cpuNanos") += operatorMetrics.cpuNanos if (!forBroadcast) { if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) { // Sometimes, velox does not update metrics for intermediate operator, diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index c48886195b22..eff77e557003 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -273,7 +273,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metricsBuilderClass, "", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + "([J[J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -578,6 +578,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kOutputBytes], longArray[Metrics::kCpuCount], longArray[Metrics::kWallNanos], + longArray[Metrics::kCpuNanos], metrics ? 
metrics->veloxToArrow : -1, longArray[Metrics::kPeakMemoryBytes], longArray[Metrics::kNumMemoryAllocations], diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index 8e33a4a9c613..4109a87de4ea 100644 --- a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -52,6 +52,7 @@ struct Metrics { // CpuWallTiming. kCpuCount, kWallNanos, + kCpuNanos, kPeakMemoryBytes, kNumMemoryAllocations, diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index babcaf0e5f64..dc6e30b90a23 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -457,6 +457,7 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kOutputBytes)[metricIndex] = 0; metrics_->get(Metrics::kCpuCount)[metricIndex] = 0; metrics_->get(Metrics::kWallNanos)[metricIndex] = 0; + metrics_->get(Metrics::kCpuNanos)[metricIndex] = 0; metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] = 0; metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] = 0; metricIndex += 1; @@ -477,6 +478,7 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kOutputBytes)[metricIndex] = second->outputBytes; metrics_->get(Metrics::kCpuCount)[metricIndex] = second->cpuWallTiming.count; metrics_->get(Metrics::kWallNanos)[metricIndex] = second->cpuWallTiming.wallNanos; + metrics_->get(Metrics::kCpuNanos)[metricIndex] = second->cpuWallTiming.cpuNanos; metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] = second->peakMemoryBytes; metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] = second->numMemoryAllocations; metrics_->get(Metrics::kSpilledInputBytes)[metricIndex] = second->spilledInputBytes;