Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 89 additions & 11 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,21 @@ impl RecordBatchDecoder<'_> {
ArrowError::ParseError(format!("Field {field} does not have dict id"))
})?;

let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
ArrowError::ParseError(format!(
"Cannot find a dictionary batch with dict id: {dict_id}"
))
})?;
let value_array = match self.dictionaries_by_id.get(&dict_id) {
Some(array) => array.clone(),
None => {
// Per the IPC spec, dictionary batches may be omitted when all
// values in the column are null. In that case we synthesize an
// empty values array so decoding can proceed.
if let Dictionary(_, value_type) = data_type {
arrow_array::new_empty_array(value_type.as_ref())
} else {
unreachable!()
}
}
};

self.create_dictionary_array(
index_node,
data_type,
&index_buffers,
value_array.clone(),
)
self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
}
Union(fields, mode) => {
let union_node = self.next_node(field)?;
Expand Down Expand Up @@ -3248,4 +3251,79 @@ mod tests {
let reader = StreamReader::try_new(Cursor::new(buf), None);
assert!(reader.is_err());
}

/// The Arrow IPC format allows a writer to omit the dictionary batch for a
/// dictionary-encoded column whose values are all null; the C++
/// implementation does exactly that. Verify that the Rust reader tolerates
/// such streams by materializing an empty dictionary instead of returning
/// an error.
#[test]
fn test_read_null_dict_without_dictionary_batch() {
// Construct a dictionary column consisting solely of nulls.
let keys = Int32Array::new_null(4);
let values: ArrayRef = new_empty_array(&DataType::Utf8);
let dict_array = DictionaryArray::new(keys, values);

let schema = Arc::new(Schema::new(vec![Field::new(
"d",
dict_array.data_type().clone(),
true,
)]));
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();

// A regular IPC stream for this batch still carries a DictionaryBatch.
let original = write_stream(&batch);

// Re-emit the stream message by message, dropping the DictionaryBatch to
// mimic what the C++ writer produces for an all-null dictionary column.
let mut filtered = Vec::new();
let mut reader = Cursor::new(&original);
loop {
// Message framing: optional continuation marker (4 bytes), metadata
// length (4 bytes, little-endian), flatbuffer metadata, message body.
let mut word = [0u8; 4];
if reader.read_exact(&mut word).is_err() {
break;
}
if word == CONTINUATION_MARKER && reader.read_exact(&mut word).is_err() {
break;
}
let metadata_len = u32::from_le_bytes(word) as usize;
if metadata_len == 0 {
// End-of-stream marker: forward it unchanged and stop.
filtered.extend_from_slice(&CONTINUATION_MARKER);
filtered.extend_from_slice(&0u32.to_le_bytes());
break;
}
let mut metadata = vec![0u8; metadata_len];
reader.read_exact(&mut metadata).unwrap();

let message = root_as_message(&metadata).unwrap();
let mut body = vec![0u8; message.bodyLength() as usize];
reader.read_exact(&mut body).unwrap();

// Forward every message except the dictionary batch.
if message.header_type() != crate::MessageHeader::DictionaryBatch {
filtered.extend_from_slice(&CONTINUATION_MARKER);
filtered.extend_from_slice(&(metadata_len as u32).to_le_bytes());
filtered.extend_from_slice(&metadata);
filtered.extend_from_slice(&body);
}
}

// Decoding the dictionary-free stream must succeed.
let result = read_stream(&filtered).unwrap();
assert_eq!(result.num_rows(), 4);
assert_eq!(result.num_columns(), 1);

let col = result.column(0);
assert_eq!(col.len(), 4);
assert_eq!(col.null_count(), 4);
// The decoded column must retain its dictionary type.
assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
}
}
Loading