Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,33 @@ def namespace_to_string(identifier: str | Identifier, err: type[ValueError] | ty

return ".".join(segment.strip() for segment in tuple_identifier)

def supports_server_side_planning(self) -> bool:
    """Check if the catalog supports server-side scan planning.

    This is a concrete default rather than an abstract method so that
    catalog implementations written before the capability API existed
    can still be instantiated without overriding it.

    Returns:
        bool: ``False`` unless a subclass overrides this method.
    """
    return False

def supports_purge_table(self) -> bool:
    """Check if the catalog supports purging tables.

    Returns:
        bool: ``True`` unless a subclass overrides this method.
    """
    return True

def supports_atomic_concurrent_updates(self) -> bool:
    """Check if the catalog supports atomic concurrent updates.

    Returns:
        bool: ``True`` unless a subclass overrides this method.
    """
    return True

def supports_nested_namespaces(self) -> bool:
    """Check if the catalog supports nested namespaces.

    Returns:
        bool: ``True`` unless a subclass overrides this method.
    """
    return True

def supports_schema_evolution(self) -> bool:
    """Check if the catalog supports schema evolution.

    Returns:
        bool: ``True`` unless a subclass overrides this method.
    """
    return True

def supports_slash_in_identifier(self) -> bool:
    """Check if the catalog supports slash in identifier.

    Returns:
        bool: ``True`` unless a subclass overrides this method.
    """
    return True

def supports_dot_in_identifier(self) -> bool:
    """Check if the catalog supports dot in identifier.

    Returns:
        bool: ``True`` unless a subclass overrides this method.
    """
    return True

@staticmethod
def identifier_to_database(
Expand Down Expand Up @@ -836,6 +860,27 @@ class MetastoreCatalog(Catalog, ABC):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)

def supports_server_side_planning(self) -> bool:
    """Report that server-side scan planning is not available."""
    return False

def supports_purge_table(self) -> bool:
    """Report that purging tables is available."""
    return True

def supports_atomic_concurrent_updates(self) -> bool:
    """Report that atomic concurrent updates are available."""
    return True

def supports_nested_namespaces(self) -> bool:
    """Report that nested namespaces are available."""
    return True

def supports_schema_evolution(self) -> bool:
    """Report that schema evolution is available."""
    return True

def supports_slash_in_identifier(self) -> bool:
    """Report that a slash is accepted inside identifiers."""
    return True

def supports_dot_in_identifier(self) -> bool:
    """Report that a dot is accepted inside identifiers."""
    return True

def create_table_transaction(
self,
identifier: str | Identifier,
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/bigquery_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ def __init__(self, name: str, **properties: str):
self.location = location
self.project_id = project_id

def supports_nested_namespaces(self) -> bool:
    """Report that this catalog does not support nested namespaces."""
    return False

def create_table(
self,
identifier: str | Identifier,
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **prope
self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
self._ensure_catalog_table_exists_or_create()

def supports_nested_namespaces(self) -> bool:
    """Report that this catalog does not support nested namespaces."""
    return False

def _ensure_catalog_table_exists_or_create(self) -> None:
if self._dynamodb_table_exists():
return None
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ def __init__(self, name: str, client: Optional["GlueClient"] = None, **propertie
if glue_catalog_id := properties.get(GLUE_ID):
_register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)

def supports_nested_namespaces(self) -> bool:
    """Report that this catalog does not support nested namespaces."""
    return False

def _convert_glue_to_iceberg(self, glue_table: "TableTypeDef") -> Table:
if (database_name := glue_table.get("DatabaseName")) is None:
raise ValueError("Glue table is missing DatabaseName property")
Expand Down
20 changes: 19 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,24 @@ def drop_table(self, identifier: str | Identifier) -> None:
# When the namespace doesn't exist, it throws the same error
raise NoSuchTableError(f"Table does not exists: {table_name}") from e

def supports_purge_table(self) -> bool:
    """Report that purging tables is not supported.

    The ``purge_table`` method of this catalog raises ``NotImplementedError``.
    """
    return False

def supports_atomic_concurrent_updates(self) -> bool:
    """Report that atomic concurrent updates are not supported.

    NOTE(review): the integration test gated by this flag was previously
    skipped for this catalog pending investigation, so this may describe
    an observed test failure rather than a confirmed limitation - confirm.
    """
    return False

def supports_nested_namespaces(self) -> bool:
    """Report that nested namespaces are not supported."""
    return False

def supports_schema_evolution(self) -> bool:
    """Report that schema evolution is not supported.

    NOTE(review): this mirrors an earlier per-catalog test skip; confirm
    it reflects an actual limitation of the catalog.
    """
    return False

def supports_slash_in_identifier(self) -> bool:
    """Report that a slash inside identifiers is not supported."""
    return False

def supports_dot_in_identifier(self) -> bool:
    """Report that a dot inside identifiers is not supported."""
    return False

def purge_table(self, identifier: str | Identifier) -> None:
# This requires to traverse the reachability set, and drop all the data files.
raise NotImplementedError("Not yet implemented")
Expand Down Expand Up @@ -729,7 +747,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
open_client.drop_database(database_name, deleteData=False, cascade=False)
except InvalidOperationException as e:
raise NamespaceNotEmptyError(f"Database {database_name} is not empty") from e
except MetaException as e:
except (MetaException, NoSuchObjectException) as e:
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e

def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
Expand Down
21 changes: 21 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
def drop_table(self, identifier: str | Identifier) -> None:
raise NotImplementedError

def supports_server_side_planning(self) -> bool:
    """Not implemented for this catalog; always raises ``NotImplementedError``."""
    raise NotImplementedError

def supports_purge_table(self) -> bool:
    """Not implemented for this catalog; always raises ``NotImplementedError``."""
    raise NotImplementedError

def supports_atomic_concurrent_updates(self) -> bool:
    """Not implemented for this catalog; always raises ``NotImplementedError``."""
    raise NotImplementedError

def supports_nested_namespaces(self) -> bool:
    """Not implemented for this catalog; always raises ``NotImplementedError``."""
    raise NotImplementedError

def supports_schema_evolution(self) -> bool:
    """Not implemented for this catalog; always raises ``NotImplementedError``."""
    raise NotImplementedError

def supports_slash_in_identifier(self) -> bool:
    """Not implemented for this catalog; always raises ``NotImplementedError``."""
    raise NotImplementedError

def supports_dot_in_identifier(self) -> bool:
    """Not implemented for this catalog; always raises ``NotImplementedError``."""
    raise NotImplementedError

def purge_table(self, identifier: str | Identifier) -> None:
raise NotImplementedError

Expand Down
24 changes: 24 additions & 0 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,30 @@ def supports_server_side_planning(self) -> bool:
self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
)

def supports_purge_table(self) -> bool:
    """Check if the catalog supports purging tables.

    Looks up the ``supports_purge_table`` catalog property, passing
    ``True`` as the default.
    """
    enabled = property_as_bool(self.properties, "supports_purge_table", True)
    return enabled

def supports_atomic_concurrent_updates(self) -> bool:
    """Check if the catalog supports atomic concurrent updates.

    Looks up the ``supports_atomic_concurrent_updates`` catalog property,
    passing ``True`` as the default.
    """
    enabled = property_as_bool(self.properties, "supports_atomic_concurrent_updates", True)
    return enabled

def supports_nested_namespaces(self) -> bool:
    """Check if the catalog supports nested namespaces.

    Looks up the ``supports_nested_namespaces`` catalog property, passing
    ``True`` as the default.
    """
    enabled = property_as_bool(self.properties, "supports_nested_namespaces", True)
    return enabled

def supports_schema_evolution(self) -> bool:
    """Check if the catalog supports schema evolution.

    Looks up the ``supports_schema_evolution`` catalog property, passing
    ``True`` as the default.
    """
    enabled = property_as_bool(self.properties, "supports_schema_evolution", True)
    return enabled

def supports_slash_in_identifier(self) -> bool:
    """Check if the catalog supports slash in identifier.

    Looks up the ``supports_slash_in_identifier`` catalog property,
    passing ``True`` as the default.
    """
    enabled = property_as_bool(self.properties, "supports_slash_in_identifier", True)
    return enabled

def supports_dot_in_identifier(self) -> bool:
    """Check if the catalog supports dot in identifier.

    Looks up the ``supports_dot_in_identifier`` catalog property, passing
    ``True`` as the default.
    """
    enabled = property_as_bool(self.properties, "supports_dot_in_identifier", True)
    return enabled

@retry(**_RETRY_ARGS)
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
"""Submit a scan plan request to the REST server.
Expand Down
6 changes: 6 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ def create_tables(self) -> None:
def destroy_tables(self) -> None:
SqlCatalogBaseTable.metadata.drop_all(self.engine)

def supports_slash_in_identifier(self) -> bool:
    """Report that a slash inside identifiers is not supported."""
    return False

def supports_dot_in_identifier(self) -> bool:
    """Report that a dot inside identifiers is not supported."""
    return False

def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table:
# Check for expected properties.
if not (metadata_location := orm_table.metadata_location):
Expand Down
32 changes: 14 additions & 18 deletions tests/integration/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ def test_drop_table(test_catalog: Catalog, table_schema_nested: Schema, table_na
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_purge_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
if isinstance(test_catalog, HiveCatalog):
pytest.skip("HiveCatalog does not support purge_table operation yet")
if not test_catalog.supports_purge_table():
pytest.skip("Catalog does not support purge_table operation")

identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
Expand Down Expand Up @@ -299,8 +299,8 @@ def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, ta
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
if isinstance(test_catalog, HiveCatalog):
pytest.skip("HiveCatalog fails in this test, need to investigate")
if not test_catalog.supports_atomic_concurrent_updates():
pytest.skip("Catalog does not support atomic concurrent updates")

identifier = (database_name, table_name)

Expand Down Expand Up @@ -646,8 +646,8 @@ def test_rest_custom_namespace_separator(rest_catalog: RestCatalog, table_schema
def test_incompatible_partitioned_schema_evolution(
test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str
) -> None:
if isinstance(test_catalog, HiveCatalog):
pytest.skip("HiveCatalog does not support schema evolution")
if not test_catalog.supports_schema_evolution():
pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution")

identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
Expand Down Expand Up @@ -675,7 +675,7 @@ def test_incompatible_partitioned_schema_evolution(
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_namespace_with_slash(test_catalog: Catalog) -> None:
if isinstance(test_catalog, HiveCatalog):
if not test_catalog.supports_slash_in_identifier():
pytest.skip(f"{type(test_catalog).__name__} does not support slash in namespace")

namespace = ("new/db",)
Expand All @@ -700,8 +700,8 @@ def test_namespace_with_slash(test_catalog: Catalog) -> None:
def test_incompatible_sorted_schema_evolution(
test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str
) -> None:
if isinstance(test_catalog, HiveCatalog):
pytest.skip("HiveCatalog does not support schema evolution")
if not test_catalog.supports_schema_evolution():
pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution")

identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
Expand All @@ -720,7 +720,7 @@ def test_incompatible_sorted_schema_evolution(
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_namespace_with_dot(test_catalog: Catalog) -> None:
if isinstance(test_catalog, (HiveCatalog, SqlCatalog)):
if not test_catalog.supports_dot_in_identifier():
pytest.skip(f"{type(test_catalog).__name__} does not support dot in namespace")

namespace = ("new.db",)
Expand All @@ -733,9 +733,8 @@ def test_namespace_with_dot(test_catalog: Catalog) -> None:
test_catalog.create_namespace(namespace)
assert test_catalog.namespace_exists(namespace)

# REST Catalog fixture treats this as a hierarchical namespace.
# Calling list namespaces will get `new`, not `new.db`.
if isinstance(test_catalog, RestCatalog):
# Hierarchical catalogs might treat this as multiple levels.
if test_catalog.supports_nested_namespaces():
namespaces = test_catalog.list_namespaces()
assert ("new",) in namespaces or ("new.db",) in namespaces
else:
Expand All @@ -751,7 +750,7 @@ def test_namespace_with_dot(test_catalog: Catalog) -> None:
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_table_name_with_slash(test_catalog: Catalog, table_schema_simple: Schema) -> None:
if isinstance(test_catalog, (HiveCatalog, SqlCatalog)):
if not test_catalog.supports_slash_in_identifier():
pytest.skip(f"{type(test_catalog).__name__} does not support slash in table name")

namespace = ("ns_slash",)
Expand All @@ -778,7 +777,7 @@ def test_table_name_with_slash(test_catalog: Catalog, table_schema_simple: Schem
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_table_name_with_dot(test_catalog: Catalog, table_schema_simple: Schema) -> None:
if isinstance(test_catalog, (HiveCatalog, SqlCatalog)):
if not test_catalog.supports_dot_in_identifier():
pytest.skip(f"{type(test_catalog).__name__} does not support dot in table name")

namespace = ("ns_dot",)
Expand Down Expand Up @@ -817,9 +816,6 @@ def test_drop_missing_table(test_catalog: Catalog, database_name: str) -> None:
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_drop_nonexistent_namespace(test_catalog: Catalog) -> None:
if isinstance(test_catalog, HiveCatalog):
pytest.skip("HiveCatalog raises NoSuchObjectException instead of NoSuchNamespaceError")

namespace = ("non_existent_namespace",)
with pytest.raises(NoSuchNamespaceError):
test_catalog.drop_namespace(namespace)