From 0689e240ca0d62d1b76bf771c9af61f959f6eda9 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 5 Jan 2026 19:17:29 +0200 Subject: [PATCH 01/11] feat: add projection support to TapeDecoder for skipping unknown fields --- arrow-json/src/reader/mod.rs | 23 +++- arrow-json/src/reader/tape.rs | 223 +++++++++++++++++++++++++++++++--- 2 files changed, 227 insertions(+), 19 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index f5fd1a8e7c38..ef1da72bb916 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -137,6 +137,7 @@ use crate::StructMode; use crate::reader::binary_array::{ BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder, }; +use std::collections::HashSet; use std::io::BufRead; use std::sync::Arc; @@ -304,7 +305,7 @@ impl ReaderBuilder { }; let decoder = make_decoder( - data_type, + data_type.clone(), self.coerce_primitive, self.strict_mode, nullable, @@ -313,10 +314,28 @@ impl ReaderBuilder { let num_fields = self.schema.flattened_fields().len(); + // Extract projection: enable in non-strict mode to skip unknown fields + // In strict_mode, unknown fields cause errors, so projection skipping is not useful + // In non-strict mode, projection allows skipping fields not in the schema + // Performance overhead has been minimized via depth caching and short-circuit optimization + let projection = if self.strict_mode { + None + } else { + // Non-strict mode: always enable projection to skip unknown fields + match &data_type { + DataType::Struct(fields) if !fields.is_empty() => { + let field_names: HashSet = + fields.iter().map(|f| f.name().clone()).collect(); + Some(field_names) + } + _ => None, + } + }; + Ok(Decoder { decoder, is_field: self.is_field, - tape_decoder: TapeDecoder::new(self.batch_size, num_fields), + tape_decoder: TapeDecoder::new(self.batch_size, num_fields, projection), batch_size: self.batch_size, schema: self.schema, }) diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 89ee3f778765..1901a0b6674c 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -19,6 +19,7 @@ use crate::reader::serializer::TapeSerializer; use arrow_schema::ArrowError; use memchr::memchr2; use serde_core::Serialize; +use std::collections::HashSet; use std::fmt::Write; /// We decode JSON to a flattened tape representation, @@ -237,8 +238,21 @@ enum DecoderState { /// /// Consists of `(literal, decoded length)` Literal(Literal, u8), + /// Skipping a value (for unprojected fields) + /// + /// Consists of: + /// - `depth`: Nesting level of objects/arrays being skipped (u32) + /// - `flags`: Bit-packed flags (in_string: bit 0, escape: bit 1) + SkipValue { + depth: u32, + flags: u8, + }, } +// Bit flags for SkipValue state +const SKIP_IN_STRING: u8 = 1 << 0; // 0x01 +const SKIP_ESCAPE: u8 = 1 << 1; // 0x02 + impl DecoderState { fn as_str(&self) -> &'static str { match self { @@ -251,6 +265,7 @@ impl DecoderState { DecoderState::Escape => "escape", DecoderState::Unicode(_, _, _) => "unicode literal", DecoderState::Literal(d, _) => d.as_str(), + DecoderState::SkipValue { .. } => "skip value", } } } @@ -315,12 +330,23 @@ pub struct TapeDecoder { /// A stack of [`DecoderState`] stack: Vec, + + /// Optional projection: set of field names to include + /// If None, all fields are parsed. If Some, only fields in the set are parsed. + projection: Option>, + + /// Cache current nesting depth to avoid O(depth) stack traversal on every field + /// Incremented when entering Object/List, decremented when exiting + current_nesting_depth: usize, } impl TapeDecoder { /// Create a new [`TapeDecoder`] with the provided batch size /// and an estimated number of fields in each row - pub fn new(batch_size: usize, num_fields: usize) -> Self { + /// + /// If `projection` is Some, only fields in the set will be parsed and written to the tape. + /// Other fields will be skipped during parsing. + pub fn new(batch_size: usize, num_fields: usize, projection: Option>) -> Self { let tokens_per_row = 2 + num_fields * 2; let mut offsets = Vec::with_capacity(batch_size * (num_fields * 2) + 1); offsets.push(0); @@ -335,6 +361,8 @@ impl TapeDecoder { cur_row: 0, bytes: Vec::with_capacity(num_fields * 2 * 8), stack: Vec::with_capacity(10), + projection, + current_nesting_depth: 0, } } @@ -372,6 +400,7 @@ impl TapeDecoder { let end_idx = self.elements.len() as u32; self.elements[start_idx as usize] = TapeElement::StartObject(end_idx); self.elements.push(TapeElement::EndObject(start_idx)); + self.current_nesting_depth -= 1; self.stack.pop(); } b => return Err(err(b, "parsing object")), @@ -387,6 +416,7 @@ impl TapeDecoder { let end_idx = self.elements.len() as u32; self.elements[start_idx as usize] = TapeElement::StartList(end_idx); self.elements.push(TapeElement::EndList(start_idx)); + self.current_nesting_depth -= 1; self.stack.pop(); } Some(_) => self.stack.push(DecoderState::Value), @@ -423,11 +453,13 @@ impl TapeDecoder { b'[' => { let idx = self.elements.len() as u32; self.elements.push(TapeElement::StartList(u32::MAX)); + self.current_nesting_depth += 1; DecoderState::List(idx) } b'{' => { let idx = self.elements.len() as u32; self.elements.push(TapeElement::StartObject(u32::MAX)); + self.current_nesting_depth += 1; DecoderState::Object(idx) } b => return Err(err(b, "parsing value")), @@ -449,7 +481,52 @@ impl TapeDecoder { DecoderState::Colon => { iter.skip_whitespace(); match next!(iter) { - b':' => self.stack.pop(), + b':' => { + self.stack.pop(); + + // Check projection: if the field is not in the projection set, + // replace the Value state with SkipValue + // IMPORTANT: Only apply projection at the top level (when there's exactly 1 Object state) + // Short-circuit: Check depth first (cheaper than Option unwrap + HashSet lookup) + if self.current_nesting_depth == 1 { + if let Some(ref projection) = self.projection { + // Get the field name from the last String element + if let Some(TapeElement::String(string_idx)) = + self.elements.last() + { + let string_idx = *string_idx as usize; + let start = self.offsets[string_idx]; + let end = self.offsets[string_idx + 1]; + let field_name = std::str::from_utf8( + &self.bytes[start..end], + ) + .map_err(|e| { + ArrowError::JsonError(format!( + "Invalid UTF-8 in field name: {}", + e + )) + })?; + + if !projection.contains(field_name) { + // Field not in projection: skip its value + // CRITICAL: Remove the field name from tape to maintain structure + // The tape must have paired field_name:value entries + self.elements.pop(); // Remove the String element + self.bytes.truncate(start); // Remove the field name bytes + self.offsets.pop(); // Remove the offset entry + + // Replace Value state with SkipValue + if let Some(last) = self.stack.last_mut() { + *last = DecoderState::SkipValue { + depth: 0, + flags: 0, // Both in_string and escape are false + }; + } + } + } + } + } + } b => return Err(err(b, "parsing colon")), }; } @@ -519,6 +596,118 @@ impl TapeDecoder { } *idx += 1; }, + // Skip a value for unprojected fields (optimized batch-processing version) + DecoderState::SkipValue { depth, flags } => { + loop { + if iter.is_empty() { + break; // Need more data, preserve state + } + + let in_string = (*flags & SKIP_IN_STRING) != 0; + let escape = (*flags & SKIP_ESCAPE) != 0; + + if in_string { + // Fast skip to next \ or " using SIMD + let _ = iter.skip_chrs(b'\\', b'"'); + + if iter.is_empty() { + break; + } + + let b = next!(iter); + match b { + b'\\' => *flags ^= SKIP_ESCAPE, // Toggle escape flag + b'"' if !escape => { + *flags &= !SKIP_IN_STRING; // Clear in_string flag + if *depth == 0 { + // String value ended at top level - check completion + iter.skip_whitespace(); + if let Some(next_b) = iter.peek() { + if matches!(next_b, b',' | b'}' | b']') { + self.stack.pop(); + break; + } + } + if iter.is_empty() { + break; // Need more data + } + } + } + _ if escape => *flags &= !SKIP_ESCAPE, // Clear escape flag + _ => {} + } + } else if *depth > 0 { + // Inside nested structure - fast skip to next structural character + let _ = iter + .advance_until(|b| matches!(b, b'"' | b'{' | b'[' | b'}' | b']')); + + if iter.is_empty() { + break; + } + + match next!(iter) { + b'"' => *flags |= SKIP_IN_STRING, // Set in_string flag + b'{' | b'[' => *depth += 1, + b'}' | b']' => { + *depth -= 1; + if *depth == 0 { + self.stack.pop(); + break; + } + } + _ => {} + } + } else { + // depth == 0: Skip simple value (number/literal/start of compound) + let _ = iter.advance_until(|b| { + matches!( + b, + b',' | b'}' + | b']' + | b' ' + | b'\n' + | b'\r' + | b'\t' + | b'"' + | b'{' + | b'[' + ) + }); + + if iter.is_empty() { + break; + } + + match iter.peek() { + Some(b',') | Some(b'}') | Some(b']') => { + self.stack.pop(); + break; + } + Some(b' ' | b'\n' | b'\r' | b'\t') => { + iter.skip_whitespace(); + if let Some(next_b) = iter.peek() { + if matches!(next_b, b',' | b'}' | b']') { + self.stack.pop(); + break; + } + } + if iter.is_empty() { + break; + } + } + Some(b'"') => { + next!(iter); + *flags |= SKIP_IN_STRING; // Set in_string flag + } + Some(b'{' | b'[') => { + next!(iter); + *depth += 1; + } + _ => {} + } + } + } + } } } @@ -767,7 +956,7 @@ mod tests { {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2, 3]} } "#; - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(a.as_bytes()).unwrap(); assert!(!decoder.has_partial_row()); assert_eq!(decoder.num_buffered_rows(), 7); @@ -877,21 +1066,21 @@ mod tests { #[test] fn test_invalid() { // Test invalid - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); let err = decoder.decode(b"hello").unwrap_err().to_string(); assert_eq!( err, "Json error: Encountered unexpected 'h' whilst parsing value" ); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); let err = decoder.decode(b"{\"hello\": }").unwrap_err().to_string(); assert_eq!( err, "Json error: Encountered unexpected '}' whilst parsing value" ); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); let err = decoder .decode(b"{\"hello\": [ false, tru ]}") .unwrap_err() @@ -901,7 +1090,7 @@ mod tests { "Json error: Encountered unexpected ' ' whilst parsing literal" ); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); let err = decoder .decode(b"{\"hello\": \"\\ud8\"}") .unwrap_err() @@ -912,7 +1101,7 @@ mod tests { ); // Missing surrogate pair - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); let err = decoder .decode(b"{\"hello\": \"\\ud83d\"}") .unwrap_err() @@ -923,40 +1112,40 @@ mod tests { ); // Test truncation - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(b"{\"he").unwrap(); assert!(decoder.has_partial_row()); assert_eq!(decoder.num_buffered_rows(), 1); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading string"); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(b"{\"hello\" : ").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading value"); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(b"{\"hello\" : [").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading list"); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(b"{\"hello\" : tru").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading true"); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(b"{\"hello\" : nu").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading null"); // Test invalid UTF-8 - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(b"{\"hello\" : \"world\xFF\"}").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Encountered non-UTF-8 data"); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); decoder.decode(b"{\"\xe2\" : \"\x96\xa1\"}").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Encountered truncated UTF-8 sequence"); @@ -964,11 +1153,11 @@ mod tests { #[test] fn test_invalid_surrogates() { - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); let res = decoder.decode(b"{\"test\": \"\\ud800\\ud801\"}"); assert!(res.is_err()); - let mut decoder = TapeDecoder::new(16, 2); + let mut decoder = TapeDecoder::new(16, 2, None); let res = decoder.decode(b"{\"test\": \"\\udc00\\udc01\"}"); assert!(res.is_err()); } From cd84ec971a0fb5d7f5636bca09038e5d2ea835bf Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 5 Jan 2026 19:18:16 +0200 Subject: [PATCH 02/11] feat: add benchmark --- arrow-json/benches/reader.rs | 122 +++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 arrow-json/benches/reader.rs diff --git a/arrow-json/benches/reader.rs b/arrow-json/benches/reader.rs new file mode 100644 index 000000000000..72512e492f9b --- /dev/null +++ b/arrow-json/benches/reader.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_json::ReaderBuilder; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{ + BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group, criterion_main, +}; +use std::fmt::Write; +use std::hint::black_box; +use std::sync::Arc; + +// Projection benchmark constants +const WIDE_PROJECTION_ROWS: usize = 1 << 14; // 16K rows +const WIDE_PROJECTION_TOTAL_FIELDS: usize = 100; // 100 fields total, select only 3 + +fn bench_decode_schema( + c: &mut Criterion, + name: &str, + data: &[u8], + schema: Arc, + rows: usize, +) { + let mut group = c.benchmark_group(name); + group.throughput(Throughput::Bytes(data.len() as u64)); + group.sample_size(50); + group.measurement_time(std::time::Duration::from_secs(5)); + group.warm_up_time(std::time::Duration::from_secs(2)); + group.sampling_mode(SamplingMode::Flat); + group.bench_function(BenchmarkId::from_parameter(rows), |b| { + b.iter(|| { + let mut decoder = ReaderBuilder::new(schema.clone()) + .with_batch_size(rows) + .build_decoder() + .unwrap(); + + let mut offset = 0; + while offset < data.len() { + let read = decoder.decode(black_box(&data[offset..])).unwrap(); + if read == 0 { + break; + } + offset += read; + } + + let batch = decoder.flush().unwrap(); + black_box(batch); + }) + }); + group.finish(); +} + +fn build_wide_projection_json(rows: usize, total_fields: usize) -> Vec { + // Estimate: each field ~15 bytes ("fXX":VVVVVVV,), total ~15*100 + overhead + let per_row_size = total_fields * 15 + 10; + let mut data = String::with_capacity(rows * per_row_size); + + for _row in 0..rows { + data.push('{'); + for i in 0..total_fields { + if i > 0 { + data.push(','); + } + // Use fixed-width values for stable benchmarks: 7 digits + let _ = write!(data, "\"f{}\":{:07}", i, i); + } + data.push('}'); + data.push('\n'); + } + data.into_bytes() +} + +fn criterion_benchmark(c: &mut Criterion) { + // Wide projection workload: tests overhead of parsing unused fields + let wide_projection_data = + build_wide_projection_json(WIDE_PROJECTION_ROWS, WIDE_PROJECTION_TOTAL_FIELDS); + + // Full schema: all 100 fields + let mut full_fields = Vec::new(); + for i in 0..WIDE_PROJECTION_TOTAL_FIELDS { + full_fields.push(Field::new(format!("f{}", i), DataType::Int64, false)); + } + let full_schema = Arc::new(Schema::new(full_fields)); + bench_decode_schema( + c, + "decode_wide_projection_full_json", + &wide_projection_data, + full_schema, + WIDE_PROJECTION_ROWS, + ); + + // Projected schema: only 3 fields (f0, f10, f50) out of 100 + let projected_schema = Arc::new(Schema::new(vec![ + Field::new("f0", DataType::Int64, false), + Field::new("f10", DataType::Int64, false), + Field::new("f50", DataType::Int64, false), + ])); + bench_decode_schema( + c, + "decode_wide_projection_narrow_json", + &wide_projection_data, + projected_schema, + WIDE_PROJECTION_ROWS, + ); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 96f075681b3623ff8682dc5b1fb4439c5cf66949 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 5 Jan 2026 19:33:56 +0200 Subject: [PATCH 03/11] chore --- arrow-json/src/reader/mod.rs | 8 ++++---- arrow-json/src/reader/tape.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index ef1da72bb916..f3feed4bbfd2 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -314,10 +314,10 @@ impl ReaderBuilder { let num_fields = self.schema.flattened_fields().len(); - // Extract projection: enable in non-strict mode to skip unknown fields - // In strict_mode, unknown fields cause errors, so projection skipping is not useful - // In non-strict mode, projection allows skipping fields not in the schema - // Performance overhead has been minimized via depth caching and short-circuit optimization + // Extract projection field set from schema for projection-aware parsing + // - strict_mode: Disabled (unknown fields cause errors anyway) + // - non-strict mode: Enabled to skip JSON fields not present in the schema + // Performance overhead minimized via depth caching and short-circuit optimization let projection = if self.strict_mode { None } else { diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 1901a0b6674c..7876afc5836c 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -486,8 +486,8 @@ impl TapeDecoder { // Check projection: if the field is not in the projection set, // replace the Value state with SkipValue - // IMPORTANT: Only apply projection at the top level (when there's exactly 1 Object state) - // Short-circuit: Check depth first (cheaper than Option unwrap + HashSet lookup) + // NOTE: Only apply projection at the top level (nesting_depth == 1) + // This means direct fields of the root object, not nested objects if self.current_nesting_depth == 1 { if let Some(ref projection) = self.projection { // Get the field name from the last String element From bc704880473801d1cec395a803e6900313c246db Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 6 Jan 2026 11:58:34 +0200 Subject: [PATCH 04/11] fix: handle code review --- arrow-json/src/reader/mod.rs | 15 +-- arrow-json/src/reader/tape.rs | 205 ++++++++++++++++------------------ 2 files changed, 99 insertions(+), 121 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index f3feed4bbfd2..2cf39cef83c8 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -318,18 +318,11 @@ impl ReaderBuilder { // - strict_mode: Disabled (unknown fields cause errors anyway) // - non-strict mode: Enabled to skip JSON fields not present in the schema // Performance overhead minimized via depth caching and short-circuit optimization - let projection = if self.strict_mode { - None - } else { - // Non-strict mode: always enable projection to skip unknown fields - match &data_type { - DataType::Struct(fields) if !fields.is_empty() => { - let field_names: HashSet = - fields.iter().map(|f| f.name().clone()).collect(); - Some(field_names) - } - _ => None, + let projection: Option> = match &data_type { + DataType::Struct(fields) if !fields.is_empty() && !self.strict_mode => { + Some(fields.iter().map(|f| f.name().clone()).collect()) } + _ => None, }; Ok(Decoder { diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 7876afc5836c..e35b4be5c1bf 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -238,21 +238,20 @@ enum DecoderState { /// /// Consists of `(literal, decoded length)` Literal(Literal, u8), - /// Skipping a value (for unprojected fields) + /// Skipping a value (for unprojected fields), not inside a string /// - /// Consists of: - /// - `depth`: Nesting level of objects/arrays being skipped (u32) - /// - `flags`: Bit-packed flags (in_string: bit 0, escape: bit 1) - SkipValue { - depth: u32, - flags: u8, - }, + /// Contains the nesting depth of objects/arrays being skipped + SkipValue(u32), + /// Skipping inside a string literal (for unprojected fields) + /// + /// Contains the nesting depth of objects/arrays + SkipString(u32), + /// Skipping an escape sequence inside a string (for unprojected fields) + /// + /// Contains the nesting depth of objects/arrays + SkipEscape(u32), } -// Bit flags for SkipValue state -const SKIP_IN_STRING: u8 = 1 << 0; // 0x01 -const SKIP_ESCAPE: u8 = 1 << 1; // 0x02 - impl DecoderState { fn as_str(&self) -> &'static str { match self { @@ -265,7 +264,9 @@ impl DecoderState { DecoderState::Escape => "escape", DecoderState::Unicode(_, _, _) => "unicode literal", DecoderState::Literal(d, _) => d.as_str(), - DecoderState::SkipValue { .. } => "skip value", + DecoderState::SkipValue(_) => "skip value", + DecoderState::SkipString(_) => "skip string", + DecoderState::SkipEscape(_) => "skip escape", } } } @@ -484,46 +485,30 @@ impl TapeDecoder { b':' => { self.stack.pop(); - // Check projection: if the field is not in the projection set, - // replace the Value state with SkipValue - // NOTE: Only apply projection at the top level (nesting_depth == 1) - // This means direct fields of the root object, not nested objects - if self.current_nesting_depth == 1 { - if let Some(ref projection) = self.projection { - // Get the field name from the last String element - if let Some(TapeElement::String(string_idx)) = - self.elements.last() - { - let string_idx = *string_idx as usize; - let start = self.offsets[string_idx]; - let end = self.offsets[string_idx + 1]; - let field_name = std::str::from_utf8( - &self.bytes[start..end], - ) - .map_err(|e| { - ArrowError::JsonError(format!( - "Invalid UTF-8 in field name: {}", - e - )) - })?; - - if !projection.contains(field_name) { - // Field not in projection: skip its value - // CRITICAL: Remove the field name from tape to maintain structure - // The tape must have paired field_name:value entries - self.elements.pop(); // Remove the String element - self.bytes.truncate(start); // Remove the field name bytes - self.offsets.pop(); // Remove the offset entry - - // Replace Value state with SkipValue - if let Some(last) = self.stack.last_mut() { - *last = DecoderState::SkipValue { - depth: 0, - flags: 0, // Both in_string and escape are false - }; - } - } - } + // Check projection at top level only (nesting_depth == 1) + if self.current_nesting_depth == 1 + && let Some(ref projection) = self.projection + && let Some(TapeElement::String(string_idx)) = self.elements.last() + { + let string_idx = *string_idx as usize; + let start = self.offsets[string_idx]; + let end = self.offsets[string_idx + 1]; + let field_name = std::str::from_utf8(&self.bytes[start..end]) + .map_err(|e| { + ArrowError::JsonError(format!( + "Invalid UTF-8 in field name: {e}" + )) + })?; + + if !projection.contains(field_name) { + // Field not in projection: skip its value + // Remove field name from tape (must have paired field_name:value) + self.elements.pop(); + self.bytes.truncate(start); + self.offsets.pop(); + + // Replace Value state with SkipValue + *self.stack.last_mut().unwrap() = DecoderState::SkipValue(0); } } } @@ -596,57 +581,20 @@ impl TapeDecoder { } *idx += 1; }, - // Skip a value for unprojected fields (optimized batch-processing version) - DecoderState::SkipValue { depth, flags } => { - loop { - if iter.is_empty() { - break; // Need more data, preserve state - } - - let in_string = (*flags & SKIP_IN_STRING) != 0; - let escape = (*flags & SKIP_ESCAPE) != 0; - - if in_string { - // Fast skip to next \ or " using SIMD - let _ = iter.skip_chrs(b'\\', b'"'); - - if iter.is_empty() { - break; - } - - let b = next!(iter); - match b { - b'\\' => *flags ^= SKIP_ESCAPE, // Toggle escape flag - b'"' if !escape => { - *flags &= !SKIP_IN_STRING; // Clear in_string flag - if *depth == 0 { - // String value ended at top level - check completion - iter.skip_whitespace(); - if let Some(next_b) = iter.peek() { - if matches!(next_b, b',' | b'}' | b']') { - self.stack.pop(); - break; - } - } - if iter.is_empty() { - break; // Need more data - } - } - } - _ if escape => *flags &= !SKIP_ESCAPE, // Clear escape flag - _ => {} - } - } else if *depth > 0 { + // Skip a value (not inside a string) + DecoderState::SkipValue(depth) => { + while !iter.is_empty() { + if *depth > 0 { // Inside nested structure - fast skip to next structural character - let _ = iter - .advance_until(|b| matches!(b, b'"' | b'{' | b'[' | b'}' | b']')); - + iter.advance_until(|b| matches!(b, b'"' | b'{' | b'[' | b'}' | b']')); if iter.is_empty() { break; } - match next!(iter) { - b'"' => *flags |= SKIP_IN_STRING, // Set in_string flag + b'"' => { + *state = DecoderState::SkipString(*depth); + break; + } b'{' | b'[' => *depth += 1, b'}' | b']' => { *depth -= 1; @@ -659,7 +607,7 @@ impl TapeDecoder { } } else { // depth == 0: Skip simple value (number/literal/start of compound) - let _ = iter.advance_until(|b| { + iter.advance_until(|b| { matches!( b, b',' | b'}' @@ -673,23 +621,22 @@ impl TapeDecoder { | b'[' ) }); - if iter.is_empty() { break; } - match iter.peek() { - Some(b',') | Some(b'}') | Some(b']') => { + Some(b',' | b'}' | b']') => { self.stack.pop(); break; } Some(b' ' | b'\n' | b'\r' | b'\t') => { iter.skip_whitespace(); - if let Some(next_b) = iter.peek() { - if matches!(next_b, b',' | b'}' | b']') { - self.stack.pop(); - break; - } + if iter + .peek() + .map_or(false, |b| matches!(b, b',' | b'}' | b']')) + { + self.stack.pop(); + break; } if iter.is_empty() { break; @@ -697,17 +644,55 @@ impl TapeDecoder { } Some(b'"') => { next!(iter); - *flags |= SKIP_IN_STRING; // Set in_string flag + *state = DecoderState::SkipString(0); + break; } Some(b'{' | b'[') => { next!(iter); - *depth += 1; + *depth = 1; } _ => {} } } } } + // Skip inside a string literal + DecoderState::SkipString(depth) => { + iter.skip_chrs(b'\\', b'"'); + if iter.is_empty() { + break; + } + match next!(iter) { + b'\\' => *state = DecoderState::SkipEscape(*depth), + b'"' => { + if *depth == 0 { + // String value ended at top level - check completion + iter.skip_whitespace(); + if iter + .peek() + .map_or(false, |b| matches!(b, b',' | b'}' | b']')) + { + self.stack.pop(); + } else if iter.is_empty() { + // Need more data, stay in a "finished string but not yet popped" state + // For simplicity, transition to SkipValue(0) and let it handle + *state = DecoderState::SkipValue(0); + } + } else { + *state = DecoderState::SkipValue(*depth); + } + } + _ => unreachable!(), + } + } + // Skip an escape sequence inside a string + DecoderState::SkipEscape(depth) => { + if iter.is_empty() { + break; + } + next!(iter); // consume escaped character + *state = DecoderState::SkipString(*depth); + } } } From 8bac0196cac566cc321fa7180456993a7d0a6176 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 6 Jan 2026 12:02:17 +0200 Subject: [PATCH 05/11] fix: clippy --- arrow-json/src/reader/tape.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index e35b4be5c1bf..18de23c66c15 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -631,9 +631,7 @@ impl TapeDecoder { } Some(b' ' | b'\n' | b'\r' | b'\t') => { iter.skip_whitespace(); - if iter - .peek() - .map_or(false, |b| matches!(b, b',' | b'}' | b']')) + if iter.peek().is_some_and(|b| matches!(b, b',' | b'}' | b']')) { self.stack.pop(); break; @@ -668,10 +666,7 @@ impl TapeDecoder { if *depth == 0 { // String value ended at top level - check completion iter.skip_whitespace(); - if iter - .peek() - .map_or(false, |b| matches!(b, b',' | b'}' | b']')) - { + if iter.peek().is_some_and(|b| matches!(b, b',' | b'}' | b']')) { self.stack.pop(); } else if iter.is_empty() { // Need more data, stay in a "finished string but not yet popped" state From df3ac61ae9a9e202c76340f5afb779920a8ca679 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 6 Jan 2026 13:26:18 +0200 Subject: [PATCH 06/11] feat: add projection info method to TapeDecoder for top-level checks --- arrow-json/src/reader/tape.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 18de23c66c15..dd521cfb0c3e 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -342,6 +342,19 @@ pub struct TapeDecoder { } impl TapeDecoder { + /// Returns projection info if we should check field projection. + /// Only applies at top level (nesting_depth == 1) with a projection set. + fn projection_info(&self) -> Option<(usize, &HashSet)> { + if self.current_nesting_depth != 1 { + return None; + } + let projection = self.projection.as_ref()?; + let TapeElement::String(string_idx) = *self.elements.last()? else { + return None; + }; + Some((string_idx as usize, projection)) + } + /// Create a new [`TapeDecoder`] with the provided batch size /// and an estimated number of fields in each row /// @@ -485,12 +498,8 @@ impl TapeDecoder { b':' => { self.stack.pop(); - // Check projection at top level only (nesting_depth == 1) - if self.current_nesting_depth == 1 - && let Some(ref projection) = self.projection - && let Some(TapeElement::String(string_idx)) = self.elements.last() - { - let string_idx = *string_idx as usize; + // Check projection at top level only + if let Some((string_idx, projection)) = self.projection_info() { let start = self.offsets[string_idx]; let end = self.offsets[string_idx + 1]; let field_name = std::str::from_utf8(&self.bytes[start..end]) From f5502d93c800a9c44f2f2a79d83b1f7f73174378 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 6 Jan 2026 22:15:32 +0200 Subject: [PATCH 07/11] feat: enhance TapeDecoder to support strict mode for projection field validation --- arrow-json/src/reader/mod.rs | 30 ++++++++++++---------- arrow-json/src/reader/tape.rs | 48 ++++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 30 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 2cf39cef83c8..25fcb5df02a1 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -304,31 +304,35 @@ impl ReaderBuilder { } }; - let decoder = make_decoder( - data_type.clone(), - self.coerce_primitive, - self.strict_mode, - nullable, - self.struct_mode, - )?; - let num_fields = self.schema.flattened_fields().len(); // Extract projection field set from schema for projection-aware parsing - // - strict_mode: Disabled (unknown fields cause errors anyway) - // - non-strict mode: Enabled to skip JSON fields not present in the schema - // Performance overhead minimized via depth caching and short-circuit optimization + // - strict_mode: fail-fast on unknown fields during tape parsing + // - non-strict mode: skip JSON fields not present in the schema let projection: Option> = match &data_type { - DataType::Struct(fields) if !fields.is_empty() && !self.strict_mode => { + DataType::Struct(fields) if !fields.is_empty() => { Some(fields.iter().map(|f| f.name().clone()).collect()) } _ => None, }; + let decoder = make_decoder( + data_type, + self.coerce_primitive, + self.strict_mode, + nullable, + self.struct_mode, + )?; + Ok(Decoder { decoder, is_field: self.is_field, - tape_decoder: TapeDecoder::new(self.batch_size, num_fields, projection), + tape_decoder: TapeDecoder::new( + self.batch_size, + num_fields, + projection, + self.strict_mode, + ), batch_size: self.batch_size, schema: self.schema, }) diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index dd521cfb0c3e..3210328c81be 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -336,6 +336,9 @@ pub struct TapeDecoder { /// If None, all fields are parsed. If Some, only fields in the set are parsed. projection: Option>, + /// If true, return error when encountering fields not in projection + strict_mode: bool, + /// Cache current nesting depth to avoid O(depth) stack traversal on every field /// Incremented when entering Object/List, decremented when exiting current_nesting_depth: usize, @@ -359,8 +362,13 @@ impl TapeDecoder { /// and an estimated number of fields in each row /// /// If `projection` is Some, only fields in the set will be parsed and written to the tape. - /// Other fields will be skipped during parsing. - pub fn new(batch_size: usize, num_fields: usize, projection: Option>) -> Self { + /// Other fields will be skipped during parsing (or rejected if `strict_mode` is true). + pub fn new( + batch_size: usize, + num_fields: usize, + projection: Option>, + strict_mode: bool, + ) -> Self { let tokens_per_row = 2 + num_fields * 2; let mut offsets = Vec::with_capacity(batch_size * (num_fields * 2) + 1); offsets.push(0); @@ -376,6 +384,7 @@ impl TapeDecoder { bytes: Vec::with_capacity(num_fields * 2 * 8), stack: Vec::with_capacity(10), projection, + strict_mode, current_nesting_depth: 0, } } @@ -510,6 +519,11 @@ impl TapeDecoder { })?; if !projection.contains(field_name) { + if self.strict_mode { + return Err(ArrowError::JsonError(format!( + "column '{field_name}' missing from schema" + ))); + } // Field not in projection: skip its value // Remove field name from tape (must have paired field_name:value) self.elements.pop(); @@ -945,7 +959,7 @@ mod tests { {"a": ["", "foo", ["bar", "c"]], "b": {"1": []}, "c": {"2": [1, 2, 3]} } "#; - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(a.as_bytes()).unwrap(); assert!(!decoder.has_partial_row()); assert_eq!(decoder.num_buffered_rows(), 7); @@ -1055,21 +1069,21 @@ mod tests { #[test] fn test_invalid() { // Test invalid - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); let err = decoder.decode(b"hello").unwrap_err().to_string(); assert_eq!( err, "Json error: Encountered unexpected 'h' whilst parsing value" ); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); let err = decoder.decode(b"{\"hello\": }").unwrap_err().to_string(); assert_eq!( err, "Json error: Encountered unexpected '}' whilst parsing value" ); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); let err = decoder .decode(b"{\"hello\": [ false, tru ]}") .unwrap_err() @@ -1079,7 +1093,7 @@ mod tests { "Json error: Encountered unexpected ' ' whilst parsing literal" ); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); let err = decoder .decode(b"{\"hello\": \"\\ud8\"}") .unwrap_err() @@ -1090,7 +1104,7 @@ mod tests { ); // Missing surrogate pair - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); let err = decoder .decode(b"{\"hello\": \"\\ud83d\"}") .unwrap_err() @@ -1101,40 +1115,40 @@ mod tests { ); // Test truncation - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(b"{\"he").unwrap(); assert!(decoder.has_partial_row()); assert_eq!(decoder.num_buffered_rows(), 1); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading string"); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(b"{\"hello\" : ").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading value"); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(b"{\"hello\" : [").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading list"); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(b"{\"hello\" : tru").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading true"); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(b"{\"hello\" : nu").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Truncated record whilst reading null"); // Test invalid UTF-8 - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(b"{\"hello\" : \"world\xFF\"}").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Encountered non-UTF-8 data"); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); decoder.decode(b"{\"\xe2\" : \"\x96\xa1\"}").unwrap(); let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Json error: Encountered truncated UTF-8 sequence"); @@ -1142,11 +1156,11 @@ mod tests { #[test] fn test_invalid_surrogates() { - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); let res = decoder.decode(b"{\"test\": \"\\ud800\\ud801\"}"); assert!(res.is_err()); - let mut decoder = TapeDecoder::new(16, 2, None); + let mut decoder = TapeDecoder::new(16, 2, None, false); let res = decoder.decode(b"{\"test\": \"\\udc00\\udc01\"}"); assert!(res.is_err()); } From 881a19f117d777a908caf29690b4c3d6a9517baa Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 6 Jan 2026 22:50:43 +0200 Subject: [PATCH 08/11] feat: add projection support to ReaderBuilder for schema-aware parsing --- arrow-json/src/reader/mod.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 25fcb5df02a1..364983e28574 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -183,6 +183,7 @@ pub struct ReaderBuilder { batch_size: usize, coerce_primitive: bool, strict_mode: bool, + projection: bool, is_field: bool, struct_mode: StructMode, @@ -203,6 +204,7 @@ impl ReaderBuilder { batch_size: 1024, coerce_primitive: false, strict_mode: false, + projection: false, is_field: false, struct_mode: Default::default(), schema, @@ -244,6 +246,7 @@ impl ReaderBuilder { batch_size: 1024, coerce_primitive: false, strict_mode: false, + projection: false, is_field: true, struct_mode: Default::default(), schema: Arc::new(Schema::new([field.into()])), @@ -276,6 +279,12 @@ impl ReaderBuilder { } } + /// Enables projection-aware parsing to skip fields not present in the schema. + /// This is ignored when `strict_mode` is true, which always checks projection. + pub fn with_projection(self, projection: bool) -> Self { + Self { projection, ..self } + } + /// Set the [`StructMode`] for the reader, which determines whether structs /// can be decoded from JSON as objects or lists. For more details refer to /// the enum documentation. Default is to use `ObjectOnly`. @@ -308,9 +317,10 @@ impl ReaderBuilder { // Extract projection field set from schema for projection-aware parsing // - strict_mode: fail-fast on unknown fields during tape parsing - // - non-strict mode: skip JSON fields not present in the schema + // - projection: skip JSON fields not present in the schema + let enable_projection = self.strict_mode || self.projection; let projection: Option> = match &data_type { - DataType::Struct(fields) if !fields.is_empty() => { + DataType::Struct(fields) if enable_projection && !fields.is_empty() => { Some(fields.iter().map(|f| f.name().clone()).collect()) } _ => None, From 4275af5c6765709b79ce2826f442a114cabfb536 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 6 Jan 2026 22:51:03 +0200 Subject: [PATCH 09/11] chore --- arrow-json/benches/reader.rs | 4 ++++ arrow-json/src/reader/tape.rs | 24 ++++++++++++++---------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/arrow-json/benches/reader.rs b/arrow-json/benches/reader.rs index 72512e492f9b..f7e9bce8c442 100644 --- a/arrow-json/benches/reader.rs +++ b/arrow-json/benches/reader.rs @@ -34,6 +34,7 @@ fn bench_decode_schema( data: &[u8], schema: Arc, rows: usize, + projection: bool, ) { let mut group = c.benchmark_group(name); group.throughput(Throughput::Bytes(data.len() as u64)); @@ -45,6 +46,7 @@ fn bench_decode_schema( b.iter(|| { let mut decoder = ReaderBuilder::new(schema.clone()) .with_batch_size(rows) + .with_projection(projection) .build_decoder() .unwrap(); @@ -101,6 +103,7 @@ fn criterion_benchmark(c: &mut Criterion) { &wide_projection_data, full_schema, WIDE_PROJECTION_ROWS, + false, ); // Projected schema: only 3 fields (f0, f10, f50) out of 100 @@ -115,6 +118,7 @@ fn criterion_benchmark(c: &mut Criterion) { &wide_projection_data, projected_schema, WIDE_PROJECTION_ROWS, + true, ); } diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 3210328c81be..86c8152de441 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -518,20 +518,24 @@ impl TapeDecoder { )) })?; - if !projection.contains(field_name) { - if self.strict_mode { + match (projection.contains(field_name), self.strict_mode) { + (true, _) => {} + (false, true) => { return Err(ArrowError::JsonError(format!( "column '{field_name}' missing from schema" ))); } - // Field not in projection: skip its value - // Remove field name from tape (must have paired field_name:value) - self.elements.pop(); - self.bytes.truncate(start); - self.offsets.pop(); - - // Replace Value state with SkipValue - *self.stack.last_mut().unwrap() = DecoderState::SkipValue(0); + (false, false) => { + // Field not in projection: skip its value + // Remove field name from tape (must have paired field_name:value) + self.elements.pop(); + self.bytes.truncate(start); + self.offsets.pop(); + + // Replace Value state with SkipValue + *self.stack.last_mut().unwrap() = + DecoderState::SkipValue(0); + } } } } From d0da5a5fedb86665f8fa6fb1f9ffe0636a804f33 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 6 Jan 2026 23:25:24 +0200 Subject: [PATCH 10/11] feat: add test for projection skipping unknown fields in JSON reader --- arrow-json/src/reader/mod.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 364983e28574..aeab478b5205 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -1809,6 +1809,39 @@ mod tests { ); } + #[test] + fn test_projection_skip_unknown_fields() { + // JSON has fields a, b, c but schema only has a, c + let buf = r#" + {"a": 1, "b": "ignored", "c": true} + {"a": 2, "b": "also ignored", "c": false} + "#; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("c", DataType::Boolean, true), + ])); + + // with_projection(true): skip unknown field "b" and succeed + let batch = ReaderBuilder::new(schema) + .with_projection(true) + .build(Cursor::new(buf.as_bytes())) + .unwrap() + .read() + .unwrap() + .unwrap(); + + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let a = batch.column(0).as_primitive::(); + assert_eq!(a.values(), &[1, 2]); + + let c = batch.column(1).as_boolean(); + assert!(c.value(0)); + assert!(!c.value(1)); + } + fn read_file(path: &str, schema: Option) -> Reader> { let file = File::open(path).unwrap(); let mut reader = BufReader::new(file); From 130c787e469ea7fc8f3ba0d887ab09bb41bec499 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 19 Jan 2026 13:21:41 +0200 Subject: [PATCH 11/11] feat: add projection support to schema decoding benchmark --- arrow-json/benches/json-reader.rs | 11 ++- arrow-json/benches/reader.rs | 126 ------------------------------ 2 files changed, 10 insertions(+), 127 deletions(-) delete mode 100644 arrow-json/benches/reader.rs diff --git a/arrow-json/benches/json-reader.rs b/arrow-json/benches/json-reader.rs index 504839f8ffe2..d02790ad8428 100644 --- a/arrow-json/benches/json-reader.rs +++ b/arrow-json/benches/json-reader.rs @@ -179,7 +179,13 @@ fn bench_binary_hex(c: &mut Criterion) { bench_decode_binary(c, "decode_binary_view_hex_json", &binary_data, view_field); } -fn bench_decode_schema(c: &mut Criterion, name: &str, data: &[u8], schema: Arc) { +fn bench_decode_schema( + c: &mut Criterion, + name: &str, + data: &[u8], + schema: Arc, + projection: bool, +) { let mut group = c.benchmark_group(name); group.throughput(Throughput::Bytes(data.len() as u64)); group.sample_size(50); @@ -190,6 +196,7 @@ fn bench_decode_schema(c: &mut Criterion, name: &str, data: &[u8], schema: Arc, - rows: usize, - projection: bool, -) { - let mut group = c.benchmark_group(name); - group.throughput(Throughput::Bytes(data.len() as u64)); - group.sample_size(50); - group.measurement_time(std::time::Duration::from_secs(5)); - group.warm_up_time(std::time::Duration::from_secs(2)); - group.sampling_mode(SamplingMode::Flat); - group.bench_function(BenchmarkId::from_parameter(rows), |b| { - b.iter(|| { - let mut decoder = ReaderBuilder::new(schema.clone()) - .with_batch_size(rows) - .with_projection(projection) - .build_decoder() - .unwrap(); - - let mut offset = 0; - while offset < data.len() { - let read = decoder.decode(black_box(&data[offset..])).unwrap(); - if read == 0 { - break; - } - offset += read; - } - - let batch = decoder.flush().unwrap(); - black_box(batch); - }) - }); - group.finish(); -} - -fn build_wide_projection_json(rows: usize, total_fields: usize) -> Vec { - // Estimate: each field ~15 bytes ("fXX":VVVVVVV,), total ~15*100 + overhead - let per_row_size = total_fields * 15 + 10; - let mut data = String::with_capacity(rows * per_row_size); - - for _row in 0..rows { - data.push('{'); - for i in 0..total_fields { - if i > 0 { - data.push(','); - } - // Use fixed-width values for stable benchmarks: 7 digits - let _ = write!(data, "\"f{}\":{:07}", i, i); - } - data.push('}'); - data.push('\n'); - } - data.into_bytes() -} - -fn criterion_benchmark(c: &mut Criterion) { - // Wide projection workload: tests overhead of parsing unused fields - let wide_projection_data = - build_wide_projection_json(WIDE_PROJECTION_ROWS, WIDE_PROJECTION_TOTAL_FIELDS); - - // Full schema: all 100 fields - let mut full_fields = Vec::new(); - for i in 0..WIDE_PROJECTION_TOTAL_FIELDS { - full_fields.push(Field::new(format!("f{}", i), DataType::Int64, false)); - } - let full_schema = Arc::new(Schema::new(full_fields)); - bench_decode_schema( - c, - "decode_wide_projection_full_json", - &wide_projection_data, - full_schema, - WIDE_PROJECTION_ROWS, - false, - ); - - // Projected schema: only 3 fields (f0, f10, f50) out of 100 - let projected_schema = Arc::new(Schema::new(vec![ - Field::new("f0", DataType::Int64, false), - Field::new("f10", DataType::Int64, false), - Field::new("f50", DataType::Int64, false), - ])); - bench_decode_schema( - c, - "decode_wide_projection_narrow_json", - &wide_projection_data, - projected_schema, - WIDE_PROJECTION_ROWS, - true, - ); -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches);