Skip to content

Commit 52376bc

Browse files
Merge branch 'main' into issue-9519
2 parents 97e687e + 61b5763 commit 52376bc

File tree

10 files changed

+694
-245
lines changed

10 files changed

+694
-245
lines changed

arrow-buffer/src/buffer/mutable.rs

Lines changed: 8 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -450,7 +450,13 @@ impl MutableBuffer {
450450

451451
/// Clear all existing data from this buffer.
452452
pub fn clear(&mut self) {
453-
self.len = 0
453+
self.len = 0;
454+
#[cfg(feature = "pool")]
455+
{
456+
if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
457+
reservation.resize(self.len);
458+
}
459+
}
454460
}
455461

456462
/// Returns the data stored in this buffer as a slice.
@@ -1371,7 +1377,7 @@ mod tests {
13711377
assert_eq!(pool.used(), 40);
13721378

13731379
// Truncate to zero
1374-
buffer.truncate(0);
1380+
buffer.clear();
13751381
assert_eq!(buffer.len(), 0);
13761382
assert_eq!(pool.used(), 0);
13771383
}

arrow-json/benches/json_reader.rs

Lines changed: 184 additions & 11 deletions
Original file line number · Diff line number · Diff line change
@@ -28,14 +28,28 @@ use std::fmt::Write;
2828
use std::hint::black_box;
2929
use std::sync::Arc;
3030

31+
// Shared
3132
const ROWS: usize = 1 << 17; // 128K rows
3233
const BATCH_SIZE: usize = 1 << 13; // 8K rows per batch
3334

35+
// Wide object / struct
3436
const WIDE_FIELDS: usize = 64;
35-
const BINARY_BYTES: usize = 64;
3637
const WIDE_PROJECTION_TOTAL_FIELDS: usize = 100; // 100 fields total, select only 3
37-
const LIST_SHORT_ELEMENTS: usize = 5;
38-
const LIST_LONG_ELEMENTS: usize = 100;
38+
39+
// Binary
40+
const BINARY_BYTES: usize = 64;
41+
42+
// List
43+
const SHORT_LIST_ELEMENTS: usize = 5;
44+
const LONG_LIST_ELEMENTS: usize = 100;
45+
46+
// Map
47+
const SMALL_MAP_ENTRIES: usize = 5;
48+
const LARGE_MAP_ENTRIES: usize = 50;
49+
50+
// Run-end encoded
51+
const SHORT_REE_RUN_LENGTH: usize = 2;
52+
const LONG_REE_RUN_LENGTH: usize = 100;
3953

