Skip to content

Commit 4a50b88

Browse files
committed
[Variant] Optimizing the ObjectBuilder::finish using iterator
1 parent 1d9afbc commit 4a50b88

File tree

1 file changed

+66
-100
lines changed

1 file changed

+66
-100
lines changed

parquet-variant/src/builder.rs

Lines changed: 66 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,55 @@ fn write_offset(buf: &mut Vec<u8>, value: usize, nbytes: u8) {
6464
buf.extend_from_slice(&bytes[..nbytes as usize]);
6565
}
6666

67-
/// Write little-endian integer to buffer at a specific position
68-
fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u8) {
69-
let bytes = value.to_le_bytes();
70-
buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]);
71-
}
72-
7367
/// Append `value_size` bytes of given `value` into `dest`.
7468
fn append_packed_u32(dest: &mut Vec<u8>, value: u32, value_size: usize) {
7569
let n = dest.len() + value_size;
7670
dest.extend(value.to_le_bytes());
7771
dest.truncate(n);
7872
}
7973

74+
/// An iterator that yields the bytes of a packed u32 iterator.
75+
/// Will yield the first `packed_bytes` bytes of each item in the iterator.
76+
struct PackedU32Iterator<T: Iterator<Item = [u8; 4]>> {
77+
packed_bytes: usize,
78+
iterator: T,
79+
current_item: [u8; 4],
80+
current_byte: usize, // 0..3
81+
}
82+
83+
impl<T: Iterator<Item = [u8; 4]>> PackedU32Iterator<T> {
84+
fn new(packed_bytes: usize, iterator: T) -> Self {
85+
// eliminate corner cases in `next` by initializing with a fake already-consumed "first" item
86+
Self {
87+
packed_bytes,
88+
iterator,
89+
current_item: [0; 4],
90+
current_byte: packed_bytes,
91+
}
92+
}
93+
}
94+
95+
impl<T: Iterator<Item = [u8; 4]>> Iterator for PackedU32Iterator<T> {
96+
type Item = u8;
97+
98+
fn next(&mut self) -> Option<u8> {
99+
if self.current_byte >= self.packed_bytes {
100+
self.current_item = self.iterator.next()?;
101+
self.current_byte = 0;
102+
}
103+
104+
let rval = self.current_item[self.current_byte];
105+
self.current_byte += 1;
106+
Some(rval)
107+
}
108+
109+
fn size_hint(&self) -> (usize, Option<usize>) {
110+
let lower = (self.packed_bytes - self.current_byte)
111+
+ self.packed_bytes * self.iterator.size_hint().0;
112+
(lower, None)
113+
}
114+
}
115+
80116
/// Wrapper around a `Vec<u8>` that provides methods for appending
81117
/// primitive values, variant types, and metadata.
82118
///
@@ -368,63 +404,6 @@ impl ValueBuffer {
368404

369405
Ok(())
370406
}
371-
372-
/// Writes out the header byte for a variant object or list, from the starting position
373-
/// of the buffer, will return the position after this write
374-
fn append_header_start_from_buf_pos(
375-
&mut self,
376-
start_pos: usize, // the start position where the header will be inserted
377-
header_byte: u8,
378-
is_large: bool,
379-
num_fields: usize,
380-
) -> usize {
381-
let buffer = self.inner_mut();
382-
383-
// Write header at the original start position
384-
let mut header_pos = start_pos;
385-
386-
// Write header byte
387-
buffer[header_pos] = header_byte;
388-
header_pos += 1;
389-
390-
// Write number of fields
391-
if is_large {
392-
buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
393-
header_pos += 4;
394-
} else {
395-
buffer[header_pos] = num_fields as u8;
396-
header_pos += 1;
397-
}
398-
399-
header_pos
400-
}
401-
402-
/// Writes out the offsets for an array of offsets, including the final offset (data size).
403-
/// from the starting position of the buffer, will return the position after this write
404-
fn append_offset_array_start_from_buf_pos(
405-
&mut self,
406-
start_pos: usize,
407-
offsets: impl IntoIterator<Item = usize>,
408-
data_size: Option<usize>,
409-
nbytes: u8,
410-
) -> usize {
411-
let buf = self.inner_mut();
412-
413-
let mut current_pos = start_pos;
414-
for relative_offset in offsets {
415-
write_offset_at_pos(buf, current_pos, relative_offset, nbytes);
416-
current_pos += nbytes as usize;
417-
}
418-
419-
// Write data_size
420-
if let Some(data_size) = data_size {
421-
// Write data_size at the end of the offsets
422-
write_offset_at_pos(buf, current_pos, data_size, nbytes);
423-
current_pos += nbytes as usize;
424-
}
425-
426-
current_pos
427-
}
428407
}
429408

