Skip to content

Commit 0e28fba

Browse files
committed
parquet: reuse level encoder allocations across page flushes
Previously, flushing a data page would `mem::replace` each level encoder with a freshly allocated one, consuming the old encoder to get its buffer. This allocated new internal `Vec`s on every page boundary. We now preserve the internal state of the encoder and reuse memory across pages. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent 10b4991 commit 0e28fba

4 files changed

Lines changed: 63 additions & 27 deletions

File tree

parquet/src/column/writer/mod.rs

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use crate::bloom_filter::Sbbf;
2424
use crate::file::page_index::column_index::ColumnIndexMetaData;
2525
use crate::file::page_index::offset_index::OffsetIndexMetaData;
2626
use std::collections::{BTreeSet, VecDeque};
27-
use std::mem;
2827
use std::str;
2928

3029
use crate::basic::{
@@ -1060,19 +1059,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
10601059
let mut buffer = vec![];
10611060

10621061
if max_rep_level > 0 {
1063-
let encoder = mem::replace(
1064-
&mut self.rep_levels_encoder,
1065-
Self::create_level_encoder(max_rep_level, &self.props),
1066-
);
1067-
buffer.extend_from_slice(&encoder.consume());
1062+
self.rep_levels_encoder
1063+
.flush_to(|data| buffer.extend_from_slice(data));
10681064
}
10691065

10701066
if max_def_level > 0 {
1071-
let encoder = mem::replace(
1072-
&mut self.def_levels_encoder,
1073-
Self::create_level_encoder(max_def_level, &self.props),
1074-
);
1075-
buffer.extend_from_slice(&encoder.consume());
1067+
self.def_levels_encoder
1068+
.flush_to(|data| buffer.extend_from_slice(data));
10761069
}
10771070

10781071
buffer.extend_from_slice(&values_data.buf);
@@ -1102,23 +1095,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
11021095
let mut buffer = vec![];
11031096

11041097
if max_rep_level > 0 {
1105-
let encoder = mem::replace(
1106-
&mut self.rep_levels_encoder,
1107-
Self::create_level_encoder(max_rep_level, &self.props),
1108-
);
1109-
let levels = encoder.consume();
1110-
rep_levels_byte_len = levels.len();
1111-
buffer.extend_from_slice(&levels);
1098+
self.rep_levels_encoder
1099+
.flush_to(|data| buffer.extend_from_slice(data));
1100+
rep_levels_byte_len = buffer.len();
11121101
}
11131102

11141103
if max_def_level > 0 {
1115-
let encoder = mem::replace(
1116-
&mut self.def_levels_encoder,
1117-
Self::create_level_encoder(max_def_level, &self.props),
1118-
);
1119-
let levels = encoder.consume();
1120-
def_levels_byte_len = levels.len();
1121-
buffer.extend_from_slice(&levels);
1104+
self.def_levels_encoder
1105+
.flush_to(|data| buffer.extend_from_slice(data));
1106+
def_levels_byte_len = buffer.len() - rep_levels_byte_len;
11221107
}
11231108

11241109
let uncompressed_size =

parquet/src/encodings/levels.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ impl LevelEncoder {
147147
/// Finalizes level encoder, flush all intermediate buffers and return resulting
148148
/// encoded buffer. Returned buffer is already truncated to encoded bytes only.
149149
#[inline]
150+
#[allow(unused)]
150151
pub fn consume(self) -> Vec<u8> {
151152
match self {
152153
LevelEncoder::Rle(encoder) => {
@@ -162,4 +163,34 @@ impl LevelEncoder {
162163
LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
163164
}
164165
}
166+
167+
/// Flushes all intermediate buffers, passes the encoded data to `f`, then
168+
/// resets the encoder for reuse while retaining the buffer allocation.
169+
#[inline]
170+
pub fn flush_to<F, R>(&mut self, f: F) -> R
171+
where
172+
F: FnOnce(&[u8]) -> R,
173+
{
174+
let result = match self {
175+
LevelEncoder::Rle(encoder) => {
176+
let data = encoder.flush_buffer_mut();
177+
// Patch the 4-byte length header reserved at the start of the buffer
178+
let encoded_len = (data.len() - mem::size_of::<i32>()) as i32;
179+
data[..4].copy_from_slice(&encoded_len.to_le_bytes());
180+
f(data)
181+
}
182+
LevelEncoder::RleV2(encoder) => f(encoder.flush_buffer()),
183+
LevelEncoder::BitPacked(_, encoder) => f(encoder.flush_buffer()),
184+
};
185+
match self {
186+
LevelEncoder::Rle(encoder) => {
187+
encoder.clear();
188+
// Re-reserve the 4-byte length header for the next page
189+
encoder.skip(mem::size_of::<i32>());
190+
}
191+
LevelEncoder::RleV2(encoder) => encoder.clear(),
192+
LevelEncoder::BitPacked(_, encoder) => encoder.clear(),
193+
}
194+
result
195+
}
165196
}

parquet/src/encodings/rle.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,22 @@ impl RleEncoder {
177177
/// Borrow equivalent of the `consume` method.
178178
/// Call `clear()` after invoking this method.
179179
#[inline]
180-
#[allow(unused)]
181180
pub fn flush_buffer(&mut self) -> &[u8] {
182181
self.flush();
183182
self.bit_writer.flush_buffer()
184183
}
185184

185+
/// Like `flush_buffer`, but returns mutable access to the internal buffer.
186+
/// Call `clear()` after invoking this method.
187+
#[inline]
188+
pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
189+
self.flush();
190+
self.bit_writer.flush_buffer_mut()
191+
}
192+
186193
/// Clears the internal state so this encoder can be reused (e.g., after becoming
187194
/// full).
188195
#[inline]
189-
#[allow(unused)]
190196
pub fn clear(&mut self) {
191197
self.bit_writer.clear();
192198
self.num_buffered_values = 0;
@@ -196,6 +202,13 @@ impl RleEncoder {
196202
self.indicator_byte_pos = -1;
197203
}
198204

205+
/// Advances the buffer by `num_bytes` zero bytes, delegating to the
206+
/// underlying [`BitWriter::skip`].
207+
#[inline]
208+
pub fn skip(&mut self, num_bytes: usize) {
209+
self.bit_writer.skip(num_bytes);
210+
}
211+
199212
/// Flushes all remaining values and return the final byte buffer maintained by the
200213
/// internal writer.
201214
#[inline]

parquet/src/util/bit_util.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ impl BitWriter {
219219
self.buffer()
220220
}
221221

222+
/// Like `flush_buffer`, but returns mutable access to the buffer.
223+
#[inline]
224+
pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
225+
self.flush();
226+
&mut self.buffer
227+
}
228+
222229
/// Clears the internal state so the buffer can be reused.
223230
#[inline]
224231
pub fn clear(&mut self) {

0 commit comments

Comments
 (0)