@@ -52,6 +52,8 @@ public class Metrics implements IMetrics {
public long[] localReadBytes;
public long[] ramReadBytes;
public long[] preloadSplits;
public long[] dataSourceAddSplitTime;
public long[] dataSourceReadTime;

public long[] physicalWrittenBytes;
public long[] writeIOTime;
@@ -97,6 +99,8 @@ public Metrics(
long[] localReadBytes,
long[] ramReadBytes,
long[] preloadSplits,
long[] dataSourceAddSplitTime,
long[] dataSourceReadTime,
long[] physicalWrittenBytes,
long[] writeIOTime,
long[] numWrittenFiles,
@@ -135,6 +139,9 @@ public Metrics(
this.localReadBytes = localReadBytes;
this.ramReadBytes = ramReadBytes;
this.preloadSplits = preloadSplits;
this.dataSourceAddSplitTime = dataSourceAddSplitTime;
this.dataSourceReadTime = dataSourceReadTime;

this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
@@ -180,6 +187,8 @@ public OperatorMetrics getOperatorMetrics(int index) {
localReadBytes[index],
ramReadBytes[index],
preloadSplits[index],
dataSourceAddSplitTime[index],
dataSourceReadTime[index],
physicalWrittenBytes[index],
writeIOTime[index],
numWrittenFiles[index]);
@@ -50,6 +50,8 @@ public class OperatorMetrics implements IOperatorMetrics {
public long localReadBytes;
public long ramReadBytes;
public long preloadSplits;
public long dataSourceAddSplitTime;
public long dataSourceReadTime;

public long physicalWrittenBytes;
public long writeIOTime;
@@ -90,6 +92,8 @@ public OperatorMetrics(
long localReadBytes,
long ramReadBytes,
long preloadSplits,
long dataSourceAddSplitTime,
long dataSourceReadTime,
long physicalWrittenBytes,
long writeIOTime,
long numWrittenFiles) {
@@ -126,6 +130,8 @@ public OperatorMetrics(
this.localReadBytes = localReadBytes;
this.ramReadBytes = ramReadBytes;
this.preloadSplits = preloadSplits;
this.dataSourceAddSplitTime = dataSourceAddSplitTime;
this.dataSourceReadTime = dataSourceReadTime;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
@@ -108,6 +108,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
"dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"data source add split time"),
"dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"data source read time"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
@@ -149,6 +155,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
"dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"data source add split time"),
"dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"data source read time"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
@@ -190,6 +202,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
"dataSourceAddSplitTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"data source add split time"),
"dataSourceReadTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"data source read time"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
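All three scan registrations above (batch scan, file source scan, and Hive table scan) add the same two nano-timing metrics, and the map keys have to match the keys looked up by the metrics updaters below. As a minimal sketch of the convention — not part of this diff, and assuming only an active SparkContext — a metric created with createNanoTimingMetric accumulates nanoseconds and is rendered by Spark as a duration:

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.SQLMetrics

object NanoTimingSketch {
  // Illustrative only: shows the unit convention for createNanoTimingMetric.
  def demo(sc: SparkContext): Unit = {
    val readTime = SQLMetrics.createNanoTimingMetric(sc, "data source read time")
    val start = System.nanoTime()
    Thread.sleep(5) // stand-in for the actual read work
    readTime += System.nanoTime() - start // accumulate elapsed nanoseconds
    println(readTime.value) // the raw value stays in nanoseconds
  }
}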
@@ -54,6 +54,8 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri
metrics("localReadBytes") += operatorMetrics.localReadBytes
metrics("ramReadBytes") += operatorMetrics.ramReadBytes
metrics("preloadSplits") += operatorMetrics.preloadSplits
metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime
metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
}
}
}
@@ -42,6 +42,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val skippedSplits: SQLMetric = metrics("skippedSplits")
val processedSplits: SQLMetric = metrics("processedSplits")
val preloadSplits: SQLMetric = metrics("preloadSplits")
val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
val skippedStrides: SQLMetric = metrics("skippedStrides")
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
@@ -80,6 +82,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
localReadBytes += operatorMetrics.localReadBytes
ramReadBytes += operatorMetrics.ramReadBytes
preloadSplits += operatorMetrics.preloadSplits
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
dataSourceReadTime += operatorMetrics.dataSourceReadTime
}
}
}
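The updaters for batch scan, file source scan, and the Hive table scan below all fold the native per-operator values into the driver-side SQLMetrics by key. A minimal sketch of that shape — illustrative names only, not Gluten's real MetricsUpdater trait:

import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative sketch: each finished native task reports one snapshot and the
// sink adds it into the SQLMetrics registered for the plan node.
final case class ScanTimingSnapshot(addSplitTimeNanos: Long, readTimeNanos: Long)

class ScanTimingSink(metrics: Map[String, SQLMetric]) {
  def update(m: ScanTimingSnapshot): Unit = {
    metrics("dataSourceAddSplitTime") += m.addSplitTimeNanos
    metrics("dataSourceReadTime") += m.readTimeNanos
  }
}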
@@ -37,6 +37,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
val skippedSplits: SQLMetric = metrics("skippedSplits")
val processedSplits: SQLMetric = metrics("processedSplits")
val preloadSplits: SQLMetric = metrics("preloadSplits")
val dataSourceAddSplitTime: SQLMetric = metrics("dataSourceAddSplitTime")
val dataSourceReadTime: SQLMetric = metrics("dataSourceReadTime")
val skippedStrides: SQLMetric = metrics("skippedStrides")
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
@@ -75,6 +77,8 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
localReadBytes += operatorMetrics.localReadBytes
ramReadBytes += operatorMetrics.ramReadBytes
preloadSplits += operatorMetrics.preloadSplits
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
dataSourceReadTime += operatorMetrics.dataSourceReadTime
}
}
}
@@ -133,6 +133,8 @@ object MetricsUtil extends Logging {
var localReadBytes: Long = 0
var ramReadBytes: Long = 0
var preloadSplits: Long = 0
var dataSourceAddSplitTime: Long = 0
var dataSourceReadTime: Long = 0
var numWrittenFiles: Long = 0

val metricsIterator = operatorMetrics.iterator()
@@ -163,6 +165,8 @@ object MetricsUtil extends Logging {
localReadBytes += metrics.localReadBytes
ramReadBytes += metrics.ramReadBytes
preloadSplits += metrics.preloadSplits
dataSourceAddSplitTime += metrics.dataSourceAddSplitTime
dataSourceReadTime += metrics.dataSourceReadTime
numWrittenFiles += metrics.numWrittenFiles
}

@@ -200,6 +204,8 @@ object MetricsUtil extends Logging {
localReadBytes,
ramReadBytes,
preloadSplits,
dataSourceAddSplitTime,
dataSourceReadTime,
physicalWrittenBytes,
writeIOTime,
numWrittenFiles
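MetricsUtil aggregates the native metrics of one pipeline by summing each field across its operators before building a single OperatorMetrics, so the two new timings surface as per-pipeline totals rather than per-operator values. A minimal sketch of that summation, with hypothetical field names:

// Illustrative sketch of the per-field summation done in MetricsUtil.
final case class OpTimings(addSplitNanos: Long, readNanos: Long)

object TimingAggregation {
  def sum(ops: Seq[OpTimings]): OpTimings =
    ops.foldLeft(OpTimings(0L, 0L)) { (acc, m) =>
      OpTimings(acc.addSplitNanos + m.addSplitNanos, acc.readNanos + m.readNanos)
    }
}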
@@ -278,6 +278,18 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(metrics("ramReadBytes").value == 0)
}

test("Velox datasource metrics") {
val df = spark.sql(s"SELECT * FROM metrics_t1")
val scans = collect(df.queryExecution.executedPlan) {
case scan: FileSourceScanExecTransformer => scan
}
df.collect()
assert(scans.length === 1)
val metrics = scans.head.metrics
assert(metrics("dataSourceReadTime").value > 0)
assert(metrics("dataSourceAddSplitTime").value > 0)
}

test("test nested loop join metrics") {
withSQLConf() {
runQueryAndCompare(
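Outside the suite, the new metrics can be inspected the same way the test does. A minimal sketch for an interactive session — it assumes the suite's metrics_t1 table, the org.apache.gluten.execution import path, and that AQE is either disabled or handled as the suite does via AdaptiveSparkPlanHelper:

import org.apache.gluten.execution.FileSourceScanExecTransformer

// Illustrative sketch: run a scan, then read the two new nano-timing metrics.
val df = spark.sql("SELECT * FROM metrics_t1")
df.collect()
val scanMetrics = df.queryExecution.executedPlan.collect {
  case scan: FileSourceScanExecTransformer => scan.metrics
}.head
println(s"addSplit: ${scanMetrics("dataSourceAddSplitTime").value} ns, " +
  s"read: ${scanMetrics("dataSourceReadTime").value} ns")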
cpp/core/jni/JniWrapper.cc (3 additions, 1 deletion)
@@ -259,7 +259,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -585,6 +585,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kLocalReadBytes],
longArray[Metrics::kRamReadBytes],
longArray[Metrics::kPreloadSplits],
longArray[Metrics::kDataSourceAddSplitWallNanos],
longArray[Metrics::kDataSourceReadWallNanos],
longArray[Metrics::kPhysicalWrittenBytes],
longArray[Metrics::kWriteIOTime],
longArray[Metrics::kNumWrittenFiles],
cpp/core/utils/Metrics.h (2 additions, 0 deletions)
@@ -80,6 +80,8 @@ struct Metrics {
kLocalReadBytes,
kRamReadBytes,
kPreloadSplits,
kDataSourceAddSplitWallNanos,
kDataSourceReadWallNanos,

// Write metrics.
kPhysicalWrittenBytes,
cpp/velox/compute/WholeStageResultIterator.cc (6 additions, 0 deletions)
@@ -49,6 +49,8 @@ const std::string kStorageReadBytes = "storageReadBytes";
const std::string kLocalReadBytes = "localReadBytes";
const std::string kRamReadBytes = "ramReadBytes";
const std::string kPreloadSplits = "readyPreloadedSplits";
const std::string kDataSourceAddSplitWallNanos = "dataSourceAddSplitWallNanos";
const std::string kDataSourceReadWallNanos = "dataSourceReadWallNanos";
const std::string kNumWrittenFiles = "numWrittenFiles";
const std::string kWriteIOTime = "writeIOWallNanos";

@@ -466,6 +468,10 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kRamReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kRamReadBytes);
metrics_->get(Metrics::kPreloadSplits)[metricIndex] =
runtimeMetric("sum", entry.second->customStats, kPreloadSplits);
metrics_->get(Metrics::kDataSourceAddSplitWallNanos)[metricIndex] =
runtimeMetric("sum", second->customStats, kDataSourceAddSplitWallNanos);
metrics_->get(Metrics::kDataSourceReadWallNanos)[metricIndex] =
runtimeMetric("sum", second->customStats, kDataSourceReadWallNanos);
metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] =
runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles);
metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] = second->physicalWrittenBytes;