Skip to content

Commit c133333

Browse files
RoseZhang123oldukhnoODukhno
authored
Make Parquet SBBF serialize/deserialize helpers public for external reuse (#8762)
# Which issue does this PR close? - Closes #8727 . # Rationale for this change Explained in the issue #8727 . # What changes are included in this PR? Make the following method signatures public: ``` pub(crate) fn new(bitset: &[u8]) -> Self; pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> ``` # Are these changes tested? Added unit tests for them. # Are there any user-facing changes? Users is now able to deserialize SBBFs straight from storage and re-serialize them form raw bytes. --------- Co-authored-by: Oleksii Dukhno <oldukhno@microsoft.com> Co-authored-by: Oleksii Dukhno <oleksii.dukhno@gmail.com>
1 parent 6b290d1 commit c133333

File tree

1 file changed

+99
-2
lines changed
  • parquet/src/bloom_filter

1 file changed

+99
-2
lines changed

parquet/src/bloom_filter/mod.rs

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,18 @@ pub(crate) fn chunk_read_bloom_filter_header_and_offset(
215215
#[inline]
216216
pub(crate) fn read_bloom_filter_header_and_length(
217217
buffer: Bytes,
218+
) -> Result<(BloomFilterHeader, u64), ParquetError> {
219+
read_bloom_filter_header_and_length_from_bytes(buffer.as_ref())
220+
}
221+
222+
/// Given a byte slice, try to read out a bloom filter header and return both the header and
223+
/// length of the header.
224+
#[inline]
225+
fn read_bloom_filter_header_and_length_from_bytes(
226+
buffer: &[u8],
218227
) -> Result<(BloomFilterHeader, u64), ParquetError> {
219228
let total_length = buffer.len();
220-
let mut prot = ThriftSliceInputProtocol::new(buffer.as_ref());
229+
let mut prot = ThriftSliceInputProtocol::new(buffer);
221230
let header = BloomFilterHeader::read_thrift(&mut prot)
222231
.map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
223232
Ok((header, (total_length - prot.as_slice().len()) as u64))
@@ -283,7 +292,8 @@ impl Sbbf {
283292
/// Write the bloom filter data (header and then bitset) to the output. This doesn't
284293
/// flush the writer in order to boost performance of bulk writing all blocks. Caller
285294
/// must remember to flush the writer.
286-
pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
295+
/// This method usually is used in conjunction with [`Self::from_bytes`] for serialization/deserialization.
296+
pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
287297
let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
288298
self.header().write_thrift(&mut protocol).map_err(|e| {
289299
ParquetError::General(format!("Could not write bloom filter header: {e}"))
@@ -417,6 +427,54 @@ impl Sbbf {
417427
pub(crate) fn estimated_memory_size(&self) -> usize {
418428
self.0.capacity() * std::mem::size_of::<Block>()
419429
}
430+
431+
/// Reads a Sbff from Thrift encoded bytes
432+
///
433+
/// # Examples
434+
///
435+
/// ```no_run
436+
/// # use parquet::errors::Result;
437+
/// # use parquet::bloom_filter::Sbbf;
438+
/// # fn main() -> Result<()> {
439+
/// // In a real application, you would read serialized bloom filter bytes from a cache.
440+
/// // This example demonstrates the deserialization process.
441+
/// // Assuming you have bloom filter bytes from a Parquet file:
442+
/// # let serialized_bytes: Vec<u8> = vec![];
443+
/// let bloom_filter = Sbbf::from_bytes(&serialized_bytes)?;
444+
/// // Now you can use the bloom filter to check for values
445+
/// if bloom_filter.check(&"some_value") {
446+
/// println!("Value might be present (or false positive)");
447+
/// } else {
448+
/// println!("Value is definitely not present");
449+
/// }
450+
/// # Ok(())
451+
/// # }
452+
/// ```
453+
pub fn from_bytes(bytes: &[u8]) -> Result<Self, ParquetError> {
454+
let (header, header_len) = read_bloom_filter_header_and_length_from_bytes(bytes)?;
455+
456+
let bitset_length: u64 = header
457+
.num_bytes
458+
.try_into()
459+
.map_err(|_| ParquetError::General("Bloom filter length is invalid".to_string()))?;
460+
461+
// Validate that bitset consumes all remaining bytes
462+
if header_len + bitset_length != bytes.len() as u64 {
463+
return Err(ParquetError::General(format!(
464+
"Bloom filter data contains extra bytes: expected {} total bytes, got {}",
465+
header_len + bitset_length,
466+
bytes.len()
467+
)));
468+
}
469+
470+
let start = header_len as usize;
471+
let end = (header_len + bitset_length) as usize;
472+
let bitset = bytes
473+
.get(start..end)
474+
.ok_or_else(|| ParquetError::General("Bloom filter bitset is invalid".to_string()))?;
475+
476+
Ok(Self::new(bitset))
477+
}
420478
}
421479

422480
// per spec we use xxHash with seed=0
@@ -541,4 +599,43 @@ mod tests {
541599
assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
542600
}
543601
}
602+
603+
#[test]
604+
fn test_sbbf_write_round_trip() {
605+
// Create a bloom filter with a 32-byte bitset (minimum size)
606+
let bitset_bytes = vec![0u8; 32];
607+
let mut original = Sbbf::new(&bitset_bytes);
608+
609+
// Insert some test values
610+
let test_values = ["hello", "world", "rust", "parquet", "bloom", "filter"];
611+
for value in &test_values {
612+
original.insert(value);
613+
}
614+
615+
// Serialize to bytes
616+
let mut output = Vec::new();
617+
original.write(&mut output).unwrap();
618+
619+
// Validate header was written correctly
620+
let mut protocol = ThriftSliceInputProtocol::new(&output);
621+
let header = BloomFilterHeader::read_thrift(&mut protocol).unwrap();
622+
assert_eq!(header.num_bytes, bitset_bytes.len() as i32);
623+
assert_eq!(header.algorithm, BloomFilterAlgorithm::BLOCK);
624+
assert_eq!(header.hash, BloomFilterHash::XXHASH);
625+
assert_eq!(header.compression, BloomFilterCompression::UNCOMPRESSED);
626+
627+
// Deserialize using from_bytes
628+
let reconstructed = Sbbf::from_bytes(&output).unwrap();
629+
630+
// Most importantly: verify the bloom filter WORKS correctly after round-trip
631+
// Note: bloom filters can have false positives, but should never have false negatives
632+
// So we can't assert !check(), but we should verify inserted values are found
633+
for value in &test_values {
634+
assert!(
635+
reconstructed.check(value),
636+
"Value '{}' should be present after round-trip",
637+
value
638+
);
639+
}
640+
}
544641
}

0 commit comments

Comments
 (0)