From db8b8e4b421023206f6d736b6314d1b1bad7b4a4 Mon Sep 17 00:00:00 2001 From: "Oleg V. Kozlyuk" Date: Wed, 17 Dec 2025 18:10:25 +0100 Subject: [PATCH 1/3] Added capability to fetch dictionary values --- parquet/src/arrow/async_reader/mod.rs | 393 +++++++++++++++++++++++++- 1 file changed, 389 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 60f2ca1615a3..6f6ebfc84abd 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -33,19 +33,22 @@ use futures::future::{BoxFuture, FutureExt}; use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; -use arrow_array::RecordBatch; -use arrow_schema::{Schema, SchemaRef}; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_schema::{DataType as ArrowType, Schema, SchemaRef}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, }; -use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; +use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, Compression, PageType, Type as PhysicalType}; use crate::bloom_filter::{ SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset, }; +use crate::compression::{create_codec, CodecOptions}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; +use crate::file::metadata::thrift::PageHeader; +use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; mod metadata; pub use metadata::*; @@ -475,6 +478,162 @@ impl ParquetRecordBatchStreamBuilder { Ok(Some(Sbbf::new(&bitset))) } + /// Read dictionary for a column in a row group + /// + /// Returns `None` if the column does not have a dictionary page. + /// Supports BYTE_ARRAY columns (String/Binary). + /// + /// # Arguments + /// * `row_group_idx` - Index of the row group + /// * `column_idx` - Index of the column within the row group + /// + /// # Errors + /// Returns an error if: + /// - The column type is not BYTE_ARRAY + /// - The dictionary page cannot be read or decoded + /// - Decompression fails + /// - UTF-8 validation fails (for String columns) + pub async fn get_row_group_column_dictionary( + &mut self, + row_group_idx: usize, + column_idx: usize, + ) -> Result> { + // 1. Get metadata for the row group and column + let row_group_metadata = self.metadata.row_group(row_group_idx); + let column_metadata = row_group_metadata.column(column_idx); + + // 2. Check if dictionary page exists + let dict_offset: u64 = match column_metadata.dictionary_page_offset() { + Some(offset) => offset + .try_into() + .map_err(|_| ParquetError::General("Dictionary page offset is invalid".to_string()))?, + None => return Ok(None), + }; + + // 3. Validate column type - only support BYTE_ARRAY + let physical_type = column_metadata.column_type(); + if physical_type != PhysicalType::BYTE_ARRAY { + return Err(ParquetError::General(format!( + "get_row_group_column_dictionary only supports BYTE_ARRAY columns, got {:?}", + physical_type + ))); + } + + // 4. Calculate dictionary page length + // Dictionary page length = data_page_offset - dictionary_page_offset + let data_page_offset: u64 = column_metadata + .data_page_offset() + .try_into() + .map_err(|_| ParquetError::General("Data page offset is invalid".to_string()))?; + + let dict_length = data_page_offset - dict_offset; + + // 5. 
Fetch dictionary page bytes + let buffer = self + .input + .0 + .get_bytes(dict_offset..dict_offset + dict_length) + .await?; + + // 6. Parse page header (Thrift encoded) + let mut prot = ThriftSliceInputProtocol::new(&buffer); + let page_header = PageHeader::read_thrift(&mut prot) + .map_err(|e| ParquetError::General(format!("Failed to read page header: {}", e)))?; + let header_len = prot.as_slice().as_ptr() as usize - buffer.as_ptr() as usize; + + // 7. Validate it's a dictionary page + if page_header.r#type != PageType::DICTIONARY_PAGE { + return Err(ParquetError::General(format!( + "Expected DICTIONARY_PAGE, got {:?}", + page_header.r#type + ))); + } + + let dict_header = page_header + .dictionary_page_header + .ok_or_else(|| ParquetError::General("Missing dictionary page header".to_string()))?; + + let num_values = dict_header.num_values as usize; + + // 8. Extract page data (after header) + let compressed_page_size = page_header.compressed_page_size as usize; + let uncompressed_page_size = page_header.uncompressed_page_size as usize; + let page_data = buffer.slice(header_len..header_len + compressed_page_size); + + // 9. Handle decompression if needed + let decompressed_data = if column_metadata.compression() != Compression::UNCOMPRESSED { + let codec_options = CodecOptions::default(); + let mut decompressor = create_codec(column_metadata.compression(), &codec_options)? + .ok_or_else(|| { + ParquetError::General("Failed to create decompressor".to_string()) + })?; + + let mut decompressed = Vec::with_capacity(uncompressed_page_size); + decompressor.decompress(&page_data, &mut decompressed, Some(uncompressed_page_size))?; + Bytes::from(decompressed) + } else { + page_data + }; + + // 10. Determine if this is String (Utf8) or Binary based on schema + let parquet_schema = self.metadata.file_metadata().schema_descr(); + let column_descr = parquet_schema.column(column_idx); + let validate_utf8 = is_utf8_column(&column_descr); + + // 11. Decode dictionary values from PLAIN encoding + // PLAIN encoding: each value is 4-byte little-endian length + bytes + let mut offsets = vec![0_i32]; + let mut values = Vec::new(); + let mut offset = 0; + + for _ in 0..num_values { + if offset + 4 > decompressed_data.len() { + return Err(ParquetError::EOF("Unexpected end of dictionary page".into())); + } + + let len_bytes: [u8; 4] = decompressed_data[offset..offset + 4] + .try_into() + .map_err(|_| ParquetError::EOF("Failed to read length".into()))?; + let len = u32::from_le_bytes(len_bytes) as usize; + offset += 4; + + if offset + len > decompressed_data.len() { + return Err(ParquetError::EOF("Unexpected end of dictionary page".into())); + } + + let value_bytes = &decompressed_data[offset..offset + len]; + + // Validate UTF-8 if needed + if validate_utf8 && std::str::from_utf8(value_bytes).is_err() { + return Err(ParquetError::General("Invalid UTF-8 in dictionary".to_string())); + } + + values.extend_from_slice(value_bytes); + offsets.push(values.len() as i32); + offset += len; + } + + // 12. 
Convert to Arrow array + let data_type = if validate_utf8 { + ArrowType::Utf8 + } else { + ArrowType::Binary + }; + + use arrow_array::make_array; + use arrow_buffer::Buffer; + use arrow_data::ArrayDataBuilder; + + let array_data = ArrayDataBuilder::new(data_type) + .len(num_values) + .add_buffer(Buffer::from_vec(offsets)) + .add_buffer(Buffer::from_vec(values)) + .build() + .map_err(|e| ParquetError::General(format!("Failed to build array: {}", e)))?; + + Ok(Some(make_array(array_data))) + } + /// Build a new [`ParquetRecordBatchStream`] /// /// See examples on [`ParquetRecordBatchStreamBuilder::new`] @@ -764,6 +923,24 @@ where } } +/// Determines if a column should be treated as UTF-8 based on its logical/converted type +fn is_utf8_column(column_descr: &crate::schema::types::ColumnDescriptor) -> bool { + use crate::basic::{ConvertedType, LogicalType}; + + let basic_info = column_descr.self_type().get_basic_info(); + + // Check logical type first (preferred) + if let Some(logical_type) = basic_info.logical_type_ref() { + return matches!(logical_type, LogicalType::String); + } + + // Fall back to converted type + matches!( + basic_info.converted_type(), + ConvertedType::UTF8 | ConvertedType::JSON + ) +} + #[cfg(test)] mod tests { use super::*; @@ -784,7 +961,7 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::types::Int32Type; use arrow_array::{ - Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader, + Array, ArrayRef, BinaryArray, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray, StructArray, UInt64Array, }; use arrow_schema::{DataType, Field, Schema}; @@ -2308,4 +2485,212 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_get_row_group_column_dictionary_string() { + // Create a parquet file with dictionary-encoded string column + let schema = Arc::new(Schema::new(vec![Field::new( + "dict_col", + DataType::Utf8, + false, + )])); + + let mut parquet_data = Vec::new(); + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .build(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec![ + "apple", "banana", "apple", "cherry", "banana", + ]))], + ) + .unwrap(); + + let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // Test reading dictionary + let async_reader = TestReader::new(Bytes::from(parquet_data)); + let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + let dict = builder + .get_row_group_column_dictionary(0, 0) + .await + .unwrap() + .expect("Should have dictionary"); + + let strings = dict.as_any().downcast_ref::().unwrap(); + // Dictionary should contain unique values: apple, banana, cherry + assert_eq!(strings.len(), 3); + let mut values: Vec<_> = (0..strings.len()).map(|i| strings.value(i)).collect(); + values.sort(); + assert_eq!(values, vec!["apple", "banana", "cherry"]); + } + + #[tokio::test] + async fn test_get_row_group_column_dictionary_binary() { + // Test with binary column + let schema = Arc::new(Schema::new(vec![Field::new( + "binary_col", + DataType::Binary, + false, + )])); + + let mut parquet_data = Vec::new(); + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .build(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(BinaryArray::from(vec![ + b"data1".as_slice(), + b"data2".as_slice(), + b"data1".as_slice(), + 
b"data3".as_slice(), + ]))], + ) + .unwrap(); + + let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let async_reader = TestReader::new(Bytes::from(parquet_data)); + let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + let dict = builder + .get_row_group_column_dictionary(0, 0) + .await + .unwrap() + .expect("Should have dictionary"); + + let binary = dict.as_any().downcast_ref::().unwrap(); + assert_eq!(binary.len(), 3); // unique values: data1, data2, data3 + } + + #[tokio::test] + async fn test_get_row_group_column_dictionary_none() { + // Create a parquet file without dictionary encoding + let schema = Arc::new(Schema::new(vec![Field::new( + "plain_col", + DataType::Utf8, + false, + )])); + + let mut parquet_data = Vec::new(); + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .build(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec!["hello", "world"]))], + ) + .unwrap(); + + let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // Test reading - should return None + let async_reader = TestReader::new(Bytes::from(parquet_data)); + let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + let dict = builder + .get_row_group_column_dictionary(0, 0) + .await + .unwrap(); + + assert!(dict.is_none()); + } + + #[tokio::test] + async fn test_get_row_group_column_dictionary_compressed() { + // Test with compressed dictionary + let schema = Arc::new(Schema::new(vec![Field::new( + "dict_col", + DataType::Utf8, + false, + )])); + + let mut parquet_data = Vec::new(); + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .set_compression(crate::basic::Compression::SNAPPY) + .build(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec![ + "hello", "world", "hello", "parquet", + ]))], + ) + .unwrap(); + + let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let async_reader = TestReader::new(Bytes::from(parquet_data)); + let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + let dict = builder + .get_row_group_column_dictionary(0, 0) + .await + .unwrap() + .expect("Should have dictionary"); + + let strings = dict.as_any().downcast_ref::().unwrap(); + assert_eq!(strings.len(), 3); // unique values: hello, world, parquet + } + + #[tokio::test] + async fn test_get_row_group_column_dictionary_wrong_type() { + // Test with non-BYTE_ARRAY column + let schema = Arc::new(Schema::new(vec![Field::new( + "int_col", + DataType::Int32, + false, + )])); + + let mut parquet_data = Vec::new(); + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .build(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 1, 3]))], + ) + .unwrap(); + + let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let async_reader = TestReader::new(Bytes::from(parquet_data)); + let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + // Should return error for non-BYTE_ARRAY type + let result 
= builder.get_row_group_column_dictionary(0, 0).await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("only supports BYTE_ARRAY")); + } } From 37005d2345b2bf865d8586d3bbc1d27af89ec04c Mon Sep 17 00:00:00 2001 From: "Oleg V. Kozlyuk" Date: Sun, 28 Dec 2025 16:51:50 +0100 Subject: [PATCH 2/3] Move dictionary read to ParquetMetaDataReader --- parquet/src/arrow/array_reader/mod.rs | 4 + parquet/src/arrow/async_reader/mod.rs | 393 +----------------------- parquet/src/arrow/mod.rs | 4 + parquet/src/file/metadata/dictionary.rs | 261 ++++++++++++++++ parquet/src/file/metadata/mod.rs | 3 + parquet/src/file/metadata/reader.rs | 178 +++++++++++ 6 files changed, 454 insertions(+), 389 deletions(-) create mode 100644 parquet/src/file/metadata/dictionary.rs diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 54be89f23084..0db09dbacd05 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -58,6 +58,10 @@ pub use byte_array_dictionary::make_byte_array_dictionary_reader; pub use byte_view_array::make_byte_view_array_reader; #[allow(unused_imports)] // Only used for benchmarks pub use fixed_len_byte_array::make_fixed_len_byte_array_reader; + +// Re-export for use in file::metadata::dictionary +pub(crate) use byte_array::ByteArrayDecoder; +pub(crate) use crate::arrow::buffer::offset_buffer::OffsetBuffer; pub use fixed_size_list_array::FixedSizeListArrayReader; pub use list_array::ListArrayReader; pub use map_array::MapArrayReader; diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 6f6ebfc84abd..60f2ca1615a3 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -33,22 +33,19 @@ use futures::future::{BoxFuture, FutureExt}; use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; -use arrow_array::{ArrayRef, RecordBatch}; -use arrow_schema::{DataType as ArrowType, Schema, SchemaRef}; +use arrow_array::RecordBatch; +use arrow_schema::{Schema, SchemaRef}; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, }; -use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, Compression, PageType, Type as PhysicalType}; +use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::bloom_filter::{ SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset, }; -use crate::compression::{create_codec, CodecOptions}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; -use crate::file::metadata::thrift::PageHeader; -use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; mod metadata; pub use metadata::*; @@ -478,162 +475,6 @@ impl ParquetRecordBatchStreamBuilder { Ok(Some(Sbbf::new(&bitset))) } - /// Read dictionary for a column in a row group - /// - /// Returns `None` if the column does not have a dictionary page. - /// Supports BYTE_ARRAY columns (String/Binary). 
- /// - /// # Arguments - /// * `row_group_idx` - Index of the row group - /// * `column_idx` - Index of the column within the row group - /// - /// # Errors - /// Returns an error if: - /// - The column type is not BYTE_ARRAY - /// - The dictionary page cannot be read or decoded - /// - Decompression fails - /// - UTF-8 validation fails (for String columns) - pub async fn get_row_group_column_dictionary( - &mut self, - row_group_idx: usize, - column_idx: usize, - ) -> Result> { - // 1. Get metadata for the row group and column - let row_group_metadata = self.metadata.row_group(row_group_idx); - let column_metadata = row_group_metadata.column(column_idx); - - // 2. Check if dictionary page exists - let dict_offset: u64 = match column_metadata.dictionary_page_offset() { - Some(offset) => offset - .try_into() - .map_err(|_| ParquetError::General("Dictionary page offset is invalid".to_string()))?, - None => return Ok(None), - }; - - // 3. Validate column type - only support BYTE_ARRAY - let physical_type = column_metadata.column_type(); - if physical_type != PhysicalType::BYTE_ARRAY { - return Err(ParquetError::General(format!( - "get_row_group_column_dictionary only supports BYTE_ARRAY columns, got {:?}", - physical_type - ))); - } - - // 4. Calculate dictionary page length - // Dictionary page length = data_page_offset - dictionary_page_offset - let data_page_offset: u64 = column_metadata - .data_page_offset() - .try_into() - .map_err(|_| ParquetError::General("Data page offset is invalid".to_string()))?; - - let dict_length = data_page_offset - dict_offset; - - // 5. Fetch dictionary page bytes - let buffer = self - .input - .0 - .get_bytes(dict_offset..dict_offset + dict_length) - .await?; - - // 6. Parse page header (Thrift encoded) - let mut prot = ThriftSliceInputProtocol::new(&buffer); - let page_header = PageHeader::read_thrift(&mut prot) - .map_err(|e| ParquetError::General(format!("Failed to read page header: {}", e)))?; - let header_len = prot.as_slice().as_ptr() as usize - buffer.as_ptr() as usize; - - // 7. Validate it's a dictionary page - if page_header.r#type != PageType::DICTIONARY_PAGE { - return Err(ParquetError::General(format!( - "Expected DICTIONARY_PAGE, got {:?}", - page_header.r#type - ))); - } - - let dict_header = page_header - .dictionary_page_header - .ok_or_else(|| ParquetError::General("Missing dictionary page header".to_string()))?; - - let num_values = dict_header.num_values as usize; - - // 8. Extract page data (after header) - let compressed_page_size = page_header.compressed_page_size as usize; - let uncompressed_page_size = page_header.uncompressed_page_size as usize; - let page_data = buffer.slice(header_len..header_len + compressed_page_size); - - // 9. Handle decompression if needed - let decompressed_data = if column_metadata.compression() != Compression::UNCOMPRESSED { - let codec_options = CodecOptions::default(); - let mut decompressor = create_codec(column_metadata.compression(), &codec_options)? - .ok_or_else(|| { - ParquetError::General("Failed to create decompressor".to_string()) - })?; - - let mut decompressed = Vec::with_capacity(uncompressed_page_size); - decompressor.decompress(&page_data, &mut decompressed, Some(uncompressed_page_size))?; - Bytes::from(decompressed) - } else { - page_data - }; - - // 10. 
Determine if this is String (Utf8) or Binary based on schema - let parquet_schema = self.metadata.file_metadata().schema_descr(); - let column_descr = parquet_schema.column(column_idx); - let validate_utf8 = is_utf8_column(&column_descr); - - // 11. Decode dictionary values from PLAIN encoding - // PLAIN encoding: each value is 4-byte little-endian length + bytes - let mut offsets = vec![0_i32]; - let mut values = Vec::new(); - let mut offset = 0; - - for _ in 0..num_values { - if offset + 4 > decompressed_data.len() { - return Err(ParquetError::EOF("Unexpected end of dictionary page".into())); - } - - let len_bytes: [u8; 4] = decompressed_data[offset..offset + 4] - .try_into() - .map_err(|_| ParquetError::EOF("Failed to read length".into()))?; - let len = u32::from_le_bytes(len_bytes) as usize; - offset += 4; - - if offset + len > decompressed_data.len() { - return Err(ParquetError::EOF("Unexpected end of dictionary page".into())); - } - - let value_bytes = &decompressed_data[offset..offset + len]; - - // Validate UTF-8 if needed - if validate_utf8 && std::str::from_utf8(value_bytes).is_err() { - return Err(ParquetError::General("Invalid UTF-8 in dictionary".to_string())); - } - - values.extend_from_slice(value_bytes); - offsets.push(values.len() as i32); - offset += len; - } - - // 12. Convert to Arrow array - let data_type = if validate_utf8 { - ArrowType::Utf8 - } else { - ArrowType::Binary - }; - - use arrow_array::make_array; - use arrow_buffer::Buffer; - use arrow_data::ArrayDataBuilder; - - let array_data = ArrayDataBuilder::new(data_type) - .len(num_values) - .add_buffer(Buffer::from_vec(offsets)) - .add_buffer(Buffer::from_vec(values)) - .build() - .map_err(|e| ParquetError::General(format!("Failed to build array: {}", e)))?; - - Ok(Some(make_array(array_data))) - } - /// Build a new [`ParquetRecordBatchStream`] /// /// See examples on [`ParquetRecordBatchStreamBuilder::new`] @@ -923,24 +764,6 @@ where } } -/// Determines if a column should be treated as UTF-8 based on its logical/converted type -fn is_utf8_column(column_descr: &crate::schema::types::ColumnDescriptor) -> bool { - use crate::basic::{ConvertedType, LogicalType}; - - let basic_info = column_descr.self_type().get_basic_info(); - - // Check logical type first (preferred) - if let Some(logical_type) = basic_info.logical_type_ref() { - return matches!(logical_type, LogicalType::String); - } - - // Fall back to converted type - matches!( - basic_info.converted_type(), - ConvertedType::UTF8 | ConvertedType::JSON - ) -} - #[cfg(test)] mod tests { use super::*; @@ -961,7 +784,7 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::types::Int32Type; use arrow_array::{ - Array, ArrayRef, BinaryArray, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader, + Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray, StructArray, UInt64Array, }; use arrow_schema::{DataType, Field, Schema}; @@ -2485,212 +2308,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_get_row_group_column_dictionary_string() { - // Create a parquet file with dictionary-encoded string column - let schema = Arc::new(Schema::new(vec![Field::new( - "dict_col", - DataType::Utf8, - false, - )])); - - let mut parquet_data = Vec::new(); - let props = WriterProperties::builder() - .set_dictionary_enabled(true) - .build(); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(StringArray::from(vec![ - "apple", "banana", "apple", "cherry", "banana", - ]))], - ) - 
.unwrap(); - - let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - // Test reading dictionary - let async_reader = TestReader::new(Bytes::from(parquet_data)); - let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) - .await - .unwrap(); - - let dict = builder - .get_row_group_column_dictionary(0, 0) - .await - .unwrap() - .expect("Should have dictionary"); - - let strings = dict.as_any().downcast_ref::().unwrap(); - // Dictionary should contain unique values: apple, banana, cherry - assert_eq!(strings.len(), 3); - let mut values: Vec<_> = (0..strings.len()).map(|i| strings.value(i)).collect(); - values.sort(); - assert_eq!(values, vec!["apple", "banana", "cherry"]); - } - - #[tokio::test] - async fn test_get_row_group_column_dictionary_binary() { - // Test with binary column - let schema = Arc::new(Schema::new(vec![Field::new( - "binary_col", - DataType::Binary, - false, - )])); - - let mut parquet_data = Vec::new(); - let props = WriterProperties::builder() - .set_dictionary_enabled(true) - .build(); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(BinaryArray::from(vec![ - b"data1".as_slice(), - b"data2".as_slice(), - b"data1".as_slice(), - b"data3".as_slice(), - ]))], - ) - .unwrap(); - - let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - let async_reader = TestReader::new(Bytes::from(parquet_data)); - let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) - .await - .unwrap(); - - let dict = builder - .get_row_group_column_dictionary(0, 0) - .await - .unwrap() - .expect("Should have dictionary"); - - let binary = dict.as_any().downcast_ref::().unwrap(); - assert_eq!(binary.len(), 3); // unique values: data1, data2, data3 - } - - #[tokio::test] - async fn test_get_row_group_column_dictionary_none() { - // Create a parquet file without dictionary encoding - let schema = Arc::new(Schema::new(vec![Field::new( - "plain_col", - DataType::Utf8, - false, - )])); - - let mut parquet_data = Vec::new(); - let props = WriterProperties::builder() - .set_dictionary_enabled(false) - .build(); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(StringArray::from(vec!["hello", "world"]))], - ) - .unwrap(); - - let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - // Test reading - should return None - let async_reader = TestReader::new(Bytes::from(parquet_data)); - let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) - .await - .unwrap(); - - let dict = builder - .get_row_group_column_dictionary(0, 0) - .await - .unwrap(); - - assert!(dict.is_none()); - } - - #[tokio::test] - async fn test_get_row_group_column_dictionary_compressed() { - // Test with compressed dictionary - let schema = Arc::new(Schema::new(vec![Field::new( - "dict_col", - DataType::Utf8, - false, - )])); - - let mut parquet_data = Vec::new(); - let props = WriterProperties::builder() - .set_dictionary_enabled(true) - .set_compression(crate::basic::Compression::SNAPPY) - .build(); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(StringArray::from(vec![ - "hello", "world", "hello", "parquet", - ]))], - ) - .unwrap(); - - let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); - 
writer.write(&batch).unwrap(); - writer.close().unwrap(); - - let async_reader = TestReader::new(Bytes::from(parquet_data)); - let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) - .await - .unwrap(); - - let dict = builder - .get_row_group_column_dictionary(0, 0) - .await - .unwrap() - .expect("Should have dictionary"); - - let strings = dict.as_any().downcast_ref::().unwrap(); - assert_eq!(strings.len(), 3); // unique values: hello, world, parquet - } - - #[tokio::test] - async fn test_get_row_group_column_dictionary_wrong_type() { - // Test with non-BYTE_ARRAY column - let schema = Arc::new(Schema::new(vec![Field::new( - "int_col", - DataType::Int32, - false, - )])); - - let mut parquet_data = Vec::new(); - let props = WriterProperties::builder() - .set_dictionary_enabled(true) - .build(); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from(vec![1, 2, 1, 3]))], - ) - .unwrap(); - - let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - let async_reader = TestReader::new(Bytes::from(parquet_data)); - let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) - .await - .unwrap(); - - // Should return error for non-BYTE_ARRAY type - let result = builder.get_row_group_column_dictionary(0, 0).await; - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("only supports BYTE_ARRAY")); - } } diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 672ffb6fc521..fd46094b7d96 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -185,6 +185,10 @@ pub mod arrow_writer; mod buffer; mod decoder; +// Re-export for use in file::metadata::dictionary +#[cfg(feature = "arrow")] +pub(crate) use array_reader::{ByteArrayDecoder, OffsetBuffer}; + #[cfg(feature = "async")] pub mod async_reader; #[cfg(feature = "async")] diff --git a/parquet/src/file/metadata/dictionary.rs b/parquet/src/file/metadata/dictionary.rs new file mode 100644 index 000000000000..9bbabb7ec2d0 --- /dev/null +++ b/parquet/src/file/metadata/dictionary.rs @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Dictionary page decoding utilities + +use arrow_array::ArrayRef; +use bytes::Bytes; + +use crate::arrow::{ByteArrayDecoder, OffsetBuffer}; +use crate::basic::{Compression, Encoding, PageType}; +use crate::compression::{create_codec, CodecOptions}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::thrift::PageHeader; +use crate::file::metadata::ColumnChunkMetaData; +use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; +use crate::schema::types::ColumnDescriptor; + +/// Decodes a dictionary page from raw bytes into an Arrow array. +/// +/// This function parses a dictionary page (including Thrift header), handles +/// decompression if needed, and decodes PLAIN-encoded byte array values into +/// an Arrow StringArray or BinaryArray based on the column's logical type. +/// +/// # Arguments +/// +/// * `buffer` - Raw bytes containing the complete dictionary page (header + data) +/// * `column_metadata` - Column chunk metadata for compression information +/// * `column_descriptor` - Column schema information for type detection +/// +/// # Returns +/// +/// * `Ok(ArrayRef)` - The decoded dictionary as a StringArray (for UTF-8 columns) +/// or BinaryArray (for binary columns) +/// * `Err(ParquetError)` - If: +/// - The page is not a valid DICTIONARY_PAGE +/// - Thrift header parsing fails +/// - Decompression fails +/// - PLAIN decoding fails +/// - UTF-8 validation fails (for String columns) +/// +/// # Example +/// +/// ```ignore +/// let array = decode_dictionary_page(buffer, column_metadata, column_descriptor)?; +/// ``` +pub fn decode_dictionary_page( + buffer: Bytes, + column_metadata: &ColumnChunkMetaData, + column_descriptor: &ColumnDescriptor, +) -> Result { + // Parse Thrift page header + let mut prot = ThriftSliceInputProtocol::new(&buffer); + let page_header = PageHeader::read_thrift(&mut prot) + .map_err(|e| ParquetError::General(format!("Failed to read page header: {}", e)))?; + let header_len = prot.as_slice().as_ptr() as usize - buffer.as_ptr() as usize; + + // Validate it's a dictionary page + if page_header.r#type != PageType::DICTIONARY_PAGE { + return Err(ParquetError::General(format!( + "Expected DICTIONARY_PAGE, got {:?}", + page_header.r#type + ))); + } + + let dict_header = page_header + .dictionary_page_header + .ok_or_else(|| ParquetError::General("Missing dictionary page header".to_string()))?; + + let num_values = dict_header.num_values as usize; + let compressed_page_size = page_header.compressed_page_size as usize; + let uncompressed_page_size = page_header.uncompressed_page_size as usize; + + // Extract page data (after header) + let page_data = buffer.slice(header_len..header_len + compressed_page_size); + + // Handle decompression if needed + let decompressed_data = if column_metadata.compression() != Compression::UNCOMPRESSED { + let codec_options = CodecOptions::default(); + let mut decompressor = create_codec(column_metadata.compression(), &codec_options)? 
+ .ok_or_else(|| ParquetError::General("Failed to create decompressor".to_string()))?; + + let mut decompressed = Vec::with_capacity(uncompressed_page_size); + decompressor.decompress(&page_data, &mut decompressed, Some(uncompressed_page_size))?; + Bytes::from(decompressed) + } else { + page_data + }; + + // Determine if this is String (Utf8) or Binary based on schema + let validate_utf8 = is_utf8_column(column_descriptor); + + // Use ByteArrayDecoder to decode PLAIN-encoded values + let mut decoder = ByteArrayDecoder::new( + Encoding::PLAIN, + decompressed_data, + num_values, + Some(num_values), + validate_utf8, + )?; + + let mut buffer = OffsetBuffer::::default(); + decoder.read(&mut buffer, num_values, None)?; + + // Convert to Arrow array (Utf8 or Binary) + let value_type = if validate_utf8 { + arrow_schema::DataType::Utf8 + } else { + arrow_schema::DataType::Binary + }; + + Ok(buffer.into_array(None, value_type)) +} + +/// Determines if a column should be treated as UTF-8 based on its logical/converted type. +/// +/// Returns `true` if the column has: +/// - `LogicalType::String` (preferred modern type annotation) +/// - `ConvertedType::UTF8` or `ConvertedType::JSON` (legacy type annotation) +/// +/// # Arguments +/// +/// * `column_descr` - The column descriptor to check +/// +/// # Returns +/// +/// `true` if the column should be validated as UTF-8, `false` otherwise +pub fn is_utf8_column(column_descr: &ColumnDescriptor) -> bool { + use crate::basic::{ConvertedType, LogicalType}; + + let basic_info = column_descr.self_type().get_basic_info(); + + // Check logical type first (preferred) + if let Some(logical_type) = basic_info.logical_type_ref() { + return matches!(logical_type, LogicalType::String); + } + + // Fall back to converted type + matches!( + basic_info.converted_type(), + ConvertedType::UTF8 | ConvertedType::JSON + ) +} + +#[cfg(test)] +mod tests { + use crate::file::metadata::ParquetMetaDataReader; + use crate::util::test_common::file_util::get_test_file; + use arrow_array::{Array, BinaryArray, StringArray}; + use bytes::Bytes; + use std::io::Read; + + /// Helper to read a test file into bytes + fn read_test_file(file_name: &str) -> Bytes { + let mut file = get_test_file(file_name); + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).unwrap(); + Bytes::from(buffer) + } + + #[test] + fn test_read_column_dictionary_uncompressed() { + // plain-dict-uncompressed-checksum.parquet: col 1 = binary_field (BYTE_ARRAY dict, uncompressed) + // Note: No STRING annotation, so returns BinaryArray + let file_data = read_test_file("plain-dict-uncompressed-checksum.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 1).unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray"); + assert!(binary_array.len() > 0); + } + + #[test] + fn test_read_column_dictionary_binary() { + // alltypes_dictionary.parquet: col 8 = date_string_col (BYTE_ARRAY with dict, NO STRING annotation) + let file_data = read_test_file("alltypes_dictionary.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 8).unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + let 
binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray for pure BYTE_ARRAY column"); + assert!(binary_array.len() > 0); + } + + #[test] + fn test_read_column_dictionary_none() { + // alltypes_tiny_pages_plain.parquet: col 9 = string_col (PLAIN encoding, NO dictionary) + let file_data = read_test_file("alltypes_tiny_pages_plain.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 9).unwrap(); + + assert!(dict.is_none()); + } + + #[test] + fn test_read_column_dictionary_compressed() { + // rle-dict-snappy-checksum.parquet: col 1 = binary_field (BYTE_ARRAY with Snappy-compressed dict) + let file_data = read_test_file("rle-dict-snappy-checksum.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + // Verify compression + assert_eq!( + metadata.row_group(0).column(1).compression(), + crate::basic::Compression::SNAPPY + ); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 1).unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + // binary_field has no STRING annotation, so expect BinaryArray + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray"); + assert!(binary_array.len() > 0); + } + + #[test] + fn test_read_column_dictionary_wrong_type() { + // plain-dict-uncompressed-checksum.parquet: col 0 = long_field (INT32 with dict) + let file_data = read_test_file("plain-dict-uncompressed-checksum.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + // INT32 column should fail + let result = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 0); + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("BYTE_ARRAY")); + } +} diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 6bd426ee677f..22777214b498 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -95,6 +95,9 @@ pub(crate) mod reader; pub(crate) mod thrift; mod writer; +#[cfg(feature = "arrow")] +pub(crate) mod dictionary; + use crate::basic::{EncodingMask, PageType}; #[cfg(feature = "encryption")] use crate::encryption::decrypt::FileDecryptor; diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index a18a5e68a9b5..f874dad7d4bb 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -829,6 +829,184 @@ impl ParquetMetaDataReader { pub fn decode_schema(buf: &[u8]) -> Result> { Ok(Arc::new(parquet_schema_from_bytes(buf)?)) } + + /// Read dictionary for a column in a row group using a synchronous reader. + /// + /// Returns `None` if the column does not have a dictionary page. + /// Currently only supports BYTE_ARRAY columns (String/Binary). 
+ /// + /// # Arguments + /// + /// * `reader` - A [`ChunkReader`] providing access to the file data + /// * `metadata` - The Parquet file metadata + /// * `row_group_idx` - Index of the row group to read from + /// * `column_idx` - Index of the column within the row group + /// + /// # Errors + /// + /// Returns an error if: + /// - The row group index or column index is out of bounds + /// - The column type is not BYTE_ARRAY + /// - The dictionary page cannot be read or decoded + /// - Decompression fails + /// - UTF-8 validation fails (for String columns) + /// + /// # Example + /// + /// ```no_run + /// # use parquet::file::metadata::ParquetMetaDataReader; + /// # use parquet::file::reader::ChunkReader; + /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); } + /// # fn get_metadata() -> parquet::file::metadata::ParquetMetaData { unimplemented!(); } + /// let file = open_parquet_file("some_path.parquet"); + /// let metadata = get_metadata(); + /// let dict = ParquetMetaDataReader::read_column_dictionary( + /// &file, + /// &metadata, + /// 0, // row group 0 + /// 0, // column 0 + /// ).unwrap(); + /// ``` + #[cfg(feature = "arrow")] + pub fn read_column_dictionary( + reader: &R, + metadata: &ParquetMetaData, + row_group_idx: usize, + column_idx: usize, + ) -> Result> { + use crate::basic::Type as PhysicalType; + + // Get row group and column metadata + let row_group_metadata = metadata.row_group(row_group_idx); + let column_metadata = row_group_metadata.column(column_idx); + + // Check if dictionary page exists + let dict_offset: u64 = match column_metadata.dictionary_page_offset() { + Some(offset) => offset.try_into().map_err(|_| { + ParquetError::General("Dictionary page offset is invalid".to_string()) + })?, + None => return Ok(None), + }; + + // Validate column type - only support BYTE_ARRAY + let physical_type = column_metadata.column_type(); + if physical_type != PhysicalType::BYTE_ARRAY { + return Err(ParquetError::General(format!( + "read_column_dictionary only supports BYTE_ARRAY columns, got {:?}", + physical_type + ))); + } + + // Calculate dictionary page length + let data_page_offset: u64 = column_metadata + .data_page_offset() + .try_into() + .map_err(|_| ParquetError::General("Data page offset is invalid".to_string()))?; + + let dict_length = data_page_offset - dict_offset; + + // Fetch dictionary page bytes + let buffer = reader.get_bytes(dict_offset, dict_length as usize)?; + + // Decode the dictionary page + let schema = metadata.file_metadata().schema_descr(); + let column_descriptor = schema.column(column_idx); + + let array = super::dictionary::decode_dictionary_page( + buffer, + column_metadata, + &column_descriptor, + )?; + + Ok(Some(array)) + } + + /// Read dictionary for a column in a row group using an asynchronous reader. + /// + /// Returns `None` if the column does not have a dictionary page. + /// Currently only supports BYTE_ARRAY columns (String/Binary). 
+ /// + /// # Arguments + /// + /// * `fetch` - A [`MetadataFetch`] trait implementation for async data access + /// * `metadata` - The Parquet file metadata + /// * `row_group_idx` - Index of the row group to read from + /// * `column_idx` - Index of the column within the row group + /// + /// # Errors + /// + /// Returns an error if: + /// - The row group index or column index is out of bounds + /// - The column type is not BYTE_ARRAY + /// - The dictionary page cannot be read or decoded + /// - Decompression fails + /// - UTF-8 validation fails (for String columns) + /// + /// # Example + /// + /// ```ignore + /// let dict = ParquetMetaDataReader::load_column_dictionary( + /// &mut fetch, + /// &metadata, + /// 0, // row group 0 + /// 0, // column 0 + /// ).await?; + /// ``` + #[cfg(all(feature = "async", feature = "arrow"))] + pub async fn load_column_dictionary( + mut fetch: F, + metadata: &ParquetMetaData, + row_group_idx: usize, + column_idx: usize, + ) -> Result> { + use crate::basic::Type as PhysicalType; + + // Get row group and column metadata + let row_group_metadata = metadata.row_group(row_group_idx); + let column_metadata = row_group_metadata.column(column_idx); + + // Check if dictionary page exists + let dict_offset: u64 = match column_metadata.dictionary_page_offset() { + Some(offset) => offset.try_into().map_err(|_| { + ParquetError::General("Dictionary page offset is invalid".to_string()) + })?, + None => return Ok(None), + }; + + // Validate column type - only support BYTE_ARRAY + let physical_type = column_metadata.column_type(); + if physical_type != PhysicalType::BYTE_ARRAY { + return Err(ParquetError::General(format!( + "load_column_dictionary only supports BYTE_ARRAY columns, got {:?}", + physical_type + ))); + } + + // Calculate dictionary page length + let data_page_offset: u64 = column_metadata + .data_page_offset() + .try_into() + .map_err(|_| ParquetError::General("Data page offset is invalid".to_string()))?; + + let dict_length = data_page_offset - dict_offset; + + // Fetch dictionary page bytes asynchronously + let buffer = fetch + .fetch(dict_offset..dict_offset + dict_length) + .await?; + + // Decode the dictionary page + let schema = metadata.file_metadata().schema_descr(); + let column_descriptor = schema.column(column_idx); + + let array = super::dictionary::decode_dictionary_page( + buffer, + column_metadata, + &column_descriptor, + )?; + + Ok(Some(array)) + } } /// The bounds needed to read page indexes From d2392f5d488a338e7fc6979f987d370744853da2 Mon Sep 17 00:00:00 2001 From: "Oleg V. 
Kozlyuk" Date: Sun, 28 Dec 2025 17:17:26 +0100 Subject: [PATCH 3/3] Added async tests, moved tests to reader.rs --- parquet/src/file/metadata/dictionary.rs | 105 --------- parquet/src/file/metadata/reader.rs | 294 +++++++++++++++++++++++- 2 files changed, 293 insertions(+), 106 deletions(-) diff --git a/parquet/src/file/metadata/dictionary.rs b/parquet/src/file/metadata/dictionary.rs index 9bbabb7ec2d0..6eba58024441 100644 --- a/parquet/src/file/metadata/dictionary.rs +++ b/parquet/src/file/metadata/dictionary.rs @@ -154,108 +154,3 @@ pub fn is_utf8_column(column_descr: &ColumnDescriptor) -> bool { ConvertedType::UTF8 | ConvertedType::JSON ) } - -#[cfg(test)] -mod tests { - use crate::file::metadata::ParquetMetaDataReader; - use crate::util::test_common::file_util::get_test_file; - use arrow_array::{Array, BinaryArray, StringArray}; - use bytes::Bytes; - use std::io::Read; - - /// Helper to read a test file into bytes - fn read_test_file(file_name: &str) -> Bytes { - let mut file = get_test_file(file_name); - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer).unwrap(); - Bytes::from(buffer) - } - - #[test] - fn test_read_column_dictionary_uncompressed() { - // plain-dict-uncompressed-checksum.parquet: col 1 = binary_field (BYTE_ARRAY dict, uncompressed) - // Note: No STRING annotation, so returns BinaryArray - let file_data = read_test_file("plain-dict-uncompressed-checksum.parquet"); - let reader = ParquetMetaDataReader::new(); - let metadata = reader.parse_and_finish(&file_data).unwrap(); - - let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 1).unwrap(); - - assert!(dict.is_some()); - let dict_array = dict.unwrap(); - let binary_array = dict_array - .as_any() - .downcast_ref::() - .expect("Expected BinaryArray"); - assert!(binary_array.len() > 0); - } - - #[test] - fn test_read_column_dictionary_binary() { - // alltypes_dictionary.parquet: col 8 = date_string_col (BYTE_ARRAY with dict, NO STRING annotation) - let file_data = read_test_file("alltypes_dictionary.parquet"); - let reader = ParquetMetaDataReader::new(); - let metadata = reader.parse_and_finish(&file_data).unwrap(); - - let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 8).unwrap(); - - assert!(dict.is_some()); - let dict_array = dict.unwrap(); - let binary_array = dict_array - .as_any() - .downcast_ref::() - .expect("Expected BinaryArray for pure BYTE_ARRAY column"); - assert!(binary_array.len() > 0); - } - - #[test] - fn test_read_column_dictionary_none() { - // alltypes_tiny_pages_plain.parquet: col 9 = string_col (PLAIN encoding, NO dictionary) - let file_data = read_test_file("alltypes_tiny_pages_plain.parquet"); - let reader = ParquetMetaDataReader::new(); - let metadata = reader.parse_and_finish(&file_data).unwrap(); - - let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 9).unwrap(); - - assert!(dict.is_none()); - } - - #[test] - fn test_read_column_dictionary_compressed() { - // rle-dict-snappy-checksum.parquet: col 1 = binary_field (BYTE_ARRAY with Snappy-compressed dict) - let file_data = read_test_file("rle-dict-snappy-checksum.parquet"); - let reader = ParquetMetaDataReader::new(); - let metadata = reader.parse_and_finish(&file_data).unwrap(); - - // Verify compression - assert_eq!( - metadata.row_group(0).column(1).compression(), - crate::basic::Compression::SNAPPY - ); - - let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 1).unwrap(); - - assert!(dict.is_some()); - let 
dict_array = dict.unwrap(); - // binary_field has no STRING annotation, so expect BinaryArray - let binary_array = dict_array - .as_any() - .downcast_ref::() - .expect("Expected BinaryArray"); - assert!(binary_array.len() > 0); - } - - #[test] - fn test_read_column_dictionary_wrong_type() { - // plain-dict-uncompressed-checksum.parquet: col 0 = long_field (INT32 with dict) - let file_data = read_test_file("plain-dict-uncompressed-checksum.parquet"); - let reader = ParquetMetaDataReader::new(); - let metadata = reader.parse_and_finish(&file_data).unwrap(); - - // INT32 column should fail - let result = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 0); - - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("BYTE_ARRAY")); - } -} diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index f874dad7d4bb..8ef1920f63f4 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -1196,6 +1196,129 @@ mod tests { "EOF: Parquet file too small. Size is 1728 but need 1729" ); } + + /// Helper to read a test file into bytes + fn read_test_file(file_name: &str) -> Bytes { + use std::io::Read; + let mut file = get_test_file(file_name); + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).unwrap(); + Bytes::from(buffer) + } + + #[test] + #[cfg(feature = "arrow")] + fn test_read_column_dictionary_uncompressed() { + use arrow_array::{Array, BinaryArray}; + + // plain-dict-uncompressed-checksum.parquet: col 1 = binary_field (BYTE_ARRAY dict, uncompressed) + // Note: No STRING annotation, so returns BinaryArray + let file_data = read_test_file("plain-dict-uncompressed-checksum.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 1).unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray"); + + // Verify dictionary contents + assert_eq!(binary_array.len(), 1); + assert_eq!( + binary_array.value(0), + b"a655fd0e-9949-4059-bcae-fd6a002a4652" + ); + } + + #[test] + #[cfg(feature = "arrow")] + fn test_read_column_dictionary_binary() { + use arrow_array::{Array, BinaryArray}; + + // alltypes_dictionary.parquet: col 8 = date_string_col (BYTE_ARRAY with dict, NO STRING annotation) + let file_data = read_test_file("alltypes_dictionary.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 8).unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray for pure BYTE_ARRAY column"); + + // Verify dictionary contents + assert_eq!(binary_array.len(), 1); + assert_eq!(binary_array.value(0), b"01/01/09"); + } + + #[test] + #[cfg(feature = "arrow")] + fn test_read_column_dictionary_none() { + // alltypes_tiny_pages_plain.parquet: col 9 = string_col (PLAIN encoding, NO dictionary) + let file_data = read_test_file("alltypes_tiny_pages_plain.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 9).unwrap(); + + assert!(dict.is_none()); + 
} + + #[test] + #[cfg(feature = "arrow")] + fn test_read_column_dictionary_compressed() { + use arrow_array::{Array, BinaryArray}; + + // rle-dict-snappy-checksum.parquet: col 1 = binary_field (BYTE_ARRAY with Snappy-compressed dict) + let file_data = read_test_file("rle-dict-snappy-checksum.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + // Verify compression + assert_eq!( + metadata.row_group(0).column(1).compression(), + crate::basic::Compression::SNAPPY + ); + + let dict = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 1).unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + // binary_field has no STRING annotation, so expect BinaryArray + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray"); + + // Verify dictionary contents + assert_eq!(binary_array.len(), 1); + assert_eq!( + binary_array.value(0), + b"c95e263a-f5d4-401f-8107-5ca7146a1f98" + ); + } + + #[test] + #[cfg(feature = "arrow")] + fn test_read_column_dictionary_wrong_type() { + // plain-dict-uncompressed-checksum.parquet: col 0 = long_field (INT32 with dict) + let file_data = read_test_file("plain-dict-uncompressed-checksum.parquet"); + let reader = ParquetMetaDataReader::new(); + let metadata = reader.parse_and_finish(&file_data).unwrap(); + + // INT32 column should fail + let result = ParquetMetaDataReader::read_column_dictionary(&file_data, &metadata, 0, 0); + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("BYTE_ARRAY")); + } } #[cfg(all(feature = "async", feature = "arrow", test))] @@ -1203,7 +1326,7 @@ mod async_tests { use super::*; use arrow::{array::Int32Array, datatypes::DataType}; - use arrow_array::RecordBatch; + use arrow_array::{Array, BinaryArray, RecordBatch}; use arrow_schema::{Field, Schema}; use bytes::Bytes; use futures::FutureExt; @@ -1602,4 +1725,173 @@ mod async_tests { read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap(); read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap(); } + + #[tokio::test] + async fn test_load_column_dictionary_uncompressed() { + // plain-dict-uncompressed-checksum.parquet: col 1 = binary_field (BYTE_ARRAY dict, uncompressed) + // Note: No STRING annotation, so returns BinaryArray + let mut file = get_test_file("plain-dict-uncompressed-checksum.parquet"); + + // Parse metadata to get schema info + let file_for_parse = file.try_clone().unwrap(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&file_for_parse) + .unwrap(); + + let mut fetch = |range| { + futures::future::ready(read_range(&mut file, range)) + }; + let input = MetadataFetchFn(&mut fetch); + + let dict = ParquetMetaDataReader::load_column_dictionary( + input, + &metadata, + 0, // row_group_idx + 1, // column_idx + ).await.unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray"); + + // Verify dictionary contents + assert_eq!(binary_array.len(), 1); + assert_eq!( + binary_array.value(0), + b"a655fd0e-9949-4059-bcae-fd6a002a4652" + ); + } + + #[tokio::test] + async fn test_load_column_dictionary_binary() { + // alltypes_dictionary.parquet: col 8 = date_string_col (BYTE_ARRAY with dict, NO STRING annotation) + let mut file = get_test_file("alltypes_dictionary.parquet"); + + let file_for_parse = file.try_clone().unwrap(); + let metadata = ParquetMetaDataReader::new() + 
.parse_and_finish(&file_for_parse) + .unwrap(); + + let mut fetch = |range| { + futures::future::ready(read_range(&mut file, range)) + }; + let input = MetadataFetchFn(&mut fetch); + + let dict = ParquetMetaDataReader::load_column_dictionary( + input, + &metadata, + 0, // row_group_idx + 8, // column_idx + ).await.unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray for pure BYTE_ARRAY column"); + + // Verify dictionary contents + assert_eq!(binary_array.len(), 1); + assert_eq!(binary_array.value(0), b"01/01/09"); + } + + #[tokio::test] + async fn test_load_column_dictionary_none() { + // alltypes_tiny_pages_plain.parquet: col 9 = string_col (PLAIN encoding, NO dictionary) + let mut file = get_test_file("alltypes_tiny_pages_plain.parquet"); + + let file_for_parse = file.try_clone().unwrap(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&file_for_parse) + .unwrap(); + + let mut fetch = |range| { + futures::future::ready(read_range(&mut file, range)) + }; + let input = MetadataFetchFn(&mut fetch); + + let dict = ParquetMetaDataReader::load_column_dictionary( + input, + &metadata, + 0, // row_group_idx + 9, // column_idx + ).await.unwrap(); + + assert!(dict.is_none()); + } + + #[tokio::test] + async fn test_load_column_dictionary_compressed() { + // rle-dict-snappy-checksum.parquet: col 1 = binary_field (BYTE_ARRAY with Snappy-compressed dict) + let mut file = get_test_file("rle-dict-snappy-checksum.parquet"); + + let file_for_parse = file.try_clone().unwrap(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&file_for_parse) + .unwrap(); + + // Verify compression + assert_eq!( + metadata.row_group(0).column(1).compression(), + crate::basic::Compression::SNAPPY + ); + + let mut fetch = |range| { + futures::future::ready(read_range(&mut file, range)) + }; + let input = MetadataFetchFn(&mut fetch); + + let dict = ParquetMetaDataReader::load_column_dictionary( + input, + &metadata, + 0, // row_group_idx + 1, // column_idx + ).await.unwrap(); + + assert!(dict.is_some()); + let dict_array = dict.unwrap(); + // binary_field has no STRING annotation, so expect BinaryArray + let binary_array = dict_array + .as_any() + .downcast_ref::() + .expect("Expected BinaryArray"); + + // Verify dictionary contents + assert_eq!(binary_array.len(), 1); + assert_eq!( + binary_array.value(0), + b"c95e263a-f5d4-401f-8107-5ca7146a1f98" + ); + } + + #[tokio::test] + async fn test_load_column_dictionary_wrong_type() { + // plain-dict-uncompressed-checksum.parquet: col 0 = long_field (INT32 with dict) + let mut file = get_test_file("plain-dict-uncompressed-checksum.parquet"); + + let file_for_parse = file.try_clone().unwrap(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&file_for_parse) + .unwrap(); + + let mut fetch = |range| { + futures::future::ready(read_range(&mut file, range)) + }; + let input = MetadataFetchFn(&mut fetch); + + // INT32 column should fail + let result = ParquetMetaDataReader::load_column_dictionary( + input, + &metadata, + 0, // row_group_idx + 0, // column_idx + ).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("BYTE_ARRAY")); + } }