Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class Metrics implements IMetrics {
public long[] writeIOTime;
public long[] numWrittenFiles;

public long[] loadLazyVectorTime;

public SingleMetric singleMetric = new SingleMetric();

public String taskStats;
Expand Down Expand Up @@ -104,6 +106,7 @@ public Metrics(
long[] physicalWrittenBytes,
long[] writeIOTime,
long[] numWrittenFiles,
long[] loadLazyVectorTime,
String taskStats) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
Expand Down Expand Up @@ -145,6 +148,7 @@ public Metrics(
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
this.loadLazyVectorTime = loadLazyVectorTime;
this.taskStats = taskStats;
}

Expand Down Expand Up @@ -191,7 +195,8 @@ public OperatorMetrics getOperatorMetrics(int index) {
dataSourceReadTime[index],
physicalWrittenBytes[index],
writeIOTime[index],
numWrittenFiles[index]);
numWrittenFiles[index],
loadLazyVectorTime[index]);
}

public SingleMetric getSingleMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class OperatorMetrics implements IOperatorMetrics {
public long writeIOTime;
public long numWrittenFiles;

public long loadLazyVectorTime;

/** Create an instance for operator metrics. */
public OperatorMetrics(
long inputRows,
Expand Down Expand Up @@ -96,7 +98,8 @@ public OperatorMetrics(
long dataSourceReadTime,
long physicalWrittenBytes,
long writeIOTime,
long numWrittenFiles) {
long numWrittenFiles,
long loadLazyVectorTime) {
this.inputRows = inputRows;
this.inputVectors = inputVectors;
this.inputBytes = inputBytes;
Expand Down Expand Up @@ -135,5 +138,6 @@ public OperatorMetrics(
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
this.numWrittenFiles = numWrittenFiles;
this.loadLazyVectorTime = loadLazyVectorTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genBatchScanTransformerMetricsUpdater(
Expand Down Expand Up @@ -169,7 +172,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genHiveTableScanTransformerMetricsUpdater(
Expand Down Expand Up @@ -216,7 +222,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genFileSourceScanTransformerMetricsUpdater(
Expand All @@ -232,7 +241,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
"number of memory allocations"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genFilterTransformerMetricsUpdater(
Expand All @@ -250,7 +262,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
"number of memory allocations"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genProjectTransformerMetricsUpdater(
Expand Down Expand Up @@ -295,7 +310,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of final output rows"),
"finalOutputVectors" -> SQLMetrics.createMetric(
sparkContext,
"number of final output vectors")
"number of final output vectors"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genHashAggregateTransformerMetricsUpdater(
Expand All @@ -312,7 +330,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
"number of memory allocations"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
Expand Down Expand Up @@ -381,7 +402,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"spilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "bytes written for spilling"),
"spilledRows" -> SQLMetrics.createMetric(sparkContext, "total rows written for spilling"),
"spilledPartitions" -> SQLMetrics.createMetric(sparkContext, "total spilled partitions"),
"spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files")
"spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genWindowTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
Expand Down Expand Up @@ -411,7 +435,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
"number of memory allocations"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genLimitTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
Expand All @@ -424,7 +451,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of written bytes"),
"writeIONanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write IO"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write"),
"numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files")
"numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
Expand All @@ -444,7 +474,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"spilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "bytes written for spilling"),
"spilledRows" -> SQLMetrics.createMetric(sparkContext, "total rows written for spilling"),
"spilledPartitions" -> SQLMetrics.createMetric(sparkContext, "total spilled partitions"),
"spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files")
"spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genSortTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
Expand Down Expand Up @@ -479,7 +512,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"postProject cpu wall time count"),
"postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of postProjection")
"time of postProjection"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genSortMergeJoinTransformerMetricsUpdater(
Expand Down Expand Up @@ -598,7 +634,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"time of postProjection"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes")
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genHashJoinTransformerMetricsUpdater(
Expand Down Expand Up @@ -663,7 +702,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"time of postProjection"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes")
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genNestedLoopJoinTransformerMetricsUpdater(
Expand All @@ -679,7 +721,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
"number of memory allocations"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genSampleTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
Expand All @@ -690,7 +735,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of union"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count")
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of loading lazy vectors")
)

override def genUnionTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri
metrics("preloadSplits") += operatorMetrics.preloadSplits
metrics("dataSourceAddSplitTime") += operatorMetrics.dataSourceAddSplitTime
metrics("dataSourceReadTime") += operatorMetrics.dataSourceReadTime
metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ExpandMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
val localReadBytes: SQLMetric = metrics("localReadBytes")
val ramReadBytes: SQLMetric = metrics("ramReadBytes")
val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
Expand Down Expand Up @@ -84,6 +85,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
preloadSplits += operatorMetrics.preloadSplits
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
dataSourceReadTime += operatorMetrics.dataSourceReadTime
loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class FilterMetricsUpdater(
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
extraMetrics.foreach {
case (name, metric) =>
name match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metric
metrics("wallNanos") += nativeMetrics.wallNanos
metrics("peakMemoryBytes") += nativeMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += nativeMetrics.numMemoryAllocations
metrics("loadLazyVectorTime") += nativeMetrics.loadLazyVectorTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkMetricsUtil
import org.apache.spark.task.TaskResources

import scala.collection.JavaConverters._

trait HashAggregateMetricsUpdater extends MetricsUpdater {
def updateAggregationMetrics(
aggregationMetrics: java.util.ArrayList[OperatorMetrics],
Expand Down Expand Up @@ -53,6 +55,8 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
val finalOutputRows: SQLMetric = metrics("finalOutputRows")
val finalOutputVectors: SQLMetric = metrics("finalOutputVectors")

val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")

override def updateAggregationMetrics(
aggregationMetrics: java.util.ArrayList[OperatorMetrics],
aggParams: AggregationParams): Unit = {
Expand Down Expand Up @@ -85,6 +89,9 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
rowConstructionWallNanos += aggregationMetrics.get(idx).wallNanos
idx += 1
}

loadLazyVectorTime += aggregationMetrics.asScala.last.loadLazyVectorTime

if (TaskResources.inSparkTask()) {
SparkMetricsUtil.incMemoryBytesSpilled(
TaskResources.getLocalTaskContext().taskMetrics(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
val localReadBytes: SQLMetric = metrics("localReadBytes")
val ramReadBytes: SQLMetric = metrics("ramReadBytes")
val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
Expand Down Expand Up @@ -79,6 +80,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
preloadSplits += operatorMetrics.preloadSplits
dataSourceAddSplitTime += operatorMetrics.dataSourceAddSplitTime
dataSourceReadTime += operatorMetrics.dataSourceReadTime
loadLazyVectorTime += operatorMetrics.loadLazyVectorTime
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.task.TaskResources

import java.util

import scala.collection.JavaConverters._

trait JoinMetricsUpdater extends MetricsUpdater {
def updateJoinMetrics(
joinMetrics: java.util.ArrayList[OperatorMetrics],
Expand Down Expand Up @@ -103,6 +105,8 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
val buildPreProjectionCpuCount: SQLMetric = metrics("buildPreProjectionCpuCount")
val buildPreProjectionWallNanos: SQLMetric = metrics("buildPreProjectionWallNanos")

val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")

override protected def updateJoinMetricsInternal(
joinMetrics: java.util.ArrayList[OperatorMetrics],
joinParams: JoinParams): Unit = {
Expand Down Expand Up @@ -166,6 +170,8 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
TaskResources.getLocalTaskContext().taskMetrics(),
hashBuildMetrics.spilledBytes)
}

loadLazyVectorTime += joinMetrics.asScala.last.loadLazyVectorTime
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class LimitMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUp
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
metrics("loadLazyVectorTime") += operatorMetrics.loadLazyVectorTime
}
}
}
Loading
Loading