Skip to content

Commit 15a6d02

Browse files
committed
simplify
1 parent 74abb8b commit 15a6d02

File tree

4 files changed

+63
-135
lines changed

4 files changed

+63
-135
lines changed

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
use crate::basic::Encoding;
1919
use crate::bloom_filter::Sbbf;
20-
use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
20+
use crate::column::writer::encoder::{
21+
ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
22+
};
2123
use crate::data_type::{AsBytes, ByteArray, Int32Type};
2224
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
2325
use crate::encodings::rle::RleEncoder;
@@ -423,7 +425,7 @@ pub struct ByteArrayEncoder {
423425
min_value: Option<ByteArray>,
424426
max_value: Option<ByteArray>,
425427
bloom_filter: Option<Sbbf>,
426-
bloom_filter_target_fpp: Option<f64>,
428+
bloom_filter_target_fpp: f64,
427429
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
428430
}
429431

@@ -432,9 +434,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
432434
type Values = dyn Array;
433435
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
434436
let mut sbbf = self.bloom_filter.take()?;
435-
if let Some(target_fpp) = self.bloom_filter_target_fpp {
436-
sbbf.fold_to_target_fpp(target_fpp);
437-
}
437+
sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
438438
Some(sbbf)
439439
}
440440

@@ -448,15 +448,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
448448

449449
let fallback = FallbackEncoder::new(descr, props)?;
450450

451-
let (bloom_filter, bloom_filter_target_fpp) =
452-
match props.bloom_filter_properties(descr.path()) {
453-
Some(bf_props) => {
454-
let (sbbf, target_fpp) =
455-
Sbbf::from_properties(bf_props, props.max_row_group_row_count())?;
456-
(Some(sbbf), target_fpp)
457-
}
458-
None => (None, None),
459-
};
451+
let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
460452

461453
let statistics_enabled = props.statistics_enabled(descr.path());
462454

parquet/src/bloom_filter/mod.rs

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash
8282
use crate::data_type::AsBytes;
8383
use crate::errors::{ParquetError, Result};
8484
use crate::file::metadata::ColumnChunkMetaData;
85-
use crate::file::properties::{BloomFilterProperties, DEFAULT_MAX_ROW_GROUP_ROW_COUNT};
8685
use crate::file::reader::ChunkReader;
8786
use crate::parquet_thrift::{
8887
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
@@ -205,14 +204,12 @@ impl std::ops::IndexMut<usize> for Block {
205204
/// parallel bit manipulation. When checking membership, only one block is accessed per query,
206205
/// eliminating the cache-miss penalty of standard Bloom filters.
207206
///
208-
/// ## Two sizing modes
207+
/// ## Sizing and folding
209208
///
210-
/// - **Fixed-size mode**: Created via [`Sbbf::new_with_ndv_fpp`] when the number of distinct
211-
/// values (NDV) is known. The filter is sized exactly for the given NDV and FPP.
212-
///
213-
/// - **Folding mode**: Created via [`Sbbf::new_with_num_of_bytes`] at a conservatively large
214-
/// size, then compacted after all values are inserted by calling [`Sbbf::fold_to_target_fpp`].
215-
/// This eliminates the need to know NDV upfront.
209+
/// Filters are initially sized for a maximum expected number of distinct values (NDV) via
210+
/// [`Sbbf::new_with_ndv_fpp`]. After all values are inserted, the filter is compacted by
211+
/// calling [`Sbbf::fold_to_target_fpp`], which folds the filter down to the smallest size
212+
/// that still meets the target false positive probability.
216213
///
217214
/// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
218215
/// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
@@ -276,34 +273,6 @@ pub(crate) fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
276273
}
277274

278275
impl Sbbf {
279-
/// Create a bloom filter from [`BloomFilterProperties`], returning the filter and an
280-
/// optional target FPP for folding mode.
281-
///
282-
/// | `ndv` | `max_bytes` | Behavior |
283-
/// |-------|-------------|----------|
284-
/// | Set | _ | Fixed-size filter sized for that NDV (legacy). Returns `None` for FPP. |
285-
/// | None | Set | Filter of `max_bytes`, folded at flush via [`Sbbf::fold_to_target_fpp`]. |
286-
/// | None | None | Filter sized for `max_distinct_items` (default: [`DEFAULT_MAX_ROW_GROUP_ROW_COUNT`]) items at the given FPP, folded at flush. |
287-
pub fn from_properties(
288-
props: &BloomFilterProperties,
289-
max_distinct_items: Option<usize>,
290-
) -> Result<(Self, Option<f64>)> {
291-
match props.ndv {
292-
Some(ndv) => {
293-
// Fixed-size mode: size based on explicit NDV (legacy behavior)
294-
Ok((Self::new_with_ndv_fpp(ndv, props.fpp)?, None))
295-
}
296-
None => {
297-
// Folding mode: allocate large, fold down at flush
298-
let max_bytes = props.max_bytes.unwrap_or_else(|| {
299-
let ndv = max_distinct_items.unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT) as u64;
300-
num_of_bits_from_ndv_fpp(ndv, props.fpp) / 8
301-
});
302-
Ok((Self::new_with_num_of_bytes(max_bytes), Some(props.fpp)))
303-
}
304-
}
305-
}
306-
307276
/// Create a new [Sbbf] with given number of distinct values and false positive probability.
308277
/// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
309278
pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {

parquet/src/column/writer/encoder.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ use crate::data_type::DataType;
2727
use crate::data_type::private::ParquetValueType;
2828
use crate::encodings::encoding::{DictEncoder, Encoder, get_encoder};
2929
use crate::errors::{ParquetError, Result};
30-
use crate::file::properties::{EnabledStatistics, WriterProperties};
30+
use crate::file::properties::{
31+
DEFAULT_MAX_ROW_GROUP_ROW_COUNT, EnabledStatistics, WriterProperties,
32+
};
3133
use crate::geospatial::accumulator::{GeoStatsAccumulator, try_new_geo_stats_accumulator};
3234
use crate::geospatial::statistics::GeospatialStatistics;
3335
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
@@ -138,7 +140,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
138140
min_value: Option<T::T>,
139141
max_value: Option<T::T>,
140142
bloom_filter: Option<Sbbf>,
141-
bloom_filter_target_fpp: Option<f64>,
143+
bloom_filter_target_fpp: f64,
142144
variable_length_bytes: Option<i64>,
143145
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
144146
}
@@ -189,9 +191,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
189191

190192
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
191193
let mut sbbf = self.bloom_filter.take()?;
192-
if let Some(target_fpp) = self.bloom_filter_target_fpp {
193-
sbbf.fold_to_target_fpp(target_fpp);
194-
}
194+
sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
195195
Some(sbbf)
196196
}
197197

