Commit a05129a

HippoBaro and alamb authored
feat(parquet): stream-encode definition/repetition levels incrementally (#9447)
# Which issue does this PR close?

- Closes #9446.
- Closes #9636.

# Rationale for this change

When writing a Parquet column with very sparse data, `GenericColumnWriter` accumulates unbounded memory for definition and repetition levels. The raw `i16` values are appended into `Vec<i16>` sinks on every `write_batch` call and only RLE-encoded in bulk when a data page is flushed. For a column that is almost entirely nulls, the actual RLE-encoded output can be tiny, yet the intermediate buffer grows linearly with the number of rows.

# What changes are included in this PR?

Replace the two raw-level `Vec<i16>` sinks (`def_levels_sink` / `rep_levels_sink`) with streaming `LevelEncoder` fields (`def_levels_encoder` / `rep_levels_encoder`). Behavior is unchanged, but the writer now keeps running RLE-encoded state rather than the full list of levels in memory. Existing logic is reused.

# Are these changes tested?

Yes; all tests pass. Benchmarks show no regression, and the `list_primitive` benches improved by 3-5%:

```
Benchmarking list_primitive/default: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase
target time to 6.1s, enable flat sampling, or reduce sample count to 60.
list_primitive/default
        time:   [1.2109 ms 1.2171 ms 1.2248 ms]
        thrpt:  [1.6999 GiB/s 1.7105 GiB/s 1.7194 GiB/s]
change:
        time:   [−3.7197% −2.8848% −2.0036%] (p = 0.00 < 0.05)
        thrpt:  [+2.0445% +2.9705% +3.8634%]
        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

Benchmarking list_primitive/bloom_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase
target time to 7.5s, enable flat sampling, or reduce sample count to 50.
list_primitive/bloom_filter
        time:   [1.4405 ms 1.4810 ms 1.5292 ms]
        thrpt:  [1.3615 GiB/s 1.4058 GiB/s 1.4452 GiB/s]
change:
        time:   [−6.4332% −4.7568% −2.9048%] (p = 0.00 < 0.05)
        thrpt:  [+2.9917% +4.9944% +6.8755%]
        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe

Benchmarking list_primitive/parquet_2: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase
target time to 6.3s, enable flat sampling, or reduce sample count to 60.
list_primitive/parquet_2
        time:   [1.2271 ms 1.2311 ms 1.2362 ms]
        thrpt:  [1.6841 GiB/s 1.6911 GiB/s 1.6966 GiB/s]
change:
        time:   [−5.8536% −4.9672% −4.1905%] (p = 0.00 < 0.05)
        thrpt:  [+4.3738% +5.2269% +6.2175%]
        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe

list_primitive/zstd
        time:   [2.0056 ms 2.0148 ms 2.0262 ms]
        thrpt:  [1.0275 GiB/s 1.0333 GiB/s 1.0381 GiB/s]
change:
        time:   [−4.7073% −3.6719% −2.6698%] (p = 0.00 < 0.05)
        thrpt:  [+2.7431% +3.8118% +4.9398%]
        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  2 (2.00%) high mild
  10 (10.00%) high severe

list_primitive/zstd_parquet_2
        time:   [2.0455 ms 2.0730 ms 2.1120 ms]
        thrpt:  [1009.4 MiB/s 1.0043 GiB/s 1.0178 GiB/s]
change:
        time:   [−5.8626% −3.7672% −1.4196%] (p = 0.00 < 0.05)
        thrpt:  [+1.4401% +3.9146% +6.2277%]
        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) high mild
  5 (5.00%) high severe

Benchmarking list_primitive_non_null/default: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase
target time to 6.6s, enable flat sampling, or reduce sample count to 60.
list_primitive_non_null/default
        time:   [1.3199 ms 1.3333 ms 1.3504 ms]
        thrpt:  [1.5384 GiB/s 1.5581 GiB/s 1.5740 GiB/s]
change:
        time:   [−4.1662% −2.3491% −0.7148%] (p = 0.01 < 0.05)
        thrpt:  [+0.7200% +2.4056% +4.3473%]
        Change within noise threshold.
Found 6 outliers among 100 measurements (6.00%)
  3 (3.00%) high mild
  3 (3.00%) high severe

Benchmarking list_primitive_non_null/bloom_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase
target time to 8.4s, enable flat sampling, or reduce sample count to 50.
list_primitive_non_null/bloom_filter
        time:   [1.6567 ms 1.6668 ms 1.6805 ms]
        thrpt:  [1.2362 GiB/s 1.2464 GiB/s 1.2540 GiB/s]
change:
        time:   [−2.7884% −1.3493% +0.2820%] (p = 0.07 > 0.05)
        thrpt:  [−0.2812% +1.3677% +2.8684%]
        No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe

Benchmarking list_primitive_non_null/parquet_2: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase
target time to 7.2s, enable flat sampling, or reduce sample count to 50.
list_primitive_non_null/parquet_2
        time:   [1.4279 ms 1.4409 ms 1.4551 ms]
        thrpt:  [1.4277 GiB/s 1.4418 GiB/s 1.4550 GiB/s]
change:
        time:   [−2.0598% −0.9952% −0.1318%] (p = 0.04 < 0.05)
        thrpt:  [+0.1319% +1.0052% +2.1032%]
        Change within noise threshold.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

list_primitive_non_null/zstd
        time:   [2.6966 ms 2.7358 ms 2.7994 ms]
        thrpt:  [759.93 MiB/s 777.60 MiB/s 788.89 MiB/s]
change:
        time:   [−3.8379% −2.1418% +0.0785%] (p = 0.03 < 0.05)
        thrpt:  [−0.0784% +2.1887% +3.9911%]
        Change within noise threshold.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe

list_primitive_non_null/zstd_parquet_2
        time:   [2.7684 ms 2.7861 ms 2.8099 ms]
        thrpt:  [757.07 MiB/s 763.55 MiB/s 768.44 MiB/s]
change:
        time:   [−6.4460% −4.1387% −2.1474%] (p = 0.00 < 0.05)
        thrpt:  [+2.1946% +4.3174% +6.8901%]
        Performance has improved.
```

# Are there any user-facing changes?

None. Some internal symbols are now unused; I added `#[allow(dead_code)]` annotations since these were visible through the experimental API and might be relied on externally.

---------

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
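To make the rationale concrete, here is a minimal sketch (not the crate's actual encoder; Parquet uses an RLE/bit-packing hybrid, and `rle_pairs` is a hypothetical toy) showing why the raw `Vec<i16>` sink grows linearly while run-length state stays tiny for a mostly-null column:

```rust
/// Toy run-length encoder producing (run length, level) pairs.
/// Illustrative only; the real writer uses Parquet's RLE/bit-packing hybrid.
fn rle_pairs(levels: &[i16]) -> Vec<(u32, i16)> {
    let mut runs: Vec<(u32, i16)> = Vec::new();
    for &lvl in levels {
        match runs.last_mut() {
            // Extend the current run when the level repeats.
            Some((n, v)) if *v == lvl => *n += 1,
            // Otherwise start a new run.
            _ => runs.push((1, lvl)),
        }
    }
    runs
}

fn main() {
    // 1M rows, almost entirely null: def level 0 everywhere except one row.
    let mut def_levels = vec![0i16; 1_000_000];
    def_levels[500_000] = 1;

    // The old sink buffers every raw i16: 2 MB for this column.
    let raw_bytes = def_levels.len() * std::mem::size_of::<i16>();
    // Run-length state collapses to three runs regardless of row count.
    let runs = rle_pairs(&def_levels);
    println!("raw buffer: {raw_bytes} bytes, runs: {}", runs.len());
    // raw buffer: 2000000 bytes, runs: 3
    assert_eq!(runs.len(), 3);
}
```

Streaming the levels into the encoder as they arrive keeps memory proportional to the encoded output (three runs here) instead of the row count.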
1 parent 1f07e54 commit a05129a

File tree

6 files changed: +100 −90 lines changed


parquet/src/column/reader.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -1401,11 +1401,11 @@ mod tests {
         // Helper: build a DataPage v2 for this list column.
         let make_v2_page =
             |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page {
-                let mut rep_enc = LevelEncoder::v2(max_rep_level, rep_levels.len());
+                let mut rep_enc = LevelEncoder::v2_streaming(max_rep_level);
                 rep_enc.put(rep_levels);
                 let rep_bytes = rep_enc.consume();
 
-                let mut def_enc = LevelEncoder::v2(max_def_level, def_levels.len());
+                let mut def_enc = LevelEncoder::v2_streaming(max_def_level);
                 def_enc.put(def_levels);
                 let def_bytes = def_enc.consume();
```

parquet/src/column/writer/mod.rs

Lines changed: 25 additions & 48 deletions
```diff
@@ -351,9 +351,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     /// but we use a BTreeSet so that the output is deterministic
     encodings: BTreeSet<Encoding>,
     encoding_stats: Vec<PageEncodingStats>,
-    // Reused buffers
-    def_levels_sink: Vec<i16>,
-    rep_levels_sink: Vec<i16>,
+    // Streaming level encoders for definition/repetition levels.
+    def_levels_encoder: LevelEncoder,
+    rep_levels_encoder: LevelEncoder,
     data_pages: VecDeque<CompressedPage>,
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
@@ -411,15 +411,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         };
 
         Self {
+            def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
+            rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
             descr,
             props,
             statistics_enabled,
             page_writer,
             codec,
             compressor,
             encoder,
-            def_levels_sink: vec![],
-            rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
             page_metrics,
             column_metrics,
@@ -647,6 +647,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         })
     }
 
+    /// Creates a new streaming level encoder appropriate for the writer version.
+    fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
+        match props.writer_version() {
+            WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(Encoding::RLE, max_level),
+            WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
+        }
+    }
+
     /// Writes mini batch of values, definition and repetition levels.
     /// This allows fine-grained processing of values and maintaining a reasonable
     /// page size.
@@ -677,7 +685,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             // Update histogram
             self.page_metrics.update_definition_level_histogram(levels);
 
-            self.def_levels_sink.extend_from_slice(levels);
+            self.def_levels_encoder.put(levels);
             values_to_write
         } else {
             num_levels
@@ -708,7 +716,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             // Update histogram
             self.page_metrics.update_repetition_level_histogram(levels);
 
-            self.rep_levels_sink.extend_from_slice(levels);
+            self.rep_levels_encoder.put(levels);
         } else {
             // Each value is exactly one row.
             // Equals to the number of values, we count nulls as well.
@@ -1060,23 +1068,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         let mut buffer = vec![];
 
         if max_rep_level > 0 {
-            buffer.extend_from_slice(
-                &self.encode_levels_v1(
-                    Encoding::RLE,
-                    &self.rep_levels_sink[..],
-                    max_rep_level,
-                )[..],
-            );
+            self.rep_levels_encoder
+                .flush_to(|data| buffer.extend_from_slice(data));
         }
 
         if max_def_level > 0 {
-            buffer.extend_from_slice(
-                &self.encode_levels_v1(
-                    Encoding::RLE,
-                    &self.def_levels_sink[..],
-                    max_def_level,
-                )[..],
-            );
+            self.def_levels_encoder
+                .flush_to(|data| buffer.extend_from_slice(data));
         }
 
         buffer.extend_from_slice(&values_data.buf);
@@ -1106,15 +1104,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         let mut buffer = vec![];
 
         if max_rep_level > 0 {
-            let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
-            rep_levels_byte_len = levels.len();
-            buffer.extend_from_slice(&levels[..]);
+            self.rep_levels_encoder
+                .flush_to(|data| buffer.extend_from_slice(data));
+            rep_levels_byte_len = buffer.len();
         }
 
         if max_def_level > 0 {
-            let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
-            def_levels_byte_len = levels.len();
-            buffer.extend_from_slice(&levels[..]);
+            self.def_levels_encoder
+                .flush_to(|data| buffer.extend_from_slice(data));
+            def_levels_byte_len = buffer.len() - rep_levels_byte_len;
         }
 
         let uncompressed_size =
@@ -1164,10 +1162,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         // Update total number of rows.
         self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
-
-        // Reset state.
-        self.rep_levels_sink.clear();
-        self.def_levels_sink.clear();
         self.page_metrics.new_page();
 
         Ok(())
@@ -1244,23 +1238,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(metadata)
     }
 
-    /// Encodes definition or repetition levels for Data Page v1.
-    #[inline]
-    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
-        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
-        encoder.put(levels);
-        encoder.consume()
-    }
-
-    /// Encodes definition or repetition levels for Data Page v2.
-    /// Encoding is always RLE.
-    #[inline]
-    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
-        let mut encoder = LevelEncoder::v2(max_level, levels.len());
-        encoder.put(levels);
-        encoder.consume()
-    }
-
     /// Writes compressed data page into underlying sink and updates global metrics.
     #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
```

parquet/src/encodings/levels.rs

Lines changed: 50 additions & 37 deletions
```diff
@@ -21,21 +21,7 @@ use super::rle::RleEncoder;
 
 use crate::basic::Encoding;
 use crate::data_type::AsBytes;
-use crate::util::bit_util::{BitWriter, ceil, num_required_bits};
-
-/// Computes max buffer size for level encoder/decoder based on encoding, max
-/// repetition/definition level and number of total buffered values (includes null
-/// values).
-#[inline]
-pub fn max_buffer_size(encoding: Encoding, max_level: i16, num_buffered_values: usize) -> usize {
-    let bit_width = num_required_bits(max_level as u64);
-    match encoding {
-        Encoding::RLE => RleEncoder::max_buffer_size(bit_width, num_buffered_values),
-        #[allow(deprecated)]
-        Encoding::BIT_PACKED => ceil(num_buffered_values * bit_width as usize, 8),
-        _ => panic!("Unsupported encoding type {encoding}"),
-    }
-}
+use crate::util::bit_util::{BitWriter, num_required_bits};
 
 /// Encoder for definition/repetition levels.
 /// Currently only supports Rle and BitPacked (dev/null) encoding, including v2.
@@ -46,46 +32,44 @@ pub enum LevelEncoder {
 }
 
 impl LevelEncoder {
-    /// Creates new level encoder based on encoding, max level and underlying byte buffer.
-    /// For bit packed encoding it is assumed that buffer is already allocated with
-    /// `levels::max_buffer_size` method.
-    ///
-    /// Used to encode levels for Data Page v1.
+    /// Creates a new streaming level encoder for Data Page v1.
     ///
-    /// Panics, if encoding is not supported.
-    pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self {
-        let capacity_bytes = max_buffer_size(encoding, max_level, capacity);
-        let mut buffer = Vec::with_capacity(capacity_bytes);
+    /// Unlike [`v1`](Self::v1), this does not require knowing the number of values
+    /// upfront, making it suitable for incremental encoding where levels are fed in
+    /// as they arrive via [`put`](Self::put).
+    pub fn v1_streaming(encoding: Encoding, max_level: i16) -> Self {
         let bit_width = num_required_bits(max_level as u64);
         match encoding {
             Encoding::RLE => {
                 // Reserve space for length header
-                buffer.extend_from_slice(&[0; 4]);
+                let buffer = vec![0u8; 4];
                 LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
             }
             #[allow(deprecated)]
             Encoding::BIT_PACKED => {
-                // Here we set full byte buffer without adjusting for num_buffered_values,
-                // because byte buffer will already be allocated with size from
-                // `max_buffer_size()` method.
-                LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(buffer))
+                LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(Vec::new()))
             }
             _ => panic!("Unsupported encoding type {encoding}"),
         }
     }
 
-    /// Creates new level encoder based on RLE encoding. Used to encode Data Page v2
-    /// repetition and definition levels.
-    pub fn v2(max_level: i16, capacity: usize) -> Self {
-        let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, capacity);
-        let buffer = Vec::with_capacity(capacity_bytes);
+    /// Creates a new streaming RLE level encoder for Data Page v2.
+    ///
+    /// Unlike [`v2`](Self::v2), this does not require knowing the number of values
+    /// upfront, making it suitable for incremental encoding where levels are fed in
+    /// as they arrive via [`put`](Self::put).
+    pub fn v2_streaming(max_level: i16) -> Self {
         let bit_width = num_required_bits(max_level as u64);
-        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer))
+        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, Vec::new()))
     }
 
     /// Put/encode levels vector into this level encoder.
     /// Returns number of encoded values that are less than or equal to length of the
     /// input buffer.
+    ///
+    /// This method does **not** flush the underlying encoder, so it can be called
+    /// incrementally across multiple batches without forcing run boundaries.
+    /// The encoder is flushed automatically when [`consume`](Self::consume) is called.
     #[inline]
     pub fn put(&mut self, buffer: &[i16]) -> usize {
         let mut num_encoded = 0;
@@ -95,14 +79,12 @@ impl LevelEncoder {
                 encoder.put(*value as u64);
                 num_encoded += 1;
             }
-            encoder.flush();
         }
         LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
             for value in buffer {
                 encoder.put_value(*value as u64, bit_width as usize);
                 num_encoded += 1;
             }
-            encoder.flush();
         }
     }
     num_encoded
@@ -111,6 +93,7 @@ impl LevelEncoder {
     /// Finalizes level encoder, flush all intermediate buffers and return resulting
     /// encoded buffer. Returned buffer is already truncated to encoded bytes only.
     #[inline]
+    #[allow(unused)]
     pub fn consume(self) -> Vec<u8> {
         match self {
             LevelEncoder::Rle(encoder) => {
@@ -126,4 +109,34 @@ impl LevelEncoder {
             LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
         }
     }
+
+    /// Flushes all intermediate buffers, passes the encoded data to `f`, then
+    /// resets the encoder for reuse while retaining the buffer allocation.
+    #[inline]
+    pub fn flush_to<F, R>(&mut self, f: F) -> R
+    where
+        F: FnOnce(&[u8]) -> R,
+    {
+        let result = match self {
+            LevelEncoder::Rle(encoder) => {
+                let data = encoder.flush_buffer_mut();
+                // Patch the 4-byte length header reserved at the start of the buffer
+                let encoded_len = (data.len() - mem::size_of::<i32>()) as i32;
+                data[..4].copy_from_slice(&encoded_len.to_le_bytes());
+                f(data)
+            }
+            LevelEncoder::RleV2(encoder) => f(encoder.flush_buffer()),
+            LevelEncoder::BitPacked(_, encoder) => f(encoder.flush_buffer()),
+        };
+        match self {
+            LevelEncoder::Rle(encoder) => {
+                encoder.clear();
+                // Re-reserve the 4-byte length header for the next page
+                encoder.skip(mem::size_of::<i32>());
+            }
+            LevelEncoder::RleV2(encoder) => encoder.clear(),
+            LevelEncoder::BitPacked(_, encoder) => encoder.clear(),
+        }
+        result
+    }
 }
```
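The v1 header-patching cycle in `flush_to` above can be sketched standalone. This is a simplified stand-in (the `StreamingV1` type and its `put` are hypothetical; the real encoder RLE-encodes values rather than pushing raw bytes), showing only the buffer lifecycle: reserve 4 bytes, patch the little-endian length header at flush, then reset while keeping the allocation:

```rust
/// Minimal sketch of the v1 page cycle. Hypothetical stand-in for the
/// real RleEncoder-backed LevelEncoder.
struct StreamingV1 {
    buf: Vec<u8>,
}

impl StreamingV1 {
    fn new() -> Self {
        // Reserve space for the little-endian i32 length header.
        Self { buf: vec![0u8; 4] }
    }

    fn put(&mut self, byte: u8) {
        // Stand-in for encoding one value; the real encoder RLE-encodes runs.
        self.buf.push(byte);
    }

    fn flush_to<R>(&mut self, f: impl FnOnce(&[u8]) -> R) -> R {
        // Patch the header with the number of encoded bytes that follow it.
        let encoded_len = (self.buf.len() - 4) as i32;
        self.buf[..4].copy_from_slice(&encoded_len.to_le_bytes());
        let result = f(&self.buf);
        // Reset for the next page, retaining the heap allocation.
        self.buf.clear();
        self.buf.resize(4, 0);
        result
    }
}

fn main() {
    let mut enc = StreamingV1::new();
    for b in [7u8, 8, 9] {
        enc.put(b);
    }
    let page = enc.flush_to(|data| data.to_vec());
    assert_eq!(&page[..4], &3i32.to_le_bytes()); // length header = 3
    assert_eq!(&page[4..], &[7, 8, 9]);
    // Ready for the next page, header bytes re-reserved.
    assert_eq!(enc.buf.len(), 4);
    println!("ok");
}
```

The second `match` in the real `flush_to` performs exactly this reset step: `clear()` followed by `skip(4)` re-reserves the header without reallocating.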

parquet/src/encodings/rle.rs

Lines changed: 15 additions & 2 deletions
```diff
@@ -177,16 +177,22 @@ impl RleEncoder {
     /// Borrow equivalent of the `consume` method.
     /// Call `clear()` after invoking this method.
     #[inline]
-    #[allow(unused)]
     pub fn flush_buffer(&mut self) -> &[u8] {
         self.flush();
         self.bit_writer.flush_buffer()
     }
 
+    /// Like `flush_buffer`, but returns mutable access to the internal buffer.
+    /// Call `clear()` after invoking this method.
+    #[inline]
+    pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
+        self.flush();
+        self.bit_writer.flush_buffer_mut()
+    }
+
     /// Clears the internal state so this encoder can be reused (e.g., after becoming
     /// full).
     #[inline]
-    #[allow(unused)]
     pub fn clear(&mut self) {
         self.bit_writer.clear();
         self.num_buffered_values = 0;
@@ -196,6 +202,13 @@ impl RleEncoder {
         self.indicator_byte_pos = -1;
     }
 
+    /// Advances the buffer by `num_bytes` zero bytes, delegating to the
+    /// underlying [`BitWriter::skip`].
+    #[inline]
+    pub fn skip(&mut self, num_bytes: usize) {
+        self.bit_writer.skip(num_bytes);
+    }
+
     /// Flushes all remaining values and return the final byte buffer maintained by the
     /// internal writer.
     #[inline]
```
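The reason `flush_buffer`/`flush_buffer_mut` plus `clear` are preferred over `consume` for the per-page cycle is that `Vec::clear` drops contents but retains the heap allocation, so repeated pages reuse one buffer instead of reallocating. A self-contained illustration of that standard-library guarantee:

```rust
fn main() {
    // `Vec::clear` has no effect on allocated capacity, so clearing between
    // pages (as RleEncoder::clear does via BitWriter::clear) reuses the
    // existing allocation rather than allocating a fresh buffer per page.
    let mut buf: Vec<u8> = Vec::with_capacity(1024);
    buf.extend_from_slice(&[1, 2, 3]);
    let cap_before = buf.capacity();

    buf.clear(); // contents gone, allocation retained
    assert_eq!(buf.len(), 0);
    assert_eq!(buf.capacity(), cap_before);
    println!("capacity retained after clear");
}
```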

parquet/src/util/bit_util.rs

Lines changed: 7 additions & 0 deletions
```diff
@@ -219,6 +219,13 @@ impl BitWriter {
         self.buffer()
     }
 
+    /// Like `flush_buffer`, but returns mutable access to the buffer.
+    #[inline]
+    pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
+        self.flush();
+        &mut self.buffer
+    }
+
     /// Clears the internal state so the buffer can be reused.
     #[inline]
     pub fn clear(&mut self) {
```

parquet/src/util/test_common/page_util.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -75,7 +75,7 @@ impl DataPageBuilderImpl {
         if max_level <= 0 {
             return 0;
         }
-        let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
+        let mut level_encoder = LevelEncoder::v1_streaming(Encoding::RLE, max_level);
         level_encoder.put(levels);
         let encoded_levels = level_encoder.consume();
         // Actual encoded bytes (without length offset)
```
