From cb8f7915c16805848020dd0fb1cabcb2bf4c6a7e Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Thu, 29 Jan 2026 13:40:34 -0800 Subject: [PATCH] Add catalog properties --- pyiceberg/catalog/__init__.py | 47 ++++++++++++++++++++++++- pyiceberg/catalog/bigquery_metastore.py | 3 ++ pyiceberg/catalog/dynamodb.py | 3 ++ pyiceberg/catalog/glue.py | 3 ++ pyiceberg/catalog/hive.py | 20 ++++++++++- pyiceberg/catalog/noop.py | 21 +++++++++++ pyiceberg/catalog/rest/__init__.py | 24 +++++++++++++ pyiceberg/catalog/sql.py | 6 ++++ tests/integration/test_catalog.py | 32 ++++++++--------- 9 files changed, 139 insertions(+), 20 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 315521555e..d6bd589f19 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -733,9 +733,33 @@ def namespace_to_string(identifier: str | Identifier, err: type[ValueError] | ty return ".".join(segment.strip() for segment in tuple_identifier) + @abstractmethod def supports_server_side_planning(self) -> bool: """Check if the catalog supports server-side scan planning.""" - return False + + @abstractmethod + def supports_purge_table(self) -> bool: + """Check if the catalog supports purging tables.""" + + @abstractmethod + def supports_atomic_concurrent_updates(self) -> bool: + """Check if the catalog supports atomic concurrent updates.""" + + @abstractmethod + def supports_nested_namespaces(self) -> bool: + """Check if the catalog supports nested namespaces.""" + + @abstractmethod + def supports_schema_evolution(self) -> bool: + """Check if the catalog supports schema evolution.""" + + @abstractmethod + def supports_slash_in_identifier(self) -> bool: + """Check if the catalog supports slash in identifier.""" + + @abstractmethod + def supports_dot_in_identifier(self) -> bool: + """Check if the catalog supports dot in identifier.""" @staticmethod def identifier_to_database( @@ -836,6 +860,27 @@ class MetastoreCatalog(Catalog, ABC): def __init__(self, name: str, **properties: str): super().__init__(name, **properties) + def supports_server_side_planning(self) -> bool: + return False + + def supports_purge_table(self) -> bool: + return True + + def supports_atomic_concurrent_updates(self) -> bool: + return True + + def supports_nested_namespaces(self) -> bool: + return True + + def supports_schema_evolution(self) -> bool: + return True + + def supports_slash_in_identifier(self) -> bool: + return True + + def supports_dot_in_identifier(self) -> bool: + return True + def create_table_transaction( self, identifier: str | Identifier, diff --git a/pyiceberg/catalog/bigquery_metastore.py b/pyiceberg/catalog/bigquery_metastore.py index b762c1047c..ee7cb1a264 100644 --- a/pyiceberg/catalog/bigquery_metastore.py +++ b/pyiceberg/catalog/bigquery_metastore.py @@ -98,6 +98,9 @@ def __init__(self, name: str, **properties: str): self.location = location self.project_id = project_id + def supports_nested_namespaces(self) -> bool: + return False + def create_table( self, identifier: str | Identifier, diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 2d35b2c5e2..843b39fd8c 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -117,6 +117,9 @@ def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **prope self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT) self._ensure_catalog_table_exists_or_create() + def supports_nested_namespaces(self) -> bool: + return False + def _ensure_catalog_table_exists_or_create(self) -> None: if self._dynamodb_table_exists(): return None diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 5c09cdbd0f..6c5a8789f7 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -350,6 +350,9 @@ def __init__(self, name: str, client: Optional["GlueClient"] = None, **propertie if glue_catalog_id := properties.get(GLUE_ID): _register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id) + def supports_nested_namespaces(self) -> bool: + return False + def _convert_glue_to_iceberg(self, glue_table: "TableTypeDef") -> Table: if (database_name := glue_table.get("DatabaseName")) is None: raise ValueError("Glue table is missing DatabaseName property") diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 1bec186ca8..72d10f5fc6 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -650,6 +650,24 @@ def drop_table(self, identifier: str | Identifier) -> None: # When the namespace doesn't exist, it throws the same error raise NoSuchTableError(f"Table does not exists: {table_name}") from e + def supports_purge_table(self) -> bool: + return False + + def supports_atomic_concurrent_updates(self) -> bool: + return False + + def supports_nested_namespaces(self) -> bool: + return False + + def supports_schema_evolution(self) -> bool: + return False + + def supports_slash_in_identifier(self) -> bool: + return False + + def supports_dot_in_identifier(self) -> bool: + return False + def purge_table(self, identifier: str | Identifier) -> None: # This requires to traverse the reachability set, and drop all the data files. raise NotImplementedError("Not yet implemented") @@ -729,7 +747,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: open_client.drop_database(database_name, deleteData=False, cascade=False) except InvalidOperationException as e: raise NamespaceNotEmptyError(f"Database {database_name} is not empty") from e - except MetaException as e: + except (MetaException, NoSuchObjectException) as e: raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e def list_tables(self, namespace: str | Identifier) -> list[Identifier]: diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index 0dc6fdb7f4..09906ce169 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -85,6 +85,27 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) - def drop_table(self, identifier: str | Identifier) -> None: raise NotImplementedError + def supports_server_side_planning(self) -> bool: + raise NotImplementedError + + def supports_purge_table(self) -> bool: + raise NotImplementedError + + def supports_atomic_concurrent_updates(self) -> bool: + raise NotImplementedError + + def supports_nested_namespaces(self) -> bool: + raise NotImplementedError + + def supports_schema_evolution(self) -> bool: + raise NotImplementedError + + def supports_slash_in_identifier(self) -> bool: + raise NotImplementedError + + def supports_dot_in_identifier(self) -> bool: + raise NotImplementedError + def purge_table(self, identifier: str | Identifier) -> None: raise NotImplementedError diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 802be28565..bd535a1407 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -403,6 +403,30 @@ def supports_server_side_planning(self) -> bool: self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT ) + def supports_purge_table(self) -> bool: + """Check if the catalog supports purging tables.""" + return property_as_bool(self.properties, "supports_purge_table", True) + + def supports_atomic_concurrent_updates(self) -> bool: + """Check if the catalog supports atomic concurrent updates.""" + return property_as_bool(self.properties, "supports_atomic_concurrent_updates", True) + + def supports_nested_namespaces(self) -> bool: + """Check if the catalog supports nested namespaces.""" + return property_as_bool(self.properties, "supports_nested_namespaces", True) + + def supports_schema_evolution(self) -> bool: + """Check if the catalog supports schema evolution.""" + return property_as_bool(self.properties, "supports_schema_evolution", True) + + def supports_slash_in_identifier(self) -> bool: + """Check if the catalog supports slash in identifier.""" + return property_as_bool(self.properties, "supports_slash_in_identifier", True) + + def supports_dot_in_identifier(self) -> bool: + """Check if the catalog supports dot in identifier.""" + return property_as_bool(self.properties, "supports_dot_in_identifier", True) + @retry(**_RETRY_ARGS) def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse: """Submit a scan plan request to the REST server. diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index dc24abcf96..e346026d19 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -149,6 +149,12 @@ def create_tables(self) -> None: def destroy_tables(self) -> None: SqlCatalogBaseTable.metadata.drop_all(self.engine) + def supports_slash_in_identifier(self) -> bool: + return False + + def supports_dot_in_identifier(self) -> bool: + return False + def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table: # Check for expected properties. if not (metadata_location := orm_table.metadata_location): diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 0e39beb241..6b037c8569 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -246,8 +246,8 @@ def test_drop_table(test_catalog: Catalog, table_schema_nested: Schema, table_na @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_purge_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None: - if isinstance(test_catalog, HiveCatalog): - pytest.skip("HiveCatalog does not support purge_table operation yet") + if not test_catalog.supports_purge_table(): + pytest.skip("Catalog does not support purge_table operation") identifier = (database_name, table_name) test_catalog.create_namespace(database_name) @@ -299,8 +299,8 @@ def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, ta @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: - if isinstance(test_catalog, HiveCatalog): - pytest.skip("HiveCatalog fails in this test, need to investigate") + if not test_catalog.supports_atomic_concurrent_updates(): + pytest.skip("Catalog does not support atomic concurrent updates") identifier = (database_name, table_name) @@ -646,8 +646,8 @@ def test_rest_custom_namespace_separator(rest_catalog: RestCatalog, table_schema def test_incompatible_partitioned_schema_evolution( test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str ) -> None: - if isinstance(test_catalog, HiveCatalog): - pytest.skip("HiveCatalog does not support schema evolution") + if not test_catalog.supports_schema_evolution(): + pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution") identifier = (database_name, table_name) test_catalog.create_namespace(database_name) @@ -675,7 +675,7 @@ def test_incompatible_partitioned_schema_evolution( @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_namespace_with_slash(test_catalog: Catalog) -> None: - if isinstance(test_catalog, HiveCatalog): + if not test_catalog.supports_slash_in_identifier(): pytest.skip(f"{type(test_catalog).__name__} does not support slash in namespace") namespace = ("new/db",) @@ -700,8 +700,8 @@ def test_namespace_with_slash(test_catalog: Catalog) -> None: def test_incompatible_sorted_schema_evolution( test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str ) -> None: - if isinstance(test_catalog, HiveCatalog): - pytest.skip("HiveCatalog does not support schema evolution") + if not test_catalog.supports_schema_evolution(): + pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution") identifier = (database_name, table_name) test_catalog.create_namespace(database_name) @@ -720,7 +720,7 @@ def test_incompatible_sorted_schema_evolution( @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_namespace_with_dot(test_catalog: Catalog) -> None: - if isinstance(test_catalog, (HiveCatalog, SqlCatalog)): + if not test_catalog.supports_dot_in_identifier(): pytest.skip(f"{type(test_catalog).__name__} does not support dot in namespace") namespace = ("new.db",) @@ -733,9 +733,8 @@ def test_namespace_with_dot(test_catalog: Catalog) -> None: test_catalog.create_namespace(namespace) assert test_catalog.namespace_exists(namespace) - # REST Catalog fixture treats this as a hierarchical namespace. - # Calling list namespaces will get `new`, not `new.db`. - if isinstance(test_catalog, RestCatalog): + # Hierarchical catalogs might treat this as multiple levels. + if test_catalog.supports_nested_namespaces(): namespaces = test_catalog.list_namespaces() assert ("new",) in namespaces or ("new.db",) in namespaces else: @@ -751,7 +750,7 @@ def test_namespace_with_dot(test_catalog: Catalog) -> None: @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_table_name_with_slash(test_catalog: Catalog, table_schema_simple: Schema) -> None: - if isinstance(test_catalog, (HiveCatalog, SqlCatalog)): + if not test_catalog.supports_slash_in_identifier(): pytest.skip(f"{type(test_catalog).__name__} does not support slash in table name") namespace = ("ns_slash",) @@ -778,7 +777,7 @@ def test_table_name_with_slash(test_catalog: Catalog, table_schema_simple: Schem @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_table_name_with_dot(test_catalog: Catalog, table_schema_simple: Schema) -> None: - if isinstance(test_catalog, (HiveCatalog, SqlCatalog)): + if not test_catalog.supports_dot_in_identifier(): pytest.skip(f"{type(test_catalog).__name__} does not support dot in table name") namespace = ("ns_dot",) @@ -817,9 +816,6 @@ def test_drop_missing_table(test_catalog: Catalog, database_name: str) -> None: @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_drop_nonexistent_namespace(test_catalog: Catalog) -> None: - if isinstance(test_catalog, HiveCatalog): - pytest.skip("HiveCatalog raises NoSuchObjectException instead of NoSuchNamespaceError") - namespace = ("non_existent_namespace",) with pytest.raises(NoSuchNamespaceError): test_catalog.drop_namespace(namespace)