diff --git a/pyproject.toml b/pyproject.toml index 1e050d01..2f8f3de9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "servicex" -version = "3.3.0" +version = "3.3.1b" description = "Python SDK and CLI Client for ServiceX" readme = "README.md" license = { text = "BSD-3-Clause" } # SPDX short identifier diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 913c6ccc..a311b6e9 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -140,15 +140,28 @@ async def download_file( localsize = path.stat().st_size if localsize == remotesize: return path.resolve() + + tmp_path = path.with_suffix(path.suffix + ".part") await s3.download_file( Bucket=self.bucket, Key=object_name, - Filename=path.as_posix(), + Filename=tmp_path.as_posix(), Config=_transferconfig, ) - localsize = path.stat().st_size + + # Ensure filesystem flush visibility + await asyncio.sleep(0.05) + localsize = tmp_path.stat().st_size + + # compare file size if localsize != remotesize: - raise RuntimeError(f"Download of {object_name} failed") + tmp_path.unlink(missing_ok=True) + raise RuntimeError( + f"Download of {object_name} failed:\n" + f" Local size - {localsize}\n" + f" Remote size - {remotesize}" + ) + tmp_path.replace(path) return path.resolve() @retry( diff --git a/servicex/query_core.py b/servicex/query_core.py index fb056703..b11919ff 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -691,7 +691,7 @@ async def get_signed_url( progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 - expected_size=expected_size, + # expected_size=expected_size, ) ) ) # NOQA 501 @@ -714,7 +714,13 @@ async def get_signed_url( break # Now just wait until all of our tasks complete - await asyncio.gather(*download_tasks) + MAX_INFLIGHT = 100 + if len(download_tasks) >= MAX_INFLIGHT: + await asyncio.gather(*download_tasks) + download_tasks.clear() + + # print(f"Total tasks alive before gather: {len(asyncio.all_tasks())}") + # await asyncio.gather(*download_tasks) return result_uris async def as_files_async(