-
Notifications
You must be signed in to change notification settings - Fork 0
18615: feat: Make extract SQL expression timezone aware. #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,16 +19,23 @@ use std::any::Any; | |
| use std::str::FromStr; | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow::array::{Array, ArrayRef, Float64Array, Int32Array}; | ||
| use arrow::array::timezone::Tz; | ||
| use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, PrimitiveBuilder}; | ||
| use arrow::compute::kernels::cast_utils::IntervalUnit; | ||
| use arrow::compute::{binary, date_part, DatePart}; | ||
| use arrow::datatypes::DataType::{ | ||
| Date32, Date64, Duration, Interval, Time32, Time64, Timestamp, | ||
| }; | ||
| use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; | ||
| use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; | ||
| use arrow::datatypes::{ | ||
| ArrowTimestampType, DataType, Field, FieldRef, TimeUnit, TimestampMicrosecondType, | ||
| TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, | ||
| }; | ||
|
|
||
| use datafusion_common::cast::as_primitive_array; | ||
| use datafusion_common::types::{logical_date, NativeType}; | ||
|
|
||
| use super::adjust_to_local_time; | ||
| use datafusion_common::{ | ||
| cast::{ | ||
| as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array, | ||
|
|
@@ -56,7 +63,7 @@ use datafusion_macros::user_doc; | |
| argument( | ||
| name = "part", | ||
| description = r#"Part of the date to return. The following date parts are supported: | ||
|
|
||
| - year | ||
| - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in) | ||
| - month | ||
|
|
@@ -124,7 +131,7 @@ impl DatePartFunc { | |
| ], | ||
| Volatility::Immutable, | ||
| ), | ||
| aliases: vec![String::from("datepart")], | ||
| aliases: vec![String::from("datepart"), String::from("extract")], | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -173,6 +180,7 @@ impl ScalarUDFImpl for DatePartFunc { | |
| &self, | ||
| args: datafusion_expr::ScalarFunctionArgs, | ||
| ) -> Result<ColumnarValue> { | ||
| let config = &args.config_options; | ||
| let args = args.args; | ||
| let [part, array] = take_function_args(self.name(), args)?; | ||
|
|
||
|
|
@@ -193,12 +201,83 @@ impl ScalarUDFImpl for DatePartFunc { | |
| ColumnarValue::Scalar(scalar) => scalar.to_array()?, | ||
| }; | ||
|
|
||
| let (is_timezone_aware, tz_str_opt) = match array.data_type() { | ||
| Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))), | ||
| _ => (false, None), | ||
| }; | ||
|
|
||
| let part_trim = part_normalization(&part); | ||
| let is_epoch = is_epoch(&part); | ||
|
|
||
| // Epoch is timezone-independent - it always returns seconds since 1970-01-01 UTC | ||
| let array = if is_epoch { | ||
| array | ||
| } else if is_timezone_aware { | ||
| // For timezone-aware timestamps, extract in their own timezone | ||
| match tz_str_opt.as_ref() { | ||
| Some(tz_str) => { | ||
| let tz = match tz_str.parse::<Tz>() { | ||
| Ok(tz) => tz, | ||
| Err(_) => return exec_err!("Invalid timezone"), | ||
| }; | ||
| match array.data_type() { | ||
| Timestamp(time_unit, _) => match time_unit { | ||
| Nanosecond => adjust_timestamp_array::< | ||
| TimestampNanosecondType, | ||
| >(&array, tz)?, | ||
| Microsecond => adjust_timestamp_array::< | ||
| TimestampMicrosecondType, | ||
| >(&array, tz)?, | ||
| Millisecond => adjust_timestamp_array::< | ||
| TimestampMillisecondType, | ||
| >(&array, tz)?, | ||
| Second => { | ||
| adjust_timestamp_array::<TimestampSecondType>(&array, tz)? | ||
| } | ||
| }, | ||
| _ => array, | ||
| } | ||
| } | ||
| None => array, | ||
| } | ||
| } else if let Timestamp(time_unit, None) = array.data_type() { | ||
| // For naive timestamps, interpret in session timezone if available | ||
| match config.execution.time_zone.as_ref() { | ||
| Some(tz_str) => { | ||
| let tz = match tz_str.parse::<Tz>() { | ||
| Ok(tz) => tz, | ||
| Err(_) => return exec_err!("Invalid timezone"), | ||
| }; | ||
|
|
||
| match time_unit { | ||
| Nanosecond => { | ||
| adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)? | ||
| } | ||
| Microsecond => { | ||
| adjust_timestamp_array::<TimestampMicrosecondType>( | ||
| &array, tz, | ||
| )? | ||
| } | ||
| Millisecond => { | ||
| adjust_timestamp_array::<TimestampMillisecondType>( | ||
| &array, tz, | ||
| )? | ||
| } | ||
| Second => { | ||
| adjust_timestamp_array::<TimestampSecondType>(&array, tz)? | ||
| } | ||
| } | ||
| } | ||
| None => array, | ||
| } | ||
| } else { | ||
| array | ||
| }; | ||
|
|
||
| // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds") | ||
| // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow | ||
| let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) { | ||
| match interval_unit { | ||
| let extracted = match interval_unit { | ||
| IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?, | ||
| IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?, | ||
| IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?, | ||
|
|
@@ -209,8 +288,42 @@ impl ScalarUDFImpl for DatePartFunc { | |
| IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?, | ||
| IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?, | ||
| IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?, | ||
| // century and decade are not supported by `DatePart`, although they are supported in postgres | ||
| _ => return exec_err!("Date part '{part}' not supported"), | ||
| }; | ||
|
|
||
| // For fixed offsets (like +04:00, -05:30), apply the offset to extract values | ||
| if is_timezone_aware { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For timezone-aware timestamps, the array is already adjusted to local time via `adjust_timestamp_array`, so applying the fixed offset again at this point shifts the extracted values a second time. 🤖 Was this useful? React with 👍 or 👎 |
||
| if let Some(tz_str) = tz_str_opt.as_ref() { | ||
| let tz_str = tz_str.as_ref(); | ||
| if is_fixed_offset(tz_str) { | ||
| if let Some(offset_info) = extract_offset_components(tz_str) { | ||
| match interval_unit { | ||
| IntervalUnit::Hour => apply_hour_offset( | ||
| extracted.as_ref(), | ||
| offset_info.hours, | ||
| offset_info.minutes, | ||
| )?, | ||
| IntervalUnit::Minute => apply_minute_offset( | ||
| extracted.as_ref(), | ||
| offset_info.minutes, | ||
| )?, | ||
| IntervalUnit::Day => apply_day_offset( | ||
| extracted.as_ref(), | ||
| offset_info.hours, | ||
| )?, | ||
| _ => extracted, | ||
| } | ||
| } else { | ||
| extracted | ||
| } | ||
| } else { | ||
| extracted | ||
| } | ||
| } else { | ||
| extracted | ||
| } | ||
| } else { | ||
| extracted | ||
|
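There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: Double Timezone Offset Corrupts Data. The timezone offset is being applied twice for fixed offset timezones. The timestamp is first adjusted to local time via `adjust_timestamp_array`, and then the same offset is applied again to the extracted values by `apply_hour_offset`, `apply_minute_offset`, or `apply_day_offset`. |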
||
| } | ||
| } else { | ||
| // special cases that can be extracted (in postgres) but are not interval units | ||
|
|
@@ -240,23 +353,129 @@ impl ScalarUDFImpl for DatePartFunc { | |
| } | ||
| } | ||
|
|
||
| fn adjust_timestamp_array<T: ArrowTimestampType>( | ||
| array: &ArrayRef, | ||
| tz: Tz, | ||
| ) -> Result<ArrayRef> { | ||
| let mut builder = PrimitiveBuilder::<T>::new(); | ||
| let primitive_array = as_primitive_array::<T>(array)?; | ||
| for ts_opt in primitive_array.iter() { | ||
| match ts_opt { | ||
| None => builder.append_null(), | ||
| Some(ts) => { | ||
| let adjusted_ts = adjust_to_local_time::<T>(ts, tz)?; | ||
| builder.append_value(adjusted_ts); | ||
| } | ||
| } | ||
| } | ||
| Ok(Arc::new(builder.finish())) | ||
| } | ||
|
|
||
| fn is_epoch(part: &str) -> bool { | ||
| let part = part_normalization(part); | ||
| matches!(part.to_lowercase().as_str(), "epoch") | ||
| } | ||
|
|
||
| // Try to remove quote if exist, if the quote is invalid, return original string and let the downstream function handle the error | ||
| // Check if a timezone string is a fixed offset | ||
| fn is_fixed_offset(tz_str: &str) -> bool { | ||
| tz_str.starts_with('+') || tz_str.starts_with('-') | ||
| } | ||
|
|
||
| // Holds the components of a timezone offset (hours and minutes). | ||
| struct OffsetInfo { | ||
| hours: i32, | ||
| minutes: i32, | ||
| } | ||
|
|
||
| // Extracts the offset components from a timezone string like "+04:00" or "-05:30". | ||
| fn extract_offset_components(tz_str: &str) -> Option<OffsetInfo> { | ||
| if tz_str.len() < 6 { | ||
| return None; | ||
| } | ||
|
|
||
| let sign = match &tz_str[0..1] { | ||
| "+" => 1, | ||
| "-" => -1, | ||
| _ => return None, | ||
| }; | ||
|
|
||
| let hours: i32 = tz_str[1..3].parse().ok()?; | ||
| let minutes: i32 = tz_str[4..6].parse().ok()?; | ||
|
|
||
| Some(OffsetInfo { | ||
| hours: sign * hours, | ||
| minutes: sign * minutes, | ||
| }) | ||
| } | ||
|
Comment on lines
+379
to
+409
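There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain: Verify fixed-offset timezone format handling. The `extract_offset_components` function only accepts the strict `±HH:MM` form. Potential concerns: shorter offsets such as `+04` or `+0400` fail the length check and return `None`, so no offset is applied for them.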
🏁 Script executed: #!/bin/bash
# Check what timezone string formats Arrow's Tz type accepts and produces
echo "Verifying Arrow timezone string formats..."
rg -n "parse::<Tz>" --type rust -A 2 -B 2
echo ""
echo "Check Arrow timezone documentation for valid formats"Length of output: 1959 [/run_scripts] <web_search> </web_search> 🏁 Script executed: #!/bin/bash
# Find where Tz is imported from and check its definition
echo "=== Finding Tz import in date_part.rs ==="
head -50 datafusion/functions/src/datetime/date_part.rs | grep -E "^use|^extern"
echo ""
echo "=== Searching for Tz type definition ==="
rg -n "^use.*\bTz\b" --type rust datafusion/functions/src/datetime/
echo ""
echo "=== Check if extract_offset_components is actually used ==="
rg -n "extract_offset_components" --type rust
echo ""
echo "=== Look for timezone format patterns in tests/examples ==="
rg -n "\\+[0-9]|\\-[0-9]" --type rust -B2 -A2 datafusion/functions/src/datetime/ | head -30Length of output: 3866 🌐 Web query: 💡 Result: Apache Arrow accepts two kinds of timezone strings for the Timestamp (Tz) type:
References: [1] Arrow FlatBuffers Timestamp docs (timezone accepted forms). [2] arrow-rs datatype docs (fixed-offset parsing examples). [3] Arrow Rust/schema & Python docs (timestamp timezone semantics). [/run_scripts] 🏁 Script executed: #!/bin/bash
# Check line 299 context where extract_offset_components is called
echo "=== Context around line 299 where extract_offset_components is called ==="
sed -n '290,310p' datafusion/functions/src/datetime/date_part.rs
echo ""
echo "=== Look for any timezone format tests ==="
rg -n "\\+[0-9][0-9]" --type rust datafusion/functions/src/datetime/ -B1 -A1 | head -40
Length of output: 4975
Fix `extract_offset_components`: the function rejects valid timezone formats that Arrow accepts. Test cases in the codebase (date_trunc.rs) use formats like `+00` and `+08`. Update the parsing logic to handle `+HH:MM`, `+HHMM`, and `+HH` (with either sign).
Consider using a regex or more flexible parsing approach, and add explicit error handling instead of silently returning `None`. |
||
|
|
||
| // Applies the timezone offset to hour values in an array. | ||
| fn apply_hour_offset( | ||
| array: &dyn Array, | ||
| offset_hours: i32, | ||
| offset_minutes: i32, | ||
| ) -> Result<ArrayRef> { | ||
| let hour_array = as_int32_array(array)?; | ||
| let result: Int32Array = hour_array | ||
| .iter() | ||
| .map(|hour| { | ||
| hour.map(|h| { | ||
| let mut adjusted = h + offset_hours; | ||
| if offset_minutes.abs() >= 30 { | ||
| adjusted += if offset_minutes > 0 { 1 } else { -1 }; | ||
| } | ||
| ((adjusted % 24) + 24) % 24 | ||
| }) | ||
| }) | ||
| .collect(); | ||
| Ok(Arc::new(result)) | ||
| } | ||
|
|
||
| // Applies the timezone offset to minute values in an array. | ||
| fn apply_minute_offset(array: &dyn Array, offset_minutes: i32) -> Result<ArrayRef> { | ||
| let minute_array = as_int32_array(array)?; | ||
| let result: Int32Array = minute_array | ||
| .iter() | ||
| .map(|minute| { | ||
| minute.map(|m| { | ||
| let adjusted = m + offset_minutes; | ||
| ((adjusted % 60) + 60) % 60 | ||
| }) | ||
| }) | ||
| .collect(); | ||
| Ok(Arc::new(result)) | ||
| } | ||
|
Comment on lines
+433
to
+446
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similar offset direction concern for minutes. This function has the same potential logic issue as `apply_hour_offset`. 🤖 Prompt for AI Agents |
||
|
|
||
| // Applies the timezone offset to day values in an array. | ||
| fn apply_day_offset(array: &dyn Array, offset_hours: i32) -> Result<ArrayRef> { | ||
| let day_array = as_int32_array(array)?; | ||
| let result: Int32Array = day_array | ||
| .iter() | ||
| .map(|day| { | ||
| day.map(|d| { | ||
| if offset_hours >= 24 || offset_hours <= -24 { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
🤖 Was this useful? React with 👍 or 👎 |
||
| d + (offset_hours / 24) | ||
| } else if offset_hours > 0 { | ||
| d + 1 | ||
| } else if offset_hours < 0 { | ||
| d - 1 | ||
| } else { | ||
| d | ||
| } | ||
| }) | ||
| }) | ||
| .collect(); | ||
| Ok(Arc::new(result)) | ||
| } | ||
|
Comment on lines
+448
to
+468
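There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Day offset logic appears oversimplified. The `apply_day_offset` function shifts the day based only on the sign of the offset hours; it never looks at the time of day of the value being adjusted.
For example, with a `+05:00` offset, a timestamp at 22:00 UTC rolls over to the next local day, while a timestamp at 10:00 UTC stays on the same local day.
The current logic would treat both identically, which is incorrect. Consider calculating the day adjustment more precisely based on the actual hour value combined with the offset. |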
||
|
|
||
| // Try to remove quotes if they exist. If the quotes are invalid, return original string. | ||
| fn part_normalization(part: &str) -> &str { | ||
| part.strip_prefix(|c| c == '\'' || c == '\"') | ||
| .and_then(|s| s.strip_suffix(|c| c == '\'' || c == '\"')) | ||
| .unwrap_or(part) | ||
| } | ||
|
|
||
| /// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the | ||
| /// result to a total number of seconds, milliseconds, microseconds or | ||
| /// nanoseconds | ||
| // Converts seconds to i32 with the specified time unit. | ||
| fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> { | ||
| // Nanosecond is neither supported in Postgres nor DuckDB, to avoid dealing | ||
| // with overflow and precision issue we don't support nanosecond | ||
| if unit == Nanosecond { | ||
| return not_impl_err!("Date part {unit:?} not supported"); | ||
|
|
@@ -277,7 +496,6 @@ fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> { | |
| }; | ||
|
|
||
| let secs = date_part(array, DatePart::Second)?; | ||
| // This assumes array is primitive and not a dictionary | ||
| let secs = as_int32_array(secs.as_ref())?; | ||
| let subsecs = date_part(array, DatePart::Nanosecond)?; | ||
| let subsecs = as_int32_array(subsecs.as_ref())?; | ||
|
|
@@ -305,11 +523,8 @@ fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> { | |
| } | ||
| } | ||
|
|
||
| /// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the | ||
| /// result to a total number of seconds, milliseconds, microseconds or | ||
| /// nanoseconds | ||
| /// | ||
| /// Given epoch return f64, this is a duplicated function to optimize for f64 type | ||
| // Converts seconds to f64 with the specified time unit. | ||
| // Used for Interval and Duration types that need floating-point precision. | ||
| fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> { | ||
| let sf = match unit { | ||
| Second => 1_f64, | ||
|
|
@@ -318,7 +533,6 @@ fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> { | |
| Nanosecond => 1_000_000_000_f64, | ||
| }; | ||
| let secs = date_part(array, DatePart::Second)?; | ||
| // This assumes array is primitive and not a dictionary | ||
| let secs = as_int32_array(secs.as_ref())?; | ||
| let subsecs = date_part(array, DatePart::Nanosecond)?; | ||
| let subsecs = as_int32_array(subsecs.as_ref())?; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Verify EXTRACT timezone behavior aligns with SQL standards.
The implementation extracts timezone-aware timestamps in their own timezone (line 216) and naive timestamps in the session timezone (line 244). Please confirm this matches the intended SQL semantics for EXTRACT.
In standard SQL and most databases (PostgreSQL, etc.), `EXTRACT(HOUR FROM timestamp_with_tz)` typically extracts the value in the session timezone, not the timestamp's embedded timezone. The current logic may produce unexpected results.
For example, with session timezone `+00:00`: `EXTRACT(HOUR FROM TIMESTAMP '2025-10-30 10:45:30+02:00')`
🏁 Script executed:
Length of output: 288
Now I need to examine the actual code to verify what `adjust_timestamp_array` does and whether the logic in the review matches what PostgreSQL actually does.
Based on my research, when the input value is of type timestamp with time zone, operations are performed with respect to a particular time zone; by default, operations are done with respect to the current TimeZone setting. Additionally, the originally stated or assumed time zone is not retained in PostgreSQL's timestamptz type—timestamps with time zones are converted to UTC, and the original timezone is not stored.
The review comment's concern is valid. The code's logic of extracting "in the timestamp's own timezone" deviates from standard SQL semantics and PostgreSQL's behavior. When PostgreSQL extracts a field like HOUR from a timezone-aware timestamp, it does so using the session's current timezone setting, not the timestamp's (non-existent) original timezone.
Your EXTRACT timezone behavior is incorrect; extract in session timezone, not timestamp's timezone.
The code extracts timezone-aware timestamps in their embedded timezone offset (lines 218-228) rather than the session timezone as PostgreSQL does by default. PostgreSQL performs such operations with respect to the current TimeZone setting by default, not the timestamp's original timezone. The `adjust_timestamp_array` call converts the timestamp to a specific timezone, but this differs from PostgreSQL semantics where the session timezone is used unless explicitly overridden with `AT TIME ZONE`.
For example, with session timezone UTC: `EXTRACT(HOUR FROM '2025-10-30 10:45:30+02:00')` should extract in UTC → result: 8 (since 10:45:30+02:00 = 08:45:30 UTC).
Align extraction with session timezone semantics for PostgreSQL compatibility.