Commit d783743

use simpler API for writing batches
1 parent f622a3d commit d783743
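
This switches the benchmark's writer from the low-level serialized API
(ArrowWriter::into_serialized_writer, per-row-group column writers, and
compute_leaves) to plain ArrowWriter::write calls, letting
set_max_row_group_size in the WriterProperties drive row-group splitting.
Below is a minimal sketch of that higher-level API, assuming the
arrow_array crate for batch construction; the single Int32 column and the
row counts are illustrative, not values taken from the benchmark.

    // Sketch only: illustrates the high-level ArrowWriter API this commit
    // adopts. Schema and sizes are assumptions, not benchmark values.
    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() {
        let col = Arc::new(Int32Array::from_iter_values(0..100)) as ArrayRef;
        let batch = RecordBatch::try_from_iter(vec![("v", col)]).unwrap();

        // The writer flushes a row group whenever the configured size is
        // reached, so callers no longer manage column writers or leaves.
        let props = WriterProperties::builder()
            .set_max_row_group_size(50)
            .build();

        let mut buffer = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap(); // whole batches in, row groups out
        let metadata = writer.close().unwrap();

        // 100 rows under a 50-row cap -> 2 row groups, mirroring the
        // assert_eq! checks added in file_from_spec below.
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.file_metadata().num_rows(), 100);
    }

The new file_from_spec follows the same pattern: write the reused batch
until rows_to_write rows are in, then verify the row-group count and total
rows from the metadata returned by close().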

File tree

1 file changed: +19 −46 lines changed

parquet/benches/parquet_round_trip.rs

@@ -25,7 +25,6 @@ use bytes::Bytes;
 use criterion::{Criterion, criterion_group, criterion_main};
 use parquet::arrow::ArrowWriter;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use parquet::arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, compute_leaves};
 use parquet::basic::Encoding;
 use parquet::file::properties::WriterProperties;
 use rand::{Rng, SeedableRng, distr::StandardUniform, prelude::StdRng};
@@ -44,7 +43,7 @@ pub enum ColumnType {
 // arrow::util::bench_util::create_fsb_array with a seed

 /// Creates a random (but fixed-seeded) array of fixed size with a given null density and length
-pub fn create_fsb_array_with_seed<Offset: OffsetSizeTrait>(
+fn create_fsb_array_with_seed<Offset: OffsetSizeTrait>(
     size: usize,
     null_density: f32,
     fixed_len: i32,
@@ -70,7 +69,7 @@ pub fn create_fsb_array_with_seed<Offset: OffsetSizeTrait>(
         .unwrap()
 }

-pub fn schema(column_type: ColumnType, num_columns: usize) -> Arc<Schema> {
+fn schema(column_type: ColumnType, num_columns: usize) -> Arc<Schema> {
     let field_type = match column_type {
         ColumnType::Binary(_) => DataType::Utf8,
         ColumnType::FixedLen(size) => DataType::FixedSizeBinary(size),
@@ -86,7 +85,7 @@ pub fn schema(column_type: ColumnType, num_columns: usize) -> Arc<Schema> {
     Arc::new(Schema::new(fields))
 }

-pub fn create_batch(
+fn create_batch(
     schema: &Arc<Schema>,
     column_type: ColumnType,
     seed: usize,
@@ -236,40 +235,11 @@ impl ParquetFileSpec {
     }
 }

-pub fn encode_row_group(
-    schema: &Arc<Schema>,
-    spec: &ParquetFileSpec,
-    mut column_writers: Vec<ArrowColumnWriter>,
-) -> Vec<ArrowColumnChunk> {
+fn file_from_spec(spec: ParquetFileSpec, buffer: &mut Vec<u8>) {
     const SEED: usize = 31;
     let num_rows = spec.rows_per_row_group.min(100);
-    // use the same batch repeatedly otherwise the data generation will dominate the time
-    let batch = create_batch(schema, spec.column_type, SEED, spec.num_columns, num_rows);
-    let mut rows_written = 0;
-    while rows_written < spec.rows_per_row_group {
-        let rows_left = spec.rows_per_row_group - rows_written;
-        let batch = if rows_left < batch.num_rows() {
-            batch.slice(0, rows_left)
-        } else {
-            batch.clone()
-        };
-
-        let mut writers = column_writers.iter_mut();
-        for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
-            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
-                writers.next().unwrap().write(&leaf).unwrap()
-            }
-        }
-        rows_written += batch.num_rows();
-    }
+    let rows_to_write = spec.num_row_groups * spec.rows_per_row_group;

-    column_writers
-        .into_iter()
-        .map(|writer| writer.close().unwrap())
-        .collect()
-}
-
-pub fn file_from_spec(spec: ParquetFileSpec, buffer: &mut Vec<u8>) {
     let schema = schema(spec.column_type, spec.num_columns);
     let props = WriterProperties::builder()
         .set_max_row_group_size(spec.rows_per_row_group)
@@ -279,20 +249,23 @@ pub fn file_from_spec(spec: ParquetFileSpec, buffer: &mut Vec<u8>) {
         .set_compression(parquet::basic::Compression::UNCOMPRESSED)
         .build();

-    let writer = ArrowWriter::try_new(buffer, schema.clone(), Some(props)).unwrap();
-    let (mut file_writer, row_group_factory) = writer.into_serialized_writer().unwrap();
+    let mut writer = ArrowWriter::try_new(buffer, schema.clone(), Some(props)).unwrap();

-    for rg in 0..spec.num_row_groups {
-        let col_writers = row_group_factory.create_column_writers(rg).unwrap();
+    // use the same batch repeatedly otherwise the data generation will dominate the time
+    let batch = create_batch(&schema, spec.column_type, SEED, spec.num_columns, num_rows);

-        let encoded_columns = encode_row_group(&schema, &spec, col_writers);
-        let mut rg_writer = file_writer.next_row_group().unwrap();
-        for col_chunk in encoded_columns.into_iter() {
-            col_chunk.append_to_row_group(&mut rg_writer).unwrap();
-        }
-        rg_writer.close().unwrap();
+    let mut rows_written = 0;
+    while rows_written < rows_to_write {
+        writer.write(&batch).unwrap();
+        rows_written += num_rows;
     }
-    file_writer.close().unwrap();
+
+    let parquet_metadata = writer.close().unwrap();
+    assert_eq!(parquet_metadata.num_row_groups(), spec.num_row_groups);
+    assert_eq!(
+        parquet_metadata.file_metadata().num_rows() as usize,
+        rows_to_write
+    );
 }

 fn read_write(c: &mut Criterion, spec: ParquetFileSpec, msg: &str) {
