From 05fd5167a3a5e5f42f3a0f6a675af8ac1e8fc8ba Mon Sep 17 00:00:00 2001 From: Joaquin Hui Gomez <132194176+joaquinhuigomez@users.noreply.github.com> Date: Sun, 29 Mar 2026 16:52:22 +0100 Subject: [PATCH 1/3] fix: handle missing dictionary batch for null-only columns in IPC reader Per the IPC specification, dictionary batches may be omitted when all values in a dictionary-encoded column are null. The C++ implementation (Arrow C++ 17+) relies on this and does not emit a dictionary batch in such cases, which caused the Rust IPC reader to fail with: "Cannot find a dictionary batch with dict id: ..." When a dictionary batch is missing, synthesize an empty values array of the appropriate type so decoding can proceed. --- arrow-ipc/src/reader.rs | 98 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 6 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 6a5dc707d71b..ba17b27d6b27 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -180,17 +180,25 @@ impl RecordBatchDecoder<'_> { ArrowError::ParseError(format!("Field {field} does not have dict id")) })?; - let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| { - ArrowError::ParseError(format!( - "Cannot find a dictionary batch with dict id: {dict_id}" - )) - })?; + let value_array = match self.dictionaries_by_id.get(&dict_id) { + Some(array) => array.clone(), + None => { + // Per the IPC spec, dictionary batches may be omitted when all + // values in the column are null. In that case we synthesize an + // empty values array so decoding can proceed. + if let Dictionary(_, value_type) = data_type { + arrow_array::new_empty_array(value_type.as_ref()) + } else { + unreachable!() + } + } + }; self.create_dictionary_array( index_node, data_type, &index_buffers, - value_array.clone(), + value_array, ) } Union(fields, mode) => { @@ -3248,4 +3256,82 @@ mod tests { let reader = StreamReader::try_new(Cursor::new(buf), None); assert!(reader.is_err()); } + + /// Per the IPC specification, dictionary batches may be omitted for + /// dictionary-encoded columns where all values are null. The C++ + /// implementation relies on this and does not emit a dictionary batch + /// in that case. Verify that the Rust reader handles such streams + /// by synthesizing an empty dictionary instead of returning an error. + #[test] + fn test_read_null_dict_without_dictionary_batch() { + // Build an all-null dictionary-encoded column. + let keys = Int32Array::new_null(4); + let values: ArrayRef = new_empty_array(&DataType::Utf8); + let dict_array = DictionaryArray::new(keys, values); + + let schema = Arc::new(Schema::new(vec![Field::new( + "d", + dict_array.data_type().clone(), + true, + )])); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap(); + + // Write a normal IPC stream (which includes the dictionary batch). + let full_stream = write_stream(&batch); + + // Parse the stream into individual messages and reconstruct it + // without the DictionaryBatch message, simulating what C++ emits + // for an all-null dictionary column. + let mut stripped = Vec::new(); + let mut cursor = Cursor::new(&full_stream); + loop { + // Each message is: [continuation (4 bytes)] [meta_len (4 bytes)] + // [metadata (meta_len bytes)] [body (bodyLength bytes)] + let mut header = [0u8; 4]; + if std::io::Read::read_exact(&mut cursor, &mut header).is_err() { + break; + } + if header == CONTINUATION_MARKER { + if std::io::Read::read_exact(&mut cursor, &mut header).is_err() { + break; + } + } + let meta_len = u32::from_le_bytes(header) as usize; + if meta_len == 0 { + // EOS marker — write it through. + stripped.extend_from_slice(&CONTINUATION_MARKER); + stripped.extend_from_slice(&0u32.to_le_bytes()); + break; + } + let mut meta_buf = vec![0u8; meta_len]; + std::io::Read::read_exact(&mut cursor, &mut meta_buf).unwrap(); + + let message = root_as_message(&meta_buf).unwrap(); + let body_len = message.bodyLength() as usize; + let mut body_buf = vec![0u8; body_len]; + std::io::Read::read_exact(&mut cursor, &mut body_buf).unwrap(); + + if message.header_type() == crate::MessageHeader::DictionaryBatch { + // Skip the dictionary batch — this is what C++ does for + // all-null dictionary columns. + continue; + } + stripped.extend_from_slice(&CONTINUATION_MARKER); + stripped.extend_from_slice(&(meta_len as u32).to_le_bytes()); + stripped.extend_from_slice(&meta_buf); + stripped.extend_from_slice(&body_buf); + } + + // Reading the stripped stream must succeed. + let result = read_stream(&stripped).unwrap(); + assert_eq!(result.num_rows(), 4); + assert_eq!(result.num_columns(), 1); + + let col = result.column(0); + assert_eq!(col.null_count(), 4); + assert_eq!(col.len(), 4); + // The result must be a dictionary-typed array. + assert!(matches!(col.data_type(), DataType::Dictionary(_, _))); + } } From b9e63df47395b311c479b3248401d38673acfa56 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 6 Apr 2026 16:24:18 -0400 Subject: [PATCH 2/3] fix clippy and fmt --- arrow-ipc/src/reader.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index ba17b27d6b27..0969f8b59f80 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -194,12 +194,7 @@ impl RecordBatchDecoder<'_> { } }; - self.create_dictionary_array( - index_node, - data_type, - &index_buffers, - value_array, - ) + self.create_dictionary_array(index_node, data_type, &index_buffers, value_array) } Union(fields, mode) => { let union_node = self.next_node(field)?; @@ -3274,8 +3269,7 @@ mod tests { dict_array.data_type().clone(), true, )])); - let batch = - RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap(); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap(); // Write a normal IPC stream (which includes the dictionary batch). let full_stream = write_stream(&batch); @@ -3292,10 +3286,10 @@ mod tests { if std::io::Read::read_exact(&mut cursor, &mut header).is_err() { break; } - if header == CONTINUATION_MARKER { - if std::io::Read::read_exact(&mut cursor, &mut header).is_err() { - break; - } + if header == CONTINUATION_MARKER + && std::io::Read::read_exact(&mut cursor, &mut header).is_err() + { + break; } let meta_len = u32::from_le_bytes(header) as usize; if meta_len == 0 { From 66482aea4ac76314fc43ed2deddcf5706d2eaabd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 6 Apr 2026 16:27:06 -0400 Subject: [PATCH 3/3] simplify --- arrow-ipc/src/reader.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 0969f8b59f80..9afae78b0665 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -3283,12 +3283,10 @@ mod tests { // Each message is: [continuation (4 bytes)] [meta_len (4 bytes)] // [metadata (meta_len bytes)] [body (bodyLength bytes)] let mut header = [0u8; 4]; - if std::io::Read::read_exact(&mut cursor, &mut header).is_err() { + if cursor.read_exact(&mut header).is_err() { break; } - if header == CONTINUATION_MARKER - && std::io::Read::read_exact(&mut cursor, &mut header).is_err() - { + if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() { break; } let meta_len = u32::from_le_bytes(header) as usize; @@ -3299,12 +3297,12 @@ mod tests { break; } let mut meta_buf = vec![0u8; meta_len]; - std::io::Read::read_exact(&mut cursor, &mut meta_buf).unwrap(); + cursor.read_exact(&mut meta_buf).unwrap(); let message = root_as_message(&meta_buf).unwrap(); let body_len = message.bodyLength() as usize; let mut body_buf = vec![0u8; body_len]; - std::io::Read::read_exact(&mut cursor, &mut body_buf).unwrap(); + cursor.read_exact(&mut body_buf).unwrap(); if message.header_type() == crate::MessageHeader::DictionaryBatch { // Skip the dictionary batch — this is what C++ does for