From 2ee7a426130a5d7030e2ecf1d832a7571971413a Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Jan 2026 17:20:36 -0600 Subject: [PATCH 01/11] start on eval --- r/sedonadb/src/rust/src/expression.rs | 46 +++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index 0add4b535..0568e8d12 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -17,10 +17,13 @@ use std::sync::Arc; -use datafusion_common::{Column, ScalarValue}; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader}; +use arrow_schema::{FieldRef, Schema}; +use datafusion::physical_plan::PhysicalExpr; +use datafusion_common::{Column, DFSchema, ScalarValue}; use datafusion_expr::{ expr::{AggregateFunction, FieldMetadata, NullTreatment, ScalarFunction}, - BinaryExpr, Cast, Expr, Operator, + BinaryExpr, Cast, ColumnarValue, Expr, Operator, }; use savvy::{savvy, savvy_err, EnvironmentSexp}; use sedona::context::SedonaContext; @@ -172,6 +175,45 @@ impl SedonaDBExprFactory { Err(savvy_err!("Aggregate UDF '{name}' not found")) } } + + fn evaluate( + &self, + exprs_sexp: savvy::Sexp, + stream_in: savvy::Sexp, + stream_out: savvy::Sexp, + ) -> savvy::Result { + let exprs = Self::exprs(exprs_sexp)?; + let reader_in = crate::ffi::import_array_stream(stream_in)?; + + let physical_exprs = exprs + .into_iter() + .map(|e| { + self.ctx.ctx.create_physical_expr( + e, + &DFSchema::try_from(reader_in.schema().as_ref().clone())?, + ) + }) + .collect::>>>()?; + + let out_fields = physical_exprs + .iter() + .map(|e| e.return_field(&reader_in.schema())) + .collect::>>()?; + let out_schema = Arc::new(Schema::new(out_fields)); + + let mut out_batches = Vec::new(); + for batch in reader_in { + let batch = batch?; + let columns = physical_exprs + .iter() + .map(|e| e.evaluate(&batch)) + .collect::>>()?; + let out_batch = RecordBatch::try_new(out_schema.clone(), ColumnarValue::values_to_arrays(&columns)?)?; + out_batches.push(out_batch); + } + + todo!() + } } impl SedonaDBExprFactory { From 46cb16b586871285ec04a322759ff057c0572bc7 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Jan 2026 20:26:38 -0600 Subject: [PATCH 02/11] first go at eval --- r/sedonadb/R/000-wrappers.R | 13 ++++++++++++ r/sedonadb/src/init.c | 11 ++++++++++ r/sedonadb/src/rust/api.h | 4 ++++ r/sedonadb/src/rust/src/expression.rs | 30 ++++++++++++++++++++++----- 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R index 6cd4654f2..859a72629 100644 --- a/r/sedonadb/R/000-wrappers.R +++ b/r/sedonadb/R/000-wrappers.R @@ -448,6 +448,18 @@ class(`SedonaDBExpr`) <- c("sedonadb::SedonaDBExpr__bundle", "savvy_sedonadb__se } } +`SedonaDBExprFactory_evaluate_scalar` <- function(self) { + function(`exprs_sexp`, `stream_in`, `stream_out`) { + .Call( + savvy_SedonaDBExprFactory_evaluate_scalar__impl, + `self`, + `exprs_sexp`, + `stream_in`, + `stream_out` + ) + } +} + `SedonaDBExprFactory_scalar_function` <- function(self) { function(`name`, `args`) { .savvy_wrap_SedonaDBExpr(.Call( @@ -465,6 +477,7 @@ class(`SedonaDBExpr`) <- c("sedonadb::SedonaDBExpr__bundle", "savvy_sedonadb__se e$`aggregate_function` <- `SedonaDBExprFactory_aggregate_function`(ptr) e$`binary` <- `SedonaDBExprFactory_binary`(ptr) e$`column` <- `SedonaDBExprFactory_column`(ptr) + e$`evaluate_scalar` <- `SedonaDBExprFactory_evaluate_scalar`(ptr) e$`scalar_function` <- `SedonaDBExprFactory_scalar_function`(ptr) class(e) <- c( diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c index 0e9efae4b..60556fa4e 100644 --- a/r/sedonadb/src/init.c +++ b/r/sedonadb/src/init.c @@ -261,6 +261,15 @@ SEXP savvy_SedonaDBExprFactory_column__impl(SEXP self__, SEXP c_arg__name, return handle_result(res); } +SEXP savvy_SedonaDBExprFactory_evaluate_scalar__impl(SEXP self__, + SEXP c_arg__exprs_sexp, + SEXP c_arg__stream_in, + SEXP c_arg__stream_out) { + SEXP res = savvy_SedonaDBExprFactory_evaluate_scalar__ffi( + self__, c_arg__exprs_sexp, c_arg__stream_in, c_arg__stream_out); + return handle_result(res); +} + SEXP savvy_SedonaDBExprFactory_literal__impl(SEXP c_arg__array_xptr, SEXP c_arg__schema_xptr) { SEXP res = savvy_SedonaDBExprFactory_literal__ffi(c_arg__array_xptr, @@ -346,6 +355,8 @@ static const R_CallMethodDef CallEntries[] = { (DL_FUNC)&savvy_SedonaDBExprFactory_binary__impl, 4}, {"savvy_SedonaDBExprFactory_column__impl", (DL_FUNC)&savvy_SedonaDBExprFactory_column__impl, 3}, + {"savvy_SedonaDBExprFactory_evaluate_scalar__impl", + (DL_FUNC)&savvy_SedonaDBExprFactory_evaluate_scalar__impl, 4}, {"savvy_SedonaDBExprFactory_literal__impl", (DL_FUNC)&savvy_SedonaDBExprFactory_literal__impl, 2}, {"savvy_SedonaDBExprFactory_new__impl", diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h index fac6258bd..876d7c264 100644 --- a/r/sedonadb/src/rust/api.h +++ b/r/sedonadb/src/rust/api.h @@ -78,6 +78,10 @@ SEXP savvy_SedonaDBExprFactory_binary__ffi(SEXP self__, SEXP c_arg__op, SEXP c_arg__lhs, SEXP c_arg__rhs); SEXP savvy_SedonaDBExprFactory_column__ffi(SEXP self__, SEXP c_arg__name, SEXP c_arg__qualifier); +SEXP savvy_SedonaDBExprFactory_evaluate_scalar__ffi(SEXP self__, + SEXP c_arg__exprs_sexp, + SEXP c_arg__stream_in, + SEXP c_arg__stream_out); SEXP savvy_SedonaDBExprFactory_literal__ffi(SEXP c_arg__array_xptr, SEXP c_arg__schema_xptr); SEXP savvy_SedonaDBExprFactory_new__ffi(SEXP c_arg__ctx); diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index 0568e8d12..75143d406 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::{ptr::swap_nonoverlapping, sync::Arc}; -use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader}; +use arrow_array::{ + ffi_stream::FFI_ArrowArrayStream, RecordBatch, RecordBatchIterator, RecordBatchReader, +}; use arrow_schema::{FieldRef, Schema}; use datafusion::physical_plan::PhysicalExpr; use datafusion_common::{Column, DFSchema, ScalarValue}; @@ -176,12 +178,17 @@ impl SedonaDBExprFactory { } } - fn evaluate( + fn evaluate_scalar( &self, exprs_sexp: savvy::Sexp, stream_in: savvy::Sexp, stream_out: savvy::Sexp, ) -> savvy::Result { + let out_void = unsafe { savvy_ffi::R_ExternalPtrAddr(stream_out.0) }; + if out_void.is_null() { + return Err(savvy_err!("external pointer to null in evaluate()")); + } + let exprs = Self::exprs(exprs_sexp)?; let reader_in = crate::ffi::import_array_stream(stream_in)?; @@ -202,17 +209,30 @@ impl SedonaDBExprFactory { let out_schema = Arc::new(Schema::new(out_fields)); let mut out_batches = Vec::new(); + let mut size = 0; for batch in reader_in { let batch = batch?; + size += batch.num_rows(); let columns = physical_exprs .iter() .map(|e| e.evaluate(&batch)) .collect::>>()?; - let out_batch = RecordBatch::try_new(out_schema.clone(), ColumnarValue::values_to_arrays(&columns)?)?; + let out_batch = RecordBatch::try_new( + out_schema.clone(), + ColumnarValue::values_to_arrays(&columns)?, + )?; out_batches.push(out_batch); } - todo!() + let reader = Box::new(RecordBatchIterator::new( + out_batches.into_iter().map(Ok), + out_schema, + )); + let mut ffi_stream = FFI_ArrowArrayStream::new(reader); + let ffi_out = out_void as *mut FFI_ArrowArrayStream; + unsafe { swap_nonoverlapping(&mut ffi_stream, ffi_out, 1) }; + + savvy::Sexp::try_from(size as f64) } } From 6c2a0c4ed6ae83c14225f30bc0fad7078d4dea87 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Jan 2026 21:36:35 -0600 Subject: [PATCH 03/11] first pass at raw evaluation --- r/sedonadb/R/expression.R | 22 ++++++++++++++++++++++ r/sedonadb/src/rust/src/expression.rs | 11 +++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/r/sedonadb/R/expression.R b/r/sedonadb/R/expression.R index cca754a2c..902c151db 100644 --- a/r/sedonadb/R/expression.R +++ b/r/sedonadb/R/expression.R @@ -158,6 +158,28 @@ sd_eval_expr <- function(expr, expr_ctx = sd_expr_ctx(env = env), env = parent.f ) } +sd_eval <- function(stream, exprs, env = parent.frame()) { + stream <- nanoarrow::as_nanoarrow_array_stream( + stream, + geometry_schema = geoarrow::geoarrow_wkb() + ) + expr_ctx <- sd_expr_ctx(stream$get_schema(), env) + sd_exprs <- lapply(exprs, sd_eval_expr, expr_ctx = expr_ctx, env = env) + exprs_names <- names(exprs) + if (!is.null(exprs_names)) { + for (i in seq_along(sd_exprs)) { + name <- exprs_names[i] + if (!is.na(name) && name != "") { + sd_exprs[[i]] <- sd_expr_alias(sd_exprs[[i]], name, expr_ctx$factory) + } + } + } + + stream_out <- nanoarrow::nanoarrow_allocate_array_stream() + expr_ctx$factory$evaluate_scalar(sd_exprs, stream, stream_out) + stream_out +} + sd_eval_expr_inner <- function(expr, expr_ctx) { if (rlang::is_call(expr)) { # Extract `pkg::fun` or `fun` if this is a usual call (e.g., not diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index 75143d406..04aa1017c 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{ptr::swap_nonoverlapping, sync::Arc}; +use std::{iter::zip, ptr::swap_nonoverlapping, sync::Arc}; use arrow_array::{ ffi_stream::FFI_ArrowArrayStream, RecordBatch, RecordBatchIterator, RecordBatchReader, @@ -190,6 +190,10 @@ impl SedonaDBExprFactory { } let exprs = Self::exprs(exprs_sexp)?; + let expr_names = exprs + .iter() + .map(|e| e.schema_name().to_string()) + .collect::>(); let reader_in = crate::ffi::import_array_stream(stream_in)?; let physical_exprs = exprs @@ -206,7 +210,10 @@ impl SedonaDBExprFactory { .iter() .map(|e| e.return_field(&reader_in.schema())) .collect::>>()?; - let out_schema = Arc::new(Schema::new(out_fields)); + let out_fields_named = zip(out_fields, expr_names) + .map(|(f, name)| f.as_ref().clone().with_name(name)) + .collect::>(); + let out_schema = Arc::new(Schema::new(out_fields_named)); let mut out_batches = Vec::new(); let mut size = 0; From f33705c48ab15a4c8b78e7cc3abc13aff5619721 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Jan 2026 23:15:20 -0600 Subject: [PATCH 04/11] mostly working transmuter --- r/sedonadb/R/000-wrappers.R | 13 +++ r/sedonadb/R/dataframe.R | 22 ++++ r/sedonadb/src/init.c | 9 ++ r/sedonadb/src/rust/api.h | 2 + r/sedonadb/src/rust/src/dataframe.rs | 14 +++ r/sedonadb/src/rust/src/expression.rs | 138 +++++++++++++++++++++++++- 6 files changed, 195 insertions(+), 3 deletions(-) diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R index 859a72629..26c1f376d 100644 --- a/r/sedonadb/R/000-wrappers.R +++ b/r/sedonadb/R/000-wrappers.R @@ -310,6 +310,18 @@ class(`InternalContext`) <- c( } } +`InternalDataFrame_transmute` <- function(self) { + function(`ctx`, `exprs_sexp`) { + `ctx` <- .savvy_extract_ptr(`ctx`, "sedonadb::InternalContext") + .savvy_wrap_InternalDataFrame(.Call( + savvy_InternalDataFrame_transmute__impl, + `self`, + `ctx`, + `exprs_sexp` + )) + } +} + `.savvy_wrap_InternalDataFrame` <- function(ptr) { e <- new.env(parent = emptyenv()) e$.ptr <- ptr @@ -327,6 +339,7 @@ class(`InternalContext`) <- c( e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr) e$`to_provider` <- `InternalDataFrame_to_provider`(ptr) e$`to_view` <- `InternalDataFrame_to_view`(ptr) + e$`transmute` <- `InternalDataFrame_transmute`(ptr) class(e) <- c( "sedonadb::InternalDataFrame", diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R index fefc3a3d2..8a11c51a3 100644 --- a/r/sedonadb/R/dataframe.R +++ b/r/sedonadb/R/dataframe.R @@ -193,6 +193,28 @@ sd_preview <- function(.data, n = NULL, ascii = NULL, width = NULL) { invisible(.data) } +sd_transmute <- function(.data, ...) { + .data <- as_sedonadb_dataframe(.data) + expr_quos <- rlang::enquos(...) + env <- parent.frame() + + expr_ctx <- sd_expr_ctx(infer_nanoarrow_schema(.data), env) + exprs <- lapply(expr_quos, rlang::quo_get_expr) + sd_exprs <- lapply(exprs, sd_eval_expr, expr_ctx = expr_ctx, env = env) + exprs_names <- names(exprs) + if (!is.null(exprs_names)) { + for (i in seq_along(sd_exprs)) { + name <- exprs_names[i] + if (!is.na(name) && name != "") { + sd_exprs[[i]] <- sd_expr_alias(sd_exprs[[i]], name, expr_ctx$factory) + } + } + } + + df <- .data$df$transmute(.data$ctx, sd_exprs) + new_sedonadb_dataframe(.data$ctx, df) +} + #' Write DataFrame to (Geo)Parquet files #' #' Write this DataFrame to one or more (Geo)Parquet files. For input that contains diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c index 60556fa4e..0da04e295 100644 --- a/r/sedonadb/src/init.c +++ b/r/sedonadb/src/init.c @@ -212,6 +212,13 @@ SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx, return handle_result(res); } +SEXP savvy_InternalDataFrame_transmute__impl(SEXP self__, SEXP c_arg__ctx, + SEXP c_arg__exprs_sexp) { + SEXP res = savvy_InternalDataFrame_transmute__ffi(self__, c_arg__ctx, + c_arg__exprs_sexp); + return handle_result(res); +} + SEXP savvy_SedonaDBExpr_alias__impl(SEXP self__, SEXP c_arg__name) { SEXP res = savvy_SedonaDBExpr_alias__ffi(self__, c_arg__name); return handle_result(res); @@ -339,6 +346,8 @@ static const R_CallMethodDef CallEntries[] = { (DL_FUNC)&savvy_InternalDataFrame_to_provider__impl, 1}, {"savvy_InternalDataFrame_to_view__impl", (DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4}, + {"savvy_InternalDataFrame_transmute__impl", + (DL_FUNC)&savvy_InternalDataFrame_transmute__impl, 3}, {"savvy_SedonaDBExpr_alias__impl", (DL_FUNC)&savvy_SedonaDBExpr_alias__impl, 2}, {"savvy_SedonaDBExpr_cast__impl", (DL_FUNC)&savvy_SedonaDBExpr_cast__impl, diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h index 876d7c264..54559d13a 100644 --- a/r/sedonadb/src/rust/api.h +++ b/r/sedonadb/src/rust/api.h @@ -60,6 +60,8 @@ SEXP savvy_InternalDataFrame_to_provider__ffi(SEXP self__); SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx, SEXP c_arg__table_ref, SEXP c_arg__overwrite); +SEXP savvy_InternalDataFrame_transmute__ffi(SEXP self__, SEXP c_arg__ctx, + SEXP c_arg__exprs_sexp); // methods and associated functions for SedonaDBExpr SEXP savvy_SedonaDBExpr_alias__ffi(SEXP self__, SEXP c_arg__name); diff --git a/r/sedonadb/src/rust/src/dataframe.rs b/r/sedonadb/src/rust/src/dataframe.rs index e34cee82d..4df20c3f8 100644 --- a/r/sedonadb/src/rust/src/dataframe.rs +++ b/r/sedonadb/src/rust/src/dataframe.rs @@ -33,6 +33,7 @@ use std::{iter::zip, ptr::swap_nonoverlapping, sync::Arc}; use tokio::runtime::Runtime; use crate::context::InternalContext; +use crate::expression::SedonaDBExprFactory; use crate::ffi::{import_schema, FFITableProviderR}; use crate::runtime::wait_for_future_captured_r; @@ -311,4 +312,17 @@ impl InternalDataFrame { let inner = self.inner.clone().select(exprs)?; Ok(new_data_frame(inner, self.runtime.clone())) } + + fn transmute( + &self, + ctx: &InternalContext, + exprs_sexp: savvy::Sexp, + ) -> savvy::Result { + let exprs = SedonaDBExprFactory::exprs(exprs_sexp)?; + + let plan = + SedonaDBExprFactory::select(self.inner.clone().into_unoptimized_plan(), exprs, vec![])?; + let inner = DataFrame::new(ctx.inner.ctx.state(), plan); + Ok(new_data_frame(inner, self.runtime.clone())) + } } diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index 04aa1017c..e193d3e14 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -22,10 +22,15 @@ use arrow_array::{ }; use arrow_schema::{FieldRef, Schema}; use datafusion::physical_plan::PhysicalExpr; -use datafusion_common::{Column, DFSchema, ScalarValue}; +use datafusion_common::{ + tree_node::{Transformed, TreeNode}, + Column, DFSchema, Result, ScalarValue, +}; use datafusion_expr::{ expr::{AggregateFunction, FieldMetadata, NullTreatment, ScalarFunction}, - BinaryExpr, Cast, ColumnarValue, Expr, Operator, + utils::{expr_as_column_expr, find_aggregate_exprs, find_window_exprs}, + BinaryExpr, Cast, ColumnarValue, Expr, LogicalPlan, LogicalPlanBuilder, + LogicalPlanBuilderOptions, Operator, }; use savvy::{savvy, savvy_err, EnvironmentSexp}; use sedona::context::SedonaContext; @@ -244,7 +249,7 @@ impl SedonaDBExprFactory { } impl SedonaDBExprFactory { - fn exprs(exprs_sexp: savvy::Sexp) -> savvy::Result> { + pub fn exprs(exprs_sexp: savvy::Sexp) -> savvy::Result> { savvy::ListSexp::try_from(exprs_sexp)? .iter() .map(|(_, item)| -> savvy::Result { @@ -254,6 +259,123 @@ impl SedonaDBExprFactory { }) .collect() } + + pub fn select( + base_plan: LogicalPlan, + exprs: Vec, + group_by_exprs: Vec, + ) -> datafusion_common::Result { + // Translated from DataFusion's SQL SELECT -> LogicalPlan constructor + // https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L890-L1098 + + // First, find aggregates in SELECT + let aggr_exprs = find_aggregate_exprs(&exprs); + + // Process group by, aggregation or having + let AggregatePlanResult { + plan, + select_exprs: select_exprs_post_aggr, + } = if !aggr_exprs.is_empty() { + // We have aggregates, create aggregate plan + Self::aggregate( + &base_plan, + &exprs, + group_by_exprs, // empty group by + &aggr_exprs, + )? + } else { + // No aggregation needed + AggregatePlanResult { + plan: base_plan, + select_exprs: exprs.clone(), + } + }; + + // All of the window expressions + let window_func_exprs = find_window_exprs(&select_exprs_post_aggr); + + // Process window functions after aggregation + let plan = if window_func_exprs.is_empty() { + plan + } else { + let plan = LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())?; + + // Re-write the projection + let select_exprs_post_aggr = select_exprs_post_aggr + .iter() + .map(|expr| Self::rebase_expr(expr, &window_func_exprs, &plan)) + .collect::>>()?; + + // Final projection + LogicalPlanBuilder::from(plan) + .project(select_exprs_post_aggr)? + .build()? + }; + + // Final projection if no windows + if window_func_exprs.is_empty() { + LogicalPlanBuilder::from(plan) + .project(select_exprs_post_aggr)? + .build() + } else { + Ok(plan) + } + } + + /// Helper function to rebase expressions to reference columns from the plan. + /// Simplified version of datafusion-sql's rebase_expr (which is pub(crate)). + fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan) -> Result { + let result = expr.clone().transform_down(|nested_expr| { + if base_exprs.contains(&nested_expr) { + Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?)) + } else { + Ok(Transformed::no(nested_expr)) + } + })?; + Ok(result.data) + } + + /// Create an aggregate plan from the given input, group by, and aggregate expressions. + /// Based on DataFusion's aggregate() method. + /// https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L652-L764 + fn aggregate( + input: &LogicalPlan, + select_exprs: &[Expr], + group_by_exprs: Vec, + aggr_exprs: &[Expr], + ) -> Result { + // Create the aggregate plan + let options = LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true); + let plan = LogicalPlanBuilder::from(input.clone()) + .with_options(options) + .aggregate(group_by_exprs, aggr_exprs.to_vec())? + .build()?; + + // Get the group_by_exprs from the constructed plan + let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan { + &agg.group_expr + } else { + unreachable!(); + }; + + // Combine the original grouping and aggregate expressions into one list + let mut aggr_projection_exprs = vec![]; + for expr in group_by_exprs { + aggr_projection_exprs.push(expr.clone()); + } + aggr_projection_exprs.extend_from_slice(aggr_exprs); + + // Re-write the projection + let select_exprs_post_aggr = select_exprs + .iter() + .map(|expr| Self::rebase_expr(expr, &aggr_projection_exprs, input)) + .collect::>>()?; + + Ok(AggregatePlanResult { + plan, + select_exprs: select_exprs_post_aggr, + }) + } } impl TryFrom for &SedonaDBExpr { @@ -266,3 +388,13 @@ impl TryFrom for &SedonaDBExpr { .ok_or(savvy_err!("Invalid SedonaDBExpr object.")) } } + +/// Result of the `aggregate` function, containing the aggregate plan and +/// rewritten expressions that reference the aggregate output columns. +/// https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L55-L68 +struct AggregatePlanResult { + /// The aggregate logical plan + plan: LogicalPlan, + /// SELECT expressions rewritten to reference aggregate output columns + select_exprs: Vec, +} From 74d7f01b8c85467fb00f1c35fdc9305a0f90280d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Jan 2026 23:43:46 -0600 Subject: [PATCH 05/11] better --- r/sedonadb/src/rust/src/expression.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index e193d3e14..d983debf1 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -335,6 +335,26 @@ impl SedonaDBExprFactory { Ok(result.data) } + /// Helper function to resolve column references to fully qualified columns. + /// Simplified version of datafusion-sql's resolve_columns (which is pub(crate)). + fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { + let result = expr.clone().transform_up(|nested_expr| { + match nested_expr { + Expr::Column(col) => { + let (qualifier, field) = plan.schema().qualified_field_from_column(&col)?; + Ok(Transformed::yes(Expr::Column(Column::from(( + qualifier, field, + ))))) + } + _ => { + // keep recursing + Ok(Transformed::no(nested_expr)) + } + } + })?; + Ok(result.data) + } + /// Create an aggregate plan from the given input, group by, and aggregate expressions. /// Based on DataFusion's aggregate() method. /// https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L652-L764 @@ -365,6 +385,12 @@ impl SedonaDBExprFactory { } aggr_projection_exprs.extend_from_slice(aggr_exprs); + // Now attempt to resolve columns and replace with fully-qualified columns + let aggr_projection_exprs = aggr_projection_exprs + .iter() + .map(|expr| Self::resolve_columns(expr, input)) + .collect::>>()?; + // Re-write the projection let select_exprs_post_aggr = select_exprs .iter() From cd2dfbd4d92f7d6f7068d4c43e16b51c6e5b16a1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Jan 2026 23:59:13 -0600 Subject: [PATCH 06/11] works! --- r/sedonadb/src/rust/src/expression.rs | 124 ++++++++++++++++++++------ 1 file changed, 98 insertions(+), 26 deletions(-) diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index d983debf1..d5000de9c 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -27,10 +27,10 @@ use datafusion_common::{ Column, DFSchema, Result, ScalarValue, }; use datafusion_expr::{ - expr::{AggregateFunction, FieldMetadata, NullTreatment, ScalarFunction}, - utils::{expr_as_column_expr, find_aggregate_exprs, find_window_exprs}, + expr::{AggregateFunction, FieldMetadata, NullTreatment, ScalarFunction, WindowFunction}, + utils::{expr_as_column_expr, find_aggregate_exprs, find_column_exprs, find_window_exprs}, BinaryExpr, Cast, ColumnarValue, Expr, LogicalPlan, LogicalPlanBuilder, - LogicalPlanBuilderOptions, Operator, + LogicalPlanBuilderOptions, Operator, WindowFunctionDefinition, }; use savvy::{savvy, savvy_err, EnvironmentSexp}; use sedona::context::SedonaContext; @@ -271,33 +271,57 @@ impl SedonaDBExprFactory { // First, find aggregates in SELECT let aggr_exprs = find_aggregate_exprs(&exprs); - // Process group by, aggregation or having - let AggregatePlanResult { - plan, - select_exprs: select_exprs_post_aggr, - } = if !aggr_exprs.is_empty() { - // We have aggregates, create aggregate plan - Self::aggregate( - &base_plan, - &exprs, - group_by_exprs, // empty group by - &aggr_exprs, - )? + // Determine if we should use aggregation or window functions + // If we have an explicit GROUP BY or can infer one, use aggregation + // Otherwise, treat aggregates as window functions + let use_aggregation = if !group_by_exprs.is_empty() { + true + } else if !aggr_exprs.is_empty() { + // Try to infer GROUP BY from columns outside aggregates + let all_columns = find_column_exprs(&exprs); + let agg_columns = find_column_exprs(&aggr_exprs); + let non_agg_columns: Vec<_> = all_columns + .into_iter() + .filter(|col| !agg_columns.contains(col)) + .collect(); + !non_agg_columns.is_empty() } else { - // No aggregation needed - AggregatePlanResult { - plan: base_plan, - select_exprs: exprs.clone(), - } + false }; - // All of the window expressions + // Process aggregation if appropriate + let (plan, select_exprs_post_aggr) = if use_aggregation && !aggr_exprs.is_empty() { + // We have aggregates with a valid GROUP BY, create aggregate plan + let result = Self::aggregate(&base_plan, &exprs, group_by_exprs, &aggr_exprs)?; + (result.plan, result.select_exprs) + } else if !aggr_exprs.is_empty() { + // We have aggregates but no valid GROUP BY - convert to window functions + // First resolve column references to be fully qualified + let exprs_resolved: Vec = exprs + .iter() + .map(|expr| Self::resolve_columns(expr, &base_plan)) + .collect::>>()?; + + let exprs_with_windows = Self::aggregates_to_window_functions(&exprs_resolved)?; + (base_plan, exprs_with_windows) + } else { + // No aggregation + (base_plan, exprs.clone()) + }; + + // All of the window expressions (includes aggregates converted to windows) let window_func_exprs = find_window_exprs(&select_exprs_post_aggr); // Process window functions after aggregation let plan = if window_func_exprs.is_empty() { plan } else { + // Resolve columns in window expressions to be fully qualified + let window_func_exprs: Vec = window_func_exprs + .iter() + .map(|expr| Self::resolve_columns(expr, &plan)) + .collect::>>()?; + let plan = LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())?; // Re-write the projection @@ -355,6 +379,29 @@ impl SedonaDBExprFactory { Ok(result.data) } + /// Convert aggregate functions to window functions with empty OVER clause + fn aggregates_to_window_functions(exprs: &[Expr]) -> Result> { + exprs + .iter() + .map(|expr| { + expr.clone().transform_up(|nested_expr| { + match nested_expr { + Expr::AggregateFunction(agg) => { + // Convert to window function with empty OVER () + let window_func = Expr::WindowFunction(Box::new(WindowFunction::new( + WindowFunctionDefinition::AggregateUDF(agg.func.clone()), + agg.params.args, + ))); + Ok(Transformed::yes(window_func)) + } + _ => Ok(Transformed::no(nested_expr)), + } + }) + .map(|t| t.data) + }) + .collect() + } + /// Create an aggregate plan from the given input, group by, and aggregate expressions. /// Based on DataFusion's aggregate() method. /// https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L652-L764 @@ -364,6 +411,24 @@ impl SedonaDBExprFactory { group_by_exprs: Vec, aggr_exprs: &[Expr], ) -> Result { + // If group_by_exprs is empty, we need to extract column references from + // select_exprs that are NOT inside aggregate functions + let group_by_exprs = if group_by_exprs.is_empty() { + // Find all columns referenced in select expressions + let all_columns = find_column_exprs(select_exprs); + + // Find columns that are inside aggregate expressions + let agg_columns = find_column_exprs(aggr_exprs); + + // Keep only columns that are NOT inside aggregates + all_columns + .into_iter() + .filter(|col| !agg_columns.contains(col)) + .collect::>() + } else { + group_by_exprs + }; + // Create the aggregate plan let options = LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true); let plan = LogicalPlanBuilder::from(input.clone()) @@ -371,9 +436,10 @@ impl SedonaDBExprFactory { .aggregate(group_by_exprs, aggr_exprs.to_vec())? .build()?; - // Get the group_by_exprs from the constructed plan - let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan { - &agg.group_expr + // Get the group_by_exprs and aggr_exprs from the constructed plan + // (they may have been modified by implicit group by logic) + let (group_by_exprs, aggr_exprs_from_plan) = if let LogicalPlan::Aggregate(agg) = &plan { + (&agg.group_expr, &agg.aggr_expr) } else { unreachable!(); }; @@ -383,7 +449,7 @@ impl SedonaDBExprFactory { for expr in group_by_exprs { aggr_projection_exprs.push(expr.clone()); } - aggr_projection_exprs.extend_from_slice(aggr_exprs); + aggr_projection_exprs.extend_from_slice(aggr_exprs_from_plan); // Now attempt to resolve columns and replace with fully-qualified columns let aggr_projection_exprs = aggr_projection_exprs @@ -391,8 +457,14 @@ impl SedonaDBExprFactory { .map(|expr| Self::resolve_columns(expr, input)) .collect::>>()?; + // Resolve columns in select expressions too, so qualifiers match when rebasing + let select_exprs_resolved = select_exprs + .iter() + .map(|expr| Self::resolve_columns(expr, input)) + .collect::>>()?; + // Re-write the projection - let select_exprs_post_aggr = select_exprs + let select_exprs_post_aggr = select_exprs_resolved .iter() .map(|expr| Self::rebase_expr(expr, &aggr_projection_exprs, input)) .collect::>>()?; From c72fb811780b5df7c3825e87adc036efa9ed2954 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 7 Jan 2026 23:59:38 -0600 Subject: [PATCH 07/11] format --- r/sedonadb/src/rust/src/expression.rs | 28 ++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index d5000de9c..23f086def 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -384,20 +384,22 @@ impl SedonaDBExprFactory { exprs .iter() .map(|expr| { - expr.clone().transform_up(|nested_expr| { - match nested_expr { - Expr::AggregateFunction(agg) => { - // Convert to window function with empty OVER () - let window_func = Expr::WindowFunction(Box::new(WindowFunction::new( - WindowFunctionDefinition::AggregateUDF(agg.func.clone()), - agg.params.args, - ))); - Ok(Transformed::yes(window_func)) + expr.clone() + .transform_up(|nested_expr| { + match nested_expr { + Expr::AggregateFunction(agg) => { + // Convert to window function with empty OVER () + let window_func = + Expr::WindowFunction(Box::new(WindowFunction::new( + WindowFunctionDefinition::AggregateUDF(agg.func.clone()), + agg.params.args, + ))); + Ok(Transformed::yes(window_func)) + } + _ => Ok(Transformed::no(nested_expr)), } - _ => Ok(Transformed::no(nested_expr)), - } - }) - .map(|t| t.data) + }) + .map(|t| t.data) }) .collect() } From d4c61ae584baa1a9dabb096d7ab97bff702cda05 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 22 Jan 2026 21:51:12 -0600 Subject: [PATCH 08/11] simplify implementation --- r/sedonadb/R/000-wrappers.R | 13 -- r/sedonadb/R/dataframe.R | 22 -- r/sedonadb/R/expression.R | 12 +- r/sedonadb/src/init.c | 9 - r/sedonadb/src/rust/api.h | 2 - r/sedonadb/src/rust/src/dataframe.rs | 14 -- r/sedonadb/src/rust/src/expression.rs | 238 +------------------- r/sedonadb/tests/testthat/test-expression.R | 21 ++ 8 files changed, 35 insertions(+), 296 deletions(-) diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R index 26c1f376d..859a72629 100644 --- a/r/sedonadb/R/000-wrappers.R +++ b/r/sedonadb/R/000-wrappers.R @@ -310,18 +310,6 @@ class(`InternalContext`) <- c( } } -`InternalDataFrame_transmute` <- function(self) { - function(`ctx`, `exprs_sexp`) { - `ctx` <- .savvy_extract_ptr(`ctx`, "sedonadb::InternalContext") - .savvy_wrap_InternalDataFrame(.Call( - savvy_InternalDataFrame_transmute__impl, - `self`, - `ctx`, - `exprs_sexp` - )) - } -} - `.savvy_wrap_InternalDataFrame` <- function(ptr) { e <- new.env(parent = emptyenv()) e$.ptr <- ptr @@ -339,7 +327,6 @@ class(`InternalContext`) <- c( e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr) e$`to_provider` <- `InternalDataFrame_to_provider`(ptr) e$`to_view` <- `InternalDataFrame_to_view`(ptr) - e$`transmute` <- `InternalDataFrame_transmute`(ptr) class(e) <- c( "sedonadb::InternalDataFrame", diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R index 8a11c51a3..fefc3a3d2 100644 --- a/r/sedonadb/R/dataframe.R +++ b/r/sedonadb/R/dataframe.R @@ -193,28 +193,6 @@ sd_preview <- function(.data, n = NULL, ascii = NULL, width = NULL) { invisible(.data) } -sd_transmute <- function(.data, ...) { - .data <- as_sedonadb_dataframe(.data) - expr_quos <- rlang::enquos(...) - env <- parent.frame() - - expr_ctx <- sd_expr_ctx(infer_nanoarrow_schema(.data), env) - exprs <- lapply(expr_quos, rlang::quo_get_expr) - sd_exprs <- lapply(exprs, sd_eval_expr, expr_ctx = expr_ctx, env = env) - exprs_names <- names(exprs) - if (!is.null(exprs_names)) { - for (i in seq_along(sd_exprs)) { - name <- exprs_names[i] - if (!is.na(name) && name != "") { - sd_exprs[[i]] <- sd_expr_alias(sd_exprs[[i]], name, expr_ctx$factory) - } - } - } - - df <- .data$df$transmute(.data$ctx, sd_exprs) - new_sedonadb_dataframe(.data$ctx, df) -} - #' Write DataFrame to (Geo)Parquet files #' #' Write this DataFrame to one or more (Geo)Parquet files. For input that contains diff --git a/r/sedonadb/R/expression.R b/r/sedonadb/R/expression.R index 902c151db..7e89fd5fa 100644 --- a/r/sedonadb/R/expression.R +++ b/r/sedonadb/R/expression.R @@ -138,6 +138,7 @@ print.SedonaDBExpr <- function(x, ...) { #' #' @param expr An R expression (e.g., the result of `quote()`). #' @param expr_ctx An `sd_expr_ctx()` +#' @param env An evaluation environment. Defaults to the calling environment. #' #' @returns A `SedonaDBExpr` #' @noRd @@ -158,7 +159,16 @@ sd_eval_expr <- function(expr, expr_ctx = sd_expr_ctx(env = env), env = parent.f ) } -sd_eval <- function(stream, exprs, env = parent.frame()) { +#' Evaluate a list of R expressions into a stream of RecordBatch +#' +#' @param stream Input stream, or an object (such as a `data.frame()`) +#' that can be coerced to one. +#' @param exprs An list of R expressions (e.g., the result of `quote()`). +#' @param env An evaluation environment. Defaults to the calling environment. +#' +#' @returns A `SedonaDBExpr` +#' @noRd +sd_eval_stream <- function(stream, exprs, env = parent.frame()) { stream <- nanoarrow::as_nanoarrow_array_stream( stream, geometry_schema = geoarrow::geoarrow_wkb() diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c index 0da04e295..60556fa4e 100644 --- a/r/sedonadb/src/init.c +++ b/r/sedonadb/src/init.c @@ -212,13 +212,6 @@ SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx, return handle_result(res); } -SEXP savvy_InternalDataFrame_transmute__impl(SEXP self__, SEXP c_arg__ctx, - SEXP c_arg__exprs_sexp) { - SEXP res = savvy_InternalDataFrame_transmute__ffi(self__, c_arg__ctx, - c_arg__exprs_sexp); - return handle_result(res); -} - SEXP savvy_SedonaDBExpr_alias__impl(SEXP self__, SEXP c_arg__name) { SEXP res = savvy_SedonaDBExpr_alias__ffi(self__, c_arg__name); return handle_result(res); @@ -346,8 +339,6 @@ static const R_CallMethodDef CallEntries[] = { (DL_FUNC)&savvy_InternalDataFrame_to_provider__impl, 1}, {"savvy_InternalDataFrame_to_view__impl", (DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4}, - {"savvy_InternalDataFrame_transmute__impl", - (DL_FUNC)&savvy_InternalDataFrame_transmute__impl, 3}, {"savvy_SedonaDBExpr_alias__impl", (DL_FUNC)&savvy_SedonaDBExpr_alias__impl, 2}, {"savvy_SedonaDBExpr_cast__impl", (DL_FUNC)&savvy_SedonaDBExpr_cast__impl, diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h index 54559d13a..876d7c264 100644 --- a/r/sedonadb/src/rust/api.h +++ b/r/sedonadb/src/rust/api.h @@ -60,8 +60,6 @@ SEXP savvy_InternalDataFrame_to_provider__ffi(SEXP self__); SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx, SEXP c_arg__table_ref, SEXP c_arg__overwrite); -SEXP savvy_InternalDataFrame_transmute__ffi(SEXP self__, SEXP c_arg__ctx, - SEXP c_arg__exprs_sexp); // methods and associated functions for SedonaDBExpr SEXP savvy_SedonaDBExpr_alias__ffi(SEXP self__, SEXP c_arg__name); diff --git a/r/sedonadb/src/rust/src/dataframe.rs b/r/sedonadb/src/rust/src/dataframe.rs index 4df20c3f8..e34cee82d 100644 --- a/r/sedonadb/src/rust/src/dataframe.rs +++ b/r/sedonadb/src/rust/src/dataframe.rs @@ -33,7 +33,6 @@ use std::{iter::zip, ptr::swap_nonoverlapping, sync::Arc}; use tokio::runtime::Runtime; use crate::context::InternalContext; -use crate::expression::SedonaDBExprFactory; use crate::ffi::{import_schema, FFITableProviderR}; use crate::runtime::wait_for_future_captured_r; @@ -312,17 +311,4 @@ impl InternalDataFrame { let inner = self.inner.clone().select(exprs)?; Ok(new_data_frame(inner, self.runtime.clone())) } - - fn transmute( - &self, - ctx: &InternalContext, - exprs_sexp: savvy::Sexp, - ) -> savvy::Result { - let exprs = SedonaDBExprFactory::exprs(exprs_sexp)?; - - let plan = - SedonaDBExprFactory::select(self.inner.clone().into_unoptimized_plan(), exprs, vec![])?; - let inner = DataFrame::new(ctx.inner.ctx.state(), plan); - Ok(new_data_frame(inner, self.runtime.clone())) - } } diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index 23f086def..29a631e0e 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -22,15 +22,10 @@ use arrow_array::{ }; use arrow_schema::{FieldRef, Schema}; use datafusion::physical_plan::PhysicalExpr; -use datafusion_common::{ - tree_node::{Transformed, TreeNode}, - Column, DFSchema, Result, ScalarValue, -}; +use datafusion_common::{Column, DFSchema, Result, ScalarValue}; use datafusion_expr::{ - expr::{AggregateFunction, FieldMetadata, NullTreatment, ScalarFunction, WindowFunction}, - utils::{expr_as_column_expr, find_aggregate_exprs, find_column_exprs, find_window_exprs}, - BinaryExpr, Cast, ColumnarValue, Expr, LogicalPlan, LogicalPlanBuilder, - LogicalPlanBuilderOptions, Operator, WindowFunctionDefinition, + expr::{AggregateFunction, FieldMetadata, NullTreatment, ScalarFunction}, + BinaryExpr, Cast, ColumnarValue, Expr, Operator, }; use savvy::{savvy, savvy_err, EnvironmentSexp}; use sedona::context::SedonaContext; @@ -259,223 +254,6 @@ impl SedonaDBExprFactory { }) .collect() } - - pub fn select( - base_plan: LogicalPlan, - exprs: Vec, - group_by_exprs: Vec, - ) -> datafusion_common::Result { - // Translated from DataFusion's SQL SELECT -> LogicalPlan constructor - // https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L890-L1098 - - // First, find aggregates in SELECT - let aggr_exprs = find_aggregate_exprs(&exprs); - - // Determine if we should use aggregation or window functions - // If we have an explicit GROUP BY or can infer one, use aggregation - // Otherwise, treat aggregates as window functions - let use_aggregation = if !group_by_exprs.is_empty() { - true - } else if !aggr_exprs.is_empty() { - // Try to infer GROUP BY from columns outside aggregates - let all_columns = find_column_exprs(&exprs); - let agg_columns = find_column_exprs(&aggr_exprs); - let non_agg_columns: Vec<_> = all_columns - .into_iter() - .filter(|col| !agg_columns.contains(col)) - .collect(); - !non_agg_columns.is_empty() - } else { - false - }; - - // Process aggregation if appropriate - let (plan, select_exprs_post_aggr) = if use_aggregation && !aggr_exprs.is_empty() { - // We have aggregates with a valid GROUP BY, create aggregate plan - let result = Self::aggregate(&base_plan, &exprs, group_by_exprs, &aggr_exprs)?; - (result.plan, result.select_exprs) - } else if !aggr_exprs.is_empty() { - // We have aggregates but no valid GROUP BY - convert to window functions - // First resolve column references to be fully qualified - let exprs_resolved: Vec = exprs - .iter() - .map(|expr| Self::resolve_columns(expr, &base_plan)) - .collect::>>()?; - - let exprs_with_windows = Self::aggregates_to_window_functions(&exprs_resolved)?; - (base_plan, exprs_with_windows) - } else { - // No aggregation - (base_plan, exprs.clone()) - }; - - // All of the window expressions (includes aggregates converted to windows) - let window_func_exprs = find_window_exprs(&select_exprs_post_aggr); - - // Process window functions after aggregation - let plan = if window_func_exprs.is_empty() { - plan - } else { - // Resolve columns in window expressions to be fully qualified - let window_func_exprs: Vec = window_func_exprs - .iter() - .map(|expr| Self::resolve_columns(expr, &plan)) - .collect::>>()?; - - let plan = LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())?; - - // Re-write the projection - let select_exprs_post_aggr = select_exprs_post_aggr - .iter() - .map(|expr| Self::rebase_expr(expr, &window_func_exprs, &plan)) - .collect::>>()?; - - // Final projection - LogicalPlanBuilder::from(plan) - .project(select_exprs_post_aggr)? - .build()? - }; - - // Final projection if no windows - if window_func_exprs.is_empty() { - LogicalPlanBuilder::from(plan) - .project(select_exprs_post_aggr)? - .build() - } else { - Ok(plan) - } - } - - /// Helper function to rebase expressions to reference columns from the plan. - /// Simplified version of datafusion-sql's rebase_expr (which is pub(crate)). - fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan) -> Result { - let result = expr.clone().transform_down(|nested_expr| { - if base_exprs.contains(&nested_expr) { - Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?)) - } else { - Ok(Transformed::no(nested_expr)) - } - })?; - Ok(result.data) - } - - /// Helper function to resolve column references to fully qualified columns. - /// Simplified version of datafusion-sql's resolve_columns (which is pub(crate)). - fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { - let result = expr.clone().transform_up(|nested_expr| { - match nested_expr { - Expr::Column(col) => { - let (qualifier, field) = plan.schema().qualified_field_from_column(&col)?; - Ok(Transformed::yes(Expr::Column(Column::from(( - qualifier, field, - ))))) - } - _ => { - // keep recursing - Ok(Transformed::no(nested_expr)) - } - } - })?; - Ok(result.data) - } - - /// Convert aggregate functions to window functions with empty OVER clause - fn aggregates_to_window_functions(exprs: &[Expr]) -> Result> { - exprs - .iter() - .map(|expr| { - expr.clone() - .transform_up(|nested_expr| { - match nested_expr { - Expr::AggregateFunction(agg) => { - // Convert to window function with empty OVER () - let window_func = - Expr::WindowFunction(Box::new(WindowFunction::new( - WindowFunctionDefinition::AggregateUDF(agg.func.clone()), - agg.params.args, - ))); - Ok(Transformed::yes(window_func)) - } - _ => Ok(Transformed::no(nested_expr)), - } - }) - .map(|t| t.data) - }) - .collect() - } - - /// Create an aggregate plan from the given input, group by, and aggregate expressions. - /// Based on DataFusion's aggregate() method. - /// https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L652-L764 - fn aggregate( - input: &LogicalPlan, - select_exprs: &[Expr], - group_by_exprs: Vec, - aggr_exprs: &[Expr], - ) -> Result { - // If group_by_exprs is empty, we need to extract column references from - // select_exprs that are NOT inside aggregate functions - let group_by_exprs = if group_by_exprs.is_empty() { - // Find all columns referenced in select expressions - let all_columns = find_column_exprs(select_exprs); - - // Find columns that are inside aggregate expressions - let agg_columns = find_column_exprs(aggr_exprs); - - // Keep only columns that are NOT inside aggregates - all_columns - .into_iter() - .filter(|col| !agg_columns.contains(col)) - .collect::>() - } else { - group_by_exprs - }; - - // Create the aggregate plan - let options = LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true); - let plan = LogicalPlanBuilder::from(input.clone()) - .with_options(options) - .aggregate(group_by_exprs, aggr_exprs.to_vec())? - .build()?; - - // Get the group_by_exprs and aggr_exprs from the constructed plan - // (they may have been modified by implicit group by logic) - let (group_by_exprs, aggr_exprs_from_plan) = if let LogicalPlan::Aggregate(agg) = &plan { - (&agg.group_expr, &agg.aggr_expr) - } else { - unreachable!(); - }; - - // Combine the original grouping and aggregate expressions into one list - let mut aggr_projection_exprs = vec![]; - for expr in group_by_exprs { - aggr_projection_exprs.push(expr.clone()); - } - aggr_projection_exprs.extend_from_slice(aggr_exprs_from_plan); - - // Now attempt to resolve columns and replace with fully-qualified columns - let aggr_projection_exprs = aggr_projection_exprs - .iter() - .map(|expr| Self::resolve_columns(expr, input)) - .collect::>>()?; - - // Resolve columns in select expressions too, so qualifiers match when rebasing - let select_exprs_resolved = select_exprs - .iter() - .map(|expr| Self::resolve_columns(expr, input)) - .collect::>>()?; - - // Re-write the projection - let select_exprs_post_aggr = select_exprs_resolved - .iter() - .map(|expr| Self::rebase_expr(expr, &aggr_projection_exprs, input)) - .collect::>>()?; - - Ok(AggregatePlanResult { - plan, - select_exprs: select_exprs_post_aggr, - }) - } } impl TryFrom for &SedonaDBExpr { @@ -488,13 +266,3 @@ impl TryFrom for &SedonaDBExpr { .ok_or(savvy_err!("Invalid SedonaDBExpr object.")) } } - -/// Result of the `aggregate` function, containing the aggregate plan and -/// rewritten expressions that reference the aggregate output columns. -/// https://github.com/apache/datafusion/blob/102caeb2261c5ae006c201546cf74769d80ceff8/datafusion/sql/src/select.rs#L55-L68 -struct AggregatePlanResult { - /// The aggregate logical plan - plan: LogicalPlan, - /// SELECT expressions rewritten to reference aggregate output columns - select_exprs: Vec, -} diff --git a/r/sedonadb/tests/testthat/test-expression.R b/r/sedonadb/tests/testthat/test-expression.R index f0f3d5af2..c3b59ccb6 100644 --- a/r/sedonadb/tests/testthat/test-expression.R +++ b/r/sedonadb/tests/testthat/test-expression.R @@ -78,3 +78,24 @@ test_that("errors that occur during evaluation have reasonable context", { function_without_a_translation <- function(x) x + 1L expect_snapshot(sd_eval_expr(quote(stop("this will error"))), error = TRUE) }) + +test_that("sd_eval_stream() evaluates scalar expressions synchronously", { + df_in <- data.frame(x = 1:10) + + # With no name provided, the DataFusion-generated name + df_out <- as.data.frame(sd_eval_stream(df_in, list(quote(x + 1L)))) + expect_identical(names(df_out), "x + Int32(1)") + + # With a name provided, we should get the column name + df_out <- as.data.frame(sd_eval_stream(df_in, list(y = quote(x + 1L)))) + expect_identical(df_out, data.frame(y = 2:11)) + + # Multiple in batches should yield multiple out batches + stream_in <- nanoarrow::basic_array_stream(list( + data.frame(x = 1:5), + data.frame(x = 6:10) + )) + stream_out <- sd_eval_stream(stream_in, list(y = quote(x + 1L))) + expect_identical(as.data.frame(stream_out$get_next()), data.frame(y = 2:6)) + expect_identical(as.data.frame(stream_out$get_next()), data.frame(y = 7:11)) +}) From dcd2cad23387d319ecdc42f554ebbcd7147d7cb7 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 22 Jan 2026 23:20:45 -0600 Subject: [PATCH 09/11] simplify impl --- r/sedonadb/NAMESPACE | 3 + r/sedonadb/R/000-wrappers.R | 35 +++++++---- r/sedonadb/R/dataframe.R | 86 +++++++++++++++++++++++++++ r/sedonadb/R/expression.R | 5 ++ r/sedonadb/R/pkg-dplyr.R | 1 + r/sedonadb/man/sd_filter.Rd | 26 ++++++++ r/sedonadb/man/sd_select.Rd | 23 +++++++ r/sedonadb/man/sd_transmute.Rd | 26 ++++++++ r/sedonadb/src/init.c | 25 ++++---- r/sedonadb/src/rust/api.h | 6 +- r/sedonadb/src/rust/src/dataframe.rs | 19 ++++++ r/sedonadb/src/rust/src/expression.rs | 75 +---------------------- 12 files changed, 230 insertions(+), 100 deletions(-) create mode 100644 r/sedonadb/man/sd_filter.Rd create mode 100644 r/sedonadb/man/sd_select.Rd create mode 100644 r/sedonadb/man/sd_transmute.Rd diff --git a/r/sedonadb/NAMESPACE b/r/sedonadb/NAMESPACE index 11a141b96..a7f631bf0 100644 --- a/r/sedonadb/NAMESPACE +++ b/r/sedonadb/NAMESPACE @@ -45,11 +45,14 @@ export(sd_expr_factory) export(sd_expr_literal) export(sd_expr_negative) export(sd_expr_scalar_function) +export(sd_filter) export(sd_preview) export(sd_read_parquet) export(sd_register_udf) +export(sd_select) export(sd_sql) export(sd_to_view) +export(sd_transmute) export(sd_view) export(sd_write_parquet) export(sedonadb_adbc) diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R index 859a72629..40bb92637 100644 --- a/r/sedonadb/R/000-wrappers.R +++ b/r/sedonadb/R/000-wrappers.R @@ -212,6 +212,16 @@ class(`InternalContext`) <- c( } } +`InternalDataFrame_filter` <- function(self) { + function(`exprs_sexp`) { + .savvy_wrap_InternalDataFrame(.Call( + savvy_InternalDataFrame_filter__impl, + `self`, + `exprs_sexp` + )) + } +} + `InternalDataFrame_limit` <- function(self) { function(`n`) { .savvy_wrap_InternalDataFrame(.Call(savvy_InternalDataFrame_limit__impl, `self`, `n`)) @@ -224,6 +234,16 @@ class(`InternalContext`) <- c( } } +`InternalDataFrame_select` <- function(self) { + function(`exprs_sexp`) { + .savvy_wrap_InternalDataFrame(.Call( + savvy_InternalDataFrame_select__impl, + `self`, + `exprs_sexp` + )) + } +} + `InternalDataFrame_select_indices` <- function(self) { function(`names`, `indices`) { .savvy_wrap_InternalDataFrame(.Call( @@ -316,10 +336,12 @@ class(`InternalContext`) <- c( e$`collect` <- `InternalDataFrame_collect`(ptr) e$`compute` <- `InternalDataFrame_compute`(ptr) e$`count` <- `InternalDataFrame_count`(ptr) + e$`filter` <- `InternalDataFrame_filter`(ptr) e$`limit` <- `InternalDataFrame_limit`(ptr) e$`primary_geometry_column_index` <- `InternalDataFrame_primary_geometry_column_index`( ptr ) + e$`select` <- `InternalDataFrame_select`(ptr) e$`select_indices` <- `InternalDataFrame_select_indices`(ptr) e$`show` <- `InternalDataFrame_show`(ptr) e$`to_arrow_schema` <- `InternalDataFrame_to_arrow_schema`(ptr) @@ -448,18 +470,6 @@ class(`SedonaDBExpr`) <- c("sedonadb::SedonaDBExpr__bundle", "savvy_sedonadb__se } } -`SedonaDBExprFactory_evaluate_scalar` <- function(self) { - function(`exprs_sexp`, `stream_in`, `stream_out`) { - .Call( - savvy_SedonaDBExprFactory_evaluate_scalar__impl, - `self`, - `exprs_sexp`, - `stream_in`, - `stream_out` - ) - } -} - `SedonaDBExprFactory_scalar_function` <- function(self) { function(`name`, `args`) { .savvy_wrap_SedonaDBExpr(.Call( @@ -477,7 +487,6 @@ class(`SedonaDBExpr`) <- c("sedonadb::SedonaDBExpr__bundle", "savvy_sedonadb__se e$`aggregate_function` <- `SedonaDBExprFactory_aggregate_function`(ptr) e$`binary` <- `SedonaDBExprFactory_binary`(ptr) e$`column` <- `SedonaDBExprFactory_column`(ptr) - e$`evaluate_scalar` <- `SedonaDBExprFactory_evaluate_scalar`(ptr) e$`scalar_function` <- `SedonaDBExprFactory_scalar_function`(ptr) class(e) <- c( diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R index fefc3a3d2..556749de8 100644 --- a/r/sedonadb/R/dataframe.R +++ b/r/sedonadb/R/dataframe.R @@ -193,6 +193,92 @@ sd_preview <- function(.data, n = NULL, ascii = NULL, width = NULL) { invisible(.data) } +#' Keep or drop columns of a SedonaDB DataFrame +#' +#' @inheritParams sd_count +#' @param ... One or more bare names. Evaluated like [dplyr::select()]. +#' +#' @returns An object of class sedonadb_dataframe +#' @export +#' +#' @examples +#' data.frame(x = 1:10, y = letters[1:10]) |> sd_select(x) +#' +sd_select <- function(.data, ...) { + .data <- as_sedonadb_dataframe(.data) + schema <- nanoarrow::infer_nanoarrow_schema(.data) + ptype <- nanoarrow::infer_nanoarrow_ptype(schema) + loc <- tidyselect::eval_select(rlang::expr(c(...)), data = ptype) + + df <- .data$df$select_indices(names(loc), loc - 1L) + new_sedonadb_dataframe(.data$ctx, df) +} + +#' Create, modify, and delete columns of a SedonaDB DataFrame +#' +#' @inheritParams sd_count +#' @param ... Named expressions for new columns to create. These are evaluated +#' in the same way as [dplyr::transmute()] except does not support extra +#' dplyr features such as `across()` or `.by`. +#' +#' @returns An object of class sedonadb_dataframe +#' @export +#' +#' @examples +#' data.frame(x = 1:10) |> +#' sd_transmute(y = x + 1L) +#' +sd_transmute <- function(.data, ...) { + .data <- as_sedonadb_dataframe(.data) + expr_quos <- rlang::enquos(...) + env <- parent.frame() + + expr_ctx <- sd_expr_ctx(infer_nanoarrow_schema(.data), env) + r_exprs <- expr_quos |> rlang::quos_auto_name() |> lapply(rlang::quo_get_expr) + sd_exprs <- lapply(r_exprs, sd_eval_expr, expr_ctx = expr_ctx, env = env) + + # Ensure inputs are given aliases to account for the expected column name + exprs_names <- names(r_exprs) + for (i in seq_along(sd_exprs)) { + name <- exprs_names[i] + if (!is.na(name) && name != "") { + sd_exprs[[i]] <- sd_expr_alias(sd_exprs[[i]], name, expr_ctx$factory) + } + } + + df <- .data$df$select(sd_exprs) + new_sedonadb_dataframe(.data$ctx, df) +} + +#' Keep rows of a SedonaDB DataFrame that match a condition +#' +#' @inheritParams sd_count +#' @param ... Unnamed expressions for filter conditions. These are evaluated +#' in the same way as [dplyr::filter()] except does not support extra +#' dplyr features such as `across()` or `.by`. +#' +#' @returns An object of class sedonadb_dataframe +#' @export +#' +#' @examples +#' data.frame(x = 1:10) |> +#' sd_filter(x > 5) +#' +sd_filter <- function(.data, ...) { + .data <- as_sedonadb_dataframe(.data) + rlang::check_dots_unnamed() + + expr_quos <- rlang::enquos(...) + env <- parent.frame() + + expr_ctx <- sd_expr_ctx(infer_nanoarrow_schema(.data), env) + r_exprs <- expr_quos |> lapply(rlang::quo_get_expr) + sd_exprs <- lapply(r_exprs, sd_eval_expr, expr_ctx = expr_ctx, env = env) + + df <- .data$df$filter(sd_exprs) + new_sedonadb_dataframe(.data$ctx, df) +} + #' Write DataFrame to (Geo)Parquet files #' #' Write this DataFrame to one or more (Geo)Parquet files. For input that contains diff --git a/r/sedonadb/R/expression.R b/r/sedonadb/R/expression.R index 7e89fd5fa..c88f60c59 100644 --- a/r/sedonadb/R/expression.R +++ b/r/sedonadb/R/expression.R @@ -161,6 +161,11 @@ sd_eval_expr <- function(expr, expr_ctx = sd_expr_ctx(env = env), env = parent.f #' Evaluate a list of R expressions into a stream of RecordBatch #' +#' Internally this is creating a DataFusion PhysicalExpr and evaluating +#' it sequentially on each batch. This is primarily a tool for testing +#' the result of expressions but also may be useful for exposing scalar +#' functions for synchronous use. +#' #' @param stream Input stream, or an object (such as a `data.frame()`) #' that can be coerced to one. #' @param exprs An list of R expressions (e.g., the result of `quote()`). diff --git a/r/sedonadb/R/pkg-dplyr.R b/r/sedonadb/R/pkg-dplyr.R index e916202db..4fbeebf22 100644 --- a/r/sedonadb/R/pkg-dplyr.R +++ b/r/sedonadb/R/pkg-dplyr.R @@ -29,4 +29,5 @@ select.sedonadb_dataframe <- function(.data, ...) { df <- .data$df$select_indices(names(loc), loc - 1L) new_sedonadb_dataframe(.data$ctx, df) } + # nolint end diff --git a/r/sedonadb/man/sd_filter.Rd b/r/sedonadb/man/sd_filter.Rd new file mode 100644 index 000000000..1a327ce3c --- /dev/null +++ b/r/sedonadb/man/sd_filter.Rd @@ -0,0 +1,26 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataframe.R +\name{sd_filter} +\alias{sd_filter} +\title{Keep rows of a SedonaDB DataFrame that match a condition} +\usage{ +sd_filter(.data, ...) +} +\arguments{ +\item{.data}{A sedonadb_dataframe} + +\item{...}{Unnamed expressions for filter conditions. These are evaluated +in the same way as \code{\link[dplyr:filter]{dplyr::filter()}} except does not support extra +dplyr features such as \code{across()} or \code{.by}.} +} +\value{ +An object of class sedonadb_dataframe +} +\description{ +Keep rows of a SedonaDB DataFrame that match a condition +} +\examples{ +data.frame(x = 1:10) |> + sd_filter(x > 5) + +} diff --git a/r/sedonadb/man/sd_select.Rd b/r/sedonadb/man/sd_select.Rd new file mode 100644 index 000000000..215236aa6 --- /dev/null +++ b/r/sedonadb/man/sd_select.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataframe.R +\name{sd_select} +\alias{sd_select} +\title{Keep or drop columns of a SedonaDB DataFrame} +\usage{ +sd_select(.data, ...) +} +\arguments{ +\item{.data}{A sedonadb_dataframe} + +\item{...}{One or more bare names. Evaluated like \code{\link[dplyr:select]{dplyr::select()}}.} +} +\value{ +An object of class sedonadb_dataframe +} +\description{ +Keep or drop columns of a SedonaDB DataFrame +} +\examples{ +data.frame(x = 1:10, y = letters[1:10]) |> sd_select(x) + +} diff --git a/r/sedonadb/man/sd_transmute.Rd b/r/sedonadb/man/sd_transmute.Rd new file mode 100644 index 000000000..4e226b7a7 --- /dev/null +++ b/r/sedonadb/man/sd_transmute.Rd @@ -0,0 +1,26 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataframe.R +\name{sd_transmute} +\alias{sd_transmute} +\title{Create, modify, and delete columns of a SedonaDB DataFrame} +\usage{ +sd_transmute(.data, ...) +} +\arguments{ +\item{.data}{A sedonadb_dataframe} + +\item{...}{Named expressions for new columns to create. These are evaluated +in the same way as \code{\link[dplyr:transmute]{dplyr::transmute()}} except does not support extra +dplyr features such as \code{across()} or \code{.by}.} +} +\value{ +An object of class sedonadb_dataframe +} +\description{ +Create, modify, and delete columns of a SedonaDB DataFrame +} +\examples{ +data.frame(x = 1:10) |> + sd_transmute(y = x + 1L) + +} diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c index 60556fa4e..48cc32900 100644 --- a/r/sedonadb/src/init.c +++ b/r/sedonadb/src/init.c @@ -149,6 +149,11 @@ SEXP savvy_InternalDataFrame_count__impl(SEXP self__) { return handle_result(res); } +SEXP savvy_InternalDataFrame_filter__impl(SEXP self__, SEXP c_arg__exprs_sexp) { + SEXP res = savvy_InternalDataFrame_filter__ffi(self__, c_arg__exprs_sexp); + return handle_result(res); +} + SEXP savvy_InternalDataFrame_limit__impl(SEXP self__, SEXP c_arg__n) { SEXP res = savvy_InternalDataFrame_limit__ffi(self__, c_arg__n); return handle_result(res); @@ -159,6 +164,11 @@ SEXP savvy_InternalDataFrame_primary_geometry_column_index__impl(SEXP self__) { return handle_result(res); } +SEXP savvy_InternalDataFrame_select__impl(SEXP self__, SEXP c_arg__exprs_sexp) { + SEXP res = savvy_InternalDataFrame_select__ffi(self__, c_arg__exprs_sexp); + return handle_result(res); +} + SEXP savvy_InternalDataFrame_select_indices__impl(SEXP self__, SEXP c_arg__names, SEXP c_arg__indices) { @@ -261,15 +271,6 @@ SEXP savvy_SedonaDBExprFactory_column__impl(SEXP self__, SEXP c_arg__name, return handle_result(res); } -SEXP savvy_SedonaDBExprFactory_evaluate_scalar__impl(SEXP self__, - SEXP c_arg__exprs_sexp, - SEXP c_arg__stream_in, - SEXP c_arg__stream_out) { - SEXP res = savvy_SedonaDBExprFactory_evaluate_scalar__ffi( - self__, c_arg__exprs_sexp, c_arg__stream_in, c_arg__stream_out); - return handle_result(res); -} - SEXP savvy_SedonaDBExprFactory_literal__impl(SEXP c_arg__array_xptr, SEXP c_arg__schema_xptr) { SEXP res = savvy_SedonaDBExprFactory_literal__ffi(c_arg__array_xptr, @@ -321,10 +322,14 @@ static const R_CallMethodDef CallEntries[] = { (DL_FUNC)&savvy_InternalDataFrame_compute__impl, 2}, {"savvy_InternalDataFrame_count__impl", (DL_FUNC)&savvy_InternalDataFrame_count__impl, 1}, + {"savvy_InternalDataFrame_filter__impl", + (DL_FUNC)&savvy_InternalDataFrame_filter__impl, 2}, {"savvy_InternalDataFrame_limit__impl", (DL_FUNC)&savvy_InternalDataFrame_limit__impl, 2}, {"savvy_InternalDataFrame_primary_geometry_column_index__impl", (DL_FUNC)&savvy_InternalDataFrame_primary_geometry_column_index__impl, 1}, + {"savvy_InternalDataFrame_select__impl", + (DL_FUNC)&savvy_InternalDataFrame_select__impl, 2}, {"savvy_InternalDataFrame_select_indices__impl", (DL_FUNC)&savvy_InternalDataFrame_select_indices__impl, 3}, {"savvy_InternalDataFrame_show__impl", @@ -355,8 +360,6 @@ static const R_CallMethodDef CallEntries[] = { (DL_FUNC)&savvy_SedonaDBExprFactory_binary__impl, 4}, {"savvy_SedonaDBExprFactory_column__impl", (DL_FUNC)&savvy_SedonaDBExprFactory_column__impl, 3}, - {"savvy_SedonaDBExprFactory_evaluate_scalar__impl", - (DL_FUNC)&savvy_SedonaDBExprFactory_evaluate_scalar__impl, 4}, {"savvy_SedonaDBExprFactory_literal__impl", (DL_FUNC)&savvy_SedonaDBExprFactory_literal__impl, 2}, {"savvy_SedonaDBExprFactory_new__impl", diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h index 876d7c264..b43df3f52 100644 --- a/r/sedonadb/src/rust/api.h +++ b/r/sedonadb/src/rust/api.h @@ -42,8 +42,10 @@ SEXP savvy_InternalContext_view__ffi(SEXP self__, SEXP c_arg__table_ref); SEXP savvy_InternalDataFrame_collect__ffi(SEXP self__, SEXP c_arg__out); SEXP savvy_InternalDataFrame_compute__ffi(SEXP self__, SEXP c_arg__ctx); SEXP savvy_InternalDataFrame_count__ffi(SEXP self__); +SEXP savvy_InternalDataFrame_filter__ffi(SEXP self__, SEXP c_arg__exprs_sexp); SEXP savvy_InternalDataFrame_limit__ffi(SEXP self__, SEXP c_arg__n); SEXP savvy_InternalDataFrame_primary_geometry_column_index__ffi(SEXP self__); +SEXP savvy_InternalDataFrame_select__ffi(SEXP self__, SEXP c_arg__exprs_sexp); SEXP savvy_InternalDataFrame_select_indices__ffi(SEXP self__, SEXP c_arg__names, SEXP c_arg__indices); SEXP savvy_InternalDataFrame_show__ffi(SEXP self__, SEXP c_arg__ctx, @@ -78,10 +80,6 @@ SEXP savvy_SedonaDBExprFactory_binary__ffi(SEXP self__, SEXP c_arg__op, SEXP c_arg__lhs, SEXP c_arg__rhs); SEXP savvy_SedonaDBExprFactory_column__ffi(SEXP self__, SEXP c_arg__name, SEXP c_arg__qualifier); -SEXP savvy_SedonaDBExprFactory_evaluate_scalar__ffi(SEXP self__, - SEXP c_arg__exprs_sexp, - SEXP c_arg__stream_in, - SEXP c_arg__stream_out); SEXP savvy_SedonaDBExprFactory_literal__ffi(SEXP c_arg__array_xptr, SEXP c_arg__schema_xptr); SEXP savvy_SedonaDBExprFactory_new__ffi(SEXP c_arg__ctx); diff --git a/r/sedonadb/src/rust/src/dataframe.rs b/r/sedonadb/src/rust/src/dataframe.rs index e34cee82d..275b1f4f5 100644 --- a/r/sedonadb/src/rust/src/dataframe.rs +++ b/r/sedonadb/src/rust/src/dataframe.rs @@ -21,6 +21,7 @@ use arrow_array::{RecordBatchIterator, RecordBatchReader}; use datafusion::catalog::MemTable; use datafusion::prelude::DataFrame; use datafusion_common::Column; +use datafusion_expr::utils::conjunction; use datafusion_expr::{select_expr::SelectExpr, Expr, SortExpr}; use datafusion_ffi::table_provider::FFI_TableProvider; use savvy::{savvy, savvy_err, sexp, IntoExtPtrSexp, Result}; @@ -33,6 +34,7 @@ use std::{iter::zip, ptr::swap_nonoverlapping, sync::Arc}; use tokio::runtime::Runtime; use crate::context::InternalContext; +use crate::expression::SedonaDBExprFactory; use crate::ffi::{import_schema, FFITableProviderR}; use crate::runtime::wait_for_future_captured_r; @@ -311,4 +313,21 @@ impl InternalDataFrame { let inner = self.inner.clone().select(exprs)?; Ok(new_data_frame(inner, self.runtime.clone())) } + + fn select(&self, exprs_sexp: savvy::Sexp) -> savvy::Result { + let exprs = SedonaDBExprFactory::exprs(exprs_sexp)?; + let inner = self.inner.clone().select(exprs)?; + Ok(new_data_frame(inner, self.runtime.clone())) + } + + fn filter(&self, exprs_sexp: savvy::Sexp) -> savvy::Result { + let exprs = SedonaDBExprFactory::exprs(exprs_sexp)?; + let inner = if let Some(single_filter) = conjunction(exprs) { + self.inner.clone().filter(single_filter)? + } else { + self.inner.clone() + }; + + Ok(new_data_frame(inner, self.runtime.clone())) + } } diff --git a/r/sedonadb/src/rust/src/expression.rs b/r/sedonadb/src/rust/src/expression.rs index 29a631e0e..e0753fd28 100644 --- a/r/sedonadb/src/rust/src/expression.rs +++ b/r/sedonadb/src/rust/src/expression.rs @@ -15,17 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{iter::zip, ptr::swap_nonoverlapping, sync::Arc}; +use std::sync::Arc; -use arrow_array::{ - ffi_stream::FFI_ArrowArrayStream, RecordBatch, RecordBatchIterator, RecordBatchReader, -}; -use arrow_schema::{FieldRef, Schema}; -use datafusion::physical_plan::PhysicalExpr; -use datafusion_common::{Column, DFSchema, Result, ScalarValue}; +use datafusion_common::{Column, Result, ScalarValue}; use datafusion_expr::{ expr::{AggregateFunction, FieldMetadata, NullTreatment, ScalarFunction}, - BinaryExpr, Cast, ColumnarValue, Expr, Operator, + BinaryExpr, Cast, Expr, Operator, }; use savvy::{savvy, savvy_err, EnvironmentSexp}; use sedona::context::SedonaContext; @@ -177,70 +172,6 @@ impl SedonaDBExprFactory { Err(savvy_err!("Aggregate UDF '{name}' not found")) } } - - fn evaluate_scalar( - &self, - exprs_sexp: savvy::Sexp, - stream_in: savvy::Sexp, - stream_out: savvy::Sexp, - ) -> savvy::Result { - let out_void = unsafe { savvy_ffi::R_ExternalPtrAddr(stream_out.0) }; - if out_void.is_null() { - return Err(savvy_err!("external pointer to null in evaluate()")); - } - - let exprs = Self::exprs(exprs_sexp)?; - let expr_names = exprs - .iter() - .map(|e| e.schema_name().to_string()) - .collect::>(); - let reader_in = crate::ffi::import_array_stream(stream_in)?; - - let physical_exprs = exprs - .into_iter() - .map(|e| { - self.ctx.ctx.create_physical_expr( - e, - &DFSchema::try_from(reader_in.schema().as_ref().clone())?, - ) - }) - .collect::>>>()?; - - let out_fields = physical_exprs - .iter() - .map(|e| e.return_field(&reader_in.schema())) - .collect::>>()?; - let out_fields_named = zip(out_fields, expr_names) - .map(|(f, name)| f.as_ref().clone().with_name(name)) - .collect::>(); - let out_schema = Arc::new(Schema::new(out_fields_named)); - - let mut out_batches = Vec::new(); - let mut size = 0; - for batch in reader_in { - let batch = batch?; - size += batch.num_rows(); - let columns = physical_exprs - .iter() - .map(|e| e.evaluate(&batch)) - .collect::>>()?; - let out_batch = RecordBatch::try_new( - out_schema.clone(), - ColumnarValue::values_to_arrays(&columns)?, - )?; - out_batches.push(out_batch); - } - - let reader = Box::new(RecordBatchIterator::new( - out_batches.into_iter().map(Ok), - out_schema, - )); - let mut ffi_stream = FFI_ArrowArrayStream::new(reader); - let ffi_out = out_void as *mut FFI_ArrowArrayStream; - unsafe { swap_nonoverlapping(&mut ffi_stream, ffi_out, 1) }; - - savvy::Sexp::try_from(size as f64) - } } impl SedonaDBExprFactory { From 3cb052ce7dc309320c0c1cde09ffdc09197f2cc0 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 23 Jan 2026 11:10:53 -0600 Subject: [PATCH 10/11] test the minimal additions --- r/sedonadb/R/dataframe.R | 3 +- r/sedonadb/R/expression.R | 36 ----------- r/sedonadb/R/pkg-dplyr.R | 1 - r/sedonadb/man/sd_filter.Rd | 3 +- r/sedonadb/tests/testthat/test-dataframe.R | 68 +++++++++++++++++++++ r/sedonadb/tests/testthat/test-expression.R | 21 ------- 6 files changed, 70 insertions(+), 62 deletions(-) diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R index 556749de8..614ed0662 100644 --- a/r/sedonadb/R/dataframe.R +++ b/r/sedonadb/R/dataframe.R @@ -261,8 +261,7 @@ sd_transmute <- function(.data, ...) { #' @export #' #' @examples -#' data.frame(x = 1:10) |> -#' sd_filter(x > 5) +#' data.frame(x = 1:10) |> sd_filter(x > 5) #' sd_filter <- function(.data, ...) { .data <- as_sedonadb_dataframe(.data) diff --git a/r/sedonadb/R/expression.R b/r/sedonadb/R/expression.R index c88f60c59..0d97c8989 100644 --- a/r/sedonadb/R/expression.R +++ b/r/sedonadb/R/expression.R @@ -159,42 +159,6 @@ sd_eval_expr <- function(expr, expr_ctx = sd_expr_ctx(env = env), env = parent.f ) } -#' Evaluate a list of R expressions into a stream of RecordBatch -#' -#' Internally this is creating a DataFusion PhysicalExpr and evaluating -#' it sequentially on each batch. This is primarily a tool for testing -#' the result of expressions but also may be useful for exposing scalar -#' functions for synchronous use. -#' -#' @param stream Input stream, or an object (such as a `data.frame()`) -#' that can be coerced to one. -#' @param exprs An list of R expressions (e.g., the result of `quote()`). -#' @param env An evaluation environment. Defaults to the calling environment. -#' -#' @returns A `SedonaDBExpr` -#' @noRd -sd_eval_stream <- function(stream, exprs, env = parent.frame()) { - stream <- nanoarrow::as_nanoarrow_array_stream( - stream, - geometry_schema = geoarrow::geoarrow_wkb() - ) - expr_ctx <- sd_expr_ctx(stream$get_schema(), env) - sd_exprs <- lapply(exprs, sd_eval_expr, expr_ctx = expr_ctx, env = env) - exprs_names <- names(exprs) - if (!is.null(exprs_names)) { - for (i in seq_along(sd_exprs)) { - name <- exprs_names[i] - if (!is.na(name) && name != "") { - sd_exprs[[i]] <- sd_expr_alias(sd_exprs[[i]], name, expr_ctx$factory) - } - } - } - - stream_out <- nanoarrow::nanoarrow_allocate_array_stream() - expr_ctx$factory$evaluate_scalar(sd_exprs, stream, stream_out) - stream_out -} - sd_eval_expr_inner <- function(expr, expr_ctx) { if (rlang::is_call(expr)) { # Extract `pkg::fun` or `fun` if this is a usual call (e.g., not diff --git a/r/sedonadb/R/pkg-dplyr.R b/r/sedonadb/R/pkg-dplyr.R index 4fbeebf22..e916202db 100644 --- a/r/sedonadb/R/pkg-dplyr.R +++ b/r/sedonadb/R/pkg-dplyr.R @@ -29,5 +29,4 @@ select.sedonadb_dataframe <- function(.data, ...) { df <- .data$df$select_indices(names(loc), loc - 1L) new_sedonadb_dataframe(.data$ctx, df) } - # nolint end diff --git a/r/sedonadb/man/sd_filter.Rd b/r/sedonadb/man/sd_filter.Rd index 1a327ce3c..a3d38ae14 100644 --- a/r/sedonadb/man/sd_filter.Rd +++ b/r/sedonadb/man/sd_filter.Rd @@ -20,7 +20,6 @@ An object of class sedonadb_dataframe Keep rows of a SedonaDB DataFrame that match a condition } \examples{ -data.frame(x = 1:10) |> - sd_filter(x > 5) +data.frame(x = 1:10) |> sd_filter(x > 5) } diff --git a/r/sedonadb/tests/testthat/test-dataframe.R b/r/sedonadb/tests/testthat/test-dataframe.R index 0bc3c4c1c..3d96c2398 100644 --- a/r/sedonadb/tests/testthat/test-dataframe.R +++ b/r/sedonadb/tests/testthat/test-dataframe.R @@ -286,3 +286,71 @@ test_that("sd_write_parquet validates geoparquet_version parameter", { "geoparquet_version must be" ) }) + +test_that("sd_select() works with dplyr-like select syntax", { + skip_if_not_installed("tidyselect") + + df_in <- data.frame(one = 1, two = "two", THREE = 3.0) + + expect_identical( + df_in |> sd_select(2:3) |> sd_collect(), + data.frame(two = "two", THREE = 3.0) + ) + + expect_identical( + df_in |> sd_select(three_renamed = THREE, one) |> sd_collect(), + data.frame(three_renamed = 3.0, one = 1) + ) + + expect_identical( + df_in |> sd_select(TWO = two) |> sd_collect(), + data.frame(TWO = "two") + ) +}) + +test_that("sd_transmute() works with dplyr-like transmute syntax", { + df_in <- data.frame(x = 1:10) + + # checks that (1) unnamed inputs like `x` are named `x` in the output + # and (2) named inputs are given an alias and (3) expressions are + # translated. + expect_identical( + df_in |> sd_transmute(x, y = x + 1L) |> sd_collect(), + data.frame(x = 1:10, y = 2:11) + ) + + # Check that the calling environment is handled + integer_one <- 1L + expect_identical( + df_in |> sd_transmute(x, y = x + integer_one) |> sd_collect(), + data.frame(x = 1:10, y = 2:11) + ) +}) + +test_that("sd_filter() works with dplyr-like filter syntax", { + df_in <- data.frame(x = 1:10) + + # Zero conditions + expect_identical( + df_in |> sd_filter() |> sd_collect(), + df_in + ) + + # One condition + expect_identical( + df_in |> sd_filter(x >= 5) |> sd_collect(), + data.frame(x = 5:10) + ) + + # Multiple conditions + expect_identical( + df_in |> sd_filter(x >= 5, x >= 6) |> sd_collect(), + data.frame(x = 6:10) + ) + + # Ensure null handling of conditions is dplyr-like (drops nulls) + expect_identical( + df_in |> sd_filter(x >= NA_integer_) |> sd_collect(), + data.frame(x = integer()) + ) +}) diff --git a/r/sedonadb/tests/testthat/test-expression.R b/r/sedonadb/tests/testthat/test-expression.R index c3b59ccb6..f0f3d5af2 100644 --- a/r/sedonadb/tests/testthat/test-expression.R +++ b/r/sedonadb/tests/testthat/test-expression.R @@ -78,24 +78,3 @@ test_that("errors that occur during evaluation have reasonable context", { function_without_a_translation <- function(x) x + 1L expect_snapshot(sd_eval_expr(quote(stop("this will error"))), error = TRUE) }) - -test_that("sd_eval_stream() evaluates scalar expressions synchronously", { - df_in <- data.frame(x = 1:10) - - # With no name provided, the DataFusion-generated name - df_out <- as.data.frame(sd_eval_stream(df_in, list(quote(x + 1L)))) - expect_identical(names(df_out), "x + Int32(1)") - - # With a name provided, we should get the column name - df_out <- as.data.frame(sd_eval_stream(df_in, list(y = quote(x + 1L)))) - expect_identical(df_out, data.frame(y = 2:11)) - - # Multiple in batches should yield multiple out batches - stream_in <- nanoarrow::basic_array_stream(list( - data.frame(x = 1:5), - data.frame(x = 6:10) - )) - stream_out <- sd_eval_stream(stream_in, list(y = quote(x + 1L))) - expect_identical(as.data.frame(stream_out$get_next()), data.frame(y = 2:6)) - expect_identical(as.data.frame(stream_out$get_next()), data.frame(y = 7:11)) -}) From 6355bc21b63863d14d085e19a97713634bec3c30 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 23 Jan 2026 11:18:54 -0600 Subject: [PATCH 11/11] doc --- r/sedonadb/R/dataframe.R | 5 ++++- r/sedonadb/man/sd_compute.Rd | 2 +- r/sedonadb/man/sd_count.Rd | 2 +- r/sedonadb/man/sd_filter.Rd | 2 +- r/sedonadb/man/sd_preview.Rd | 2 +- r/sedonadb/man/sd_select.Rd | 2 +- r/sedonadb/man/sd_to_view.Rd | 2 +- r/sedonadb/man/sd_transmute.Rd | 2 +- r/sedonadb/man/sd_write_parquet.Rd | 2 +- 9 files changed, 12 insertions(+), 9 deletions(-) diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R index 614ed0662..464210fc1 100644 --- a/r/sedonadb/R/dataframe.R +++ b/r/sedonadb/R/dataframe.R @@ -80,7 +80,7 @@ as_sedonadb_dataframe.datafusion_table_provider <- function(x, ..., schema = NUL #' Count rows in a DataFrame #' -#' @param .data A sedonadb_dataframe +#' @param .data A sedonadb_dataframe or an object that can be coerced to one. #' #' @returns The number of rows after executing the query #' @export @@ -89,6 +89,7 @@ as_sedonadb_dataframe.datafusion_table_provider <- function(x, ..., schema = NUL #' sd_sql("SELECT 1 as one") |> sd_count() #' sd_count <- function(.data) { + .data <- as_sedonadb_dataframe(.data) .data$df$count() } @@ -331,6 +332,8 @@ sd_write_parquet <- function( geoparquet_version = "1.0", overwrite_bbox_columns = FALSE ) { + .data <- as_sedonadb_dataframe(.data) + # Determine single_file_output default based on path and partition_by if (is.null(single_file_output)) { single_file_output <- length(partition_by) == 0 && grepl("\\.parquet$", path) diff --git a/r/sedonadb/man/sd_compute.Rd b/r/sedonadb/man/sd_compute.Rd index 97590fd67..ecf7de0b9 100644 --- a/r/sedonadb/man/sd_compute.Rd +++ b/r/sedonadb/man/sd_compute.Rd @@ -10,7 +10,7 @@ sd_compute(.data) sd_collect(.data, ptype = NULL) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} \item{ptype}{The target R object. See \link[nanoarrow:convert_array_stream]{nanoarrow::convert_array_stream}.} } diff --git a/r/sedonadb/man/sd_count.Rd b/r/sedonadb/man/sd_count.Rd index c93b9d53d..fb48dd285 100644 --- a/r/sedonadb/man/sd_count.Rd +++ b/r/sedonadb/man/sd_count.Rd @@ -7,7 +7,7 @@ sd_count(.data) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} } \value{ The number of rows after executing the query diff --git a/r/sedonadb/man/sd_filter.Rd b/r/sedonadb/man/sd_filter.Rd index a3d38ae14..f5e642347 100644 --- a/r/sedonadb/man/sd_filter.Rd +++ b/r/sedonadb/man/sd_filter.Rd @@ -7,7 +7,7 @@ sd_filter(.data, ...) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} \item{...}{Unnamed expressions for filter conditions. These are evaluated in the same way as \code{\link[dplyr:filter]{dplyr::filter()}} except does not support extra diff --git a/r/sedonadb/man/sd_preview.Rd b/r/sedonadb/man/sd_preview.Rd index 351dd5a76..c9e09f0af 100644 --- a/r/sedonadb/man/sd_preview.Rd +++ b/r/sedonadb/man/sd_preview.Rd @@ -7,7 +7,7 @@ sd_preview(.data, n = NULL, ascii = NULL, width = NULL) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} \item{n}{The number of rows to preview. Use \code{Inf} to preview all rows. Defaults to \code{getOption("pillar.print_max")}.} diff --git a/r/sedonadb/man/sd_select.Rd b/r/sedonadb/man/sd_select.Rd index 215236aa6..9ef542861 100644 --- a/r/sedonadb/man/sd_select.Rd +++ b/r/sedonadb/man/sd_select.Rd @@ -7,7 +7,7 @@ sd_select(.data, ...) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} \item{...}{One or more bare names. Evaluated like \code{\link[dplyr:select]{dplyr::select()}}.} } diff --git a/r/sedonadb/man/sd_to_view.Rd b/r/sedonadb/man/sd_to_view.Rd index 5c3ab020c..dce288498 100644 --- a/r/sedonadb/man/sd_to_view.Rd +++ b/r/sedonadb/man/sd_to_view.Rd @@ -7,7 +7,7 @@ sd_to_view(.data, table_ref, overwrite = FALSE) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} \item{table_ref}{The name of the view reference} diff --git a/r/sedonadb/man/sd_transmute.Rd b/r/sedonadb/man/sd_transmute.Rd index 4e226b7a7..750e3fe81 100644 --- a/r/sedonadb/man/sd_transmute.Rd +++ b/r/sedonadb/man/sd_transmute.Rd @@ -7,7 +7,7 @@ sd_transmute(.data, ...) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} \item{...}{Named expressions for new columns to create. These are evaluated in the same way as \code{\link[dplyr:transmute]{dplyr::transmute()}} except does not support extra diff --git a/r/sedonadb/man/sd_write_parquet.Rd b/r/sedonadb/man/sd_write_parquet.Rd index afd483849..17aa01047 100644 --- a/r/sedonadb/man/sd_write_parquet.Rd +++ b/r/sedonadb/man/sd_write_parquet.Rd @@ -15,7 +15,7 @@ sd_write_parquet( ) } \arguments{ -\item{.data}{A sedonadb_dataframe} +\item{.data}{A sedonadb_dataframe or an object that can be coerced to one.} \item{path}{A filename or directory to which parquet file(s) should be written}