Skip to content

Commit 87b512c

Browse files
author
Bert Vermeiren
committed
Fix: optimize projections for unnest logical plan
1 parent a9bfca7 commit 87b512c

File tree

5 files changed

+242
-231
lines changed

5 files changed

+242
-231
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 7 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,14 @@ use crate::{
4848
};
4949

5050
use super::dml::InsertOp;
51-
use super::plan::ColumnUnnestList;
5251
use arrow::compute::can_cast_types;
5352
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
5453
use datafusion_common::display::ToStringifiedPlan;
5554
use datafusion_common::file_options::file_type::FileType;
5655
use datafusion_common::{
57-
exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
58-
plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
59-
DataFusionError, NullEquality, Result, ScalarValue, TableReference, ToDFSchema,
60-
UnnestOptions,
56+
exec_err, get_target_functional_dependencies, not_impl_err, plan_datafusion_err,
57+
plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, NullEquality,
58+
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
6159
};
6260
use datafusion_expr_common::type_coercion::binary::type_union_resolution;
6361

@@ -2089,27 +2087,6 @@ pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
20892087
unnest_with_options(input, columns, UnnestOptions::default())
20902088
}
20912089

2092-
// Get the data type of a multi-dimensional type after unnesting it
2093-
// with a given depth
2094-
fn get_unnested_list_datatype_recursive(
2095-
data_type: &DataType,
2096-
depth: usize,
2097-
) -> Result<DataType> {
2098-
match data_type {
2099-
DataType::List(field)
2100-
| DataType::FixedSizeList(field, _)
2101-
| DataType::LargeList(field) => {
2102-
if depth == 1 {
2103-
return Ok(field.data_type().clone());
2104-
}
2105-
return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
2106-
}
2107-
_ => {}
2108-
};
2109-
2110-
internal_err!("trying to unnest on invalid data type {:?}", data_type)
2111-
}
2112-
21132090
pub fn get_struct_unnested_columns(
21142091
col_name: &String,
21152092
inner_fields: &Fields,
@@ -2120,53 +2097,6 @@ pub fn get_struct_unnested_columns(
21202097
.collect()
21212098
}
21222099

2123-
// Based on data type, either struct or a variant of list
2124-
// return a set of columns as the result of unnesting
2125-
// the input columns.
2126-
// For example, given a column with name "a",
2127-
// - List(Element) returns ["a"] with data type Element
2128-
// - Struct(field1, field2) returns ["a.field1","a.field2"]
2129-
// For list data type, an argument depth is used to specify
2130-
// the recursion level
2131-
pub fn get_unnested_columns(
2132-
col_name: &String,
2133-
data_type: &DataType,
2134-
depth: usize,
2135-
) -> Result<Vec<(Column, Arc<Field>)>> {
2136-
let mut qualified_columns = Vec::with_capacity(1);
2137-
2138-
match data_type {
2139-
DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
2140-
let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
2141-
let new_field = Arc::new(Field::new(
2142-
col_name, data_type,
2143-
// Unnesting may produce NULLs even if the list is not null.
2144-
// For example: unnest([1], []) -> 1, null
2145-
true,
2146-
));
2147-
let column = Column::from_name(col_name);
2148-
// let column = Column::from((None, &new_field));
2149-
qualified_columns.push((column, new_field));
2150-
}
2151-
DataType::Struct(fields) => {
2152-
qualified_columns.extend(fields.iter().map(|f| {
2153-
let new_name = format!("{}.{}", col_name, f.name());
2154-
let column = Column::from_name(&new_name);
2155-
let new_field = f.as_ref().clone().with_name(new_name);
2156-
// let column = Column::from((None, &f));
2157-
(column, Arc::new(new_field))
2158-
}))
2159-
}
2160-
_ => {
2161-
return internal_err!(
2162-
"trying to unnest on invalid data type {:?}",
2163-
data_type
2164-
);
2165-
}
2166-
};
2167-
Ok(qualified_columns)
2168-
}
2169-
21702100
/// Create a [`LogicalPlan::Unnest`] plan with options
21712101
/// This function receives a list of columns to be unnested
21722102
/// because multiple unnests can be performed on the same column (e.g. unnest with different depths)
@@ -2201,126 +2131,11 @@ pub fn unnest_with_options(
22012131
columns_to_unnest: Vec<Column>,
22022132
options: UnnestOptions,
22032133
) -> Result<LogicalPlan> {
2204-
let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
2205-
let mut struct_columns = vec![];
2206-
let indices_to_unnest = columns_to_unnest
2207-
.iter()
2208-
.map(|c| Ok((input.schema().index_of_column(c)?, c)))
2209-
.collect::<Result<HashMap<usize, &Column>>>()?;
2210-
2211-
let input_schema = input.schema();
2212-
2213-
let mut dependency_indices = vec![];
2214-
// Transform input schema into new schema
2215-
// Given this comprehensive example
2216-
//
2217-
// input schema:
2218-
// 1.col1_unnest_placeholder: list[list[int]],
2219-
// 2.col1: list[list[int]]
2220-
// 3.col2: list[int]
2221-
// with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
2222-
// output schema:
2223-
// 1.unnest_col1_depth_2: int
2224-
// 2.unnest_col1_depth_1: list[int]
2225-
// 3.col1: list[list[int]]
2226-
// 4.unnest_col2_depth_1: int
2227-
// Meaning the placeholder column will be replaced by its unnested variation(s), note
2228-
// the plural.
2229-
let fields = input_schema
2230-
.iter()
2231-
.enumerate()
2232-
.map(|(index, (original_qualifier, original_field))| {
2233-
match indices_to_unnest.get(&index) {
2234-
Some(column_to_unnest) => {
2235-
let recursions_on_column = options
2236-
.recursions
2237-
.iter()
2238-
.filter(|p| -> bool { &p.input_column == *column_to_unnest })
2239-
.collect::<Vec<_>>();
2240-
let mut transformed_columns = recursions_on_column
2241-
.iter()
2242-
.map(|r| {
2243-
list_columns.push((
2244-
index,
2245-
ColumnUnnestList {
2246-
output_column: r.output_column.clone(),
2247-
depth: r.depth,
2248-
},
2249-
));
2250-
Ok(get_unnested_columns(
2251-
&r.output_column.name,
2252-
original_field.data_type(),
2253-
r.depth,
2254-
)?
2255-
.into_iter()
2256-
.next()
2257-
.unwrap()) // because unnesting a list column always result into one result
2258-
})
2259-
.collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
2260-
if transformed_columns.is_empty() {
2261-
transformed_columns = get_unnested_columns(
2262-
&column_to_unnest.name,
2263-
original_field.data_type(),
2264-
1,
2265-
)?;
2266-
match original_field.data_type() {
2267-
DataType::Struct(_) => {
2268-
struct_columns.push(index);
2269-
}
2270-
DataType::List(_)
2271-
| DataType::FixedSizeList(_, _)
2272-
| DataType::LargeList(_) => {
2273-
list_columns.push((
2274-
index,
2275-
ColumnUnnestList {
2276-
output_column: Column::from_name(
2277-
&column_to_unnest.name,
2278-
),
2279-
depth: 1,
2280-
},
2281-
));
2282-
}
2283-
_ => {}
2284-
};
2285-
}
2286-
2287-
// new columns dependent on the same original index
2288-
dependency_indices
2289-
.extend(std::iter::repeat_n(index, transformed_columns.len()));
2290-
Ok(transformed_columns
2291-
.iter()
2292-
.map(|(col, field)| (col.relation.to_owned(), field.to_owned()))
2293-
.collect())
2294-
}
2295-
None => {
2296-
dependency_indices.push(index);
2297-
Ok(vec![(
2298-
original_qualifier.cloned(),
2299-
Arc::clone(original_field),
2300-
)])
2301-
}
2302-
}
2303-
})
2304-
.collect::<Result<Vec<_>>>()?
2305-
.into_iter()
2306-
.flatten()
2307-
.collect::<Vec<_>>();
2308-
2309-
let metadata = input_schema.metadata().clone();
2310-
let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
2311-
// We can use the existing functional dependencies:
2312-
let deps = input_schema.functional_dependencies().clone();
2313-
let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
2314-
2315-
Ok(LogicalPlan::Unnest(Unnest {
2316-
input: Arc::new(input),
2317-
exec_columns: columns_to_unnest,
2318-
list_type_columns: list_columns,
2319-
struct_type_columns: struct_columns,
2320-
dependency_indices,
2321-
schema,
2134+
Ok(LogicalPlan::Unnest(Unnest::try_new(
2135+
Arc::new(input),
2136+
columns_to_unnest,
23222137
options,
2323-
}))
2138+
)?))
23242139
}
23252140

23262141
#[cfg(test)]

0 commit comments

Comments
 (0)