Skip to content
Draft

DNM #11575

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean,
enableCudf: Boolean): Iterator[ColumnarBatch] = {
enableCudf: Boolean,
supportsValueStreamDynamicFilter: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount

// Final iterator does not contain scan split, so pass empty split info to native here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class Metrics implements IMetrics {
public long[] numDynamicFiltersProduced;
public long[] numDynamicFiltersAccepted;
public long[] numReplacedWithDynamicFilterRows;
public long[] numDynamicFilteredRows;
public long[] flushRowCount;
public long[] loadedToValueHook;
public long[] bloomFilterBlocksByteSize;
Expand Down Expand Up @@ -90,6 +91,7 @@ public Metrics(
long[] numDynamicFiltersProduced,
long[] numDynamicFiltersAccepted,
long[] numReplacedWithDynamicFilterRows,
long[] numDynamicFilteredRows,
long[] flushRowCount,
long[] loadedToValueHook,
long[] bloomFilterBlocksByteSize,
Expand Down Expand Up @@ -134,6 +136,7 @@ public Metrics(
this.numDynamicFiltersProduced = numDynamicFiltersProduced;
this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.numDynamicFilteredRows = numDynamicFilteredRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
Expand Down Expand Up @@ -184,6 +187,7 @@ public OperatorMetrics getOperatorMetrics(int index) {
numDynamicFiltersProduced[index],
numDynamicFiltersAccepted[index],
numReplacedWithDynamicFilterRows[index],
numDynamicFilteredRows[index],
flushRowCount[index],
loadedToValueHook[index],
bloomFilterBlocksByteSize[index],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long numDynamicFiltersProduced;
public long numDynamicFiltersAccepted;
public long numReplacedWithDynamicFilterRows;
public long numDynamicFilteredRows;
public long flushRowCount;
public long loadedToValueHook;
public long bloomFilterBlocksByteSize;
Expand Down Expand Up @@ -83,6 +84,7 @@ public OperatorMetrics(
long numDynamicFiltersProduced,
long numDynamicFiltersAccepted,
long numReplacedWithDynamicFilterRows,
long numDynamicFilteredRows,
long flushRowCount,
long loadedToValueHook,
long bloomFilterBlocksByteSize,
Expand Down Expand Up @@ -125,6 +127,7 @@ public OperatorMetrics(
this.numDynamicFiltersProduced = numDynamicFiltersProduced;
this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.numDynamicFilteredRows = numDynamicFilteredRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -247,8 +248,17 @@ class VeloxIteratorApi extends IteratorApi with Logging {
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean,
enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava
enableCudf: Boolean = false,
supportsValueStreamDynamicFilter: Boolean = true): Iterator[ColumnarBatch] = {
val extraConfMap = mutable.Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString)
if (!supportsValueStreamDynamicFilter) {
extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = "false"
} else {
val veloxConf = new VeloxConfig(SQLConf.get)
extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) =
veloxConf.valueStreamDynamicFilterEnabled.toString
}
val extraConf = extraConfMap.asJava
val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
val columnarNativeIterator =
inputIterators.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"hashProbeDynamicFiltersProduced" -> SQLMetrics.createMetric(
sparkContext,
"number of hash probe dynamic filters produced"),
"valueStreamDynamicFiltersAccepted" -> SQLMetrics.createMetric(
sparkContext,
"number of dynamic filters accepted by value stream"),
"valueStreamDynamicFilteredRows" -> SQLMetrics.createMetric(
sparkContext,
"number of rows filtered by value stream dynamic filter"),
"bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric(
sparkContext,
"bloom filter blocks byte size"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {

/** Whether hash-probe dynamic filter pushdown is enabled, read from the bound SQLConf. */
def hashProbeDynamicFilterPushdownEnabled: Boolean =
getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED)

/**
 * Whether ValueStream (shuffle reader) operators should apply dynamic filters pushed
 * down from hash probes. Backed by [[VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED]],
 * which defaults to false.
 */
def valueStreamDynamicFilterEnabled: Boolean =
getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED)
}

object VeloxConfig extends ConfigRegistry {
Expand Down Expand Up @@ -468,6 +471,14 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(true)

// Session config controlling dynamic-filter application inside the ValueStream
// (shuffle reader) operator. Default is false (opt-in). The same key is declared
// on the native side as kValueStreamDynamicFilterEnabled in
// cpp/velox/config/VeloxConfig.h, so the two definitions must stay in sync.
val VALUE_STREAM_DYNAMIC_FILTER_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled")
.doc(
"Whether to apply dynamic filters pushed down from hash probe in the ValueStream" +
" (shuffle reader) operator to filter rows before they reach the hash join.")
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled")
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])

val bloomFilterBlocksByteSize: SQLMetric = metrics("bloomFilterBlocksByteSize")

// Metrics surfaced on the join node but produced by upstream ValueStream/scan
// operators: the number of dynamic filters the value stream accepted, and the
// number of rows those filters removed before the hash join.
val valueStreamDynamicFiltersAccepted: SQLMetric =
metrics("valueStreamDynamicFiltersAccepted")
val valueStreamDynamicFilteredRows: SQLMetric =
metrics("valueStreamDynamicFilteredRows")

val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount")
val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ object MetricsUtil extends Logging {
var numDynamicFiltersProduced: Long = 0
var numDynamicFiltersAccepted: Long = 0
var numReplacedWithDynamicFilterRows: Long = 0
var numDynamicFilteredRows: Long = 0
var flushRowCount: Long = 0
var loadedToValueHook: Long = 0
var bloomFilterBlocksByteSize: Long = 0
Expand Down Expand Up @@ -155,6 +156,7 @@ object MetricsUtil extends Logging {
numDynamicFiltersProduced += metrics.numDynamicFiltersProduced
numDynamicFiltersAccepted += metrics.numDynamicFiltersAccepted
numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows
numDynamicFilteredRows += metrics.numDynamicFilteredRows
flushRowCount += metrics.flushRowCount
loadedToValueHook += metrics.loadedToValueHook
bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
Expand Down Expand Up @@ -197,6 +199,7 @@ object MetricsUtil extends Logging {
numDynamicFiltersProduced,
numDynamicFiltersAccepted,
numReplacedWithDynamicFilterRows,
numDynamicFilteredRows,
flushRowCount,
loadedToValueHook,
bloomFilterBlocksByteSize,
Expand Down Expand Up @@ -295,6 +298,8 @@ object MetricsUtil extends Logging {
curMetricsIdx
}

val childStartMetricsIdx = newMetricsIdx

mutNode.children.foreach {
child =>
val result = updateTransformerMetricsInternal(
Expand All @@ -309,6 +314,19 @@ object MetricsUtil extends Logging {
newMetricsIdx = result._2
}

// Collect ValueStream dynamic filter metrics from child operators (scan nodes)
// since these stats are reported on the ValueStream/TableScan operator, not on
// the HashProbe/HashBuild operators that are part of the join's own metrics.
mutNode.updater match {
case hju: HashJoinMetricsUpdater =>
for (idx <- (newMetricsIdx + 1) to childStartMetricsIdx) {
val childOpMetrics = metrics.getOperatorMetrics(idx)
hju.valueStreamDynamicFiltersAccepted += childOpMetrics.numDynamicFiltersAccepted
hju.valueStreamDynamicFilteredRows += childOpMetrics.numDynamicFilteredRows
}
case _ =>
}

(newOperatorIdx, newMetricsIdx)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,41 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
}
}
}

test("Value stream dynamic filter pushdown") {
withSQLConf(
"spark.sql.autoBroadcastJoinThreshold" -> "-1",
"spark.sql.adaptive.enabled" -> "false",
GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "true",
VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key -> "true"
) {
withTable("vs_probe_table", "vs_build_table") {
spark.sql("""
CREATE TABLE vs_probe_table USING PARQUET
AS SELECT id as a FROM range(110001)
""")

spark.sql("""
CREATE TABLE vs_build_table USING PARQUET
AS SELECT id * 1000 as b FROM range(220002)
""")

runQueryAndCompare(
"SELECT a FROM vs_probe_table JOIN vs_build_table ON a = b"
) {
df =>
val join = find(df.queryExecution.executedPlan) {
case _: ShuffledHashJoinExecTransformer => true
case _ => false
}
assert(join.isDefined)
val metrics = join.get.metrics
assert(metrics.contains("valueStreamDynamicFiltersAccepted"))
assert(metrics("valueStreamDynamicFiltersAccepted").value > 0)
assert(metrics.contains("valueStreamDynamicFilteredRows"))
assert(metrics("valueStreamDynamicFilteredRows").value > 0)
}
}
}
}
}
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
Expand Down Expand Up @@ -589,6 +589,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kNumDynamicFiltersProduced],
longArray[Metrics::kNumDynamicFiltersAccepted],
longArray[Metrics::kNumReplacedWithDynamicFilterRows],
longArray[Metrics::kNumDynamicFilteredRows],
longArray[Metrics::kFlushRowCount],
longArray[Metrics::kLoadedToValueHook],
longArray[Metrics::kBloomFilterBlocksByteSize],
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ struct Metrics {
kNumDynamicFiltersProduced,
kNumDynamicFiltersAccepted,
kNumReplacedWithDynamicFilterRows,
kNumDynamicFilteredRows,
kFlushRowCount,
kLoadedToValueHook,
kBloomFilterBlocksByteSize,
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ void VeloxBackend::initConnector(const std::shared_ptr<velox::config::ConfigBase
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));

// Register value-stream connector for runtime iterator-based inputs
velox::connector::registerConnector(std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf));
velox::connector::registerConnector(
std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf));

