Skip to content

Commit 795b246

Browse files
gruuyarichox
authored and committed
feat(parquet): relax type compatibility check in parquet ArrowWriter (apache#9099)
- Closes apache#9098. Don't require strict equality for nested fields (including inner field name/metadata); instead, only require that nested data types are logically equivalent. Use `a.equals_datatype(b)` instead of `a == b` at the start of `LevelInfoBuilder::types_compatible`.
1 parent 19d40bd commit 795b246

File tree

2 files changed

+285
-2
lines changed

2 files changed

+285
-2
lines changed

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,51 @@ impl LevelInfoBuilder {
541541
}
542542
}
543543
}
544+
545+
/// Determine if the fields are compatible for purposes of constructing `LevelBuilderInfo`.
546+
///
547+
/// Fields are compatible if they're the same type. Otherwise if one of them is a dictionary
548+
/// and the other is a native array, the dictionary values must have the same type as the
549+
/// native array
550+
fn types_compatible(a: &DataType, b: &DataType) -> bool {
551+
// if the Arrow data types are equal, the types are deemed compatible
552+
if a.equals_datatype(b) {
553+
return true;
554+
}
555+
556+
// get the values out of the dictionaries
557+
let (a, b) = match (a, b) {
558+
(DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
559+
(va.as_ref(), vb.as_ref())
560+
}
561+
(DataType::Dictionary(_, v), b) => (v.as_ref(), b),
562+
(a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
563+
_ => (a, b),
564+
};
565+
566+
// now that we've got the values from one/both dictionaries, if the values
567+
// have the same Arrow data type, they're compatible
568+
if a == b {
569+
return true;
570+
}
571+
572+
// here we have different Arrow data types, but if the array contains the same type of data
573+
// then we consider the type compatible
574+
match a {
575+
// String, StringView and LargeString are compatible
576+
DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
577+
DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
578+
DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
579+
580+
// Binary, BinaryView and LargeBinary are compatible
581+
DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
582+
DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
583+
DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
584+
585+
// otherwise we have incompatible types
586+
_ => false,
587+
}
588+
}
544589
}
545590
/// The data necessary to write a primitive Arrow array to parquet, taking into account
546591
/// any non-primitive parents it may have in the arrow representation

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 240 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,17 +1074,24 @@ fn get_fsb_array_slice(
10741074
#[cfg(test)]
10751075
mod tests {
10761076
use super::*;
1077+
use std::collections::HashMap;
10771078

10781079
use std::fs::File;
10791080

10801081
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1081-
use crate::arrow::ARROW_SCHEMA_META_KEY;
1082+
use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1083+
use crate::column::page::{Page, PageReader};
1084+
use crate::file::metadata::thrift::PageHeader;
1085+
use crate::file::page_index::column_index::ColumnIndexMetaData;
1086+
use crate::file::reader::SerializedPageReader;
1087+
use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1088+
use crate::schema::types::{ColumnPath, Type};
10821089
use arrow::datatypes::ToByteSlice;
10831090
use arrow::datatypes::{DataType, Schema};
10841091
use arrow::error::Result as ArrowResult;
10851092
use arrow::util::pretty::pretty_format_batches;
10861093
use arrow::{array::*, buffer::Buffer};
1087-
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1094+
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
10881095
use arrow_schema::Fields;
10891096

10901097
use crate::basic::Encoding;
@@ -2469,6 +2476,237 @@ mod tests {
24692476
one_column_roundtrip_with_schema(Arc::new(d), schema);
24702477
}
24712478

2479+
#[test]
fn arrow_writer_test_type_compatibility() {
    /// Writes `array1`, then `array2` (whose Arrow type is compatible but not identical),
    /// through the same `ArrowWriter`, then reads the file back and asserts the result
    /// equals `expected_result` under `array1`'s schema — i.e. the writer coerces later
    /// batches to the schema it was created with.
    fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
    where
        T1: Array + 'static,
        T2: Array + 'static,
    {
        // the writer's schema is taken from the first array's type
        let schema1 = Arc::new(Schema::new(vec![Field::new(
            "a",
            array1.data_type().clone(),
            false,
        )]));

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

        let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
        writer.write(&rb1).unwrap();

        // second batch uses a different (but compatible) schema; the write must succeed
        let schema2 = Arc::new(Schema::new(vec![Field::new(
            "a",
            array2.data_type().clone(),
            false,
        )]));
        let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
        writer.write(&rb2).unwrap();

        writer.close().unwrap();

        // read everything back in a single batch and compare against the expected result,
        // which carries the writer's (first) schema
        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
        let actual_batch = record_batch_reader.next().unwrap().unwrap();

        let expected_batch =
            RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
        assert_eq!(actual_batch, expected_batch);
    }

    // check compatibility between native and dictionaries

    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        StringArray::from_iter_values(vec!["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["barquet"])),
        ),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    // check compatibility between dictionaries with different key types

    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        DictionaryArray::new(
            UInt16Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // check compatibility between dictionaries with different value types
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // check compatibility between a dictionary and a native array with a different type
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // check compatibility for string types (Utf8 / LargeUtf8 / Utf8View, in both directions)

    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(vec!["parquet"]),
        StringArray::from_iter_values(vec!["barquet"]),
        LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        StringViewArray::from_iter_values(vec!["barquet"]),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(vec!["parquet"]),
        StringArray::from_iter_values(vec!["barquet"]),
        StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(vec!["parquet"]),
        StringViewArray::from_iter_values(vec!["barquet"]),
        LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(vec!["parquet"]),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    // check compatibility for binary types (Binary / LargeBinary / BinaryView, both directions)

    ensure_compatible_write(
        BinaryArray::from_iter_values(vec![b"parquet"]),
        LargeBinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryArray::from_iter_values(vec![b"barquet"]),
        LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryViewArray::from_iter_values(vec![b"barquet"]),
        BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values(vec![b"parquet"]),
        BinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values(vec![b"parquet"]),
        LargeBinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryViewArray::from_iter_values(vec![b"barquet"]),
        LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    // check compatibility for list types whose inner fields differ only in metadata

    let list_field_metadata = HashMap::from_iter(vec![(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "1".to_string(),
    )]);
    let list_field = Field::new_list_field(DataType::Int32, false);

    // two input lists: [0,1] [2,3,4] and [5,6,7] [8,9]
    let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
    let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

    let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
    let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

    // expected concatenation: [0,1] [2,3,4] [5,6,7] [8,9]
    let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
    let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

    ensure_compatible_write(
        // when the initial schema has the metadata ...
        ListArray::try_new(
            Arc::new(
                list_field
                    .clone()
                    .with_metadata(list_field_metadata.clone()),
            ),
            offsets1,
            values1,
            None,
        )
        .unwrap(),
        // ... and some intermediate schema doesn't have the metadata
        ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
        // ... the write will still go through, and the resulting schema will inherit the initial metadata
        ListArray::try_new(
            Arc::new(
                list_field
                    .clone()
                    .with_metadata(list_field_metadata.clone()),
            ),
            offsets_expected,
            values_expected,
            None,
        )
        .unwrap(),
    );
}
2709+
24722710
#[test]
24732711
fn arrow_writer_primitive_dictionary() {
24742712
// define schema

0 commit comments

Comments
 (0)