diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 2cd9d8516493..4d0a916e42e2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -325,7 +325,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, materializeInput: Boolean, - enableCudf: Boolean): Iterator[ColumnarBatch] = { + enableCudf: Boolean, + supportsValueStreamDynamicFilter: Boolean): Iterator[ColumnarBatch] = { // scalastyle:on argcount // Final iterator does not contain scan split, so pass empty split info to native here. diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index 3d23dc94db97..30baae53199a 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -40,6 +40,7 @@ public class Metrics implements IMetrics { public long[] numDynamicFiltersProduced; public long[] numDynamicFiltersAccepted; public long[] numReplacedWithDynamicFilterRows; + public long[] numDynamicFilteredRows; public long[] flushRowCount; public long[] loadedToValueHook; public long[] bloomFilterBlocksByteSize; @@ -90,6 +91,7 @@ public Metrics( long[] numDynamicFiltersProduced, long[] numDynamicFiltersAccepted, long[] numReplacedWithDynamicFilterRows, + long[] numDynamicFilteredRows, long[] flushRowCount, long[] loadedToValueHook, long[] bloomFilterBlocksByteSize, @@ -134,6 +136,7 @@ public Metrics( this.numDynamicFiltersProduced = numDynamicFiltersProduced; this.numDynamicFiltersAccepted = numDynamicFiltersAccepted; this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; + this.numDynamicFilteredRows = numDynamicFilteredRows; this.flushRowCount = flushRowCount; this.loadedToValueHook = loadedToValueHook; this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; @@ -184,6 +187,7 @@ public OperatorMetrics getOperatorMetrics(int index) { numDynamicFiltersProduced[index], numDynamicFiltersAccepted[index], numReplacedWithDynamicFilterRows[index], + numDynamicFilteredRows[index], flushRowCount[index], loadedToValueHook[index], bloomFilterBlocksByteSize[index], diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 10563e507e9b..cf51d00925d1 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -38,6 +38,7 @@ public class OperatorMetrics implements IOperatorMetrics { public long numDynamicFiltersProduced; public long numDynamicFiltersAccepted; public long numReplacedWithDynamicFilterRows; + public long numDynamicFilteredRows; public long flushRowCount; public long loadedToValueHook; public long bloomFilterBlocksByteSize; @@ -83,6 +84,7 @@ public OperatorMetrics( long numDynamicFiltersProduced, long numDynamicFiltersAccepted, long numReplacedWithDynamicFilterRows, + long numDynamicFilteredRows, long flushRowCount, long loadedToValueHook, long bloomFilterBlocksByteSize, @@ -125,6 +127,7 @@ public OperatorMetrics( 
this.numDynamicFiltersProduced = numDynamicFiltersProduced; this.numDynamicFiltersAccepted = numDynamicFiltersAccepted; this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; + this.numDynamicFilteredRows = numDynamicFilteredRows; this.flushRowCount = flushRowCount; this.loadedToValueHook = loadedToValueHook; this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 668e60b20542..d8a8d8541734 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch @@ -247,8 +248,17 @@ class VeloxIteratorApi extends IteratorApi with Logging { updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, materializeInput: Boolean, - enableCudf: Boolean = false): Iterator[ColumnarBatch] = { - val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava + enableCudf: Boolean = false, + supportsValueStreamDynamicFilter: Boolean = true): Iterator[ColumnarBatch] = { + val extraConfMap = mutable.Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString) + if (!supportsValueStreamDynamicFilter) { + extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = "false" + } else { + val veloxConf = new VeloxConfig(SQLConf.get) + extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = + veloxConf.valueStreamDynamicFilterEnabled.toString + } + val extraConf = extraConfMap.asJava val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf) val columnarNativeIterator = inputIterators.map { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 29d882c3d3db..28eb81a2cd70 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -628,6 +628,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { "hashProbeDynamicFiltersProduced" -> SQLMetrics.createMetric( sparkContext, "number of hash probe dynamic filters produced"), + "valueStreamDynamicFiltersAccepted" -> SQLMetrics.createMetric( + sparkContext, + "number of dynamic filters accepted by value stream"), + "valueStreamDynamicFilteredRows" -> SQLMetrics.createMetric( + sparkContext, + "number of rows filtered by value stream dynamic filter"), "bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric( sparkContext, "bloom filter blocks byte size"), diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index ee0866391ce0..9f1226cf9e6c 100644 --- 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -90,6 +90,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def hashProbeDynamicFilterPushdownEnabled: Boolean = getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED) + + def valueStreamDynamicFilterEnabled: Boolean = + getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED) } object VeloxConfig extends ConfigRegistry { @@ -468,6 +471,14 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val VALUE_STREAM_DYNAMIC_FILTER_ENABLED = + buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled") + .doc( + "Whether to apply dynamic filters pushed down from hash probe in the ValueStream" + + " (shuffle reader) operator to filter rows before they reach the hash join.") + .booleanConf + .createWithDefault(false) + val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED = buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index cf894b9da466..97a2b35f1e13 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -101,6 +101,11 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) val bloomFilterBlocksByteSize: SQLMetric = metrics("bloomFilterBlocksByteSize") + val valueStreamDynamicFiltersAccepted: SQLMetric = + metrics("valueStreamDynamicFiltersAccepted") + val valueStreamDynamicFilteredRows: SQLMetric = + metrics("valueStreamDynamicFilteredRows") + val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount") val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos") diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 607de718ce07..a55d09964898 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -120,6 +120,7 @@ object MetricsUtil extends Logging { var numDynamicFiltersProduced: Long = 0 var numDynamicFiltersAccepted: Long = 0 var numReplacedWithDynamicFilterRows: Long = 0 + var numDynamicFilteredRows: Long = 0 var flushRowCount: Long = 0 var loadedToValueHook: Long = 0 var bloomFilterBlocksByteSize: Long = 0 @@ -155,6 +156,7 @@ object MetricsUtil extends Logging { numDynamicFiltersProduced += metrics.numDynamicFiltersProduced numDynamicFiltersAccepted += metrics.numDynamicFiltersAccepted numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows + numDynamicFilteredRows += metrics.numDynamicFilteredRows flushRowCount += metrics.flushRowCount loadedToValueHook += metrics.loadedToValueHook bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize @@ -197,6 +199,7 @@ object MetricsUtil extends Logging { numDynamicFiltersProduced, numDynamicFiltersAccepted, numReplacedWithDynamicFilterRows, + numDynamicFilteredRows, flushRowCount, loadedToValueHook, bloomFilterBlocksByteSize, @@ -295,6 +298,8 @@ object MetricsUtil extends Logging { curMetricsIdx } + val childStartMetricsIdx = newMetricsIdx + mutNode.children.foreach { child => val 
result = updateTransformerMetricsInternal( @@ -309,6 +314,19 @@ object MetricsUtil extends Logging { newMetricsIdx = result._2 } + // Collect ValueStream dynamic filter metrics from child operators (scan nodes) + // since these stats are reported on the ValueStream/TableScan operator, not on + // the HashProbe/HashBuild operators that are part of the join's own metrics. + mutNode.updater match { + case hju: HashJoinMetricsUpdater => + for (idx <- (newMetricsIdx + 1) to childStartMetricsIdx) { + val childOpMetrics = metrics.getOperatorMetrics(idx) + hju.valueStreamDynamicFiltersAccepted += childOpMetrics.numDynamicFiltersAccepted + hju.valueStreamDynamicFilteredRows += childOpMetrics.numDynamicFilteredRows + } + case _ => + } + (newOperatorIdx, newMetricsIdx) } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala index 8db6b957753f..3b9f4ae7f83d 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala @@ -342,4 +342,41 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite { } } } + + test("Value stream dynamic filter pushdown") { + withSQLConf( + "spark.sql.autoBroadcastJoinThreshold" -> "-1", + "spark.sql.adaptive.enabled" -> "false", + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "true", + VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key -> "true" + ) { + withTable("vs_probe_table", "vs_build_table") { + spark.sql(""" + CREATE TABLE vs_probe_table USING PARQUET + AS SELECT id as a FROM range(110001) + """) + + spark.sql(""" + CREATE TABLE vs_build_table USING PARQUET + AS SELECT id * 1000 as b FROM range(220002) + """) + + runQueryAndCompare( + "SELECT a FROM vs_probe_table JOIN vs_build_table ON a = b" + ) { + df => + val join = find(df.queryExecution.executedPlan) { + case _: ShuffledHashJoinExecTransformer => true + case _ => false + } + assert(join.isDefined) + val metrics = join.get.metrics + assert(metrics.contains("valueStreamDynamicFiltersAccepted")) + assert(metrics("valueStreamDynamicFiltersAccepted").value > 0) + assert(metrics.contains("valueStreamDynamicFilteredRows")) + assert(metrics("valueStreamDynamicFilteredRows").value > 0) + } + } + } + } } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index c48886195b22..a65d60e30c50 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -273,7 +273,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metricsBuilderClass, "", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -589,6 +589,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kNumDynamicFiltersProduced], longArray[Metrics::kNumDynamicFiltersAccepted], longArray[Metrics::kNumReplacedWithDynamicFilterRows], + longArray[Metrics::kNumDynamicFilteredRows], longArray[Metrics::kFlushRowCount], longArray[Metrics::kLoadedToValueHook], longArray[Metrics::kBloomFilterBlocksByteSize], diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index 8e33a4a9c613..6d1c79bf6daa 100644 --- 
a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -67,6 +67,7 @@ struct Metrics { kNumDynamicFiltersProduced, kNumDynamicFiltersAccepted, kNumReplacedWithDynamicFilterRows, + kNumDynamicFilteredRows, kFlushRowCount, kLoadedToValueHook, kBloomFilterBlocksByteSize, diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 45a64908e199..a9fd5e123976 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -319,7 +319,8 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); // Register value-stream connector for runtime iterator-based inputs - velox::connector::registerConnector(std::make_shared(kIteratorConnectorId, hiveConf)); + velox::connector::registerConnector( + std::make_shared(kIteratorConnectorId, hiveConf)); #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 157f9829b817..6ca24907a063 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -43,6 +43,7 @@ namespace { const std::string kDynamicFiltersProduced = "dynamicFiltersProduced"; const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted"; const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows"; +const std::string kDynamicFilteredRows = "dynamicFilteredRows"; const std::string kFlushRowCount = "flushRowCount"; const std::string kLoadedToValueHook = "loadedToValueHook"; const std::string kBloomFilterBlocksByteSize = "bloomFilterSize"; @@ -492,6 +493,8 @@ void WholeStageResultIterator::collectMetrics() { runtimeMetric("sum", second->customStats, kDynamicFiltersAccepted); metrics_->get(Metrics::kNumReplacedWithDynamicFilterRows)[metricIndex] = runtimeMetric("sum", second->customStats, kReplacedWithDynamicFilterRows); + metrics_->get(Metrics::kNumDynamicFilteredRows)[metricIndex] = + runtimeMetric("sum", second->customStats, kDynamicFilteredRows); metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount); metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] = runtimeMetric("sum", second->customStats, kLoadedToValueHook); diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 566ce875aacc..5a2ad4b8c7eb 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -78,6 +78,10 @@ const std::string kHashProbeDynamicFilterPushdownEnabled = const std::string kHashProbeBloomFilterPushdownMaxSize = "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize"; +const std::string kValueStreamDynamicFilterEnabled = + "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled"; +const bool kValueStreamDynamicFilterEnabledDefault = false; + const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished"; const bool kShowTaskMetricsWhenFinishedDefault = false; diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc b/cpp/velox/operators/plannodes/RowVectorStream.cc index 7c0b00979a74..d3278abb75a0 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.cc +++ b/cpp/velox/operators/plannodes/RowVectorStream.cc @@ -19,6 +19,7 @@ #include "memory/VeloxColumnarBatch.h" #include "velox/exec/Driver.h" #include "velox/exec/Operator.h" +#include "velox/exec/OperatorUtils.h" 
#include "velox/exec/Task.h" #include "velox/vector/arrow/Bridge.h" @@ -113,7 +114,9 @@ ValueStreamDataSource::ValueStreamDataSource( const facebook::velox::connector::ColumnHandleMap& columnHandles, facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx) : outputType_(outputType), - pool_(connectorQueryCtx->memoryPool()) {} + pool_(connectorQueryCtx->memoryPool()), + dynamicFilterEnabled_( + std::dynamic_pointer_cast(tableHandle)->dynamicFilterEnabled()) {} void ValueStreamDataSource::addSplit(std::shared_ptr split) { // Cast to IteratorConnectorSplit to extract the iterator @@ -166,7 +169,116 @@ std::optional ValueStreamDataSource::next( completedRows_ += rowVector->size(); completedBytes_ += rowVector->estimateFlatSize(); + // Apply dynamic filters if any have been pushed down. + if (!dynamicFilters_.empty()) { + rowVector = applyDynamicFilters(rowVector); + if (!rowVector) { + // All rows filtered out, try next batch. + return next(size, future); + } + } + return rowVector; } +facebook::velox::RowVectorPtr ValueStreamDataSource::applyDynamicFilters( + const facebook::velox::RowVectorPtr& input) { + using namespace facebook::velox; + + const auto numRows = input->size(); + if (numRows == 0) { + return input; + } + + SelectivityVector rows(numRows, true); + + for (const auto& [channel, filter] : dynamicFilters_) { + if (!filter || channel >= input->childrenSize()) { + continue; + } + applyFilterOnColumn(filter, input->childAt(channel), rows); + if (!rows.hasSelections()) { + dynamicFilteredRows_ += numRows; + return nullptr; + } + } + + const auto passedCount = rows.countSelected(); + if (passedCount == numRows) { + return input; + } + + dynamicFilteredRows_ += numRows - passedCount; + + BufferPtr indices = allocateIndices(passedCount, pool_); + auto* rawIndices = indices->asMutable(); + vector_size_t idx = 0; + rows.applyToSelected([&](auto row) { rawIndices[idx++] = row; }); + + return exec::wrap(passedCount, std::move(indices), input); +} + +void ValueStreamDataSource::applyFilterOnColumn( + const std::shared_ptr& filter, + const facebook::velox::VectorPtr& vector, + facebook::velox::SelectivityVector& rows) { + using namespace facebook::velox; + + DecodedVector decoded(*vector, rows); + + rows.applyToSelected([&](auto row) { + if (decoded.isNullAt(row)) { + if (!filter->testNull()) { + rows.setValid(row, false); + } + return; + } + + bool pass = false; + switch (vector->typeKind()) { + case TypeKind::BOOLEAN: + pass = filter->testBool(decoded.valueAt(row)); + break; + case TypeKind::TINYINT: + pass = filter->testInt64(decoded.valueAt(row)); + break; + case TypeKind::SMALLINT: + pass = filter->testInt64(decoded.valueAt(row)); + break; + case TypeKind::INTEGER: + pass = filter->testInt64(decoded.valueAt(row)); + break; + case TypeKind::BIGINT: + pass = filter->testInt64(decoded.valueAt(row)); + break; + case TypeKind::HUGEINT: + pass = filter->testInt128(decoded.valueAt(row)); + break; + case TypeKind::REAL: + pass = filter->testFloat(decoded.valueAt(row)); + break; + case TypeKind::DOUBLE: + pass = filter->testDouble(decoded.valueAt(row)); + break; + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: { + auto sv = decoded.valueAt(row); + pass = filter->testBytes(sv.data(), sv.size()); + break; + } + case TypeKind::TIMESTAMP: + pass = filter->testTimestamp(decoded.valueAt(row)); + break; + default: + // For unsupported types, let the row pass through. 
+        pass = true;
+        break;
+    }
+    if (!pass) {
+      rows.setValid(row, false);
+    }
+  });
+  rows.updateBounds();
+}
+
 } // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h
index 6e6ccd1527d5..a5b32980933f 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -23,7 +23,10 @@
 #include "velox/connectors/Connector.h"
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
+#include "velox/exec/OperatorUtils.h"
 #include "velox/exec/Task.h"
+#include "velox/type/Filter.h"
+#include "velox/vector/DecodedVector.h"
 
 namespace gluten {
 
@@ -68,10 +71,17 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
   std::optional<facebook::velox::RowVectorPtr> next(uint64_t size, facebook::velox::ContinueFuture& future) override;
 
+  const facebook::velox::common::SubfieldFilters* getFilters() const override {
+    return &emptyFilters_;
+  }
+
   void addDynamicFilter(
       facebook::velox::column_index_t outputChannel,
       const std::shared_ptr<facebook::velox::common::Filter>& filter) override {
-    // Iterator-based sources don't support dynamic filtering
+    if (dynamicFilterEnabled_) {
+      dynamicFilters_[outputChannel] = filter;
+      numDynamicFiltersAccepted_++;
+    }
   }
 
   uint64_t getCompletedBytes() override {
@@ -83,10 +93,26 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
   }
 
   std::unordered_map<std::string, facebook::velox::RuntimeMetric> getRuntimeStats() override {
-    return {};
+    std::unordered_map<std::string, facebook::velox::RuntimeMetric> stats;
+    stats["dynamicFiltersAccepted"] = facebook::velox::RuntimeMetric(numDynamicFiltersAccepted_);
+    if (dynamicFilteredRows_ > 0) {
+      stats["dynamicFilteredRows"] = facebook::velox::RuntimeMetric(dynamicFilteredRows_);
+    }
+    return stats;
   }
 
  private:
+  // Applies dynamic filters to a batch, returning a dictionary-wrapped subset
+  // containing only the rows that pass all filters.
+  facebook::velox::RowVectorPtr applyDynamicFilters(const facebook::velox::RowVectorPtr& input);
+
+  // Evaluates a Filter against a single column vector, deselecting rows that
+  // don't pass.
+ static void applyFilterOnColumn( + const std::shared_ptr& filter, + const facebook::velox::VectorPtr& vector, + facebook::velox::SelectivityVector& rows); + const facebook::velox::RowTypePtr outputType_; facebook::velox::memory::MemoryPool* pool_; @@ -94,21 +120,35 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource { std::shared_ptr currentIterator_{nullptr}; uint64_t completedBytes_{0}; uint64_t completedRows_{0}; + + folly::F14FastMap> dynamicFilters_; + const facebook::velox::common::SubfieldFilters emptyFilters_; + bool dynamicFilterEnabled_{true}; + uint64_t numDynamicFiltersAccepted_{0}; + uint64_t dynamicFilteredRows_{0}; }; /// Table handle for iterator-based scans class ValueStreamTableHandle : public facebook::velox::connector::ConnectorTableHandle { public: - explicit ValueStreamTableHandle(std::string connectorId) : ConnectorTableHandle(connectorId) {} + explicit ValueStreamTableHandle(std::string connectorId, bool dynamicFilterEnabled = true) + : ConnectorTableHandle(connectorId), dynamicFilterEnabled_(dynamicFilterEnabled) {} const std::string& name() const override { static const std::string kName = "ValueStreamTableHandle"; return kName; } + bool dynamicFilterEnabled() const { + return dynamicFilterEnabled_; + } + folly::dynamic serialize() const override { VELOX_NYI(); } + + private: + bool dynamicFilterEnabled_; }; /// Column handle for iterator-based scans @@ -138,6 +178,12 @@ class ValueStreamConnector : public facebook::velox::connector::Connector { std::shared_ptr config) : Connector(id, config) {} + // Always return true so Velox routes dynamic filters to the DataSource. + // Per-query gating happens in ValueStreamDataSource::addDynamicFilter(). + bool canAddDynamicFilter() const override { + return true; + } + std::unique_ptr createDataSource( const facebook::velox::RowTypePtr& outputType, const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle, diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 727f4882e174..e9c31e2fcb2c 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1313,7 +1313,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); // Create TableHandle - auto tableHandle = std::make_shared(kIteratorConnectorId); + bool dynamicFilterEnabled = + veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); + auto tableHandle = std::make_shared(kIteratorConnectorId, dynamicFilterEnabled); // Create column assignments connector::ColumnHandleMap assignments; diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt index a690347bc4bc..0c61850e12ce 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -115,7 +115,8 @@ add_velox_test( VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc VeloxColumnarBatchTest.cc - VeloxBatchResizerTest.cc) + VeloxBatchResizerTest.cc + ValueStreamDynamicFilterTest.cc) add_velox_test( velox_plan_conversion_test SOURCES diff --git a/cpp/velox/tests/ValueStreamDynamicFilterTest.cc b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc new file mode 100644 index 000000000000..18707dea34bc --- /dev/null +++ b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "memory/VeloxColumnarBatch.h" +#include "operators/plannodes/RowVectorStream.h" +#include "velox/type/Filter.h" +#include "velox/vector/DecodedVector.h" +#include "velox/vector/FlatVector.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::common; + +namespace facebook::velox::test { + +/// A ColumnarBatchIterator that yields pre-built RowVectors as VeloxColumnarBatches. +class TestBatchIterator final : public gluten::ColumnarBatchIterator { + public: + explicit TestBatchIterator(std::vector batches) : batches_(std::move(batches)) {} + + std::shared_ptr next() override { + if (idx_ >= batches_.size()) { + return nullptr; + } + return std::make_shared(batches_[idx_++]); + } + + private: + std::vector batches_; + size_t idx_ = 0; +}; + +class ValueStreamDynamicFilterTest : public ::testing::Test, public VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + } + + void SetUp() override { + // Register the connector if not already registered. + if (!connector::hasConnector(gluten::kIteratorConnectorId)) { + auto config = std::make_shared( + std::unordered_map()); + connector::registerConnector(std::make_shared( + gluten::kIteratorConnectorId, config)); + } + } + + /// Build a TableScanNode that reads from the value-stream connector. + std::shared_ptr makeTableScanNode( + const std::string& nodeId, + const RowTypePtr& outputType) { + auto tableHandle = + std::make_shared(gluten::kIteratorConnectorId); + + connector::ColumnHandleMap assignments; + for (int idx = 0; idx < outputType->size(); idx++) { + auto name = outputType->nameOf(idx); + auto type = outputType->childAt(idx); + assignments[name] = + std::make_shared(name, type); + } + + return std::make_shared( + nodeId, outputType, tableHandle, assignments); + } + + /// Create a split wrapping the given batches. + std::shared_ptr makeSplit( + std::vector batches) { + auto iter = std::make_shared( + std::make_unique(std::move(batches))); + return std::make_shared( + gluten::kIteratorConnectorId, std::move(iter)); + } + + /// Read all int64 values from column 0 of a serial-mode task. + std::vector readAllInt64(Task* task) { + std::vector result; + ContinueFuture future = ContinueFuture::makeEmpty(); + while (true) { + auto batch = task->next(&future); + if (!batch) { + break; + } + DecodedVector decoded(*batch->childAt(0)); + for (vector_size_t i = 0; i < batch->size(); i++) { + result.push_back(decoded.valueAt(i)); + } + } + return result; + } +}; + +// Test that without any filter, all rows pass through. 
+TEST_F(ValueStreamDynamicFilterTest, noFilterPassesAllRows) { + auto batch = makeRowVector({"id"}, {makeFlatVector({10, 20, 30})}); + auto outputType = asRowType(batch->type()); + auto scanNode = makeTableScanNode("vs0", outputType); + + auto queryCtx = core::QueryCtx::create(); + auto task = Task::create( + "test-nofilter", + core::PlanFragment{scanNode}, + 0, + queryCtx, + Task::ExecutionMode::kSerial); + + task->addSplit(scanNode->id(), Split{makeSplit({batch})}); + task->noMoreSplits(scanNode->id()); + + auto ids = readAllInt64(task.get()); + ASSERT_EQ(ids, (std::vector{10, 20, 30})); +} + +// Test that filtering works when filter is injected after first batch. +TEST_F(ValueStreamDynamicFilterTest, filterBigintRange) { + auto batch1 = makeRowVector({"id"}, {makeFlatVector({1, 2, 3, 4, 5})}); + auto batch2 = makeRowVector({"id"}, {makeFlatVector({6, 7, 8, 9, 10})}); + auto outputType = asRowType(batch1->type()); + auto scanNode = makeTableScanNode("vs1", outputType); + + auto queryCtx = core::QueryCtx::create(); + auto task = Task::create( + "test-bigint", + core::PlanFragment{scanNode}, + 0, + queryCtx, + Task::ExecutionMode::kSerial); + + // Add both batches as a single split. + task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})}); + task->noMoreSplits(scanNode->id()); + + // First next() creates drivers and returns first batch (unfiltered). + ContinueFuture future = ContinueFuture::makeEmpty(); + auto firstBatch = task->next(&future); + ASSERT_NE(firstBatch, nullptr); + ASSERT_EQ(firstBatch->size(), 5); + + // Inject a BigintRange filter: keep only id >= 8. + task->testingVisitDrivers([&](Driver* driver) { + auto* op = driver->findOperator(scanNode->id()); + if (!op) { + return; + } + ASSERT_TRUE(op->canAddDynamicFilter()); + PushdownFilters pf; + pf.filters[0] = std::make_shared(8, 10, false); + pf.dynamicFilteredColumns.insert(0); + op->addDynamicFilterLocked("producer", pf); + }); + + // Second next() should return filtered batch. + auto secondBatch = task->next(&future); + ASSERT_NE(secondBatch, nullptr); + + DecodedVector decoded(*secondBatch->childAt(0)); + std::vector outputIds; + for (vector_size_t i = 0; i < secondBatch->size(); i++) { + outputIds.push_back(decoded.valueAt(i)); + } + ASSERT_EQ(outputIds, (std::vector{8, 9, 10})); + + auto end = task->next(&future); + ASSERT_EQ(end, nullptr); +} + +// Test that a filter eliminates all rows from a batch. 
+TEST_F(ValueStreamDynamicFilterTest, filterEliminatesEntireBatch) {
+  auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3})});
+  // batch2 is rejected in full by the dynamic filter injected below; batch3 passes untouched.
+  auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({4, 5, 6})});
+  auto batch3 = makeRowVector({"id"}, {makeFlatVector<int64_t>({100, 200, 300})});
+  auto outputType = asRowType(batch1->type());
+  auto scanNode = makeTableScanNode("vs2", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-eliminate",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2, batch3})});
+  task->noMoreSplits(scanNode->id());
+
+  ContinueFuture future = ContinueFuture::makeEmpty();
+  auto firstBatch = task->next(&future);
+  ASSERT_NE(firstBatch, nullptr);
+  ASSERT_EQ(firstBatch->size(), 3);
+
+  // Inject a filter that keeps only ids in [100, 300].
+  task->testingVisitDrivers([&](Driver* driver) {
+    auto* op = driver->findOperator(scanNode->id());
+    if (!op) {
+      return;
+    }
+    PushdownFilters pf;
+    pf.filters[0] = std::make_shared<BigintRange>(100, 300, false);
+    pf.dynamicFilteredColumns.insert(0);
+    op->addDynamicFilterLocked("producer", pf);
+  });
+
+  // batch2 is eliminated entirely, so the next output batch is batch3.
+  auto secondBatch = task->next(&future);
+  ASSERT_NE(secondBatch, nullptr);
+
+  DecodedVector decoded(*secondBatch->childAt(0));
+  std::vector<int64_t> outputIds;
+  for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+    outputIds.push_back(decoded.valueAt<int64_t>(i));
+  }
+  ASSERT_EQ(outputIds, (std::vector<int64_t>{100, 200, 300}));
+
+  auto end = task->next(&future);
+  ASSERT_EQ(end, nullptr);
+}
+
+// Test that nulls are filtered out when nullAllowed is false.
+TEST_F(ValueStreamDynamicFilterTest, filterWithNulls) {
+  auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 20})});
+  auto batch2 = makeRowVector(
+      {"id"},
+      {makeNullableFlatVector<int64_t>({1, std::nullopt, 3, std::nullopt, 5})});
+  auto outputType = asRowType(batch1->type());
+  auto scanNode = makeTableScanNode("vs3", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-nulls",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
+  task->noMoreSplits(scanNode->id());
+
+  ContinueFuture future = ContinueFuture::makeEmpty();
+  auto firstBatch = task->next(&future);
+  ASSERT_NE(firstBatch, nullptr);
+
+  task->testingVisitDrivers([&](Driver* driver) {
+    auto* op = driver->findOperator(scanNode->id());
+    if (!op) {
+      return;
+    }
+    PushdownFilters pf;
+    pf.filters[0] = std::make_shared<BigintRange>(3, 100, false);
+    pf.dynamicFilteredColumns.insert(0);
+    op->addDynamicFilterLocked("producer", pf);
+  });
+
+  auto secondBatch = task->next(&future);
+  ASSERT_NE(secondBatch, nullptr);
+
+  DecodedVector decoded(*secondBatch->childAt(0));
+  std::vector<int64_t> outputIds;
+  for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+    ASSERT_FALSE(decoded.isNullAt(i));
+    outputIds.push_back(decoded.valueAt<int64_t>(i));
+  }
+  ASSERT_EQ(outputIds, (std::vector<int64_t>{3, 5}));
+
+  auto end = task->next(&future);
+  ASSERT_EQ(end, nullptr);
+}
+
+// Test canAddDynamicFilter returns true through the connector.
+TEST_F(ValueStreamDynamicFilterTest, canAddDynamicFilter) { + auto batch = makeRowVector({"id"}, {makeFlatVector({1})}); + auto outputType = asRowType(batch->type()); + auto scanNode = makeTableScanNode("vs4", outputType); + + auto queryCtx = core::QueryCtx::create(); + auto task = Task::create( + "test-can-add", + core::PlanFragment{scanNode}, + 0, + queryCtx, + Task::ExecutionMode::kSerial); + + task->addSplit(scanNode->id(), Split{makeSplit({batch})}); + task->noMoreSplits(scanNode->id()); + + ContinueFuture future = ContinueFuture::makeEmpty(); + task->next(&future); + + bool found = false; + task->testingVisitDrivers([&](Driver* driver) { + auto* op = driver->findOperator(scanNode->id()); + if (op) { + ASSERT_TRUE(op->canAddDynamicFilter()); + found = true; + } + }); + ASSERT_TRUE(found); + + auto end = task->next(&future); + ASSERT_EQ(end, nullptr); +} + +} // namespace facebook::velox::test diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index f4a79c465211..40a1c3fdef54 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -74,6 +74,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. | | spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. | | spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing | +| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. | | spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. | | spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | | spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same sementic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. 
| diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 52be6c49547e..a6b005359352 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -85,6 +85,7 @@ trait IteratorApi { updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, materializeInput: Boolean = false, - enableCudf: Boolean = false): Iterator[ColumnarBatch] + enableCudf: Boolean = false, + supportsValueStreamDynamicFilter: Boolean = true): Iterator[ColumnarBatch] // scalastyle:on argcount } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index acef5d798ea0..485ebc985e68 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -51,7 +51,8 @@ case class TransformContext(outputAttributes: Seq[Attribute], root: RelNode) case class WholeStageTransformContext( root: PlanNode, substraitContext: SubstraitContext = null, - enableCudf: Boolean = false) + enableCudf: Boolean = false, + supportsValueStreamDynamicFilter: Boolean = true) /** Base interface for a query plan that can be interpreted to Substrait representation. */ trait TransformSupport extends ValidatablePlan { @@ -257,7 +258,34 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f PlanBuilder.makePlan(substraitContext, Lists.newArrayList(childCtx.root), outNames) } - WholeStageTransformContext(planNode, substraitContext, isCudf) + WholeStageTransformContext( + planNode, + substraitContext, + isCudf, + !hasNonDeterministicExprInJoinProbe(child)) + } + + /** + * Checks whether any HashJoin's probe (streamed) side contains non-deterministic expressions. + * When true, ValueStream dynamic filter pushdown must be disabled because if left enabled, the + * dynamic filter would filter rows at the ValueStream (below the non-deterministic Project), + * changing how many times the non-deterministic expression is evaluated and thus altering its + * output sequence. See SPARK-10316. 
+ */ + private def hasNonDeterministicExprInJoinProbe(plan: SparkPlan): Boolean = { + plan match { + case join: HashJoinLikeExecTransformer => + containsNonDeterministicExpr(join.streamedPlan) || + hasNonDeterministicExprInJoinProbe(join.streamedPlan) || + hasNonDeterministicExprInJoinProbe(join.buildPlan) + case other => + other.children.exists(hasNonDeterministicExprInJoinProbe) + } + } + + private def containsNonDeterministicExpr(plan: SparkPlan): Boolean = { + plan.expressions.exists(!_.deterministic) || + plan.children.exists(containsNonDeterministicExpr) } def doWholeStageTransform(): WholeStageTransformContext = { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala index 521388faaeb7..393716bb7b1b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala @@ -54,7 +54,8 @@ class WholeStageZippedPartitionsRDD( updateNativeMetrics, split.index, materializeInput, - resCtx.enableCudf + resCtx.enableCudf, + resCtx.supportsValueStreamDynamicFilter ) } }
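Usage note (not part of the patch): a minimal sketch of how the feature added above could be enabled and observed from a Spark session running the Velox backend. The table names probe_t and build_t are hypothetical; only the config key and metric names come from this change, and the shuffled-hash-join settings mirror the new VeloxHashJoinSuite test.

  // Turn on ValueStream dynamic filter pushdown (disabled by default in this patch).
  spark.conf.set("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled", "true")
  // Disable broadcast joins so the join runs as a shuffled hash join, as the new test does via
  // GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

  // Hypothetical probe/build tables. Probe rows reach the join through a ValueStream (shuffle
  // reader), which now drops rows rejected by the dynamic filter produced on the build side.
  val df = spark.sql("SELECT a FROM probe_t JOIN build_t ON a = b")
  df.collect()

  // The ShuffledHashJoinExecTransformer node in df.queryExecution.executedPlan should report the
  // new SQL metrics "valueStreamDynamicFiltersAccepted" and "valueStreamDynamicFilteredRows".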