20555: feat: support spark compatible int to timestamp cast #263

Open
martin-augment wants to merge 4 commits into main from pr-20555-2026-03-05-14-20-50

Conversation

@martin-augment
Owner

20555: To review by AI

@coderabbitai

coderabbitai bot commented Mar 5, 2026

Walkthrough

This change introduces a new SparkCast function implementing Spark-compatible CAST semantics for converting integer types to timestamps with microsecond precision. The implementation covers casting from Int8, Int16, Int32, and Int64 to TimestampMicrosecond with saturating overflow behavior. It handles both array and scalar inputs with optional timezone awareness, defaulting to UTC. The function is integrated into the conversion module as a public UDF and includes unit tests validating array casts, overflow behavior, null handling, timezone-specific behavior, and error paths for unsupported types. SQL logic tests provide additional validation coverage.



@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.



Return type mismatch: declared without timezone, produced with timezone

High Severity

return_field_from_args declares the return type as Timestamp(Microsecond, None) (no timezone) via parse_target_type, but invoke_with_args produces data with Timestamp(Microsecond, Some(session_tz)) by passing the session timezone to cast_to_timestamp. This schema mismatch between the planned return type and the actual output can cause downstream type validation failures and incorrect behavior. The reference implementation from_unixtime shows the correct pattern: declared and actual return types must match on the timezone component.


Owner Author


value:useful; category:bug; feedback: The Bugbot AI reviewer is correct! The UDF's return type says that the timezone is always NULL/None, but actually it is either the one specified in the session or UTC by default. The mismatch may lead to problems in any other UDF that consumes the result of the spark_cast() function.

@claude

claude bot commented Mar 5, 2026

Code Review: Spark-compatible int to timestamp cast

Overall this is a solid implementation with good test coverage. A few issues worth addressing before merging.

Bug: Return type mismatch between planning and execution

In return_field_from_args, the return field is declared as Timestamp(Microsecond, None) (no timezone). But invoke_with_args always attaches a timezone (defaulting to UTC). Arrow/DataFusion treat Timestamp(Microsecond, None) and Timestamp(Microsecond, Some(tz)) as semantically different types. The query planner sees timezone-naive output but the executor produces timezone-aware output, which can cause downstream type mismatches. Either encode the timezone into the return field during planning (see how from_utc_timestamp handles this in return_field_from_args), or produce a consistent timezone-naive result from both planning and execution.
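
To make the mismatch concrete, here is a minimal standalone check (plain arrow-rs, not code from the PR) showing that the two timestamp types compare as unequal:

    use arrow::datatypes::{DataType, TimeUnit};

    fn main() {
        let declared = DataType::Timestamp(TimeUnit::Microsecond, None);
        let produced = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
        // Arrow treats these as distinct types, so a plan that declares the
        // first but produces the second fails schema validation at execution.
        assert_ne!(declared, produced);
    }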

Signature uses TypeSignature::Any(2) -- inconsistent with codebase conventions

This is the only function in the spark crate using TypeSignature::Any. Every other function (e.g., from_utc_timestamp, trunc, date_trunc) uses Signature::coercible with Coercion-based type constraints. TypeSignature::Any(2) bypasses the query planner's type coercion system entirely, so invalid inputs like spark_cast(1.5::float, 'timestamp') pass planning and only fail at execution with a less-helpful error. Consider using explicit coercions per supported integer type, consistent with how other functions declare their signatures in this codebase.
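
For illustration, a rough sketch of a coercion-based signature, assuming the Coercion / TypeSignatureClass API available in recent DataFusion releases (the helper name and exact calls are assumptions, not the PR's code):

    use datafusion_common::types::{logical_int64, logical_string};
    use datafusion_expr::{Coercion, Signature, TypeSignatureClass, Volatility};

    // Sketch: coerce the value argument to a 64-bit integer and require a
    // string for the target-type argument, instead of TypeSignature::Any(2).
    fn coercible_signature() -> Signature {
        Signature::coercible(
            vec![
                Coercion::new_exact(TypeSignatureClass::Native(logical_int64())),
                Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
            ],
            Volatility::Immutable,
        )
    }

A side benefit of coercing to one integer width is that the cast kernel then only has to handle Int64 input.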

Nullable logic should only consider the value argument

args.arg_fields.iter().any(|f| f.is_nullable()) includes the second argument (the string literal). Output nullability should reflect only the first argument: args.arg_fields[0].is_nullable().

Performance: prefer array iterator over index loop

cast_int_to_timestamp uses an index loop with is_null(i) checks. A more idiomatic approach: builder.extend(arr.iter().map(|opt| opt.map(|v| secs_to_micros(v.into())))). This is cleaner and lets Arrow optimize null propagation via the bitmap.

Minor: mod cast should be pub mod cast

The datetime and other modules use pub mod for all submodules. Keeping this consistent makes the type accessible externally if needed.

What works well

  • Overflow behavior (saturating multiplication) is correctly implemented and documented (see the sketch after this list)
  • Test coverage is thorough: unit tests for all int types, nulls, overflow, and timezone propagation
  • The sqllogictest file covers the main SQL-level behavior
  • The #[inline] secs_to_micros helper is clean
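
A minimal sketch of what the described saturating helper plausibly looks like (reconstructed from the review comments, not copied from the PR):

    /// Seconds since the Unix epoch to microseconds, clamping to
    /// i64::MIN / i64::MAX on overflow instead of wrapping (Spark semantics).
    #[inline]
    fn secs_to_micros(secs: i64) -> i64 {
        secs.saturating_mul(1_000_000)
    }

    fn main() {
        assert_eq!(secs_to_micros(1_704_067_200), 1_704_067_200_000_000);
        assert_eq!(secs_to_micros(i64::MAX), i64::MAX); // saturates, no wrap
    }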

@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the DataFusion Spark compatibility module by introducing a new spark_cast function. This function enables the conversion of integer types to timestamps, aligning with Spark's semantics. It interprets integer values as seconds from the Unix epoch, produces microsecond-precision timestamps, and includes robust overflow handling and timezone awareness to ensure accurate and consistent data transformations.

Highlights

  • New spark_cast Function: Introduced a new scalar user-defined function (UDF) named spark_cast to provide Spark-compatible type conversions within DataFusion.
  • Integer to Timestamp Conversion: Implemented support for casting various integer types (Int8, Int16, Int32, Int64) to Timestamp with microsecond precision. Integer inputs are interpreted as seconds since the Unix epoch.
  • Overflow Handling: Incorporated saturating multiplication to handle potential overflow during conversion, clamping values to i64::MAX or i64::MIN as per Spark's specification.
  • Timezone Support: The spark_cast function now respects session timezones, falling back to UTC if no specific timezone is set.
  • Comprehensive Testing: Added extensive unit tests and SQL logic tests to validate the spark_cast function's behavior across different integer types, null values, boundary conditions, and timezone scenarios.
Changelog
  • datafusion/spark/src/function/conversion/cast.rs
    • Added a new file containing the SparkCast struct, which implements the ScalarUDFImpl trait for Spark-compatible integer to timestamp casting.
    • Implemented helper functions secs_to_micros for saturating multiplication and cast_int_to_timestamp for array processing.
    • Included comprehensive unit tests for scalar and array inputs, covering various integer types, nulls, boundary values, overflow, and timezone handling.
  • datafusion/spark/src/function/conversion/mod.rs
    • Declared the new cast module.
    • Registered the spark_cast function using the make_udf_function! macro.
    • Exported spark_cast in the expr_fn module and added it to the list of available functions.
  • datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
    • Added new SQL logic tests to verify the functionality of spark_cast for integer to timestamp conversions.
    • Included tests for different integer sizes (Int8, Int16, Int32, Int64), null handling, boundary values, negative timestamps, and error cases for unsupported source and target types.
Activity
  • The pull request was marked for review by AI.

@augmentcode

augmentcode bot commented Mar 5, 2026

🤖 Augment PR Summary

Summary: Adds a Spark-compatible spark_cast scalar UDF to support CAST-style conversions using a string target type.

Changes:

  • Introduced SparkCast UDF (datafusion/spark/src/function/conversion/cast.rs) with a 2-arg signature: value + target type string.
  • Implemented the first conversion: Int8/Int16/Int32/Int64 (arrays and scalars) → TIMESTAMP at microsecond precision, interpreting integers as seconds since Unix epoch.
  • Added saturating overflow behavior when scaling seconds to microseconds.
  • Hooked the new function into the Spark conversion module, exported it via expr_fn, and registered it in functions().
  • Added Rust unit tests for nulls, negatives, overflow saturation, and timezone propagation.
  • Added SQLLogicTest coverage for core conversions plus expected error cases for unsupported source/target types.

Technical Notes: The implementation currently targets Spark’s integer→timestamp semantics and is structured to allow adding more cast targets over time.
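
For illustration, the extensible target-type dispatch might look roughly like this (parse_target_type is named elsewhere in this thread; the body below is an assumption):

    use arrow::datatypes::{DataType, TimeUnit};

    // Hypothetical sketch: supporting a new cast target means adding an arm.
    fn parse_target_type(name: &str) -> Result<DataType, String> {
        match name.to_ascii_lowercase().as_str() {
            "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
            other => Err(format!("Unsupported spark_cast target type: {other}")),
        }
    }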



@augmentcode augmentcode bot left a comment


Review completed. 4 suggestions posted.


Ok(Arc::new(builder.finish().with_timezone_opt(timezone)))

cast_int_to_timestamp sets timezone metadata on the produced array (with_timezone_opt(timezone)), but spark_cast currently plans a return type of Timestamp(Microsecond, None) (see parse_target_type / return_field_from_args). If the execution result’s DataType doesn’t match the planned field type, this can cause schema/type validation failures when building batches or applying further casts.

Severity: high

Other Locations
  • datafusion/spark/src/function/conversion/cast.rs:196
  • datafusion/spark/src/function/conversion/cast.rs:227


Owner Author


value:useful; category:bug; feedback: The Augment AI reviewer is correct! The UDF's return type says that the timezone is always NULL/None, but actually it is either the one specified in the session or UTC by default. The mismatch may lead to problems in any other UDF that consumes the result of the spark_cast() function.

    .unwrap_or_else(|| Arc::from("UTC"));

match target_type {
    DataType::Timestamp(TimeUnit::Microsecond, None) => {

invoke_with_args only dispatches on DataType::Timestamp(TimeUnit::Microsecond, None); if planning/coercion produces a timestamp with a timezone (e.g. Some("+00")/Some("UTC")), this will error even though it’s still a microsecond timestamp target. This strict match also makes future support for timezone-aware timestamp return types harder.

Severity: medium
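
One possible relaxation, sketched under the assumption that the timezone metadata can be carried separately from the dispatch:

    use arrow::datatypes::{DataType, TimeUnit};

    // Sketch: dispatch on the time unit only, so timezone-naive and
    // timezone-aware microsecond timestamp targets take the same cast path.
    fn is_micros_timestamp_target(dt: &DataType) -> bool {
        matches!(dt, DataType::Timestamp(TimeUnit::Microsecond, _))
    }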


Owner Author


value:useful; category:bug; feedback: The Augment AI reviewer is correct! The UDF's return type says that the timezone is always NULL/None, but actually it is either the one specified in the session or UTC by default. The mismatch may lead to problems in any other UDF that consumes the result of the spark_cast() function.

// Second arg: target datatype as Utf8 string literal (ex : 'timestamp')
signature: Signature::one_of(
    vec![TypeSignature::Any(2)],
    Volatility::Immutable,

The UDF is declared Volatility::Immutable, but invoke_with_args reads args.config_options.execution.time_zone and uses it to influence the returned timezone metadata. That makes results session-dependent, which can affect optimizations that assume immutability (e.g. constant folding/caching).

Severity: medium
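
A one-line sketch of the suggested volatility change, keeping the signature shape quoted above:

    use datafusion_expr::{Signature, TypeSignature, Volatility};

    // Sketch: Stable signals that the result may vary with session state
    // (here, the configured time zone) but is constant within one query.
    fn stable_signature() -> Signature {
        Signature::one_of(vec![TypeSignature::Any(2)], Volatility::Stable)
    }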


Owner Author


value:usable; category:bug; feedback: The Augment AI reviewer is correct! Since the function result depends on external information (the session's timezone) it should not be declared Immutable; DataFusion's Stable volatility fits better. Changing the timezone between two calls of the UDF will lead to different results.

) -> DataFusionResult<DataType> {
    let [_, type_arg] = take_function_args("spark_cast", scalar_args)?;

    match type_arg {

get_target_type_from_scalar_args accepts ScalarValue::Utf8 / LargeUtf8 but not Utf8View, while other Spark functions accept Utf8View for string literals. If the planner/literal ends up as Utf8View, spark_cast would reject a valid string target type.

Severity: low
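
A hedged sketch of the broader literal match (Utf8View is a ScalarValue variant in recent DataFusion releases; the helper name is made up for illustration):

    use datafusion_common::ScalarValue;

    // Sketch: accept all three Arrow string encodings for the target type.
    fn target_type_name(scalar: &ScalarValue) -> Option<&str> {
        match scalar {
            ScalarValue::Utf8(Some(s))
            | ScalarValue::LargeUtf8(Some(s))
            | ScalarValue::Utf8View(Some(s)) => Some(s.as_str()),
            _ => None,
        }
    }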


Owner Author


value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! All three string types could be supported, so there is no reason to exclude Utf8View. Supporting it will make the spark_cast UDF more usable.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
datafusion/spark/src/function/conversion/cast.rs (1)

505-526: Add one positive return_field_from_args test case.

You already test the error path; adding a success assertion for "timestamp" would lock down planner-facing return-field behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@datafusion/spark/src/function/conversion/cast.rs` around lines 505 - 526, Add
a positive unit test that exercises SparkCast::return_field_from_args with a
supported target type "timestamp": instantiate SparkCast via SparkCast::new(),
build ReturnFieldArgs with an integer arg FieldRef (as in the existing test) and
a scalar_arguments entry Some(&ScalarValue::Timestamp(...)) for the target, call
cast.return_field_from_args(return_field_args), assert the Result is Ok, and
verify the returned Field/FieldRef has the Timestamp DataType (and nullability)
you expect; reference SparkCast::new, return_field_from_args, ReturnFieldArgs,
and ScalarValue::Timestamp when locating code to modify.
datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt (1)

56-66: Consider adding SQL-level overflow/timezone parity cases.

Rust tests cover saturation and timezone behavior, but this sqllogictest file currently validates only default-path SQL execution. Adding one bigint overflow case and one non-UTC session-timezone case would harden end-to-end coverage.

Also applies to: 116-128

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt`
around lines 56 - 66, Add two end-to-end SQL checks to the
cast_int_to_timestamp.slt tests: one overflow/saturation case that calls
spark_cast with a very large bigint (e.g., a 64-bit max/min sentinel) and
asserts the expected saturated timestamp value used in the Rust tests, and one
session-timezone parity case that sets the SQL session time zone (e.g., SET TIME
ZONE 'America/Los_Angeles';) then runs SELECT spark_cast(1704067200::bigint,
'timestamp'); and asserts the timezone-shifted timestamp string — reference the
spark_cast calls in this file to locate where to insert these additional queries
so they mirror the Rust coverage for saturation and timezone behavior.


📥 Commits

Reviewing files that changed from the base of the PR and between d784d60 and 91f202a.

📒 Files selected for processing (3)
  • datafusion/spark/src/function/conversion/cast.rs
  • datafusion/spark/src/function/conversion/mod.rs
  • datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt

Comment on lines +155 to +161
fn return_field_from_args(
    &self,
    args: ReturnFieldArgs,
) -> DataFusionResult<FieldRef> {
    let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
    let target_type = get_target_type_from_scalar_args(args.scalar_arguments)?;
    Ok(Arc::new(Field::new(self.name(), target_type, nullable)))


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "1) Inspect SparkCast return-field vs runtime produced timestamp types"
rg -n --type rust -C3 \
  'return_field_from_args|DataType::Timestamp\(TimeUnit::Microsecond, None\)|with_timezone_opt\(|ScalarValue::TimestampMicrosecond\(' \
  datafusion/spark/src/function/conversion/cast.rs

echo
echo "2) Inspect ScalarUDFImpl contract and scalar function execution plumbing"
rg -n --type rust -C4 \
  'trait ScalarUDFImpl|return_field_from_args|invoke_with_args|ScalarFunctionArgs|return_field' \
  datafusion | head -200

echo
echo "3) Inspect batch/schema construction checks for datatype equality"
rg -n --type rust -C3 \
  'RecordBatch::try_new|schema' \
  datafusion/core/src/physical_plan/scalar_expr.rs | head -150

Repository: martin-augment/datafusion

Length of output: 1889


🌐 Web query:

DataFusion ScalarUDFImpl return_field_from_args contract type matching

💡 Result:

In DataFusion’s ScalarUDFImpl::return_field_from_args(ReturnFieldArgs), the type contract you can rely on is args.arg_fields, not args.scalar_arguments.

  • ReturnFieldArgs contains:

    • arg_fields: &[FieldRef] — the (planned) argument fields/types
    • scalar_arguments: &[Option<&ScalarValue>] — any constant literal arguments (if an argument is not a literal, it’s None) [1]
  • scalar_arguments[i] is not guaranteed to have the same DataType as arg_fields[i]. This is a known/reported behavior: with coercible signatures, arg_fields[i] can reflect the coerced type (e.g., Int16) while scalar_arguments[i] still carries the original literal type (e.g., Int64) [2].

  • If you need to branch on a literal’s value inside return_field_from_args (e.g., arrow_cast(x, 'Int16') -> Int16), use scalar_arguments for the value, but treat arg_fields as the authoritative types and be prepared to manually cast/interpret the ScalarValue to arg_fields[i].data_type() yourself [1][2].

Also note the documented determinism requirement: return_field_from_args must return the same output type for the same logical input even if expressions are simplified/rewritten during optimization [3].

Sources:
[1] DataFusion ReturnFieldArgs docs (fields and meaning) https://docs.rs/datafusion-expr/latest/datafusion_expr/struct.ReturnFieldArgs.html
[2] DataFusion issue report showing mismatch between arg_fields and scalar_arguments types https://www.mail-archive.com/github%40datafusion.apache.org/msg98861.html
[3] DataFusion ScalarUDFImpl::return_field_from_args requirements/docs https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.ScalarUDFImpl.html


🌐 Web query:

DataFusion timestamp timezone Arrow ScalarValue TimestampMicrosecond

💡 Result:

In Arrow (and therefore DataFusion), a Timestamp(Microsecond, tz) is a 64-bit integer “microseconds since Unix epoch”, plus optional timezone metadata:

  • tz = Some("UTC") (or any non-empty tz) means the values represent an absolute instant relative to the Unix epoch and can be compared/ordered meaningfully across systems. Changing from one non-empty timezone string to another is metadata-only (no value change). [1]
  • tz = None / empty timezone means the values are “wall clock” timestamps in an unknown timezone (not equivalent to UTC), and Arrow treats this as different semantics from timestamp-with-timezone. [1]

What this means for ScalarValue::TimestampMicrosecond

In DataFusion’s Rust API, ScalarValue::TimestampMicrosecond(i64, Option<String>) carries that same timezone metadata. If you mix Some("UTC") and None you can hit coercion errors (DataFusion/Arrow can’t find a common type), e.g. comparisons like Timestamp(..., Some("UTC")) >= Timestamp(..., None) failing. [2]

Practical fixes

  • Make both sides the same Arrow datatype, including timezone (either both None, or both Some("UTC") / same tz). This often means explicitly casting or ensuring your schema uses the intended tz.
  • If you’re creating timestamps from strings in SQL, be aware DataFusion’s to_timestamp* functions use the session time zone (and to_timestamp returns Timestamp(..., TimeZone) tied to that session setting). You can set it via SET datafusion.execution.time_zone = '...'; (or SET TIMEZONE = '...'; depending on context). [3]

Sources: Arrow timestamp/timezone semantics [1], DataFusion coercion failure example with tz vs no-tz [2], DataFusion session time zone + to_timestamp behavior [3].


🌐 Web query:

DataFusion UDF return type schema validation runtime check

💡 Result:

In Apache Arrow DataFusion, a UDF’s declared return type/field is part of the logical plan schema, and DataFusion enforces schema consistency in two places:

  • Planning-time (schema validation): DataFusion’s invariants require that the physical plan schema produced by planning must exactly match the logical plan’s derived physical schema, and planners must return an error if a physical function’s derived schema doesn’t match the logical function’s derived schema. This applies even for plans as simple as Projection f(c1, c2). [1]

  • Execution-time (runtime expectation): DataFusion also requires that every produced RecordBatch schema equals the physical plan schema. If your UDF implementation actually returns arrays/batches whose Arrow DataType/schema doesn’t match what was planned, execution will fail (typically when operators rely on the planned schema for type-specific behavior/downcasting). [1]

For Scalar UDFs, the planned return schema comes from your implementation via:

  • ScalarUDFImpl::return_type (type-only), or
  • ScalarUDFImpl::return_field_from_args (full Field, including metadata/nullability).
    If you implement return_field_from_args, DataFusion expects return_type to error (to avoid inconsistent definitions). [2]

There is a config to skip a specific schema check, but it’s only for Aggregate input schema verification (datafusion.execution.skip_physical_aggregate_schema_check), not a general “skip UDF return type validation” switch. [3]

Sources:
[1] DataFusion invariants (planning-time schema equivalence + batch schema requirement) https://datafusion.apache.org/contributor-guide/specification/invariants.html
[2] ScalarUDF / ScalarUDFImpl return type vs return field API notes https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html
[3] DataFusion config: skip_physical_aggregate_schema_check https://datafusion.apache.org/user-guide/configs.html


Fix timestamp type mismatch: return-field timezone must match runtime-produced values.

Line 161 declares Timestamp(Microsecond, None), but lines 180, 227 produce Timestamp(Microsecond, Some(session_tz)) at runtime. DataFusion enforces exact schema parity between declared return type and produced RecordBatch; a timezone mismatch violates this invariant and will fail at execution time. The return field must either declare the session timezone consistently or be updated to match the actual output.

Applies to: lines 155–161, 169–180, 194–197, 227–229.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@datafusion/spark/src/function/conversion/cast.rs` around lines 155 - 161, The
return_field_from_args currently builds a Field whose Timestamp type has no
timezone but runtime RecordBatches use Timestamp(Microsecond, Some(session_tz)),
causing schema mismatch; update return_field_from_args (and any helper that
constructs target types such as get_target_type_from_scalar_args) to include the
session timezone when constructing DataType::Timestamp(Microsecond, ...),
ensuring the timezone parameter is set to Some(session_tz) (or otherwise mirror
whatever session_tz the execution path uses) so the declared Field matches the
runtime-produced Timestamp types across the affected sites
(return_field_from_args and the logic around lines that produce runtime
timestamps).

Owner Author


value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The UDF's return type says that the timezone is always NULL/None, but actually it is either the one specified in the session or UTC by default. The mismatch may lead to problems in any other UDF that consumes the result of the spark_cast() function.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a new spark_cast user-defined function to provide Spark-compatible casting, initially supporting integer types to timestamps. The implementation is well-structured with comprehensive documentation and extensive unit and integration tests. My review focuses on improving code clarity and idiomatic Rust usage in a few areas. I've suggested refactoring to use iterators for array processing, simplifying timezone handling, and making scalar value matching more concise. These changes should enhance maintainability without altering functionality.

Comment on lines +122 to +135
let arr = array.as_primitive::<T>();
let mut builder = TimestampMicrosecondBuilder::with_capacity(arr.len());

for i in 0..arr.len() {
    if arr.is_null(i) {
        builder.append_null();
    } else {
        // spark saturates to i64 min/max
        let micros = secs_to_micros(arr.value(i).into());
        builder.append_value(micros);
    }
}

Ok(Arc::new(builder.finish().with_timezone_opt(timezone)))


medium

The manual iteration and building of the TimestampMicrosecondArray can be expressed more idiomatically and concisely using iterators and collect. This improves readability and reduces boilerplate code.

    let arr = array.as_primitive::<T>();
    let result: arrow::array::TimestampMicrosecondArray = arr
        .iter()
        .map(|v| v.map(|v| secs_to_micros(v.into())))
        .collect();

    Ok(Arc::new(result.with_timezone_opt(timezone)))

Owner Author


value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The proposed solution is more idiomatic Rust and is easier to read.

Comment on lines +170 to +176
let session_tz: Arc<str> = args
    .config_options
    .execution
    .time_zone
    .clone()
    .map(|s| Arc::from(s.as_str()))
    .unwrap_or_else(|| Arc::from("UTC"));


medium

This logic for getting the session timezone can be simplified using as_deref and into, which makes it more concise and avoids an unnecessary clone.

        let session_tz: Arc<str> = args
            .config_options
            .execution
            .time_zone
            .as_deref()
            .unwrap_or("UTC")
            .into();

Owner Author


value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! This would avoid cloning the String (a memory allocation).

Comment on lines +213 to +226
let micros = match scalar {
    ScalarValue::Null
    | ScalarValue::Int8(None)
    | ScalarValue::Int16(None)
    | ScalarValue::Int32(None)
    | ScalarValue::Int64(None) => None,
    ScalarValue::Int8(Some(v)) => Some(secs_to_micros((*v).into())),
    ScalarValue::Int16(Some(v)) => Some(secs_to_micros((*v).into())),
    ScalarValue::Int32(Some(v)) => Some(secs_to_micros((*v).into())),
    ScalarValue::Int64(Some(v)) => Some(secs_to_micros(*v)),
    other => {
        return exec_err!("Unsupported cast from {:?} to timestamp", other);
    }
};


medium

The match statement for handling scalar integer values can be simplified by matching on Option<T> directly and using map to apply the conversion. This reduces repetition and improves readability.

            let micros = match scalar {
                ScalarValue::Int8(v) => v.map(|v| secs_to_micros(v.into())),
                ScalarValue::Int16(v) => v.map(|v| secs_to_micros(v.into())),
                ScalarValue::Int32(v) => v.map(|v| secs_to_micros(v.into())),
                ScalarValue::Int64(v) => v.map(secs_to_micros),
                ScalarValue::Null => None,
                other => {
                    return exec_err!("Unsupported cast from {:?} to timestamp", other);
                }
            };

Owner Author


value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The code could be simplified by using Option::map(). This is just more idiomatic style, not a functional improvement.

@martin-augment
Owner Author

Bug: Return type mismatch between planning and execution

In return_field_from_args, the return field is declared as Timestamp(Microsecond, None) (no timezone). But invoke_with_args always attaches a timezone (defaulting to UTC). Arrow/DataFusion treat Timestamp(Microsecond, None) and Timestamp(Microsecond, Some(tz)) as semantically different types. The query planner sees timezone-naive output but the executor produces timezone-aware output, which can cause downstream type mismatches. Either encode the timezone into the return field during planning (see how from_utc_timestamp handles this in return_field_from_args), or produce a consistent timezone-naive result from both planning and execution.

value:useful; category:bug; feedback: The Claude AI reviewer is correct! The UDF's return type says that the timezone is always NULL/None, but actually it is either the one specified in the session or UTC by default. The mismatch may lead to problems in any other UDF that consumes the result of the spark_cast() function.

@martin-augment
Owner Author

Signature uses TypeSignature::Any(2) -- inconsistent with codebase conventions

This is the only function in the spark crate using TypeSignature::Any. Every other function (e.g., from_utc_timestamp, trunc, date_trunc) uses Signature::coercible with Coercion-based type constraints. TypeSignature::Any(2) bypasses the query planner's type coercion system entirely, so invalid inputs like spark_cast(1.5::float, 'timestamp') pass planning and only fail at execution with a less-helpful error. Consider using explicit coercions per supported integer type, consistent with how other functions declare their signatures in this codebase.

value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct! Using TypeSignature::Any(2) allows arguments of any type to be passed to the function, which forces argument checks in invoke_with_args() and return_type_with_args(). It would be better to use more concrete/strict signature types, e.g. the first argument should be a logical integer and the second one a logical string. That way there is no need to check the arguments in every method that accepts them.

@martin-augment
Owner Author

Nullable logic should only consider the value argument

args.arg_fields.iter().any(|f| f.is_nullable()) includes the second argument (the string literal). Output nullability should reflect only the first argument: args.arg_fields[0].is_nullable().

value:annoying; category:bug; feedback: The Claude AI reviewer is not correct! If the second argument is NULL then there is no information about the target type and no casting can be performed.