@@ -210,15 +210,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
210210

211211
let statistics_enabled = props.statistics_enabled(descr.path());
212212

213-
let (bloom_filter, bloom_filter_target_fpp) =
214-
match props.bloom_filter_properties(descr.path()) {
215-
Some(bf_props) => {
216-
let (sbbf, target_fpp) =
217-
Sbbf::from_properties(bf_props, props.max_row_group_row_count())?;
218-
(Some(sbbf), target_fpp)
219-
}
220-
None => (None, None),
221-
};
213+
let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
222214

223215
let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
224216

@@ -395,6 +387,28 @@ fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace:
395387
}
396388
}
397389

390+
/// Creates a bloom filter sized for the column's configured NDV (or the row group size
391+
/// if NDV is not explicitly set), returning the filter and the target FPP for folding.
392+
pub(crate) fn create_bloom_filter(
393+
props: &WriterProperties,
394+
descr: &ColumnDescPtr,
395+
) -> Result<(Option<Sbbf>, f64)> {
396+
match props.bloom_filter_properties(descr.path()) {
397+
Some(bf_props) => {
398+
let ndv = bf_props.ndv.unwrap_or(
399+
props
400+
.max_row_group_row_count()
401+
.unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT) as u64,
402+
);
403+
Ok((
404+
Some(Sbbf::new_with_ndv_fpp(ndv, bf_props.fpp)?),
405+
bf_props.fpp,
406+
))
407+
}
408+
None => Ok((None, 0.0)),
409+
}
410+
}
411+
398412
fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
399413
where
400414
T: ParquetValueType + 'a,

parquet/src/file/properties.rs

Lines changed: 24 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
5454
/// Default value for [`BloomFilterProperties::fpp`]
5555
pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
5656
/// Default value for [`BloomFilterProperties::ndv`]
57-
#[deprecated(note = "NDV is now optional; bloom filters use folding mode by default")]
58-
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
57+
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = DEFAULT_MAX_ROW_GROUP_ROW_COUNT as u64;
5958
/// Default values for [`WriterProperties::statistics_truncate_length`]
6059
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
6160
/// Default value for [`WriterProperties::offset_index_disabled`]
@@ -997,11 +996,13 @@ impl WriterPropertiesBuilder {
997996
self
998997
}
999998

