Commit c6ea0a5

adriangb, claude, friendlymatthew, emkornfield, and viirya authored
Add bloom filter folding to automatically size SBBF filters (#9628)
## Summary

Bloom filters now support **folding mode**: allocate a conservatively large filter (sized for worst-case NDV), insert all values during writing, then fold down at flush time to meet a target FPP. This eliminates the need to guess NDV upfront and produces optimally-sized filters automatically.

### Changes

- `BloomFilterProperties.ndv` changed from `u64` to `Option<u64>` — when `None` (new default), the filter is sized based on `max_row_group_row_count`; when `Some(n)`, the explicit NDV is used
- `DEFAULT_BLOOM_FILTER_NDV` redefined to `DEFAULT_MAX_ROW_GROUP_ROW_COUNT as u64` (was hardcoded `1_000_000`)
- Added `Sbbf::fold_to_target_fpp()` and supporting methods (`num_folds_for_target_fpp`, `fold_n`, `num_blocks`) with comprehensive documentation
- `flush_bloom_filter()` in both `ColumnValueEncoderImpl` and `ByteArrayEncoder` now folds the filter before returning it
- New `create_bloom_filter()` helper in `encoder.rs` centralizes bloom filter construction logic

### How folding works

The SBBF fold operation merges adjacent block pairs (`block[2i] | block[2i+1]`) via bitwise OR, halving the filter size. This differs from standard Bloom filter folding (which merges halves at distance `m/2`) because SBBF uses multiplicative hashing for block selection:

```
block_index = ((hash >> 32) * num_blocks) >> 32
```

When `num_blocks` is halved, the new index becomes `floor(original_index / 2)`, so adjacent blocks map to the same position. The number of safe folds is determined analytically from the average per-block fill rate: after `k` folds, expected fill is `1 - (1 - f)^(2^k)`, giving `FPP = fill^8`. This requires only a single popcount scan over the blocks (no scratch allocation), then O(log N) floating-point ops to find the optimal fold count. The actual fold is then performed in a single pass.

### Benchmarks

Filter sized for 1M NDV, varying actual distinct values inserted. Measured on Apple M3 Pro.
**Fold overhead (fold_to_target_fpp only):**

| Actual NDV | Time | Throughput |
|---|---|---|
| 1,000 | 39.1 µs | 838 Melem/s |
| 10,000 | 34.2 µs | 960 Melem/s |
| 100,000 | 32.5 µs | 1.01 Gelem/s |

**End-to-end (insert + fold) vs insert-only:**

| Actual NDV | Insert only | Insert + fold | Fold overhead |
|---|---|---|---|
| 1,000 | 14.7 µs | 49.1 µs | 34.4 µs (70%) |
| 10,000 | 30.7 µs | 58.1 µs | 27.4 µs (47%) |
| 100,000 | 162.5 µs | 189.8 µs | 27.3 µs (14%) |

The fold cost is dominated by the popcount scan over the initial (large) filter. For the common case (100K values into a 1M-NDV filter), folding adds only ~14% overhead to the total insert+fold time.

### References

- Sailhan & Stehr, ["Folding and Unfolding Bloom Filters"](https://hal.science/hal-01126174v1/document), IEEE iThings 2012.
- Liang, ["Blocked Bloom Filters: Speeding Up Point Lookups in Tiger Postgres' Native Columnstore"](https://www.tigerdata.com/blog/blocked-bloom-filters-speeding-up-point-lookups-in-tiger-postgres-native-columnstore)

### Breaking changes

There are no breaking API changes. However, when bloom filters are enabled without specifying the number of distinct values, the bloom filters are automatically sized.
Previously they would be sized using the default value of `DEFAULT_BLOOM_FILTER_NDV`.

## Test plan

- [x] All existing bloom filter unit tests pass
- [x] All existing integration tests (sync + async reader roundtrips) pass
- [x] New unit tests: fold correctness, no false negatives after folding, FPP target respected, minimum size guard
- [x] New unit tests: folded filter is bit-identical to a fresh filter of the same size (proves correctness via two lemmas about SBBF hashing)
- [x] New unit tests: multi-step folding, folded FPP matches fresh FPP empirically, fold size matches optimal fixed-size filter
- [x] New integration test: `i32_column_bloom_filter_fixed_ndv` — roundtrip with both overestimated and underestimated NDV
- [x] Full `cargo test -p parquet` passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com>
Co-authored-by: emkornfield <emkornfield@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
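The fold step and the analytic fold-count search described under "How folding works" can be modeled in isolation. The following is an illustrative standalone sketch, not the parquet crate's internal code: it uses plain `u64` words as stand-ins for 256-bit SBBF blocks, and `fold_once`, `num_folds_for_target_fpp`, and `block_index` here are hypothetical helpers mirroring the methods named in the commit message, not their actual signatures.

```rust
/// Estimated FPP for a filter whose bits have average fill rate `fill`:
/// an SBBF lookup tests 8 bits (one per 32-bit lane), so FPP ≈ fill^8.
fn fpp_for_fill(fill: f64) -> f64 {
    fill.powi(8)
}

/// How many folds keep the estimated FPP at or below `target_fpp`, given the
/// current average fill `f` and `num_blocks` (a power of two).
/// After `k` folds the expected fill is 1 - (1 - f)^(2^k).
fn num_folds_for_target_fpp(f: f64, num_blocks: usize, target_fpp: f64) -> u32 {
    let max_folds = num_blocks.trailing_zeros(); // cannot fold below one block
    let mut best = 0;
    for k in 1..=max_folds {
        let fill = 1.0 - (1.0 - f).powf(2f64.powi(k as i32));
        if fpp_for_fill(fill) <= target_fpp {
            best = k; // folding k times still meets the target
        } else {
            break; // fill only grows with k, so stop at the first failure
        }
    }
    best
}

/// One fold: OR adjacent block pairs, halving the filter size.
fn fold_once(blocks: &[u64]) -> Vec<u64> {
    blocks.chunks(2).map(|pair| pair[0] | pair[1]).collect()
}

/// SBBF block selection via multiplicative hashing.
fn block_index(hash: u64, num_blocks: u64) -> u64 {
    ((hash >> 32) * num_blocks) >> 32
}

fn main() {
    // Halving num_blocks maps each element to floor(old_index / 2), which is
    // exactly why OR-ing adjacent pairs preserves membership.
    for hash in [0x1234_5678_9abc_def0u64, 0xdead_beef_0000_ffff] {
        assert_eq!(block_index(hash, 512), block_index(hash, 1024) / 2);
    }
    // A lightly filled filter (fill 0.1) with a 5% FPP target tolerates
    // several folds before the predicted FPP exceeds the target.
    let folds = num_folds_for_target_fpp(0.1, 1024, 0.05);
    println!("safe folds: {folds}");
    println!("folded: {:?}", fold_once(&[0b1010u64, 0b0101, 0b1100, 0b0011]));
}
```

Under these assumptions the writer-side flow is: allocate for the worst case, insert everything, make one popcount pass to estimate the fill rate `f`, compute the safe fold count, then apply the fold that many times in a single pass.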
1 parent 6f02fcf commit c6ea0a5

File tree

7 files changed (+876, -50 lines)


parquet/Cargo.toml

Lines changed: 4 additions & 0 deletions
```diff
@@ -275,5 +275,9 @@ name = "row_selection_cursor"
 harness = false
 required-features = ["arrow"]
 
+[[bench]]
+name = "bloom_filter"
+harness = false
+
 [lib]
 bench = false
```

parquet/benches/bloom_filter.rs

Lines changed: 113 additions & 0 deletions
```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use parquet::bloom_filter::Sbbf;

/// Build a bloom filter sized for `initial_ndv` at `fpp`, insert `num_values` distinct values,
/// and return it ready for folding.
fn build_filter(initial_ndv: u64, fpp: f64, num_values: u64) -> Sbbf {
    let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap();
    for i in 0..num_values {
        sbbf.insert(&i);
    }
    sbbf
}

fn bench_fold_to_target_fpp(c: &mut Criterion) {
    let mut group = c.benchmark_group("fold_to_target_fpp");

    // Realistic scenario: filter sized for 1M NDV, varying actual distinct values
    let initial_ndv = 1_000_000u64;
    let fpp = 0.05;

    for num_values in [1_000u64, 10_000, 100_000] {
        let filter = build_filter(initial_ndv, fpp, num_values);
        let num_blocks = filter.num_blocks();
        group.throughput(Throughput::Elements(num_blocks as u64));
        group.bench_with_input(BenchmarkId::new("ndv", num_values), &filter, |b, filter| {
            b.iter_batched(
                || filter.clone(),
                |mut f| {
                    f.fold_to_target_fpp(fpp);
                    f
                },
                criterion::BatchSize::SmallInput,
            );
        });
    }
    group.finish();
}

fn bench_insert_and_fold(c: &mut Criterion) {
    let mut group = c.benchmark_group("insert_and_fold");

    let initial_ndv = 1_000_000u64;
    let fpp = 0.05;

    for num_values in [1_000u64, 10_000, 100_000] {
        group.throughput(Throughput::Elements(num_values));
        group.bench_with_input(
            BenchmarkId::new("values", num_values),
            &num_values,
            |b, &num_values| {
                b.iter(|| {
                    let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap();
                    for i in 0..num_values {
                        sbbf.insert(&i);
                    }
                    sbbf.fold_to_target_fpp(fpp);
                    sbbf
                });
            },
        );
    }
    group.finish();
}

fn bench_insert_only(c: &mut Criterion) {
    let mut group = c.benchmark_group("insert_only");

    let initial_ndv = 1_000_000u64;
    let fpp = 0.05;

    for num_values in [1_000u64, 10_000, 100_000] {
        group.throughput(Throughput::Elements(num_values));
        group.bench_with_input(
            BenchmarkId::new("values", num_values),
            &num_values,
            |b, &num_values| {
                b.iter(|| {
                    let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap();
                    for i in 0..num_values {
                        sbbf.insert(&i);
                    }
                    sbbf
                });
            },
        );
    }
    group.finish();
}

criterion_group!(
    benches,
    bench_fold_to_target_fpp,
    bench_insert_and_fold,
    bench_insert_only
);
criterion_main!(benches);
```

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 9 additions & 6 deletions
```diff
@@ -17,7 +17,9 @@
 
 use crate::basic::Encoding;
 use crate::bloom_filter::Sbbf;
-use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
+use crate::column::writer::encoder::{
+    ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
+};
 use crate::data_type::{AsBytes, ByteArray, Int32Type};
 use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
 use crate::encodings::rle::RleEncoder;
@@ -423,14 +425,17 @@ pub struct ByteArrayEncoder {
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
     bloom_filter: Option<Sbbf>,
+    bloom_filter_target_fpp: f64,
     geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
 }
 
 impl ColumnValueEncoder for ByteArrayEncoder {
     type T = ByteArray;
     type Values = dyn Array;
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
-        self.bloom_filter.take()
+        let mut sbbf = self.bloom_filter.take()?;
+        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
+        Some(sbbf)
     }
 
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
@@ -443,10 +448,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
 
         let fallback = FallbackEncoder::new(descr, props)?;
 
-        let bloom_filter = props
-            .bloom_filter_properties(descr.path())
-            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
-            .transpose()?;
+        let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
 
         let statistics_enabled = props.statistics_enabled(descr.path());
 
@@ -456,6 +458,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
             fallback,
             statistics_enabled,
             bloom_filter,
+            bloom_filter_target_fpp,
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
```

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 44 additions & 3 deletions
```diff
@@ -2681,6 +2681,7 @@ mod tests {
         values: ArrayRef,
         schema: SchemaRef,
         bloom_filter: bool,
+        bloom_filter_ndv: Option<u64>,
         bloom_filter_position: BloomFilterPosition,
     }
 
@@ -2692,6 +2693,7 @@ mod tests {
                 values,
                 schema: Arc::new(schema),
                 bloom_filter: false,
+                bloom_filter_ndv: None,
                 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
             }
         }
@@ -2712,6 +2714,7 @@ mod tests {
             values,
             schema,
             bloom_filter,
+            bloom_filter_ndv,
            bloom_filter_position,
         } = options;
 
@@ -2750,15 +2753,18 @@ mod tests {
         for encoding in &encodings {
             for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                 for row_group_size in row_group_sizes {
-                    let props = WriterProperties::builder()
+                    let mut builder = WriterProperties::builder()
                         .set_writer_version(version)
                         .set_max_row_group_row_count(Some(row_group_size))
                         .set_dictionary_enabled(dictionary_size != 0)
                         .set_dictionary_page_size_limit(dictionary_size.max(1))
                         .set_encoding(*encoding)
                         .set_bloom_filter_enabled(bloom_filter)
-                        .set_bloom_filter_position(bloom_filter_position)
-                        .build();
+                        .set_bloom_filter_position(bloom_filter_position);
+                    if let Some(ndv) = bloom_filter_ndv {
+                        builder = builder.set_bloom_filter_ndv(ndv);
+                    }
+                    let props = builder.build();
 
                     files.push(roundtrip_opts(&expected_batch, props))
                 }
@@ -3142,6 +3148,41 @@ mod tests {
         );
     }
 
+    /// Test that bloom filter folding produces correct results even when
+    /// the configured NDV differs significantly from actual NDV.
+    /// A large NDV means a larger initial filter that gets folded down;
+    /// a small NDV means a smaller initial filter.
+    #[test]
+    fn i32_column_bloom_filter_fixed_ndv() {
+        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
+
+        // NDV much larger than actual distinct values — tests folding a large filter down
+        let mut options = RoundTripOptions::new(array.clone(), false);
+        options.bloom_filter = true;
+        options.bloom_filter_ndv = Some(1_000_000);
+
+        let files = one_column_roundtrip_with_options(options);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            (0..SMALL_SIZE as i32).collect(),
+            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+        );
+
+        // NDV smaller than actual distinct values — tests the underestimate path
+        let mut options = RoundTripOptions::new(array, false);
+        options.bloom_filter = true;
+        options.bloom_filter_ndv = Some(3);
+
+        let files = one_column_roundtrip_with_options(options);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            (0..SMALL_SIZE as i32).collect(),
+            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+        );
+    }
+
     #[test]
     fn binary_column_bloom_filter() {
         let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
```
