Skip to content

Commit b2f9e42

Browse files
klion26 and Dandandan
authored and committed
[Variant] Optimize the object header generation logic in ObjectBuilder::finish (apache#8031)
# Which issue does this PR close? This pr wants to optimize the logic of `ObjectBuilder::finish` - Closes apache#7978 . # Rationale for this change This pr wants to optimize the logic of `ObjectBuilder::finish` # What changes are included in this PR? This PR wants to optimize `ObjectBuilder::finish` with packedu3 iterator # Are these changes tested? This pr was covered by existing test # Are there any user-facing changes? No
1 parent 4eb65a0 commit b2f9e42

8 files changed

Lines changed: 236 additions & 111 deletions

File tree

parquet-variant-compute/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -37,6 +37,7 @@ parquet-variant = { workspace = true }
3737
parquet-variant-json = { workspace = true }
3838
chrono = { workspace = true }
3939
uuid = { version = "1.18.0", features = ["v4"]}
40+
serde_json = "1.0"
4041

4142
[lib]
4243
name = "parquet_variant_compute"

parquet-variant-compute/benches/variant_kernels.rs

Lines changed: 147 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -23,12 +23,15 @@ use parquet_variant::{EMPTY_VARIANT_METADATA_BYTES, Variant, VariantBuilder};
2323
use parquet_variant_compute::{
2424
GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, variant_get,
2525
};
26+
use parquet_variant_json::append_json;
2627
use rand::Rng;
2728
use rand::SeedableRng;
2829
use rand::distr::Alphanumeric;
2930
use rand::rngs::StdRng;
31+
use serde_json::Value;
3032
use std::fmt::Write;
3133
use std::sync::Arc;
34+
3235
fn benchmark_batch_json_string_to_variant(c: &mut Criterion) {
3336
let input_array = StringArray::from_iter_values(json_repeated_struct(8000));
3437
let array_ref: ArrayRef = Arc::new(input_array);
@@ -66,6 +69,58 @@ fn benchmark_batch_json_string_to_variant(c: &mut Criterion) {
6669
});
6770
});
6871

72+
let input_array = StringArray::from_iter_values(random_structure(8000, 200));
73+
let total_input_bytes = input_array
74+
.iter()
75+
.flatten() // filter None
76+
.map(|v| v.len())
77+
.sum::<usize>();
78+
let id = format!(
79+
"batch_json_string_to_variant object - 1 depth(200 fields) random_json({} bytes per document)",
80+
total_input_bytes / input_array.len()
81+
);
82+
let array_ref: ArrayRef = Arc::new(input_array);
83+
let string_array = array_ref.as_any().downcast_ref::<StringArray>().unwrap();
84+
let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
85+
for i in 0..string_array.len() {
86+
json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
87+
}
88+
c.bench_function(&id, |b| {
89+
b.iter(|| {
90+
let mut variant_array_builder = VariantArrayBuilder::new(string_array.len());
91+
for json in &json_array {
92+
append_json(json, &mut variant_array_builder).unwrap();
93+
}
94+
let _ = variant_array_builder.build();
95+
});
96+
});
97+
98+
let input_array = StringArray::from_iter_values(random_structure(8000, 100));
99+
let total_input_bytes = input_array
100+
.iter()
101+
.flatten() // filter None
102+
.map(|v| v.len())
103+
.sum::<usize>();
104+
let id = format!(
105+
"batch_json_string_to_variant object - 1 depth(100 fields) random_json({} bytes per document)",
106+
total_input_bytes / input_array.len()
107+
);
108+
let array_ref: ArrayRef = Arc::new(input_array);
109+
let string_array = array_ref.as_any().downcast_ref::<StringArray>().unwrap();
110+
let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
111+
for i in 0..string_array.len() {
112+
json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
113+
}
114+
c.bench_function(&id, |b| {
115+
b.iter(|| {
116+
let mut variant_array_builder = VariantArrayBuilder::new(string_array.len());
117+
for json in &json_array {
118+
append_json(json, &mut variant_array_builder).unwrap();
119+
}
120+
let _ = variant_array_builder.build();
121+
});
122+
});
123+
69124
let input_array = StringArray::from_iter_values(random_json_structure(8000));
70125
let total_input_bytes = input_array
71126
.iter()
@@ -240,6 +295,22 @@ fn random_json_structure(count: usize) -> impl Iterator<Item = String> {
240295
(0..count).map(move |_| generator.next().to_string())
241296
}
242297