1000-
/// Sets default number of distinct values (ndv) for bloom filter for all columns.
999+
/// Sets default maximum expected number of distinct values (ndv) for bloom filter
1000+
/// for all columns (defaults to [`DEFAULT_BLOOM_FILTER_NDV`]).
10011001
///
1002-
/// When set, this activates fixed-size mode: the bloom filter is sized exactly for
1003-
/// the given NDV at the configured FPP, with no folding. When not set (default),
1004-
/// the bloom filter uses folding mode instead.
1002+
/// The bloom filter is initially sized for this many distinct values at the
1003+
/// configured FPP, then folded down after all values are inserted to achieve
1004+
/// optimal size. A good heuristic is to set this to the expected number of rows
1005+
/// in the row group.
10051006
///
10061007
/// Implicitly enables bloom writing, as if [`set_bloom_filter_enabled`] had
10071008
/// been called.
@@ -1012,26 +1013,6 @@ impl WriterPropertiesBuilder {
10121013
self
10131014
}
10141015

1015-
/// Sets the default maximum initial allocation size in bytes for bloom filter folding mode
1016-
/// for all columns.
1017-
///
1018-
/// When bloom filters use folding mode (no explicit NDV), this controls the initial
1019-
/// allocation size. The filter will be folded down at flush time to meet the target FPP.
1020-
/// If not set, the initial size is derived from `max_row_group_row_count` and `fpp`.
1021-
///
1022-
/// The value will be rounded up to the next power of two, bounded by
1023-
/// [`BITSET_MIN_LENGTH`](crate::bloom_filter::BITSET_MIN_LENGTH) and
1024-
/// [`BITSET_MAX_LENGTH`](crate::bloom_filter::BITSET_MAX_LENGTH).
1025-
///
1026-
/// Implicitly enables bloom writing, as if [`set_bloom_filter_enabled`] had been called.
1027-
///
1028-
/// [`set_bloom_filter_enabled`]: Self::set_bloom_filter_enabled
1029-
pub fn set_bloom_filter_max_bytes(mut self, value: usize) -> Self {
1030-
self.default_column_properties
1031-
.set_bloom_filter_max_bytes(value);
1032-
self
1033-
}
1034-
10351016
// ----------------------------------------------------------------------
10361017
// Setters for a specific column
10371018

@@ -1137,15 +1118,6 @@ impl WriterPropertiesBuilder {
11371118
self.get_mut_props(col).set_bloom_filter_ndv(value);
11381119
self
11391120
}
1140-
1141-
/// Sets the maximum initial allocation size in bytes for bloom filter folding mode
1142-
/// for a specific column.
1143-
///
1144-
/// Takes precedence over [`Self::set_bloom_filter_max_bytes`].
1145-
pub fn set_column_bloom_filter_max_bytes(mut self, col: ColumnPath, value: usize) -> Self {
1146-
self.get_mut_props(col).set_bloom_filter_max_bytes(value);
1147-
self
1148-
}
11491121
}
11501122

