2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "quadfeather"
version = "2.0.1"
version = "2.1.0"
description = "Quadtree tiling from CSV/Apache Arrow for use with deepscatter in the browser."
readme = "README.md"
requires-python = ">=3.9"
139 changes: 83 additions & 56 deletions quadfeather/tiler.py
@@ -1,6 +1,6 @@
import pyarrow as pa
from pyarrow import csv, feather, parquet as pq, compute as pc, ipc as ipc

from pyarrow import fs
import logging
from pathlib import Path
import sys
@@ -311,6 +311,7 @@ def main(
max_open_filehandles=33,
randomize=0,
limits=None,
filesystem: fs.FileSystem = fs.LocalFileSystem(),
):
"""
Run a tiler
@@ -394,6 +395,7 @@ def main(
max_open_filehandles=max_open_filehandles,
dictionaries=dictionaries,
randomize=randomize,
filesystem=filesystem
)
tiler.insert_files(files=rewritten_files)

@@ -413,7 +415,7 @@ class Quadtree:
def __init__(
self,
basedir: Path,
extent: Union[Rectangle, Tuple[Tuple[float, float], Tuple[float, float]]],
extent: Optional[Union[Rectangle, Tuple[Tuple[float, float], Tuple[float, float]]]] = None,
mode: Literal["read", "write", "append"] = "read",
dictionaries: Dict[str, pa.Array] = {},
schema: pa.Schema = pa.schema({}),
@@ -422,6 +424,7 @@ def __init__(
max_open_filehandles=32,
sidecars: Dict[str, str] = {},
randomize=0,
filesystem: fs.FileSystem = fs.LocalFileSystem()
):
"""
* sidecars: a dictionary of sidecar files to add to the tileset. The key is the column that
@@ -431,6 +434,7 @@ def __init__(
self.tile_size = tile_size
self.first_tile_size = first_tile_size
self.basedir = basedir
self.filesystem = filesystem
self.dictionaries = dictionaries
self.max_open_filehandles = max_open_filehandles
self.mode = mode
@@ -444,6 +448,8 @@ def __init__(
self.schema = schema
self._schemas: Optional[Dict[str, pa.Schema]] = None
if self.mode == "write":
if extent is None:
raise ValueError("Extent must be defined in write mode.")
self.root = Tile(
quadtree=self,
extent=extent,
@@ -454,15 +460,31 @@ def __init__(
self._macrotiles: Optional[List[Macrotile]] = None
self._bloom_cache: Dict[str, pa.Array] = {}

# Easier file system handling methods

def write_feather(self, write_path: Path, table: pa.Table, compression: Literal["zstd", "uncompressed"] = "zstd"):
with self.filesystem.open_output_stream(write_path.as_posix()) as f:
feather.write_feather(table, f, compression=compression)

def read_feather(self, read_path: Path, columns: Optional[List[str]] = None) -> pa.Table:
with self.filesystem.open_input_file(read_path.as_posix()) as f:
return feather.read_table(f, columns=columns)

def file_exists(self, path: Path) -> bool:
return self.filesystem.get_file_info(path.as_posix()).is_file
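
The new filesystem parameter, together with these three wrappers, routes every read, write, and existence check through a pyarrow.fs.FileSystem, so a tileset no longer has to live on local disk. A minimal sketch of how that might be used against object storage; the bucket path and region below are hypothetical, and fs.LocalFileSystem() remains the default:

    from pathlib import Path
    from pyarrow import fs
    from quadfeather.tiler import Quadtree

    # Hypothetical bucket/prefix and region; any pyarrow FileSystem works the same way.
    s3 = fs.S3FileSystem(region="us-east-1")
    tree = Quadtree.from_dir(Path("my-bucket/tiles"), mode="read", filesystem=s3)
    print(tree.manifest_table.num_rows)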

@staticmethod
def from_dir(basedir: Path, mode: Literal["read", "append"]) -> "Quadtree":
def from_dir(basedir: Path, mode: Literal["read", "append"], filesystem: fs.FileSystem = fs.LocalFileSystem()) -> "Quadtree":
# Load a quadtree from disk.
manifest = basedir / "manifest.feather"
if not manifest.exists():
manifest_path = basedir / "manifest.feather"
manifest_info = filesystem.get_file_info(manifest_path.as_posix())
if not manifest_info.is_file:
raise FileNotFoundError(
"Not able to load a quadtree without a manifest file."
)
manifest = feather.read_table(basedir / "manifest.feather")
manifest = None
with filesystem.open_input_file(manifest_path.as_posix()) as f:
manifest = feather.read_table(f)
sidecars = json.loads(manifest.schema.metadata[b"sidecars"])
schema = pa.ipc.read_schema(BytesIO(bytes(manifest.schema.metadata[b"schema"])))
extent = manifest.filter(pc.equal(manifest["key"], "0/0/0"))["extent"][
@@ -479,6 +501,7 @@ def from_dir(basedir: Path, mode: Literal["read", "append"]) -> "Quadtree":
mode=mode,
sidecars=sidecars,
schema=schema,
filesystem=filesystem,
)

def tiles(self):
@@ -547,7 +570,7 @@ def read_root_table(self, suffix: Optional[str] = ""):
path = path.with_suffix(f".{suffix}.feather")
else:
path = path.with_suffix(".feather")
return feather.read_table(path)
return self.read_feather(path)

def finalize(self):
manifest = self.root.finalize()
@@ -559,13 +582,13 @@ def finalize(self):
"schema": self.final_schema().serialize().to_pybytes(),
}
)
feather.write_feather(
tb, self.basedir / "manifest.feather", compression="uncompressed"
)
path = self.basedir / "manifest.feather"
self.write_feather(path, tb, compression="uncompressed")

@property
def manifest_table(self):
return feather.read_table(self.basedir / "manifest.feather")
path = self.basedir / "manifest.feather"
return self.read_feather(path)

def build_bloom_index(self, id_field: str, id_sidecar: Optional[str], m=28, k=10):
"""
@@ -682,11 +705,12 @@ def __init__(

def write_batch_to_filehandle(self, path: Path, batch: pa.Table):
if not path in self._open_filehandles:
open_filehandle = self.quadtree.filesystem.open_output_stream(path.as_posix())
if path.suffix == ".feather":
self._open_filehandles[path] = ipc.new_file(path, schema=batch.schema)
self._open_filehandles[path] = ipc.new_file(open_filehandle, schema=batch.schema)
elif path.suffix == ".parquet":
self._open_filehandles[path] = pq.ParquetWriter(
path, schema=batch.schema
open_filehandle, schema=batch.schema
)
else:
raise NotImplementedError(
@@ -729,7 +753,7 @@ def bloom_filter_loc(self, id_field: str, m: int, k: int):

def build_bloom_filter(self, id_field: str, m: int, k: int):
bloom_filter_loc = self.bloom_filter_loc(id_field, m, k)
bloom_filter_loc.parent.mkdir(parents=True, exist_ok=True)
self.quadtree.filesystem.create_dir(bloom_filter_loc.parent.as_posix(), recursive=True)
if bloom_filter_loc.exists():
return

Expand All @@ -739,7 +763,7 @@ def build_bloom_filter(self, id_field: str, m: int, k: int):

# At *read* time, we are careful to use bitmasks.

positions = np.zeros((2**m), np.bool)
positions = np.zeros((2**m), np.bool_)
tilenames = []
id_sidecar = self.quadtree.sidecars.get(id_field, None)

@@ -776,8 +800,11 @@ def build_bloom_filter(self, id_field: str, m: int, k: int):
# but the reason we're using arrow at all for these is to get bitmask-
# lists in a sane form, so it's not the end of the world.

self.bloom_filter_loc(id_field, m, k).parent.mkdir(parents=True, exist_ok=True)
feather.write_feather(tb, bloom_filter_loc, compression="zstd")
self.quadtree.filesystem.create_dir(
self.bloom_filter_loc(id_field, m, k).parent.as_posix(),
recursive=True
)
self.quadtree.write_feather(bloom_filter_loc, tb, compression="zstd")
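
The filter built above sets k bit positions in a 2**m-bit array per id, so a later lookup can say "definitely not in this tile" with no false negatives and a tunable false-positive rate (the tiler defaults to m=28, k=10). A toy sketch of that mechanism; the hashing scheme here is an assumption, not the one quadfeather uses:

    import numpy as np

    def toy_bloom(ids, m, k):
        # One flat bitmask of 2**m positions; each id sets k of them.
        positions = np.zeros(2**m, np.bool_)
        for i in ids:
            for seed in range(k):
                positions[hash((seed, i)) % (2**m)] = True
        return positions

    def maybe_contains(positions, value, m, k):
        # False means definitely absent; True means possibly present.
        return all(positions[hash((seed, value)) % (2**m)] for seed in range(k))

    bits = toy_bloom(["id-1", "id-2"], m=16, k=4)
    assert maybe_contains(bits, "id-1", m=16, k=4)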

def matched_file_loc(self, new_sidecar_name, m, k):
"""
@@ -798,10 +825,10 @@ def candidate_file_loc(self, new_sidecar_name, m, k):

def bloom_filter(self, id_field, m, k):
loc = self.bloom_filter_loc(id_field, m, k)
if not loc.exists():
if not self.quadtree.file_exists(loc):
logger.debug(("MAKING BLOOM FILTER", self.coords, id_field, m, k))
self.build_bloom_filter(id_field, m, k)
tb = feather.read_table(loc)
tb = self.quadtree.read_feather(loc)
return pc.list_flatten(tb.take([0])["bitmask"])

def bloom_filters_below_here(self, id_field, m, k, inclusive=True):
@@ -813,7 +840,7 @@ def bloom_filters_below_here(self, id_field, m, k, inclusive=True):
total_filter = self.bloom_filter(id_field, m, k)
else:
raise NotImplementedError("Not implemented")
total_filter = pa.array(np.zeros(2**m), pa.bool_())
# total_filter = pa.array(np.zeros(2**m), pa.bool_())
children = self.children()
while len(children) > 0:
child = children.pop()
@@ -922,32 +949,32 @@ def complete_insert_stage(self, id_field, m: int, k: int, new_sidecar_name):
# Close the lagging bloom filters
self.quadtree._bloom_cache = {}

if self.candidate_file_loc(new_sidecar_name, m, k).exists():
if self.quadtree.file_exists(self.candidate_file_loc(new_sidecar_name, m, k)):
# "Candidate file loc" means that we encountered a bloom filter match
# including all of the children of this tile.
if len(self.children()) == 0:
# If we have preliminary unmerged files but no children,
# this means that we're done inserting, and can rename it
# to be a 'matched' file.
self.candidate_file_loc(new_sidecar_name, m, k).rename(
self.matched_file_loc(new_sidecar_name, m, k)
self.quadtree.filesystem.move(
self.candidate_file_loc(new_sidecar_name, m, k).as_posix(),
self.matched_file_loc(new_sidecar_name, m, k).as_posix()
)
else:
# Otherwise, we need to recursively insert that file
# at an appropriate point in the tree.
for batch in pq.ParquetFile(
self.candidate_file_loc(new_sidecar_name, m, k)
).iter_batches():
b = pa.Table.from_batches([batch])
self.insert_for_join(
b,
id_field,
m,
k,
new_sidecar_name,
)
with self.quadtree.filesystem.open_input_file(self.candidate_file_loc(new_sidecar_name, m, k).as_posix()) as f:
for batch in pq.ParquetFile(f).iter_batches():
b = pa.Table.from_batches([batch])
self.insert_for_join(
b,
id_field,
m,
k,
new_sidecar_name,
)
# Clean up the candidate file, because it has now been inserted below here.
self.candidate_file_loc(new_sidecar_name, m, k).unlink()
self.quadtree.filesystem.delete_file(self.candidate_file_loc(new_sidecar_name, m, k).as_posix())
# Close the filehandles for the children immediately to avoid
# lagging writers
self.close_filehandles()
@@ -976,24 +1003,26 @@ def complete_join(self, id_field, m, k, new_sidecar_name):
except KeyError:
sidecar = None

if not self.matched_file_loc(new_sidecar_name, m, k).exists():
if not self.quadtree.file_exists(self.matched_file_loc(new_sidecar_name, m, k)):
# Can happen if there are no matches.
pass

else:
for tile in self.tiles():
dest = tile.filename.with_suffix(f".{new_sidecar_name}.feather")
if dest.exists():
if self.quadtree.file_exists(dest):
raise FileExistsError(f"File {dest} already exists.")
# logger.warning(f"File {dest} already exists.")
# continue
ids = tile.read_column(id_field, sidecar)
# Read *only* the relevant rows of the parquet file.
# So long as the
matches = pq.read_table(
self.matched_file_loc(new_sidecar_name, m, k),
filters=[(id_field, "in", ids)],
)
matched_sidecar_file = self.matched_file_loc(new_sidecar_name, m, k)
with self.quadtree.filesystem.open_input_file(matched_sidecar_file.as_posix()) as f:
matches = pq.read_table(
f,
filters=[(id_field, "in", ids)],
)
sort_indices = pc.index_in(ids, matches[id_field])

# Now we reshuffle the matches to be in the same order as the original file.
@@ -1005,11 +1034,7 @@ def complete_join(self, id_field, m, k, new_sidecar_name):
towrite = towrite.combine_chunks()
except pa.lib.ArrowInvalid:
logger.warning(f"Failed to combine chunks for {tile.coords}")
feather.write_feather(
towrite,
dest,
compression="uncompressed",
)
self.quadtree.write_feather(dest, towrite, compression="uncompressed")
for mychild in self.children():
mychild.complete_join(id_field, m, k, new_sidecar_name)
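
The join above leans on parquet predicate pushdown: only rows whose id appears in the tile are materialized from the matched sidecar file. A standalone sketch of that read pattern, with a hypothetical file name and column:

    import pyarrow.parquet as pq

    ids = ["a1", "b2", "c3"]  # placeholder ids present in one tile
    matches = pq.read_table("matched.parquet", filters=[("id", "in", ids)])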

@@ -1177,7 +1202,7 @@ def finalize(self):
# First, we close any existing overflow buffers.
tile.close_overflow_buffers()
for tile in self.iterate(direction="top-down"):
if tile.overflow_loc.exists():
if self.quadtree.file_exists(tile.overflow_loc):
# Now we can begin again with the overall budget inserting at this point.
tile.permitted_children = self.overall_tile_budget()
tables = [
@@ -1187,7 +1212,8 @@ def finalize(self):
for k in self.sidecar_names
],
]
inputs = [pa.ipc.open_file(tb) for tb in tables]
files = [self.quadtree.filesystem.open_input_file(tb.as_posix()) for tb in tables]
inputs = [pa.ipc.open_file(file) for file in files]

def yielder() -> Iterator[pa.RecordBatch]:
for i in range(inputs[0].num_record_batches):
@@ -1201,7 +1227,9 @@ def yielder() -> Iterator[pa.RecordBatch]:
# Insert in chunks of 100 megabytes
for batch in rebatch(yielder(), 100e6):
tile.insert(batch)
tile.overflow_loc.unlink()

for table in tables:
Owner:

Does this include the deletion of tile.overflow_loc.unlink() too?

Collaborator Author:

yep! tables includes it

self.quadtree.filesystem.delete_file(table.as_posix())
# Manifest isn't used
tile.finalize()
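
rebatch coalesces the overflow record batches into roughly 100 MB inserts so the tree is rebuilt in large, bounded chunks. A sketch of what such a helper could look like, assuming it simply buffers batches by byte size (not necessarily quadfeather's actual implementation):

    import pyarrow as pa
    from typing import Iterator

    def rebatch_sketch(batches: Iterator[pa.RecordBatch], budget: float) -> Iterator[pa.Table]:
        buffer, size = [], 0
        for batch in batches:
            buffer.append(batch)
            size += batch.nbytes
            if size >= budget:
                # Emit one table per ~budget bytes of accumulated batches.
                yield pa.Table.from_batches(buffer)
                buffer, size = [], 0
        if buffer:
            yield pa.Table.from_batches(buffer)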

@@ -1232,7 +1260,7 @@ def filename(self) -> Path:
return self._filename
local_name = Path(*map(str, self.coords)).with_suffix(".feather")
dest_file = self.quadtree.basedir / local_name
dest_file.parent.mkdir(parents=True, exist_ok=True)
self.quadtree.filesystem.create_dir(dest_file.parent.as_posix(), recursive=True)
self._filename = dest_file
return self._filename

@@ -1307,10 +1335,10 @@ def flush_data(self, destination, compression):
for k, v in self.quadtree.sidecars.items():
other_tbs[v][k] = frame[k]
frame = frame.drop(k)
feather.write_feather(frame, destination, compression=compression)
self.quadtree.write_feather(destination, frame, compression=compression)
for k, v in other_tbs.items():
feather.write_feather(
pa.table(v), destination.with_suffix(f".{k}.feather"), compression
self.quadtree.write_feather(
destination.with_suffix(f".{k}.feather"), pa.table(v), compression
)
self.min_ix = minmax["min"]
self.max_ix = minmax["max"]
@@ -1363,7 +1391,7 @@ def overflow_buffers(self):
p = self.overflow_loc
if k != "":
p = p.with_suffix(f".{k}.arrow")
self._sinks[k] = pa.OSFile(str(p), "wb")
self._sinks[k] = self.quadtree.filesystem.open_output_stream(p.as_posix())
self._overflow_writers[k] = pa.ipc.new_file(
self._sinks[k], self.quadtree.schemas[k]
)
@@ -1516,8 +1544,7 @@ def read_column(self, column: str, sidecar: Optional[str]) -> pa.Array:
fname = self.filename
if sidecar is not None:
fname = self.filename.with_suffix(f".{sidecar}.feather")
return feather.read_table(fname, columns=[column])[column]

return self.quadtree.read_feather(fname, columns=[column])[column]

def get_better_codes(col, counter=Counter()):
for a in pc.value_counts(col):