Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::catalog::{Session, TableFunctionArgs, TableFunctionImpl};
use datafusion::common::{Column, plan_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
Expand Down Expand Up @@ -326,7 +326,8 @@ fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<Str
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
Expand Down Expand Up @@ -517,7 +518,8 @@ impl MetadataCacheFunc {
}

impl TableFunctionImpl for MetadataCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
if !exprs.is_empty() {
return plan_err!("metadata_cache should have no arguments");
}
Expand Down Expand Up @@ -635,7 +637,8 @@ impl StatisticsCacheFunc {
}

impl TableFunctionImpl for StatisticsCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
if !exprs.is_empty() {
return plan_err!("statistics_cache should have no arguments");
}
Expand Down Expand Up @@ -770,7 +773,8 @@ impl ListFilesCacheFunc {
}

impl TableFunctionImpl for ListFilesCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
if !exprs.is_empty() {
return plan_err!("list_files_cache should have no arguments");
}
Expand Down
21 changes: 11 additions & 10 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,14 @@ cargo run --example dataframe -- dataframe

#### Category: Single Process

| Subcommand | File Path | Description |
| ---------- | ------------------------------------------------------- | ----------------------------------------------- |
| adv_udaf | [`udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs) | Advanced User Defined Aggregate Function (UDAF) |
| adv_udf | [`udf/advanced_udf.rs`](examples/udf/advanced_udf.rs) | Advanced User Defined Scalar Function (UDF) |
| adv_udwf | [`udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs) | Advanced User Defined Window Function (UDWF) |
| async_udf | [`udf/async_udf.rs`](examples/udf/async_udf.rs) | Asynchronous User Defined Scalar Function |
| udaf | [`udf/simple_udaf.rs`](examples/udf/simple_udaf.rs) | Simple UDAF example |
| udf | [`udf/simple_udf.rs`](examples/udf/simple_udf.rs) | Simple UDF example |
| udtf | [`udf/simple_udtf.rs`](examples/udf/simple_udtf.rs) | Simple UDTF example |
| udwf | [`udf/simple_udwf.rs`](examples/udf/simple_udwf.rs) | Simple UDWF example |
| Subcommand | File Path | Description |
| --------------- | ----------------------------------------------------------- | ----------------------------------------------- |
| adv_udaf | [`udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs) | Advanced User Defined Aggregate Function (UDAF) |
| adv_udf | [`udf/advanced_udf.rs`](examples/udf/advanced_udf.rs) | Advanced User Defined Scalar Function (UDF) |
| adv_udwf | [`udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs) | Advanced User Defined Window Function (UDWF) |
| async_udf | [`udf/async_udf.rs`](examples/udf/async_udf.rs) | Asynchronous User Defined Scalar Function |
| udaf | [`udf/simple_udaf.rs`](examples/udf/simple_udaf.rs) | Simple UDAF example |
| udf | [`udf/simple_udf.rs`](examples/udf/simple_udf.rs) | Simple UDF example |
| udtf | [`udf/simple_udtf.rs`](examples/udf/simple_udtf.rs) | Simple UDTF example |
| udwf | [`udf/simple_udwf.rs`](examples/udf/simple_udwf.rs) | Simple UDWF example |
| table_list_udtf | [`udf/table_list_udtf.rs`](examples/udf/table_list_udtf.rs) | Session-aware UDTF table list example |
8 changes: 7 additions & 1 deletion datafusion-examples/examples/udf/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//!
//! ## Usage
//! ```bash
//! cargo run --example udf -- [all|adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf]
//! cargo run --example udf -- [all|adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf|table_list_udtf]
//! ```
//!
//! Each subcommand runs a corresponding example:
Expand Down Expand Up @@ -50,6 +50,9 @@
//!
//! - `udwf`
//! (file: simple_udwf.rs, desc: Simple UDWF example)
//!
//! - `table_list_udtf`
//! (file: table_list_udtf.rs, desc: Session-aware UDTF table list example)

mod advanced_udaf;
mod advanced_udf;
Expand All @@ -59,6 +62,7 @@ mod simple_udaf;
mod simple_udf;
mod simple_udtf;
mod simple_udwf;
mod table_list_udtf;

use datafusion::error::{DataFusionError, Result};
use strum::{IntoEnumIterator, VariantNames};
Expand All @@ -76,6 +80,7 @@ enum ExampleKind {
Udaf,
Udwf,
Udtf,
TableListUdtf,
}

impl ExampleKind {
Expand All @@ -101,6 +106,7 @@ impl ExampleKind {
ExampleKind::Udf => simple_udf::simple_udf().await?,
ExampleKind::Udtf => simple_udtf::simple_udtf().await?,
ExampleKind::Udwf => simple_udwf::simple_udwf().await?,
ExampleKind::TableListUdtf => table_list_udtf::table_list_udtf().await?,
}

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/udf/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::csv::reader::Format;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::catalog::{Session, TableFunctionArgs, TableFunctionImpl};
use datafusion::common::{ScalarValue, plan_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
Expand Down Expand Up @@ -135,7 +135,8 @@ impl TableProvider for LocalCsvTable {
struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
let Some(Expr::Literal(ScalarValue::Utf8(Some(path)), _)) = exprs.first() else {
return plan_err!("read_csv requires at least one string argument");
};
Expand Down
128 changes: 128 additions & 0 deletions datafusion-examples/examples/udf/table_list_udtf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! See `main.rs` for how to run it.

use std::sync::{Arc, LazyLock};

use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{
catalog::{MemTable, TableFunctionArgs, TableFunctionImpl, TableProvider},
common::Result,
execution::SessionState,
prelude::SessionContext,
};
use datafusion_common::{DataFusionError, plan_err};
use tokio::{runtime::Handle, task::block_in_place};

const FUNCTION_NAME: &str = "table_list";

// The example shows, how to create UDTF that depends on the session state.
// Defines a `table_list` UDTF that returns a list of tables within the provided session.

pub async fn table_list_udtf() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_udtf(FUNCTION_NAME, Arc::new(TableListUdtf));

// Register different kinds of tables.
ctx.sql("create view v as select 1")
.await?
.collect()
.await?;
ctx.sql("create table t(a int)").await?.collect().await?;

// Print results.
ctx.sql("select * from table_list()").await?.show().await?;

Ok(())
}

#[derive(Debug, Default)]
struct TableListUdtf;

static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
SchemaRef::new(Schema::new(vec![
Field::new("catalog", DataType::Utf8, false),
Field::new("schema", DataType::Utf8, false),
Field::new("table", DataType::Utf8, false),
Field::new("type", DataType::Utf8, false),
]))
});

impl TableFunctionImpl for TableListUdtf {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
if !args.args.is_empty() {
return plan_err!(
"{}: unexpected number of arguments: {}, expected: 0",
FUNCTION_NAME,
args.args.len()
);
}
let state = args
.session
.as_any()
.downcast_ref::<SessionState>()
.ok_or_else(|| {
DataFusionError::Internal("failed to downcast state".into())
})?;

let mut catalogs = StringBuilder::new();
let mut schemas = StringBuilder::new();
let mut tables = StringBuilder::new();
let mut types = StringBuilder::new();

let catalog_list = state.catalog_list();
for catalog_name in catalog_list.catalog_names() {
let Some(catalog) = catalog_list.catalog(&catalog_name) else {
continue;
};
for schema_name in catalog.schema_names() {
let Some(schema) = catalog.schema(&schema_name) else {
continue;
};
for table_name in schema.table_names() {
let Some(provider) = block_in_place(|| {
Handle::current().block_on(schema.table(&table_name))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion-examples/examples/udf/table_list_udtf.rs:100: Using block_in_place + Handle::current().block_on(...) will panic if table_list() is invoked without a Tokio runtime, and block_in_place also requires the multi-thread runtime. Since UDTFs can be called from various execution contexts, it may be worth documenting/guarding these runtime assumptions for the example.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:documentation; feedback: The Augment AI reviewer is correct! The example would work only in multi-threaded Tokio runtime. It would be good to document this (e.g. with a comment), so the users copying this example into their application are aware of this limitation.

})?
else {
continue;
};
catalogs.append_value(catalog_name.clone());
schemas.append_value(schema_name.clone());
tables.append_value(table_name.clone());
types.append_value(provider.table_type().to_string())
}
}
}
Comment on lines +90 to +111
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation iterates through catalogs, schemas, and tables, and for each table, it blocks to get the table provider. Using block_in_place inside a loop can be inefficient as it blocks the current thread from running other tasks.

To improve performance, you could gather all the asynchronous schema.table() calls and execute them concurrently using futures::future::try_join_all. This would perform all the lookups in parallel.

        let mut table_futures = vec![];
        for catalog_name in catalog_list.catalog_names() {
            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
                for schema_name in catalog.schema_names() {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        for table_name in schema.table_names() {
                            let schema_clone = Arc::clone(&schema);
                            let catalog_name_clone = catalog_name.clone();
                            let schema_name_clone = schema_name.clone();
                            let table_name_clone = table_name.clone();
                            table_futures.push(async move {
                                schema_clone.table(&table_name_clone).await.map(|provider| {
                                    (catalog_name_clone, schema_name_clone, table_name_clone, provider)
                                })
                            });
                        }
                    }
                }
            }
        }

        let results = block_in_place(|| {
            Handle::current().block_on(futures::future::try_join_all(table_futures))
        })?;

        for (catalog_name, schema_name, table_name, provider) in results {
            if let Some(provider) = provider {
                catalogs.append_value(catalog_name);
                schemas.append_value(schema_name);
                tables.append_value(table_name);
                types.append_value(provider.table_type().to_string())
            }
        }

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The current solution uses Tokio's block_in_place() method that works only in multi-threaded runtime. By using futures::try_join_all the code will work both in multi-threaded runtime and current-thread runtime. In addition it will be more concurrent.