11511123
impl From<WriterProperties> for WriterPropertiesBuilder {
@@ -1225,15 +1197,12 @@ impl Default for EnabledStatistics {
12251197

12261198
/// Controls the bloom filter to be computed by the writer.
12271199
///
1228-
/// Two modes are supported:
1200+
/// The bloom filter is initially sized for `ndv` distinct values at the given `fpp`, then
1201+
/// automatically folded down after all values are inserted to achieve optimal size while
1202+
/// maintaining the target `fpp`. See [`Sbbf::fold_to_target_fpp`] for details on the
1203+
/// folding algorithm.
12291204
///
1230-
/// - **Fixed-size mode**: When `ndv` is set to `Some(n)`, the bloom filter is sized based on `ndv`
1231-
/// and `fpp` at allocation time. This is the legacy behavior.
1232-
///
1233-
/// - **Folding mode** (default): When `ndv` is `None`, a conservatively large bloom filter is
1234-
/// allocated (sized for worst-case NDV = max row group rows, or `max_bytes` if set), then
1235-
/// folded down at flush time to meet the target `fpp`. This eliminates the need to guess NDV
1236-
/// upfront and produces optimally-sized filters automatically.
1205+
/// [`Sbbf::fold_to_target_fpp`]: crate::bloom_filter::Sbbf::fold_to_target_fpp
12371206
#[derive(Debug, Clone, PartialEq)]
12381207
pub struct BloomFilterProperties {
12391208
/// False positive probability. This should be always between 0 and 1 exclusive. Defaults to [`DEFAULT_BLOOM_FILTER_FPP`].
@@ -1242,32 +1211,28 @@ pub struct BloomFilterProperties {
12421211
///
12431212
/// The bloom filter data structure is a trade of between disk and memory space versus fpp, the
12441213
/// smaller the fpp, the more memory and disk space is required, thus setting it to a reasonable value
1245-
/// e.g. 0.1, 0.01, or 0.001 is recommended.
1214+
/// e.g. 0.1, 0.05, or 0.001 is recommended.
1215+
///
1216+
/// This value also serves as the target FPP for bloom filter folding: after all values
1217+
/// are inserted, the filter is folded down to the smallest size that still meets this FPP.
12461218
pub fpp: f64,
1247-
/// Number of distinct values. When set to `Some(n)`, the bloom filter is sized exactly for
1248-
/// `n` distinct values at the given `fpp` (fixed-size mode). When `None` (default), the
1249-
/// filter uses folding mode instead.
1219+
/// Maximum expected number of distinct values. When `None` (default), the bloom filter
1220+
/// is sized based on the row group's `max_row_group_row_count` at runtime.
12501221
///
12511222
/// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_ndv`].
12521223
///
1253-
/// Usage of bloom filter is most beneficial for columns with large cardinality, so a good heuristic
1254-
/// is to set ndv to the number of rows. However, it can reduce disk size if you know in advance a smaller
1255-
/// number of distinct values.
1224+
/// The bloom filter is initially sized for this many distinct values at the given `fpp`,
1225+
/// then folded down after insertion to achieve optimal size. A good heuristic is to set
1226+
/// this to the expected number of rows in the row group. If fewer distinct values are
1227+
/// actually written, the filter will be automatically compacted via folding.
12561228
pub ndv: Option<u64>,
1257-
/// Maximum initial allocation size in bytes for folding mode. When `None` (default), the
1258-
/// initial size is derived from `max_row_group_row_count` and `fpp`. Only used when `ndv`
1259-
/// is `None`.
1260-
///
1261-
/// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_max_bytes`].
1262-
pub max_bytes: Option<usize>,
12631229
}
12641230

12651231
impl Default for BloomFilterProperties {
12661232
fn default() -> Self {
12671233
BloomFilterProperties {
12681234
fpp: DEFAULT_BLOOM_FILTER_FPP,
12691235
ndv: None,
1270-
max_bytes: None,
12711236
}
12721237
}
12731238
}
@@ -1364,22 +1329,14 @@ impl ColumnProperties {
13641329
.fpp = value;
13651330
}
13661331

1367-
/// Sets the number of distinct (unique) values for bloom filter for this column, and implicitly
1368-
/// enables bloom filter if not previously enabled. This activates fixed-size mode (no folding).
1332+
/// Sets the maximum expected number of distinct (unique) values for bloom filter for this
1333+
/// column, and implicitly enables bloom filter if not previously enabled.
13691334
fn set_bloom_filter_ndv(&mut self, value: u64) {
13701335
self.bloom_filter_properties
13711336
.get_or_insert_with(Default::default)
13721337
.ndv = Some(value);
13731338
}
13741339

1375-
/// Sets the maximum initial allocation size in bytes for bloom filter folding mode, and
1376-
/// implicitly enables bloom filter if not previously enabled.
1377-
fn set_bloom_filter_max_bytes(&mut self, value: usize) {
1378-
self.bloom_filter_properties
1379-
.get_or_insert_with(Default::default)
1380-
.max_bytes = Some(value);
1381-
}
1382-
13831340
/// Returns optional encoding for this column.
13841341
fn encoding(&self) -> Option<Encoding> {
13851342
self.encoding
@@ -1723,7 +1680,6 @@ mod tests {
17231680
Some(&BloomFilterProperties {
17241681
fpp: 0.1,
17251682
ndv: Some(100),
1726-
max_bytes: None,
17271683
})
17281684
);
17291685
}
@@ -1762,7 +1718,6 @@ mod tests {
17621718
Some(&BloomFilterProperties {
17631719
fpp: DEFAULT_BLOOM_FILTER_FPP,
17641720
ndv: None,
1765-
max_bytes: None,
17661721
})
17671722
);
17681723
}
@@ -1806,7 +1761,6 @@ mod tests {
18061761
Some(&BloomFilterProperties {
18071762
fpp: DEFAULT_BLOOM_FILTER_FPP,
18081763
ndv: Some(100),
1809-
max_bytes: None,
18101764
})
18111765
);
18121766
assert_eq!(
@@ -1817,7 +1771,6 @@ mod tests {
18171771
Some(&BloomFilterProperties {
18181772
fpp: 0.1,
18191773
ndv: None,
1820-
max_bytes: None,
18211774
})
18221775
);
18231776
}

0 commit comments

Comments
 (0)