282 changes: 159 additions & 123 deletions pyiceberg/table/__init__.py
@@ -1883,100 +1883,131 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent


class DataScan(TableScan):
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
return project(self.row_filter)

@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)
def _manifest_planner(self) -> ManifestGroupPlanner:
return ManifestGroupPlanner(
table_metadata=self.table_metadata,
io=self.io,
row_filter=self.row_filter,
case_sensitive=self.case_sensitive,
options=self.options,
)

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
@property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return self._manifest_planner.partition_filters

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]
def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
Returns:
List of FileScanTasks that contain both data and delete files.
"""
snapshot = self.snapshot()
if not snapshot:
return iter([])

def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
schema = self.table_metadata.schema()
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io))

# The lambda created here is run in multiple threads.
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
# shared instance across multiple threads.
return lambda data_file: _InclusiveMetricsEvaluator(
schema,
self.row_filter,
self.case_sensitive,
include_empty_files,
).eval(data_file)
def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.

def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec = self.table_metadata.specs()[spec_id]
All rows will be loaded into memory at once.

from pyiceberg.expressions.visitors import residual_evaluator_of
Returns:
pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
"""
from pyiceberg.io.pyarrow import ArrowScan

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
)
return ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())

@staticmethod
def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.

Args:
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.
For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because a RecordBatch
is read one at a time.

Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
which can be used to read a stream of record batches one by one.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)
import pyarrow as pa

def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

Returns:
List of FileScanTasks that contain both data and delete files.
"""
snapshot = self.snapshot()
if not snapshot:
return iter([])
target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches(self.plan_files())

return pa.RecordBatchReader.from_batches(
target_schema,
batches,
).cast(target_schema)

def count(self) -> int:
from pyiceberg.io.pyarrow import ArrowScan

# Calculates the total number of records in the scan that have not been removed by positional deletes.
res = 0
# every task is a FileScanTask
tasks = self.plan_files()

for task in tasks:
# task.residual is AlwaysTrue when the filter condition is fully satisfied by the partition
# values, and task.delete_files holds positional deletes that have not been merged yet.
# In any other case the file has to be read as a pyarrow table with the filter and deletes applied.
if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
# Every data file stores its record count in its metadata
res += task.file.record_count
else:
arrow_scan = ArrowScan(
table_metadata=self.table_metadata,
io=self.io,
projected_schema=self.projection(),
row_filter=self.row_filter,
case_sensitive=self.case_sensitive,
)
tbl = arrow_scan.to_table([task])
res += len(tbl)
return res


class ManifestGroupPlanner:
io: FileIO
table_metadata: TableMetadata
row_filter: BooleanExpression
case_sensitive: bool
options: Properties

def __init__(
self,
table_metadata: TableMetadata,
io: FileIO,
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
case_sensitive: bool = True,
options: Properties = EMPTY_DICT,
):
self.table_metadata = table_metadata
self.io = io
self.row_filter = _parse_row_filter(row_filter)
self.case_sensitive = case_sensitive
self.options = options

def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]:
# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

manifests = [
manifest_file
for manifest_file in snapshot.manifests(self.io)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file

@@ -2027,71 +2058,76 @@ def plan_files(self) -> Iterable[FileScanTask]:
for data_entry in data_entries
]

def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.
@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)

All rows will be loaded into memory at once.
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
return project(self.row_filter)

Returns:
pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
"""
from pyiceberg.io.pyarrow import ArrowScan
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

return ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())
def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]

def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.
# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because a RecordBatch
is read one at a time.
def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
schema = self.table_metadata.schema()
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))

Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
which can be used to read a stream of record batches one by one.
"""
import pyarrow as pa
# The lambda created here is run in multiple threads.
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
# shared instance across multiple threads.
return lambda data_file: _InclusiveMetricsEvaluator(
schema,
self.row_filter,
self.case_sensitive,
include_empty_files,
).eval(data_file)

from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec = self.table_metadata.specs()[spec_id]

target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches(self.plan_files())
from pyiceberg.expressions.visitors import residual_evaluator_of

return pa.RecordBatchReader.from_batches(
target_schema,
batches,
).cast(target_schema)
# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
)

def count(self) -> int:
from pyiceberg.io.pyarrow import ArrowScan
@staticmethod
def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.

# Calculates the total number of records in the scan that have not been removed by positional deletes.
res = 0
# every task is a FileScanTask
tasks = self.plan_files()
Args:
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.

for task in tasks:
# task.residual is AlwaysTrue when the filter condition is fully satisfied by the partition
# values, and task.delete_files holds positional deletes that have not been merged yet.
# In any other case the file has to be read as a pyarrow table with the filter and deletes applied.
if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
# Every data file stores its record count in its metadata
res += task.file.record_count
else:
arrow_scan = ArrowScan(
table_metadata=self.table_metadata,
io=self.io,
projected_schema=self.projection(),
row_filter=self.row_filter,
case_sensitive=self.case_sensitive,
)
tbl = arrow_scan.to_table([task])
res += len(tbl)
return res
Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)


@dataclass(frozen=True)
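For orientation, here is a minimal usage sketch of the scan API touched by this refactor. The catalog name, table identifier, and filter column below are placeholders and are not part of the change; the point is that plan_files() now delegates to the new ManifestGroupPlanner, while to_arrow(), to_arrow_batch_reader(), and count() keep their existing public behavior.

from pyiceberg.catalog import load_catalog

# Placeholder catalog/table/column names, used only for illustration.
catalog = load_catalog("default")
table = catalog.load_table("db.events")

# Table.scan() returns a DataScan; after this change the manifest and partition
# filtering behind plan_files() is handled by ManifestGroupPlanner.
scan = table.scan(row_filter="id > 100", limit=1_000)

tasks = list(scan.plan_files())        # FileScanTasks carrying data and delete files
reader = scan.to_arrow_batch_reader()  # streaming read, lower memory footprint
arrow_table = scan.to_arrow()          # eager read into a single Arrow table
row_count = scan.count()               # uses metadata record counts where possible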