From 1b97da6d868a105043fd937869cdd03d3ca1758b Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Thu, 26 Feb 2026 09:44:43 +0000 Subject: [PATCH 1/8] Push dynamic filters down to ValueStream --- .../apache/gluten/config/VeloxConfig.scala | 11 + cpp/velox/compute/VeloxBackend.cc | 5 +- cpp/velox/compute/WholeStageResultIterator.cc | 2 + cpp/velox/config/VeloxConfig.h | 4 + .../operators/plannodes/RowVectorStream.cc | 111 +++++- .../operators/plannodes/RowVectorStream.h | 40 ++- cpp/velox/tests/CMakeLists.txt | 3 +- .../tests/ValueStreamDynamicFilterTest.cc | 318 ++++++++++++++++++ docs/velox-configuration.md | 1 + 9 files changed, 488 insertions(+), 7 deletions(-) create mode 100644 cpp/velox/tests/ValueStreamDynamicFilterTest.cc diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index ee0866391ce0..9f1226cf9e6c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -90,6 +90,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def hashProbeDynamicFilterPushdownEnabled: Boolean = getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED) + + def valueStreamDynamicFilterEnabled: Boolean = + getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED) } object VeloxConfig extends ConfigRegistry { @@ -468,6 +471,14 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val VALUE_STREAM_DYNAMIC_FILTER_ENABLED = + buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled") + .doc( + "Whether to apply dynamic filters pushed down from hash probe in the ValueStream" + + " (shuffle reader) operator to filter rows before they reach the hash join.") + .booleanConf + .createWithDefault(false) + val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED = 
buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled") .doc( diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 45a64908e199..49bca9bfdb97 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -319,7 +319,10 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); // Register value-stream connector for runtime iterator-based inputs - velox::connector::registerConnector(std::make_shared(kIteratorConnectorId, hiveConf)); + auto valueStreamDynamicFilterEnabled = + backendConf_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); + velox::connector::registerConnector( + std::make_shared(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled)); #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 157f9829b817..54b30e2f7beb 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -652,6 +652,8 @@ std::unordered_map WholeStageResultIterator::getQueryC std::to_string(veloxCfg_->get(kHashProbeDynamicFilterPushdownEnabled, true)); configs[velox::core::QueryConfig::kHashProbeBloomFilterPushdownMaxSize] = std::to_string(veloxCfg_->get(kHashProbeBloomFilterPushdownMaxSize, 0)); + configs["value_stream_dynamic_filter_enabled"] = + std::to_string(veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault)); // spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver takes no effect if // spark.gluten.sql.columnar.backend.velox.IOThreads is set to 0 configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] = diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 566ce875aacc..5a2ad4b8c7eb 100644 --- 
a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -78,6 +78,10 @@ const std::string kHashProbeDynamicFilterPushdownEnabled =
 const std::string kHashProbeBloomFilterPushdownMaxSize =
     "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize";
 
+const std::string kValueStreamDynamicFilterEnabled =
+    "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled";
+const bool kValueStreamDynamicFilterEnabledDefault = false;
+
 const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
 const bool kShowTaskMetricsWhenFinishedDefault = false;
 
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc b/cpp/velox/operators/plannodes/RowVectorStream.cc
index 7c0b00979a74..c26f27171dd0 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.cc
+++ b/cpp/velox/operators/plannodes/RowVectorStream.cc
@@ -19,6 +19,7 @@
 #include "memory/VeloxColumnarBatch.h"
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
+#include "velox/exec/OperatorUtils.h"
 #include "velox/exec/Task.h"
 #include "velox/vector/arrow/Bridge.h"
 
@@ -113,7 +114,9 @@ ValueStreamDataSource::ValueStreamDataSource(
     const facebook::velox::connector::ColumnHandleMap& columnHandles,
     facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx)
     : outputType_(outputType),
-      pool_(connectorQueryCtx->memoryPool()) {}
+      pool_(connectorQueryCtx->memoryPool()),
+      dynamicFilterEnabled_(
+          connectorQueryCtx->sessionProperties()->get<bool>("value_stream_dynamic_filter_enabled", true)) {}
 
 void ValueStreamDataSource::addSplit(std::shared_ptr split) {
   // Cast to IteratorConnectorSplit to extract the iterator
@@ -166,7 +169,125 @@ std::optional ValueStreamDataSource::next(
   completedRows_ += rowVector->size();
   completedBytes_ += rowVector->estimateFlatSize();
 
+  // Apply dynamic filters if any have been pushed down.
+  if (!dynamicFilters_.empty()) {
+    rowVector = applyDynamicFilters(rowVector);
+    if (!rowVector) {
+      // All rows filtered out, try next batch.
+      // NOTE(review): this recurses once per consecutive fully-filtered batch;
+      // a loop in next() would bound stack use, but the top of next() is
+      // outside this hunk, so the call is left as is.
+      return next(size, future);
+    }
+  }
+
   return rowVector;
 }
 
+// Runs every registered dynamic filter over `input`. Returns `input` unchanged
+// when all rows pass, a wrapped view of the passing rows when only some pass,
+// and nullptr when none pass (or as soon as one filter rejects every row).
+facebook::velox::RowVectorPtr ValueStreamDataSource::applyDynamicFilters(
+    const facebook::velox::RowVectorPtr& input) {
+  using namespace facebook::velox;
+
+  const auto numRows = input->size();
+  if (numRows == 0) {
+    return input;
+  }
+
+  SelectivityVector rows(numRows, true);
+
+  for (const auto& [channel, filter] : dynamicFilters_) {
+    // Skip empty slots and channels this batch does not have.
+    if (!filter || channel >= input->childrenSize()) {
+      continue;
+    }
+    applyFilterOnColumn(filter, input->childAt(channel), rows);
+    if (!rows.hasSelections()) {
+      return nullptr;
+    }
+  }
+
+  const auto passedCount = rows.countSelected();
+  if (passedCount == numRows) {
+    return input;
+  }
+
+  // Collect the surviving row numbers, in order, as wrap indices.
+  BufferPtr indices = allocateIndices(passedCount, pool_);
+  auto* rawIndices = indices->asMutable<vector_size_t>();
+  vector_size_t idx = 0;
+  rows.applyToSelected([&](auto row) { rawIndices[idx++] = row; });
+
+  return exec::wrap(passedCount, std::move(indices), input);
+}
+
+// Clears from `rows` every selected row of `vector` that `filter` rejects.
+// Null rows are kept only when filter->testNull() is true; columns of a type
+// without a matching Filter::test* call below keep all their non-null rows.
+void ValueStreamDataSource::applyFilterOnColumn(
+    const std::shared_ptr<facebook::velox::common::Filter>& filter,
+    const facebook::velox::VectorPtr& vector,
+    facebook::velox::SelectivityVector& rows) {
+  using namespace facebook::velox;
+
+  DecodedVector decoded(*vector, rows);
+  const bool nullPasses = filter->testNull();
+
+  // Visits each selected row and deselects it when it is a rejected null or
+  // when `testValue(row)` is false. Taking the test as a callable lets the
+  // type switch below run once per column instead of once per row.
+  auto deselectFailing = [&](auto&& testValue) {
+    rows.applyToSelected([&](auto row) {
+      const bool pass = decoded.isNullAt(row) ? nullPasses : testValue(row);
+      if (!pass) {
+        rows.setValid(row, false);
+      }
+    });
+  };
+
+  switch (vector->typeKind()) {
+    case TypeKind::BOOLEAN:
+      deselectFailing([&](auto row) { return filter->testBool(decoded.valueAt<bool>(row)); });
+      break;
+    case TypeKind::TINYINT:
+      deselectFailing([&](auto row) { return filter->testInt64(decoded.valueAt<int8_t>(row)); });
+      break;
+    case TypeKind::SMALLINT:
+      deselectFailing([&](auto row) { return filter->testInt64(decoded.valueAt<int16_t>(row)); });
+      break;
+    case TypeKind::INTEGER:
+      deselectFailing([&](auto row) { return filter->testInt64(decoded.valueAt<int32_t>(row)); });
+      break;
+    case TypeKind::BIGINT:
+      deselectFailing([&](auto row) { return filter->testInt64(decoded.valueAt<int64_t>(row)); });
+      break;
+    case TypeKind::HUGEINT:
+      deselectFailing([&](auto row) { return filter->testInt128(decoded.valueAt<int128_t>(row)); });
+      break;
+    case TypeKind::REAL:
+      deselectFailing([&](auto row) { return filter->testFloat(decoded.valueAt<float>(row)); });
+      break;
+    case TypeKind::DOUBLE:
+      deselectFailing([&](auto row) { return filter->testDouble(decoded.valueAt<double>(row)); });
+      break;
+    case TypeKind::VARCHAR:
+    case TypeKind::VARBINARY:
+      deselectFailing([&](auto row) {
+        const auto sv = decoded.valueAt<StringView>(row);
+        return filter->testBytes(sv.data(), sv.size());
+      });
+      break;
+    case TypeKind::TIMESTAMP:
+      deselectFailing([&](auto row) { return filter->testTimestamp(decoded.valueAt<Timestamp>(row)); });
+      break;
+    default:
+      // For unsupported types, let every non-null row pass through.
+      deselectFailing([](auto /*row*/) { return true; });
+      break;
+  }
+  rows.updateBounds();
+}
+
 } // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h
index 6e6ccd1527d5..fe58c313cb39 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -23,7 +23,10 @@
 #include "velox/connectors/Connector.h"
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
+#include "velox/exec/OperatorUtils.h"
 #include "velox/exec/Task.h"
+#include "velox/type/Filter.h"
+#include "velox/vector/DecodedVector.h"
 
 namespace gluten {
 
@@ -68,10 +71,23 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
 
   std::optional next(uint64_t size, facebook::velox::ContinueFuture& future) override;
 
+  const facebook::velox::common::SubfieldFilters* getFilters() const override {
+    return &emptyFilters_;
+  }
+
   void addDynamicFilter(
       facebook::velox::column_index_t outputChannel,
       const std::shared_ptr& filter) override {
-    // Iterator-based sources don't support dynamic filtering
+    if (!dynamicFilterEnabled_ || !filter) {
+      return;
+    }
+    // This can be called more than once for the same channel. Combine the new
+    // filter with the stored one via Filter::mergeWith instead of overwriting,
+    // so a later filter never discards an earlier one.
+    auto [it, inserted] = dynamicFilters_.try_emplace(outputChannel, filter);
+    if (!inserted) {
+      it->second = it->second->mergeWith(filter.get());
+    }
   }
 
   uint64_t getCompletedBytes() override {
@@ -87,6 +103,17 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
   }
 
  private:
+  // Returns `input` restricted to the rows that pass every dynamic filter (as a
+  // dictionary-wrapped view), or nullptr when no row passes.
+  facebook::velox::RowVectorPtr applyDynamicFilters(const facebook::velox::RowVectorPtr& input);
+
+  // Evaluates a Filter against a single column vector, deselecting rows that
+  // don't pass.
+  static void applyFilterOnColumn(
+      const std::shared_ptr<facebook::velox::common::Filter>& filter,
+      const facebook::velox::VectorPtr& vector,
+      facebook::velox::SelectivityVector& rows);
+
   const facebook::velox::RowTypePtr outputType_;
   facebook::velox::memory::MemoryPool* pool_;
 
@@ -94,6 +121,10 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
   std::shared_ptr currentIterator_{nullptr};
   uint64_t completedBytes_{0};
   uint64_t completedRows_{0};
+
+  folly::F14FastMap<facebook::velox::column_index_t, std::shared_ptr<facebook::velox::common::Filter>> dynamicFilters_;
+  const facebook::velox::common::SubfieldFilters emptyFilters_;
+  bool dynamicFilterEnabled_{true};
 };
 
 /// Table handle for iterator-based scans
@@ -133,10 +164,15 @@ class ValueStreamColumnHandle : public facebook::velox::connector::ColumnHandle
 /// Connector implementation for iterator-based data sources
 class ValueStreamConnector : public facebook::velox::connector::Connector {
  public:
-  explicit ValueStreamConnector(
+  ValueStreamConnector(
       const std::string& id,
-      std::shared_ptr config)
-      : Connector(id, config) {}
+      std::shared_ptr config,
+      bool dynamicFilterEnabled = false)
+      : Connector(id, config), dynamicFilterEnabled_(dynamicFilterEnabled) {}
+
+  bool canAddDynamicFilter() const override {
+    return dynamicFilterEnabled_;
+  }
 
   std::unique_ptr createDataSource(
       const facebook::velox::RowTypePtr& outputType,
@@ -153,6 +189,9 @@ class ValueStreamConnector : public facebook::velox::connector::Connector {
       facebook::velox::connector::CommitStrategy commitStrategy) override {
     VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks");
   }
+
+ private:
+  bool dynamicFilterEnabled_;
 };
 
 /// Factory for creating ValueStreamConnector instances
diff --git a/cpp/velox/tests/CMakeLists.txt
b/cpp/velox/tests/CMakeLists.txt index a690347bc4bc..0c61850e12ce 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -115,7 +115,8 @@ add_velox_test( VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc VeloxColumnarBatchTest.cc - VeloxBatchResizerTest.cc) + VeloxBatchResizerTest.cc + ValueStreamDynamicFilterTest.cc) add_velox_test( velox_plan_conversion_test SOURCES diff --git a/cpp/velox/tests/ValueStreamDynamicFilterTest.cc b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc new file mode 100644 index 000000000000..6e13bfed0145 --- /dev/null +++ b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "memory/VeloxColumnarBatch.h" +#include "operators/plannodes/RowVectorStream.h" +#include "velox/type/Filter.h" +#include "velox/vector/DecodedVector.h" +#include "velox/vector/FlatVector.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::common; + +namespace facebook::velox::test { + +/// A ColumnarBatchIterator that yields pre-built RowVectors as VeloxColumnarBatches. 
+class TestBatchIterator final : public gluten::ColumnarBatchIterator { + public: + explicit TestBatchIterator(std::vector batches) : batches_(std::move(batches)) {} + + std::shared_ptr next() override { + if (idx_ >= batches_.size()) { + return nullptr; + } + return std::make_shared(batches_[idx_++]); + } + + private: + std::vector batches_; + size_t idx_ = 0; +}; + +class ValueStreamDynamicFilterTest : public ::testing::Test, public VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + } + + void SetUp() override { + // Register the connector if not already registered. + if (!connector::hasConnector(gluten::kIteratorConnectorId)) { + auto config = std::make_shared( + std::unordered_map()); + connector::registerConnector(std::make_shared( + gluten::kIteratorConnectorId, config, /*dynamicFilterEnabled=*/true)); + } + } + + /// Build a TableScanNode that reads from the value-stream connector. + std::shared_ptr makeTableScanNode( + const std::string& nodeId, + const RowTypePtr& outputType) { + auto tableHandle = + std::make_shared(gluten::kIteratorConnectorId); + + connector::ColumnHandleMap assignments; + for (int idx = 0; idx < outputType->size(); idx++) { + auto name = outputType->nameOf(idx); + auto type = outputType->childAt(idx); + assignments[name] = + std::make_shared(name, type); + } + + return std::make_shared( + nodeId, outputType, tableHandle, assignments); + } + + /// Create a split wrapping the given batches. + std::shared_ptr makeSplit( + std::vector batches) { + auto iter = std::make_shared( + std::make_unique(std::move(batches))); + return std::make_shared( + gluten::kIteratorConnectorId, std::move(iter)); + } + + /// Read all int64 values from column 0 of a serial-mode task. 
+ std::vector readAllInt64(Task* task) { + std::vector result; + ContinueFuture future = ContinueFuture::makeEmpty(); + while (true) { + auto batch = task->next(&future); + if (!batch) { + break; + } + DecodedVector decoded(*batch->childAt(0)); + for (vector_size_t i = 0; i < batch->size(); i++) { + result.push_back(decoded.valueAt(i)); + } + } + return result; + } +}; + +// Test that without any filter, all rows pass through. +TEST_F(ValueStreamDynamicFilterTest, noFilterPassesAllRows) { + auto batch = makeRowVector({"id"}, {makeFlatVector({10, 20, 30})}); + auto outputType = asRowType(batch->type()); + auto scanNode = makeTableScanNode("vs0", outputType); + + auto queryCtx = core::QueryCtx::create(); + auto task = Task::create( + "test-nofilter", + core::PlanFragment{scanNode}, + 0, + queryCtx, + Task::ExecutionMode::kSerial); + + task->addSplit(scanNode->id(), Split{makeSplit({batch})}); + task->noMoreSplits(scanNode->id()); + + auto ids = readAllInt64(task.get()); + ASSERT_EQ(ids, (std::vector{10, 20, 30})); +} + +// Test that filtering works when filter is injected after first batch. +TEST_F(ValueStreamDynamicFilterTest, filterBigintRange) { + auto batch1 = makeRowVector({"id"}, {makeFlatVector({1, 2, 3, 4, 5})}); + auto batch2 = makeRowVector({"id"}, {makeFlatVector({6, 7, 8, 9, 10})}); + auto outputType = asRowType(batch1->type()); + auto scanNode = makeTableScanNode("vs1", outputType); + + auto queryCtx = core::QueryCtx::create(); + auto task = Task::create( + "test-bigint", + core::PlanFragment{scanNode}, + 0, + queryCtx, + Task::ExecutionMode::kSerial); + + // Add both batches as a single split. + task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})}); + task->noMoreSplits(scanNode->id()); + + // First next() creates drivers and returns first batch (unfiltered). 
+ ContinueFuture future = ContinueFuture::makeEmpty(); + auto firstBatch = task->next(&future); + ASSERT_NE(firstBatch, nullptr); + ASSERT_EQ(firstBatch->size(), 5); + + // Inject a BigintRange filter: keep only id >= 8. + task->testingVisitDrivers([&](Driver* driver) { + auto* op = driver->findOperator(scanNode->id()); + if (!op) { + return; + } + ASSERT_TRUE(op->canAddDynamicFilter()); + PushdownFilters pf; + pf.filters[0] = std::make_shared(8, 10, false); + pf.dynamicFilteredColumns.insert(0); + op->addDynamicFilterLocked("producer", pf); + }); + + // Second next() should return filtered batch. + auto secondBatch = task->next(&future); + ASSERT_NE(secondBatch, nullptr); + + DecodedVector decoded(*secondBatch->childAt(0)); + std::vector outputIds; + for (vector_size_t i = 0; i < secondBatch->size(); i++) { + outputIds.push_back(decoded.valueAt(i)); + } + ASSERT_EQ(outputIds, (std::vector{8, 9, 10})); + + auto end = task->next(&future); + ASSERT_EQ(end, nullptr); +} + +// Test that a filter eliminates all rows from a batch. 
+TEST_F(ValueStreamDynamicFilterTest, filterEliminatesEntireBatch) {
+  // batch1 is read before the filter exists; batch2 has no row in [100, 300], so it must be skipped as a whole.
+  auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3})});
+  auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({4, 5, 6})});
+  auto batch3 = makeRowVector({"id"}, {makeFlatVector<int64_t>({100, 200, 300})});
+  auto outputType = asRowType(batch1->type());
+  auto scanNode = makeTableScanNode("vs2", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-eliminate",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2, batch3})});
+  task->noMoreSplits(scanNode->id());
+
+  ContinueFuture future = ContinueFuture::makeEmpty();
+  auto firstBatch = task->next(&future);
+  ASSERT_NE(firstBatch, nullptr);
+  ASSERT_EQ(firstBatch->size(), 3);
+
+  task->testingVisitDrivers([&](Driver* driver) {
+    auto* op = driver->findOperator(scanNode->id());
+    if (!op) {
+      return;
+    }
+    PushdownFilters pf;
+    pf.filters[0] = std::make_shared<BigintRange>(100, 300, false);
+    pf.dynamicFilteredColumns.insert(0);
+    op->addDynamicFilterLocked("producer", pf);
+  });
+
+  auto secondBatch = task->next(&future);
+  ASSERT_NE(secondBatch, nullptr);
+  DecodedVector decoded(*secondBatch->childAt(0));
+  std::vector<int64_t> outputIds;
+  for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+    outputIds.push_back(decoded.valueAt<int64_t>(i));
+  }
+  ASSERT_EQ(outputIds, (std::vector<int64_t>{100, 200, 300}));
+  auto end = task->next(&future);
+  ASSERT_EQ(end, nullptr);
+}
+
+// Test that nulls are filtered out when nullAllowed is false.
+TEST_F(ValueStreamDynamicFilterTest, filterWithNulls) { + auto batch1 = makeRowVector({"id"}, {makeFlatVector({10, 20})}); + auto batch2 = makeRowVector( + {"id"}, + {makeNullableFlatVector({1, std::nullopt, 3, std::nullopt, 5})}); + auto outputType = asRowType(batch1->type()); + auto scanNode = makeTableScanNode("vs3", outputType); + + auto queryCtx = core::QueryCtx::create(); + auto task = Task::create( + "test-nulls", + core::PlanFragment{scanNode}, + 0, + queryCtx, + Task::ExecutionMode::kSerial); + + task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})}); + task->noMoreSplits(scanNode->id()); + + ContinueFuture future = ContinueFuture::makeEmpty(); + auto firstBatch = task->next(&future); + ASSERT_NE(firstBatch, nullptr); + + task->testingVisitDrivers([&](Driver* driver) { + auto* op = driver->findOperator(scanNode->id()); + if (!op) { + return; + } + PushdownFilters pf; + pf.filters[0] = std::make_shared(3, 100, false); + pf.dynamicFilteredColumns.insert(0); + op->addDynamicFilterLocked("producer", pf); + }); + + auto secondBatch = task->next(&future); + ASSERT_NE(secondBatch, nullptr); + + DecodedVector decoded(*secondBatch->childAt(0)); + std::vector outputIds; + for (vector_size_t i = 0; i < secondBatch->size(); i++) { + ASSERT_FALSE(decoded.isNullAt(i)); + outputIds.push_back(decoded.valueAt(i)); + } + ASSERT_EQ(outputIds, (std::vector{3, 5})); + + auto end = task->next(&future); + ASSERT_EQ(end, nullptr); +} + +// Test canAddDynamicFilter returns true through the connector. 
+TEST_F(ValueStreamDynamicFilterTest, canAddDynamicFilter) { + auto batch = makeRowVector({"id"}, {makeFlatVector({1})}); + auto outputType = asRowType(batch->type()); + auto scanNode = makeTableScanNode("vs4", outputType); + + auto queryCtx = core::QueryCtx::create(); + auto task = Task::create( + "test-can-add", + core::PlanFragment{scanNode}, + 0, + queryCtx, + Task::ExecutionMode::kSerial); + + task->addSplit(scanNode->id(), Split{makeSplit({batch})}); + task->noMoreSplits(scanNode->id()); + + ContinueFuture future = ContinueFuture::makeEmpty(); + task->next(&future); + + bool found = false; + task->testingVisitDrivers([&](Driver* driver) { + auto* op = driver->findOperator(scanNode->id()); + if (op) { + ASSERT_TRUE(op->canAddDynamicFilter()); + found = true; + } + }); + ASSERT_TRUE(found); + + auto end = task->next(&future); + ASSERT_EQ(end, nullptr); +} + +} // namespace facebook::velox::test diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index f4a79c465211..40a1c3fdef54 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -74,6 +74,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. | | spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. | | spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing | +| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. | | spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. 
| | spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | | spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same sementic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. | From 26c2149d661449298b7db39192363f7f9ae8a9e1 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Sat, 28 Feb 2026 17:35:53 +0530 Subject: [PATCH 2/8] Set to true --- .../src/main/scala/org/apache/gluten/config/VeloxConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 9f1226cf9e6c..d1bd6f68f6c3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -477,7 +477,7 @@ object VeloxConfig extends ConfigRegistry { "Whether to apply dynamic filters pushed down from hash probe in the ValueStream" + " (shuffle reader) operator to filter rows before they reach the hash join.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED = buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled") From 65ba188b7e931ed9b3b573a71cfe48f4d26e9e23 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Sat, 28 Feb 2026 14:30:28 +0000 Subject: [PATCH 3/8] Set default as true --- cpp/velox/config/VeloxConfig.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 5a2ad4b8c7eb..834fbe6fba66 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -80,7 +80,7 @@ const std::string kHashProbeBloomFilterPushdownMaxSize = const std::string 
kValueStreamDynamicFilterEnabled = "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled"; -const bool kValueStreamDynamicFilterEnabledDefault = false; +const bool kValueStreamDynamicFilterEnabledDefault = true; const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished"; const bool kShowTaskMetricsWhenFinishedDefault = false; From 9c184dcb8c8c9d0a09d1f068596da57bf8d55d99 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Sat, 28 Feb 2026 14:43:26 +0000 Subject: [PATCH 4/8] Address review comments for config, SQL metrics --- .../gluten/metrics/OperatorMetrics.java | 3 ++ .../backendsapi/velox/VeloxMetricsApi.scala | 6 +++ .../gluten/metrics/JoinMetricsUpdater.scala | 9 +++++ .../apache/gluten/metrics/MetricsUtil.scala | 3 ++ .../gluten/execution/VeloxHashJoinSuite.scala | 37 +++++++++++++++++++ cpp/core/utils/Metrics.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 5 ++- .../operators/plannodes/RowVectorStream.cc | 5 ++- .../operators/plannodes/RowVectorStream.h | 20 +++++++++- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 4 +- 10 files changed, 87 insertions(+), 6 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 10563e507e9b..cf51d00925d1 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -38,6 +38,7 @@ public class OperatorMetrics implements IOperatorMetrics { public long numDynamicFiltersProduced; public long numDynamicFiltersAccepted; public long numReplacedWithDynamicFilterRows; + public long numDynamicFilteredRows; public long flushRowCount; public long loadedToValueHook; public long bloomFilterBlocksByteSize; @@ -83,6 +84,7 @@ public OperatorMetrics( long numDynamicFiltersProduced, long numDynamicFiltersAccepted, 
long numReplacedWithDynamicFilterRows, + long numDynamicFilteredRows, long flushRowCount, long loadedToValueHook, long bloomFilterBlocksByteSize, @@ -125,6 +127,7 @@ public OperatorMetrics( this.numDynamicFiltersProduced = numDynamicFiltersProduced; this.numDynamicFiltersAccepted = numDynamicFiltersAccepted; this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; + this.numDynamicFilteredRows = numDynamicFilteredRows; this.flushRowCount = flushRowCount; this.loadedToValueHook = loadedToValueHook; this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 29d882c3d3db..28eb81a2cd70 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -628,6 +628,12 @@ class VeloxMetricsApi extends MetricsApi with Logging { "hashProbeDynamicFiltersProduced" -> SQLMetrics.createMetric( sparkContext, "number of hash probe dynamic filters produced"), + "valueStreamDynamicFiltersAccepted" -> SQLMetrics.createMetric( + sparkContext, + "number of dynamic filters accepted by value stream"), + "valueStreamDynamicFilteredRows" -> SQLMetrics.createMetric( + sparkContext, + "number of rows filtered by value stream dynamic filter"), "bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric( sparkContext, "bloom filter blocks byte size"), diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index cf894b9da466..2f2859514476 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -101,6 +101,11 
@@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) val bloomFilterBlocksByteSize: SQLMetric = metrics("bloomFilterBlocksByteSize") + val valueStreamDynamicFiltersAccepted: SQLMetric = + metrics("valueStreamDynamicFiltersAccepted") + val valueStreamDynamicFilteredRows: SQLMetric = + metrics("valueStreamDynamicFilteredRows") + val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount") val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos") @@ -175,6 +180,10 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) } loadLazyVectorTime += joinMetrics.asScala.last.loadLazyVectorTime + joinMetrics.asScala.foreach { m => + valueStreamDynamicFiltersAccepted += m.numDynamicFiltersAccepted + valueStreamDynamicFilteredRows += m.numDynamicFilteredRows + } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 607de718ce07..b45b5170fc26 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -120,6 +120,7 @@ object MetricsUtil extends Logging { var numDynamicFiltersProduced: Long = 0 var numDynamicFiltersAccepted: Long = 0 var numReplacedWithDynamicFilterRows: Long = 0 + var numDynamicFilteredRows: Long = 0 var flushRowCount: Long = 0 var loadedToValueHook: Long = 0 var bloomFilterBlocksByteSize: Long = 0 @@ -155,6 +156,7 @@ object MetricsUtil extends Logging { numDynamicFiltersProduced += metrics.numDynamicFiltersProduced numDynamicFiltersAccepted += metrics.numDynamicFiltersAccepted numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows + numDynamicFilteredRows += metrics.numDynamicFilteredRows flushRowCount += metrics.flushRowCount loadedToValueHook += metrics.loadedToValueHook bloomFilterBlocksByteSize += 
metrics.bloomFilterBlocksByteSize @@ -197,6 +199,7 @@ object MetricsUtil extends Logging { numDynamicFiltersProduced, numDynamicFiltersAccepted, numReplacedWithDynamicFilterRows, + numDynamicFilteredRows, flushRowCount, loadedToValueHook, bloomFilterBlocksByteSize, diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala index 8db6b957753f..3b9f4ae7f83d 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala @@ -342,4 +342,41 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite { } } } + + test("Value stream dynamic filter pushdown") { + withSQLConf( + "spark.sql.autoBroadcastJoinThreshold" -> "-1", + "spark.sql.adaptive.enabled" -> "false", + GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "true", + VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key -> "true" + ) { + withTable("vs_probe_table", "vs_build_table") { + spark.sql(""" + CREATE TABLE vs_probe_table USING PARQUET + AS SELECT id as a FROM range(110001) + """) + + spark.sql(""" + CREATE TABLE vs_build_table USING PARQUET + AS SELECT id * 1000 as b FROM range(220002) + """) + + runQueryAndCompare( + "SELECT a FROM vs_probe_table JOIN vs_build_table ON a = b" + ) { + df => + val join = find(df.queryExecution.executedPlan) { + case _: ShuffledHashJoinExecTransformer => true + case _ => false + } + assert(join.isDefined) + val metrics = join.get.metrics + assert(metrics.contains("valueStreamDynamicFiltersAccepted")) + assert(metrics("valueStreamDynamicFiltersAccepted").value > 0) + assert(metrics.contains("valueStreamDynamicFilteredRows")) + assert(metrics("valueStreamDynamicFilteredRows").value > 0) + } + } + } + } } diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index 8e33a4a9c613..6d1c79bf6daa 100644 --- 
a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -67,6 +67,7 @@ struct Metrics { kNumDynamicFiltersProduced, kNumDynamicFiltersAccepted, kNumReplacedWithDynamicFilterRows, + kNumDynamicFilteredRows, kFlushRowCount, kLoadedToValueHook, kBloomFilterBlocksByteSize, diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 54b30e2f7beb..6ca24907a063 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -43,6 +43,7 @@ namespace { const std::string kDynamicFiltersProduced = "dynamicFiltersProduced"; const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted"; const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows"; +const std::string kDynamicFilteredRows = "dynamicFilteredRows"; const std::string kFlushRowCount = "flushRowCount"; const std::string kLoadedToValueHook = "loadedToValueHook"; const std::string kBloomFilterBlocksByteSize = "bloomFilterSize"; @@ -492,6 +493,8 @@ void WholeStageResultIterator::collectMetrics() { runtimeMetric("sum", second->customStats, kDynamicFiltersAccepted); metrics_->get(Metrics::kNumReplacedWithDynamicFilterRows)[metricIndex] = runtimeMetric("sum", second->customStats, kReplacedWithDynamicFilterRows); + metrics_->get(Metrics::kNumDynamicFilteredRows)[metricIndex] = + runtimeMetric("sum", second->customStats, kDynamicFilteredRows); metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount); metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] = runtimeMetric("sum", second->customStats, kLoadedToValueHook); @@ -652,8 +655,6 @@ std::unordered_map WholeStageResultIterator::getQueryC std::to_string(veloxCfg_->get(kHashProbeDynamicFilterPushdownEnabled, true)); configs[velox::core::QueryConfig::kHashProbeBloomFilterPushdownMaxSize] = std::to_string(veloxCfg_->get(kHashProbeBloomFilterPushdownMaxSize, 0)); - 
configs["value_stream_dynamic_filter_enabled"] = - std::to_string(veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault)); // spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver takes no effect if // spark.gluten.sql.columnar.backend.velox.IOThreads is set to 0 configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] = diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc b/cpp/velox/operators/plannodes/RowVectorStream.cc index c26f27171dd0..d3278abb75a0 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.cc +++ b/cpp/velox/operators/plannodes/RowVectorStream.cc @@ -116,7 +116,7 @@ ValueStreamDataSource::ValueStreamDataSource( : outputType_(outputType), pool_(connectorQueryCtx->memoryPool()), dynamicFilterEnabled_( - connectorQueryCtx->sessionProperties()->get("value_stream_dynamic_filter_enabled", true)) {} + std::dynamic_pointer_cast(tableHandle)->dynamicFilterEnabled()) {} void ValueStreamDataSource::addSplit(std::shared_ptr split) { // Cast to IteratorConnectorSplit to extract the iterator @@ -198,6 +198,7 @@ facebook::velox::RowVectorPtr ValueStreamDataSource::applyDynamicFilters( } applyFilterOnColumn(filter, input->childAt(channel), rows); if (!rows.hasSelections()) { + dynamicFilteredRows_ += numRows; return nullptr; } } @@ -207,6 +208,8 @@ facebook::velox::RowVectorPtr ValueStreamDataSource::applyDynamicFilters( return input; } + dynamicFilteredRows_ += numRows - passedCount; + BufferPtr indices = allocateIndices(passedCount, pool_); auto* rawIndices = indices->asMutable(); vector_size_t idx = 0; diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h index fe58c313cb39..9adf09342707 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.h +++ b/cpp/velox/operators/plannodes/RowVectorStream.h @@ -80,6 +80,7 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource { const std::shared_ptr& filter) override { if 
(dynamicFilterEnabled_) { dynamicFilters_[outputChannel] = filter; + numDynamicFiltersAccepted_++; } } @@ -92,7 +93,12 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource { } std::unordered_map getRuntimeStats() override { - return {}; + std::unordered_map stats; + stats["dynamicFiltersAccepted"] = facebook::velox::RuntimeMetric(numDynamicFiltersAccepted_); + if (dynamicFilteredRows_ > 0) { + stats["dynamicFilteredRows"] = facebook::velox::RuntimeMetric(dynamicFilteredRows_); + } + return stats; } private: @@ -118,21 +124,31 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource { folly::F14FastMap> dynamicFilters_; const facebook::velox::common::SubfieldFilters emptyFilters_; bool dynamicFilterEnabled_{true}; + uint64_t numDynamicFiltersAccepted_{0}; + uint64_t dynamicFilteredRows_{0}; }; /// Table handle for iterator-based scans class ValueStreamTableHandle : public facebook::velox::connector::ConnectorTableHandle { public: - explicit ValueStreamTableHandle(std::string connectorId) : ConnectorTableHandle(connectorId) {} + explicit ValueStreamTableHandle(std::string connectorId, bool dynamicFilterEnabled = true) + : ConnectorTableHandle(connectorId), dynamicFilterEnabled_(dynamicFilterEnabled) {} const std::string& name() const override { static const std::string kName = "ValueStreamTableHandle"; return kName; } + bool dynamicFilterEnabled() const { + return dynamicFilterEnabled_; + } + folly::dynamic serialize() const override { VELOX_NYI(); } + + private: + bool dynamicFilterEnabled_; }; /// Column handle for iterator-based scans diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 727f4882e174..e9c31e2fcb2c 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1313,7 +1313,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( auto outputType = ROW(std::move(outNames), 
std::move(veloxTypeList)); // Create TableHandle - auto tableHandle = std::make_shared(kIteratorConnectorId); + bool dynamicFilterEnabled = + veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); + auto tableHandle = std::make_shared(kIteratorConnectorId, dynamicFilterEnabled); // Create column assignments connector::ColumnHandleMap assignments; From d1cb9cc38d88850b29c3cb39ba752b059708a502 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Sat, 28 Feb 2026 12:06:47 +0000 Subject: [PATCH 5/8] Disable ValueStream dynamic filter when probe side has non-deterministic expressions (cherry picked from commit 152bce7865ccc10619d5d0c69896282ad694a6e4) --- .../clickhouse/CHIteratorApi.scala | 3 +- .../backendsapi/velox/VeloxIteratorApi.scala | 10 ++++-- .../gluten/backendsapi/IteratorApi.scala | 3 +- .../execution/WholeStageTransformer.scala | 32 +++++++++++++++++-- .../WholeStageZippedPartitionsRDD.scala | 3 +- 5 files changed, 44 insertions(+), 7 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 2cd9d8516493..d83659a11f23 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -325,7 +325,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, materializeInput: Boolean, - enableCudf: Boolean): Iterator[ColumnarBatch] = { + enableCudf: Boolean, + disableValueStreamDynamicFilter: Boolean): Iterator[ColumnarBatch] = { // scalastyle:on argcount // Final iterator does not contain scan split, so pass empty split info to native here. 
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 668e60b20542..87abeafd82a6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -247,8 +247,14 @@ class VeloxIteratorApi extends IteratorApi with Logging { updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, materializeInput: Boolean, - enableCudf: Boolean = false): Iterator[ColumnarBatch] = { - val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava + enableCudf: Boolean = false, + disableValueStreamDynamicFilter: Boolean = false): Iterator[ColumnarBatch] = { + val extraConfMap = mutable.Map( + GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString) + if (disableValueStreamDynamicFilter) { + extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = "false" + } + val extraConf = extraConfMap.asJava val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf) val columnarNativeIterator = inputIterators.map { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 52be6c49547e..a17642a998fd 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -85,6 +85,7 @@ trait IteratorApi { updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, materializeInput: Boolean = false, - enableCudf: Boolean = false): Iterator[ColumnarBatch] + enableCudf: Boolean = false, + disableValueStreamDynamicFilter: Boolean = false): Iterator[ColumnarBatch] // scalastyle:on argcount } diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index acef5d798ea0..aea3317855fa 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -51,7 +51,8 @@ case class TransformContext(outputAttributes: Seq[Attribute], root: RelNode) case class WholeStageTransformContext( root: PlanNode, substraitContext: SubstraitContext = null, - enableCudf: Boolean = false) + enableCudf: Boolean = false, + disableValueStreamDynamicFilter: Boolean = false) /** Base interface for a query plan that can be interpreted to Substrait representation. */ trait TransformSupport extends ValidatablePlan { @@ -257,7 +258,34 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f PlanBuilder.makePlan(substraitContext, Lists.newArrayList(childCtx.root), outNames) } - WholeStageTransformContext(planNode, substraitContext, isCudf) + WholeStageTransformContext( + planNode, + substraitContext, + isCudf, + hasNonDeterministicExprInJoinProbe(child)) + } + + /** + * Checks whether any HashJoin's probe (streamed) side contains non-deterministic expressions. + * When true, ValueStream dynamic filter pushdown must be disabled because the dynamic filter + * would be applied at the scan level (below the non-deterministic Project), changing how many + * times the non-deterministic expression is evaluated and thus altering its output sequence. + * See SPARK-10316. 
+ */ + private def hasNonDeterministicExprInJoinProbe(plan: SparkPlan): Boolean = { + plan match { + case join: HashJoinLikeExecTransformer => + containsNonDeterministicExpr(join.streamedPlan) || + hasNonDeterministicExprInJoinProbe(join.streamedPlan) || + hasNonDeterministicExprInJoinProbe(join.buildPlan) + case other => + other.children.exists(hasNonDeterministicExprInJoinProbe) + } + } + + private def containsNonDeterministicExpr(plan: SparkPlan): Boolean = { + plan.expressions.exists(!_.deterministic) || + plan.children.exists(containsNonDeterministicExpr) } def doWholeStageTransform(): WholeStageTransformContext = { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala index 521388faaeb7..d07382a8112a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala @@ -54,7 +54,8 @@ class WholeStageZippedPartitionsRDD( updateNativeMetrics, split.index, materializeInput, - resCtx.enableCudf + resCtx.enableCudf, + resCtx.disableValueStreamDynamicFilter ) } } From db1fef52204bb38e4d16e9449e19a18276c431f6 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Sat, 28 Feb 2026 18:01:33 +0000 Subject: [PATCH 6/8] Fix build --- .../java/org/apache/gluten/metrics/Metrics.java | 4 ++++ .../backendsapi/velox/VeloxIteratorApi.scala | 3 +-- .../gluten/metrics/JoinMetricsUpdater.scala | 4 ---- .../org/apache/gluten/metrics/MetricsUtil.scala | 15 +++++++++++++++ cpp/core/jni/JniWrapper.cc | 3 ++- .../org/apache/gluten/config/GlutenConfig.scala | 3 ++- .../gluten/execution/WholeStageTransformer.scala | 10 +++++----- 7 files changed, 29 insertions(+), 13 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java 
b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index 3d23dc94db97..30baae53199a 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -40,6 +40,7 @@ public class Metrics implements IMetrics { public long[] numDynamicFiltersProduced; public long[] numDynamicFiltersAccepted; public long[] numReplacedWithDynamicFilterRows; + public long[] numDynamicFilteredRows; public long[] flushRowCount; public long[] loadedToValueHook; public long[] bloomFilterBlocksByteSize; @@ -90,6 +91,7 @@ public Metrics( long[] numDynamicFiltersProduced, long[] numDynamicFiltersAccepted, long[] numReplacedWithDynamicFilterRows, + long[] numDynamicFilteredRows, long[] flushRowCount, long[] loadedToValueHook, long[] bloomFilterBlocksByteSize, @@ -134,6 +136,7 @@ public Metrics( this.numDynamicFiltersProduced = numDynamicFiltersProduced; this.numDynamicFiltersAccepted = numDynamicFiltersAccepted; this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; + this.numDynamicFilteredRows = numDynamicFilteredRows; this.flushRowCount = flushRowCount; this.loadedToValueHook = loadedToValueHook; this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; @@ -184,6 +187,7 @@ public OperatorMetrics getOperatorMetrics(int index) { numDynamicFiltersProduced[index], numDynamicFiltersAccepted[index], numReplacedWithDynamicFilterRows[index], + numDynamicFilteredRows[index], flushRowCount[index], loadedToValueHook[index], bloomFilterBlocksByteSize[index], diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 87abeafd82a6..450bac719685 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala 
@@ -249,8 +249,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { materializeInput: Boolean, enableCudf: Boolean = false, disableValueStreamDynamicFilter: Boolean = false): Iterator[ColumnarBatch] = { - val extraConfMap = mutable.Map( - GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString) + val extraConfMap = mutable.Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString) if (disableValueStreamDynamicFilter) { extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = "false" } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index 2f2859514476..97a2b35f1e13 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -180,10 +180,6 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) } loadLazyVectorTime += joinMetrics.asScala.last.loadLazyVectorTime - joinMetrics.asScala.foreach { m => - valueStreamDynamicFiltersAccepted += m.numDynamicFiltersAccepted - valueStreamDynamicFilteredRows += m.numDynamicFilteredRows - } } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index b45b5170fc26..a55d09964898 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -298,6 +298,8 @@ object MetricsUtil extends Logging { curMetricsIdx } + val childStartMetricsIdx = newMetricsIdx + mutNode.children.foreach { child => val result = updateTransformerMetricsInternal( @@ -312,6 +314,19 @@ object MetricsUtil extends Logging { newMetricsIdx = result._2 } + // Collect ValueStream dynamic filter metrics from child operators (scan nodes) + // since 
these stats are reported on the ValueStream/TableScan operator, not on + // the HashProbe/HashBuild operators that are part of the join's own metrics. + mutNode.updater match { + case hju: HashJoinMetricsUpdater => + for (idx <- (newMetricsIdx + 1) to childStartMetricsIdx) { + val childOpMetrics = metrics.getOperatorMetrics(idx) + hju.valueStreamDynamicFiltersAccepted += childOpMetrics.numDynamicFiltersAccepted + hju.valueStreamDynamicFilteredRows += childOpMetrics.numDynamicFilteredRows + } + case _ => + } + (newOperatorIdx, newMetricsIdx) } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index c48886195b22..a65d60e30c50 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -273,7 +273,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metricsBuilderClass, "", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -589,6 +589,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kNumDynamicFiltersProduced], longArray[Metrics::kNumDynamicFiltersAccepted], longArray[Metrics::kNumReplacedWithDynamicFilterRows], + longArray[Metrics::kNumDynamicFilteredRows], longArray[Metrics::kFlushRowCount], longArray[Metrics::kLoadedToValueHook], longArray[Metrics::kBloomFilterBlocksByteSize], diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index ed2d54936655..21728a716f1a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -614,7 +614,8 @@ object 
GlutenConfig extends ConfigRegistry { ("spark.gluten.velox.awsSdkLogLevel", "FATAL"), ("spark.gluten.velox.s3UseProxyFromEnv", "false"), ("spark.gluten.velox.s3PayloadSigningPolicy", "Never"), - (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString) + (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString), + ("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled", "true") ).foreach { case (k, defaultValue) => nativeConfMap.put(k, conf.getOrElse(k, defaultValue)) } val keys = Set( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index aea3317855fa..edf3c6ac9042 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -269,15 +269,15 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f * Checks whether any HashJoin's probe (streamed) side contains non-deterministic expressions. * When true, ValueStream dynamic filter pushdown must be disabled because the dynamic filter * would be applied at the scan level (below the non-deterministic Project), changing how many - * times the non-deterministic expression is evaluated and thus altering its output sequence. - * See SPARK-10316. + * times the non-deterministic expression is evaluated and thus altering its output sequence. See + * SPARK-10316. 
*/ private def hasNonDeterministicExprInJoinProbe(plan: SparkPlan): Boolean = { plan match { case join: HashJoinLikeExecTransformer => containsNonDeterministicExpr(join.streamedPlan) || - hasNonDeterministicExprInJoinProbe(join.streamedPlan) || - hasNonDeterministicExprInJoinProbe(join.buildPlan) + hasNonDeterministicExprInJoinProbe(join.streamedPlan) || + hasNonDeterministicExprInJoinProbe(join.buildPlan) case other => other.children.exists(hasNonDeterministicExprInJoinProbe) } @@ -285,7 +285,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f private def containsNonDeterministicExpr(plan: SparkPlan): Boolean = { plan.expressions.exists(!_.deterministic) || - plan.children.exists(containsNonDeterministicExpr) + plan.children.exists(containsNonDeterministicExpr) } def doWholeStageTransform(): WholeStageTransformContext = { From f645fd2720c122dd18b29b2c49698961fcaf8d5f Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Wed, 4 Mar 2026 18:22:25 +0000 Subject: [PATCH 7/8] Resolve PR comments --- .../backendsapi/clickhouse/CHIteratorApi.scala | 2 +- .../gluten/backendsapi/velox/VeloxIteratorApi.scala | 4 ++-- .../scala/org/apache/gluten/config/VeloxConfig.scala | 2 +- cpp/velox/config/VeloxConfig.h | 2 +- .../org/apache/gluten/backendsapi/IteratorApi.scala | 2 +- .../org/apache/gluten/config/GlutenConfig.scala | 3 +-- .../gluten/execution/WholeStageTransformer.scala | 12 ++++++------ .../execution/WholeStageZippedPartitionsRDD.scala | 2 +- 8 files changed, 14 insertions(+), 15 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index d83659a11f23..4d0a916e42e2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -326,7 +326,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partitionIndex: Int, materializeInput: Boolean, enableCudf: Boolean, - disableValueStreamDynamicFilter: Boolean): Iterator[ColumnarBatch] = { + supportsValueStreamDynamicFilter: Boolean): Iterator[ColumnarBatch] = { // scalastyle:on argcount // Final iterator does not contain scan split, so pass empty split info to native here. diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 450bac719685..93a50d635a50 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -248,9 +248,9 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionIndex: Int, materializeInput: Boolean, enableCudf: Boolean = false, - disableValueStreamDynamicFilter: Boolean = false): Iterator[ColumnarBatch] = { + supportsValueStreamDynamicFilter: Boolean = true): Iterator[ColumnarBatch] = { val extraConfMap = mutable.Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString) - if (disableValueStreamDynamicFilter) { + if (!supportsValueStreamDynamicFilter) { extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = "false" } val extraConf = extraConfMap.asJava diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index d1bd6f68f6c3..9f1226cf9e6c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -477,7 +477,7 @@ object VeloxConfig extends ConfigRegistry { "Whether to apply 
dynamic filters pushed down from hash probe in the ValueStream" + " (shuffle reader) operator to filter rows before they reach the hash join.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED = buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled") diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 834fbe6fba66..5a2ad4b8c7eb 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -80,7 +80,7 @@ const std::string kHashProbeBloomFilterPushdownMaxSize = const std::string kValueStreamDynamicFilterEnabled = "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled"; -const bool kValueStreamDynamicFilterEnabledDefault = true; +const bool kValueStreamDynamicFilterEnabledDefault = false; const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished"; const bool kShowTaskMetricsWhenFinishedDefault = false; diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index a17642a998fd..a6b005359352 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -86,6 +86,6 @@ trait IteratorApi { partitionIndex: Int, materializeInput: Boolean = false, enableCudf: Boolean = false, - disableValueStreamDynamicFilter: Boolean = false): Iterator[ColumnarBatch] + supportsValueStreamDynamicFilter: Boolean = true): Iterator[ColumnarBatch] // scalastyle:on argcount } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 21728a716f1a..ed2d54936655 100644 --- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -614,8 +614,7 @@ object GlutenConfig extends ConfigRegistry { ("spark.gluten.velox.awsSdkLogLevel", "FATAL"), ("spark.gluten.velox.s3UseProxyFromEnv", "false"), ("spark.gluten.velox.s3PayloadSigningPolicy", "Never"), - (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString), - ("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled", "true") + (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString) ).foreach { case (k, defaultValue) => nativeConfMap.put(k, conf.getOrElse(k, defaultValue)) } val keys = Set( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index edf3c6ac9042..485ebc985e68 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -52,7 +52,7 @@ case class WholeStageTransformContext( root: PlanNode, substraitContext: SubstraitContext = null, enableCudf: Boolean = false, - disableValueStreamDynamicFilter: Boolean = false) + supportsValueStreamDynamicFilter: Boolean = true) /** Base interface for a query plan that can be interpreted to Substrait representation. */ trait TransformSupport extends ValidatablePlan { @@ -262,15 +262,15 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f planNode, substraitContext, isCudf, - hasNonDeterministicExprInJoinProbe(child)) + !hasNonDeterministicExprInJoinProbe(child)) } /** * Checks whether any HashJoin's probe (streamed) side contains non-deterministic expressions. 
- * When true, ValueStream dynamic filter pushdown must be disabled because the dynamic filter - * would be applied at the scan level (below the non-deterministic Project), changing how many - * times the non-deterministic expression is evaluated and thus altering its output sequence. See - * SPARK-10316. + * When true, ValueStream dynamic filter pushdown must be disabled because if left enabled, the + * dynamic filter would filter rows at the ValueStream (below the non-deterministic Project), + * changing how many times the non-deterministic expression is evaluated and thus altering its + * output sequence. See SPARK-10316. */ private def hasNonDeterministicExprInJoinProbe(plan: SparkPlan): Boolean = { plan match { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala index d07382a8112a..393716bb7b1b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala @@ -55,7 +55,7 @@ class WholeStageZippedPartitionsRDD( split.index, materializeInput, resCtx.enableCudf, - resCtx.disableValueStreamDynamicFilter + resCtx.supportsValueStreamDynamicFilter ) } } From 09ceae3fa12177603d6e3b27e66e83e232c1bcd4 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Thu, 5 Mar 2026 10:34:27 +0000 Subject: [PATCH 8/8] Fix ValueStream dynamic filter config to flow from per-query SQLConf to native --- .../backendsapi/velox/VeloxIteratorApi.scala | 5 +++++ cpp/velox/compute/VeloxBackend.cc | 4 +--- cpp/velox/operators/plannodes/RowVectorStream.h | 14 ++++++-------- cpp/velox/tests/ValueStreamDynamicFilterTest.cc | 2 +- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 93a50d635a50..d8a8d8541734 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch @@ -252,6 +253,10 @@ class VeloxIteratorApi extends IteratorApi with Logging { val extraConfMap = mutable.Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString) if (!supportsValueStreamDynamicFilter) { extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = "false" + } else { + val veloxConf = new VeloxConfig(SQLConf.get) + extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = + veloxConf.valueStreamDynamicFilterEnabled.toString } val extraConf = extraConfMap.asJava val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 49bca9bfdb97..a9fd5e123976 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -319,10 +319,8 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); // Register value-stream connector for runtime iterator-based inputs - auto valueStreamDynamicFilterEnabled = - backendConf_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); 
velox::connector::registerConnector( - std::make_shared(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled)); + std::make_shared(kIteratorConnectorId, hiveConf)); #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h index 9adf09342707..a5b32980933f 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.h +++ b/cpp/velox/operators/plannodes/RowVectorStream.h @@ -173,14 +173,15 @@ class ValueStreamColumnHandle : public facebook::velox::connector::ColumnHandle /// Connector implementation for iterator-based data sources class ValueStreamConnector : public facebook::velox::connector::Connector { public: - ValueStreamConnector( + explicit ValueStreamConnector( const std::string& id, - std::shared_ptr config, - bool dynamicFilterEnabled = false) - : Connector(id, config), dynamicFilterEnabled_(dynamicFilterEnabled) {} + std::shared_ptr config) + : Connector(id, config) {} + // Always return true so Velox routes dynamic filters to the DataSource. + // Per-query gating happens in ValueStreamDataSource::addDynamicFilter(). 
bool canAddDynamicFilter() const override { - return dynamicFilterEnabled_; + return true; } std::unique_ptr createDataSource( @@ -198,9 +199,6 @@ class ValueStreamConnector : public facebook::velox::connector::Connector { facebook::velox::connector::CommitStrategy commitStrategy) override { VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks"); } - - private: - bool dynamicFilterEnabled_; }; /// Factory for creating ValueStreamConnector instances diff --git a/cpp/velox/tests/ValueStreamDynamicFilterTest.cc b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc index 6e13bfed0145..18707dea34bc 100644 --- a/cpp/velox/tests/ValueStreamDynamicFilterTest.cc +++ b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc @@ -59,7 +59,7 @@ class ValueStreamDynamicFilterTest : public ::testing::Test, public VectorTestBa auto config = std::make_shared( std::unordered_map()); connector::registerConnector(std::make_shared( - gluten::kIteratorConnectorId, config, /*dynamicFilterEnabled=*/true)); + gluten::kIteratorConnectorId, config)); } }