298+
fn random_structure(count: usize, max_fields: usize) -> impl Iterator<Item = String> {
299+
let mut generator = RandomJsonGenerator {
300+
null_weight: 5,
301+
string_weight: 25,
302+
number_weight: 25,
303+
boolean_weight: 10,
304+
object_weight: 25,
305+
array_weight: 0,
306+
max_fields,
307+
max_array_length: 0,
308+
max_depth: 1,
309+
..Default::default()
310+
};
311+
(0..count).map(move |_| generator.next_object().to_string())
312+
}
313+
243314
/// Creates JSON with random structure and fields.
244315
///
245316
/// Each type is created in proportion controlled by the
@@ -299,6 +370,82 @@ impl RandomJsonGenerator {
299370
&self.output_buffer
300371
}
301372

373+
fn next_object(&mut self) -> &str {
374+
self.output_buffer.clear();
375+
self.append_random_json_for_object();
376+
&self.output_buffer
377+
}
378+
379+
fn append_random_json_for_object(&mut self) {
380+
// use destructuring to ensure each field is used
381+
let Self {
382+
rng,
383+
null_weight,
384+
string_weight,
385+
number_weight,
386+
boolean_weight,
387+
max_fields,
388+
output_buffer,
389+
..
390+
} = self;
391+
392+
write!(output_buffer, "{{").unwrap();
393+
for i in 0..*max_fields {
394+
let key_length = rng.random_range(1..=20);
395+
let key: String = (0..key_length)
396+
.map(|_| rng.sample(Alphanumeric) as char)
397+
.collect();
398+
write!(output_buffer, "\"{key}\":").unwrap();
399+
400+
let total_weight = *null_weight + *string_weight + *number_weight + *boolean_weight;
401+
402+
// Generate a random number to determine the type
403+
let mut random_value: usize = rng.random_range(0..total_weight);
404+
405+
if random_value <= *null_weight {
406+
write!(output_buffer, "null").unwrap();
407+
} else {
408+
random_value -= *null_weight;
409+
410+
if random_value <= *string_weight {
411+
// Generate a random string between 1 and 20 characters
412+
let length = rng.random_range(1..=20);
413+
let random_string: String = (0..length)
414+
.map(|_| rng.sample(Alphanumeric) as char)
415+
.collect();
416+
write!(output_buffer, "\"{random_string}\"",).unwrap();
417+
} else {
418+
random_value -= *string_weight;
419+
420+
if random_value <= *number_weight {
421+
// 50% chance of generating an integer or a float
422+
if rng.random_bool(0.5) {
423+
// Generate a random integer
424+
let random_integer: i64 = rng.random_range(-1000..1000);
425+
write!(output_buffer, "{random_integer}",).unwrap();
426+
} else {
427+
// Generate a random float
428+
let random_float: f64 = rng.random_range(-1000.0..1000.0);
429+
write!(output_buffer, "{random_float}",).unwrap();
430+
}
431+
} else {
432+
random_value -= *number_weight;
433+
434+
if random_value <= *boolean_weight {
435+
// Generate a random boolean
436+
let random_boolean: bool = rng.random();
437+
write!(output_buffer, "{random_boolean}",).unwrap();
438+
}
439+
}
440+
}
441+
}
442+
if i < *max_fields - 1 {
443+
write!(output_buffer, ",").unwrap();
444+
}
445+
}
446+
write!(&mut self.output_buffer, "}}").unwrap();
447+
}
448+
302449
/// Appends a random JSON value to the output buffer.
303450
fn append_random_json(&mut self, current_depth: usize) {
304451
// use destructuring to ensure each field is used

parquet-variant-json/src/from_json.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -102,7 +102,7 @@ fn variant_from_number<'m, 'v>(n: &Number) -> Result<Variant<'m, 'v>, ArrowError
102102
}
103103
}
104104

