Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 135 additions & 4 deletions src/metaxy/metadata_store/lancedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from metaxy._utils import collect_to_polars
from metaxy.metadata_store.base import MetadataStore, MetadataStoreConfig
from metaxy.metadata_store.exceptions import FeatureNotFoundError
from metaxy.metadata_store.types import AccessMode
from metaxy.metadata_store.utils import is_local_path, sanitize_uri
from metaxy.models.types import CoercibleToFeatureKey, FeatureKey
Expand Down Expand Up @@ -315,10 +316,140 @@ def _delete_metadata_impl(
feature_key: FeatureKey,
filter_expr: nw.Expr,
) -> None:
"""Hard delete not yet supported for LanceDB backend."""
raise NotImplementedError(
f"{self.__class__.__name__} does not yet implement hard delete"
)
"""Physically delete rows from LanceDB table (hard delete only).

This method is ONLY called for hard deletes (soft=False).
Soft deletes are append-only: they just write tombstones without
calling this method.

Implementation: Uses LanceDB's native delete() operation for efficient
in-place deletion without rewriting the entire table.

Args:
feature_key: Feature to delete from
filter_expr: Narwhals expression to select rows to delete
"""
table_name = self._table_name(feature_key)
if not self._table_exists(table_name):
raise FeatureNotFoundError(
f"Feature {feature_key.to_string()} not found in store; cannot hard delete."
)

table = self._get_table(table_name)

# Convert Narwhals expression to SQL WHERE clause using Ibis
filter_str = self._narwhals_expr_to_sql(filter_expr, table)

# Use native LanceDB delete for efficient in-place deletion
table.delete(filter_str)

def _narwhals_expr_to_sql(self, expr: nw.Expr, table: Any) -> str:
    """Convert a Narwhals expression to a SQL WHERE clause string for LanceDB.

    The expression is applied to a throwaway Ibis/DuckDB table that mirrors
    the Lance table's schema, the resulting query is rendered to SQL, and the
    WHERE predicate is extracted with sqlglot. The predicate is then adapted
    for LanceDB:

    * table qualifiers are removed and column identifiers are unquoted;
    * ``MAKE_TIMESTAMPTZ(...)`` calls are replaced by
      ``arrow_cast('<iso timestamp>', 'Timestamp(Microsecond, Some("UTC"))')``.

    Args:
        expr: Narwhals expression (e.g., nw.col("status") == "inactive")
        table: LanceDB table to get schema from

    Returns:
        SQL WHERE clause string (e.g., "status = 'inactive'"). If the compiled
        query has no top-level WHERE clause, the predicate rendered is a
        literal TRUE.
    """
    import ibis
    import sqlglot
    from sqlglot import exp

    def _strip_table_and_unquote(node: exp.Expression) -> exp.Expression:
        """Drop the table qualifier and identifier quoting from column refs."""
        if not isinstance(node, exp.Column):
            return node
        cleaned = node.copy()
        cleaned.set("table", None)
        # Unquote the identifier (LanceDB doesn't support quoted column names)
        if isinstance(cleaned.this, exp.Identifier):
            cleaned.this.set("quoted", False)
        return cleaned

    def _literal_text(arg: Any) -> Any:
        """Return the raw value of a parsed argument node."""
        return arg.this if hasattr(arg, "this") else str(arg)

    def _convert_timestamptz(node: exp.Expression) -> exp.Expression:
        """Rewrite MAKE_TIMESTAMPTZ(y, mo, d, h, mi, s, ...) as arrow_cast."""
        is_make_tstz = (
            isinstance(node, exp.Anonymous)
            and node.name.upper() == "MAKE_TIMESTAMPTZ"
        )
        if not is_make_tstz:
            return node

        # Expected arguments: year, month, day, hour, minute, second, timezone
        args = node.expressions
        if len(args) < 6:
            return node

        year, month, day, hour, minute = (
            int(_literal_text(arg)) for arg in args[:5]
        )
        # Second may be a float carrying sub-second precision.
        second = float(_literal_text(args[5]))

        # Keep six fractional digits: the cast target below is a
        # microsecond-resolution timestamp, so millisecond formatting would
        # silently truncate the literal and change which rows match.
        # NOTE(review): the timezone argument (args[6], when present) is not
        # inspected; the literal is always emitted as UTC ("Z") — confirm
        # that the compiler only ever produces UTC here.
        iso_str = (
            f"{year:04d}-{month:02d}-{day:02d}"
            f"T{hour:02d}:{minute:02d}:{second:09.6f}Z"
        )
        # DataFusion syntax:
        #   arrow_cast('2024-01-01T00:00:00.000000Z',
        #              'Timestamp(Microsecond, Some("UTC"))')
        return exp.Anonymous(
            this="arrow_cast",
            expressions=[
                exp.Literal.string(iso_str),
                exp.Literal.string('Timestamp(Microsecond, Some("UTC"))'),
            ],
        )

    # Create a temporary in-memory DuckDB connection for SQL compilation.
    temp_conn = ibis.duckdb.connect()
    try:
        # Get a sample of data from Lance table to create schema
        lance_df = table.to_polars().head(1)

        # Create temporary Ibis table for SQL compilation
        ibis_table = temp_conn.create_table("temp_table", lance_df, overwrite=True)

        # Apply the Narwhals filter to the Ibis table and render it as SQL
        filtered = (
            nw.from_native(ibis_table, eager_only=False).filter(expr).to_native()
        )
        sql = ibis.to_sql(filtered)
    finally:
        # Always release the temporary connection, including when schema
        # sampling or SQL compilation raises.
        temp_conn.disconnect()

    # Parse the SQL and extract the WHERE predicate.
    # NOTE(review): when no top-level WHERE clause is found the predicate
    # falls back to TRUE, which matches every row — confirm this is intended
    # for callers that pass the result to a delete.
    parsed = sqlglot.parse_one(sql)
    where_expr = parsed.args.get("where")
    predicate = where_expr.this if where_expr else exp.true()

    normalized = predicate.transform(_strip_table_and_unquote)
    # LanceDB/DataFusion doesn't support MAKE_TIMESTAMPTZ, but accepts arrow_cast
    normalized = normalized.transform(_convert_timestamptz)
    return normalized.sql()

def read_metadata_in_store(
self,
Expand Down
1 change: 0 additions & 1 deletion tests/metadata_stores/test_provenance_golden_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,6 @@ def test_hard_delete_memory_store_only(any_store: MetadataStore):

unsupported = {
"ClickHouseMetadataStore",
"LanceDBMetadataStore",
}
if any_store.__class__.__name__ in unsupported:
pytest.xfail(f"Hard delete pending for {any_store.__class__.__name__}")
Expand Down
Loading