Skip to content

Commit 757f11c

Browse files
committed
parquet: batch consecutive null/empty rows in write_list
Restructure `write_list()` to accumulate consecutive null and empty rows and flush them in a single `visit_leaves()` call using `extend(repeat_n(...))`, instead of calling `visit_leaves()` per row. With sparse data (99% nulls), a 4096-row batch previously triggered ~4000 individual tree traversals, each pushing a single value per leaf. Now consecutive null/empty runs are collapsed into one traversal that extends all leaf level buffers in bulk. This follows the same pattern already used by `write_struct()`.

The `write_non_null_slice` path is unchanged, since each non-null row has different offsets and cannot be batched.

Benchmark results (list_primitive_sparse_99pct_null, 65536 rows, 99% nulls):

| benchmark                  | baseline  | optimized | change               |
|----------------------------|-----------|-----------|----------------------|
| sparse_list/default        | 173.01 µs | 147.14 µs | -15%                 |
| sparse_list/parquet_2      | 178.83 µs | 147.19 µs | -18%                 |
| sparse_list/bloom_filter   | 241.27 µs | 203.94 µs | -15%                 |
| sparse_list/zstd           | 196.23 µs | 168.50 µs | -14%                 |
| sparse_list/zstd_parquet_2 | 155.58 µs | 155.58 µs | ~0% (baseline noisy) |

No measurable change on list benchmarks with 25% nulls.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
1 parent 9d0e8be commit 757f11c

File tree

2 files changed

+53
-19
lines changed

2 files changed

+53
-19
lines changed

parquet/benches/arrow_writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,10 @@ fn create_batches() -> Vec<(&'static str, RecordBatch)> {
411411
let batch = create_list_primitive_bench_batch_non_null(BATCH_SIZE, 0.25, 0.75).unwrap();
412412
batches.push(("list_primitive_non_null", batch));
413413

414+
// Sparse list: 99% nulls — exercises batched null handling in write_list
415+
let batch = create_list_primitive_bench_batch(BATCH_SIZE, 0.99, 0.75).unwrap();
416+
batches.push(("list_primitive_sparse_99pct_null", batch));
417+
414418
batches
415419
}
416420

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -335,51 +335,81 @@ impl LevelInfoBuilder {
335335
})
336336
};
337337

338-
let write_empty_slice = |child: &mut LevelInfoBuilder| {
339-
child.visit_leaves(|leaf| {
340-
let rep_levels = leaf.rep_levels.as_mut().unwrap();
341-
rep_levels.push(ctx.rep_level - 1);
342-
let def_levels = leaf.def_levels.as_mut().unwrap();
343-
def_levels.push(ctx.def_level - 1);
344-
})
338+
let flush_nulls = |child: &mut LevelInfoBuilder, count: usize| {
339+
if count > 0 {
340+
child.visit_leaves(|leaf| {
341+
leaf.rep_levels
342+
.as_mut()
343+
.unwrap()
344+
.extend(std::iter::repeat_n(ctx.rep_level - 1, count));
345+
leaf.def_levels
346+
.as_mut()
347+
.unwrap()
348+
.extend(std::iter::repeat_n(ctx.def_level - 2, count));
349+
});
350+
}
345351
};
346352

347-
let write_null_slice = |child: &mut LevelInfoBuilder| {
348-
child.visit_leaves(|leaf| {
349-
let rep_levels = leaf.rep_levels.as_mut().unwrap();
350-
rep_levels.push(ctx.rep_level - 1);
351-
let def_levels = leaf.def_levels.as_mut().unwrap();
352-
def_levels.push(ctx.def_level - 2);
353-
})
353+
let flush_empties = |child: &mut LevelInfoBuilder, count: usize| {
354+
if count > 0 {
355+
child.visit_leaves(|leaf| {
356+
leaf.rep_levels
357+
.as_mut()
358+
.unwrap()
359+
.extend(std::iter::repeat_n(ctx.rep_level - 1, count));
360+
leaf.def_levels
361+
.as_mut()
362+
.unwrap()
363+
.extend(std::iter::repeat_n(ctx.def_level - 1, count));
364+
});
365+
}
354366
};
355367

356368
match nulls {
357369
Some(nulls) => {
358370
let null_offset = range.start;
371+
let mut pending_nulls: usize = 0;
372+
let mut pending_empties: usize = 0;
373+
359374
// TODO: Faster bitmask iteration (#1757)
360375
for (idx, w) in offsets.windows(2).enumerate() {
361376
let is_valid = nulls.is_valid(idx + null_offset);
362377
let start_idx = w[0].as_usize();
363378
let end_idx = w[1].as_usize();
379+
364380
if !is_valid {
365-
write_null_slice(child)
381+
flush_empties(child, pending_empties);
382+
pending_empties = 0;
383+
pending_nulls += 1;
366384
} else if start_idx == end_idx {
367-
write_empty_slice(child)
385+
flush_nulls(child, pending_nulls);
386+
pending_nulls = 0;
387+
pending_empties += 1;
368388
} else {
369-
write_non_null_slice(child, start_idx, end_idx)
389+
flush_nulls(child, pending_nulls);
390+
pending_nulls = 0;
391+
flush_empties(child, pending_empties);
392+
pending_empties = 0;
393+
write_non_null_slice(child, start_idx, end_idx);
370394
}
371395
}
396+
flush_nulls(child, pending_nulls);
397+
flush_empties(child, pending_empties);
372398
}
373399
None => {
400+
let mut pending_empties: usize = 0;
374401
for w in offsets.windows(2) {
375402
let start_idx = w[0].as_usize();
376403
let end_idx = w[1].as_usize();
377404
if start_idx == end_idx {
378-
write_empty_slice(child)
405+
pending_empties += 1;
379406
} else {
380-
write_non_null_slice(child, start_idx, end_idx)
407+
flush_empties(child, pending_empties);
408+
pending_empties = 0;
409+
write_non_null_slice(child, start_idx, end_idx);
381410
}
382411
}
412+
flush_empties(child, pending_empties);
383413
}
384414
}
385415
}

0 commit comments

Comments
 (0)