Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 87 additions & 123 deletions arrow-json/benches/json_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,38 @@ fn decode_and_flush(decoder: &mut Decoder, data: &[u8]) {
}
}

/// Benchmarks decoding raw JSON bytes into Arrow record batches with `schema`.
///
/// Registers a Criterion group named `name`, annotated with byte throughput so
/// results report bytes/sec. A fresh decoder is built inside each iteration so
/// builder setup cost is measured consistently across benchmarks.
fn bench_decode_schema(c: &mut Criterion, name: &str, data: &[u8], schema: Arc<Schema>) {
    let mut g = c.benchmark_group(name);
    // Throughput is the size of the raw JSON payload being decoded.
    g.throughput(Throughput::Bytes(data.len() as u64));
    g.sampling_mode(SamplingMode::Flat);
    g.sample_size(50);
    g.warm_up_time(std::time::Duration::from_secs(2));
    g.measurement_time(std::time::Duration::from_secs(5));
    g.bench_function(BenchmarkId::from_parameter(ROWS), |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(Arc::clone(&schema))
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decode_and_flush(&mut decoder, data);
        })
    });
    g.finish();
}

/// Benchmarks serializing pre-parsed `serde_json::Value` rows through
/// `Decoder::serialize`, then draining every produced batch so the full
/// decode path is included in the measurement.
fn bench_serialize_values(c: &mut Criterion, name: &str, values: &[Value], schema: Arc<Schema>) {
    c.bench_function(name, |b| {
        b.iter(|| {
            // Build a fresh decoder per iteration so setup cost is uniform.
            let mut decoder = ReaderBuilder::new(Arc::clone(&schema))
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decoder.serialize(values).unwrap();
            // Flush until exhausted; batches themselves are discarded.
            while decoder.flush().unwrap().is_some() {}
        })
    });
}

fn build_schema(field_count: usize) -> Arc<Schema> {
// Builds a schema with fields named f0..f{field_count-1}, all Int64 and non-nullable.
let fields: Vec<Field> = (0..field_count)
Expand Down Expand Up @@ -118,33 +150,13 @@ fn build_wide_values(rows: usize, fields: usize) -> Vec<Value> {
/// Benchmarks decoding wide JSON objects (WIDE_FIELDS Int64 columns per row).
fn bench_decode_wide_object(c: &mut Criterion) {
    let data = build_wide_json(ROWS, WIDE_FIELDS);
    let schema = build_schema(WIDE_FIELDS);

    // NOTE(review): this inline benchmark and the `bench_decode_schema` call
    // below look like the old and new versions of the same benchmark placed
    // side by side (diff residue) — confirm only one registration should remain.
    c.bench_function("decode_wide_object_i64_json", |b| {
        b.iter(|| {
            // Fresh decoder per iteration so builder setup is included.
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decode_and_flush(&mut decoder, &data);
        })
    });
    bench_decode_schema(c, "decode_wide_object_json", &data, schema);
}

/// Benchmarks serializing wide `serde_json::Value` rows via `Decoder::serialize`.
fn bench_serialize_wide_object(c: &mut Criterion) {
    let values = build_wide_values(ROWS, WIDE_FIELDS);
    let schema = build_schema(WIDE_FIELDS);

    // NOTE(review): the inline benchmark below duplicates the
    // `bench_serialize_values` helper call at the end under an older name —
    // appears to be diff residue; confirm whether the inline copy should go.
    c.bench_function("decode_wide_object_i64_serialize", |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();

            decoder.serialize(&values).unwrap();
            // Drain all batches so full decode cost is measured.
            while let Some(_batch) = decoder.flush().unwrap() {}
        })
    });
    bench_serialize_values(c, "decode_wide_object_serialize", &values, schema);
}

