Skip to content

Commit c891c35

Browse files
committed
refactor(parquet): replace magic 8 literals with named constants
The literal `8` appeared in two distinct roles throughout `RleEncoder`, `RleDecoder`, and their tests. Replacing each occurrence with a named constant makes the intent explicit and prevents the two meanings from being confused.

`BIT_PACK_GROUP_SIZE = 8`: the Parquet RLE/bit-packing hybrid format always bit-packs values in multiples of this count (spec: "we always bit-pack a multiple of 8 values at a time"). Every occurrence related to the staging buffer size, the repeat-count threshold that triggers the RLE decision, and the group-count arithmetic in bit-packed headers now uses this name.

`u8::BITS` (= 8, from std): used wherever a bit count is divided by 8 to obtain a byte count (e.g. `ceil(bit_width, u8::BITS as usize)`). This is a bits-per-byte conversion — a fundamentally different concept from the packing-group size.

No behaviour change.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent ab9a7bc commit c891c35

3 files changed

Lines changed: 79 additions & 69 deletions

File tree

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
4242
4343
use crate::column::chunker::CdcChunk;
44+
use crate::column::writer::{LevelDataRef, ValueSelectionRef};
4445
use crate::errors::{ParquetError, Result};
4546
use arrow_array::cast::AsArray;
4647
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
@@ -822,6 +823,17 @@ impl LevelData {
822823
}
823824
}
824825

