Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions arrow-json/src/reader/binary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,29 @@
// specific language governing permissions and limitations
// under the License.

use std::marker::PhantomData;
use std::sync::Arc;

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::validation::{ErrorMarker, FailureKind};
use crate::reader::ArrayDecoder;
use arrow_array::builder::GenericBinaryBuilder;
use arrow_array::{Array, GenericStringArray, OffsetSizeTrait};
use arrow_array::{Array, GenericBinaryArray, GenericStringArray, OffsetSizeTrait};
use arrow_data::ArrayData;
use arrow_schema::ArrowError;
use arrow_schema::{ArrowError, DataType};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use std::marker::PhantomData;

pub struct BinaryArrayDecoder<O: OffsetSizeTrait> {
data_type: Arc<DataType>,
is_nullable: bool,
phantom: PhantomData<O>,
}

impl<O: OffsetSizeTrait> BinaryArrayDecoder<O> {
pub fn new(is_nullable: bool) -> Self {
Self {
data_type: Arc::new(GenericBinaryArray::<O>::DATA_TYPE),
is_nullable,
phantom: Default::default(),
}
Expand Down Expand Up @@ -80,11 +85,28 @@ impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {
Ok(builder.finish().into_data())
}

fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool {
match tape.get(pos) {
TapeElement::String(p) => BASE64_STANDARD.decode(&tape.get_string(p)).is_ok(),
TapeElement::Null => self.is_nullable,
_ => false,
}
fn validate_row<'tape>(
&'tape self,
tape: &'tape Tape<'_>,
pos: u32,
row_idx: usize,
) -> Result<(), Vec<ErrorMarker<'tape>>> {
let failure = match tape.get(pos) {
TapeElement::String(p) => {
if BASE64_STANDARD.decode(tape.get_string(p)).is_ok() {
return Ok(());
}
FailureKind::ParseFailure
}
TapeElement::Null => {
if self.is_nullable {
return Ok(());
}
FailureKind::NullValue
}
_ => FailureKind::TypeMismatch,
};

ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type))
}
}
35 changes: 27 additions & 8 deletions arrow-json/src/reader/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,28 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow_array::builder::BooleanBuilder;
use arrow_array::Array;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;
use arrow_schema::{ArrowError, DataType};

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::validation::{ErrorMarker, FailureKind};
use crate::reader::ArrayDecoder;

pub struct BooleanArrayDecoder {
data_type: Arc<DataType>,
is_nullable: bool,
}

impl BooleanArrayDecoder {
pub fn new(is_nullable: bool) -> Self {
Self { is_nullable }
Self {
data_type: Arc::new(DataType::Boolean),
is_nullable,
}
}
}

Expand All @@ -48,11 +55,23 @@ impl ArrayDecoder for BooleanArrayDecoder {
Ok(builder.finish().into_data())
}

fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool {
match tape.get(pos) {
TapeElement::Null => self.is_nullable,
TapeElement::True | TapeElement::False => true,
_ => false,
}
fn validate_row<'tape>(
&'tape self,
tape: &'tape Tape<'_>,
pos: u32,
row_idx: usize,
) -> Result<(), Vec<ErrorMarker<'tape>>> {
let failure = match tape.get(pos) {
TapeElement::True | TapeElement::False => return Ok(()),
TapeElement::Null => {
if self.is_nullable {
return Ok(());
}
FailureKind::NullValue
}
_ => FailureKind::TypeMismatch,
};

ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type))
}
}
38 changes: 30 additions & 8 deletions arrow-json/src/reader/decimal_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@
// under the License.

use std::marker::PhantomData;
use std::sync::Arc;

use arrow_array::builder::PrimitiveBuilder;
use arrow_array::types::DecimalType;
use arrow_array::Array;
use arrow_cast::parse::parse_decimal;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;
use arrow_schema::{ArrowError, DataType};

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::validation::{ErrorMarker, FailureKind};
use crate::reader::ArrayDecoder;

