Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions arrow-avro/src/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3380,4 +3380,50 @@ mod tests {
expected_row2.extend(avro_long_bytes(3)); // value
assert_bytes_eq(row_slice(&out, &offsets, 2), &expected_row2);
}

#[test]
fn encode_prefix_write_error() {
use crate::codec::AvroFieldBuilder;
use crate::schema::{AvroSchema, FingerprintAlgorithm};
use std::io;

struct FailWriter {
failed: bool,
}

impl io::Write for FailWriter {
fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
if !self.failed {
self.failed = true;
Err(io::Error::new(io::ErrorKind::Other, "fail write"))
} else {
Ok(0)
}
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
let arr = Int32Array::from(vec![42]);
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(arr)]).unwrap();
let avro_schema = AvroSchema::try_from(&schema).unwrap();
let fingerprint = avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap();
let avro_root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
.build()
.unwrap();
let encoder = RecordEncoderBuilder::new(&avro_root, &schema)
.with_fingerprint(Some(fingerprint))
.build()
.unwrap();

let mut writer = FailWriter { failed: false };
let err = encoder.encode(&mut writer, &batch).unwrap_err();
let msg = format!("{err}");
assert!(msg.contains("write prefix"), "unexpected error: {msg}");
}
}
44 changes: 44 additions & 0 deletions arrow-avro/src/writer/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,50 @@ impl AvroFormat for AvroBinaryFormat {
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::{DataType, Field, Schema};

fn test_schema() -> Schema {
Schema::new(vec![Field::new("x", DataType::Int32, false)])
}

#[test]
fn avro_binary_format_rejects_compression() {
let mut format = AvroBinaryFormat::default();
let schema = test_schema();
let err = format
.start_stream(
&mut Vec::<u8>::new(),
&schema,
Some(CompressionCodec::Snappy),
)
.unwrap_err();
assert!(
err.to_string()
.contains("Compression not supported for Avro binary streaming")
);
}

#[test]
fn avro_soe_format_rejects_compression() {
let mut format = AvroSoeFormat::default();
let schema = test_schema();
let err = format
.start_stream(
&mut Vec::<u8>::new(),
&schema,
Some(CompressionCodec::Snappy),
)
.unwrap_err();
assert!(
err.to_string()
.contains("Compression not supported for Avro SOE streaming")
);
}
}

#[inline]
fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
write_bytes(writer, s.as_bytes())
Expand Down
Loading