105-
fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> {
105+
pub fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> {
106106
match json {
107107
Value::Null => builder.append_value(Variant::Null),
108108
Value::Bool(b) => builder.append_value(*b),

parquet-variant-json/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -34,5 +34,5 @@
3434
mod from_json;
3535
mod to_json;
3636

37-
pub use from_json::JsonToVariant;
37+
pub use from_json::{JsonToVariant, append_json};
3838
pub use to_json::VariantToJson;

parquet-variant/src/builder.rs

Lines changed: 6 additions & 69 deletions
Original file line number · Diff line number · Diff line change
@@ -14,7 +14,7 @@
1414
// KIND, either express or implied. See the License for the
1515
// specific language governing permissions and limitations
1616
// under the License.
17-
use crate::decoder::{VariantBasicType, VariantPrimitiveType};
17+
use crate::decoder::{OffsetSizeBytes, VariantBasicType, VariantPrimitiveType};
1818
use crate::{
1919
ShortString, Variant, VariantDecimal4, VariantDecimal8, VariantDecimal16, VariantList,
2020
VariantMetadata, VariantObject,
@@ -43,21 +43,15 @@ fn short_string_header(len: usize) -> u8 {
4343
(len as u8) << 2 | VariantBasicType::ShortString as u8
4444
}
4545

46-
pub(crate) fn int_size(v: usize) -> u8 {
46+
pub(crate) fn int_size(v: usize) -> OffsetSizeBytes {
4747
match v {
48-
0..=0xFF => 1,
49-
0x100..=0xFFFF => 2,
50-
0x10000..=0xFFFFFF => 3,
51-
_ => 4,
48+
0..=0xFF => OffsetSizeBytes::One,
49+
0x100..=0xFFFF => OffsetSizeBytes::Two,
50+
0x10000..=0xFFFFFF => OffsetSizeBytes::Three,
51+
_ => OffsetSizeBytes::Four,
5252
}
5353
}
5454

55-
/// Write little-endian integer to buffer at a specific position
56-
fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u8) {
57-
let bytes = value.to_le_bytes();
58-
buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]);
59-
}
60-
6155
/// Wrapper around a `Vec<u8>` that provides methods for appending
6256
/// primitive values, variant types, and metadata.
6357
///
@@ -358,63 +352,6 @@ impl ValueBuilder {
358352
);
359353
state.finish();
360354
}
361-
362-
/// Writes out the header byte for a variant object or list, from the starting position
363-
/// of the builder, will return the position after this write
364-
pub(crate) fn append_header_start_from_buf_pos(
365-
&mut self,
366-
start_pos: usize, // the start position where the header will be inserted
367-
header_byte: u8,
368-
is_large: bool,
369-
num_fields: usize,
370-
) -> usize {
371-
let buffer = self.inner_mut();
372-
373-
// Write header at the original start position
374-
let mut header_pos = start_pos;
375-
376-
// Write header byte
377-
buffer[header_pos] = header_byte;
378-
header_pos += 1;
379-
380-
// Write number of fields
381-
if is_large {
382-
buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
383-
header_pos += 4;
384-
} else {
385-
buffer[header_pos] = num_fields as u8;
386-
header_pos += 1;
387-
}
388-
389-
header_pos
390-
}
391-
392-
/// Writes out the offsets for an array of offsets, including the final offset (data size).
393-
/// from the starting position of the buffer, will return the position after this write
394-
pub(crate) fn append_offset_array_start_from_buf_pos(
395-
&mut self,
396-
start_pos: usize,
397-
offsets: impl IntoIterator<Item = usize>,
398-
data_size: Option<usize>,
399-
nbytes: u8,
400-
) -> usize {
401-
let buf = self.inner_mut();
402-
403-
let mut current_pos = start_pos;
404-
for relative_offset in offsets {
405-
write_offset_at_pos(buf, current_pos, relative_offset, nbytes);
406-
current_pos += nbytes as usize;
407-
}
408-
409-
// Write data_size
410-
if let Some(data_size) = data_size {
411-
// Write data_size at the end of the offsets
412-
write_offset_at_pos(buf, current_pos, data_size, nbytes);
413-
current_pos += nbytes as usize;
414-
}
415-
416-
current_pos
417-
}
418355
}
419356

420357
/// A trait for managing state specific to different builder types.

parquet-variant/src/builder/list.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -174,7 +174,7 @@ impl<'a, S: BuilderSpecificState> ListBuilder<'a, S> {
174174
// Make sure to reserve enough capacity to handle the extra bytes we'll truncate.
175175
let mut bytes_to_splice = Vec::with_capacity(header_size + 3);
176176
// Write header
177-
let header = array_header(is_large, offset_size);
177+
let header = array_header(is_large, offset_size as _);
178178
bytes_to_splice.push(header);
179179

180180
append_packed_u32(&mut bytes_to_splice, num_elements as u32, num_elements_size);

parquet-variant/src/builder/metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -206,7 +206,7 @@ impl WritableMetadataBuilder {
206206

207207
// Determine appropriate offset size based on the larger of dict size or total string size
208208
let max_offset = std::cmp::max(total_dict_size, nkeys);
209-
let offset_size = int_size(max_offset);
209+
let offset_size = int_size(max_offset) as u8;
210210

211211
let offset_start = 1 + offset_size as usize;
212212
let string_start = offset_start + (nkeys + 1) * offset_size as usize;

0 commit comments

Comments (0)