diff --git a/src/metaxy/metadata_store/lancedb.py b/src/metaxy/metadata_store/lancedb.py
index f9af38b08..65c9d3307 100644
--- a/src/metaxy/metadata_store/lancedb.py
+++ b/src/metaxy/metadata_store/lancedb.py
@@ -15,6 +15,7 @@
 from metaxy._utils import collect_to_polars
 from metaxy.metadata_store.base import MetadataStore, MetadataStoreConfig
+from metaxy.metadata_store.exceptions import FeatureNotFoundError
 from metaxy.metadata_store.types import AccessMode
 from metaxy.metadata_store.utils import is_local_path, sanitize_uri
 from metaxy.models.types import CoercibleToFeatureKey, FeatureKey
@@ -315,10 +316,140 @@ def _delete_metadata_impl(
         feature_key: FeatureKey,
         filter_expr: nw.Expr,
     ) -> None:
-        """Hard delete not yet supported for LanceDB backend."""
-        raise NotImplementedError(
-            f"{self.__class__.__name__} does not yet implement hard delete"
-        )
+        """Physically delete rows from LanceDB table (hard delete only).
+
+        This method is ONLY called for hard deletes (soft=False).
+        Soft deletes are append-only: they just write tombstones without
+        calling this method.
+
+        Implementation: Uses LanceDB's native delete() operation for efficient
+        in-place deletion without rewriting the entire table.
+
+        Args:
+            feature_key: Feature to delete from
+            filter_expr: Narwhals expression to select rows to delete
+        """
+        table_name = self._table_name(feature_key)
+        if not self._table_exists(table_name):
+            raise FeatureNotFoundError(
+                f"Feature {feature_key.to_string()} not found in store; cannot hard delete."
+            )
+
+        table = self._get_table(table_name)
+
+        # Convert Narwhals expression to SQL WHERE clause using Ibis
+        filter_str = self._narwhals_expr_to_sql(filter_expr, table)
+
+        # Use native LanceDB delete for efficient in-place deletion
+        table.delete(filter_str)
+
+    def _narwhals_expr_to_sql(self, expr: nw.Expr, table: Any) -> str:
+        """Convert a Narwhals expression to SQL WHERE clause string for LanceDB.
+
+        Uses Ibis to compile Narwhals expressions to proper SQL, then strips
+        identifier quoting since LanceDB doesn't support quoted column names.
+
+        Args:
+            expr: Narwhals expression (e.g., nw.col("status") == "inactive")
+            table: LanceDB table to get schema from
+
+        Returns:
+            SQL WHERE clause string (e.g., "status = 'inactive'")
+        """
+        import ibis
+        import sqlglot
+        from sqlglot import exp
+
+        # Create a temporary in-memory DuckDB connection for SQL compilation
+        # This is lightweight and doesn't require an actual database
+        temp_conn = ibis.duckdb.connect()
+
+        # Get a sample of data from Lance table to create schema
+        lance_df = table.to_polars().head(1)
+
+        # Create temporary Ibis table for SQL compilation
+        ibis_table = temp_conn.create_table("temp_table", lance_df, overwrite=True)
+
+        # Apply the Narwhals filter to the Ibis table
+        filtered = nw.from_native(ibis_table, eager_only=False).filter(expr).to_native()
+
+        # Parse the SQL and extract WHERE clause
+        sql = ibis.to_sql(filtered)
+        parsed = sqlglot.parse_one(sql)
+        where_expr = parsed.args.get("where")
+        predicate = where_expr.this if where_expr else exp.true()
+
+        # Strip table qualifiers and unquote identifiers for LanceDB compatibility
+        def _strip_table_and_unquote(node: exp.Expression) -> exp.Expression:
+            if isinstance(node, exp.Column):
+                cleaned = node.copy()
+                cleaned.set("table", None)
+                # Unquote the identifier (LanceDB doesn't support quoted column names)
+                if isinstance(cleaned.this, exp.Identifier):
+                    cleaned.this.set("quoted", False)
+                return cleaned
+            return node
+
+        normalized = predicate.transform(_strip_table_and_unquote)
+
+        # Transform MAKE_TIMESTAMPTZ calls to arrow_cast with ISO timestamps
+        # LanceDB/DataFusion doesn't support MAKE_TIMESTAMPTZ, but accepts arrow_cast
+        def _convert_timestamptz(node: exp.Expression) -> exp.Expression:
+            if (
+                isinstance(node, exp.Anonymous)
+                and node.name.upper() == "MAKE_TIMESTAMPTZ"
+            ):
+                # Extract arguments: year, month, day, hour, minute, second, timezone
+                args = node.expressions
+                if len(args) >= 6:
+                    year = (
+                        int(args[0].this)
+                        if hasattr(args[0], "this")
+                        else int(str(args[0]))
+                    )
+                    month = (
+                        int(args[1].this)
+                        if hasattr(args[1], "this")
+                        else int(str(args[1]))
+                    )
+                    day = (
+                        int(args[2].this)
+                        if hasattr(args[2], "this")
+                        else int(str(args[2]))
+                    )
+                    hour = (
+                        int(args[3].this)
+                        if hasattr(args[3], "this")
+                        else int(str(args[3]))
+                    )
+                    minute = (
+                        int(args[4].this)
+                        if hasattr(args[4], "this")
+                        else int(str(args[4]))
+                    )
+                    # Second may be a float
+                    second_val = (
+                        args[5].this if hasattr(args[5], "this") else str(args[5])
+                    )
+                    second = float(second_val)
+                    # Format as ISO timestamp string
+                    iso_str = f"{year:04d}-{month:02d}-{day:02d}T{hour:02d}:{minute:02d}:{second:06.3f}Z"
+                    # Use arrow_cast to convert string to Timestamp with timezone
+                    # DataFusion syntax: arrow_cast('2024-01-01T00:00:00.000Z', 'Timestamp(Microsecond, Some("UTC"))')
+                    return exp.Anonymous(
+                        this="arrow_cast",
+                        expressions=[
+                            exp.Literal.string(iso_str),
+                            exp.Literal.string('Timestamp(Microsecond, Some("UTC"))'),
+                        ],
+                    )
+            return node
+
+        normalized = normalized.transform(_convert_timestamptz)
+        where_clause = normalized.sql()
+
+        temp_conn.disconnect()
+        return where_clause
 
     def read_metadata_in_store(
         self,
diff --git a/tests/metadata_stores/test_provenance_golden_reference.py b/tests/metadata_stores/test_provenance_golden_reference.py
index 385a231f4..86eca0e1a 100644
--- a/tests/metadata_stores/test_provenance_golden_reference.py
+++ b/tests/metadata_stores/test_provenance_golden_reference.py
@@ -1096,7 +1096,6 @@ def test_hard_delete_memory_store_only(any_store: MetadataStore):
     unsupported = {
         "ClickHouseMetadataStore",
-        "LanceDBMetadataStore",
     }
     if any_store.__class__.__name__ in unsupported:
        pytest.xfail(f"Hard delete pending for {any_store.__class__.__name__}")