fn bench_decode_binary(c: &mut Criterion, name: &str, data: &[u8], field: Arc<Field>) {
Expand Down Expand Up @@ -197,25 +209,6 @@ fn bench_binary_hex(c: &mut Criterion) {
bench_decode_binary(c, "decode_binary_view_hex_json", &binary_data, view_field);
}

/// Benchmarks decoding raw JSON bytes into record batches with `schema`,
/// reporting byte throughput under a Criterion group named `name`.
///
/// NOTE(review): an identical definition of `bench_decode_schema` appears
/// earlier in this file; two `fn` items with the same name will not compile,
/// so one copy (this one looks like the pre-move original) should be removed.
fn bench_decode_schema(c: &mut Criterion, name: &str, data: &[u8], schema: Arc<Schema>) {
    let mut group = c.benchmark_group(name);
    // Throughput = size of the raw JSON input.
    group.throughput(Throughput::Bytes(data.len() as u64));
    group.sample_size(50);
    group.measurement_time(std::time::Duration::from_secs(5));
    group.warm_up_time(std::time::Duration::from_secs(2));
    group.sampling_mode(SamplingMode::Flat);
    group.bench_function(BenchmarkId::from_parameter(ROWS), |b| {
        b.iter(|| {
            // Fresh decoder each iteration so setup cost is uniform.
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decode_and_flush(&mut decoder, data);
        })
    });
    group.finish();
}
Comment on lines -200 to -217
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to the top


fn build_wide_projection_json(rows: usize, total_fields: usize) -> Vec<u8> {
// Estimate: each field ~15 bytes ("fXX":VVVVVVV,), total ~15*100 + overhead
let per_row_size = total_fields * 15 + 10;
Expand Down Expand Up @@ -291,52 +284,49 @@ fn build_list_values(rows: usize, elements: usize) -> Vec<Value> {
out
}

/// Builds a one-column schema: a non-nullable `List<Int64>` field named "list".
fn build_list_schema() -> Arc<Schema> {
    let item = Arc::new(Field::new_list_field(DataType::Int64, false));
    let list_field = Field::new("list", DataType::List(item), false);
    Arc::new(Schema::new(vec![list_field]))
}

/// Benchmarks decoding JSON list columns at two lengths (short vs long lists).
fn bench_decode_list(c: &mut Criterion) {
    // NOTE(review): the `build_list_schema()` binding and the two
    // `decode_*_list_i64_json` calls below appear to be the pre-refactor
    // version juxtaposed with the new List/ListView loop (diff residue) —
    // confirm which half should survive.
    let schema = build_list_schema();

    // Short lists: tests list handling overhead (few elements per row)
    let short_data = build_list_json(ROWS, SHORT_LIST_ELEMENTS);
    bench_decode_schema(c, "decode_short_list_i64_json", &short_data, schema.clone());

    // Long lists: tests child element decode throughput (many elements per row)
    let long_data = build_list_json(ROWS, LONG_LIST_ELEMENTS);
    bench_decode_schema(c, "decode_long_list_i64_json", &long_data, schema);
    let child = Arc::new(Field::new_list_field(DataType::Int64, false));

    // Run the same inputs against both List and ListView layouts.
    for (dt, label) in [
        (DataType::List(child.clone()), "list"),
        (DataType::ListView(child), "list_view"),
    ] {
        let schema = Arc::new(Schema::new(vec![Field::new("list", dt, false)]));
        bench_decode_schema(
            c,
            &format!("decode_{label}_short_json"),
            &short_data,
            schema.clone(),
        );
        bench_decode_schema(c, &format!("decode_{label}_long_json"), &long_data, schema);
    }
}

/// Benchmarks serializing `Value` rows into list columns (short vs long lists).
fn bench_serialize_list(c: &mut Criterion) {
    // NOTE(review): the inline `c.bench_function` blocks below duplicate the
    // `bench_serialize_values` loop at the end under older names — looks like
    // old/new diff hunks juxtaposed; confirm which registration should remain.
    let schema = build_list_schema();

    let short_values = build_list_values(ROWS, SHORT_LIST_ELEMENTS);
    c.bench_function("decode_short_list_i64_serialize", |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decoder.serialize(&short_values).unwrap();
            while let Some(_batch) = decoder.flush().unwrap() {}
        })
    });

    let long_values = build_list_values(ROWS, LONG_LIST_ELEMENTS);
    c.bench_function("decode_long_list_i64_serialize", |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decoder.serialize(&long_values).unwrap();
            while let Some(_batch) = decoder.flush().unwrap() {}
        })
    });
    let child = Arc::new(Field::new_list_field(DataType::Int64, false));

    // Serialize the same values against both List and ListView layouts.
    for (dt, label) in [
        (DataType::List(child.clone()), "list"),
        (DataType::ListView(child), "list_view"),
    ] {
        let schema = Arc::new(Schema::new(vec![Field::new("list", dt, false)]));
        bench_serialize_values(
            c,
            &format!("decode_{label}_short_serialize"),
            &short_values,
            schema.clone(),
        );
        bench_serialize_values(
            c,
            &format!("decode_{label}_long_serialize"),
            &long_values,
            schema,
        );
    }
}