430409
/// Builder for constructing metadata for [`Variant`] values.
@@ -1462,49 +1441,36 @@ impl<'a> ObjectBuilder<'a> {
14621441
let num_fields = self.fields.len();
14631442
let is_large = num_fields > u8::MAX as usize;
14641443

1465-
let header_size = 1 + // header byte
1466-
(if is_large { 4 } else { 1 }) + // num_fields
1467-
(num_fields * id_size as usize) + // field IDs
1468-
((num_fields + 1) * offset_size as usize); // field offsets + data_size
1444+
let num_fileds_size = if is_large { 4 } else { 1 }; // is_large: 4 bytes, else 1 byte.
14691445

1470-
let starting_offset = self.parent_value_offset_base;
1446+
let num_fields_bytes = num_fields.to_le_bytes();
1447+
let num_elements_bytes = num_fields_bytes.iter().take(num_fileds_size).copied();
14711448

1472-
// Shift existing data to make room for the header
1473-
let buffer = parent_buffer.inner_mut();
1474-
buffer.splice(
1475-
starting_offset..starting_offset,
1476-
std::iter::repeat_n(0u8, header_size),
1449+
let fields = PackedU32Iterator::new(
1450+
id_size as usize,
1451+
self.fields.keys().map(|offset| offset.to_le_bytes()),
1452+
);
1453+
let offsets = PackedU32Iterator::new(
1454+
offset_size as usize,
1455+
self.fields
1456+
.values()
1457+
.map(|offset| (*offset as u32).to_le_bytes()),
14771458
);
14781459

1479-
// Write header at the original start position
1480-
let mut header_pos = starting_offset;
1481-
1482-
// Write header byte
1460+
let data_size_bytes = (data_size as u32).to_le_bytes();
1461+
let data_size_bytes_iter = data_size_bytes.iter().take(offset_size as usize).copied();
14831462
let header = object_header(is_large, id_size, offset_size);
1463+
let bytess_to_splice = std::iter::once(header)
1464+
.chain(num_elements_bytes)
1465+
.chain(fields)
1466+
.chain(offsets)
1467+
.chain(data_size_bytes_iter);
1468+
1469+
let starting_offset = self.parent_value_offset_base;
1470+
// Shift existing data to make room for the header
1471+
let buffer = parent_buffer.inner_mut();
1472+
buffer.splice(starting_offset..starting_offset, bytess_to_splice);
14841473

1485-
header_pos = self
1486-
.parent_state
1487-
.buffer()
1488-
.append_header_start_from_buf_pos(header_pos, header, is_large, num_fields);
1489-
1490-
header_pos = self
1491-
.parent_state
1492-
.buffer()
1493-
.append_offset_array_start_from_buf_pos(
1494-
header_pos,
1495-
self.fields.keys().copied().map(|id| id as usize),
1496-
None,
1497-
id_size,
1498-
);
1499-
1500-
self.parent_state
1501-
.buffer()
1502-
.append_offset_array_start_from_buf_pos(
1503-
header_pos,
1504-
self.fields.values().copied(),
1505-
Some(data_size),
1506-
offset_size,
1507-
);
15081474
self.parent_state.finish(starting_offset);
15091475

15101476
// Mark that this object has been finished

0 commit comments

Comments
 (0)