diff --git a/lance_ray/index.py b/lance_ray/index.py index a51db4b..93f74c2 100755 --- a/lance_ray/index.py +++ b/lance_ray/index.py @@ -7,6 +7,7 @@ import lance import pyarrow as pa +import ray from lance.dataset import Index, IndexConfig, LanceDataset from lance.indices import IndicesBuilder from packaging import version @@ -233,6 +234,160 @@ def merge_index_metadata_compat(dataset, index_id, index_type, **kwargs): return dataset.merge_index_metadata(index_id) +@ray.remote +def _build_range_partition( + dataset_uri: str, + column: str, + name: str, + index_uuid: str, + replace: bool, + range_id: int, + table: pa.Table, + storage_options: Optional[dict[str, str]] = None, +) -> dict[str, Any]: + """Build a single range partition of a sorted BTree index.""" + try: + if column in table.column_names: + table = table.rename_columns( + ["value" if c == column else c for c in table.column_names] + ) + + reader = pa.RecordBatchReader.from_batches(table.schema, table.to_batches()) + + dataset = LanceDataset(dataset_uri, storage_options=storage_options) + dataset.create_scalar_index( + column=column, + index_type="BTREE", + name=name, + replace=replace, + preprocessed_data=reader, + range_id=range_id, + index_uuid=index_uuid, + ) + + field_id = dataset.schema.get_field_index(column) + + return { + "status": "success", + "range_id": range_id, + "fields": [field_id], + "uuid": index_uuid, + } + except Exception as exc: + logger.error("Sorted BTree range partition %d failed: %s", range_id, exc) + return { + "status": "error", + "range_id": range_id, + "error": str(exc), + } + + +def _build_sorted_btree_index( + *, + uri: str, + dataset: LanceDataset, + column: str, + name: str, + index_id: str, + replace: bool, + num_workers: int, + storage_options: Optional[dict[str, str]], + storage_options_provider: Any = None, + ray_remote_args: Optional[dict[str, Any]] = None, + **kwargs: Any, +) -> "lance.LanceDataset": + """Range-based sorted BTree index build via Ray.""" + from .io import read_lance + + logger.info("Phase 1: Reading column '%s' with row IDs for sorted BTree", column) + ray_ds = read_lance( + uri, + columns=[column], + storage_options=storage_options, + scanner_options={"with_row_id": True}, + ) + + logger.info("Phase 2: Sorting by column '%s'", column) + sorted_ds = ray_ds.sort(column) + + logger.info("Phase 3: Splitting into %d range partitions", num_workers) + sorted_ds = sorted_ds.repartition(num_workers, shuffle=False) + table_refs = sorted_ds.to_arrow_refs() + actual_num_workers = len(table_refs) + + logger.info("Phase 4: Building %d range partitions", actual_num_workers) + remote_opts = ray_remote_args or {} + build_task = _build_range_partition.options(**remote_opts) + + futures = [] + for i, table_ref in enumerate(table_refs): + fut = build_task.remote( + dataset_uri=uri, + column=column, + name=name, + index_uuid=index_id, + replace=replace, + range_id=i + 1, + table=table_ref, + storage_options=storage_options, + ) + futures.append(fut) + + results = ray.get(futures) + + failed_results = [r for r in results if r["status"] == "error"] + if failed_results: + error_messages = [r["error"] for r in failed_results] + raise RuntimeError( + f"Sorted BTree index building failed: {'; '.join(error_messages)}" + ) + + logger.info("Phase 5: Merging index metadata for index ID: %s", index_id) + dataset = LanceDataset( + uri, + storage_options=storage_options, + storage_options_provider=storage_options_provider, + ) + merge_index_metadata_compat(dataset, index_id, index_type="BTREE", **kwargs) + + logger.info("Phase 6: Committing sorted BTree index '%s'", name) + successful_results = [r for r in results if r["status"] == "success"] + if not successful_results: + raise RuntimeError("No successful sorted BTree index results found") + + fields = successful_results[0]["fields"] + fragment_ids_to_use = [f.fragment_id for f in dataset.get_fragments()] + + index = Index( + uuid=index_id, + name=name, + fields=fields, + dataset_version=dataset.version, + fragment_ids=set(fragment_ids_to_use), + index_version=0, + ) + + create_index_op = lance.LanceOperation.CreateIndex( + new_indices=[index], + removed_indices=[], + ) + + updated_dataset = lance.LanceDataset.commit( + uri, + create_index_op, + read_version=dataset.version, + storage_options=storage_options, + storage_options_provider=storage_options_provider, + ) + + logger.info( + "Successfully created sorted BTree index '%s' with %d range partitions", + name, + actual_num_workers, + ) + return updated_dataset + + def create_scalar_index( uri: Optional[str] = None, *, @@ -249,6 +404,7 @@ def create_scalar_index( name: Optional[str] = None, replace: bool = True, train: bool = True, + sorted: bool = False, fragment_ids: Optional[list[int]] = None, index_uuid: Optional[str] = None, num_workers: int = 4, @@ -271,6 +427,10 @@ def create_scalar_index( name: Name of the index (generated if None). replace: Whether to replace existing index with the same name (default: True). train: Whether to train the index (default: True). + sorted: Whether to use range-based sorted BTree indexing (default: False). + When True, the indexed column is globally sorted via Ray, split into + non-overlapping ranges, and each range builds a separate BTree partition. + Only valid with index_type="BTREE". fragment_ids: Optional list of fragment IDs to build index on. index_uuid: Optional fragment UUID for distributed indexing. num_workers: Number of Ray workers to use (keyword-only). @@ -353,8 +513,8 @@ def create_scalar_index( f"{type(index_type)}" ) - # Note: Ray initialization is now handled by the Pool, following the pattern from io.py - # This removes the need for explicit ray.init() calls + if sorted and (not isinstance(index_type, str) or index_type != "BTREE"): + raise ValueError("sorted=True is only supported with index_type='BTREE'") merged_storage_options: dict[str, Any] = {} if storage_options: @@ -437,6 +597,21 @@ def create_scalar_index( if not fragments: raise ValueError("Dataset contains no fragments") + if sorted: + return _build_sorted_btree_index( + uri=uri, + dataset=dataset, + column=column, + name=name, + index_id=index_id, + replace=replace, + num_workers=num_workers, + storage_options=merged_storage_options, + storage_options_provider=storage_options_provider, + ray_remote_args=ray_remote_args, + **kwargs, + ) + if fragment_ids is not None: available_fragment_ids = {f.fragment_id for f in fragments} invalid_fragments = set(fragment_ids) - available_fragment_ids diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py index 9607c01..1b8efcb 100755 --- a/tests/test_distributed_indexing.py +++ b/tests/test_distributed_indexing.py @@ -894,6 +894,189 @@ def test_distributed_btree_index_string_column(self, temp_dir): ) +@pytest.mark.skipif( + not check_btree_version_compatibility(), + reason="Sorted B-tree indexing requires pylance >= 0.37.0. Current version: {}".format( + getattr(lance, "__version__", "unknown") + ), +) +class TestDistributedSortedBTreeIndexing: + """Tests for range-based sorted BTree indexing (sorted=True).""" + + def test_sorted_btree_basic(self, temp_dir): + """Build a sorted BTree index and verify queries work.""" + ds = generate_multi_fragment_dataset( + temp_dir, num_fragments=3, rows_per_fragment=500 + ) + + updated_dataset = lr.create_scalar_index( + uri=ds.uri, + column="id", + index_type="BTREE", + name="sorted_btree_idx", + sorted=True, + num_workers=3, + ) + + indices = updated_dataset.list_indices() + assert len(indices) > 0, "No indices found after sorted BTree build" + + our_index = None + for idx in indices: + if idx["name"] == "sorted_btree_idx": + our_index = idx + break + assert our_index is not None, "Sorted BTree index not found by name" + assert our_index["type"] == "BTree" + + eq_tbl = updated_dataset.scanner(filter="id = 100", columns=["id"]).to_table() + assert eq_tbl.num_rows == 1 + + rg_tbl = updated_dataset.scanner( + filter="id >= 200 AND id < 800", columns=["id"] + ).to_table() + assert rg_tbl.num_rows == 600 + + @pytest.fixture + def sorted_comp_datasets(self, tmp_path): + """Build sorted and fragment-based BTree indices on identical data.""" + sorted_ds = generate_multi_fragment_dataset( + tmp_path / "sorted", num_fragments=3, rows_per_fragment=500 + ) + fragment_ds = generate_multi_fragment_dataset( + tmp_path / "fragment", num_fragments=3, rows_per_fragment=500 + ) + + sorted_ds = lr.create_scalar_index( + uri=sorted_ds.uri, + column="id", + index_type="BTREE", + name="comp_sorted_idx", + sorted=True, + num_workers=2, + ) + fragment_ds = lr.create_scalar_index( + uri=fragment_ds.uri, + column="id", + index_type="BTREE", + name="comp_fragment_idx", + num_workers=2, + ) + + return {"sorted": sorted_ds, "fragment": fragment_ds} + + @pytest.mark.parametrize( + "test_name,filter_expr", + [ + ("Equality first", "id = 0"), + ("Equality middle", "id = 750"), + ("Equality last", "id = 1499"), + ("Range within", "id >= 100 AND id < 200"), + ("Cross fragment", "id >= 495 AND id < 505"), + ("Full range", "id >= 0 AND id < 1500"), + ("Non-existent", "id = 9999"), + ("Less than", "id < 100"), + ("Greater than", "id > 1400"), + ], + ) + def test_sorted_btree_vs_fragment_correctness( + self, sorted_comp_datasets, test_name, filter_expr + ): + """Compare sorted and fragment-based BTree query results.""" + sorted_ds = sorted_comp_datasets["sorted"] + fragment_ds = sorted_comp_datasets["fragment"] + + res_sorted = sorted_ds.scanner(filter=filter_expr, columns=["id"]).to_table() + res_fragment = fragment_ds.scanner( + filter=filter_expr, columns=["id"] + ).to_table() + + assert res_sorted.num_rows == res_fragment.num_rows, ( + f"Test '{test_name}': sorted returned {res_sorted.num_rows} rows, " + f"fragment returned {res_fragment.num_rows} rows for: {filter_expr}" + ) + + if res_sorted.num_rows > 0: + ids_sorted = sorted(res_sorted.column("id").to_pylist()) + ids_fragment = sorted(res_fragment.column("id").to_pylist()) + assert ids_sorted == ids_fragment + + def test_sorted_btree_string_column(self, temp_dir): + """Verify sorted BTree works on string columns.""" + all_data = [] + for i in range(400): + all_data.append({"id": i, "category": f"cat_{i:04d}"}) + + df = pd.DataFrame(all_data) + dataset = ray.data.from_pandas(df) + path = Path(temp_dir) / "string_sorted.lance" + lr.write_lance(dataset, str(path), min_rows_per_file=100, max_rows_per_file=100) + ds = lance.dataset(str(path)) + + updated_dataset = lr.create_scalar_index( + uri=ds.uri, + column="category", + index_type="BTREE", + name="str_sorted_idx", + sorted=True, + num_workers=2, + ) + + indices = updated_dataset.list_indices() + our_index = next( + (idx for idx in indices if idx["name"] == "str_sorted_idx"), None + ) + assert our_index is not None + assert our_index["type"] == "BTree" + + result = updated_dataset.scanner( + filter="category = 'cat_0200'", columns=["id", "category"] + ).to_table() + assert result.num_rows == 1 + assert result.column("id")[0].as_py() == 200 + + def test_sorted_btree_rejects_non_btree(self, temp_dir): + """sorted=True with non-BTREE index type raises ValueError.""" + ds = generate_multi_fragment_dataset( + temp_dir, num_fragments=2, rows_per_fragment=50 + ) + + with pytest.raises(ValueError, match="sorted=True is only supported"): + lr.create_scalar_index( + uri=ds.uri, + column="text", + index_type="INVERTED", + sorted=True, + num_workers=2, + ) + + def test_sorted_btree_single_worker(self, temp_dir): + """Edge case: sorted BTree with num_workers=1.""" + ds = generate_multi_fragment_dataset( + temp_dir, num_fragments=2, rows_per_fragment=100 + ) + + updated_dataset = lr.create_scalar_index( + uri=ds.uri, + column="id", + index_type="BTREE", + name="sorted_single_worker_idx", + sorted=True, + num_workers=1, + ) + + indices = updated_dataset.list_indices() + our_index = next( + (idx for idx in indices if idx["name"] == "sorted_single_worker_idx"), + None, + ) + assert our_index is not None + assert our_index["type"] == "BTree" + + eq_tbl = updated_dataset.scanner(filter="id = 50", columns=["id"]).to_table() + assert eq_tbl.num_rows == 1 + + class TestNamespaceIndexing: """Test cases for distributed indexing with DirectoryNamespace."""