diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 6a5dc707d71b..9afae78b0665 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -180,18 +180,21 @@ impl RecordBatchDecoder<'_> {
                     ArrowError::ParseError(format!("Field {field} does not have dict id"))
                 })?;
 
-                let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
-                    ArrowError::ParseError(format!(
-                        "Cannot find a dictionary batch with dict id: {dict_id}"
-                    ))
-                })?;
+                let value_array = match self.dictionaries_by_id.get(&dict_id) {
+                    Some(array) => array.clone(),
+                    None => {
+                        // Per the IPC spec, dictionary batches may be omitted when all
+                        // values in the column are null. In that case we synthesize an
+                        // empty values array so decoding can proceed.
+                        if let Dictionary(_, value_type) = data_type {
+                            arrow_array::new_empty_array(value_type.as_ref())
+                        } else {
+                            unreachable!()
+                        }
+                    }
+                };
 
-                self.create_dictionary_array(
-                    index_node,
-                    data_type,
-                    &index_buffers,
-                    value_array.clone(),
-                )
+                self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
             }
             Union(fields, mode) => {
                 let union_node = self.next_node(field)?;
@@ -3248,4 +3251,79 @@ mod tests {
         let reader = StreamReader::try_new(Cursor::new(buf), None);
         assert!(reader.is_err());
     }
+
+    /// Per the IPC specification, dictionary batches may be omitted for
+    /// dictionary-encoded columns where all values are null. The C++
+    /// implementation relies on this and does not emit a dictionary batch
+    /// in that case. Verify that the Rust reader handles such streams
+    /// by synthesizing an empty dictionary instead of returning an error.
+    #[test]
+    fn test_read_null_dict_without_dictionary_batch() {
+        // Build an all-null dictionary-encoded column.
+        let keys = Int32Array::new_null(4);
+        let values: ArrayRef = new_empty_array(&DataType::Utf8);
+        let dict_array = DictionaryArray::new(keys, values);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "d",
+            dict_array.data_type().clone(),
+            true,
+        )]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();
+
+        // Write a normal IPC stream (which includes the dictionary batch).
+        let full_stream = write_stream(&batch);
+
+        // Parse the stream into individual messages and reconstruct it
+        // without the DictionaryBatch message, simulating what C++ emits
+        // for an all-null dictionary column.
+        let mut stripped = Vec::new();
+        let mut cursor = Cursor::new(&full_stream);
+        loop {
+            // Each message is: [continuation (4 bytes)] [meta_len (4 bytes)]
+            // [metadata (meta_len bytes)] [body (bodyLength bytes)]
+            let mut header = [0u8; 4];
+            if cursor.read_exact(&mut header).is_err() {
+                break;
+            }
+            if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() {
+                break;
+            }
+            let meta_len = u32::from_le_bytes(header) as usize;
+            if meta_len == 0 {
+                // EOS marker — write it through.
+                stripped.extend_from_slice(&CONTINUATION_MARKER);
+                stripped.extend_from_slice(&0u32.to_le_bytes());
+                break;
+            }
+            let mut meta_buf = vec![0u8; meta_len];
+            cursor.read_exact(&mut meta_buf).unwrap();
+
+            let message = root_as_message(&meta_buf).unwrap();
+            let body_len = message.bodyLength() as usize;
+            let mut body_buf = vec![0u8; body_len];
+            cursor.read_exact(&mut body_buf).unwrap();
+
+            if message.header_type() == crate::MessageHeader::DictionaryBatch {
+                // Skip the dictionary batch — this is what C++ does for
+                // all-null dictionary columns.
+                continue;
+            }
+            stripped.extend_from_slice(&CONTINUATION_MARKER);
+            stripped.extend_from_slice(&(meta_len as u32).to_le_bytes());
+            stripped.extend_from_slice(&meta_buf);
+            stripped.extend_from_slice(&body_buf);
+        }
+
+        // Reading the stripped stream must succeed.
+        let result = read_stream(&stripped).unwrap();
+        assert_eq!(result.num_rows(), 4);
+        assert_eq!(result.num_columns(), 1);
+
+        let col = result.column(0);
+        assert_eq!(col.null_count(), 4);
+        assert_eq!(col.len(), 4);
+        // The result must be a dictionary-typed array.
+        assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
+    }
 }