From 528da88d61b2c5e7e25c3f5348fe51f5a7242b3f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 9 Nov 2025 08:01:18 -0800 Subject: [PATCH 1/5] allow arrow timestamptz with UTC to be read as iceberg timestamp --- pyiceberg/io/pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 2360cf2088..545a29a93a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1901,11 +1901,11 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: pa.types.is_timestamp(target_type) and not target_type.tz and pa.types.is_timestamp(values.type) - and not values.type.tz + and (values.type.tz in UTC_ALIASES or values.type.tz is None) ): if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us: return values.cast(target_type, safe=False) - elif target_type.unit == "us" and values.type.unit in {"s", "ms"}: + elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}: return values.cast(target_type) raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}") elif field.field_type == TimestamptzType(): From 89c6588d0af99d217dc3dbe76a5e71572566abf6 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 9 Nov 2025 08:01:28 -0800 Subject: [PATCH 2/5] nit: comment --- pyiceberg/io/pyarrow.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 545a29a93a..4ef74bf8cd 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1896,13 +1896,13 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: if field.field_type.is_primitive: if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type: if field.field_type == TimestampType(): - # Downcasting of nanoseconds to microseconds if ( pa.types.is_timestamp(target_type) and not target_type.tz and pa.types.is_timestamp(values.type) and (values.type.tz in UTC_ALIASES or values.type.tz is None) ): + # Downcasting of nanoseconds to microseconds if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us: return values.cast(target_type, safe=False) elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}: @@ -1915,6 +1915,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: and pa.types.is_timestamp(values.type) and (values.type.tz in UTC_ALIASES or values.type.tz is None) ): + # Downcasting of nanoseconds to microseconds if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us: return values.cast(target_type, safe=False) elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}: From 9f637fbe1a41a7399f7b38085649c320daf9bb7c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 9 Nov 2025 08:39:39 -0800 Subject: [PATCH 3/5] add testcase --- tests/io/test_pyarrow.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 89c11435e5..8b0222b769 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2725,6 +2725,43 @@ def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None: assert expected.equals(actual_result) +def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None: + # file is written with timestamp with timezone + file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False)) + batch = pa.record_batch( + [ + pa.array( + [ + datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc), + datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc), + ], + type=pa.timestamp("us", tz="UTC"), + ) + ], + names=["ts_field"], + ) + + # table schema expects timestamp without timezone + table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False)) + + actual_result = _to_requested_schema(table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True) + expected = pa.record_batch( + [ + pa.array( + [ + datetime(2025, 8, 14, 12, 0, 0), + datetime(2025, 8, 14, 13, 0, 0), + ], + type=pa.timestamp("us"), + ) + ], + names=["ts_field"], + ) + + # expect actual_result to have no timezone + assert expected.equals(actual_result) + + def test__to_requested_schema_timestamps( arrow_table_schema_with_all_timestamp_precisions: pa.Schema, arrow_table_with_all_timestamp_precisions: pa.Table, From c73f310c116af4d6bc9152999d0bd00e9bd6f483 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 27 Jan 2026 18:18:28 -0800 Subject: [PATCH 4/5] disable write path and add tests --- pyiceberg/io/pyarrow.py | 18 +++++++++-- tests/io/test_pyarrow.py | 66 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4ef74bf8cd..8d27a53380 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1656,6 +1656,7 @@ def _task_to_record_batches( current_batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, projected_missing_fields=projected_missing_fields, + allow_timestamp_tz_mismatch=True, ) @@ -1849,13 +1850,18 @@ def _to_requested_schema( downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, projected_missing_fields: dict[int, Any] = EMPTY_DICT, + allow_timestamp_tz_mismatch: bool = False, ) -> pa.RecordBatch: # We could reuse some of these visitors struct_array = visit_with_partner( requested_schema, batch, ArrowProjectionVisitor( - file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields + file_schema, + downcast_ns_timestamp_to_us, + include_field_ids, + projected_missing_fields=projected_missing_fields, + allow_timestamp_tz_mismatch=allow_timestamp_tz_mismatch, ), ArrowAccessor(file_schema), ) @@ -1868,6 +1874,7 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None] _downcast_ns_timestamp_to_us: bool _use_large_types: bool | None _projected_missing_fields: dict[int, Any] + _allow_timestamp_tz_mismatch: bool def __init__( self, @@ -1876,12 +1883,16 @@ def __init__( include_field_ids: bool = False, use_large_types: bool | None = None, projected_missing_fields: dict[int, Any] = EMPTY_DICT, + allow_timestamp_tz_mismatch: bool = False, ) -> None: self._file_schema = file_schema self._include_field_ids = include_field_ids self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us self._use_large_types = use_large_types self._projected_missing_fields = projected_missing_fields + # Allow reading timestamp with/without timezone interchangeably (aligns with Spark behavior) + # This is intentionally disabled on the write path to enforce the Iceberg spec distinction + self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch if use_large_types is not None: deprecation_message( @@ -1896,11 +1907,14 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: if field.field_type.is_primitive: if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type: if field.field_type == TimestampType(): + source_tz_compatible = values.type.tz is None or ( + self._allow_timestamp_tz_mismatch and values.type.tz in UTC_ALIASES + ) if ( pa.types.is_timestamp(target_type) and not target_type.tz and pa.types.is_timestamp(values.type) - and (values.type.tz in UTC_ALIASES or values.type.tz is None) + and source_tz_compatible ): # Downcasting of nanoseconds to microseconds if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 8b0222b769..04bc3ecfac 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -81,6 +81,7 @@ expression_to_pyarrow, parquet_path_to_id_mapping, schema_to_pyarrow, + write_file, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.partitioning import PartitionField, PartitionSpec @@ -2744,7 +2745,10 @@ def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None: # table schema expects timestamp without timezone table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False)) - actual_result = _to_requested_schema(table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True) + # allow_timestamp_tz_mismatch=True enables reading timestamptz as timestamp + actual_result = _to_requested_schema( + table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=True + ) expected = pa.record_batch( [ pa.array( @@ -2762,6 +2766,66 @@ def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None: assert expected.equals(actual_result) +def test__to_requested_schema_timestamptz_to_timestamp_write_rejects() -> None: + """Test that the write path (default) rejects timestamptz to timestamp casting. + + This ensures we enforce the Iceberg spec distinction between timestamp and timestamptz on writes, + while the read path can be more permissive (like Spark) via allow_timestamp_tz_mismatch=True. + """ + # file is written with timestamp with timezone + file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False)) + batch = pa.record_batch( + [ + pa.array( + [ + datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc), + datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc), + ], + type=pa.timestamp("us", tz="UTC"), + ) + ], + names=["ts_field"], + ) + + # table schema expects timestamp without timezone + table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False)) + + # allow_timestamp_tz_mismatch=False (default, used in write path) should raise + with pytest.raises(ValueError, match="Unsupported schema projection"): + _to_requested_schema( + table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=False + ) + + +def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None: + """Test that write_file rejects writing timestamptz data to a timestamp column.""" + from pyiceberg.table import WriteTask + + # Table expects timestamp (no tz), but data has timestamptz + table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False)) + task_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False)) + + arrow_data = pa.table({"ts_field": [datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)]}) + + table_metadata = TableMetadataV2( + location=f"file://{tmp_path}", + last_column_id=1, + format_version=2, + schemas=[table_schema], + partition_specs=[PartitionSpec()], + ) + + task = WriteTask( + write_uuid=uuid.uuid4(), + task_id=0, + record_batches=arrow_data.to_batches(), + schema=task_schema, + ) + + with pytest.raises(ValueError, match="Unsupported schema projection"): + list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task]))) + + def test__to_requested_schema_timestamps( arrow_table_schema_with_all_timestamp_precisions: pa.Schema, arrow_table_with_all_timestamp_precisions: pa.Table, From 0d62b95dc674ee8623939e1f970f5535934514ff Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 27 Jan 2026 18:27:15 -0800 Subject: [PATCH 5/5] comment --- pyiceberg/io/pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 8d27a53380..6a50e24d87 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1890,8 +1890,8 @@ def __init__( self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us self._use_large_types = use_large_types self._projected_missing_fields = projected_missing_fields - # Allow reading timestamp with/without timezone interchangeably (aligns with Spark behavior) - # This is intentionally disabled on the write path to enforce the Iceberg spec distinction + # When True, allows projecting timestamptz (UTC) to timestamp (no tz). + # Allowed for reading (aligns with Spark); disallowed for writing to enforce Iceberg spec's strict typing. self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch if use_large_types is not None: