23 changes: 19 additions & 4 deletions pyiceberg/io/pyarrow.py
@@ -1656,6 +1656,7 @@ def _task_to_record_batches(
current_batch,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
projected_missing_fields=projected_missing_fields,
allow_timestamp_tz_mismatch=True,
)


@@ -1849,13 +1850,18 @@ def _to_requested_schema(
downcast_ns_timestamp_to_us: bool = False,
include_field_ids: bool = False,
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
allow_timestamp_tz_mismatch: bool = False,
) -> pa.RecordBatch:
# We could reuse some of these visitors
struct_array = visit_with_partner(
requested_schema,
batch,
ArrowProjectionVisitor(
file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
file_schema,
downcast_ns_timestamp_to_us,
include_field_ids,
projected_missing_fields=projected_missing_fields,
allow_timestamp_tz_mismatch=allow_timestamp_tz_mismatch,
),
ArrowAccessor(file_schema),
)
@@ -1868,6 +1874,7 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
_downcast_ns_timestamp_to_us: bool
_use_large_types: bool | None
_projected_missing_fields: dict[int, Any]
_allow_timestamp_tz_mismatch: bool

def __init__(
self,
@@ -1876,12 +1883,16 @@ def __init__(
include_field_ids: bool = False,
use_large_types: bool | None = None,
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
allow_timestamp_tz_mismatch: bool = False,
) -> None:
self._file_schema = file_schema
self._include_field_ids = include_field_ids
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
self._use_large_types = use_large_types
self._projected_missing_fields = projected_missing_fields
# When True, allows projecting timestamptz (UTC) to timestamp (no tz).
# Allowed for reading (aligns with Spark); disallowed for writing to enforce Iceberg spec's strict typing.
self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch

if use_large_types is not None:
deprecation_message(
@@ -1896,16 +1907,19 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
if field.field_type.is_primitive:
if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
if field.field_type == TimestampType():
# Downcasting of nanoseconds to microseconds
source_tz_compatible = values.type.tz is None or (
self._allow_timestamp_tz_mismatch and values.type.tz in UTC_ALIASES
)
if (
pa.types.is_timestamp(target_type)
and not target_type.tz
and pa.types.is_timestamp(values.type)
and not values.type.tz
and source_tz_compatible
):
# Downcasting of nanoseconds to microseconds
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
elif target_type.unit == "us" and values.type.unit in {"s", "ms"}:
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
return values.cast(target_type)
raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
elif field.field_type == TimestamptzType():
@@ -1915,6 +1929,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
and pa.types.is_timestamp(values.type)
and (values.type.tz in UTC_ALIASES or values.type.tz is None)
):
# Downcasting of nanoseconds to microseconds
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
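The relaxed read branch ultimately relies on PyArrow's ability to cast a UTC-tagged timestamp array to a tz-naive type without touching the stored values. A minimal standalone sketch of that cast (illustrative only, not part of this diff; assumes only that pyarrow is installed):

```python
import datetime as dt

import pyarrow as pa

# A column as a Spark writer might store it: timestamptz in UTC.
utc_values = pa.array(
    [dt.datetime(2025, 8, 14, 12, 0, tzinfo=dt.timezone.utc)],
    type=pa.timestamp("us", tz="UTC"),
)

# Arrow keeps tz-aware timestamps as UTC epoch offsets, so dropping the tz
# annotation is value-preserving: the same instants are reinterpreted as
# UTC wall-clock times. This is the cast the read path now permits.
naive_values = utc_values.cast(pa.timestamp("us"))

assert naive_values.type.tz is None
assert naive_values[0].as_py() == dt.datetime(2025, 8, 14, 12, 0)
```

This mirrors what `_cast_if_needed` does when `_allow_timestamp_tz_mismatch` is set and `values.type.tz` is in `UTC_ALIASES`.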
101 changes: 101 additions & 0 deletions tests/io/test_pyarrow.py
@@ -81,6 +81,7 @@
expression_to_pyarrow,
parquet_path_to_id_mapping,
schema_to_pyarrow,
write_file,
)
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -2725,6 +2726,106 @@ def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
assert expected.equals(actual_result)


def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None:
# file is written with timestamp with timezone
file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
batch = pa.record_batch(
[
pa.array(
[
datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
],
type=pa.timestamp("us", tz="UTC"),
)
],
names=["ts_field"],
)

# table schema expects timestamp without timezone
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))

# allow_timestamp_tz_mismatch=True enables reading timestamptz as timestamp
actual_result = _to_requested_schema(
table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=True
)
expected = pa.record_batch(
[
pa.array(
[
datetime(2025, 8, 14, 12, 0, 0),
datetime(2025, 8, 14, 13, 0, 0),
],
type=pa.timestamp("us"),
)
],
names=["ts_field"],
)

# expect actual_result to have no timezone
assert expected.equals(actual_result)


def test__to_requested_schema_timestamptz_to_timestamp_write_rejects() -> None:
"""Test that the write path (default) rejects timestamptz to timestamp casting.

This ensures we enforce the Iceberg spec distinction between timestamp and timestamptz on writes,
while the read path can be more permissive (like Spark) via allow_timestamp_tz_mismatch=True.
"""
# file is written with timestamp with timezone
file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
batch = pa.record_batch(
[
pa.array(
[
datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
],
type=pa.timestamp("us", tz="UTC"),
)
],
names=["ts_field"],
)

# table schema expects timestamp without timezone
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))

# allow_timestamp_tz_mismatch=False (default, used in write path) should raise
with pytest.raises(ValueError, match="Unsupported schema projection"):
_to_requested_schema(
table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=False
)


def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None:
"""Test that write_file rejects writing timestamptz data to a timestamp column."""
from pyiceberg.table import WriteTask

# Table expects timestamp (no tz), but data has timestamptz
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
task_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))

arrow_data = pa.table({"ts_field": [datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)]})

table_metadata = TableMetadataV2(
location=f"file://{tmp_path}",
last_column_id=1,
format_version=2,
schemas=[table_schema],
partition_specs=[PartitionSpec()],
)

task = WriteTask(
write_uuid=uuid.uuid4(),
task_id=0,
record_batches=arrow_data.to_batches(),
schema=task_schema,
)

with pytest.raises(ValueError, match="Unsupported schema projection"):
list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))


def test__to_requested_schema_timestamps(
arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
arrow_table_with_all_timestamp_precisions: pa.Table,
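For context, a sketch of how a reader would exercise the new branch end to end (the catalog name, table identifier, and column name are hypothetical; it assumes a configured catalog and a table whose schema declares `ts_field` as plain `timestamp` while its data files store `timestamptz`):

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # hypothetical catalog configuration
table = catalog.load_table("db.events")  # hypothetical table identifier

# The scan's read path (_task_to_record_batches) passes
# allow_timestamp_tz_mismatch=True, so files that stored ts_field as
# timestamptz (UTC) project cleanly onto the tz-naive table schema.
arrow_table = table.scan().to_arrow()

assert arrow_table.schema.field("ts_field").type == pa.timestamp("us")
```

Writes in the opposite direction still fail, as `test_write_file_rejects_timestamptz_to_timestamp` verifies above.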