@@ -21,7 +21,7 @@ use std::ops::Range;
2121
2222use arrow_array:: { Array , ArrayRef } ;
2323use arrow_buffer:: BooleanBuffer ;
24- use arrow_schema:: { ArrowError , SortOptions } ;
24+ use arrow_schema:: { ArrowError , DataType , SortOptions } ;
2525
2626use crate :: cmp:: distinct;
2727use crate :: ord:: make_comparator;
@@ -152,13 +152,30 @@ pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
152152 Ok ( Partitions ( Some ( acc) ) )
153153}
154154
155+ /// Returns true if `distinct` (via `compare_op`) can handle this data type.
156+ ///
157+ /// `compare_op` unwraps at most one level of dictionary, then dispatches on
158+ /// the leaf type. Anything else (REE, nested dictionary, nested/complex types)
159+ /// must go through `make_comparator` instead.
160+ fn supports_distinct ( dt : & DataType ) -> bool {
161+ let leaf = match dt {
162+ DataType :: Dictionary ( _, v) => v. as_ref ( ) ,
163+ dt => dt,
164+ } ;
165+ !leaf. is_nested ( )
166+ && !matches ! (
167+ leaf,
168+ DataType :: Dictionary ( _, _) | DataType :: RunEndEncoded ( _, _)
169+ )
170+ }
171+
155172/// Returns a mask with bits set whenever the value or nullability changes
156173fn find_boundaries ( v : & dyn Array ) -> Result < BooleanBuffer , ArrowError > {
157174 let slice_len = v. len ( ) - 1 ;
158175 let v1 = v. slice ( 0 , slice_len) ;
159176 let v2 = v. slice ( 1 , slice_len) ;
160177
161- if ! v. data_type ( ) . is_nested ( ) {
178+ if supports_distinct ( v. data_type ( ) ) {
162179 return Ok ( distinct ( & v1, & v2) ?. values ( ) . clone ( ) ) ;
163180 }
164181 // Given that we're only comparing values, null ordering in the input or
@@ -306,6 +323,88 @@ mod tests {
306323 ) ;
307324 }
308325
326+ #[ test]
327+ fn test_partition_run_end_encoded ( ) {
328+ let run_ends = Int32Array :: from ( vec ! [ 2 , 3 , 5 ] ) ;
329+ let values = StringArray :: from ( vec ! [ "x" , "y" , "x" ] ) ;
330+ let ree = RunArray :: try_new ( & run_ends, & values) . unwrap ( ) ;
331+ // logical: ["x", "x", "y", "x", "x"]
332+ let input = vec ! [ Arc :: new( ree) as _] ;
333+ assert_eq ! ( partition( & input) . unwrap( ) . ranges( ) , vec![ 0 ..2 , 2 ..3 , 3 ..5 ] , ) ;
334+ }
335+
336+ #[ test]
337+ fn test_partition_nested_run_end_encoded ( ) {
338+ // Inner REE (values of the outer): run_ends [1, 2, 3], values ["x", "y", "x"]
339+ // logical length 3: ["x", "y", "x"]
340+ let inner_run_ends = Int32Array :: from ( vec ! [ 1 , 2 , 3 ] ) ;
341+ let inner_values = StringArray :: from ( vec ! [ "x" , "y" , "x" ] ) ;
342+ let inner_ree = RunArray :: try_new ( & inner_run_ends, & inner_values) . unwrap ( ) ;
343+
344+ // Outer REE: run_ends [2, 3, 5], values = inner_ree (length 3)
345+ // logical: rows 0,1 → inner[0]="x", row 2 → inner[1]="y", rows 3,4 → inner[2]="x"
346+ // = ["x", "x", "y", "x", "x"]
347+ let outer_run_ends = Int32Array :: from ( vec ! [ 2 , 3 , 5 ] ) ;
348+ let outer_ree = RunArray :: try_new ( & outer_run_ends, & inner_ree) . unwrap ( ) ;
349+
350+ let input = vec ! [ Arc :: new( outer_ree) as ArrayRef ] ;
351+ assert_eq ! ( partition( & input) . unwrap( ) . ranges( ) , vec![ 0 ..2 , 2 ..3 , 3 ..5 ] ) ;
352+ }
353+
354+ #[ test]
355+ fn test_partition_ree_with_dictionary_values ( ) {
356+ // Dictionary values: keys [0, 1, 0], dict ["x", "y"] → logical ["x", "y", "x"]
357+ let dict_values = StringArray :: from ( vec ! [ "x" , "y" ] ) ;
358+ let keys = Int32Array :: from ( vec ! [ 0 , 1 , 0 ] ) ;
359+ let dict = DictionaryArray :: try_new ( keys, Arc :: new ( dict_values) ) . unwrap ( ) ;
360+
361+ // REE wrapping dict: run_ends [2, 3, 5] → logical [dict[0], dict[0], dict[1], dict[2], dict[2]]
362+ // = ["x", "x", "y", "x", "x"]
363+ let run_ends = Int32Array :: from ( vec ! [ 2 , 3 , 5 ] ) ;
364+ let ree = RunArray :: try_new ( & run_ends, & dict) . unwrap ( ) ;
365+ let input = vec ! [ Arc :: new( ree) as ArrayRef ] ;
366+ assert_eq ! ( partition( & input) . unwrap( ) . ranges( ) , vec![ 0 ..2 , 2 ..3 , 3 ..5 ] , ) ;
367+ }
368+
369+ #[ test]
370+ fn test_partition_dictionary ( ) {
371+ let values = StringArray :: from ( vec ! [ "x" , "y" ] ) ;
372+ let keys = Int32Array :: from ( vec ! [ 0 , 0 , 1 , 0 , 0 ] ) ;
373+ let dict = DictionaryArray :: try_new ( keys, Arc :: new ( values) ) . unwrap ( ) ;
374+ // logical: ["x", "x", "y", "x", "x"]
375+ let input = vec ! [ Arc :: new( dict) as _] ;
376+ assert_eq ! ( partition( & input) . unwrap( ) . ranges( ) , vec![ 0 ..2 , 2 ..3 , 3 ..5 ] , ) ;
377+ }
378+
379+ #[ test]
380+ fn test_partition_nested_dictionary ( ) {
381+ let inner_values = StringArray :: from ( vec ! [ "x" , "y" ] ) ;
382+ let inner_keys = Int32Array :: from ( vec ! [ 0 , 1 , 0 ] ) ;
383+ let inner_dict = DictionaryArray :: try_new ( inner_keys, Arc :: new ( inner_values) ) . unwrap ( ) ;
384+
385+ // Outer dict keys index into inner dict's logical values: ["x", "y", "x"]
386+ // keys [0, 0, 1, 2, 2] → logical ["x", "x", "y", "x", "x"]
387+ let outer_keys = Int32Array :: from ( vec ! [ 0 , 0 , 1 , 2 , 2 ] ) ;
388+ let outer_dict = DictionaryArray :: try_new ( outer_keys, Arc :: new ( inner_dict) ) . unwrap ( ) ;
389+ let input = vec ! [ Arc :: new( outer_dict) as ArrayRef ] ;
390+ assert_eq ! ( partition( & input) . unwrap( ) . ranges( ) , vec![ 0 ..2 , 2 ..3 , 3 ..5 ] , ) ;
391+ }
392+
393+ #[ test]
394+ fn test_partition_dictionary_with_ree_values ( ) {
395+ // REE values: run_ends [2, 3], values ["x", "y"] → logical ["x", "x", "y"]
396+ let run_ends = Int32Array :: from ( vec ! [ 2 , 3 ] ) ;
397+ let str_values = StringArray :: from ( vec ! [ "x" , "y" ] ) ;
398+ let ree = RunArray :: try_new ( & run_ends, & str_values) . unwrap ( ) ;
399+
400+ // Dictionary keys index into the REE's logical values
401+ // keys [0, 0, 2, 0, 0] → logical ["x", "x", "y", "x", "x"]
402+ let keys = Int32Array :: from ( vec ! [ 0 , 0 , 2 , 0 , 0 ] ) ;
403+ let dict = DictionaryArray :: try_new ( keys, Arc :: new ( ree) ) . unwrap ( ) ;
404+ let input = vec ! [ Arc :: new( dict) as ArrayRef ] ;
405+ assert_eq ! ( partition( & input) . unwrap( ) . ranges( ) , vec![ 0 ..2 , 2 ..3 , 3 ..5 ] , ) ;
406+ }
407+
309408 #[ test]
310409 fn test_partition_nested ( ) {
311410 let input = vec ! [
0 commit comments