8839: Add support for Union types in RowConverter #20

martin-augment wants to merge 1 commit into main from

Conversation
Walkthrough

The pull request adds comprehensive support for Union data types to the row-based encoding/decoding system. Changes include: a new Union variant in the Codec enum storing per-type-branch converters and mode information; extension of the Encoder enum with a Union variant containing child rows, type identifiers, offsets, and union mode; Union handling throughout the codec lifecycle, including construction, encoding/decoding, and length calculation; and validation that Union types are supported in sort operations. The implementation handles both Dense and Sparse union modes and includes tests for round-trip encoding, type preservation, and ordering semantics.
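The round-trip and ordering behavior described in the walkthrough rests on one idea: each encoded union row starts with bytes that compare by type first, then by value. As a minimal sketch in plain Rust (the sentinel/type-id layout here is an illustrative assumption, not the actual arrow-row byte format):

```rust
/// Minimal sketch of a row encoding for a union value, assuming a layout of
/// [valid sentinel][type id][child row bytes]. The real arrow-row format
/// differs in detail, but the ordering idea is the same.
fn encode_union_value(type_id: u8, child_row: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(2 + child_row.len());
    out.push(0x01); // sentinel marking a valid (non-null) value
    out.push(type_id); // rows compare by type id first...
    out.extend_from_slice(child_row); // ...then by the encoded child value
    out
}

fn main() {
    // Two values of type 0 and one of type 1; child rows are assumed to be
    // already order-preserving byte encodings.
    let a = encode_union_value(0, &[0x05]); // int 5
    let b = encode_union_value(0, &[0x64]); // int 100
    let c = encode_union_value(1, &[b'a']); // string "a"

    // Within a type, child encodings decide the order; across types, the id does.
    assert!(a < b);
    assert!(b < c);
    println!("ordering ok");
}
```

Because comparison is plain lexicographic byte order, rows of the same type sort by their child encoding and rows of different types group by type id, matching the ordering semantics the walkthrough mentions.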
let mut child_rows = Vec::with_capacity(converters.len());
for (type_id, converter) in converters.iter().enumerate() {
    let child_array = union_array.child(type_id as i8);
Using the enumerate index as type_id when calling union_array.child(type_id as i8) assumes union type IDs are dense and aligned with field order. Consider deriving the mapping from the actual union field type IDs; otherwise this may select the wrong child (or panic) when type IDs are non-contiguous.
value:incorrect-but-reasonable; category:bug; feedback:The Augment AI reviewer is not correct. For sparse unions the index could be used to get the variant/field of the union since sparse unions contain all its variants. Dense unions contain only an offset (the variant index) and the value for that variant. https://github.com/apache/arrow-rs/pull/8839/files#diff-8235802124a5a88240195fc4b8792cf39bb9a8703eb69308d10358966eeff765R1607 has the logic responsible for this.
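To make the sparse/dense distinction the feedback describes concrete — this is a simplified model using plain vectors, not the real arrow-rs UnionArray API — resolving a logical row differs only in whether an offsets buffer is consulted:

```rust
/// Simplified model of union storage; `children[k]` holds the values of variant k.
struct Union {
    type_ids: Vec<i8>,
    /// Some(offsets) for a dense union, None for a sparse one.
    offsets: Option<Vec<i32>>,
    children: Vec<Vec<i64>>, // one value vector per variant; i64 for simplicity
}

impl Union {
    /// Resolve logical row `i` to its value.
    fn value(&self, i: usize) -> i64 {
        let variant = self.type_ids[i] as usize;
        match &self.offsets {
            // Dense: the offsets buffer says where row i lives inside its child.
            Some(offsets) => self.children[variant][offsets[i] as usize],
            // Sparse: every child is as long as the union, so row i maps to index i.
            None => self.children[variant][i],
        }
    }
}

fn main() {
    // Dense union over two variants: logical values [10, 20, 11]
    let dense = Union {
        type_ids: vec![0, 1, 0],
        offsets: Some(vec![0, 0, 1]),
        children: vec![vec![10, 11], vec![20]],
    };
    let got: Vec<i64> = (0..3).map(|i| dense.value(i)).collect();
    assert_eq!(got, vec![10, 20, 11]);

    // Sparse union: both children span all three rows; unused slots are padding.
    let sparse = Union {
        type_ids: vec![0, 1, 0],
        offsets: None,
        children: vec![vec![10, 0, 11], vec![0, 20, 0]],
    };
    let got: Vec<i64> = (0..3).map(|i| sparse.value(i)).collect();
    assert_eq!(got, vec![10, 20, 11]);
    println!("ok");
}
```

In both modes the variant index comes straight from the type-ids buffer; only the position within the child differs, which is the point the feedback makes about where the dense-offset logic lives.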
let child_row = child_rows[type_id as usize].row(child_row_i);
Indexing child_rows by type_id as usize assumes type IDs are 0..N-1 and match the vector order. This can read the wrong child rows for unions with non-dense or non-zero-based type IDs (also applies to similar indexing in encode_column).
value:incorrect-but-reasonable; category:bug; feedback:The Augment AI reviewer is not correct. For sparse unions the index could be used to get the variant/field of the union since sparse unions contain all its variants. Dense unions contain only an offset (the variant index) and the value for that variant. https://github.com/apache/arrow-rs/pull/8839/files#diff-8235802124a5a88240195fc4b8792cf39bb9a8703eb69308d10358966eeff765R1607 has the logic responsible for this.
    mode,
} => {
    let _union_array = column.as_any().downcast_ref::<UnionArray>().unwrap();
    let null_sentinel = if opts.descending { 0x00 } else { 0x01 };
The first byte (null sentinel) is set solely based on opts.descending and not on validity, unlike other encoders that write 0x01 for valid and null_sentinel(opts) for null. This can produce inconsistent ordering/round-tripping for descending sorts and arrays with nulls.
value:annoying; category:bug; feedback:The Augment AI reviewer is not correct. The other encoders use the null_first option to calculate the sentinel value. See https://github.com/pydantic/arrow-rs/blob/55590118f1492addfbb40a736bdf7d67123d303f/arrow-row/src/lib.rs
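The pattern the feedback refers to can be sketched as follows; the field names mirror arrow-row's SortOptions, but these functions are an illustrative model of the idea (sentinel chosen from nulls_first, with descending handled separately by inverting encoded bytes), not the crate's exact code:

```rust
#[derive(Clone, Copy)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Sketch of sentinel selection: nulls sort first when their sentinel byte is
/// the smallest, last when it is the largest. Descending order is not folded in
/// here; it is applied afterwards by inverting the encoded bytes.
fn null_sentinel(opts: SortOptions) -> u8 {
    if opts.nulls_first { 0x00 } else { 0xFF }
}

/// Invert a byte when descending, as the quoted encoder does for type ids.
fn maybe_invert(opts: SortOptions, b: u8) -> u8 {
    if opts.descending { !b } else { b }
}

fn main() {
    let asc = SortOptions { descending: false, nulls_first: true };
    // nulls_first yields the smallest sentinel, so nulls sort before values
    assert_eq!(null_sentinel(asc), 0x00);

    let desc = SortOptions { descending: true, nulls_first: true };
    // byte inversion flips the comparison for descending sorts
    assert_eq!(maybe_invert(desc, 0x01), 0xFE);
    println!("ok");
}
```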
    if options.descending { !id } else { id }
};

let type_id = type_id_byte as i8;
Using type_id as usize to index rows_by_field/null_rows assumes dense, zero-based type IDs aligned with converters; this will misplace rows and build incorrect arrays for unions with arbitrary type IDs. (Related to the mapping issue noted above.)
value:incorrect-but-reasonable; category:bug; feedback:The Augment AI reviewer is not correct. For sparse unions the index could be used to get the variant/field of the union since sparse unions contain all its variants. Dense unions contain only an offset (the variant index) and the value for that variant. https://github.com/apache/arrow-rs/pull/8839/files#diff-8235802124a5a88240195fc4b8792cf39bb9a8703eb69308d10358966eeff765R1607 has the logic responsible for this.
Code Review: Add support for Union types in RowConverter

Thank you for implementing Union type support in the RowConverter! This is a valuable addition. Here's my detailed review:

Code Quality and Best Practices

Positive Aspects

✅ Consistent with existing patterns: The implementation follows the same approach used for Struct and Dictionary types, maintaining code consistency.

Issues Found

1. Typo in comment (arrow-row/src/lib.rs:532)

// since the encodedc ontents will be inverted if descending is set

Should be:

// since the encoded contents will be inverted if descending is set

2. Critical Bug: Type ID mapping issue (arrow-row/src/lib.rs:634)

for (type_id, converter) in converters.iter().enumerate() {
    let child_array = union_array.child(type_id as i8);

Problem: This assumes type_ids are sequential starting from 0, but Arrow Union types can have arbitrary type_ids (e.g., type_ids could be [5, 10, 15]). The current code would:

Fix needed: Use the actual type_ids from the union fields:

for (type_id, _field) in union_fields.iter() {
    let child_array = union_array.child(type_id);
    let converter = &converters[/* need mapping */];

You'll need a mapping from type_id to converter index.

3. Same type ID mapping issue in decode (arrow-row/src/lib.rs:1912-1931)

4. Same issue in child_arrays iteration (arrow-row/src/lib.rs:1939)

for (field_idx, converter) in converters.iter().enumerate() {

This assumes field indices match array positions, but with arbitrary type_ids this breaks.

Potential Bugs

Critical

Minor

Performance Considerations

Concerns

Positive

Security Concerns

Test Coverage

Good Coverage ✅

Missing Coverage
Actionable comments posted: 3
📒 Files selected for processing (1)
arrow-row/src/lib.rs (10 hunks)
🔇 Additional comments (1)
arrow-row/src/lib.rs (1)
1743-1743: Verify null sentinel logic for Union encoding.

Line 1743 uses if opts.descending { 0x00 } else { 0x01 } for the null_sentinel, which differs from the pattern used in other codecs that call the null_sentinel(opts) function (lines 1379-1384).

The null_sentinel() function considers both descending and nulls_first options. However, this appears to be encoding the "valid" sentinel (0x01) rather than the null sentinel (0x00), which might be intentionally different from struct encoding at line 1695.

Please verify this is correct and consider adding a comment explaining why Union uses a different pattern than other nested types like Struct.
DataType::Union(fields, mode) => {
    // similar to dictionaries and lists, we set descending to false and negate nulls_first
    // since the encodedc ontents will be inverted if descending is set
    let options = SortOptions {
        descending: false,
        nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
    };

    let mut converters = Vec::with_capacity(fields.len());
    let mut null_rows = Vec::with_capacity(fields.len());

    for (_type_id, field) in fields.iter() {
        let sort_field =
            SortField::new_with_options(field.data_type().clone(), options);
        let converter = RowConverter::new(vec![sort_field])?;

        let null_array = new_null_array(field.data_type(), 1);
        let nulls = converter.convert_columns(&[null_array])?;
        let owned = OwnedRow {
            data: nulls.buffer.into(),
            config: nulls.config,
        };

        converters.push(converter);
        null_rows.push(owned);
    }

    Ok(Self::Union(converters, null_rows, *mode))
}
Critical: Incorrect indexing for Union converters by type_id.
The code builds converters and null_rows using Vec indexed by iteration order (0, 1, 2, ...), but Union type_ids can be arbitrary i8 values (e.g., [1, 3, 4] as shown in the relevant code snippets).
Later in the encoder (line 637), the code iterates with converters.iter().enumerate(), treating the enumeration index as if it were the type_id. Then at line 638, it calls union_array.child(type_id as i8), which expects the actual union type_id, not the enumeration index.
This causes a mismatch when type_ids are non-sequential. For example, if union_fields has type_ids [1, 3, 4], then:
- converters[0] corresponds to type_id 1
- converters[1] corresponds to type_id 3
- converters[2] corresponds to type_id 4
But at line 637-638, enumerate() gives (0, 1, 2) and the code treats these as type_ids, incorrectly calling union_array.child(0), union_array.child(1), union_array.child(2) instead of child(1), child(3), child(4).
To fix this, either:
- Use a HashMap<i8, (RowConverter, OwnedRow)> indexed by actual type_id, or
- Store the type_ids alongside the converters and use them for lookup
Example fix using approach 1:
- DataType::Union(fields, mode) => {
- // similar to dictionaries and lists, we set descending to false and negate nulls_first
- // since the encodedc ontents will be inverted if descending is set
- let options = SortOptions {
- descending: false,
- nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
- };
-
- let mut converters = Vec::with_capacity(fields.len());
- let mut null_rows = Vec::with_capacity(fields.len());
-
- for (_type_id, field) in fields.iter() {
- let sort_field =
- SortField::new_with_options(field.data_type().clone(), options);
- let converter = RowConverter::new(vec![sort_field])?;
-
- let null_array = new_null_array(field.data_type(), 1);
- let nulls = converter.convert_columns(&[null_array])?;
- let owned = OwnedRow {
- data: nulls.buffer.into(),
- config: nulls.config,
- };
-
- converters.push(converter);
- null_rows.push(owned);
- }
-
- Ok(Self::Union(converters, null_rows, *mode))
- }
+ DataType::Union(fields, mode) => {
+ let options = SortOptions {
+ descending: false,
+ nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
+ };
+
+ // Store converters indexed by type_id to handle non-sequential type_ids
+ let mut converters = Vec::with_capacity(fields.len());
+ let mut null_rows = Vec::with_capacity(fields.len());
+ let mut type_ids_list = Vec::with_capacity(fields.len());
+
+ for (type_id, field) in fields.iter() {
+ let sort_field =
+ SortField::new_with_options(field.data_type().clone(), options);
+ let converter = RowConverter::new(vec![sort_field])?;
+
+ let null_array = new_null_array(field.data_type(), 1);
+ let nulls = converter.convert_columns(&[null_array])?;
+ let owned = OwnedRow {
+ data: nulls.buffer.into(),
+ config: nulls.config,
+ };
+
+ type_ids_list.push(type_id);
+ converters.push(converter);
+ null_rows.push(owned);
+ }
+
+ Ok(Self::Union(converters, null_rows, type_ids_list, *mode))
+ }

Then update the Codec enum to store the type_ids and adjust all access patterns accordingly throughout the encoder, decoder, etc.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In arrow-row/src/lib.rs around lines 530 to 558, the Union branch builds
converters and null_rows indexed by iteration order but later code treats that
index as the union type_id; this breaks when union field type_ids are
non-sequential. Replace the Vecs with a mapping keyed by the actual i8 type_id
(e.g., HashMap<i8, (RowConverter, OwnedRow)>) or store a Vec of (type_id,
RowConverter, OwnedRow) and change Codec::Union to hold the type_ids mapping;
then update the encoder/decoder code that currently uses enumerate() to look up
by the real type_id (iterate over the map entries or use the stored type_id when
calling union_array.child(type_id) and when accessing the converter/null_row).
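The mapping this prompt asks for can be sketched as a small lookup table from declared type ids to converter positions; the names here are illustrative, not the PR's actual types:

```rust
use std::collections::HashMap;

/// Build a lookup from a union's declared type ids (which may be arbitrary,
/// e.g. [5, 10, 15]) to the index of the converter created for that field.
fn type_id_to_converter_index(field_type_ids: &[i8]) -> HashMap<i8, usize> {
    field_type_ids
        .iter()
        .enumerate()
        .map(|(idx, &type_id)| (type_id, idx))
        .collect()
}

fn main() {
    let map = type_id_to_converter_index(&[5, 10, 15]);
    // type id 10 was the second declared field, so it maps to converter index 1
    assert_eq!(map[&10], 1);
    println!("ok");
}
```

Whether this indirection is actually required depends on the point made in the feedback below about how Arrow resolves children; the sketch only shows what the suggested fix would look like.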
value:incorrect-but-reasonable; category:bug; feedback:The Augment AI reviewer is not correct. For sparse unions the index could be used to get the variant/field of the union since sparse unions contain all its variants. Dense unions contain only an offset (the variant index) and the value for that variant. https://github.com/apache/arrow-rs/pull/8839/files#diff-8235802124a5a88240195fc4b8792cf39bb9a8703eb69308d10358966eeff765R1607 has the logic responsible for this.
}
DataType::Union(fields, mode) => {
    // similar to dictionaries and lists, we set descending to false and negate nulls_first
    // since the encodedc ontents will be inverted if descending is set
Fix typo in comment.
Line 532 contains a typo: "encodedc ontents" should be "encoded contents".
- // similar to dictionaries and lists, we set descending to false and negate nulls_first
- // since the encodedc ontents will be inverted if descending is set
+ // similar to dictionaries and lists, we set descending to false and negate nulls_first
+ // since the encoded contents will be inverted if descending is set

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In arrow-row/src/lib.rs around line 532, fix the typo in the inline comment by
replacing "encodedc ontents" with "encoded contents" so the comment reads
correctly (e.g., "since the encoded contents will be inverted if descending is
set").
value:good-to-have; category:typo; feedback:The CodeRabbit AI reviewer is correct! There is a typo in the comment
#[test]
fn test_sparse_union() {
    // create a sparse union with Int32 (type_id = 0) and Utf8 (type_id = 1)
    let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
    let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);

    // [1, "b", 3, "d", 5]
    let type_ids = vec![0, 1, 0, 1, 0].into();

    let union_fields = [
        (0, Arc::new(Field::new("int", DataType::Int32, false))),
        (1, Arc::new(Field::new("str", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();

    let union_array = UnionArray::try_new(
        union_fields,
        type_ids,
        None,
        vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
    )
    .unwrap();

    let union_type = union_array.data_type().clone();
    let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

    let rows = converter
        .convert_columns(&[Arc::new(union_array.clone())])
        .unwrap();

    // round trip
    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

    assert_eq!(union_array.len(), back_union.len());
    for i in 0..union_array.len() {
        assert_eq!(union_array.type_id(i), back_union.type_id(i));
    }
}

#[test]
fn test_dense_union() {
    // create a dense union with Int32 (type_id = 0) and Utf8 (type_id = 1)
    let int_array = Int32Array::from(vec![1, 3, 5]);
    let str_array = StringArray::from(vec!["a", "b"]);

    let type_ids = vec![0, 1, 0, 1, 0].into();

    // [1, "a", 3, "b", 5]
    let offsets = vec![0, 0, 1, 1, 2].into();

    let union_fields = [
        (0, Arc::new(Field::new("int", DataType::Int32, false))),
        (1, Arc::new(Field::new("str", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();

    let union_array = UnionArray::try_new(
        union_fields,
        type_ids,
        Some(offsets), // Dense mode
        vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
    )
    .unwrap();

    let union_type = union_array.data_type().clone();
    let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

    let rows = converter
        .convert_columns(&[Arc::new(union_array.clone())])
        .unwrap();

    // round trip
    let back = converter.convert_rows(&rows).unwrap();
    let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

    assert_eq!(union_array.len(), back_union.len());
    for i in 0..union_array.len() {
        assert_eq!(union_array.type_id(i), back_union.type_id(i));
    }
}

#[test]
fn test_union_ordering() {
    let int_array = Int32Array::from(vec![100, 5, 20]);
    let str_array = StringArray::from(vec!["z", "a"]);

    // [100, "z", 5, "a", 20]
    let type_ids = vec![0, 1, 0, 1, 0].into();
    let offsets = vec![0, 0, 1, 1, 2].into();

    let union_fields = [
        (0, Arc::new(Field::new("int", DataType::Int32, false))),
        (1, Arc::new(Field::new("str", DataType::Utf8, false))),
    ]
    .into_iter()
    .collect();

    let union_array = UnionArray::try_new(
        union_fields,
        type_ids,
        Some(offsets),
        vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
    )
    .unwrap();

    let union_type = union_array.data_type().clone();
    let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

    let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();

    /*
    expected ordering

    row 2: 5   - type_id 0
    row 4: 20  - type_id 0
    row 0: 100 - type_id 0
    row 3: "a" - type_id 1
    row 1: "z" - type_id 1
    */

    // 5 < "z"
    assert!(rows.row(2) < rows.row(1));

    // 100 < "a"
    assert!(rows.row(0) < rows.row(3));

    // among ints
    // 5 < 20
    assert!(rows.row(2) < rows.row(4));
    // 20 < 100
    assert!(rows.row(4) < rows.row(0));

    // among strings
    // "a" < "z"
    assert!(rows.row(3) < rows.row(1));
}
🛠️ Refactor suggestion | 🟠 Major
Add test coverage for non-sequential union type_ids.
The three union tests use sequential type_ids [0, 1], but Arrow Union types support arbitrary i8 type_ids such as [1, 3, 4] (as shown in the relevant code snippets from union_array.rs).
The current tests won't catch indexing bugs related to non-sequential type_ids. Consider adding a test case like:
#[test]
fn test_union_non_sequential_type_ids() {
let int_array = Int32Array::from(vec![1, 2, 3]);
let float_array = Float64Array::from(vec![4.0, 5.0]);
let str_array = StringArray::from(vec!["a"]);
// Use non-sequential type_ids: 1, 3, 4
let type_ids = vec![1, 3, 1, 4, 3].into();
let offsets = vec![0, 0, 1, 0, 1].into();
let union_fields = [
(1, Arc::new(Field::new("A", DataType::Int32, true))),
(3, Arc::new(Field::new("B", DataType::Float64, true))),
(4, Arc::new(Field::new("C", DataType::Utf8, true))),
]
.into_iter()
.collect();
let union_array = UnionArray::try_new(
union_fields,
type_ids,
Some(offsets),
        vec![Arc::new(int_array) as ArrayRef, Arc::new(float_array), Arc::new(str_array)],
)
.unwrap();
let converter = RowConverter::new(vec![SortField::new(union_array.data_type().clone())]).unwrap();
let rows = converter.convert_columns(&[Arc::new(union_array.clone())]).unwrap();
let back = converter.convert_rows(&rows).unwrap();
// Verify round-trip correctness
assert_eq!(back[0].as_ref(), &union_array as &dyn Array);
}
value:good-to-have; category:typo; feedback:The Claude AI reviewer is correct! There is a typo in the comment

value:incorrect-but-reasonable; category:bug; feedback:The Augment AI reviewer is not correct. For sparse unions the index could be used to get the variant/field of the union since sparse unions contain all its variants. Dense unions contain only an offset (the variant index) and the value for that variant. https://github.com/apache/arrow-rs/pull/8839/files#diff-8235802124a5a88240195fc4b8792cf39bb9a8703eb69308d10358966eeff765R1607 has the logic responsible for this.

value:incorrect-but-reasonable; category:bug; feedback:The Augment AI reviewer is not correct. For sparse unions the index could be used to get the variant/field of the union since sparse unions contain all its variants. Dense unions contain only an offset (the variant index) and the value for that variant. https://github.com/apache/arrow-rs/pull/8839/files#diff-8235802124a5a88240195fc4b8792cf39bb9a8703eb69308d10358966eeff765R1607 has the logic responsible for this.