4054
fn decode_and_flush(decoder: &mut Decoder, data: &[u8]) {
4155
let mut offset = 0;
@@ -289,19 +303,174 @@ fn bench_decode_list(c: &mut Criterion) {
289303
let schema = build_list_schema();
290304

291305
// Short lists: tests list handling overhead (few elements per row)
292-
let short_data = build_list_json(ROWS, LIST_SHORT_ELEMENTS);
293-
bench_decode_schema(c, "decode_list_short_i64_json", &short_data, schema.clone());
306+
let short_data = build_list_json(ROWS, SHORT_LIST_ELEMENTS);
307+
bench_decode_schema(c, "decode_short_list_i64_json", &short_data, schema.clone());
294308

295309
// Long lists: tests child element decode throughput (many elements per row)
296-
let long_data = build_list_json(ROWS, LIST_LONG_ELEMENTS);
297-
bench_decode_schema(c, "decode_list_long_i64_json", &long_data, schema);
310+
let long_data = build_list_json(ROWS, LONG_LIST_ELEMENTS);
311+
bench_decode_schema(c, "decode_long_list_i64_json", &long_data, schema);
298312
}
299313

300314
fn bench_serialize_list(c: &mut Criterion) {
301315
let schema = build_list_schema();
302316

303-
let short_values = build_list_values(ROWS, LIST_SHORT_ELEMENTS);
304-
c.bench_function("decode_list_short_i64_serialize", |b| {
317+
let short_values = build_list_values(ROWS, SHORT_LIST_ELEMENTS);
318+
c.bench_function("decode_short_list_i64_serialize", |b| {
319+
b.iter(|| {
320+
let mut decoder = ReaderBuilder::new(schema.clone())
321+
.with_batch_size(BATCH_SIZE)
322+
.build_decoder()
323+
.unwrap();
324+
decoder.serialize(&short_values).unwrap();
325+
while let Some(_batch) = decoder.flush().unwrap() {}
326+
})
327+
});
328+
329+
let long_values = build_list_values(ROWS, LONG_LIST_ELEMENTS);
330+
c.bench_function("decode_long_list_i64_serialize", |b| {
331+
b.iter(|| {
332+
let mut decoder = ReaderBuilder::new(schema.clone())
333+
.with_batch_size(BATCH_SIZE)
334+
.build_decoder()
335+
.unwrap();
336+
decoder.serialize(&long_values).unwrap();
337+
while let Some(_batch) = decoder.flush().unwrap() {}
338+
})
339+
});
340+
}
341+
342+
fn build_map_json(rows: usize, entries: usize) -> Vec<u8> {
343+
let mut out = String::with_capacity(rows * (entries * 20 + 16));
344+
for row in 0..rows {
345+
out.push_str("{\"map\":{");
346+
for i in 0..entries {
347+
if i > 0 {
348+
out.push(',');
349+
}
350+
write!(&mut out, "\"k{}\":{}", i, (row + i) as i64).unwrap();
351+
}
352+
out.push_str("}}\n");
353+
}
354+
out.into_bytes()
355+
}
356+
357+
fn build_map_values(rows: usize, entries: usize) -> Vec<Value> {
358+
let mut out = Vec::with_capacity(rows);
359+
for row in 0..rows {
360+
let mut inner = Map::with_capacity(entries);
361+
for i in 0..entries {
362+
inner.insert(
363+
format!("k{i}"),
364+
Value::Number(Number::from((row + i) as i64)),
365+
);
366+
}
367+
let mut map = Map::with_capacity(1);
368+
map.insert("map".to_string(), Value::Object(inner));
369+
out.push(Value::Object(map));
370+
}
371+
out
372+
}
373+
374+
fn build_map_schema() -> Arc<Schema> {
375+
let entries_field = Arc::new(Field::new(
376+
"entries",
377+
DataType::Struct(
378+
vec![
379+
Field::new("keys", DataType::Utf8, false),
380+
Field::new("values", DataType::Int64, true),
381+
]
382+
.into(),
383+
),
384+
false,
385+
));
386+
Arc::new(Schema::new(vec![Field::new(
387+
"map",
388+
DataType::Map(entries_field, false),
389+
false,
390+
)]))
391+
}
392+
393+
fn bench_decode_map(c: &mut Criterion) {
394+
let schema = build_map_schema();
395+
396+
let small_data = build_map_json(ROWS, SMALL_MAP_ENTRIES);
397+
bench_decode_schema(c, "decode_small_map_json", &small_data, schema.clone());
398+
399+
let large_data = build_map_json(ROWS, LARGE_MAP_ENTRIES);
400+
bench_decode_schema(c, "decode_large_map_json", &large_data, schema);
401+
}
402+
403+
fn bench_serialize_map(c: &mut Criterion) {
404+
let schema = build_map_schema();
405+
406+
let small_values = build_map_values(ROWS, SMALL_MAP_ENTRIES);
407+
c.bench_function("decode_small_map_serialize", |b| {
408+
b.iter(|| {
409+
let mut decoder = ReaderBuilder::new(schema.clone())
410+
.with_batch_size(BATCH_SIZE)
411+
.build_decoder()
412+
.unwrap();
413+
decoder.serialize(&small_values).unwrap();
414+
while let Some(_batch) = decoder.flush().unwrap() {}
415+
})
416+
});
417+
418+
let large_values = build_map_values(ROWS, LARGE_MAP_ENTRIES);
419+
c.bench_function("decode_large_map_serialize", |b| {
420+
b.iter(|| {
421+
let mut decoder = ReaderBuilder::new(schema.clone())
422+
.with_batch_size(BATCH_SIZE)
423+
.build_decoder()
424+
.unwrap();
425+
decoder.serialize(&large_values).unwrap();
426+
while let Some(_batch) = decoder.flush().unwrap() {}
427+
})
428+
});
429+
}
430+
431+
fn build_ree_json(rows: usize, run_length: usize) -> Vec<u8> {
432+
let mut out = String::with_capacity(rows * 24);
433+
for row in 0..rows {
434+
let value = (row / run_length) as i64;
435+
writeln!(&mut out, "{{\"val\":{value}}}").unwrap();
436+
}
437+
out.into_bytes()
438+
}
439+
440+
fn build_ree_values(rows: usize, run_length: usize) -> Vec<Value> {
441+
let mut out = Vec::with_capacity(rows);
442+
for row in 0..rows {
443+
let value = (row / run_length) as i64;
444+
let mut map = Map::with_capacity(1);
445+
map.insert("val".to_string(), Value::Number(Number::from(value)));
446+
out.push(Value::Object(map));
447+
}
448+
out
449+
}
450+
451+
fn build_ree_schema() -> Arc<Schema> {
452+
let ree_type = DataType::RunEndEncoded(
453+
Arc::new(Field::new("run_ends", DataType::Int32, false)),
454+
Arc::new(Field::new("values", DataType::Int64, true)),
455+
);
456+
Arc::new(Schema::new(vec![Field::new("val", ree_type, false)]))
457+
}
458+
459+
fn bench_decode_ree(c: &mut Criterion) {
460+
let schema = build_ree_schema();
461+
462+
let short_data = build_ree_json(ROWS, SHORT_REE_RUN_LENGTH);
463+
bench_decode_schema(c, "decode_short_ree_runs_json", &short_data, schema.clone());
464+
465+
let long_data = build_ree_json(ROWS, LONG_REE_RUN_LENGTH);
466+
bench_decode_schema(c, "decode_long_ree_runs_json", &long_data, schema);
467+
}
468+
469+
fn bench_serialize_ree(c: &mut Criterion) {
470+
let schema = build_ree_schema();
471+
472+
let short_values = build_ree_values(ROWS, SHORT_REE_RUN_LENGTH);
473+
c.bench_function("decode_short_ree_runs_serialize", |b| {
305474
b.iter(|| {
306475
let mut decoder = ReaderBuilder::new(schema.clone())
307476
.with_batch_size(BATCH_SIZE)
@@ -312,8 +481,8 @@ fn bench_serialize_list(c: &mut Criterion) {
312481
})
313482
});
314483

315-
let long_values = build_list_values(ROWS, LIST_LONG_ELEMENTS);
316-
c.bench_function("decode_list_long_i64_serialize", |b| {
484+
let long_values = build_ree_values(ROWS, LONG_REE_RUN_LENGTH);
485+
c.bench_function("decode_long_ree_runs_serialize", |b| {
317486
b.iter(|| {
318487
let mut decoder = ReaderBuilder::new(schema.clone())
319488
.with_batch_size(BATCH_SIZE)
@@ -402,6 +571,10 @@ criterion_group!(
402571
bench_wide_projection,
403572
bench_decode_list,
404573
bench_serialize_list,
574+
bench_decode_map,
575+
bench_serialize_map,
576+
bench_decode_ree,
577+
bench_serialize_ree,
405578
bench_schema_inference
406579
);
407580
criterion_main!(benches);

arrow-json/src/reader/value_iter.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -73,7 +73,7 @@ impl<R: BufRead> Iterator for ValueIter<R> {
7373
}
7474

7575
loop {
76-
self.line_buf.truncate(0);
76+
self.line_buf.clear();
7777
match self.reader.read_line(&mut self.line_buf) {
7878
Ok(0) => {
7979
// read_line returns 0 when stream reached EOF

0 commit comments

Comments (0)