pub struct DecimalArrayDecoder<D: DecimalType> {
data_type: Arc<DataType>,
precision: u8,
scale: i8,
is_nullable: bool,
Expand All @@ -38,6 +41,7 @@ pub struct DecimalArrayDecoder<D: DecimalType> {
impl<D: DecimalType> DecimalArrayDecoder<D> {
pub fn new(precision: u8, scale: i8, is_nullable: bool) -> Self {
Self {
data_type: Arc::new(D::TYPE_CONSTRUCTOR(precision, scale)),
precision,
scale,
is_nullable,
Expand Down Expand Up @@ -102,18 +106,36 @@ where
.into_data())
}

fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool {
match tape.get(pos) {
TapeElement::Null => self.is_nullable,
fn validate_row<'tape>(
&'tape self,
tape: &'tape Tape<'_>,
pos: u32,
row_idx: usize,
) -> Result<(), Vec<ErrorMarker<'tape>>> {
let failure = match tape.get(pos) {
TapeElement::Null => {
if self.is_nullable {
return Ok(());
}
FailureKind::NullValue
}
TapeElement::String(idx) => {
let s = tape.get_string(idx);
parse_decimal::<D>(s, self.precision, self.scale).is_ok()
if parse_decimal::<D>(s, self.precision, self.scale).is_ok() {
return Ok(());
}
FailureKind::ParseFailure
}
TapeElement::Number(idx) => {
let s = tape.get_string(idx);
parse_decimal::<D>(s, self.precision, self.scale).is_ok()
if parse_decimal::<D>(s, self.precision, self.scale).is_ok() {
return Ok(());
}
FailureKind::ParseFailure
}
_ => false,
}
_ => FailureKind::TypeMismatch,
};

ErrorMarker::err(row_idx, pos, failure, Arc::clone(&self.data_type))
}
}
9 changes: 7 additions & 2 deletions arrow-json/src/reader/json_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ impl ArrayDecoder for JsonArrayDecoder {
Ok(builder.finish().into_data())
}

fn validate_row(&self, _: &Tape<'_>, _: u32) -> bool {
true
fn validate_row<'tape>(
&'tape self,
_tape: &'tape Tape<'_>,
_pos: u32,
_row_idx: usize,
) -> Result<(), Vec<super::ErrorMarker<'tape>>> {
Ok(())
}
}
74 changes: 57 additions & 17 deletions arrow-json/src/reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::validation::{ErrorMarker, FailureKind};
use crate::reader::{make_decoder, ArrayDecoder};
use crate::StructMode;
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
Expand All @@ -26,7 +29,7 @@ use arrow_schema::{ArrowError, DataType};
use std::marker::PhantomData;

pub struct ListArrayDecoder<O> {
data_type: DataType,
data_type: Arc<DataType>,
decoder: Box<dyn ArrayDecoder>,
phantom: PhantomData<O>,
is_nullable: bool,
Expand Down Expand Up @@ -55,7 +58,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
)?;

Ok(Self {
data_type,
data_type: Arc::new(data_type),
decoder,
phantom: Default::default(),
is_nullable,
Expand Down Expand Up @@ -104,7 +107,7 @@ impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> {
let child_data = self.decoder.decode(tape, &child_pos)?;
let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));

let data = ArrayDataBuilder::new(self.data_type.clone())
let data = ArrayDataBuilder::new((*self.data_type).clone())
.len(pos.len())
.nulls(nulls)
.add_buffer(offsets.finish())
Expand All @@ -115,28 +118,65 @@ impl<O: OffsetSizeTrait> ArrayDecoder for ListArrayDecoder<O> {
Ok(unsafe { data.build_unchecked() })
}

fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool {
let end_idx = match (tape.get(pos), self.is_nullable) {
(TapeElement::StartList(end_idx), _) => end_idx,
(TapeElement::Null, true) => {
return true;
fn validate_row<'tape>(
&'tape self,
tape: &'tape Tape<'_>,
pos: u32,
row_idx: usize,
) -> Result<(), Vec<ErrorMarker<'tape>>> {
let end_idx = match tape.get(pos) {
TapeElement::StartList(end_idx) => end_idx,
TapeElement::Null => {
if self.is_nullable {
return Ok(());
} else {
return ErrorMarker::err(
row_idx,
pos,
FailureKind::NullValue,
Arc::clone(&self.data_type),
);
}
}
_ => {
return ErrorMarker::err(
row_idx,
pos,
FailureKind::TypeMismatch,
Arc::clone(&self.data_type),
);
}
_ => return false,
};

let mut cur_idx = pos + 1;
let mut element_idx = 0;
while cur_idx < end_idx {
if !self.decoder.validate_row(tape, cur_idx) {
return false;
match self.decoder.validate_row(tape, cur_idx, row_idx) {
Ok(()) => {}
Err(mut child_errors) => {
for error in &mut child_errors {
error.array_indices.push(element_idx);
}
return Err(child_errors);
}
}
// Advance to next field
if let Ok(next) = tape.next(cur_idx, "list value") {
cur_idx = next;
} else {
return false;

match tape.next(cur_idx, "list value") {
Ok(next) => {
cur_idx = next;
element_idx += 1;
}
Err(_) => {
return ErrorMarker::err(
row_idx,
cur_idx,
FailureKind::TypeMismatch,
Arc::clone(&self.data_type),
);
}
}
}

true
Ok(())
}
}
Loading
Loading