From d9efdbb90fcf564b3d3f460ebdce9d1966106fcd Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 5 Jan 2024 14:02:41 +0100 Subject: [PATCH 1/7] Arrow: Use case instead of wrapping a map/list Wrapping the list seems to introduce an odd behavior where `null` values are converted to an empty list `[]`. Resolves #251 --- dev/provision.py | 20 ++++++++++++++++++++ pyiceberg/io/pyarrow.py | 16 ++-------------- tests/test_integration.py | 14 +++++++++++++- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/dev/provision.py b/dev/provision.py index 9917cd3f20..793ae70880 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -339,3 +339,23 @@ ) spark.sql("INSERT INTO default.test_table_add_column VALUES ('2', '2')") + +spark.sql( + """ +CREATE TABLE default.test_table_empty_list_and_map ( + col_list array, + col_map map +) +USING iceberg +TBLPROPERTIES ( + 'format-version'='1' +); +""" +) + +spark.sql( + """ +INSERT INTO default.test_table_empty_list_and_map +VALUES (null, null) +""" +) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a537cf7a30..21c7b7e58f 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1052,24 +1052,12 @@ def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional return field_array def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]: - return ( - pa.ListArray.from_arrays(list_array.offsets, self.cast_if_needed(list_type.element_field, value_array)) - if isinstance(list_array, pa.ListArray) - else None - ) + return list_array.cast(schema_to_pyarrow(list_type)) if isinstance(list_array, pa.ListArray) else None def map( self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array] ) -> Optional[pa.Array]: - return ( - pa.MapArray.from_arrays( - map_array.offsets, - self.cast_if_needed(map_type.key_field, key_result), - self.cast_if_needed(map_type.value_field, value_result), - ) - if isinstance(map_array, pa.MapArray) - else None - ) + return map_array.cast(schema_to_pyarrow(map_type)) if isinstance(map_array, pa.MapArray) else None def primitive(self, _: PrimitiveType, array: Optional[pa.Array]) -> Optional[pa.Array]: return array diff --git a/tests/test_integration.py b/tests/test_integration.py index 2a173be3b3..602cb0263d 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -93,6 +93,11 @@ def table_test_table_sanitized_character(catalog: Catalog) -> Table: return catalog.load_table("default.test_table_sanitized_character") +@pytest.fixture() +def table_test_empty_list_and_map(catalog: Catalog) -> Table: + return catalog.load_table("default.test_table_empty_list_and_map") + + TABLE_NAME = ("default", "t1") @@ -417,8 +422,15 @@ def test_upgrade_table_version(table_test_table_version: Table) -> None: @pytest.mark.integration -def test_reproduce_issue(table_test_table_sanitized_character: Table) -> None: +def test_sanitize_column_names(table_test_table_sanitized_character: Table) -> None: arrow_table = table_test_table_sanitized_character.scan().to_arrow() assert len(arrow_table.schema.names), 1 assert len(table_test_table_sanitized_character.schema().fields), 1 assert arrow_table.schema.names[0] == table_test_table_sanitized_character.schema().fields[0].name + + +@pytest.mark.integration +def test_null_list_and_map(table_test_empty_list_and_map: Table) -> None: + arrow_table = table_test_empty_list_and_map.scan().to_arrow() + assert arrow_table["col_list"].to_pylist() == [None] + assert arrow_table["col_map"].to_pylist() == [None] From e3f3463cfd2602b5146f6763a6e771f1e4181178 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 8 Jan 2024 12:15:52 -0800 Subject: [PATCH 2/7] Cleanup --- pyiceberg/io/pyarrow.py | 34 +++++++++++++++++++++++++++------- tests/io/test_pyarrow.py | 38 +++++++++++++++++++++++++++++++------- 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 21c7b7e58f..ab577e791e 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1018,12 +1018,20 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra def __init__(self, file_schema: Schema): self.file_schema = file_schema - def cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: + def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self.file_schema.find_field(field.field_id) if field.field_type.is_primitive and field.field_type != file_field.field_type: return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type))) return values + def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field: + return pa.field( + name=field.name, + type=arrow_type, + nullable=field.optional, + metadata={DOC: field.doc, FIELD_ID: str(field.field_id)} if field.doc else {FIELD_ID: str(field.field_id)}, + ) + def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: return struct_result @@ -1036,28 +1044,40 @@ def struct( fields: List[pa.Field] = [] for field, field_array in zip(struct.fields, field_results): if field_array is not None: - array = self.cast_if_needed(field, field_array) + array = self._cast_if_needed(field, field_array) field_arrays.append(array) - fields.append(pa.field(field.name, array.type, field.optional)) + fields.append(self._construct_field(field, array.type)) elif field.optional: arrow_type = schema_to_pyarrow(field.field_type) field_arrays.append(pa.nulls(len(struct_array), type=arrow_type)) - fields.append(pa.field(field.name, arrow_type, field.optional)) + fields.append(self._construct_field(field, arrow_type)) else: raise ResolveError(f"Field is required, and could not be found in the file: {field}") - return pa.StructArray.from_arrays(arrays=field_arrays, fields=pa.struct(fields)) + arr = pa.StructArray.from_arrays(arrays=field_arrays, fields=pa.struct(fields)) + return arr def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: return field_array def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]: - return list_array.cast(schema_to_pyarrow(list_type)) if isinstance(list_array, pa.ListArray) else None + if isinstance(list_array, pa.ListArray) and value_array is not None: + arrow_field = pa.list_(self._construct_field(list_type.element_field, value_array.type)) + return pa.ListArray.from_arrays(list_array.offsets, value_array, arrow_field) + else: + return None def map( self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array] ) -> Optional[pa.Array]: - return map_array.cast(schema_to_pyarrow(map_type)) if isinstance(map_array, pa.MapArray) else None + if isinstance(map_array, pa.MapArray) and key_result is not None and value_result is not None: + arrow_field = pa.map_( + self._construct_field(map_type.key_field, key_result.type), + self._construct_field(map_type.value_field, value_result.type), + ) + return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result, arrow_field) + else: + return None def primitive(self, _: PrimitiveType, array: Optional[pa.Array]) -> Optional[pa.Array]: return array diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index be5f68e429..0c25d3fd2d 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -916,7 +916,13 @@ def test_read_list(schema_list: Schema, file_list: str) -> None: for actual, expected in zip(result_table.columns[0], [list(range(1, 10)), list(range(2, 20)), list(range(3, 30))]): assert actual.as_py() == expected - assert repr(result_table.schema) == "ids: list\n child 0, item: int32" + assert ( + repr(result_table.schema) + == """ids: list + child 0, element: int32 + -- field metadata -- + field_id: '51'""" + ) def test_read_map(schema_map: Schema, file_map: str) -> None: @@ -929,9 +935,13 @@ def test_read_map(schema_map: Schema, file_map: str) -> None: assert ( repr(result_table.schema) == """properties: map - child 0, entries: struct not null + child 0, entries: struct not null child 0, key: string not null - child 1, value: string""" + -- field metadata -- + field_id: '51' + child 1, value: string not null + -- field metadata -- + field_id: '52'""" ) @@ -1084,7 +1094,13 @@ def test_projection_nested_new_field(file_struct: str) -> None: for actual, expected in zip(result_table.columns[0], [None, None, None]): assert actual.as_py() == {"null": expected} assert len(result_table.columns[0]) == 3 - assert repr(result_table.schema) == "location: struct not null\n child 0, null: double" + assert ( + repr(result_table.schema) + == """location: struct not null + child 0, null: double + -- field metadata -- + field_id: '43'""" + ) def test_projection_nested_struct(schema_struct: Schema, file_struct: str) -> None: @@ -1155,11 +1171,19 @@ def test_projection_list_of_structs(schema_list_of_structs: Schema, file_list_of assert actual.as_py() == expected assert ( repr(result_table.schema) - == """locations: list> - child 0, item: struct + == """locations: list> + child 0, element: struct child 0, latitude: double not null + -- field metadata -- + field_id: '511' child 1, longitude: double not null - child 2, altitude: double""" + -- field metadata -- + field_id: '512' + child 2, altitude: double + -- field metadata -- + field_id: '513' + -- field metadata -- + field_id: '51'""" ) From 0d0a0761d0df97a138e26c2df98db826b19f993c Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 8 Jan 2024 13:28:05 -0800 Subject: [PATCH 3/7] Remove the field-ids for now --- pyiceberg/io/pyarrow.py | 14 ++++++++------ tests/io/test_pyarrow.py | 30 +++++++----------------------- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index cdb015869b..d1a0e81105 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1030,7 +1030,8 @@ def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Fi name=field.name, type=arrow_type, nullable=field.optional, - metadata={DOC: field.doc, FIELD_ID: str(field.field_id)} if field.doc else {FIELD_ID: str(field.field_id)}, + # Enable this once https://github.com/apache/arrow/pull/39516 gets merged and released + # metadata={DOC: field.doc, FIELD_ID: str(field.field_id)} if field.doc else {FIELD_ID: str(field.field_id)}, ) def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: @@ -1072,11 +1073,12 @@ def map( self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array] ) -> Optional[pa.Array]: if isinstance(map_array, pa.MapArray) and key_result is not None and value_result is not None: - arrow_field = pa.map_( - self._construct_field(map_type.key_field, key_result.type), - self._construct_field(map_type.value_field, value_result.type), - ) - return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result, arrow_field) + # Enable this once https://github.com/apache/arrow/pull/39516 gets merged and released + # arrow_field = pa.map_( + # self._construct_field(map_type.key_field, key_result.type), + # self._construct_field(map_type.value_field, value_result.type), + # ) + return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result) else: return None diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 03ed02f41c..f74f5ca8f9 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -916,10 +916,8 @@ def test_read_list(schema_list: Schema, file_list: str) -> None: assert ( repr(result_table.schema) - == """ids: list - child 0, element: int32 - -- field metadata -- - field_id: '51'""" + == """'ids: list + child 0, element: int32'""" ) @@ -932,14 +930,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None: assert ( repr(result_table.schema) - == """properties: map - child 0, entries: struct not null + == """'properties: map + child 0, entries: struct not null child 0, key: string not null - -- field metadata -- - field_id: '51' - child 1, value: string not null - -- field metadata -- - field_id: '52'""" + child 1, value: string'""" ) @@ -1095,9 +1089,7 @@ def test_projection_nested_new_field(file_struct: str) -> None: assert ( repr(result_table.schema) == """location: struct not null - child 0, null: double - -- field metadata -- - field_id: '43'""" + child 0, null: double""" ) @@ -1172,16 +1164,8 @@ def test_projection_list_of_structs(schema_list_of_structs: Schema, file_list_of == """locations: list> child 0, element: struct child 0, latitude: double not null - -- field metadata -- - field_id: '511' child 1, longitude: double not null - -- field metadata -- - field_id: '512' - child 2, altitude: double - -- field metadata -- - field_id: '513' - -- field metadata -- - field_id: '51'""" + child 2, altitude: double""" ) From 41a88a7cf7ecaadec507eda85d24ba2668b182c6 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 23 Jan 2024 22:55:27 +0100 Subject: [PATCH 4/7] Fix bug --- pyiceberg/io/pyarrow.py | 26 ++++--- tests/io/test_pyarrow.py | 146 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 152 insertions(+), 20 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 440d439456..a7a1925689 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -168,6 +168,7 @@ LIST_ELEMENT_NAME = "element" MAP_KEY_NAME = "key" MAP_VALUE_NAME = "value" +DOC = "doc" T = TypeVar("T") @@ -1129,8 +1130,9 @@ def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Fi name=field.name, type=arrow_type, nullable=field.optional, - # Enable this once https://github.com/apache/arrow/pull/39516 gets merged and released - # metadata={DOC: field.doc, FIELD_ID: str(field.field_id)} if field.doc else {FIELD_ID: str(field.field_id)}, + metadata={DOC: field.doc, PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)} + if field.doc + else {PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)}, ) def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: @@ -1155,8 +1157,7 @@ def struct( else: raise ResolveError(f"Field is required, and could not be found in the file: {field}") - arr = pa.StructArray.from_arrays(arrays=field_arrays, fields=pa.struct(fields)) - return arr + return pa.StructArray.from_arrays(arrays=field_arrays, fields=pa.struct(fields)) def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional[pa.Array]) -> Optional[pa.Array]: return field_array @@ -1164,7 +1165,11 @@ def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]: if isinstance(list_array, pa.ListArray) and value_array is not None: arrow_field = pa.list_(self._construct_field(list_type.element_field, value_array.type)) - return pa.ListArray.from_arrays(list_array.offsets, value_array, arrow_field) + if isinstance(value_array, pa.StructArray): + # Arrow does not allow reordering of fields, therefore we have to copy the array :( + return pa.ListArray.from_arrays(list_array.offsets, value_array, arrow_field) + else: + return list_array.cast(arrow_field) else: return None @@ -1172,12 +1177,11 @@ def map( self, map_type: MapType, map_array: Optional[pa.Array], key_result: Optional[pa.Array], value_result: Optional[pa.Array] ) -> Optional[pa.Array]: if isinstance(map_array, pa.MapArray) and key_result is not None and value_result is not None: - # Enable this once https://github.com/apache/arrow/pull/39516 gets merged and released - # arrow_field = pa.map_( - # self._construct_field(map_type.key_field, key_result.type), - # self._construct_field(map_type.value_field, value_result.type), - # ) - return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result) + arrow_field = pa.map_( + self._construct_field(map_type.key_field, key_result.type), + self._construct_field(map_type.value_field, value_result.type), + ) + return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result, arrow_field) else: return None diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index c67ac6539f..e2aff0b7c8 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -682,6 +682,24 @@ def schema_list_of_structs() -> Schema: ) +@pytest.fixture +def schema_map_of_structs() -> Schema: + return Schema( + NestedField( + 5, + "locations", + MapType( + key_id=51, + value_id=52, + key_type=StringType(), + value_type=StructType(NestedField(511, "lat", DoubleType()), NestedField(512, "long", DoubleType())), + element_required=False, + ), + required=False, + ), + ) + + @pytest.fixture def schema_map() -> Schema: return Schema( @@ -793,6 +811,25 @@ def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str: ) +@pytest.fixture +def file_map_of_structs(schema_map_of_structs: Schema, tmpdir: str) -> str: + pyarrow_schema = schema_to_pyarrow( + schema_map_of_structs, metadata={ICEBERG_SCHEMA: bytes(schema_map_of_structs.model_dump_json(), UTF8)} + ) + return _write_table_to_file( + f"file:{tmpdir}/e.parquet", + pyarrow_schema, + pa.Table.from_pylist( + [ + {"locations": {"1": {"lat": 52.371807, "long": 4.896029}, "2": {"lat": 52.387386, "long": 4.646219}}}, + {"locations": {}}, + {"locations": {"3": {"lat": 52.078663, "long": 4.288788}, "4": {"lat": 52.387386, "long": 4.646219}}}, + ], + schema=pyarrow_schema, + ), + ) + + @pytest.fixture def file_map(schema_map: Schema, tmpdir: str) -> str: pyarrow_schema = schema_to_pyarrow(schema_map, metadata={ICEBERG_SCHEMA: bytes(schema_map.model_dump_json(), UTF8)}) @@ -916,8 +953,10 @@ def test_read_list(schema_list: Schema, file_list: str) -> None: assert ( repr(result_table.schema) - == """'ids: list - child 0, element: int32'""" + == """ids: list + child 0, element: int32 + -- field metadata -- + PARQUET:field_id: '51'""" ) @@ -930,10 +969,14 @@ def test_read_map(schema_map: Schema, file_map: str) -> None: assert ( repr(result_table.schema) - == """'properties: map - child 0, entries: struct not null + == """properties: map + child 0, entries: struct not null child 0, key: string not null - child 1, value: string'""" + -- field metadata -- + PARQUET:field_id: '51' + child 1, value: string not null + -- field metadata -- + PARQUET:field_id: '52'""" ) @@ -1067,7 +1110,13 @@ def test_projection_nested_struct_subset(file_struct: str) -> None: assert actual.as_py() == {"lat": expected} assert len(result_table.columns[0]) == 3 - assert repr(result_table.schema) == "location: struct not null\n child 0, lat: double not null" + assert ( + repr(result_table.schema) + == """location: struct not null + child 0, lat: double not null + -- field metadata -- + PARQUET:field_id: '41'""" + ) def test_projection_nested_new_field(file_struct: str) -> None: @@ -1089,7 +1138,9 @@ def test_projection_nested_new_field(file_struct: str) -> None: assert ( repr(result_table.schema) == """location: struct not null - child 0, null: double""" + child 0, null: double + -- field metadata -- + PARQUET:field_id: '43'""" ) @@ -1119,7 +1170,16 @@ def test_projection_nested_struct(schema_struct: Schema, file_struct: str) -> No assert len(result_table.columns[0]) == 3 assert ( repr(result_table.schema) - == "location: struct not null\n child 0, lat: double\n child 1, null: double\n child 2, long: double" + == """location: struct not null + child 0, lat: double + -- field metadata -- + PARQUET:field_id: '41' + child 1, null: double + -- field metadata -- + PARQUET:field_id: '43' + child 2, long: double + -- field metadata -- + PARQUET:field_id: '42'""" ) @@ -1164,8 +1224,76 @@ def test_projection_list_of_structs(schema_list_of_structs: Schema, file_list_of == """locations: list> child 0, element: struct child 0, latitude: double not null + -- field metadata -- + PARQUET:field_id: '511' child 1, longitude: double not null - child 2, altitude: double""" + -- field metadata -- + PARQUET:field_id: '512' + child 2, altitude: double + -- field metadata -- + PARQUET:field_id: '513' + -- field metadata -- + PARQUET:field_id: '51'""" + ) + + +def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_structs: str) -> None: + schema = Schema( + NestedField( + 5, + "locations", + MapType( + key_id=51, + value_id=52, + key_type=StringType(), + value_type=StructType( + NestedField(511, "latitude", DoubleType()), + NestedField(512, "longitude", DoubleType()), + NestedField(513, "altitude", DoubleType(), required=False), + ), + element_required=False, + ), + required=False, + ), + ) + + result_table = project(schema, [file_map_of_structs]) + assert len(result_table.columns) == 1 + assert len(result_table.columns[0]) == 3 + for actual, expected in zip( + result_table.columns[0], + [ + [ + ("1", {"latitude": 52.371807, "longitude": 4.896029, "altitude": None}), + ("2", {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}), + ], + [], + [ + ("3", {"latitude": 52.078663, "longitude": 4.288788, "altitude": None}), + ("4", {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}), + ], + ], + ): + assert actual.as_py() == expected + assert ( + repr(result_table.schema) + == """locations: map> + child 0, entries: struct not null> not null + child 0, key: string not null + -- field metadata -- + PARQUET:field_id: '51' + child 1, value: struct not null + child 0, latitude: double not null + -- field metadata -- + PARQUET:field_id: '511' + child 1, longitude: double not null + -- field metadata -- + PARQUET:field_id: '512' + child 2, altitude: double + -- field metadata -- + PARQUET:field_id: '513' + -- field metadata -- + PARQUET:field_id: '52'""" ) From 2ca18f781770a1055eb95a96496addd5b9c68900 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 23 Jan 2024 23:16:25 +0100 Subject: [PATCH 5/7] Fix the map --- pyiceberg/io/pyarrow.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a7a1925689..d7e9262486 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1181,7 +1181,11 @@ def map( self._construct_field(map_type.key_field, key_result.type), self._construct_field(map_type.value_field, value_result.type), ) - return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result, arrow_field) + if isinstance(value_result, pa.StructArray): + # Arrow does not allow reordering of fields, therefore we have to copy the array :( + return pa.MapArray.from_arrays(map_array.offsets, key_result, value_result, arrow_field) + else: + return map_array.cast(arrow_field) else: return None From 17f74980a75468d3f427a546412ef16ce9e619b4 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 26 Jan 2024 00:16:22 +0100 Subject: [PATCH 6/7] The best I can do --- dev/provision.py | 8 ++-- pyiceberg/io/pyarrow.py | 18 +++++---- tests/integration/test_reads.py | 8 +++- tests/io/test_pyarrow.py | 71 ++++++++------------------------- 4 files changed, 38 insertions(+), 67 deletions(-) diff --git a/dev/provision.py b/dev/provision.py index 6bfd475228..44086caf20 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -324,8 +324,9 @@ spark.sql( f""" CREATE TABLE {catalog_name}.default.test_table_empty_list_and_map ( - col_list array, - col_map map + col_list array, + col_map map, + col_list_with_struct array> ) USING iceberg TBLPROPERTIES ( @@ -337,6 +338,7 @@ spark.sql( f""" INSERT INTO {catalog_name}.default.test_table_empty_list_and_map - VALUES (null, null) + VALUES (null, null, null), + (array(), map(), array(struct(1))) """ ) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d7e9262486..84d290a69a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1130,9 +1130,7 @@ def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Fi name=field.name, type=arrow_type, nullable=field.optional, - metadata={DOC: field.doc, PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)} - if field.doc - else {PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)}, + metadata={DOC: field.doc} if field.doc is not None else None, ) def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: @@ -1164,12 +1162,16 @@ def field(self, field: NestedField, _: Optional[pa.Array], field_array: Optional def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]: if isinstance(list_array, pa.ListArray) and value_array is not None: - arrow_field = pa.list_(self._construct_field(list_type.element_field, value_array.type)) if isinstance(value_array, pa.StructArray): - # Arrow does not allow reordering of fields, therefore we have to copy the array :( - return pa.ListArray.from_arrays(list_array.offsets, value_array, arrow_field) - else: - return list_array.cast(arrow_field) + # This can be removed once this has been fixed: + # https://github.com/apache/arrow/issues/38809 + list_array = pa.ListArray.from_arrays( + list_array.offsets, + value_array + ) + + arrow_field = pa.list_(self._construct_field(list_type.element_field, value_array.type)) + return list_array.cast(arrow_field) else: return None diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 7817a3f32b..3fc06fbddc 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -435,5 +435,9 @@ def test_sanitize_character(catalog: Catalog) -> None: def test_null_list_and_map(catalog: Catalog) -> None: table_test_empty_list_and_map = catalog.load_table("default.test_table_empty_list_and_map") arrow_table = table_test_empty_list_and_map.scan().to_arrow() - assert arrow_table["col_list"].to_pylist() == [None] - assert arrow_table["col_map"].to_pylist() == [None] + assert arrow_table["col_list"].to_pylist() == [None, []] + assert arrow_table["col_map"].to_pylist() == [None, []] + # This should be: + # assert arrow_table["col_list_with_struct"].to_pylist() == [None, [{'test': 1}]] + # Once https://github.com/apache/arrow/issues/38809 has been fixed + assert arrow_table["col_list_with_struct"].to_pylist() == [[], [{'test': 1}]] diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e2aff0b7c8..e6f4de3596 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -954,9 +954,7 @@ def test_read_list(schema_list: Schema, file_list: str) -> None: assert ( repr(result_table.schema) == """ids: list - child 0, element: int32 - -- field metadata -- - PARQUET:field_id: '51'""" + child 0, element: int32""" ) @@ -972,11 +970,7 @@ def test_read_map(schema_map: Schema, file_map: str) -> None: == """properties: map child 0, entries: struct not null child 0, key: string not null - -- field metadata -- - PARQUET:field_id: '51' - child 1, value: string not null - -- field metadata -- - PARQUET:field_id: '52'""" + child 1, value: string not null""" ) @@ -1113,9 +1107,7 @@ def test_projection_nested_struct_subset(file_struct: str) -> None: assert ( repr(result_table.schema) == """location: struct not null - child 0, lat: double not null - -- field metadata -- - PARQUET:field_id: '41'""" + child 0, lat: double not null""" ) @@ -1138,9 +1130,7 @@ def test_projection_nested_new_field(file_struct: str) -> None: assert ( repr(result_table.schema) == """location: struct not null - child 0, null: double - -- field metadata -- - PARQUET:field_id: '43'""" + child 0, null: double""" ) @@ -1172,14 +1162,8 @@ def test_projection_nested_struct(schema_struct: Schema, file_struct: str) -> No repr(result_table.schema) == """location: struct not null child 0, lat: double - -- field metadata -- - PARQUET:field_id: '41' child 1, null: double - -- field metadata -- - PARQUET:field_id: '43' - child 2, long: double - -- field metadata -- - PARQUET:field_id: '42'""" + child 2, long: double""" ) @@ -1204,36 +1188,25 @@ def test_projection_list_of_structs(schema_list_of_structs: Schema, file_list_of result_table = project(schema, [file_list_of_structs]) assert len(result_table.columns) == 1 assert len(result_table.columns[0]) == 3 - for actual, expected in zip( - result_table.columns[0], + results = [row.as_py() for row in result_table.columns[0]] + assert results == [ [ - [ - {"latitude": 52.371807, "longitude": 4.896029, "altitude": None}, - {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}, - ], - [], - [ - {"latitude": 52.078663, "longitude": 4.288788, "altitude": None}, - {"latitude": 52.387386, "longitude": 4.646219, "altitude": None}, - ], + {'latitude': 52.371807, 'longitude': 4.896029, 'altitude': None}, + {'latitude': 52.387386, 'longitude': 4.646219, 'altitude': None}, ], - ): - assert actual.as_py() == expected + [], + [ + {'latitude': 52.078663, 'longitude': 4.288788, 'altitude': None}, + {'latitude': 52.387386, 'longitude': 4.646219, 'altitude': None}, + ], + ] assert ( repr(result_table.schema) == """locations: list> child 0, element: struct child 0, latitude: double not null - -- field metadata -- - PARQUET:field_id: '511' child 1, longitude: double not null - -- field metadata -- - PARQUET:field_id: '512' - child 2, altitude: double - -- field metadata -- - PARQUET:field_id: '513' - -- field metadata -- - PARQUET:field_id: '51'""" + child 2, altitude: double""" ) @@ -1280,20 +1253,10 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s == """locations: map> child 0, entries: struct not null> not null child 0, key: string not null - -- field metadata -- - PARQUET:field_id: '51' child 1, value: struct not null child 0, latitude: double not null - -- field metadata -- - PARQUET:field_id: '511' child 1, longitude: double not null - -- field metadata -- - PARQUET:field_id: '512' - child 2, altitude: double - -- field metadata -- - PARQUET:field_id: '513' - -- field metadata -- - PARQUET:field_id: '52'""" + child 2, altitude: double""" ) From cdea7f2b4c39bc37f7ed89ba73c29308b65895b2 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 26 Jan 2024 00:18:28 +0100 Subject: [PATCH 7/7] Lint --- pyiceberg/io/pyarrow.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 84d290a69a..cbfb9f6d5a 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1165,10 +1165,7 @@ def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: if isinstance(value_array, pa.StructArray): # This can be removed once this has been fixed: # https://github.com/apache/arrow/issues/38809 - list_array = pa.ListArray.from_arrays( - list_array.offsets, - value_array - ) + list_array = pa.ListArray.from_arrays(list_array.offsets, value_array) arrow_field = pa.list_(self._construct_field(list_type.element_field, value_array.type)) return list_array.cast(arrow_field)