From 367a9b1fda5b419a43b2d8d3d4a1e24a01e10838 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 9 Feb 2024 19:33:06 -0800 Subject: [PATCH 01/12] Add hive locking for commit path --- pyiceberg/catalog/hive.py | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index d81404f77a..792d8b5f4f 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import getpass +import socket import time from types import TracebackType from typing import ( @@ -34,10 +35,17 @@ AlreadyExistsException, FieldSchema, InvalidOperationException, + LockComponent, + LockLevel, + LockRequest, + LockResponse, + LockState, + LockType, MetaException, NoSuchObjectException, SerDeInfo, StorageDescriptor, + UnlockRequest, ) from hive_metastore.ttypes import Database as HiveDatabase from hive_metastore.ttypes import Table as HiveTable @@ -56,6 +64,7 @@ PropertiesUpdateSummary, ) from pyiceberg.exceptions import ( + CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchIcebergTableError, @@ -67,7 +76,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT @@ -155,7 +164,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) PROP_TABLE_TYPE = "table_type" PROP_METADATA_LOCATION = "metadata_location" 
PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location" -DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT} +DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'} def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]: @@ -331,6 +340,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError + def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest: + lock_component: LockComponent = LockComponent( + level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True + ) + + lock_request: LockRequest = LockRequest(component=[lock_component], user=getpass.getuser(), hostname=socket.gethostname()) + + return lock_request + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update the table. @@ -363,15 +381,23 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons self._write_metadata(updated_metadata, current_table.io, new_metadata_location) # commit to hive - try: - with self._client as open_client: + # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 + with self._client as open_client: + lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) + + try: + if lock.state != LockState.ACQUIRED: + raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") + tbl = open_client.get_table(dbname=database_name, tbl_name=table_name) tbl.parameters = _construct_parameters( metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location ) open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl) - except NoSuchObjectException as e: - raise 
NoSuchTableError(f"Table does not exist: {table_name}") from e + except NoSuchObjectException as e: + raise NoSuchTableError(f"Table does not exist: {table_name}") from e + finally: + open_client.unlock(UnlockRequest(lockid=lock.lockid)) return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) From ff0e8a01591fd0c51427bd3a34caa152f9148c03 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 9 Feb 2024 22:30:33 -0800 Subject: [PATCH 02/12] Add integration test --- tests/integration/test_reads.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index d487a6477e..3a05d3a726 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -22,10 +22,12 @@ import pyarrow.parquet as pq import pytest +from hive_metastore.ttypes import LockRequest, LockResponse, LockState, UnlockRequest from pyarrow.fs import S3FileSystem from pyiceberg.catalog import Catalog, load_catalog -from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.catalog.hive import HiveCatalog, _HiveClient +from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from pyiceberg.expressions import ( And, EqualTo, @@ -467,3 +469,27 @@ def test_null_list_and_map(catalog: Catalog) -> None: # assert arrow_table["col_list_with_struct"].to_pylist() == [None, [{'test': 1}]] # Once https://github.com/apache/arrow/issues/38809 has been fixed assert arrow_table["col_list_with_struct"].to_pylist() == [[], [{'test': 1}]] + + +@pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive')]) +def test_hive_locking(catalog: HiveCatalog) -> None: + table = create_table(catalog) + + database_name: str + table_name: str + database_name, table_name = table.identifier + + hive_client: _HiveClient = catalog._client + blocking_lock_request: LockRequest = catalog._create_lock_request(database_name, 
table_name) + + with hive_client as open_client: + # Force a lock on the test table + lock: LockResponse = open_client.lock(blocking_lock_request) + assert lock.state == LockState.ACQUIRED + + try: + with pytest.raises(CommitFailedException, match="Cannot acquire lock"): + table.transaction().set_properties(lock="fail").commit_transaction() + finally: + open_client.unlock(UnlockRequest(lock.lockid)) From 8eac35ba76001014e9529154413d5b6367fcb335 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 9 Feb 2024 22:41:14 -0800 Subject: [PATCH 03/12] Fix identifier assignment --- tests/integration/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 3a05d3a726..c859ab4e19 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -478,7 +478,7 @@ def test_hive_locking(catalog: HiveCatalog) -> None: database_name: str table_name: str - database_name, table_name = table.identifier + database_name, table_name = catalog.identifier_to_database_and_table(table.identifier) hive_client: _HiveClient = catalog._client blocking_lock_request: LockRequest = catalog._create_lock_request(database_name, table_name) From a49f9856d42a695878cfcc3a8fe618274282a4a5 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 9 Feb 2024 22:53:12 -0800 Subject: [PATCH 04/12] Fix identifier unpacking --- tests/integration/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index c859ab4e19..4bab0aa067 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -478,7 +478,7 @@ def test_hive_locking(catalog: HiveCatalog) -> None: database_name: str table_name: str - database_name, table_name = catalog.identifier_to_database_and_table(table.identifier) + _ignored, database_name, table_name = table.identifier hive_client: _HiveClient = catalog._client 
blocking_lock_request: LockRequest = catalog._create_lock_request(database_name, table_name) From b9405a64cb98f53d228b931a6e6b0f5554da77bf Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 10 Feb 2024 08:00:15 +0100 Subject: [PATCH 05/12] Import properties --- pyiceberg/catalog/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 792d8b5f4f..260217e26e 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -76,7 +76,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT From ce070e2a03d9b0d6d8f61289d985fc106dbda392 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 10 Feb 2024 08:00:22 +0100 Subject: [PATCH 06/12] Use properties --- pyiceberg/catalog/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 260217e26e..aba3c173e6 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -164,7 +164,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) PROP_TABLE_TYPE = "table_type" PROP_METADATA_LOCATION = "metadata_location" PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location" -DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'} +DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT} def _construct_parameters(metadata_location: str, 
previous_metadata_location: Optional[str] = None) -> Dict[str, Any]: From 7ff3950d98282d6dc920b82a10e8fae6dda63eed Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 10 Feb 2024 08:00:37 +0100 Subject: [PATCH 07/12] Remove unnecessary fixture --- tests/integration/test_reads.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 4bab0aa067..3b4935ff36 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -472,13 +472,12 @@ def test_null_list_and_map(catalog: Catalog) -> None: @pytest.mark.integration -@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive')]) -def test_hive_locking(catalog: HiveCatalog) -> None: - table = create_table(catalog) +def test_hive_locking(catalog_hive: HiveCatalog) -> None: + table = create_table(catalog_hive) database_name: str table_name: str - _ignored, database_name, table_name = table.identifier + _, database_name, table_name = table.identifier hive_client: _HiveClient = catalog._client blocking_lock_request: LockRequest = catalog._create_lock_request(database_name, table_name) From 6e992ba8fdee0c0d2d6f9631ae8a101701bc34da Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sat, 10 Feb 2024 08:03:42 +0100 Subject: [PATCH 08/12] Update `catalog_hive` --- tests/integration/test_reads.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 3b4935ff36..b56948e4c4 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -479,8 +479,8 @@ def test_hive_locking(catalog_hive: HiveCatalog) -> None: table_name: str _, database_name, table_name = table.identifier - hive_client: _HiveClient = catalog._client - blocking_lock_request: LockRequest = catalog._create_lock_request(database_name, table_name) + hive_client: _HiveClient = catalog_hive._client + blocking_lock_request: 
LockRequest = catalog_hive._create_lock_request(database_name, table_name) with hive_client as open_client: # Force a lock on the test table From d2c023f9e454017af50b1a2639386e9f7c4125d6 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 10 Feb 2024 07:14:39 -0800 Subject: [PATCH 09/12] Separate lock client from commit client --- tests/integration/test_reads.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index b56948e4c4..6521daf649 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -487,8 +487,8 @@ def test_hive_locking(catalog_hive: HiveCatalog) -> None: lock: LockResponse = open_client.lock(blocking_lock_request) assert lock.state == LockState.ACQUIRED - try: - with pytest.raises(CommitFailedException, match="Cannot acquire lock"): - table.transaction().set_properties(lock="fail").commit_transaction() - finally: - open_client.unlock(UnlockRequest(lock.lockid)) + with pytest.raises(CommitFailedException, match="Cannot acquire lock"): + table.transaction().set_properties(lock="fail").commit_transaction() + + with hive_client as open_client: + open_client.unlock(UnlockRequest(lock.lockid)) From 7f6dc46dd3bff46fd892be4f7f2f264df9d2ad8c Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 10 Feb 2024 07:26:23 -0800 Subject: [PATCH 10/12] Fix match expression --- tests/integration/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 6521daf649..69c2378a94 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -487,7 +487,7 @@ def test_hive_locking(catalog_hive: HiveCatalog) -> None: lock: LockResponse = open_client.lock(blocking_lock_request) assert lock.state == LockState.ACQUIRED - with pytest.raises(CommitFailedException, match="Cannot acquire lock"): + with pytest.raises(CommitFailedException, 
match="(Failed to acquire lock for).*"): table.transaction().set_properties(lock="fail").commit_transaction() with hive_client as open_client: From 2b675624af473d6b899510abf9cb868aa908841a Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 10 Feb 2024 07:34:12 -0800 Subject: [PATCH 11/12] Use separate client for locking --- tests/integration/test_reads.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 69c2378a94..1a615c2ff6 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -479,16 +479,15 @@ def test_hive_locking(catalog_hive: HiveCatalog) -> None: table_name: str _, database_name, table_name = table.identifier - hive_client: _HiveClient = catalog_hive._client + hive_client: _HiveClient = _HiveClient(catalog_hive.properties["uri"]) blocking_lock_request: LockRequest = catalog_hive._create_lock_request(database_name, table_name) with hive_client as open_client: # Force a lock on the test table lock: LockResponse = open_client.lock(blocking_lock_request) assert lock.state == LockState.ACQUIRED - - with pytest.raises(CommitFailedException, match="(Failed to acquire lock for).*"): - table.transaction().set_properties(lock="fail").commit_transaction() - - with hive_client as open_client: - open_client.unlock(UnlockRequest(lock.lockid)) + try: + with pytest.raises(CommitFailedException, match="(Failed to acquire lock for).*",): + table.transaction().set_properties(lock="fail").commit_transaction() + finally: + open_client.unlock(UnlockRequest(lock.lockid)) From f43ebccfa6bdf415b3e7a41ab08bcedcfc7f3041 Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Sat, 10 Feb 2024 07:40:55 -0800 Subject: [PATCH 12/12] Lint --- tests/integration/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 1a615c2ff6..c03bc78a18 100644 --- 
a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -487,7 +487,7 @@ def test_hive_locking(catalog_hive: HiveCatalog) -> None: lock: LockResponse = open_client.lock(blocking_lock_request) assert lock.state == LockState.ACQUIRED try: - with pytest.raises(CommitFailedException, match="(Failed to acquire lock for).*",): + with pytest.raises(CommitFailedException, match="(Failed to acquire lock for).*"): table.transaction().set_properties(lock="fail").commit_transaction() finally: open_client.unlock(UnlockRequest(lock.lockid))