Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 177 additions & 2 deletions lance_ray/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import lance
import pyarrow as pa
import ray
from lance.dataset import Index, IndexConfig, LanceDataset
from lance.indices import IndicesBuilder
from packaging import version
Expand Down Expand Up @@ -233,6 +234,160 @@ def merge_index_metadata_compat(dataset, index_id, index_type, **kwargs):
return dataset.merge_index_metadata(index_id)


@ray.remote
def _build_range_partition(
dataset_uri: str,
column: str,
name: str,
index_uuid: str,
replace: bool,
range_id: int,
table: pa.Table,
storage_options: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
"""Build a single range partition of a sorted BTree index."""
try:
if column in table.column_names:
table = table.rename_columns(
["value" if c == column else c for c in table.column_names]
)

reader = pa.RecordBatchReader.from_batches(table.schema, table.to_batches())

dataset = LanceDataset(dataset_uri, storage_options=storage_options)
dataset.create_scalar_index(
column=column,
index_type="BTREE",
name=name,
replace=replace,
preprocessed_data=reader,
range_id=range_id,
index_uuid=index_uuid,
)

field_id = dataset.schema.get_field_index(column)

return {
"status": "success",
"range_id": range_id,
"fields": [field_id],
"uuid": index_uuid,
}
except Exception as exc:
logger.error("Sorted BTree range partition %d failed: %s", range_id, exc)
return {
"status": "error",
"range_id": range_id,
"error": str(exc),
}


def _build_sorted_btree_index(
*,
uri: str,
dataset: LanceDataset,
column: str,
name: str,
index_id: str,
replace: bool,
num_workers: int,
storage_options: Optional[dict[str, str]],
storage_options_provider: Any = None,
ray_remote_args: Optional[dict[str, Any]] = None,
**kwargs: Any,
) -> "lance.LanceDataset":
"""Range-based sorted BTree index build via Ray."""
from .io import read_lance

logger.info("Phase 1: Reading column '%s' with row IDs for sorted BTree", column)
ray_ds = read_lance(
uri,
columns=[column],
storage_options=storage_options,
scanner_options={"with_row_id": True},
)

logger.info("Phase 2: Sorting by column '%s'", column)
sorted_ds = ray_ds.sort(column)

logger.info("Phase 3: Splitting into %d range partitions", num_workers)
sorted_ds = sorted_ds.repartition(num_workers, shuffle=False)
table_refs = sorted_ds.to_arrow_refs()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ray.data.Dataset.sort(column) to perform a global range-partitioning shuffle. At 1B rows, this becomes a very heavy operation in terms of memory, network, and disk I/O. Could we better document the expected scale limits and recommend the machine size regarding different size of data?

Btw, this may involve object spilling and we may also estimate the memory/disk size on documentation
https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have much experience tbh in the exact scale that Ray sorting would be limiting. I read this blog: https://www.anyscale.com/blog/ray-breaks-the-usd1-tb-barrier-as-the-worlds-most-cost-efficient-sorting, but also saw https://discuss.ray.io/t/implementation-of-sort-is-not-optimal/12123, so looks like there is mixed result for Ray-based sorting. Do you have any sense of the recommended machine size and scale limits to share with? Do you suggest any other way to sort if not using Ray?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes—based on your investigation, ray sort looks like the best option for this use case, and I’m aligned with that direction.
From my previous experience, processing ~1Billion rows typically requires at least ~100 CPU cores, with a memory-to-core ratio around 4 GB/core (i.e., ~400 GB RAM) for workloads at this scale.
If possible, we could run a more robust test with a larger rows count (closer to production scale), so we can provide a more reliable capacity estimate.

actual_num_workers = len(table_refs)

logger.info("Phase 4: Building %d range partitions", actual_num_workers)
remote_opts = ray_remote_args or {}
build_task = _build_range_partition.options(**remote_opts)

futures = []
for i, table_ref in enumerate(table_refs):
fut = build_task.remote(
dataset_uri=uri,
column=column,
name=name,
index_uuid=index_id,
replace=replace,
range_id=i + 1,
table=table_ref,
storage_options=storage_options,
)
futures.append(fut)

results = ray.get(futures)

failed_results = [r for r in results if r["status"] == "error"]
if failed_results:
error_messages = [r["error"] for r in failed_results]
raise RuntimeError(
f"Sorted BTree index building failed: {'; '.join(error_messages)}"
)

logger.info("Phase 5: Merging index metadata for index ID: %s", index_id)
dataset = LanceDataset(
uri,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
)
merge_index_metadata_compat(dataset, index_id, index_type="BTREE", **kwargs)

logger.info("Phase 6: Committing sorted BTree index '%s'", name)
successful_results = [r for r in results if r["status"] == "success"]
if not successful_results:
raise RuntimeError("No successful sorted BTree index results found")

fields = successful_results[0]["fields"]
fragment_ids_to_use = [f.fragment_id for f in dataset.get_fragments()]

index = Index(
uuid=index_id,
name=name,
fields=fields,
dataset_version=dataset.version,
fragment_ids=set(fragment_ids_to_use),
index_version=0,
)

create_index_op = lance.LanceOperation.CreateIndex(
new_indices=[index],
removed_indices=[],
)

updated_dataset = lance.LanceDataset.commit(
uri,
create_index_op,
read_version=dataset.version,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
)

logger.info(
"Successfully created sorted BTree index '%s' with %d range partitions",
name,
actual_num_workers,
)
return updated_dataset


def create_scalar_index(
uri: Optional[str] = None,
*,
Expand All @@ -249,6 +404,7 @@ def create_scalar_index(
name: Optional[str] = None,
replace: bool = True,
train: bool = True,
sorted: bool = False,
fragment_ids: Optional[list[int]] = None,
index_uuid: Optional[str] = None,
num_workers: int = 4,
Expand All @@ -271,6 +427,10 @@ def create_scalar_index(
name: Name of the index (generated if None).
replace: Whether to replace existing index with the same name (default: True).
train: Whether to train the index (default: True).
sorted: Whether to use range-based sorted BTree indexing (default: False).
When True, the indexed column is globally sorted via Ray, split into
non-overlapping ranges, and each range builds a separate BTree partition.
Only valid with index_type="BTREE".
fragment_ids: Optional list of fragment IDs to build index on.
index_uuid: Optional fragment UUID for distributed indexing.
num_workers: Number of Ray workers to use (keyword-only).
Expand Down Expand Up @@ -353,8 +513,8 @@ def create_scalar_index(
f"{type(index_type)}"
)

# Note: Ray initialization is now handled by the Pool, following the pattern from io.py
# This removes the need for explicit ray.init() calls
if sorted and (not isinstance(index_type, str) or index_type != "BTREE"):
raise ValueError("sorted=True is only supported with index_type='BTREE'")

merged_storage_options: dict[str, Any] = {}
if storage_options:
Expand Down Expand Up @@ -437,6 +597,21 @@ def create_scalar_index(
if not fragments:
raise ValueError("Dataset contains no fragments")

if sorted:
return _build_sorted_btree_index(
uri=uri,
dataset=dataset,
column=column,
name=name,
index_id=index_id,
replace=replace,
num_workers=num_workers,
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
ray_remote_args=ray_remote_args,
**kwargs,
)

if fragment_ids is not None:
available_fragment_ids = {f.fragment_id for f in fragments}
invalid_fragments = set(fragment_ids) - available_fragment_ids
Expand Down
Loading
Loading