fn build_map_json(rows: usize, entries: usize) -> Vec<u8> {
Expand Down Expand Up @@ -394,38 +384,25 @@ fn bench_decode_map(c: &mut Criterion) {
let schema = build_map_schema();

let small_data = build_map_json(ROWS, SMALL_MAP_ENTRIES);
bench_decode_schema(c, "decode_small_map_json", &small_data, schema.clone());
bench_decode_schema(c, "decode_map_small_json", &small_data, schema.clone());

let large_data = build_map_json(ROWS, LARGE_MAP_ENTRIES);
bench_decode_schema(c, "decode_large_map_json", &large_data, schema);
bench_decode_schema(c, "decode_map_large_json", &large_data, schema);
}

/// Benchmarks serializing `Value` rows into a map column (small vs large maps).
fn bench_serialize_map(c: &mut Criterion) {
    // NOTE(review): the inline `c.bench_function` blocks below duplicate the
    // `bench_serialize_values` calls under older benchmark names — looks like
    // old/new diff hunks juxtaposed; confirm only the helper calls should stay.
    let schema = build_map_schema();

    let small_values = build_map_values(ROWS, SMALL_MAP_ENTRIES);
    c.bench_function("decode_small_map_serialize", |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decoder.serialize(&small_values).unwrap();
            while let Some(_batch) = decoder.flush().unwrap() {}
        })
    });
    bench_serialize_values(
        c,
        "decode_map_small_serialize",
        &small_values,
        schema.clone(),
    );

    let large_values = build_map_values(ROWS, LARGE_MAP_ENTRIES);
    c.bench_function("decode_large_map_serialize", |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decoder.serialize(&large_values).unwrap();
            while let Some(_batch) = decoder.flush().unwrap() {}
        })
    });
    bench_serialize_values(c, "decode_map_large_serialize", &large_values, schema);
}

fn build_ree_json(rows: usize, run_length: usize) -> Vec<u8> {
Expand Down Expand Up @@ -460,38 +437,25 @@ fn bench_decode_ree(c: &mut Criterion) {
let schema = build_ree_schema();

let short_data = build_ree_json(ROWS, SHORT_REE_RUN_LENGTH);
bench_decode_schema(c, "decode_short_ree_runs_json", &short_data, schema.clone());
bench_decode_schema(c, "decode_ree_short_json", &short_data, schema.clone());

let long_data = build_ree_json(ROWS, LONG_REE_RUN_LENGTH);
bench_decode_schema(c, "decode_long_ree_runs_json", &long_data, schema);
bench_decode_schema(c, "decode_ree_long_json", &long_data, schema);
}

/// Benchmarks serializing `Value` rows into a run-end-encoded column
/// (short vs long run lengths).
fn bench_serialize_ree(c: &mut Criterion) {
    // NOTE(review): the inline `c.bench_function` blocks below duplicate the
    // `bench_serialize_values` calls under older benchmark names — looks like
    // old/new diff hunks juxtaposed; confirm only the helper calls should stay.
    let schema = build_ree_schema();

    let short_values = build_ree_values(ROWS, SHORT_REE_RUN_LENGTH);
    c.bench_function("decode_short_ree_runs_serialize", |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decoder.serialize(&short_values).unwrap();
            while let Some(_batch) = decoder.flush().unwrap() {}
        })
    });
    bench_serialize_values(
        c,
        "decode_ree_short_serialize",
        &short_values,
        schema.clone(),
    );

    let long_values = build_ree_values(ROWS, LONG_REE_RUN_LENGTH);
    c.bench_function("decode_long_ree_runs_serialize", |b| {
        b.iter(|| {
            let mut decoder = ReaderBuilder::new(schema.clone())
                .with_batch_size(BATCH_SIZE)
                .build_decoder()
                .unwrap();
            decoder.serialize(&long_values).unwrap();
            while let Some(_batch) = decoder.flush().unwrap() {}
        })
    });
    bench_serialize_values(c, "decode_ree_long_serialize", &long_values, schema);
}

fn bench_schema_inference(c: &mut Criterion) {
Expand Down
Loading