let batch = RecordBatch::try_new(
Arc::clone(&SCHEMA),
vec![
Arc::new(catalogs.finish()),
Arc::new(schemas.finish()),
Arc::new(tables.finish()),
Arc::new(types.finish()),
],
)?;

Ok(Arc::new(MemTable::try_new(
batch.schema(),
vec![vec![batch]],
)?))
}
}
37 changes: 35 additions & 2 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::Arc;
use crate::session::Session;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{Constraints, Statistics, not_impl_err};
use datafusion_common::{Result, internal_err};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
Expand Down Expand Up @@ -507,10 +507,30 @@ pub trait TableProviderFactory: Debug + Sync + Send {
) -> Result<Arc<dyn TableProvider>>;
}

/// Describes arguments provided to the table function call.
pub struct TableFunctionArgs<'a> {
/// Call arguments.
pub args: &'a [Expr],
/// Session within which the function is called.
pub session: &'a dyn Session,
}
Comment on lines +510 to +516
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Future-proof TableFunctionArgs before this API ships.

This type is already constructed cross-crate from datafusion/core/src/execution/session_state.rs, so the current public-field shape freezes it immediately. If you ever need to carry more call metadata, adding a field becomes a breaking change for downstream UDTFs. A constructor plus #[non_exhaustive] would keep the new API extensible.

