From b85c19f93540d07fe5981cda46449a2e8d62b31e Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Sun, 4 Feb 2024 23:14:53 +0100
Subject: [PATCH 1/5] docs: Document Parquet write options

---
 mkdocs/docs/configuration.md | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index bfe1e62fac..129931e01a 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -62,7 +62,16 @@ You can also set the FileIO explicitly:
 | ---------- | -------------------------------- | ----------------------------------------------------------------------------------------------- |
 | py-io-impl | pyiceberg.io.fsspec.FsspecFileIO | Sets the FileIO explicitly to an implementation, and will fail explicitly if it can't be loaded |
 
-For the FileIO there are several configuration options available:
+General configuration:
+
+| Key | Options | Default | Description |
+| --------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
+| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
+| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, the codec's default level is used. |
+| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
+| `write.parquet.dict-size-bytes` | Size in bytes | 1MB | Set the dictionary page size limit per row group |
+
+Below the FileIO specific configuration is shown.
 
 ### S3

From cbd8adc3b4c5e6e23da14e3243a9c9f7289a9cfa Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 5 Feb 2024 10:53:23 +0100
Subject: [PATCH 2/5] Move to tables section

---
 mkdocs/docs/configuration.md | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 129931e01a..a9c6f72d70 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -46,7 +46,20 @@ The environment variable picked up by Iceberg starts with `PYICEBERG_` and then
 
 For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-key-id` on the `default` catalog.
 
-## FileIO
+# Tables
+
+Iceberg tables support table properties to configure table behavior.
+
+## Write options
+
+| Key | Options | Default | Description |
+| --------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
+| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
+| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, the codec's default level is used. |
+| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
+| `write.parquet.dict-size-bytes` | Size in bytes | 1MB | Set the dictionary page size limit per row group |
+
+# FileIO
 
 Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.
@@ -62,17 +75,6 @@ You can also set the FileIO explicitly:
 | ---------- | -------------------------------- | ----------------------------------------------------------------------------------------------- |
 | py-io-impl | pyiceberg.io.fsspec.FsspecFileIO | Sets the FileIO explicitly to an implementation, and will fail explicitly if it can't be loaded |
 
-General configuration:
-
-| Key | Options | Default | Description |
-| --------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
-| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
-| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, the codec's default level is used. |
-| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
-| `write.parquet.dict-size-bytes` | Size in bytes | 1MB | Set the dictionary page size limit per row group |
-
-Below the FileIO specific configuration is shown.
-
 ### S3
 
 | Key | Example | Description |

From 7f582cb69f9e0ed54a930a786a9cbf5531e128cd Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 5 Feb 2024 13:39:28 +0100
Subject: [PATCH 3/5] Default to 2MB

---
 pyiceberg/io/pyarrow.py | 59 ++++++++++++++++++++++++++++++++---------
 1 file changed, 46 insertions(+), 13 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 7a94ce4c7d..99c1af5ad6 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -26,6 +26,7 @@
 from __future__ import annotations
 
 import concurrent.futures
+import fnmatch
 import itertools
 import logging
 import os
@@ -811,12 +812,9 @@ def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
         self._field_names = []
         self._name_mapping = name_mapping
 
-    def _current_path(self) -> str:
-        return ".".join(self._field_names)
-
     def _field_id(self, field: pa.Field) -> int:
         if self._name_mapping:
-            return self._name_mapping.find(self._current_path()).field_id
+            return self._name_mapping.find(*self._field_names).field_id
         elif (field_id := _get_field_id(field)) is not None:
             return field_id
         else:
@@ -1339,7 +1337,10 @@ def update_min(self, val: Any) -> None:
     def update_max(self, val: Any) -> None:
         self.current_max = val if self.current_max is None else max(val, self.current_max)
 
-    def min_as_bytes(self) -> bytes:
+    def min_as_bytes(self) -> Optional[bytes]:
+        if self.current_min is None:
+            return None
+
         return self.serialize(
             self.current_min
             if self.trunc_length is None
@@ -1720,13 +1721,14 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     except StopIteration:
         pass
 
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
     file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = schema_to_pyarrow(table.schema())
 
-    collected_metrics: List[pq.FileMetaData] = []
     fo = table.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **parquet_writer_kwargs) as writer:
             writer.write_table(task.df)
 
     data_file = DataFile(
@@ -1735,21 +1737,52 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
         file_format=FileFormat.PARQUET,
         partition=Record(),
         file_size_in_bytes=len(fo),
-        sort_order_id=task.sort_order_id,
+        # After this has been fixed:
+        # https://github.com/apache/iceberg-python/issues/271
+        # sort_order_id=task.sort_order_id,
+        sort_order_id=None,
         # Just copy these from the table for now
         spec_id=table.spec().spec_id,
         equality_ids=None,
         key_metadata=None,
     )
 
-    if len(collected_metrics) != 1:
-        # One file has been written
-        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
-
     fill_parquet_file_metadata(
         data_file=data_file,
-        parquet_metadata=collected_metrics[0],
+        parquet_metadata=writer.writer.metadata,
         stats_columns=compute_statistics_plan(table.schema(), table.properties),
         parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
     )
     return iter([data_file])
+
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    def _get_int(key: str, default: Optional[int] = None) -> Optional[int]:
+        if value := table_properties.get(key):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
+        else:
+            return default
+
+    for key_pattern in [
+        "write.parquet.row-group-size-bytes",
+        "write.parquet.page-row-limit",
+        "write.parquet.bloom-filter-max-bytes",
+        "write.parquet.bloom-filter-enabled.column.*",
+    ]:
+        if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
+            raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")
+
+    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
+    compression_level = _get_int("write.parquet.compression-level")
+    if compression_codec == "uncompressed":
+        compression_codec = "none"
+
+    return {
+        "compression": compression_codec,
+        "compression_level": compression_level,
+        "data_page_size": _get_int("write.parquet.page-size-bytes"),
+        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes", default=2 * 1024 * 1024),
+    }

From 8c3ff277439ae448ea002410280218e73f308e90 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 5 Feb 2024 13:42:46 +0100
Subject: [PATCH 4/5] Revert some unrelated changes

---
 mkdocs/docs/configuration.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index a9c6f72d70..9393a916fa 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -75,6 +75,8 @@ You can also set the FileIO explicitly:
 | ---------- | -------------------------------- | ----------------------------------------------------------------------------------------------- |
 | py-io-impl | pyiceberg.io.fsspec.FsspecFileIO | Sets the FileIO explicitly to an implementation, and will fail explicitly if it can't be loaded |
 
+For the FileIO there are several configuration options available:
+
 ### S3
 
 | Key | Example | Description |

From dc56b834552d9c4aa9a3b2375a3880efc8156229 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 5 Feb 2024 15:39:25 +0100
Subject: [PATCH 5/5] Update configuration.md

Co-authored-by: Honah J.
---
 mkdocs/docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 9393a916fa..ce17931169 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -57,7 +57,7 @@ Iceberg tables support table properties to configure table behavior.
 | `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. |
 | `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, the codec's default level is used. |
 | `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
-| `write.parquet.dict-size-bytes` | Size in bytes | 1MB | Set the dictionary page size limit per row group |
+| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
 
 # FileIO
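
For reference, a minimal sketch of how the documented write options could be set as table properties from PyIceberg. The catalog name (`default`), namespace, table identifier, and schema below are illustrative only and are not part of these patches; property values are passed as strings, matching the `Properties` mapping consumed by `_get_parquet_writer_kwargs` above.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, NestedField, StringType

# Hypothetical catalog and identifier, for illustration only.
catalog = load_catalog("default")

schema = Schema(
    NestedField(field_id=1, name="city", field_type=StringType(), required=False),
    NestedField(field_id=2, name="inhabitants", field_type=DoubleType(), required=False),
)

# Table properties are plain strings; the Parquet writer parses them,
# e.g. `write.parquet.compression-level` is converted with int().
table = catalog.create_table(
    "docs_example.cities",
    schema=schema,
    properties={
        "write.parquet.compression-codec": "zstd",
        "write.parquet.compression-level": "6",
        "write.parquet.dict-size-bytes": str(2 * 1024 * 1024),
    },
)

# Appending a PyArrow table would then write Parquet files using these settings:
# table.append(arrow_table)
```

Unsupported Parquet properties (for example `write.parquet.row-group-size-bytes`) would raise `NotImplementedError`, per the check in `_get_parquet_writer_kwargs`.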