diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index d81404f77a..aba3c173e6 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import getpass +import socket import time from types import TracebackType from typing import ( @@ -34,10 +35,17 @@ AlreadyExistsException, FieldSchema, InvalidOperationException, + LockComponent, + LockLevel, + LockRequest, + LockResponse, + LockState, + LockType, MetaException, NoSuchObjectException, SerDeInfo, StorageDescriptor, + UnlockRequest, ) from hive_metastore.ttypes import Database as HiveDatabase from hive_metastore.ttypes import Table as HiveTable @@ -56,6 +64,7 @@ PropertiesUpdateSummary, ) from pyiceberg.exceptions import ( + CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchIcebergTableError, @@ -331,6 +340,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError + def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest: + lock_component: LockComponent = LockComponent( + level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True + ) + + lock_request: LockRequest = LockRequest(component=[lock_component], user=getpass.getuser(), hostname=socket.gethostname()) + + return lock_request + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update the table. @@ -363,15 +381,23 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons self._write_metadata(updated_metadata, current_table.io, new_metadata_location) # commit to hive - try: - with self._client as open_client: + # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 + with self._client as open_client: + lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) + + try: + if lock.state != LockState.ACQUIRED: + raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") + tbl = open_client.get_table(dbname=database_name, tbl_name=table_name) tbl.parameters = _construct_parameters( metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location ) open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl) - except NoSuchObjectException as e: - raise NoSuchTableError(f"Table does not exist: {table_name}") from e + except NoSuchObjectException as e: + raise NoSuchTableError(f"Table does not exist: {table_name}") from e + finally: + open_client.unlock(UnlockRequest(lockid=lock.lockid)) return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index d487a6477e..c03bc78a18 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -22,10 +22,12 @@ import pyarrow.parquet as pq import pytest +from hive_metastore.ttypes import LockRequest, LockResponse, LockState, UnlockRequest from pyarrow.fs import S3FileSystem from pyiceberg.catalog import Catalog, load_catalog -from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.catalog.hive import HiveCatalog, _HiveClient +from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from pyiceberg.expressions import ( And, EqualTo, @@ -467,3 +469,25 @@ def test_null_list_and_map(catalog: Catalog) -> None: # assert arrow_table["col_list_with_struct"].to_pylist() == [None, [{'test': 1}]] # Once https://github.com/apache/arrow/issues/38809 has been fixed assert arrow_table["col_list_with_struct"].to_pylist() == [[], [{'test': 1}]] + + +@pytest.mark.integration +def test_hive_locking(catalog_hive: HiveCatalog) -> None: + table = create_table(catalog_hive) + + database_name: str + table_name: str + _, database_name, table_name = table.identifier + + hive_client: _HiveClient = _HiveClient(catalog_hive.properties["uri"]) + blocking_lock_request: LockRequest = catalog_hive._create_lock_request(database_name, table_name) + + with hive_client as open_client: + # Force a lock on the test table + lock: LockResponse = open_client.lock(blocking_lock_request) + assert lock.state == LockState.ACQUIRED + try: + with pytest.raises(CommitFailedException, match="(Failed to acquire lock for).*"): + table.transaction().set_properties(lock="fail").commit_transaction() + finally: + open_client.unlock(UnlockRequest(lock.lockid))