diff --git a/README.md b/README.md index 44c4250..cf40024 100644 --- a/README.md +++ b/README.md @@ -440,6 +440,8 @@ cf.clear_locks() ```bash # list cloud and local directories cloudfiles ls gs://bucket-folder/ +# resumable list cloud and local directories to sqlite db +cloudfiles ls --sqlite example.db gs://bucket-folder/ # parallel file transfer, no decompression cloudfiles -p 2 cp --progress -r s3://bkt/ gs://bkt2/ # change compression type to brotli @@ -497,6 +499,33 @@ cloudfiles ls -e "gs://bucket/prefix[ab]" # cloudfiles ls gs://bucket/prefixb ``` +### `ls` sqlite + +When there is a very large directory in the cloud, sometimes we want to download the file listing and file sizes locally for ease of searching and comparison. This feature allows you to initiate resumable downloads of a given bucket or directory on Google Cloud Storage or Amazon S3 endpoints. The `file`, `mem`, and `https` protocols do not support resumption (except for the Google Storage REST API which does have support). + +```bash +cloudfiles ls --sqlite example.db gs://bucket/prefix --progress +``` + +This will start downloading the data to `example.db` in the local directory. You can then search for files in the `files` table. + +```sql +SELECT sum(size) FROM files WHERE path LIKE '%example.jpg'; +``` + +For example, one plausible use for this technique is to check whether a copy of a large dataset has missing files. Download `original.db` and `copy.db` and then you can do a set difference. + +```sql +$ sqlite3 + +> ATTACH DATABASE 'original.db' AS original_data; +> ATTACH DATABASE 'copy.db' AS copied_data; + +> SELECT path from original_data.files + EXCEPT + SELECT path from copied_data.files; +``` + ### `alias` for Alternative S3 Endpoints You can set your own protocols for S3 compatible endpoints by creating dynamic or persistent aliases. 
CloudFiles comes with two official s3 endpoints that are important for the Seung Lab, `matrix://` and `tigerdata://` which point to Princeton S3 endpoints. Official aliases can't be overridden. diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py index d20452e..f8a5581 100644 --- a/cloudfiles/cloudfiles.py +++ b/cloudfiles/cloudfiles.py @@ -1138,7 +1138,12 @@ def touch( # ) def list( - self, prefix:str = "", flat:bool = False + self, + prefix:str = "", + flat:bool = False, + size:bool = False, + return_resume_token:bool = False, + resume_token:Optional[str] = None, ) -> Generator[str,None,None]: """ List files with the given prefix. @@ -1160,7 +1165,13 @@ def list( Return: generated sequence of file paths relative to cloudpath """ with self._get_connection() as conn: - for f in conn.list_files(prefix, flat): + for f in conn.list_files( + prefix=prefix, + flat=flat, + size=size, + resume_token=resume_token, + return_resume_token=return_resume_token, + ): yield f def transfer_to( diff --git a/cloudfiles/interfaces.py b/cloudfiles/interfaces.py index 6c724e0..751596f 100644 --- a/cloudfiles/interfaces.py +++ b/cloudfiles/interfaces.py @@ -1,3 +1,5 @@ +from typing import Optional + import base64 import binascii from collections import defaultdict, namedtuple @@ -346,7 +348,14 @@ def delete_files(self, file_paths): for path in file_paths: self.delete_file(path) - def list_files(self, prefix, flat): + def list_files( + self, + prefix:str, + flat:bool = False, + size:bool = False, + return_resume_token:bool = False, + resume_token:Optional[str] = None, + ): """ List the files in the layer with the given prefix. @@ -354,7 +363,6 @@ def list_files(self, prefix, flat): while non-flat means generate all file paths with that prefix. 
""" - layer_path = self.get_path_to_file("") path = os.path.join(layer_path, prefix) @@ -405,8 +413,16 @@ def stripext(fname): filenames = list(map(stripext, filenames)) filenames.sort() - return iter(filenames) - + + if not size and not return_resume_token: + return iter(filenames) + elif size and return_resume_token: + return ( (filename, os.path.getsize(filename), None) for filename in filenames ) + elif size and not return_resume_token: + return ( (filename, os.path.getsize(filename)) for filename in filenames ) + elif not size and return_resume_token: + return ( (filename, None) for filename in filenames ) + class MemoryInterface(StorageInterface): def __init__(self, path, secrets=None, request_payer=None, **kwargs): global MEM_BUCKET_POOL_LOCK @@ -589,7 +605,14 @@ def delete_files(self, file_paths): for path in file_paths: self.delete_file(path) - def list_files(self, prefix, flat): + def list_files( + self, + prefix:str, + flat:bool = False, + size:bool = False, + return_resume_token:bool = False, + resume_token:Optional[str] = None, + ): """ List the files in the layer with the given prefix. 
@@ -631,7 +654,17 @@ def stripext(fname): filenames = list(map(stripext, filenames)) filenames.sort() - return iter(filenames) + + # The size operation could be made much faster + # but will require some surgery + if not size and not return_resume_token: + return iter(filenames) + elif size and return_resume_token: + return ( (filename, self.size(filename), None) for filename in filenames ) + elif size and not return_resume_token: + return ( (filename, self.size(filename)) for filename in filenames ) + elif not size and return_resume_token: + return ( (filename, None) for filename in filenames ) def subtree_size(self, prefix:str = "") -> tuple[int,int]: layer_path = self.get_path_to_file("") @@ -833,9 +866,15 @@ def delete_files(self, file_paths): except google.cloud.exceptions.NotFound: pass - @retry - def list_files(self, prefix, flat=False): + def list_files( + self, + prefix:str, + flat:bool = False, + size:bool = False, + return_resume_token:bool = False, + resume_token:Optional[str] = None, + ): """ List the files in the layer with the given prefix. 
@@ -848,29 +887,53 @@ def list_files(self, prefix, flat=False): delimiter = '/' if flat else None + items = "name" + if size: + items += ",size" + blobs = self._bucket.list_blobs( prefix=path, delimiter=delimiter, page_size=2500, - fields="items(name),nextPageToken,prefixes", + fields=f"items({items}),nextPageToken,prefixes", + page_token=resume_token, ) + def return_args(filename, blob, page): + nonlocal blobs + args = [ filename ] + if size: + args.append(blob.size) + if return_resume_token: + args.append(blobs.next_page_token) + + if len(args) == 1: + return args[0] + else: + return tuple(args) + for page in blobs.pages: if page.prefixes: - yield from ( - item.removeprefix(path) - for item in page.prefixes - ) + for item in page.prefixes: + ret = [ item.removeprefix(path) ] + if size: + ret.append(0) + if return_resume_token: + ret.append(page.next_page_token) + + if len(ret) == 1: + yield ret[0] + else: + yield tuple(ret) for blob in page: filename = blob.name.removeprefix(layer_path) if not filename: continue elif not flat and filename[-1] != '/': - yield filename + yield return_args(filename, blob, page) elif flat and '/' not in blob.name.removeprefix(path): - yield filename - + yield return_args(filename, blob, page) @retry def subtree_size(self, prefix:str = "") -> tuple[int,int]: @@ -1026,7 +1089,14 @@ def exists(self, file_path): def files_exist(self, file_paths): return {path: self.exists(path) for path in file_paths} - def _list_files_google(self, prefix, flat): + def _list_files_google( + self, + prefix:str, + flat:bool = False, + size:bool = False, + return_resume_token:bool = False, + resume_token:Optional[str] = None, + ): bucket = self._path.path.split('/')[0] prefix = posixpath.join( self._path.path.replace(bucket, '', 1), @@ -1037,10 +1107,18 @@ def _list_files_google(self, prefix, flat): headers = self.default_headers() + items = "name" + if size: + items += ",size" + + fields = f"items({items}),nextPageToken,prefixes" + 
@retry_if_not(AuthorizationError) def request(token): nonlocal headers - params = {} + params = { + "fields": fields, + } if prefix: params["prefix"] = prefix if token is not None: @@ -1048,6 +1126,7 @@ def request(token): if flat: params["delimiter"] = '/' + results = self.session.get( f"https://storage.googleapis.com/storage/v1/b/{bucket}/o", params=params, @@ -1067,17 +1146,33 @@ def request(token): token = None while True: results = request(token) + token = results.get("nextPageToken", None) if 'prefixes' in results: - yield from ( + itr = ( item.removeprefix(strip) for item in results["prefixes"] ) + if not size and not return_resume_token: + yield from itr + elif not size and return_resume_token: + yield from ( (pre, token) for pre in itr ) + elif size and not return_resume_token: + yield from ( (pre, 0) for pre in itr ) + else: + yield from ( (pre, 0, token) for pre in itr ) for res in results.get("items", []): - yield res["name"].removeprefix(strip) + name = res["name"].removeprefix(strip) + if not size and not return_resume_token: + yield name + elif size and not return_resume_token: + yield (name, int(res["size"])) + elif not size and return_resume_token: + yield (name, token) + else: + yield (name, int(res["size"]), token) - token = results.get("nextPageToken", None) if token is None: break @@ -1124,11 +1219,21 @@ def _list_files_apache(self, prefix, flat): if flat: break - def list_files(self, prefix, flat=False): + def list_files( + self, + prefix:str, + flat:bool = False, + size:bool = False, + return_resume_token:bool = False, + resume_token:Optional[str] = None, + ): if self._path.host == "https://storage.googleapis.com": - yield from self._list_files_google(prefix, flat) + yield from self._list_files_google(prefix, flat, size, return_resume_token, resume_token) return + if size or resume_token or return_resume_token: + raise NotImplementedError("size, resume_token, and return_resume_token are not yet implemented.") + url = 
posixpath.join(self._path.host, self._path.path, prefix) resp = requests.head(url) @@ -1490,7 +1595,14 @@ def delete_files(self, file_paths): **self._additional_attrs, ) - def list_files(self, prefix, flat=False): + def list_files( + self, + prefix:str, + flat:bool = False, + size:bool = False, + return_resume_token:bool = False, + resume_token:Optional[str] = None, + ): """ List the files in the layer with the given prefix. @@ -1517,7 +1629,7 @@ def s3lst(path, continuation_token=None): return self._conn.list_objects_v2(**kwargs) - resp = s3lst(path) + resp = s3lst(path, continuation_token=resume_token) # the case where the prefix is something like "build", but "build" is a subdirectory # so requery with "build/" to get the proper behavior if ( @@ -1528,7 +1640,7 @@ def s3lst(path, continuation_token=None): and len(resp.get("CommonPrefixes", [])) == 1 ): path += '/' - resp = s3lst(path) + resp = s3lst(path, continuation_token=resume_token) def iterate(resp): if 'CommonPrefixes' in resp.keys(): @@ -1540,24 +1652,37 @@ def iterate(resp): if 'Contents' not in resp.keys(): resp['Contents'] = [] + token = None + if resp["IsTruncated"]: + token = resp["NextContinuationToken"] + for item in resp['Contents']: key = item['Key'] filename = key.removeprefix(layer_path) if filename == '': continue - elif not flat and filename[-1] != '/': - yield filename - elif flat and '/' not in key.removeprefix(path): + elif flat and '/' in key.removeprefix(path): + continue + elif not flat and filename[-1] == "/": + continue + + if not size and not return_resume_token: yield filename + elif size and not return_resume_token: + yield (filename, int(item["Size"])) + elif not size and return_resume_token: + yield (filename, token) + else: + yield (filename, int(item["Size"]), token) - for filename in iterate(resp): - yield filename + for result in iterate(resp): + yield result while resp['IsTruncated'] and resp['NextContinuationToken']: resp = s3lst(path, resp['NextContinuationToken']) - for 
filename in iterate(resp): - yield filename + for result in iterate(resp): + yield result def subtree_size(self, prefix:str = "") -> tuple[int,int]: layer_path = self.get_path_to_file("") diff --git a/cloudfiles_cli/cloudfiles_cli.py b/cloudfiles_cli/cloudfiles_cli.py index 448fc4f..f6f2260 100644 --- a/cloudfiles_cli/cloudfiles_cli.py +++ b/cloudfiles_cli/cloudfiles_cli.py @@ -37,6 +37,8 @@ ) import cloudfiles.lib +from . import listing_db + def SI(val:int) -> str: if val < 1024: return f"{val} bytes" @@ -105,8 +107,10 @@ def license(): @click.option('--flat', is_flag=True, default=False, help='Only produce a single level of directory hierarchy.',show_default=True) @click.option('-e','--expr',is_flag=True, default=False, help=r'Use a limited regexp language (e.g. [abc123]{3}) to generate prefixes.', show_default=True) @click.option('--no-auth',is_flag=True, default=False, help='Uses the http API for read-only operations.', show_default=True) +@click.option('--sqlite', 'sqlite_filename', default="", help='Write results to this sqlite database.', show_default=True) +@click.option('--progress', is_flag=True, default=False, help="Show progress bar for sqlite.", show_default=True) @click.argument("cloudpath") -def ls(shortpath, flat, expr, cloudpath, no_auth): +def ls(shortpath, flat, expr, cloudpath, no_auth, sqlite_filename, progress): """Recursively lists the contents of a directory.""" cloudpath = normalize_path(cloudpath) @@ -123,6 +127,12 @@ def ls(shortpath, flat, expr, cloudpath, no_auth): flat = flat or flt + if sqlite_filename != "": + return listing_db.list( + cloudpath, sqlite_filename, + prefix, flat, progress + ) + cf = CloudFiles(cloudpath, no_sign_request=no_sign_request) iterables = [] if expr: diff --git a/cloudfiles_cli/listing_db.py b/cloudfiles_cli/listing_db.py new file mode 100644 index 0000000..0dbf7ce --- /dev/null +++ b/cloudfiles_cli/listing_db.py @@ -0,0 +1,174 @@ +import sqlite3 +import time + +from tqdm import tqdm + +from cloudfiles.lib 
import sip +from cloudfiles import CloudFiles + +# the maximum value of a host parameter number is +# SQLITE_MAX_VARIABLE_NUMBER, which defaults to 999 +# for SQLite versions prior to 3.32.0 (2020-05-22) or +# 32766 for SQLite versions after 3.32.0. +# https://www.sqlite.org/limits.html +SQLITE_MAX_PARAMS = 999 if sqlite3.sqlite_version_info <= (3,32,0) else 32766 +BIND = '?' + +def init_db(filename:str) -> sqlite3.Connection: + """Create tables, indexes, and tune SQLite for bulk-insert performance.""" + conn = sqlite3.connect(filename) + + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA cache_size=-131072") # 128 MB (negative = kilobytes) + + conn.execute(""" + CREATE TABLE IF NOT EXISTS files ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + path TEXT NOT NULL UNIQUE, + size INTEGER + ) + """) + + conn.execute(""" + CREATE TABLE IF NOT EXISTS checkpoint ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + page_token TEXT, + rows_done INTEGER DEFAULT 0, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """) + + conn.execute( + """ + INSERT OR IGNORE + INTO checkpoint (id, page_token, rows_done) + VALUES (1, "", 0) + """ + ); + + conn.execute(""" + CREATE TABLE IF NOT EXISTS metadata ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + value TEXT + ) + """) + + conn.execute( + """ + INSERT OR IGNORE + INTO metadata (id, name, value) + VALUES (1, "cloudpath", "") + """ + ); + + conn.commit() + + return conn + +def get_cloudpath(conn:sqlite3.Connection) -> str: + return conn.execute( + f"SELECT value FROM metadata where name = 'cloudpath'" + ).fetchone()[0] + +def save_cloudpath(conn:sqlite3.Connection, cloudpath:str): + conn.execute( + f"UPDATE metadata SET value = ? 
where name = 'cloudpath'", [ cloudpath ] + ) + conn.commit() + +def save_checkpoint( + conn:sqlite3.Connection, + page_token:str|None, + rows_done:int, +) -> None: + """Persist the current GCS page token and running row count.""" + conn.execute(""" + UPDATE checkpoint + SET + page_token = ?, + rows_done = rows_done + ? + """, (page_token, rows_done)) + +def load_checkpoint(conn:sqlite3.Connection) -> tuple[str|None,int]: + """Return (page_token, rows_done) for a bucket, or (None, 0) if none.""" + row = conn.execute( + "SELECT page_token, rows_done FROM checkpoint WHERE id = 1" + ).fetchone() + return (row[0], row[1]) if row else (None, 0) + +def write_batch( + conn:sqlite3.Connection, + batch:list[tuple[str,int]], + page_token:str|None, +) -> None: + """Write a batch of (path, size) rows and save the checkpoint atomically.""" + for rows in sip(batch, SQLITE_MAX_PARAMS): + bindslist = ",".join([f"({BIND}, {BIND})"] * len(rows)) + flat_rows = [ x for row in rows for x in row ] + conn.execute( + f"INSERT INTO files (path,size) VALUES {bindslist}", + flat_rows + ) + +def list( + cloudpath:str, + db_name:str, + prefix:str = "", + flat:bool = False, + progress:bool = False, +): + with init_db(db_name) as conn: + + saved_cloudpath = get_cloudpath(conn) + if saved_cloudpath not in ("", cloudpath): + print( + f"Cloudpath does not match database. 
Aborting.\n" + f"Saved: {saved_cloudpath}\n" + f"Argument: {cloudpath}" + ) + return + + cf = CloudFiles(cloudpath) + + save_cloudpath(conn, cloudpath) + page_token, rows_done = load_checkpoint(conn) + + if progress and page_token is not None: + print(f"Resuming from {rows_done} rows.") + + iterator = cf.list( + prefix=prefix, + flat=flat, + size=True, + resume_token=page_token, + return_resume_token=True, + ) + + pbar = tqdm( + disable=(not progress), + desc="Files", + initial=rows_done, + ) + + with pbar: + for batch in sip(iterator, 5000): + latest_token = batch[-1][2] + batch = [ (row[0], row[1]) for row in batch ] + conn.execute("BEGIN TRANSACTION") + write_batch(conn, batch, latest_token) + save_checkpoint(conn, latest_token, len(batch)) + conn.commit() + + pbar.update(len(batch)) + + if progress: + print("creating index.") + + s = time.monotonic() + conn.execute("CREATE INDEX IF NOT EXISTS idx_files_path ON files(path)") + elapsed = time.monotonic() - s + if progress: + print(f"created index in {elapsed:.3f} sec.") + print("done.")