From efb05215e3da796efe9ee028422edea1206cd964 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Tue, 23 Sep 2025 11:17:45 +0100 Subject: [PATCH] Add datasource metrics --- .../org/apache/gluten/metrics/Metrics.java | 9 +++++++++ .../apache/gluten/metrics/OperatorMetrics.java | 6 ++++++ .../backendsapi/velox/VeloxMetricsApi.scala | 18 ++++++++++++++++++ .../metrics/BatchScanMetricsUpdater.scala | 2 ++ .../metrics/FileSourceScanMetricsUpdater.scala | 4 ++++ .../metrics/HiveTableScanMetricsUpdater.scala | 4 ++++ .../apache/gluten/metrics/MetricsUtil.scala | 6 ++++++ .../gluten/execution/VeloxMetricsSuite.scala | 12 ++++++++++++ cpp/core/jni/JniWrapper.cc | 4 +++- cpp/core/utils/Metrics.h | 2 ++ cpp/velox/compute/WholeStageResultIterator.cc | 6 ++++++ 11 files changed, 72 insertions(+), 1 deletion(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index 8d5c92b0cd98..d9b16aef988b 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -52,6 +52,8 @@ public class Metrics implements IMetrics { public long[] localReadBytes; public long[] ramReadBytes; public long[] preloadSplits; + public long[] dataSourceAddSplitTime; + public long[] dataSourceReadTime; public long[] physicalWrittenBytes; public long[] writeIOTime; @@ -97,6 +99,8 @@ public Metrics( long[] localReadBytes, long[] ramReadBytes, long[] preloadSplits, + long[] dataSourceAddSplitTime, + long[] dataSourceReadTime, long[] physicalWrittenBytes, long[] writeIOTime, long[] numWrittenFiles, @@ -135,6 +139,9 @@ public Metrics( this.localReadBytes = localReadBytes; this.ramReadBytes = ramReadBytes; this.preloadSplits = preloadSplits; + this.dataSourceAddSplitTime = dataSourceAddSplitTime; + this.dataSourceReadTime = dataSourceReadTime; + this.physicalWrittenBytes = physicalWrittenBytes; this.writeIOTime = writeIOTime; this.numWrittenFiles = numWrittenFiles; @@ -180,6 +187,8 @@ public OperatorMetrics getOperatorMetrics(int index) { localReadBytes[index], ramReadBytes[index], preloadSplits[index], + dataSourceAddSplitTime[index], + dataSourceReadTime[index], physicalWrittenBytes[index], writeIOTime[index], numWrittenFiles[index]); diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 36d827a288b6..24bedf0a466c 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -50,6 +50,8 @@ public class OperatorMetrics implements IOperatorMetrics { public long localReadBytes; public long ramReadBytes; public long preloadSplits; + public long dataSourceAddSplitTime; + public long dataSourceReadTime; public long physicalWrittenBytes; public long writeIOTime; @@ -90,6 +92,8 @@ public OperatorMetrics( long localReadBytes, long ramReadBytes, long preloadSplits, + long dataSourceAddSplitTime, + long dataSourceReadTime, long physicalWrittenBytes, long writeIOTime, long numWrittenFiles) { @@ -126,6 +130,8 @@ public OperatorMetrics( this.localReadBytes = localReadBytes; this.ramReadBytes = ramReadBytes; this.preloadSplits = preloadSplits; + this.dataSourceAddSplitTime = dataSourceAddSplitTime; + this.dataSourceReadTime = dataSourceReadTime; this.physicalWrittenBytes = physicalWrittenBytes; this.writeIOTime = writeIOTime; this.numWrittenFiles = numWrittenFiles; diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 901bad8be443..0feefe02bd2f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -108,6 +108,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"), "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"), "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"), + "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "data source add split time"), + "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "data source read time"), "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"), "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"), "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric( @@ -149,6 +155,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"), "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"), "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"), + "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "data source add split time"), + "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "data source read time"), "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"), "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"), "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric( @@ -190,6 +202,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"), "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"), "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"), + "dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "data source add split time"), + "dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "data source read time"), "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"), "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"), "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric( diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala index 31f6b5a313e4..886e8353b839 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala @@ -54,6 +54,8 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri metrics("localReadBytes") += operatorMetrics.localReadBytes metrics("ramReadBytes") += operatorMetrics.ramReadBytes metrics("preloadSplits") += operatorMetrics.preloadSplits + metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime + metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala index f8693f9246e3..b76525d7fbc7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala @@ -42,6 +42,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val skippedSplits: SQLMetric = metrics("skippedSplits") val processedSplits: SQLMetric = metrics("processedSplits") val preloadSplits: SQLMetric = metrics("preloadSplits") + val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime") + val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime") val skippedStrides: SQLMetric = metrics("skippedStrides") val processedStrides: SQLMetric = metrics("processedStrides") val remainingFilterTime: SQLMetric = metrics("remainingFilterTime") @@ -80,6 +82,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric localReadBytes += operatorMetrics.localReadBytes ramReadBytes += operatorMetrics.ramReadBytes preloadSplits += operatorMetrics.preloadSplits + dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime + dataSourceReadTime += operatorMetrics.dataSourceReadTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala index f7e0dc180091..c8e4e98cdff5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HiveTableScanMetricsUpdater.scala @@ -37,6 +37,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] val skippedSplits: SQLMetric = metrics("skippedSplits") val processedSplits: SQLMetric = metrics("processedSplits") val preloadSplits: SQLMetric = metrics("preloadSplits") + val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime") + val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime") val skippedStrides: SQLMetric = metrics("skippedStrides") val processedStrides: SQLMetric = metrics("processedStrides") val remainingFilterTime: SQLMetric = metrics("remainingFilterTime") @@ -75,6 +77,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] localReadBytes += operatorMetrics.localReadBytes ramReadBytes += operatorMetrics.ramReadBytes preloadSplits += operatorMetrics.preloadSplits + dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime + dataSourceReadTime += operatorMetrics.dataSourceReadTime } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index d9e269850bde..cd3e0eafa05a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -133,6 +133,8 @@ object MetricsUtil extends Logging { var localReadBytes: Long = 0 var ramReadBytes: Long = 0 var preloadSplits: Long = 0 + var dataSourceAddSplitTime: Long = 0 + var dataSourceReadTime: Long = 0 var numWrittenFiles: Long = 0 val metricsIterator = operatorMetrics.iterator() @@ -163,6 +165,8 @@ object MetricsUtil extends Logging { localReadBytes += metrics.localReadBytes ramReadBytes += metrics.ramReadBytes preloadSplits += metrics.preloadSplits + dataSourceAddSplitTime += metrics.dataSourceAddSplitTime + dataSourceReadTime += metrics.dataSourceReadTime numWrittenFiles += metrics.numWrittenFiles } @@ -200,6 +204,8 @@ object MetricsUtil extends Logging { localReadBytes, ramReadBytes, preloadSplits, + dataSourceAddSplitTime, + dataSourceReadTime, physicalWrittenBytes, writeIOTime, numWrittenFiles diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index 954a1eacc59f..db26ce298530 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -278,6 +278,18 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa assert(metrics("ramReadBytes").value == 0) } + test("Velox datasource metrics") { + val df = spark.sql(s"SELECT * FROM metrics_t1") + val scans = collect(df.queryExecution.executedPlan) { + case scan: FileSourceScanExecTransformer => scan + } + df.collect() + assert(scans.length === 1) + val metrics = scans.head.metrics + assert(metrics("dataSourceReadTime").value > 0) + assert(metrics("dataSourceAddSplitTime").value > 0) + } + test("test nested loop join metrics") { withSQLConf() { runQueryAndCompare( diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 543bc315974d..11e7ad15dea8 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -259,7 +259,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metricsBuilderClass, "", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -585,6 +585,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kLocalReadBytes], longArray[Metrics::kRamReadBytes], longArray[Metrics::kPreloadSplits], + longArray[Metrics::kDataSourceAddSplitWallNanos], + longArray[Metrics::kDataSourceReadWallNanos], longArray[Metrics::kPhysicalWrittenBytes], longArray[Metrics::kWriteIOTime], longArray[Metrics::kNumWrittenFiles], diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index cb48a137e711..d3169eb69fab 100644 --- a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -80,6 +80,8 @@ struct Metrics { kLocalReadBytes, kRamReadBytes, kPreloadSplits, + kDataSourceAddSplitWallNanos, + kDataSourceReadWallNanos, // Write metrics. kPhysicalWrittenBytes, diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 323231b4fab7..039e67bd4fd2 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -49,6 +49,8 @@ const std::string kStorageReadBytes = "storageReadBytes"; const std::string kLocalReadBytes = "localReadBytes"; const std::string kRamReadBytes = "ramReadBytes"; const std::string kPreloadSplits = "readyPreloadedSplits"; +const std::string kDataSourceAddSplitWallNanos = "dataSourceAddSplitWallNanos"; +const std::string kDataSourceReadWallNanos = "dataSourceReadWallNanos"; const std::string kNumWrittenFiles = "numWrittenFiles"; const std::string kWriteIOTime = "writeIOWallNanos"; @@ -466,6 +468,10 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kRamReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kRamReadBytes); metrics_->get(Metrics::kPreloadSplits)[metricIndex] = runtimeMetric("sum", entry.second->customStats, kPreloadSplits); + metrics_->get(Metrics::kDataSourceAddSplitWallNanos)[metricIndex] = + runtimeMetric("sum", second->customStats, kDataSourceAddSplitWallNanos); + metrics_->get(Metrics::kDataSourceReadWallNanos)[metricIndex] = + runtimeMetric("sum", second->customStats, kDataSourceReadWallNanos); metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] = runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles); metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] = second->physicalWrittenBytes;