Suggested API hardening
+#[non_exhaustive]
 /// Describes arguments provided to the table function call.
 pub struct TableFunctionArgs<'a> {
     /// Call arguments.
     pub args: &'a [Expr],
     /// Session within which the function is called.
     pub session: &'a dyn Session,
 }
+
+impl<'a> TableFunctionArgs<'a> {
+    pub fn new(args: &'a [Expr], session: &'a dyn Session) -> Self {
+        Self { args, session }
+    }
+}

Then switch external construction sites to TableFunctionArgs::new(...).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Describes arguments provided to the table function call.
pub struct TableFunctionArgs<'a> {
/// Call arguments.
pub args: &'a [Expr],
/// Session within which the function is called.
pub session: &'a dyn Session,
}
#[non_exhaustive]
/// Describes arguments provided to the table function call.
pub struct TableFunctionArgs<'a> {
/// Call arguments.
pub args: &'a [Expr],
/// Session within which the function is called.
pub session: &'a dyn Session,
}
impl<'a> TableFunctionArgs<'a> {
pub fn new(args: &'a [Expr], session: &'a dyn Session) -> Self {
Self { args, session }
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@datafusion/catalog/src/table.rs` around lines 510 - 516, Make
TableFunctionArgs future-proof by marking the struct #[non_exhaustive], making
its fields private, and adding a public constructor and accessors: add
TableFunctionArgs::new(args: &'a [Expr], session: &'a dyn Session) -> Self and
getters like args(&self) and session(&self); update all external construction
sites (e.g., where session_state.rs builds the struct) to call
TableFunctionArgs::new(...) instead of constructing fields directly so adding
new fields later won’t be breaking.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! The new struct is fully public (it is public itself and all its fields are public too). This makes it too open for the external users and thus less extendable in the future. It would be better to hide the fields, add a constructor method and getters. This way more fields could be added later without breaking the existing users.


/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send + Any {
/// Create a table provider
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
#[deprecated(
since = "53.0.0",
note = "Implement `TableFunctionImpl::call_with_args` instead"
)]
fn call(&self, _args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion/catalog/src/table.rs:525: Because both TableFunctionImpl::call and call_with_args now have default implementations, it’s possible to impl TableFunctionImpl for X {} and compile successfully but only fail at runtime with internal_err!("unimplemented"). Is that loss of compile-time enforcement intentional for this API transition?

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:valid-but-wont-fix; category:bug; feedback: The Augment AI reviewer is not correct! The default implementation for call_with_args is provided, so that the existing users do not need to fix their builds when they upgrade to the next version of DataFusion. Only the new users could break with an internal error but they will quickly realize this and fix their implementation.

internal_err!("unimplemented")
}

/// Create a table provider
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
#[expect(deprecated)]
self.call(args.args)
}
}

/// A table that uses a function to generate data
Expand Down Expand Up @@ -539,7 +559,20 @@ impl TableFunction {
}

/// Get the function implementation and generate a table
#[deprecated(
since = "53.0.0",
note = "Use `TableFunction::create_table_provider_with_args` instead"
)]
pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
#[expect(deprecated)]
self.fun.call(args)
}

/// Get the function implementation and generate a table
pub fn create_table_provider_with_args(
&self,
args: TableFunctionArgs,
) -> Result<Arc<dyn TableProvider>> {
self.fun.call_with_args(args)
}
}
7 changes: 6 additions & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1830,6 +1830,8 @@ impl ContextProvider for SessionContextProvider<'_> {
name: &str,
args: Vec<Expr>,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
use datafusion_catalog::TableFunctionArgs;

let tbl_func = self
.state
.table_functions
Expand All @@ -1852,7 +1854,10 @@ impl ContextProvider for SessionContextProvider<'_> {
.and_then(|e| simplifier.simplify(e))
})
.collect::<datafusion_common::Result<Vec<_>>>()?;
let provider = tbl_func.create_table_provider(&args)?;
let provider = tbl_func.create_table_provider_with_args(TableFunctionArgs {
args: &args,
session: self.state,
})?;

Ok(provider_as_source(provider))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, collect};
use datafusion::prelude::SessionContext;
use datafusion_catalog::Session;
use datafusion_catalog::TableFunctionImpl;
use datafusion_catalog::{Session, TableFunctionArgs};
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};

Expand Down Expand Up @@ -200,7 +200,8 @@ impl SimpleCsvTable {
struct SimpleCsvTableFunc {}

impl TableFunctionImpl for SimpleCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let exprs = args.args;
let mut new_exprs = vec![];
let mut filepath = String::new();
for expr in exprs {
Expand Down Expand Up @@ -231,7 +232,7 @@ async fn test_udtf_type_coercion() -> Result<()> {
struct NoOpTableFunc;

impl TableFunctionImpl for NoOpTableFunc {
fn call(&self, _: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call_with_args(&self, _: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
let schema = Arc::new(arrow::datatypes::Schema::empty());
Ok(Arc::new(MemTable::try_new(schema, vec![vec![]])?))
}
Expand Down
Loading
Loading