Skip to content

Commit 4e12a1f

Browse files
committed
Merge branch 'main' into avro-row-encoder
2 parents 504cf95 + 4cd2d14 commit 4e12a1f

File tree

15 files changed

+703
-572
lines changed

15 files changed

+703
-572
lines changed

arrow-avro/src/compression.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::errors::AvroError;
1819
use arrow_schema::ArrowError;
1920
#[cfg(any(
2021
feature = "deflate",
@@ -47,7 +48,7 @@ pub enum CompressionCodec {
4748

4849
impl CompressionCodec {
4950
#[allow(unused_variables)]
50-
pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
51+
pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, AvroError> {
5152
match self {
5253
#[cfg(feature = "deflate")]
5354
CompressionCodec::Deflate => {
@@ -57,7 +58,7 @@ impl CompressionCodec {
5758
Ok(out)
5859
}
5960
#[cfg(not(feature = "deflate"))]
60-
CompressionCodec::Deflate => Err(ArrowError::ParseError(
61+
CompressionCodec::Deflate => Err(AvroError::ParseError(
6162
"Deflate codec requires deflate feature".to_string(),
6263
)),
6364
#[cfg(feature = "snappy")]
@@ -70,50 +71,56 @@ impl CompressionCodec {
7071
let mut decoder = snap::raw::Decoder::new();
7172
let decoded = decoder
7273
.decompress_vec(block)
73-
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
74+
.map_err(|e| AvroError::External(Box::new(e)))?;
7475

7576
let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
7677
if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
77-
return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
78+
return Err(AvroError::ParseError("Snappy CRC mismatch".to_string()));
7879
}
7980
Ok(decoded)
8081
}
8182
#[cfg(not(feature = "snappy"))]
82-
CompressionCodec::Snappy => Err(ArrowError::ParseError(
83+
CompressionCodec::Snappy => Err(AvroError::ParseError(
8384
"Snappy codec requires snappy feature".to_string(),
8485
)),
8586

8687
#[cfg(feature = "zstd")]
8788
CompressionCodec::ZStandard => {
8889
let mut decoder = zstd::Decoder::new(block)?;
8990
let mut out = Vec::new();
90-
decoder.read_to_end(&mut out)?;
91+
decoder
92+
.read_to_end(&mut out)
93+
.map_err(|e| AvroError::External(Box::new(e)))?;
9194
Ok(out)
9295
}
9396
#[cfg(not(feature = "zstd"))]
94-
CompressionCodec::ZStandard => Err(ArrowError::ParseError(
97+
CompressionCodec::ZStandard => Err(AvroError::ParseError(
9598
"ZStandard codec requires zstd feature".to_string(),
9699
)),
97100
#[cfg(feature = "bzip2")]
98101
CompressionCodec::Bzip2 => {
99102
let mut decoder = bzip2::read::BzDecoder::new(block);
100103
let mut out = Vec::new();
101-
decoder.read_to_end(&mut out)?;
104+
decoder
105+
.read_to_end(&mut out)
106+
.map_err(|e| AvroError::External(Box::new(e)))?;
102107
Ok(out)
103108
}
104109
#[cfg(not(feature = "bzip2"))]
105-
CompressionCodec::Bzip2 => Err(ArrowError::ParseError(
110+
CompressionCodec::Bzip2 => Err(AvroError::ParseError(
106111
"Bzip2 codec requires bzip2 feature".to_string(),
107112
)),
108113
#[cfg(feature = "xz")]
109114
CompressionCodec::Xz => {
110115
let mut decoder = xz::read::XzDecoder::new(block);
111116
let mut out = Vec::new();
112-
decoder.read_to_end(&mut out)?;
117+
decoder
118+
.read_to_end(&mut out)
119+
.map_err(|e| AvroError::External(Box::new(e)))?;
113120
Ok(out)
114121
}
115122
#[cfg(not(feature = "xz"))]
116-
CompressionCodec::Xz => Err(ArrowError::ParseError(
123+
CompressionCodec::Xz => Err(AvroError::ParseError(
117124
"XZ codec requires xz feature".to_string(),
118125
)),
119126
}

arrow-avro/src/errors.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Common Avro errors and macros.
19+
20+
use arrow_schema::ArrowError;
21+
use core::num::TryFromIntError;
22+
use std::error::Error;
23+
use std::string::FromUtf8Error;
24+
use std::{io, str};
25+
26+
/// Avro error enumeration
27+
28+
#[derive(Debug)]
29+
#[non_exhaustive]
30+
pub enum AvroError {
31+
/// General Avro error.
32+
/// Returned when code violates normal workflow of working with Avro data.
33+
General(String),
34+
/// "Not yet implemented" Avro error.
35+
/// Returned when functionality is not yet available.
36+
NYI(String),
37+
/// "End of file" Avro error.
38+
/// Returned when IO related failures occur, e.g. when there are not enough bytes to
39+
/// decode.
40+
EOF(String),
41+
/// Arrow error.
42+
/// Returned when reading into arrow or writing from arrow.
43+
ArrowError(Box<ArrowError>),
44+
/// Error when the requested index is more than the
45+
/// number of items expected
46+
IndexOutOfBound(usize, usize),
47+
/// Error indicating that an unexpected or bad argument was passed to a function.
48+
InvalidArgument(String),
49+
/// Error indicating that a value could not be parsed.
50+
ParseError(String),
51+
/// Error indicating that a schema is invalid.
52+
SchemaError(String),
53+
/// An external error variant
54+
External(Box<dyn Error + Send + Sync>),
55+
/// Error during IO operations
56+
IoError(String, io::Error),
57+
/// Returned when a function needs more data to complete properly. The `usize` field indicates
58+
/// the total number of bytes required, not the number of additional bytes.
59+
NeedMoreData(usize),
60+
/// Returned when a function needs more data to complete properly.
61+
/// The `Range<u64>` indicates the range of bytes that are needed.
62+
NeedMoreDataRange(std::ops::Range<u64>),
63+
}
64+
65+
impl std::fmt::Display for AvroError {
66+
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
67+
match &self {
68+
AvroError::General(message) => {
69+
write!(fmt, "Avro error: {message}")
70+
}
71+
AvroError::NYI(message) => write!(fmt, "NYI: {message}"),
72+
AvroError::EOF(message) => write!(fmt, "EOF: {message}"),
73+
AvroError::ArrowError(message) => write!(fmt, "Arrow: {message}"),
74+
AvroError::IndexOutOfBound(index, bound) => {
75+
write!(fmt, "Index {index} out of bound: {bound}")
76+
}
77+
AvroError::InvalidArgument(message) => {
78+
write!(fmt, "Invalid argument: {message}")
79+
}
80+
AvroError::ParseError(message) => write!(fmt, "Parser error: {message}"),
81+
AvroError::SchemaError(message) => write!(fmt, "Schema error: {message}"),
82+
AvroError::External(e) => write!(fmt, "External: {e}"),
83+
AvroError::IoError(message, e) => write!(fmt, "I/O Error: {message}: {e}"),
84+
AvroError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
85+
AvroError::NeedMoreDataRange(range) => {
86+
write!(fmt, "NeedMoreDataRange: {}..{}", range.start, range.end)
87+
}
88+
}
89+
}
90+
}
91+
92+
impl Error for AvroError {
93+
fn source(&self) -> Option<&(dyn Error + 'static)> {
94+
match self {
95+
AvroError::External(e) => Some(e.as_ref()),
96+
AvroError::ArrowError(e) => Some(e.as_ref()),
97+
AvroError::IoError(_, e) => Some(e),
98+
_ => None,
99+
}
100+
}
101+
}
102+
103+
impl From<TryFromIntError> for AvroError {
104+
fn from(e: TryFromIntError) -> AvroError {
105+
AvroError::General(format!("Integer overflow: {e}"))
106+
}
107+
}
108+
109+
impl From<io::Error> for AvroError {
110+
fn from(e: io::Error) -> AvroError {
111+
AvroError::External(Box::new(e))
112+
}
113+
}
114+
115+
impl From<str::Utf8Error> for AvroError {
116+
fn from(e: str::Utf8Error) -> AvroError {
117+
AvroError::External(Box::new(e))
118+
}
119+
}
120+
121+
impl From<FromUtf8Error> for AvroError {
122+
fn from(e: FromUtf8Error) -> AvroError {
123+
AvroError::External(Box::new(e))
124+
}
125+
}
126+
127+
impl From<ArrowError> for AvroError {
128+
fn from(e: ArrowError) -> Self {
129+
AvroError::ArrowError(Box::new(e))
130+
}
131+
}
132+
133+
impl From<AvroError> for io::Error {
134+
fn from(e: AvroError) -> Self {
135+
io::Error::other(e)
136+
}
137+
}
138+
139+
impl From<AvroError> for ArrowError {
140+
fn from(e: AvroError) -> Self {
141+
match e {
142+
AvroError::External(inner) => ArrowError::from_external_error(inner),
143+
AvroError::IoError(msg, err) => ArrowError::IoError(msg, err),
144+
AvroError::ArrowError(inner) => *inner,
145+
other => ArrowError::AvroError(other.to_string()),
146+
}
147+
}
148+
}

arrow-avro/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,9 @@ pub mod compression;
195195
/// Avro data types and Arrow data types.
196196
pub mod codec;
197197

198+
/// AvroError variants
199+
pub mod errors;
200+
198201
/// Extension trait for AvroField to add Utf8View support
199202
///
200203
/// This trait adds methods for working with Utf8View support to the AvroField struct.

arrow-avro/src/reader/block.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
//! Decoder for [`Block`]
1919
20+
use crate::errors::AvroError;
2021
use crate::reader::vlq::VLQDecoder;
21-
use arrow_schema::ArrowError;
2222

2323
/// A file data block
2424
///
@@ -75,14 +75,14 @@ impl BlockDecoder {
7575
/// can then be used again to read the next block, if any
7676
///
7777
/// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
78-
pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
78+
pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, AvroError> {
7979
let max_read = buf.len();
8080
while !buf.is_empty() {
8181
match self.state {
8282
BlockDecoderState::Count => {
8383
if let Some(c) = self.vlq_decoder.long(&mut buf) {
8484
self.in_progress.count = c.try_into().map_err(|_| {
85-
ArrowError::ParseError(format!(
85+
AvroError::ParseError(format!(
8686
"Block count cannot be negative, got {c}"
8787
))
8888
})?;
@@ -93,9 +93,7 @@ impl BlockDecoder {
9393
BlockDecoderState::Size => {
9494
if let Some(c) = self.vlq_decoder.long(&mut buf) {
9595
self.bytes_remaining = c.try_into().map_err(|_| {
96-
ArrowError::ParseError(format!(
97-
"Block size cannot be negative, got {c}"
98-
))
96+
AvroError::ParseError(format!("Block size cannot be negative, got {c}"))
9997
})?;
10098

10199
self.in_progress.data.reserve(self.bytes_remaining);

0 commit comments

Comments
 (0)