diff --git a/arrow-json/src/reader/binary_array.rs b/arrow-json/src/reader/binary_array.rs index 80df19d0172e..8e862223ef9e 100644 --- a/arrow-json/src/reader/binary_array.rs +++ b/arrow-json/src/reader/binary_array.rs @@ -15,17 +15,21 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::ArrayDecoder; use arrow_array::builder::GenericBinaryBuilder; -use arrow_array::{Array, GenericStringArray, OffsetSizeTrait}; +use arrow_array::{Array, GenericBinaryArray, GenericStringArray, OffsetSizeTrait}; use arrow_data::ArrayData; -use arrow_schema::ArrowError; +use arrow_schema::{ArrowError, DataType}; use base64::prelude::BASE64_STANDARD; use base64::Engine; -use std::marker::PhantomData; pub struct BinaryArrayDecoder { + data_type: Arc, is_nullable: bool, phantom: PhantomData, } @@ -33,6 +37,7 @@ pub struct BinaryArrayDecoder { impl BinaryArrayDecoder { pub fn new(is_nullable: bool) -> Self { Self { + data_type: Arc::new(GenericBinaryArray::::DATA_TYPE), is_nullable, phantom: Default::default(), } @@ -80,11 +85,28 @@ impl ArrayDecoder for BinaryArrayDecoder { Ok(builder.finish().into_data()) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - match tape.get(pos) { - TapeElement::String(p) => BASE64_STANDARD.decode(&tape.get_string(p)).is_ok(), - TapeElement::Null => self.is_nullable, - _ => false, - } + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let failure = match tape.get(pos) { + TapeElement::String(p) => { + if BASE64_STANDARD.decode(tape.get_string(p)).is_ok() { + return Ok(()); + } + FailureKind::ParseFailure + } + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } + FailureKind::NullValue + } + _ => FailureKind::TypeMismatch, + }; + + ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type)) } } diff --git a/arrow-json/src/reader/boolean_array.rs b/arrow-json/src/reader/boolean_array.rs index dc3748682b20..1ba2586507f3 100644 --- a/arrow-json/src/reader/boolean_array.rs +++ b/arrow-json/src/reader/boolean_array.rs @@ -15,21 +15,28 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use arrow_array::builder::BooleanBuilder; use arrow_array::Array; use arrow_data::ArrayData; -use arrow_schema::ArrowError; +use arrow_schema::{ArrowError, DataType}; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::ArrayDecoder; pub struct BooleanArrayDecoder { + data_type: Arc, is_nullable: bool, } impl BooleanArrayDecoder { pub fn new(is_nullable: bool) -> Self { - Self { is_nullable } + Self { + data_type: Arc::new(DataType::Boolean), + is_nullable, + } } } @@ -48,11 +55,23 @@ impl ArrayDecoder for BooleanArrayDecoder { Ok(builder.finish().into_data()) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - match tape.get(pos) { - TapeElement::Null => self.is_nullable, - TapeElement::True | TapeElement::False => true, - _ => false, - } + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let failure = match tape.get(pos) { + TapeElement::True | TapeElement::False => return Ok(()), + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } + FailureKind::NullValue + } + _ => FailureKind::TypeMismatch, + }; + + ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type)) } } diff --git a/arrow-json/src/reader/decimal_array.rs b/arrow-json/src/reader/decimal_array.rs index 078f079d7d2d..1364e88acae3 100644 --- a/arrow-json/src/reader/decimal_array.rs +++ b/arrow-json/src/reader/decimal_array.rs @@ -16,18 +16,21 @@ // under the License. use std::marker::PhantomData; +use std::sync::Arc; use arrow_array::builder::PrimitiveBuilder; use arrow_array::types::DecimalType; use arrow_array::Array; use arrow_cast::parse::parse_decimal; use arrow_data::ArrayData; -use arrow_schema::ArrowError; +use arrow_schema::{ArrowError, DataType}; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::ArrayDecoder; pub struct DecimalArrayDecoder { + data_type: Arc, precision: u8, scale: i8, is_nullable: bool, @@ -38,6 +41,7 @@ pub struct DecimalArrayDecoder { impl DecimalArrayDecoder { pub fn new(precision: u8, scale: i8, is_nullable: bool) -> Self { Self { + data_type: Arc::new(D::TYPE_CONSTRUCTOR(precision, scale)), precision, scale, is_nullable, @@ -102,18 +106,36 @@ where .into_data()) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - match tape.get(pos) { - TapeElement::Null => self.is_nullable, + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let failure = match tape.get(pos) { + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } + FailureKind::NullValue + } TapeElement::String(idx) => { let s = tape.get_string(idx); - parse_decimal::(s, self.precision, self.scale).is_ok() + if parse_decimal::(s, self.precision, self.scale).is_ok() { + return Ok(()); + } + FailureKind::ParseFailure } TapeElement::Number(idx) => { let s = tape.get_string(idx); - parse_decimal::(s, self.precision, self.scale).is_ok() + if parse_decimal::(s, self.precision, self.scale).is_ok() { + return Ok(()); + } + FailureKind::ParseFailure } - _ => false, - } + _ => FailureKind::TypeMismatch, + }; + + ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type)) } } diff --git a/arrow-json/src/reader/json_array.rs b/arrow-json/src/reader/json_array.rs index b6dcf7867036..f41a77aeae44 100644 --- a/arrow-json/src/reader/json_array.rs +++ b/arrow-json/src/reader/json_array.rs @@ -106,7 +106,12 @@ impl ArrayDecoder for JsonArrayDecoder { Ok(builder.finish().into_data()) } - fn validate_row(&self, _: &Tape<'_>, _: u32) -> bool { - true + fn validate_row<'tape>( + &'tape self, + _tape: &'tape Tape<'_>, + _pos: u32, + _row_idx: usize, + ) -> Result<(), Vec>> { + Ok(()) } } diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index 58d999ff480f..03cc2aa6e958 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::{make_decoder, ArrayDecoder}; use crate::StructMode; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; @@ -26,7 +29,7 @@ use arrow_schema::{ArrowError, DataType}; use std::marker::PhantomData; pub struct ListArrayDecoder { - data_type: DataType, + data_type: Arc, decoder: Box, phantom: PhantomData, is_nullable: bool, @@ -55,7 +58,7 @@ impl ListArrayDecoder { )?; Ok(Self { - data_type, + data_type: Arc::new(data_type), decoder, phantom: Default::default(), is_nullable, @@ -104,7 +107,7 @@ impl ArrayDecoder for ListArrayDecoder { let child_data = self.decoder.decode(tape, &child_pos)?; let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - let data = ArrayDataBuilder::new(self.data_type.clone()) + let data = ArrayDataBuilder::new((*self.data_type).clone()) .len(pos.len()) .nulls(nulls) .add_buffer(offsets.finish()) @@ -115,28 +118,65 @@ impl ArrayDecoder for ListArrayDecoder { Ok(unsafe { data.build_unchecked() }) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - let end_idx = match (tape.get(pos), self.is_nullable) { - (TapeElement::StartList(end_idx), _) => end_idx, - (TapeElement::Null, true) => { - return true; + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let end_idx = match tape.get(pos) { + TapeElement::StartList(end_idx) => end_idx, + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } else { + return ErrorMarker::err( + row_idx, + pos, + FailureKind::NullValue, + Arc::clone(&self.data_type), + ); + } + } + _ => { + return ErrorMarker::err( + row_idx, + pos, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); } - _ => return false, }; let mut cur_idx = pos + 1; + let mut element_idx = 0; while cur_idx < end_idx { - if !self.decoder.validate_row(tape, cur_idx) { - return false; + match self.decoder.validate_row(tape, cur_idx, row_idx) { + Ok(()) => {} + Err(mut child_errors) => { + for error in &mut child_errors { + error.array_indices.push(element_idx); + } + return Err(child_errors); + } } - // Advance to next field - if let Ok(next) = tape.next(cur_idx, "list value") { - cur_idx = next; - } else { - return false; + + match tape.next(cur_idx, "list value") { + Ok(next) => { + cur_idx = next; + element_idx += 1; + } + Err(_) => { + return ErrorMarker::err( + row_idx, + cur_idx, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); + } } } - true + Ok(()) } } diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index 201f2940f3fc..c6b68128a396 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::{make_decoder, ArrayDecoder}; use crate::StructMode; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; @@ -25,7 +28,7 @@ use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; pub struct MapArrayDecoder { - data_type: DataType, + data_type: Arc, keys: Box, values: Box, is_nullable: bool, @@ -74,7 +77,7 @@ impl MapArrayDecoder { )?; Ok(Self { - data_type, + data_type: Arc::new(data_type), keys, values, is_nullable, @@ -84,7 +87,7 @@ impl MapArrayDecoder { impl ArrayDecoder for MapArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { - let s = match &self.data_type { + let s = match &*self.data_type { DataType::Map(f, _) => match f.data_type() { s @ DataType::Struct(_) => s, _ => unreachable!(), @@ -147,7 +150,7 @@ impl ArrayDecoder for MapArrayDecoder { let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish())); - let builder = ArrayDataBuilder::new(self.data_type.clone()) + let builder = ArrayDataBuilder::new((*self.data_type).clone()) .len(pos.len()) .buffers(vec![offsets.finish()]) .nulls(nulls) @@ -158,33 +161,67 @@ impl ArrayDecoder for MapArrayDecoder { Ok(unsafe { builder.build_unchecked() }) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { let end_idx = match tape.get(pos) { TapeElement::StartObject(end_idx) => end_idx, TapeElement::Null => { - return self.is_nullable; + if self.is_nullable { + return Ok(()); + } else { + return ErrorMarker::err( + row_idx, + pos, + FailureKind::NullValue, + Arc::clone(&self.data_type), + ); + } + } + _ => { + return ErrorMarker::err( + row_idx, + pos, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); } - _ => return false, }; let mut cur_idx = pos + 1; while cur_idx < end_idx { let key = cur_idx; - let Ok(value) = tape.next(key, "map key") else { - return false; + let value = match tape.next(key, "map key") { + Ok(v) => v, + Err(_) => { + return ErrorMarker::err( + row_idx, + key, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); + } }; - if let Ok(i) = tape.next(value, "map value") { - cur_idx = i; - } else { - return false; - } + cur_idx = match tape.next(value, "map value") { + Ok(i) => i, + Err(_) => { + return ErrorMarker::err( + row_idx, + value, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); + } + }; - if !(self.keys.validate_row(tape, key) && self.values.validate_row(tape, value)) { - return false; - } + self.keys.validate_row(tape, key, row_idx)?; + self.values.validate_row(tape, value, row_idx)?; } - true + Ok(()) } } diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index e16019d29952..b1a23397c921 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -163,6 +163,9 @@ use arrow_array::{ use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit}; pub use schema::*; +pub use validation::{ + build_detailed_errors, ErrorMarker, FailureKind, JsonType, ValidationError, DEFAULT_MAX_ERRORS, +}; mod binary_array; mod boolean_array; @@ -179,6 +182,7 @@ mod string_view_array; mod struct_array; mod tape; mod timestamp_array; +pub mod validation; /// A builder for [`Reader`] and [`Decoder`] pub struct ReaderBuilder { @@ -696,8 +700,8 @@ impl Decoder { }); let pos: Vec<_> = if self.allow_bad_data { - // filter out invalid rows before we attempt to deserialize - pos.filter(|p| self.decoder.validate_row(&tape, *p)) + pos.enumerate() + .filter_map(|(idx, p)| self.decoder.validate_row(&tape, p, idx).ok().map(|_| p)) .collect() } else { pos.collect() @@ -717,40 +721,63 @@ impl Decoder { } /// Flushes schema-conforming JSON in the current buffer to a [`RecordBatch`], and returns - /// an BooleanArray that marks good rows and an Option with invalid records, if - /// any exist + /// a BooleanArray that marks good rows, an Option with invalid records, + /// and detailed validation errors /// /// Returns `Ok(None)` if no buffered data /// /// Note: if called part way through decoding a record, this will return an error + #[allow(clippy::type_complexity)] pub fn flush_with_bad_data( &mut self, - ) -> Result)>, ArrowError> { + ) -> Result< + Option<( + RecordBatch, + BooleanArray, + Option, + Vec, + )>, + ArrowError, + > { let tape = self.tape_decoder.finish()?; if tape.num_rows() == 0 { return Ok(None); } - // First offset is null sentinel let mut next_object = 1; - let mut good_rows = Vec::with_capacity(tape.num_rows()); - - let (good, bad): (Vec<_>, Vec<_>) = (0..tape.num_rows()) + let positions: Vec<_> = (0..tape.num_rows()) .map(|_| { let next = tape.next(next_object, "row").unwrap(); - std::mem::replace(&mut next_object, next) }) - .partition(|p| { - let valid = self.decoder.validate_row(&tape, *p); - good_rows.push(valid); - valid - }); + .collect(); + + let mut good = Vec::new(); + let mut bad_positions = Vec::new(); + let mut all_markers = Vec::new(); + let mut good_rows = Vec::with_capacity(tape.num_rows()); - let bad_data = if !bad.is_empty() { + for (row_idx, p) in positions.into_iter().enumerate() { + match self.decoder.validate_row(&tape, p, row_idx) { + Ok(()) => { + good.push(p); + good_rows.push(true); + } + Err(markers) => { + bad_positions.push(p); + good_rows.push(false); + all_markers.extend(markers); + } + } + } + + all_markers.truncate(DEFAULT_MAX_ERRORS); + let errors = build_detailed_errors(&tape, all_markers); + + let bad_data = if !bad_positions.is_empty() { let mut json = JsonArrayDecoder::new(false); - let v = json.decode(&tape, &bad).unwrap(); + let v = json.decode(&tape, &bad_positions).unwrap(); Some(v.into()) } else { None @@ -766,7 +793,7 @@ impl Decoder { } }; - Ok(Some((batch, good_rows.into(), bad_data))) + Ok(Some((batch, good_rows.into(), bad_data, errors))) } } @@ -774,8 +801,16 @@ trait ArrayDecoder: Send { /// Decode elements from `tape` starting at the indexes contained in `pos` fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result; - /// Returns true if the row matches the schema - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool; + /// Validates a row and returns error markers if validation fails + /// + /// Returns Ok(()) if valid, Err(markers) if invalid. + /// Uses `&'tape self` to allow borrowing field names from the decoder. + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>>; } macro_rules! primitive_decoder { @@ -3117,7 +3152,7 @@ mod tests { decoder.decode(&j6.as_bytes()).unwrap(); decoder.decode(&j7.as_bytes()).unwrap(); - let (good, mask, bad) = decoder.flush_with_bad_data().unwrap().unwrap(); + let (good, mask, bad, _errors) = decoder.flush_with_bad_data().unwrap().unwrap(); assert_eq!( mask, vec![true, false, false, false, true, false, false].into() @@ -3175,7 +3210,7 @@ mod tests { decoder.decode(j1.as_bytes()).unwrap(); decoder.decode(j2.as_bytes()).unwrap(); - let (good, mask, bad) = decoder.flush_with_bad_data().unwrap().unwrap(); + let (good, mask, bad, _errors) = decoder.flush_with_bad_data().unwrap().unwrap(); assert_eq!(mask, vec![false, true].into()); assert_eq!(good.num_rows(), 1); @@ -3185,4 +3220,458 @@ mod tests { assert_eq!(bad.len(), 1); assert_eq!(bad.value(0), j1); } + + #[test] + fn test_validation_details_primitive_errors() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("value", DataType::Float64, false), + ])); + + let json = r#" + {"id": 1, "value": 1.5} + {"id": "invalid", "value": 2.5} + {"id": 3, "value": "also_invalid"} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (batch, _mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(batch.num_rows(), 1); // Only first row valid + assert_eq!(errors.len(), 2); + + // Check first error - invalid id + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "id"); + assert!(matches!(errors[0].failure_kind, FailureKind::ParseFailure)); + + // Check second error - invalid value + assert_eq!(errors[1].row_index, 2); + assert_eq!(errors[1].field_path, "value"); + } + + #[test] + fn test_validation_details_nested_struct_errors() { + let schema = Arc::new(Schema::new(vec![Field::new( + "user", + DataType::Struct(Fields::from(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + Field::new( + "address", + DataType::Struct(Fields::from(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("zipcode", DataType::Int32, false), + ])), + false, + ), + ])), + false, + )])); + + let json = r#" + {"user": {"name": "Alice", "age": 30, "address": {"city": "NYC", "zipcode": 10001}}} + {"user": {"name": "Bob", "age": "invalid", "address": {"city": "LA", "zipcode": "bad"}}} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (_batch, _mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + // Should have 1 error - age is invalid, short-circuits before checking zipcode + assert_eq!(errors.len(), 1); + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "age"); + } + + #[test] + fn test_validation_details_missing_required_fields() { + let schema = Arc::new(Schema::new(vec![Field::new( + "user", + DataType::Struct(Fields::from(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + Field::new("email", DataType::Utf8, false), + ])), + false, + )])); + + let json = r#" + {"user": {"name": "Alice", "age": 30, "email": "alice@example.com"}} + {"user": {"name": "Bob"}} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (_batch, _mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + // Should have 2 errors - age and email both missing + assert_eq!(errors.len(), 2); + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "age"); + assert_eq!(errors[0].failure_kind, FailureKind::MissingField); + + assert_eq!(errors[1].row_index, 1); + assert_eq!(errors[1].field_path, "email"); + assert_eq!(errors[1].failure_kind, FailureKind::MissingField); + } + + #[test] + fn test_validation_details_array_element_errors() { + let schema = Arc::new(Schema::new(vec![Field::new( + "items", + DataType::List(Arc::new(Field::new("item", DataType::Int32, false))), + false, + )])); + + let json = r#" + {"items": [1, 2, 3]} + {"items": [4, "invalid", 6]} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (_batch, _mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(errors.len(), 1); + assert_eq!(errors[0].row_index, 1); + // Array element error should include array index + assert_eq!(errors[0].field_path, "items[1]"); + assert!(matches!(errors[0].failure_kind, FailureKind::ParseFailure)); + } + + #[test] + fn test_validation_details_type_mismatch() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let json = r#" + {"id": 123, "name": "valid"} + {"id": {"nested": "object"}, "name": [1, 2, 3]} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (_batch, _mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(errors.len(), 1); + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "id"); + assert_eq!(errors[0].failure_kind, FailureKind::TypeMismatch); + assert_eq!(errors[0].actual_type, Some(JsonType::Object)); + } + + #[test] + fn test_validation_details_null_value() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let json = r#" + {"id": 123, "name": "valid"} + {"id": null, "name": null} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (_batch, _mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(errors.len(), 1); + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "id"); + assert_eq!(errors[0].failure_kind, FailureKind::NullValue); + assert_eq!(errors[0].actual_value.as_deref(), Some("null")); + } + + #[test] + #[ignore = "StructMode::ListOnly validation not yet implemented - all rows marked invalid"] + fn test_validation_details_struct_mode_list_only() { + use crate::StructMode; + + let schema = Arc::new(Schema::new(vec![Field::new( + "record", + DataType::Struct(Fields::from(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])), + false, + )])); + + // ListOnly mode expects arrays like [1, "hello"] instead of {"a": 1, "b": "hello"} + let json = r#" + {"record": [1, "valid"]} + {"record": [2, 123]} + {"record": ["invalid", "text"]} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_struct_mode(StructMode::ListOnly) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (batch, mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(batch.num_rows(), 1); + assert_eq!(mask, BooleanArray::from(vec![true, false, false])); + assert_eq!(errors.len(), 2); + + // Row 1: b field has wrong type (number 123 instead of string) + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "record[1]"); + assert_eq!(errors[0].failure_kind, FailureKind::TypeMismatch); + assert_eq!(errors[0].actual_type, Some(JsonType::Number)); + assert_eq!(errors[0].actual_value.as_deref(), Some("123")); + + // Row 2: a field has wrong type (string "invalid" instead of int) + assert_eq!(errors[1].row_index, 2); + assert_eq!(errors[1].field_path, "record[0]"); + assert_eq!(errors[1].failure_kind, FailureKind::ParseFailure); + assert_eq!(errors[1].actual_type, Some(JsonType::String)); + assert_eq!(errors[1].actual_value.as_deref(), Some("\"invalid\"")); + } + + #[test] + fn test_validation_details_nested_arrays() { + // Test List> - nested arrays + let schema = Arc::new(Schema::new(vec![Field::new( + "matrix", + DataType::List(Arc::new(Field::new( + "row", + DataType::List(Arc::new(Field::new("item", DataType::Int32, false))), + false, + ))), + false, + )])); + + let json = r#" + {"matrix": [[1, 2], [3, 4]]} + {"matrix": [[5, 6], [7, "invalid"]]} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (batch, mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(batch.num_rows(), 1); + assert_eq!(mask, BooleanArray::from(vec![true, false])); + assert_eq!(errors.len(), 1); + + // Error at matrix[1][1] - the "invalid" string + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "matrix[1][1]"); + assert_eq!(errors[0].failure_kind, FailureKind::ParseFailure); + assert_eq!(errors[0].actual_type, Some(JsonType::String)); + assert_eq!(errors[0].actual_value.as_deref(), Some("\"invalid\"")); + } + + #[test] + fn test_validation_details_map_errors() { + let schema = Arc::new(Schema::new(vec![Field::new( + "properties", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])), + false, + )), + false, + ), + false, + )])); + + let json = r#" + {"properties": {"count": 42, "size": 100}} + {"properties": {"count": "not_a_number", "size": 200}} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (batch, mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(batch.num_rows(), 1); + assert_eq!(mask, BooleanArray::from(vec![true, false])); + assert_eq!(errors.len(), 1); + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].failure_kind, FailureKind::ParseFailure); + assert_eq!(errors[0].actual_type, Some(JsonType::String)); + assert_eq!(errors[0].actual_value.as_deref(), Some("\"not_a_number\"")); + } + + #[test] + fn test_validation_details_strict_mode_unknown_field() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let json = r#" + {"id": 1, "name": "Alice"} + {"id": 2, "name": "Bob", "extra_field": "should_error"} + {"id": 3, "name": "Charlie", "another": 123, "fields": true} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_strict_mode(true) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (batch, mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(batch.num_rows(), 1); // Only first row valid + assert_eq!(mask, BooleanArray::from(vec![true, false, false])); + assert_eq!(errors.len(), 2); + + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "extra_field"); + assert_eq!(errors[0].failure_kind, FailureKind::TypeMismatch); + + assert_eq!(errors[1].row_index, 2); + assert_eq!(errors[1].field_path, "another"); + assert_eq!(errors[1].failure_kind, FailureKind::TypeMismatch); + } + + #[test] + fn test_validation_details_timestamp_errors() { + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )])); + + let json = r#" + {"ts": "2024-01-15T10:30:00Z"} + {"ts": "not-a-timestamp"} + {"ts": "2024-13-45T99:99:99Z"} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (batch, mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(batch.num_rows(), 1); + assert_eq!(mask, BooleanArray::from(vec![true, false, false])); + assert_eq!(errors.len(), 2); + + assert_eq!(errors[0].row_index, 1); + assert_eq!(errors[0].field_path, "ts"); + assert_eq!(errors[0].failure_kind, FailureKind::ParseFailure); + assert_eq!(errors[0].actual_type, Some(JsonType::String)); + assert_eq!( + errors[0].actual_value.as_deref(), + Some("\"not-a-timestamp\"") + ); + + assert_eq!(errors[1].row_index, 2); + assert_eq!(errors[1].field_path, "ts"); + assert_eq!(errors[1].failure_kind, FailureKind::ParseFailure); + } + + #[test] + fn test_validation_details_bad_data_content() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let json = r#" + {"id": 1, "name": "Alice"} + {"id": "not_int", "name": "Bob"} + {"id": 3, "name": null} + "#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (batch, mask, bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(batch.num_rows(), 1); + assert_eq!(mask, BooleanArray::from(vec![true, false, false])); + assert_eq!(errors.len(), 2); + + let bad_data = bad_data.expect("should have bad_data"); + assert_eq!(bad_data.len(), 2); + + let bad0 = bad_data.value(0); + let bad1 = bad_data.value(1); + assert!(bad0.contains("\"not_int\"") && bad0.contains("\"Bob\"")); + assert!(bad1.contains("null") && bad1.contains("\"id\":3") || bad1.contains("\"id\": 3")); + } + + #[test] + fn test_validation_error_display() { + let schema = Arc::new(Schema::new(vec![Field::new( + "count", + DataType::Int32, + false, + )])); + + let json = r#"{"count": "not_a_number"}"#; + + let mut decoder = ReaderBuilder::new(schema) + .with_allow_bad_data(true) + .build_decoder() + .unwrap(); + + decoder.decode(json.as_bytes()).unwrap(); + let (_batch, _mask, _bad_data, errors) = decoder.flush_with_bad_data().unwrap().unwrap(); + + assert_eq!(errors.len(), 1); + + let error_msg = errors[0].to_string(); + assert!(error_msg.contains("row 0")); + assert!(error_msg.contains("count")); + assert!(error_msg.contains("Int32")); + assert!(error_msg.contains("parse failure")); + assert!(error_msg.contains("\"not_a_number\"")); + } } diff --git a/arrow-json/src/reader/null_array.rs b/arrow-json/src/reader/null_array.rs index d86a9b1d92ba..e1385541b585 100644 --- a/arrow-json/src/reader/null_array.rs +++ b/arrow-json/src/reader/null_array.rs @@ -15,13 +15,31 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::ArrayDecoder; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; -#[derive(Default)] -pub struct NullArrayDecoder {} +pub struct NullArrayDecoder { + data_type: Arc, +} + +impl Default for NullArrayDecoder { + fn default() -> Self { + Self::new() + } +} + +impl NullArrayDecoder { + pub fn new() -> Self { + Self { + data_type: Arc::new(DataType::Null), + } + } +} impl ArrayDecoder for NullArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { @@ -33,7 +51,17 @@ impl ArrayDecoder for NullArrayDecoder { ArrayDataBuilder::new(DataType::Null).len(pos.len()).build() } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - matches!(tape.get(pos), TapeElement::Null) + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let failure = match tape.get(pos) { + TapeElement::Null => return Ok(()), + _ => FailureKind::TypeMismatch, + }; + + ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type)) } } diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs index a9c7f2ed3000..55e558efcbb1 100644 --- a/arrow-json/src/reader/primitive_array.rs +++ b/arrow-json/src/reader/primitive_array.rs @@ -17,6 +17,7 @@ use num::NumCast; use std::marker::PhantomData; +use std::sync::Arc; use arrow_array::builder::PrimitiveBuilder; use arrow_array::{Array, ArrowPrimitiveType}; @@ -26,6 +27,7 @@ use arrow_schema::{ArrowError, DataType}; use half::f16; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::ArrayDecoder; /// A trait for JSON-specific primitive parsing logic @@ -74,7 +76,7 @@ impl ParseJsonNumber for f64 { } pub struct PrimitiveArrayDecoder { - data_type: DataType, + data_type: Arc, is_nullable: bool, // Invariant and Send phantom: PhantomData P>, @@ -83,7 +85,7 @@ pub struct PrimitiveArrayDecoder { impl PrimitiveArrayDecoder

