diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 124bc11bddc5..0f865bfafee4 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -113,6 +113,10 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() { for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() { if let Some(column_index) = &column_indexes[row_group_idx][column_idx] { + // Missing indexes may also have the placeholder ColumnIndexMetaData::NONE + if matches!(column_index, ColumnIndexMetaData::NONE) { + continue; + } let start_offset = self.buf.bytes_written(); self.object_writer.write_column_index( column_index, @@ -227,22 +231,38 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { None => builder.set_row_groups(row_groups), }; - let column_indexes: Option<Vec<Vec<ColumnIndexMetaData>>> = column_indexes.map(|ovvi| { - ovvi.into_iter() - .map(|vi| { - vi.into_iter() - .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE)) - .collect() - }) - .collect() - }); - - // FIXME(ets): this will panic if there's a missing index. 
- let offset_indexes: Option<Vec<Vec<OffsetIndexMetaData>>> = offset_indexes.map(|ovvi| { - ovvi.into_iter() - .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect()) - .collect() - }); + // test to see if all indexes for this file are empty + let all_none = column_indexes + .as_ref() + .is_some_and(|ci| ci.iter().all(|cii| cii.iter().all(|idx| idx.is_none()))); + let column_indexes: Option<Vec<Vec<ColumnIndexMetaData>>> = if all_none { + None + } else { + column_indexes.map(|ovvi| { + ovvi.into_iter() + .map(|vi| { + vi.into_iter() + .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE)) + .collect() + }) + .collect() + }) + }; + + // test to see if all indexes for this file are empty + let all_none = offset_indexes + .as_ref() + .is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx| idx.is_none()))); + let offset_indexes: Option<Vec<Vec<OffsetIndexMetaData>>> = if all_none { + None + } else { + // FIXME(ets): this will panic if there's a missing index. + offset_indexes.map(|ovvi| { + ovvi.into_iter() + .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect()) + .collect() + }) + }; builder = builder.set_column_index(column_indexes); builder = builder.set_offset_index(offset_indexes); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index cbbcadf2067c..35948af022f1 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1068,6 +1068,7 @@ mod tests { use crate::schema::parser::parse_message_type; use crate::schema::types; use crate::schema::types::{ColumnDescriptor, ColumnPath}; + use crate::util::test_common::file_util::get_test_file; use crate::util::test_common::rand_gen::RandGen; #[test] @@ -2442,4 +2443,74 @@ mod tests { start += 1; } } + + #[test] + fn test_rewrite_no_page_indexes() { + let file = get_test_file("alltypes_tiny_pages.parquet"); + let metadata = ParquetMetaDataReader::new() + .with_page_index_policy(PageIndexPolicy::Optional) + .parse_and_finish(&file) + .unwrap(); + + let props = Arc::new(WriterProperties::builder().build()); + let schema = 
metadata.file_metadata().schema_descr().root_schema_ptr(); + let output = Vec::<u8>::new(); + let mut writer = SerializedFileWriter::new(output, schema, props).unwrap(); + + for rg in metadata.row_groups() { + let mut rg_out = writer.next_row_group().unwrap(); + for column in rg.columns() { + let result = ColumnCloseResult { + bytes_written: column.compressed_size() as _, + rows_written: rg.num_rows() as _, + metadata: column.clone(), + bloom_filter: None, + column_index: None, + offset_index: None, + }; + rg_out.append_column(&file, result).unwrap(); + } + rg_out.close().unwrap(); + } + writer.close().unwrap(); + } + + #[test] + fn test_rewrite_missing_column_index() { + // this file has an INT96 column that lacks a column index entry + let file = get_test_file("alltypes_tiny_pages.parquet"); + let metadata = ParquetMetaDataReader::new() + .with_page_index_policy(PageIndexPolicy::Optional) + .parse_and_finish(&file) + .unwrap(); + + let props = Arc::new(WriterProperties::builder().build()); + let schema = metadata.file_metadata().schema_descr().root_schema_ptr(); + let output = Vec::<u8>::new(); + let mut writer = SerializedFileWriter::new(output, schema, props).unwrap(); + + let column_indexes = metadata.column_index(); + let offset_indexes = metadata.offset_index(); + + for (rg_idx, rg) in metadata.row_groups().iter().enumerate() { + let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx)); + let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx)); + let mut rg_out = writer.next_row_group().unwrap(); + for (col_idx, column) in rg.columns().iter().enumerate() { + let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned(); + let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned(); + let result = ColumnCloseResult { + bytes_written: column.compressed_size() as _, + rows_written: rg.num_rows() as _, + metadata: column.clone(), + bloom_filter: None, + column_index, + offset_index, + }; + 
rg_out.append_column(&file, result).unwrap(); + } + rg_out.close().unwrap(); + } + writer.close().unwrap(); + } }