From 0cb6dbdbcaf3ed4daddfa34dd9ca5607510df3b3 Mon Sep 17 00:00:00 2001 From: Richard Guo Date: Thu, 13 Mar 2025 15:48:32 -0400 Subject: [PATCH] handle filesystems --- pyproject.toml | 2 +- quadfeather/tiler.py | 139 ++++++++++++++++++++++++++----------------- tests/test_s3.py | 67 +++++++++++++++++++++ 3 files changed, 151 insertions(+), 57 deletions(-) create mode 100644 tests/test_s3.py diff --git a/pyproject.toml b/pyproject.toml index 1a4b1ed..2d24a92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "quadfeather" -version = "2.0.1" +version = "2.1.0" description = "Quadtree tiling from CSV/Apache Arrow for use with deepscatter in the browser." readme = "README.md" requires-python = ">=3.9" diff --git a/quadfeather/tiler.py b/quadfeather/tiler.py index 8120ff8..873d14c 100644 --- a/quadfeather/tiler.py +++ b/quadfeather/tiler.py @@ -1,6 +1,6 @@ import pyarrow as pa from pyarrow import csv, feather, parquet as pq, compute as pc, ipc as ipc - +from pyarrow import fs import logging from pathlib import Path import sys @@ -311,6 +311,7 @@ def main( max_open_filehandles=33, randomize=0, limits=None, + filesystem: fs.FileSystem = fs.LocalFileSystem(), ): """ Run a tiler @@ -394,6 +395,7 @@ def main( max_open_filehandles=max_open_filehandles, dictionaries=dictionaries, randomize=randomize, + filesystem=filesystem ) tiler.insert_files(files=rewritten_files) @@ -413,7 +415,7 @@ class Quadtree: def __init__( self, basedir: Path, - extent: Union[Rectangle, Tuple[Tuple[float, float], Tuple[float, float]]], + extent: Optional[Union[Rectangle, Tuple[Tuple[float, float], Tuple[float, float]]]] = None, mode: Literal["read", "write", "append"] = "read", dictionaries: Dict[str, pa.Array] = {}, schema: pa.Schema = pa.schema({}), @@ -422,6 +424,7 @@ def __init__( max_open_filehandles=32, sidecars: Dict[str, str] = {}, randomize=0, + filesystem: fs.FileSystem = fs.LocalFileSystem() ): """ * sidecars: a dictionary of sidecar files to add to the tileset. The key is the column that @@ -431,6 +434,7 @@ def __init__( self.tile_size = tile_size self.first_tile_size = first_tile_size self.basedir = basedir + self.filesystem = filesystem self.dictionaries = dictionaries self.max_open_filehandles = max_open_filehandles self.mode = mode @@ -444,6 +448,8 @@ def __init__( self.schema = schema self._schemas: Optional[Dict[str, pa.Schema]] = None if self.mode == "write": + if extent is None: + raise ValueError("Extent must be defined in write mode.") self.root = Tile( quadtree=self, extent=extent, @@ -454,15 +460,31 @@ def __init__( self._macrotiles: Optional[List[Macrotile]] = None self._bloom_cache: Dict[str, pa.Array] = {} + # Easier file system handling methods + + def write_feather(self, write_path: Path, table: pa.Table, compression: Literal["zstd", "uncompressed"] = "zstd"): + with self.filesystem.open_output_stream(write_path.as_posix()) as f: + feather.write_feather(table, f, compression=compression) + + def read_feather(self, read_path: Path, columns: Optional[List[str]] = None) -> pa.Table: + with self.filesystem.open_input_file(read_path.as_posix()) as f: + return feather.read_table(f, columns=columns) + + def file_exists(self, path: Path) -> bool: + return self.filesystem.get_file_info(path.as_posix()).is_file + @staticmethod - def from_dir(basedir: Path, mode: Literal["read", "append"]) -> "Quadtree": + def from_dir(basedir: Path, mode: Literal["read", "append"], filesystem: fs.FileSystem = fs.LocalFileSystem()) -> "Quadtree": # Load a quadtree from disk. - manifest = basedir / "manifest.feather" - if not manifest.exists(): + manifest_path = basedir / "manifest.feather" + manifest_info = filesystem.get_file_info(manifest_path.as_posix()) + if not manifest_info.is_file: raise FileNotFoundError( "Not able to load a quadtree without a manifest file." ) - manifest = feather.read_table(basedir / "manifest.feather") + manifest = None + with filesystem.open_input_file(manifest_path.as_posix()) as f: + manifest = feather.read_table(f) sidecars = json.loads(manifest.schema.metadata[b"sidecars"]) schema = pa.ipc.read_schema(BytesIO(bytes(manifest.schema.metadata[b"schema"]))) extent = manifest.filter(pc.equal(manifest["key"], "0/0/0"))["extent"][ @@ -479,6 +501,7 @@ def from_dir(basedir: Path, mode: Literal["read", "append"]) -> "Quadtree": mode=mode, sidecars=sidecars, schema=schema, + filesystem=filesystem, ) def tiles(self): @@ -547,7 +570,7 @@ def read_root_table(self, suffix: Optional[str] = ""): path = path.with_suffix(f".{suffix}.feather") else: path = path.with_suffix(".feather") - return feather.read_table(path) + return self.read_feather(path) def finalize(self): manifest = self.root.finalize() @@ -559,13 +582,13 @@ def finalize(self): "schema": self.final_schema().serialize().to_pybytes(), } ) - feather.write_feather( - tb, self.basedir / "manifest.feather", compression="uncompressed" - ) + path = self.basedir / "manifest.feather" + self.write_feather(path, tb, compression="uncompressed") @property def manifest_table(self): - return feather.read_table(self.basedir / "manifest.feather") + path = self.basedir / "manifest.feather" + return self.read_feather(path) def build_bloom_index(self, id_field: str, id_sidecar: Optional[str], m=28, k=10): """ @@ -682,11 +705,12 @@ def __init__( def write_batch_to_filehandle(self, path: Path, batch: pa.Table): if not path in self._open_filehandles: + open_filehandle = self.quadtree.filesystem.open_output_stream(path.as_posix()) if path.suffix == ".feather": - self._open_filehandles[path] = ipc.new_file(path, schema=batch.schema) + self._open_filehandles[path] = ipc.new_file(open_filehandle, schema=batch.schema) elif path.suffix == ".parquet": self._open_filehandles[path] = pq.ParquetWriter( - path, schema=batch.schema + open_filehandle, schema=batch.schema ) else: raise NotImplementedError( @@ -729,7 +753,7 @@ def bloom_filter_loc(self, id_field: str, m: int, k: int): def build_bloom_filter(self, id_field: str, m: int, k: int): bloom_filter_loc = self.bloom_filter_loc(id_field, m, k) - bloom_filter_loc.parent.mkdir(parents=True, exist_ok=True) + self.quadtree.filesystem.create_dir(bloom_filter_loc.parent.as_posix(), recursive=True) if bloom_filter_loc.exists(): return @@ -739,7 +763,7 @@ def build_bloom_filter(self, id_field: str, m: int, k: int): # At *read* time, we are carefuly to use bitmasks. - positions = np.zeros((2**m), np.bool) + positions = np.zeros((2**m), np.bool_) tilenames = [] id_sidecar = self.quadtree.sidecars.get(id_field, None) @@ -776,8 +800,11 @@ def build_bloom_filter(self, id_field: str, m: int, k: int): # but the reason we're using arrow at all for these is to get bitmask- # lists in a sane form, so it's not the end of the world. - self.bloom_filter_loc(id_field, m, k).parent.mkdir(parents=True, exist_ok=True) - feather.write_feather(tb, bloom_filter_loc, compression="zstd") + self.quadtree.filesystem.create_dir( + self.bloom_filter_loc(id_field, m, k).parent.as_posix(), + recursive=True + ) + self.quadtree.write_feather(bloom_filter_loc, tb, compression="zstd") def matched_file_loc(self, new_sidecar_name, m, k): """ @@ -798,10 +825,10 @@ def candidate_file_loc(self, new_sidecar_name, m, k): def bloom_filter(self, id_field, m, k): loc = self.bloom_filter_loc(id_field, m, k) - if not loc.exists(): + if not self.quadtree.file_exists(loc): logger.debug(("MAKING BLOOM FILTER", self.coords, id_field, m, k)) self.build_bloom_filter(id_field, m, k) - tb = feather.read_table(loc) + tb = self.quadtree.read_feather(loc) return pc.list_flatten(tb.take([0])["bitmask"]) def bloom_filters_below_here(self, id_field, m, k, inclusive=True): @@ -813,7 +840,7 @@ def bloom_filters_below_here(self, id_field, m, k, inclusive=True): total_filter = self.bloom_filter(id_field, m, k) else: raise NotImplementedError("Not implemented") - total_filter = pa.array(np.zeros(2**m), pa.bool_()) + # total_filter = pa.array(np.zeros(2**m), pa.bool_()) children = self.children() while len(children) > 0: child = children.pop() @@ -922,32 +949,32 @@ def complete_insert_stage(self, id_field, m: int, k: int, new_sidecar_name): # Close the lagging bloom filters self.quadtree._bloom_cache = {} - if self.candidate_file_loc(new_sidecar_name, m, k).exists(): + if self.quadtree.file_exists(self.candidate_file_loc(new_sidecar_name, m, k)): # "Candidate file loc" means that we encountered a bloom filter match # including all of the children of this tile. if len(self.children()) == 0: # If we have preliminary unmerged files but no children, # this means that we're done inserting, and can rename it # to be a 'matched' file. - self.candidate_file_loc(new_sidecar_name, m, k).rename( - self.matched_file_loc(new_sidecar_name, m, k) + self.quadtree.filesystem.move( + self.candidate_file_loc(new_sidecar_name, m, k).as_posix(), + self.matched_file_loc(new_sidecar_name, m, k).as_posix() ) else: # Otherwise, we need to recursively insert that file # at an appropriate point in the tree. - for batch in pq.ParquetFile( - self.candidate_file_loc(new_sidecar_name, m, k) - ).iter_batches(): - b = pa.Table.from_batches([batch]) - self.insert_for_join( - b, - id_field, - m, - k, - new_sidecar_name, - ) + with self.quadtree.filesystem.open_input_file(self.candidate_file_loc(new_sidecar_name, m, k).as_posix()) as f: + for batch in pq.ParquetFile(f).iter_batches(): + b = pa.Table.from_batches([batch]) + self.insert_for_join( + b, + id_field, + m, + k, + new_sidecar_name, + ) # Clean up the candidate file, because it has now been inserted below here. - self.candidate_file_loc(new_sidecar_name, m, k).unlink() + self.quadtree.filesystem.delete_file(self.candidate_file_loc(new_sidecar_name, m, k).as_posix()) # Close the filehandles for the children immediately to avoid # lagging writers self.close_filehandles() @@ -976,24 +1003,26 @@ def complete_join(self, id_field, m, k, new_sidecar_name): except KeyError: sidecar = None - if not self.matched_file_loc(new_sidecar_name, m, k).exists(): + if not self.quadtree.file_exists(self.matched_file_loc(new_sidecar_name, m, k)): # Can happen if there are no matches. pass else: for tile in self.tiles(): dest = tile.filename.with_suffix(f".{new_sidecar_name}.feather") - if dest.exists(): + if self.quadtree.file_exists(dest): raise FileExistsError(f"File {dest} already exists.") # logger.warning(f"File {dest} already exists.") # continue ids = tile.read_column(id_field, sidecar) # Read *only* the relevant rows of the parquet file. # So long as the - matches = pq.read_table( - self.matched_file_loc(new_sidecar_name, m, k), - filters=[(id_field, "in", ids)], - ) + matched_sidecar_file = self.matched_file_loc(new_sidecar_name, m, k) + with self.quadtree.filesystem.open_input_file(matched_sidecar_file.as_posix()) as f: + matches = pq.read_table( + f, + filters=[(id_field, "in", ids)], + ) sort_indices = pc.index_in(ids, matches[id_field]) # Now we reshuffle the matches to be in the same order as the original file. @@ -1005,11 +1034,7 @@ def complete_join(self, id_field, m, k, new_sidecar_name): towrite = towrite.combine_chunks() except pa.lib.ArrowInvalid: logger.warning(f"Failed to combine chunks for {tile.coords}") - feather.write_feather( - towrite, - dest, - compression="uncompressed", - ) + self.quadtree.write_feather(dest, towrite, compression="uncompressed") for mychild in self.children(): mychild.complete_join(id_field, m, k, new_sidecar_name) @@ -1177,7 +1202,7 @@ def finalize(self): # First, we close any existing overflow buffers. tile.close_overflow_buffers() for tile in self.iterate(direction="top-down"): - if tile.overflow_loc.exists(): + if self.quadtree.file_exists(tile.overflow_loc): # Now we can begin again with the overall budget inserting at this point. tile.permitted_children = self.overall_tile_budget() tables = [ @@ -1187,7 +1212,8 @@ def finalize(self): for k in self.sidecar_names ], ] - inputs = [pa.ipc.open_file(tb) for tb in tables] + files = [self.quadtree.filesystem.open_input_file(tb.as_posix()) for tb in tables] + inputs = [pa.ipc.open_file(file) for file in files] def yielder() -> Iterator[pa.RecordBatch]: for i in range(inputs[0].num_record_batches): @@ -1201,7 +1227,9 @@ def yielder() -> Iterator[pa.RecordBatch]: # Insert in chunks of 100 megabytes for batch in rebatch(yielder(), 100e6): tile.insert(batch) - tile.overflow_loc.unlink() + + for table in tables: + self.quadtree.filesystem.delete_file(table.as_posix()) # Manifest isn't used tile.finalize() @@ -1232,7 +1260,7 @@ def filename(self) -> Path: return self._filename local_name = Path(*map(str, self.coords)).with_suffix(".feather") dest_file = self.quadtree.basedir / local_name - dest_file.parent.mkdir(parents=True, exist_ok=True) + self.quadtree.filesystem.create_dir(dest_file.parent.as_posix(), recursive=True) self._filename = dest_file return self._filename @@ -1307,10 +1335,10 @@ def flush_data(self, destination, compression): for k, v in self.quadtree.sidecars.items(): other_tbs[v][k] = frame[k] frame = frame.drop(k) - feather.write_feather(frame, destination, compression=compression) + self.quadtree.write_feather(destination, frame, compression=compression) for k, v in other_tbs.items(): - feather.write_feather( - pa.table(v), destination.with_suffix(f".{k}.feather"), compression + self.quadtree.write_feather( + destination.with_suffix(f".{k}.feather"), pa.table(v), compression ) self.min_ix = minmax["min"] self.max_ix = minmax["max"] @@ -1363,7 +1391,7 @@ def overflow_buffers(self): p = self.overflow_loc if k != "": p = p.with_suffix(f".{k}.arrow") - self._sinks[k] = pa.OSFile(str(p), "wb") + self._sinks[k] = self.quadtree.filesystem.open_output_stream(p.as_posix()) self._overflow_writers[k] = pa.ipc.new_file( self._sinks[k], self.quadtree.schemas[k] ) @@ -1516,8 +1544,7 @@ def read_column(self, column: str, sidecar: Optional[str]) -> pa.Array: fname = self.filename if sidecar is not None: fname = self.filename.with_suffix(f".{sidecar}.feather") - return feather.read_table(fname, columns=[column])[column] - + return self.quadtree.read_feather(fname, columns=[column])[column] def get_better_codes(col, counter=Counter()): for a in pc.value_counts(col): diff --git a/tests/test_s3.py b/tests/test_s3.py new file mode 100644 index 0000000..b1131ae --- /dev/null +++ b/tests/test_s3.py @@ -0,0 +1,67 @@ +from pyarrow import fs +from quadfeather.tiler import * +from pathlib import Path + +import pytest + +def get_s3_filesystem(region: str = "us-east-2"): + return fs.S3FileSystem( + region=region, + ) + +@pytest.mark.skip(reason="This is a slow test that requires a real S3 bucket.") +def test_s3_filesystem(bucket_name: str,NUM_POINTS=100_000, TILE_SIZE=10_000): + fs = get_s3_filesystem() + basedir = Path(bucket_name) / "tiles" + print("Creating test data...") + insert_data = pa.table( + { + "id": pa.array(np.arange(NUM_POINTS)).cast(pa.string()), + "x": np.random.normal(0, 10, NUM_POINTS), + "y": np.random.normal(0, 10, NUM_POINTS), + } + ) + xtent = pc.min_max(insert_data["x"]).as_py() + ytent = pc.min_max(insert_data["y"]).as_py() + print("Creating quadtree...") + qtree = Quadtree( + mode="write", + extent=((xtent["min"], xtent["max"]), (ytent["min"], ytent["max"])), + max_open_filehandles=33, + basedir=basedir, + tile_size=TILE_SIZE, + first_tile_size=int(TILE_SIZE / 4), + filesystem=fs, + ) + print("Inserting data...") + qtree.insert(insert_data) + + print("Finalizing quadtree...") + qtree.finalize() + + print("Adding sidecar...") + ids = pa.array(np.arange(NUM_POINTS)).cast(pa.string()) + insert_tb = pa.table({"id": ids, "join_field": ids}) + # Shuffle the insert table. + shuffled_indices = np.random.permutation(len(insert_tb)) + insert_tb = insert_tb.take(pa.array(shuffled_indices)) + + def stream(): + start = 0 + while start < len(insert_tb): + yield insert_tb.take(np.arange(start, start + int(NUM_POINTS / 100))) + start += int(NUM_POINTS / 100) + + qtree.join(stream(), "id", "new_sidecar") + + ### First, we just confirm that the root table was correctly built. + m = qtree.read_root_table("new_sidecar") + root_ids = qtree.read_root_table(None)["id"] + assert not "id" in m.column_names + assert "join_field" in m.column_names + assert (pc.all(pc.equal(m["join_field"], root_ids))).as_py() + + +# Run with aws-vault exec +if __name__ == "__main__": + test_s3_filesystem("quadfeather")