{ pub fn new(data_type: DataType, is_nullable: bool) -> Self { Self { - data_type, + data_type: Arc::new(data_type), is_nullable, phantom: Default::default(), } @@ -96,8 +98,8 @@ where P::Native: ParseJsonNumber + NumCast, { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { - let mut builder = - PrimitiveBuilder::

::with_capacity(pos.len()).with_data_type(self.data_type.clone()); + let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()) + .with_data_type((*self.data_type).clone()); let d = &self.data_type; for p in pos { @@ -159,33 +161,58 @@ where Ok(builder.finish().into_data()) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - match tape.get(pos) { - TapeElement::Null => self.is_nullable, + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let failure = match tape.get(pos) { + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } + FailureKind::NullValue + } TapeElement::String(idx) => { let s = tape.get_string(idx); - P::parse(s).is_some() + if P::parse(s).is_some() { + return Ok(()); + } + FailureKind::ParseFailure } TapeElement::Number(idx) => { let s = tape.get_string(idx); let v: Option<

::Native> = ParseJsonNumber::parse(s.as_bytes()); - v.is_some() + if v.is_some() { + return Ok(()); + } + FailureKind::ParseFailure } TapeElement::F32(v) => { let v = f32::from_bits(v); let v: Option<

::Native> = NumCast::from(v); - v.is_some() + if v.is_some() { + return Ok(()); + } + FailureKind::ParseFailure } TapeElement::I32(v) => { let v: Option<

::Native> = NumCast::from(v); - v.is_some() + if v.is_some() { + return Ok(()); + } + FailureKind::ParseFailure } TapeElement::F64(high) => match tape.get(pos + 1) { TapeElement::F32(low) => { let v = f64::from_bits((high as u64) << 32 | low as u64); let v: Option<

::Native> = NumCast::from(v); - v.is_some() + if v.is_some() { + return Ok(()); + } + FailureKind::ParseFailure } _ => unreachable!(), }, @@ -193,11 +220,16 @@ where TapeElement::I32(low) => { let v = (high as i64) << 32 | (low as u32) as i64; let v: Option<

::Native> = NumCast::from(v); - v.is_some() + if v.is_some() { + return Ok(()); + } + FailureKind::ParseFailure } _ => unreachable!(), }, - _ => false, - } + _ => FailureKind::TypeMismatch, + }; + + ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type)) } } diff --git a/arrow-json/src/reader/string_array.rs b/arrow-json/src/reader/string_array.rs index 406937beab5c..c08484e90f2d 100644 --- a/arrow-json/src/reader/string_array.rs +++ b/arrow-json/src/reader/string_array.rs @@ -15,19 +15,23 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; +use std::sync::Arc; + use arrow_array::builder::GenericStringBuilder; use arrow_array::{Array, GenericStringArray, OffsetSizeTrait}; use arrow_data::ArrayData; -use arrow_schema::ArrowError; -use std::marker::PhantomData; +use arrow_schema::{ArrowError, DataType}; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::ArrayDecoder; const TRUE: &str = "true"; const FALSE: &str = "false"; pub struct StringArrayDecoder { + data_type: Arc, coerce_primitive: bool, is_nullable: bool, phantom: PhantomData, @@ -36,6 +40,7 @@ pub struct StringArrayDecoder { impl StringArrayDecoder { pub fn new(coerce_primitive: bool, is_nullable: bool) -> Self { Self { + data_type: Arc::new(GenericStringArray::::DATA_TYPE), coerce_primitive, is_nullable, phantom: Default::default(), @@ -129,18 +134,35 @@ impl ArrayDecoder for StringArrayDecoder { Ok(builder.finish().into_data()) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - match tape.get(pos) { - TapeElement::String(_) => true, - TapeElement::Null => self.is_nullable, + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let failure = match tape.get(pos) { + TapeElement::String(_) => return Ok(()), + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } + FailureKind::NullValue + } TapeElement::True | TapeElement::False | TapeElement::Number(_) | TapeElement::I64(_) | TapeElement::I32(_) | TapeElement::F32(_) - | TapeElement::F64(_) => self.coerce_primitive, - _ => false, - } + | TapeElement::F64(_) => { + if self.coerce_primitive { + return Ok(()); + } + FailureKind::TypeMismatch + } + _ => FailureKind::TypeMismatch, + }; + + ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type)) } } diff --git a/arrow-json/src/reader/string_view_array.rs b/arrow-json/src/reader/string_view_array.rs index 08ba29e8faca..10333e96c3b0 100644 --- a/arrow-json/src/reader/string_view_array.rs +++ b/arrow-json/src/reader/string_view_array.rs @@ -163,7 +163,12 @@ impl ArrayDecoder for StringViewArrayDecoder { Ok(array.into_data()) } - fn validate_row(&self, _tape: &Tape<'_>, _pos: u32) -> bool { + fn validate_row<'tape>( + &'tape self, + _tape: &'tape Tape<'_>, + _pos: u32, + _row_idx: usize, + ) -> Result<(), Vec>> { todo!("string views are not yet supported") } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 139abebf3b6f..d526998718c6 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::{make_decoder, ArrayDecoder, StructMode}; use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::buffer::NullBuffer; @@ -23,7 +26,7 @@ use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType, Fields}; pub struct StructArrayDecoder { - data_type: DataType, + data_type: Arc, decoders: Vec>, strict_mode: bool, is_nullable: bool, @@ -57,7 +60,7 @@ impl StructArrayDecoder { .collect::, ArrowError>>()?; Ok(Self { - data_type, + data_type: Arc::new(data_type), decoders, strict_mode, is_nullable, @@ -190,7 +193,7 @@ impl ArrayDecoder for StructArrayDecoder { } } - let data = ArrayDataBuilder::new(self.data_type.clone()) + let data = ArrayDataBuilder::new((*self.data_type).clone()) .len(pos.len()) .nulls(nulls) .child_data(child_data); @@ -200,14 +203,33 @@ impl ArrayDecoder for StructArrayDecoder { Ok(unsafe { data.build_unchecked() }) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - let end_idx = match (tape.get(pos), self.is_nullable) { - (TapeElement::StartObject(end_idx), _) => end_idx, - (TapeElement::Null, true) => { - return true; + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let end_idx = match tape.get(pos) { + TapeElement::StartObject(end_idx) => end_idx, + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } else { + return ErrorMarker::err( + row_idx, + pos, + FailureKind::NullValue, + Arc::clone(&self.data_type), + ); + } } _ => { - return false; + return ErrorMarker::err( + row_idx, + pos, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); } }; @@ -216,46 +238,91 @@ impl ArrayDecoder for StructArrayDecoder { let mut cur_idx = pos + 1; while cur_idx < end_idx { - // Read field name let field_name = match tape.get(cur_idx) { TapeElement::String(s) => tape.get_string(s), - _ => return false, + _ => { + return ErrorMarker::err( + row_idx, + cur_idx, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); + } }; - // Update child pos if match found match fields.iter().position(|x| x.name() == field_name) { Some(field_idx) => { let child_pos = cur_idx + 1; - if !self.decoders[field_idx].validate_row(tape, child_pos) { - return false; + + if let Err(mut child_errors) = + self.decoders[field_idx].validate_row(tape, child_pos, row_idx) + { + // Add field name for leaf validator errors that lack field context. + for error in &mut child_errors { + // Preserve field names already set by nested validators. + if error.field_name.is_none() { + error.field_name = Some(field_name); + } + } + return Err(child_errors); } + validated_fields[field_idx] = true; } None => { if self.strict_mode { - return false; + // Custom field_name - can't use helper + return Err(vec![ErrorMarker { + row_index: row_idx, + tape_pos: Some(cur_idx), + field_name: Some(field_name), + array_indices: Vec::new(), + error_kind: FailureKind::TypeMismatch, + expected_type: Arc::clone(&self.data_type), + }]); } } } - // Advance to next field cur_idx = match tape.next(cur_idx + 1, "field value") { Ok(i) => i, Err(_) => { - return false; + return ErrorMarker::err( + row_idx, + cur_idx, + FailureKind::TypeMismatch, + Arc::clone(&self.data_type), + ); } - } + }; } - validated_fields + // Early exit on happy path - no missing field errors + let all_valid = validated_fields .iter() .zip(fields) - .all(|(validated, field)| { - if !validated && !field.is_nullable() { - return false; - } - true - }) + .all(|(validated, field)| *validated || field.is_nullable()); + + if all_valid { + return Ok(()); + } + + // Error path: collect missing field errors + let mut missing_errors = Vec::new(); + for (validated, field) in validated_fields.iter().zip(fields) { + if !validated && !field.is_nullable() { + missing_errors.push(ErrorMarker { + row_index: row_idx, + tape_pos: None, + field_name: Some(field.name()), + array_indices: Vec::new(), + error_kind: FailureKind::MissingField, + expected_type: Arc::new(field.data_type().clone()), + }); + } + } + + Err(missing_errors) } } diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index afa3bd8f82ac..e626a918ba72 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -211,6 +211,17 @@ impl<'a> Tape<'a> { self.serialize(&mut out, idx); ArrowError::JsonError(format!("expected {expected} got {out}")) } + + /// Creates a new Tape for testing purposes + #[cfg(test)] + pub fn new(elements: &'a [TapeElement], strings: &'a str, string_offsets: &'a [usize]) -> Self { + Self { + elements, + strings, + string_offsets, + num_rows: 0, + } + } } /// States based on diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs index db5535477025..83b55f6c09bc 100644 --- a/arrow-json/src/reader/timestamp_array.rs +++ b/arrow-json/src/reader/timestamp_array.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use chrono::TimeZone; use std::marker::PhantomData; @@ -26,11 +28,12 @@ use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, TimeUnit}; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::validation::{ErrorMarker, FailureKind}; use crate::reader::ArrayDecoder; /// A specialized [`ArrayDecoder`] for timestamps pub struct TimestampArrayDecoder { - data_type: DataType, + data_type: Arc, timezone: Tz, is_nullable: bool, // Invariant and Send @@ -40,7 +43,7 @@ pub struct TimestampArrayDecoder { impl TimestampArrayDecoder { pub fn new(data_type: DataType, timezone: Tz, is_nullable: bool) -> Self { Self { - data_type, + data_type: Arc::new(data_type), timezone, is_nullable, phantom: Default::default(), @@ -54,8 +57,8 @@ where Tz: TimeZone + Send, { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { - let mut builder = - PrimitiveBuilder::

