diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 2360cf2088..6a50e24d87 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1656,6 +1656,7 @@ def _task_to_record_batches(
                 current_batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
                 projected_missing_fields=projected_missing_fields,
+                allow_timestamp_tz_mismatch=True,
             )
 
 
@@ -1849,13 +1850,18 @@ def _to_requested_schema(
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
     projected_missing_fields: dict[int, Any] = EMPTY_DICT,
+    allow_timestamp_tz_mismatch: bool = False,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
         ArrowProjectionVisitor(
-            file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
+            file_schema,
+            downcast_ns_timestamp_to_us,
+            include_field_ids,
+            projected_missing_fields=projected_missing_fields,
+            allow_timestamp_tz_mismatch=allow_timestamp_tz_mismatch,
         ),
         ArrowAccessor(file_schema),
     )
@@ -1868,6 +1874,7 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
     _downcast_ns_timestamp_to_us: bool
     _use_large_types: bool | None
     _projected_missing_fields: dict[int, Any]
+    _allow_timestamp_tz_mismatch: bool
 
 
     def __init__(
@@ -1876,12 +1883,16 @@ def __init__(
         include_field_ids: bool = False,
         use_large_types: bool | None = None,
         projected_missing_fields: dict[int, Any] = EMPTY_DICT,
+        allow_timestamp_tz_mismatch: bool = False,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types
         self._projected_missing_fields = projected_missing_fields
+        # When True, allows projecting timestamptz (UTC) to timestamp (no tz).
+        # Allowed for reading (aligns with Spark); disallowed for writing to enforce Iceberg spec's strict typing.
+        self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch
 
         if use_large_types is not None:
             deprecation_message(
@@ -1896,16 +1907,19 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         if field.field_type.is_primitive:
             if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
                 if field.field_type == TimestampType():
-                    # Downcasting of nanoseconds to microseconds
+                    source_tz_compatible = values.type.tz is None or (
+                        self._allow_timestamp_tz_mismatch and values.type.tz in UTC_ALIASES
+                    )
                     if (
                         pa.types.is_timestamp(target_type)
                         and not target_type.tz
                         and pa.types.is_timestamp(values.type)
-                        and not values.type.tz
+                        and source_tz_compatible
                     ):
+                        # Downcasting of nanoseconds to microseconds
                         if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
                             return values.cast(target_type, safe=False)
-                        elif target_type.unit == "us" and values.type.unit in {"s", "ms"}:
+                        elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
                             return values.cast(target_type)
                     raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
                 elif field.field_type == TimestamptzType():
@@ -1915,6 +1929,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                         and pa.types.is_timestamp(values.type)
                         and (values.type.tz in UTC_ALIASES or values.type.tz is None)
                     ):
+                        # Downcasting of nanoseconds to microseconds
                         if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
                             return values.cast(target_type, safe=False)
                         elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 89c11435e5..04bc3ecfac 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -81,6 +81,7 @@
     expression_to_pyarrow,
     parquet_path_to_id_mapping,
     schema_to_pyarrow,
+    write_file,
 )
 from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
 from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -2725,6 +2726,106 @@ def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
 
 
     assert expected.equals(actual_result)
+
+
+def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None:
+    # file is written with timestamp with timezone
+    file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+    batch = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
+                    datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
+                ],
+                type=pa.timestamp("us", tz="UTC"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # table schema expects timestamp without timezone
+    table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+
+    # allow_timestamp_tz_mismatch=True enables reading timestamptz as timestamp
+    actual_result = _to_requested_schema(
+        table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=True
+    )
+    expected = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0),
+                    datetime(2025, 8, 14, 13, 0, 0),
+                ],
+                type=pa.timestamp("us"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # expect actual_result to have no timezone
+    assert expected.equals(actual_result)
+
+
+def test__to_requested_schema_timestamptz_to_timestamp_write_rejects() -> None:
+    """Test that the write path (default) rejects timestamptz to timestamp casting.
+
+    This ensures we enforce the Iceberg spec distinction between timestamp and timestamptz on writes,
+    while the read path can be more permissive (like Spark) via allow_timestamp_tz_mismatch=True.
+    """
+    # file is written with timestamp with timezone
+    file_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+    batch = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
+                    datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
+                ],
+                type=pa.timestamp("us", tz="UTC"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # table schema expects timestamp without timezone
+    table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+
+    # allow_timestamp_tz_mismatch=False (default, used in write path) should raise
+    with pytest.raises(ValueError, match="Unsupported schema projection"):
+        _to_requested_schema(
+            table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=False
+        )
+
+
+def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None:
+    """Test that write_file rejects writing timestamptz data to a timestamp column."""
+    from pyiceberg.table import WriteTask
+
+    # Table expects timestamp (no tz), but data has timestamptz
+    table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
+    task_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
+
+    arrow_data = pa.table({"ts_field": [datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)]})
+
+    table_metadata = TableMetadataV2(
+        location=f"file://{tmp_path}",
+        last_column_id=1,
+        format_version=2,
+        schemas=[table_schema],
+        partition_specs=[PartitionSpec()],
+    )
+
+    task = WriteTask(
+        write_uuid=uuid.uuid4(),
+        task_id=0,
+        record_batches=arrow_data.to_batches(),
+        schema=task_schema,
+    )
+
+    with pytest.raises(ValueError, match="Unsupported schema projection"):
+        list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))
+
+
 def test__to_requested_schema_timestamps(
     arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
     arrow_table_with_all_timestamp_precisions: pa.Table,