diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 41dba0cf12..466fceedc4 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1883,100 +1883,131 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
 class DataScan(TableScan):
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
-        return project(self.row_filter)
-
     @cached_property
-    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
+    def _manifest_planner(self) -> ManifestGroupPlanner:
+        return ManifestGroupPlanner(
+            table_metadata=self.table_metadata,
+            io=self.io,
+            row_filter=self.row_filter,
+            case_sensitive=self.case_sensitive,
+            options=self.options,
+        )
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
+    @property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return self._manifest_planner.partition_filters
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files by filtering on the PartitionSpecs.
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
+        Returns:
+            List of FileScanTasks that contain both data and delete files.
+        """
+        snapshot = self.snapshot()
+        if not snapshot:
+            return iter([])
 
-    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
-        schema = self.table_metadata.schema()
-        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
+        return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io))
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: _InclusiveMetricsEvaluator(
-            schema,
-            self.row_filter,
-            self.case_sensitive,
-            include_empty_files,
-        ).eval(data_file)
+    def to_arrow(self) -> pa.Table:
+        """Read an Arrow table eagerly from this DataScan.
 
-    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
-        spec = self.table_metadata.specs()[spec_id]
+        All rows will be loaded into memory at once.
 
-        from pyiceberg.expressions.visitors import residual_evaluator_of
+        Returns:
+            pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
+        """
+        from pyiceberg.io.pyarrow import ArrowScan
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda datafile: (
-            residual_evaluator_of(
-                spec=spec,
-                expr=self.row_filter,
-                case_sensitive=self.case_sensitive,
-                schema=self.table_metadata.schema(),
-            )
-        )
+        return ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_table(self.plan_files())
 
-    @staticmethod
-    def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are older than the data.
+    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
+        """Return an Arrow RecordBatchReader from this DataScan.
 
-        Args:
-            min_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
+        For large results, using a RecordBatchReader requires less memory than
+        loading an Arrow Table for the same DataScan, because a RecordBatch
+        is read one at a time.
 
         Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file.
+            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
+                which can be used to read a stream of record batches one by one.
         """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
-        )
+        import pyarrow as pa
 
-    def plan_files(self) -> Iterable[FileScanTask]:
-        """Plans the relevant files by filtering on the PartitionSpecs.
+        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
 
-        Returns:
-            List of FileScanTasks that contain both data and delete files.
-        """
-        snapshot = self.snapshot()
-        if not snapshot:
-            return iter([])
+        target_schema = schema_to_pyarrow(self.projection())
+        batches = ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_record_batches(self.plan_files())
+        return pa.RecordBatchReader.from_batches(
+            target_schema,
+            batches,
+        ).cast(target_schema)
+
+    def count(self) -> int:
+        from pyiceberg.io.pyarrow import ArrowScan
+
+        # Calculates the total number of records in the scan, accounting for the row filter and any positional deletes.
+        res = 0
+        # Every task is a FileScanTask.
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # task.residual is AlwaysTrue when the filter condition is fully satisfied by the
+            # partition values; task.delete_files holds positional deletes that have not been
+            # merged yet. Files that fail either check have to be read as a PyArrow table with
+            # the filter and the deletes applied.
+            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+                # Every data file stores its record count in its metadata.
+                res += task.file.record_count
+            else:
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),
+                    row_filter=self.row_filter,
+                    case_sensitive=self.case_sensitive,
+                )
+                tbl = arrow_scan.to_table([task])
+                res += len(tbl)
+        return res
+
+
+class ManifestGroupPlanner:
+    io: FileIO
+    table_metadata: TableMetadata
+    row_filter: BooleanExpression
+    case_sensitive: bool
+    options: Properties
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+    ):
+        self.table_metadata = table_metadata
+        self.io = io
+        self.row_filter = _parse_row_filter(row_filter)
+        self.case_sensitive = case_sensitive
+        self.options = options
+
+    def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]:
         # step 1: filter manifests using partition summaries
         # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
         manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
-
-        residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)
 
         manifests = [
-            manifest_file
-            for manifest_file in snapshot.manifests(self.io)
-            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
+            manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
         ]
 
+        residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)
+
         # step 2: filter the data files in each manifest
         # this filter depends on the partition spec used to write the manifest file
@@ -2027,71 +2058,76 @@ def plan_files(self) -> Iterable[FileScanTask]:
             for data_entry in data_entries
         ]
 
-    def to_arrow(self) -> pa.Table:
-        """Read an Arrow table eagerly from this DataScan.
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
 
-        All rows will be loaded into memory at once.
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
+        return project(self.row_filter)
 
-        Returns:
-            pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
-        """
-        from pyiceberg.io.pyarrow import ArrowScan
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        spec = self.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
 
-        return ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_table(self.plan_files())
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
+        spec = self.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(self.table_metadata.schema())
+        partition_schema = Schema(*partition_type.fields)
+        partition_expr = self.partition_filters[spec_id]
 
-    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
-        """Return an Arrow RecordBatchReader from this DataScan.
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _EvaluatorExpression methods bound to a single
+        # shared instance across multiple threads.
+        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
 
-        For large results, using a RecordBatchReader requires less memory than
-        loading an Arrow Table for the same DataScan, because a RecordBatch
-        is read one at a time.
+    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
+        schema = self.table_metadata.schema()
+        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
 
-        Returns:
-            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
-                which can be used to read a stream of record batches one by one.
-        """
-        import pyarrow as pa
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
+        # shared instance across multiple threads.
+        return lambda data_file: _InclusiveMetricsEvaluator(
+            schema,
+            self.row_filter,
+            self.case_sensitive,
+            include_empty_files,
+        ).eval(data_file)
 
-        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
+        spec = self.table_metadata.specs()[spec_id]
 
-        target_schema = schema_to_pyarrow(self.projection())
-        batches = ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files())
+        from pyiceberg.expressions.visitors import residual_evaluator_of
 
-        return pa.RecordBatchReader.from_batches(
-            target_schema,
-            batches,
-        ).cast(target_schema)
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _EvaluatorExpression methods bound to a single
+        # shared instance across multiple threads.
+        return lambda datafile: (
+            residual_evaluator_of(
+                spec=spec,
+                expr=self.row_filter,
+                case_sensitive=self.case_sensitive,
+                schema=self.table_metadata.schema(),
+            )
+        )
 
-    def count(self) -> int:
-        from pyiceberg.io.pyarrow import ArrowScan
+    @staticmethod
+    def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
+        """Ensure that no manifests are loaded that contain deletes that are older than the data.
 
-        # Usage: Calculates the total number of records in a Scan that haven't had positional deletes.
-        res = 0
-        # every task is a FileScanTask
-        tasks = self.plan_files()
+        Args:
+            min_sequence_number (int): The minimal sequence number.
+            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
 
-        for task in tasks:
-            # task.residual is a Boolean Expression if the filter condition is fully satisfied by the
-            # partition value and task.delete_files represents that positional delete haven't been merged yet
-            # hence those files have to read as a pyarrow table applying the filter and deletes
-            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
-                # Every File has a metadata stat that stores the file record count
-                res += task.file.record_count
-            else:
-                arrow_scan = ArrowScan(
-                    table_metadata=self.table_metadata,
-                    io=self.io,
-                    projected_schema=self.projection(),
-                    row_filter=self.row_filter,
-                    case_sensitive=self.case_sensitive,
-                )
-                tbl = arrow_scan.to_table([task])
-                res += len(tbl)
-        return res
+        Returns:
+            Boolean indicating if it is either a data file, or a relevant delete file.
+        """
+        return manifest.content == ManifestContent.DATA or (
+            # Not interested in deletes that are older than the data
+            manifest.content == ManifestContent.DELETES
+            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
+        )
 
 
 @dataclass(frozen=True)
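
A minimal usage sketch of the refactored planning path, assuming the patch above is applied. The catalog name, table identifier, and "region" column below are hypothetical. DataScan keeps its public API, while ManifestGroupPlanner can also be driven directly with an arbitrary list of manifests:

    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import EqualTo
    from pyiceberg.table import ManifestGroupPlanner

    # Hypothetical catalog and table names, for illustration only.
    catalog = load_catalog("default")
    table = catalog.load_table("examples.orders")

    # Unchanged public API: plan_files() still yields FileScanTasks, and count()
    # short-circuits to metadata record counts when a task has no residual
    # filter and no delete files.
    scan = table.scan(row_filter=EqualTo("region", "EU"))
    tasks = list(scan.plan_files())
    print(scan.count())

    # The planner can also be driven directly, mirroring what
    # DataScan.plan_files() now does internally.
    snapshot = table.current_snapshot()
    if snapshot is not None:
        planner = ManifestGroupPlanner(
            table_metadata=table.metadata,
            io=table.io,
            row_filter=EqualTo("region", "EU"),
        )
        tasks = list(planner.plan_files(manifests=snapshot.manifests(table.io)))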