From 01a1105dca8910ff3a2932cb47f59700cca22f36 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Fri, 6 Mar 2026 07:11:28 +0000 Subject: [PATCH 1/3] Translate might_contain as a subfield filter --- .../SparkExprToSubfieldFilterParser.cc | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc index 20baaa413eb0..78139a1b429f 100644 --- a/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc +++ b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc @@ -16,11 +16,64 @@ */ #include "operators/functions/SparkExprToSubfieldFilterParser.h" +#include "velox/common/base/BloomFilter.h" +#include "velox/expression/Expr.h" +#include "velox/vector/ComplexVector.h" + namespace gluten { using namespace facebook::velox; namespace { + +VectorPtr toConstant(const core::TypedExprPtr& expr, core::ExpressionEvaluator* evaluator) { + auto exprSet = evaluator->compile(expr); + if (!exprSet->exprs()[0]->isConstantExpr()) { + return nullptr; + } + RowVector input(evaluator->pool(), ROW({}, {}), nullptr, 1, std::vector{}); + SelectivityVector rows(1); + VectorPtr result; + try { + evaluator->evaluate(exprSet.get(), rows, input, result); + } catch (const VeloxUserError&) { + return nullptr; + } + return result; +} + +/// Filter backed by Velox's BloomFilter<> serialized data from bloom_filter_agg. +class SparkBloomFilter final : public common::Filter { + public: + SparkBloomFilter(std::vector serializedData, bool nullAllowed) + : Filter(true, nullAllowed, common::FilterKind::kBigintValuesUsingBloomFilter), + serializedData_(std::move(serializedData)) {} + + bool testInt64(int64_t value) const final { + return BloomFilter<>::mayContain(serializedData_.data(), folly::hasher()(value)); + } + + bool testInt64Range(int64_t /*min*/, int64_t /*max*/, bool /*hasNull*/) const final { + return true; + } + + std::unique_ptr clone(std::optional nullAllowed) const override { + return std::make_unique(serializedData_, nullAllowed.value_or(nullAllowed_)); + } + + bool testingEquals(const Filter& other) const override { + auto* otherFilter = dynamic_cast(&other); + return otherFilter != nullptr && serializedData_ == otherFilter->serializedData_; + } + + folly::dynamic serialize() const override { + VELOX_UNSUPPORTED("Serialization is not supported for SparkBloomFilter"); + } + + private: + std::vector serializedData_; +}; + std::optional>> combine( facebook::velox::common::Subfield& subfield, std::unique_ptr& filter) { @@ -30,6 +83,7 @@ std::optional>> @@ -93,6 +147,21 @@ SparkExprToSubfieldFilterParser::leafCallToSubfieldFilter( } return std::make_pair(std::move(subfield), facebook::velox::exec::isNotNull()); } + } else if (call.name() == "might_contain" && !negated) { + // might_contain(bloomFilter, value) — the column to filter is input[1]. + if (call.inputs().size() >= 2) { + const auto* valueSide = call.inputs()[1].get(); + if (toSubfield(valueSide, subfield)) { + auto bloomFilterValue = toConstant(call.inputs()[0], evaluator); + if (bloomFilterValue && !bloomFilterValue->isNullAt(0)) { + auto sv = bloomFilterValue->as>()->valueAt(0); + std::vector data(sv.data(), sv.data() + sv.size()); + std::unique_ptr filter = + std::make_unique(std::move(data), false /*nullAllowed*/); + return combine(subfield, filter); + } + } + } } return std::nullopt; } From 0258168fe9066acb1abab25e091c62387382611d Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Fri, 6 Mar 2026 07:11:34 +0000 Subject: [PATCH 2/3] Add tests for might_contain subfield filter --- cpp/velox/tests/CMakeLists.txt | 2 + .../SparkExprToSubfieldFilterParserTest.cc | 192 ++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 cpp/velox/tests/SparkExprToSubfieldFilterParserTest.cc diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt index a690347bc4bc..a5b87a0ab7ca 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -128,6 +128,8 @@ add_velox_test( VeloxToSubstraitTypeTest.cc) add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc FunctionTest.cc) +add_velox_test(spark_expr_to_subfield_filter_parser_test SOURCES + SparkExprToSubfieldFilterParserTest.cc) add_velox_test(runtime_test SOURCES RuntimeTest.cc) add_velox_test(velox_memory_test SOURCES MemoryManagerTest.cc) add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc) diff --git a/cpp/velox/tests/SparkExprToSubfieldFilterParserTest.cc b/cpp/velox/tests/SparkExprToSubfieldFilterParserTest.cc new file mode 100644 index 000000000000..3916820b6f60 --- /dev/null +++ b/cpp/velox/tests/SparkExprToSubfieldFilterParserTest.cc @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "operators/functions/SparkExprToSubfieldFilterParser.h" +#include "velox/common/base/BloomFilter.h" +#include "velox/core/QueryCtx.h" +#include "velox/expression/Expr.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::common; + +namespace gluten { +namespace { + +class SparkExprToSubfieldFilterParserTest : public ::testing::Test, + public test::VectorTestBase { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + } + + /// Builds a serialized Velox BloomFilter containing the given int64 values. + std::vector makeSerializedBloomFilter( + const std::vector& values) { + BloomFilter<> bf; + bf.reset(std::max(100, values.size() * 4)); + for (auto v : values) { + bf.insert(folly::hasher()(v)); + } + std::vector data(bf.serializedSize()); + bf.serialize(data.data()); + return data; + } + + /// Creates a ConstantTypedExpr wrapping serialized bloom filter bytes. + core::TypedExprPtr makeVarbinaryConstant(const std::vector& data) { + auto vector = makeFlatVector( + std::vector{StringView(data.data(), data.size())}, + VARBINARY()); + return std::make_shared(vector); + } + + /// Creates a null VARBINARY constant expression. + core::TypedExprPtr makeNullVarbinaryConstant() { + auto vector = BaseVector::createNullConstant(VARBINARY(), 1, pool()); + return std::make_shared(vector); + } + + /// Constructs a might_contain(bloomFilter, value) CallTypedExpr. + core::CallTypedExprPtr makeMightContainCall( + const core::TypedExprPtr& bloomFilterExpr, + const core::TypedExprPtr& valueExpr) { + return std::make_shared( + BOOLEAN(), "might_contain", bloomFilterExpr, valueExpr); + } + + /// Calls leafCallToSubfieldFilter on the parser. Returns (subfield, nullptr) + /// when the parser cannot translate the expression. + std::pair> parse( + const core::CallTypedExprPtr& call, + bool negated = false) { + if (auto result = + parser_.leafCallToSubfieldFilter(*call, &evaluator_, negated)) { + return std::move(result.value()); + } + return std::make_pair(Subfield(), nullptr); + } + + private: + std::shared_ptr queryCtx_{core::QueryCtx::create()}; + exec::SimpleExpressionEvaluator evaluator_{queryCtx_.get(), pool()}; + SparkExprToSubfieldFilterParser parser_; +}; + +TEST_F(SparkExprToSubfieldFilterParserTest, mightContainBasic) { + std::vector inserted = {42, 100, 200, 300}; + auto serialized = makeSerializedBloomFilter(inserted); + + auto bloomExpr = makeVarbinaryConstant(serialized); + auto columnExpr = + std::make_shared(BIGINT(), "a"); + auto call = makeMightContainCall(bloomExpr, columnExpr); + + auto [subfield, filter] = parse(call); + + ASSERT_TRUE(filter); + + // Verify the subfield points to column "a". + ASSERT_EQ(subfield.path().size(), 1); + EXPECT_EQ(*subfield.path()[0], Subfield::NestedField("a")); + + // All inserted values must pass the bloom filter. + for (auto v : inserted) { + EXPECT_TRUE(filter->testInt64(v)) << "Value " << v << " should pass"; + } + + // Most non-inserted values should be rejected. + int falsePositives = 0; + for (int64_t v = 1000; v < 2000; v++) { + if (filter->testInt64(v)) { + ++falsePositives; + } + } + EXPECT_LT(falsePositives, 100) << "Too many false positives"; +} + +TEST_F(SparkExprToSubfieldFilterParserTest, mightContainNullBloomFilter) { + auto nullExpr = makeNullVarbinaryConstant(); + auto columnExpr = + std::make_shared(BIGINT(), "a"); + auto call = makeMightContainCall(nullExpr, columnExpr); + + auto [subfield, filter] = parse(call); + EXPECT_FALSE(filter); +} + +TEST_F(SparkExprToSubfieldFilterParserTest, mightContainNegated) { + auto serialized = makeSerializedBloomFilter({42}); + auto bloomExpr = makeVarbinaryConstant(serialized); + auto columnExpr = + std::make_shared(BIGINT(), "a"); + auto call = makeMightContainCall(bloomExpr, columnExpr); + + auto [subfield, filter] = parse(call, /*negated=*/true); + EXPECT_FALSE(filter); +} + +TEST_F(SparkExprToSubfieldFilterParserTest, mightContainNonColumnValue) { + auto serialized = makeSerializedBloomFilter({42}); + auto bloomExpr = makeVarbinaryConstant(serialized); + // Use a constant (not a column reference) as the value argument. + auto constValue = makeVarbinaryConstant(serialized); // type doesn't matter + auto call = makeMightContainCall(bloomExpr, constValue); + + auto [subfield, filter] = parse(call); + EXPECT_FALSE(filter); +} + +TEST_F(SparkExprToSubfieldFilterParserTest, mightContainInt64Range) { + auto serialized = makeSerializedBloomFilter({42}); + auto bloomExpr = makeVarbinaryConstant(serialized); + auto columnExpr = + std::make_shared(BIGINT(), "a"); + auto call = makeMightContainCall(bloomExpr, columnExpr); + + auto [subfield, filter] = parse(call); + ASSERT_TRUE(filter); + + // Bloom filters cannot efficiently prune integer ranges. + EXPECT_TRUE(filter->testInt64Range(0, 1000, false)); + EXPECT_TRUE(filter->testInt64Range(0, 1000, true)); +} + +TEST_F(SparkExprToSubfieldFilterParserTest, mightContainClone) { + std::vector inserted = {42, 100}; + auto serialized = makeSerializedBloomFilter(inserted); + auto bloomExpr = makeVarbinaryConstant(serialized); + auto columnExpr = + std::make_shared(BIGINT(), "a"); + auto call = makeMightContainCall(bloomExpr, columnExpr); + + auto [subfield, filter] = parse(call); + ASSERT_TRUE(filter); + + auto cloned = filter->clone(std::nullopt); + ASSERT_TRUE(cloned); + EXPECT_TRUE(filter->testingEquals(*cloned)); + + for (auto v : inserted) { + EXPECT_TRUE(cloned->testInt64(v)); + } +} + +} // namespace +} // namespace gluten From 49b2ef59659b9705573a297de7238978ef971140 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Mon, 9 Mar 2026 09:46:22 +0000 Subject: [PATCH 3/3] Address comments --- .../SparkExprToSubfieldFilterParser.cc | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc index 78139a1b429f..9c4e9cab5edf 100644 --- a/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc +++ b/cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc @@ -26,6 +26,9 @@ using namespace facebook::velox; namespace { +// Evaluates an expression as a constant. Returns nullptr if the expression is +// not constant or evaluation fails. Errors are intentionally swallowed because +// a non-evaluable expression simply means the filter cannot be pushed down. VectorPtr toConstant(const core::TypedExprPtr& expr, core::ExpressionEvaluator* evaluator) { auto exprSet = evaluator->compile(expr); if (!exprSet->exprs()[0]->isConstantExpr()) { @@ -42,15 +45,16 @@ VectorPtr toConstant(const core::TypedExprPtr& expr, core::ExpressionEvaluator* return result; } -/// Filter backed by Velox's BloomFilter<> serialized data from bloom_filter_agg. -class SparkBloomFilter final : public common::Filter { +/// Subfield filter backed by Velox's BloomFilter from bloom_filter_agg / might_contain. +class SparkMightContain final : public common::Filter { public: - SparkBloomFilter(std::vector serializedData, bool nullAllowed) - : Filter(true, nullAllowed, common::FilterKind::kBigintValuesUsingBloomFilter), - serializedData_(std::move(serializedData)) {} + SparkMightContain(const char* serializedData, bool nullAllowed) + : Filter(true, nullAllowed, common::FilterKind::kBigintValuesUsingBloomFilter) { + bloomFilter_.merge(serializedData); + } bool testInt64(int64_t value) const final { - return BloomFilter<>::mayContain(serializedData_.data(), folly::hasher()(value)); + return bloomFilter_.mayContain(folly::hasher()(value)); } bool testInt64Range(int64_t /*min*/, int64_t /*max*/, bool /*hasNull*/) const final { @@ -58,20 +62,21 @@ class SparkBloomFilter final : public common::Filter { } std::unique_ptr clone(std::optional nullAllowed) const override { - return std::make_unique(serializedData_, nullAllowed.value_or(nullAllowed_)); + std::vector data(bloomFilter_.serializedSize()); + bloomFilter_.serialize(data.data()); + return std::make_unique(data.data(), nullAllowed.value_or(nullAllowed_)); } bool testingEquals(const Filter& other) const override { - auto* otherFilter = dynamic_cast(&other); - return otherFilter != nullptr && serializedData_ == otherFilter->serializedData_; + return dynamic_cast(&other) != nullptr; } folly::dynamic serialize() const override { - VELOX_UNSUPPORTED("Serialization is not supported for SparkBloomFilter"); + VELOX_UNSUPPORTED("Serialization is not supported for SparkMightContain"); } private: - std::vector serializedData_; + BloomFilter<> bloomFilter_; }; std::optional>> combine( @@ -155,9 +160,8 @@ SparkExprToSubfieldFilterParser::leafCallToSubfieldFilter( auto bloomFilterValue = toConstant(call.inputs()[0], evaluator); if (bloomFilterValue && !bloomFilterValue->isNullAt(0)) { auto sv = bloomFilterValue->as>()->valueAt(0); - std::vector data(sv.data(), sv.data() + sv.size()); std::unique_ptr filter = - std::make_unique(std::move(data), false /*nullAllowed*/); + std::make_unique(sv.data(), false /*nullAllowed*/); return combine(subfield, filter); } }