#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace {
const std::string kDynamicFiltersProduced = "dynamicFiltersProduced";
const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows";
const std::string kDynamicFilteredRows = "dynamicFilteredRows";
const std::string kFlushRowCount = "flushRowCount";
const std::string kLoadedToValueHook = "loadedToValueHook";
const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
Expand Down Expand Up @@ -492,6 +493,8 @@ void WholeStageResultIterator::collectMetrics() {
runtimeMetric("sum", second->customStats, kDynamicFiltersAccepted);
metrics_->get(Metrics::kNumReplacedWithDynamicFilterRows)[metricIndex] =
runtimeMetric("sum", second->customStats, kReplacedWithDynamicFilterRows);
metrics_->get(Metrics::kNumDynamicFilteredRows)[metricIndex] =
runtimeMetric("sum", second->customStats, kDynamicFilteredRows);
metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount);
metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
runtimeMetric("sum", second->customStats, kLoadedToValueHook);
Expand Down
4 changes: 4 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ const std::string kHashProbeDynamicFilterPushdownEnabled =
const std::string kHashProbeBloomFilterPushdownMaxSize =
"spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize";

// Native-side key/default for the ValueStream dynamic-filter switch. Must match the
// Scala config VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED (same key string,
// same default of false).
const std::string kValueStreamDynamicFilterEnabled =
"spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled";
const bool kValueStreamDynamicFilterEnabledDefault = false;

const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
const bool kShowTaskMetricsWhenFinishedDefault = false;

Expand Down
Loading
Loading