826+
pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
827+
match self {
828+
Self::Absent => LevelDataRef::Absent,
829+
Self::Materialized(values) => LevelDataRef::Materialized(values),
830+
Self::Uniform { value, count } => LevelDataRef::Uniform {
831+
value: *value,
832+
count: *count,
833+
},
834+
}
835+
}
836+
825837
pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
826838
match self {
827839
Self::Absent => Self::Absent,
@@ -919,6 +931,17 @@ impl PartialEq for ValueSelection {
919931
impl Eq for ValueSelection {}
920932

921933
impl ValueSelection {
934+
pub(crate) fn as_ref(&self) -> ValueSelectionRef<'_> {
935+
match self {
936+
Self::Empty => ValueSelectionRef::Empty,
937+
Self::Dense { offset, len } => ValueSelectionRef::Dense {
938+
offset: *offset,
939+
len: *len,
940+
},
941+
Self::Sparse(indices) => ValueSelectionRef::Sparse(indices),
942+
}
943+
}
944+
922945
fn from_indices(indices: Vec<usize>) -> Self {
923946
match (indices.first().copied(), indices.last().copied()) {
924947
(None, _) => Self::Empty,

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 14 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -41,8 +41,7 @@ use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
4141
use crate::column::page_encryption::PageEncryptor;
4242
use crate::column::writer::encoder::ColumnValueEncoder;
4343
use crate::column::writer::{
44-
ColumnCloseResult, ColumnWriter, GenericColumnWriter, LevelDataRef, ValueSelectionRef,
45-
get_column_writer,
44+
ColumnCloseResult, ColumnWriter, GenericColumnWriter, ValueSelectionRef, get_column_writer,
4645
};
4746
use crate::data_type::{ByteArray, FixedLenByteArray};
4847
#[cfg(feature = "encryption")]
@@ -54,7 +53,7 @@ use crate::file::reader::{ChunkReader, Length};
5453
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
5554
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
5655
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
57-
use levels::{ArrayLevels, LevelData, ValueSelection, calculate_array_levels};
56+
use levels::{ArrayLevels, calculate_array_levels};
5857

5958
mod byte_array;
6059
mod levels;
@@ -888,28 +887,6 @@ enum ArrowColumnWriterImpl {
888887
Column(ColumnWriter<'static>),
889888
}
890889

891-
fn level_data_ref(levels: &LevelData) -> LevelDataRef<'_> {
892-
match levels {
893-
LevelData::Absent => LevelDataRef::Absent,
894-
LevelData::Materialized(values) => LevelDataRef::Materialized(values),
895-
LevelData::Uniform { value, count } => LevelDataRef::Uniform {
896-
value: *value,
897-
count: *count,
898-
},
899-
}
900-
}
901-
902-
fn value_selection_ref(selection: &ValueSelection) -> ValueSelectionRef<'_> {
903-
match selection {
904-
ValueSelection::Empty => ValueSelectionRef::Empty,
905-
ValueSelection::Dense { offset, len } => ValueSelectionRef::Dense {
906-
offset: *offset,
907-
len: *len,
908-
},
909-
ValueSelection::Sparse(indices) => ValueSelectionRef::Sparse(indices),
910-
}
911-
}
912-
913890
impl ArrowColumnWriter {
914891
/// Write an [`ArrowLeafColumn`]
915892
pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
@@ -924,8 +901,8 @@ impl ArrowColumnWriter {
924901
) -> Result<()> {
925902
let levels = &col.0;
926903
let chunks = chunker.get_arrow_chunks(
927-
level_data_ref(levels.def_level_data()),
928-
level_data_ref(levels.rep_level_data()),
904+
levels.def_level_data().as_ref(),
905+
levels.rep_level_data().as_ref(),
929906
levels.array(),
930907
)?;
931908

@@ -1396,8 +1373,8 @@ fn write_leaf(
13961373
offset: 0,
13971374
len: values.len(),
13981375
},
1399-
level_data_ref(levels.def_level_data()),
1400-
level_data_ref(levels.rep_level_data()),
1376+
levels.def_level_data().as_ref(),
1377+
levels.rep_level_data().as_ref(),
14011378
None,
14021379
None,
14031380
None,
@@ -1558,8 +1535,8 @@ fn write_leaf(
15581535
offset: 0,
15591536
len: bytes.len(),
15601537
},
1561-
level_data_ref(levels.def_level_data()),
1562-
level_data_ref(levels.rep_level_data()),
1538+
levels.def_level_data().as_ref(),
1539+
levels.rep_level_data().as_ref(),
15631540
None,
15641541
None,
15651542
None,
@@ -1575,9 +1552,9 @@ fn write_primitive<E: ColumnValueEncoder>(
15751552
) -> Result<usize> {
15761553
writer.write_batch_internal(
15771554
values,
1578-
value_selection_ref(levels.value_selection()),
1579-
level_data_ref(levels.def_level_data()),
1580-
level_data_ref(levels.rep_level_data()),
1555+
levels.value_selection().as_ref(),
1556+
levels.def_level_data().as_ref(),
1557+
levels.rep_level_data().as_ref(),
15811558
None,
15821559
None,
15831560
None,
@@ -1591,9 +1568,9 @@ fn write_byte_array(
15911568
) -> Result<usize> {
15921569
writer.write_batch_internal(
15931570
values,
1594-
value_selection_ref(levels.value_selection()),
1595-
level_data_ref(levels.def_level_data()),
1596-
level_data_ref(levels.rep_level_data()),
1571+
levels.value_selection().as_ref(),
1572+
levels.def_level_data().as_ref(),
1573+
levels.rep_level_data().as_ref(),
15971574
None,
15981575
None,
15991576
None,

parquet/src/encodings/rle.rs

Lines changed: 42 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,12 @@ use bytes::Bytes;
4141
use crate::errors::{ParquetError, Result};
4242
use crate::util::bit_util::{self, BitReader, BitWriter, FromBitpacked};
4343

44-
/// Maximum groups of 8 values per bit-packed run. Current value is 64.
44+
/// Number of values in one bit-packed group. The Parquet RLE/bit-packing hybrid
45+
/// format always bit-packs values in multiples of this count (see the format spec:
46+
/// "we always bit-pack a multiple of 8 values at a time").
47+
const BIT_PACK_GROUP_SIZE: usize = 8;
48+
49+
/// Maximum groups of `BIT_PACK_GROUP_SIZE` values per bit-packed run. Current value is 64.
4550
const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
4651

4752
/// A RLE/Bit-Packing hybrid encoder.
@@ -54,9 +59,9 @@ pub struct RleEncoder {
5459
bit_writer: BitWriter,
5560

5661
// Buffered values for bit-packed runs.
57-
buffered_values: [u64; 8],
62+
buffered_values: [u64; BIT_PACK_GROUP_SIZE],
5863

59-
// Number of current buffered values. Must be less than 8.
64+
// Number of current buffered values. Must be less than BIT_PACK_GROUP_SIZE.
6065
num_buffered_values: usize,
6166

6267
// The current (also last) value that was written and the count of how many
@@ -89,7 +94,7 @@ impl RleEncoder {
8994
RleEncoder {
9095
bit_width,
9196
bit_writer,
92-
buffered_values: [0; 8],
97+
buffered_values: [0; BIT_PACK_GROUP_SIZE],
9398
num_buffered_values: 0,
9499
current_value: 0,
95100
repeat_count: 0,
@@ -101,10 +106,10 @@ impl RleEncoder {
101106
/// Returns the maximum buffer size to encode `num_values` values with
102107
/// `bit_width`.
103108
pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
104-
// The maximum size occurs with the shortest possible runs of 8
105-
let num_runs = bit_util::ceil(num_values, 8);
109+
// The maximum size occurs with the shortest possible runs of BIT_PACK_GROUP_SIZE
110+
let num_runs = bit_util::ceil(num_values, BIT_PACK_GROUP_SIZE);
106111

107-
// The number of bytes in a run of 8
112+
// The number of bytes in a run of BIT_PACK_GROUP_SIZE
108113
let bytes_per_run = bit_width as usize;
109114

110115
// The maximum size if stored as shortest possible bit packed runs of 8
@@ -114,7 +119,8 @@ impl RleEncoder {
114119
let rle_len_prefix = 1;
115120

116121
// The length of an RLE run of 8
117-
let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);
122+
let min_rle_run_size =
123+
rle_len_prefix + bit_util::ceil(bit_width as usize, u8::BITS as usize);
118124

119125
// The maximum size if stored as shortest possible RLE runs of 8
120126
let rle_max_size = num_runs * min_rle_run_size;
@@ -123,7 +129,7 @@ impl RleEncoder {
123129
}
124130

125131
/// Returns `true` if the encoder is currently in RLE accumulation mode
126-
/// for the given value (i.e., `repeat_count >= 8` and `current_value == value`).
132+
/// for the given value (i.e., `repeat_count >= BIT_PACK_GROUP_SIZE` and `current_value == value`).
127133
///
128134
/// The encoder enters accumulation mode as soon as the 8th consecutive identical
129135
/// value has been seen: at that point `flush_buffered_values` has committed the
@@ -132,7 +138,7 @@ impl RleEncoder {
132138
/// repetitions in O(1) once this returns `true`.
133139
#[inline]
134140
pub fn is_accumulating(&self, value: u64) -> bool {
135-
self.repeat_count >= 8 && self.current_value == value
141+
self.repeat_count >= BIT_PACK_GROUP_SIZE && self.current_value == value
136142
}
137143

138144
/// Extends the current RLE run by `count` additional repetitions.
@@ -142,23 +148,24 @@ impl RleEncoder {
142148
/// returns `true` for the same value before calling this method.
143149
#[inline]
144150
pub fn extend_run(&mut self, count: usize) {
145-
debug_assert!(self.repeat_count >= 8);
151+
debug_assert!(self.repeat_count >= BIT_PACK_GROUP_SIZE);
146152
self.repeat_count += count;
147153
}
148154

149155
/// Encodes `value`, which must be representable with `bit_width` bits.
150156
#[inline]
151157
pub fn put(&mut self, value: u64) {
152-
// This function buffers 8 values at a time. After seeing 8 values, it
153-
// decides whether the current run should be encoded in bit-packed or RLE.
158+
// This function buffers BIT_PACK_GROUP_SIZE values at a time. After seeing that
159+
// many values, it decides whether the current run should be encoded in bit-packed
160+
// or RLE.
154161
if self.current_value == value {
155162
self.repeat_count += 1;
156-
if self.repeat_count > 8 {
163+
if self.repeat_count > BIT_PACK_GROUP_SIZE {
157164
// A continuation of last value. No need to buffer.
158165
return;
159166
}
160167
} else {
161-
if self.repeat_count >= 8 {
168+
if self.repeat_count >= BIT_PACK_GROUP_SIZE {
162169
// The current RLE run has ended and we've gathered enough. Flush first.
163170
debug_assert_eq!(self.bit_packed_count, 0);
164171
self.flush_rle_run();
@@ -169,9 +176,9 @@ impl RleEncoder {
169176

170177
self.buffered_values[self.num_buffered_values] = value;
171178
self.num_buffered_values += 1;
172-
if self.num_buffered_values == 8 {
179+
if self.num_buffered_values == BIT_PACK_GROUP_SIZE {
173180
// Buffered values are full. Flush them.
174-
debug_assert_eq!(self.bit_packed_count % 8, 0);
181+
debug_assert_eq!(self.bit_packed_count % BIT_PACK_GROUP_SIZE, 0);
175182
self.flush_buffered_values();
176183
}
177184
}
@@ -243,9 +250,9 @@ impl RleEncoder {
243250
if self.repeat_count > 0 && all_repeat {
244251
self.flush_rle_run();
245252
} else {
246-
// Buffer the last group of bit-packed values to 8 by padding with 0s.
253+
// Buffer the last group of bit-packed values to BIT_PACK_GROUP_SIZE by padding with 0s.
247254
if self.num_buffered_values > 0 {
248-
while self.num_buffered_values < 8 {
255+
while self.num_buffered_values < BIT_PACK_GROUP_SIZE {
249256
self.buffered_values[self.num_buffered_values] = 0;
250257
self.num_buffered_values += 1;
251258
}
@@ -263,7 +270,7 @@ impl RleEncoder {
263270
self.bit_writer.put_vlq_int(indicator_value as u64);
264271
self.bit_writer.put_aligned(
265272
self.current_value,
266-
bit_util::ceil(self.bit_width as usize, 8),
273+
bit_util::ceil(self.bit_width as usize, u8::BITS as usize),
267274
);
268275
self.num_buffered_values = 0;
269276
self.repeat_count = 0;
@@ -281,7 +288,7 @@ impl RleEncoder {
281288
self.num_buffered_values = 0;
282289
if update_indicator_byte {
283290
// Write the indicator byte to the reserved position in `bit_writer`
284-
let num_groups = self.bit_packed_count / 8;
291+
let num_groups = self.bit_packed_count / BIT_PACK_GROUP_SIZE;
285292
let indicator_byte = ((num_groups << 1) | 1) as u8;
286293
self.bit_writer
287294
.put_aligned_offset(indicator_byte, 1, self.indicator_byte_pos as usize);
@@ -291,19 +298,19 @@ impl RleEncoder {
291298
}
292299

293300
fn flush_buffered_values(&mut self) {
294-
if self.repeat_count >= 8 {
301+
if self.repeat_count >= BIT_PACK_GROUP_SIZE {
295302
self.num_buffered_values = 0;
296303
if self.bit_packed_count > 0 {
297304
// In this case we choose RLE encoding. Flush the current buffered values
298305
// as bit-packed encoding.
299-
debug_assert_eq!(self.bit_packed_count % 8, 0);
306+
debug_assert_eq!(self.bit_packed_count % BIT_PACK_GROUP_SIZE, 0);
300307
self.flush_bit_packed_run(true)
301308
}
302309
return;
303310
}
304311

305312
self.bit_packed_count += self.num_buffered_values;
306-
let num_groups = self.bit_packed_count / 8;
313+
let num_groups = self.bit_packed_count / BIT_PACK_GROUP_SIZE;
307314
if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN {
308315
// We've reached the maximum value that can be hold in a single bit-packed
309316
// run.
@@ -584,10 +591,10 @@ impl RleDecoder {
584591
return Ok(false);
585592
}
586593
if indicator_value & 1 == 1 {
587-
self.bit_packed_left = ((indicator_value >> 1) * 8) as u32;
594+
self.bit_packed_left = ((indicator_value >> 1) * BIT_PACK_GROUP_SIZE as i64) as u32;
588595
} else {
589596
self.rle_left = (indicator_value >> 1) as u32;
590-
let value_width = bit_util::ceil(self.bit_width as usize, 8);
597+
let value_width = bit_util::ceil(self.bit_width as usize, u8::BITS as usize);
591598
self.current_value = bit_reader.get_aligned::<u64>(value_width);
592599
self.current_value.ok_or_else(|| {
593600
general_err!("parquet_data_error: not enough data for RLE decoding")
@@ -869,7 +876,7 @@ mod tests {
869876
&values[..],
870877
width as u8,
871878
None,
872-
2 * (1 + bit_util::ceil(width as i64, 8) as i32),
879+
2 * (1 + bit_util::ceil(width as i64, u8::BITS as i64) as i32),
873880
);
874881
}
875882

@@ -879,9 +886,12 @@ mod tests {
879886
for i in 0..101 {
880887
values.push(i % 2);
881888
}
882-
let num_groups = bit_util::ceil(100, 8) as u8;
889+
let num_groups = bit_util::ceil(100, BIT_PACK_GROUP_SIZE) as u8;
883890
expected_buffer.push((num_groups << 1) | 1);
884-
expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);
891+
expected_buffer.resize(
892+
expected_buffer.len() + 100 / BIT_PACK_GROUP_SIZE,
893+
0b10101010,
894+
);
885895

886896
// For the last 4 0 and 1's, padded with 0.
887897
expected_buffer.push(0b00001010);
@@ -892,12 +902,12 @@ mod tests {
892902
1 + num_groups as i32,
893903
);
894904
for width in 2..MAX_WIDTH + 1 {
895-
let num_values = bit_util::ceil(100, 8) * 8;
905+
let num_values = bit_util::ceil(100, BIT_PACK_GROUP_SIZE) * BIT_PACK_GROUP_SIZE;
896906
validate_rle(
897907
&values,
898908
width as u8,
899909
None,
900-
1 + bit_util::ceil(width as i64 * num_values, 8) as i32,
910+
1 + bit_util::ceil(width as i64 * num_values as i64, u8::BITS as i64) as i32,
901911
);
902912
}
903913
}
@@ -1058,7 +1068,7 @@ mod tests {
10581068
let num_values = 2002;
10591069

10601070
// bit-packed header
1061-
let run_bytes = ceil(num_values * bit_width, 8) as u64;
1071+
let run_bytes = ceil(num_values * bit_width, u8::BITS as usize) as u64;
10621072
writer.put_vlq_int((run_bytes << 1) | 1);
10631073
for _ in 0..run_bytes {
10641074
writer.put_aligned(0xFF_u8, 1);

0 commit comments

Comments (0)