::with_capacity(pos.len()).with_data_type(self.data_type.clone()); + let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()) + .with_data_type((*self.data_type).clone()); for p in pos { match tape.get(*p) { @@ -110,29 +113,51 @@ where Ok(builder.finish().into_data()) } - fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool { - match tape.get(pos) { - TapeElement::Null => self.is_nullable, + fn validate_row<'tape>( + &'tape self, + tape: &'tape Tape<'_>, + pos: u32, + row_idx: usize, + ) -> Result<(), Vec>> { + let failure = match tape.get(pos) { + TapeElement::Null => { + if self.is_nullable { + return Ok(()); + } + FailureKind::NullValue + } TapeElement::String(idx) => { let s = tape.get_string(idx); - if let Ok(d) = string_to_datetime(&self.timezone, s) { + let valid = if let Ok(d) = string_to_datetime(&self.timezone, s) { match P::UNIT { TimeUnit::Nanosecond => d.timestamp_nanos_opt().is_some(), _ => true, } } else { false + }; + + if valid { + return Ok(()); } + FailureKind::ParseFailure } TapeElement::Number(idx) => { let s = tape.get_string(idx); let b = s.as_bytes(); - lexical_core::parse::(b) + let valid = lexical_core::parse::(b) .or_else(|_| lexical_core::parse::(b).map(|x| x as i64)) - .is_ok() + .is_ok(); + + if valid { + return Ok(()); + } + FailureKind::ParseFailure } - TapeElement::I32(_) | TapeElement::I64(_) => true, - _ => false, - } + TapeElement::I32(_) | TapeElement::I64(_) => return Ok(()), + _ => FailureKind::TypeMismatch, + }; + + ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type)) } } diff --git a/arrow-json/src/reader/validation.rs b/arrow-json/src/reader/validation.rs new file mode 100644 index 000000000000..34439027fd36 --- /dev/null +++ b/arrow-json/src/reader/validation.rs @@ -0,0 +1,358 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Validation error collection for JSON deserialization + +use arrow_schema::DataType; +use std::sync::Arc; + +use super::tape::{Tape, TapeElement}; + +/// Default maximum number of errors to collect +pub const DEFAULT_MAX_ERRORS: usize = 1000; + +const MAX_VALUE_LENGTH: usize = 256; + +/// Type of validation failure +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum FailureKind { + /// Required field is not present in JSON object + MissingField, + /// Field has null value but is non-nullable + NullValue, + /// JSON type doesn't match expected schema type + TypeMismatch, + /// Value failed to parse into target type + ParseFailure, +} + +/// Error marker stored during validation +#[derive(Debug, Clone)] +pub struct ErrorMarker<'tape> { + /// Row index in the batch + pub row_index: usize, + /// Position in tape where error occurred + pub tape_pos: Option, + /// Field name extracted from tape + pub field_name: Option<&'tape str>, + /// Array indices from parent arrays (innermost to outermost) + pub array_indices: Vec, + /// Type of validation failure + pub error_kind: FailureKind, + /// Expected schema type + pub expected_type: Arc, +} + +impl ErrorMarker<'_> { + /// Create a validation error result with a single error marker + /// + /// This is a convenience method for the common case of returning a single + /// validation error with no field name or array indices. + pub fn err( + row_idx: usize, + pos: u32, + kind: FailureKind, + expected_type: Arc, + ) -> Result<(), Vec> { + Err(vec![Self { + row_index: row_idx, + tape_pos: Some(pos), + field_name: None, + array_indices: Vec::new(), + error_kind: kind, + expected_type, + }]) + } +} + +/// Detailed validation error with extracted context +#[derive(Debug, Clone)] +pub struct ValidationError { + /// Row index in the batch + pub row_index: usize, + /// Field path where error occurred (e.g., "items[2]", "user.age") + pub field_path: String, + /// Type of validation failure + pub failure_kind: FailureKind, + /// Expected schema type + pub expected_type: Arc, + /// Actual JSON type encountered + pub actual_type: Option, + /// Actual JSON value encountered + pub actual_value: Option, +} + +impl std::fmt::Display for ValidationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "row {}, field '{}': ", self.row_index, self.field_path)?; + match self.failure_kind { + FailureKind::MissingField => { + write!(f, "required field is missing")?; + } + FailureKind::NullValue => { + write!(f, "null value for non-nullable field")?; + } + FailureKind::TypeMismatch | FailureKind::ParseFailure => { + if let Some(actual_type) = self.actual_type { + write!( + f, + "cannot deserialize {} value as {}", + actual_type, self.expected_type + )?; + } else { + write!(f, "cannot deserialize value as {}", self.expected_type)?; + } + if matches!(self.failure_kind, FailureKind::ParseFailure) { + write!(f, " (parse failure)")?; + } + } + } + if let Some(val) = &self.actual_value { + write!(f, ", got: {}", val)?; + } + Ok(()) + } +} + +/// JSON value type +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum JsonType { + /// JSON null + Null, + /// JSON boolean + Boolean, + /// JSON number + Number, + /// JSON string + String, + /// JSON array + Array, + /// JSON object + Object, +} + +impl std::fmt::Display for JsonType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + JsonType::Null => write!(f, "null"), + JsonType::Boolean => write!(f, "boolean"), + JsonType::Number => write!(f, "number"), + JsonType::String => write!(f, "string"), + JsonType::Array => write!(f, "array"), + JsonType::Object => write!(f, "object"), + } + } +} + +fn truncate_value(v: String) -> String { + if v.len() > MAX_VALUE_LENGTH { + format!("{}...", &v[..MAX_VALUE_LENGTH]) + } else { + v + } +} + +/// Extract string representation of tape element for error reporting +pub fn extract_value(tape: &Tape<'_>, pos: u32) -> Option { + Some(match tape.get(pos) { + TapeElement::Null => "null".to_string(), + TapeElement::True => "true".to_string(), + TapeElement::False => "false".to_string(), + TapeElement::String(idx) => format!("\"{}\"", tape.get_string(idx)), + TapeElement::Number(idx) => tape.get_string(idx).to_string(), + TapeElement::I32(v) => v.to_string(), + TapeElement::F32(v) => f32::from_bits(v).to_string(), + TapeElement::I64(high) => match tape.get(pos + 1) { + TapeElement::I32(low) => { + let v = ((high as i64) << 32) | (low as u32) as i64; + v.to_string() + } + _ => return None, + }, + TapeElement::F64(high) => match tape.get(pos + 1) { + TapeElement::F32(low) => { + let v = f64::from_bits(((high as u64) << 32) | low as u64); + v.to_string() + } + _ => return None, + }, + TapeElement::StartObject(_) => "{...}".to_string(), + TapeElement::StartList(_) => "[...]".to_string(), + _ => return None, + }) +} + +/// Convert TapeElement to JsonType +pub fn tape_element_type(tape: &Tape<'_>, pos: u32) -> JsonType { + match tape.get(pos) { + TapeElement::Null => JsonType::Null, + TapeElement::True | TapeElement::False => JsonType::Boolean, + TapeElement::Number(_) + | TapeElement::I32(_) + | TapeElement::F32(_) + | TapeElement::I64(_) + | TapeElement::F64(_) => JsonType::Number, + TapeElement::String(_) => JsonType::String, + TapeElement::StartList(_) => JsonType::Array, + TapeElement::StartObject(_) => JsonType::Object, + _ => JsonType::Null, + } +} + +/// Extract field name from tape at error position +/// +/// In JSON objects, the tape structure is: [StartObject, String(field_name), value, ...] +/// So the field name precedes its value by one position. +/// Returns None for array elements, invalid positions, or if field name cannot be extracted. +fn extract_field_name_from_tape(tape: &Tape<'_>, error_pos: Option) -> Option { + if let Some(pos) = error_pos { + if pos > 0 { + if let TapeElement::String(idx) = tape.get(pos - 1) { + return Some(tape.get_string(idx).to_string()); + } + } + } + None +} + +/// Build detailed errors from markers collected during validation +pub fn build_detailed_errors( + tape: &Tape<'_>, + markers: Vec>, +) -> Vec { + markers + .into_iter() + .map(|marker| { + let field_path = build_field_path(&marker, tape); + + let actual_type = marker.tape_pos.map(|pos| tape_element_type(tape, pos)); + + let actual_value = marker + .tape_pos + .and_then(|pos| extract_value(tape, pos)) + .map(truncate_value); + + ValidationError { + row_index: marker.row_index, + field_path, + failure_kind: marker.error_kind, + expected_type: marker.expected_type, + actual_type, + actual_value, + } + }) + .collect() +} + +/// Build field path from marker +fn build_field_path(marker: &ErrorMarker<'_>, tape: &Tape<'_>) -> String { + let base_name = marker + .field_name + .map(|s| s.to_string()) + .or_else(|| extract_field_name_from_tape(tape, marker.tape_pos)); + + if marker.array_indices.is_empty() { + base_name.unwrap_or_else(|| "".to_string()) + } else { + let indices = marker + .array_indices + .iter() + .rev() + .map(|i| format!("[{}]", i)) + .collect::(); + + match base_name { + Some(name) => format!("{}{}", name, indices), + None => indices, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_field_path_simple() { + let marker = ErrorMarker { + row_index: 0, + tape_pos: Some(0), + field_name: Some("age"), + array_indices: Vec::new(), + error_kind: FailureKind::MissingField, + expected_type: Arc::new(DataType::Int32), + }; + + let path = build_field_path(&marker, &crate::reader::tape::Tape::new(&[], "", &[])); + assert_eq!(path, "age"); + } + + #[test] + fn test_build_field_path_with_array_indices() { + let marker = ErrorMarker { + row_index: 0, + tape_pos: Some(0), + field_name: Some("items"), + array_indices: vec![2, 5], // innermost first, reversed in output + error_kind: FailureKind::ParseFailure, + expected_type: Arc::new(DataType::Int32), + }; + + let path = build_field_path(&marker, &crate::reader::tape::Tape::new(&[], "", &[])); + assert_eq!(path, "items[5][2]"); + } + + #[test] + fn test_build_field_path_array_only() { + let marker = ErrorMarker { + row_index: 0, + tape_pos: Some(0), + field_name: None, + array_indices: vec![1, 2], + error_kind: FailureKind::ParseFailure, + expected_type: Arc::new(DataType::Int32), + }; + + let path = build_field_path(&marker, &crate::reader::tape::Tape::new(&[], "", &[])); + assert_eq!(path, "[2][1]"); + } + + #[test] + fn test_build_field_path_unknown() { + let marker = ErrorMarker { + row_index: 0, + tape_pos: Some(0), + field_name: None, + array_indices: Vec::new(), + error_kind: FailureKind::TypeMismatch, + expected_type: Arc::new(DataType::Int32), + }; + + let path = build_field_path(&marker, &crate::reader::tape::Tape::new(&[], "", &[])); + assert_eq!(path, ""); + } + + #[test] + fn test_json_type_display() { + assert_eq!(JsonType::Null.to_string(), "null"); + assert_eq!(JsonType::Boolean.to_string(), "boolean"); + assert_eq!(JsonType::Number.to_string(), "number"); + assert_eq!(JsonType::String.to_string(), "string"); + assert_eq!(JsonType::Array.to_string(), "array"); + assert_eq!(JsonType::Object.to_string(), "object"); + } +}