From 52c0e47548ea4625537d497ecbf6c3feccc862a5 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 16:33:58 -0600 Subject: [PATCH 01/16] Add sleep before check local file size --- servicex/minio_adapter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 913c6ccc..f25667d5 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -137,6 +137,7 @@ async def download_file( if path.exists(): # if file size is the same, let's not download anything # maybe move to a better verification mechanism with e-tags in the future + await asyncio.sleep(0.3) localsize = path.stat().st_size if localsize == remotesize: return path.resolve() From 0067e41fe4867ebeda37b74f3ee5f25b3ec06674 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 16:48:44 -0600 Subject: [PATCH 02/16] Add temp file name --- servicex/minio_adapter.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index f25667d5..95a9cf62 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -137,19 +137,36 @@ async def download_file( if path.exists(): # if file size is the same, let's not download anything # maybe move to a better verification mechanism with e-tags in the future - await asyncio.sleep(0.3) localsize = path.stat().st_size if localsize == remotesize: return path.resolve() + + tmp_path = path.with_suffix(path.suffix + ".part") await s3.download_file( Bucket=self.bucket, Key=object_name, - Filename=path.as_posix(), + Filename=tmp_path.as_posix(), Config=_transferconfig, ) - localsize = path.stat().st_size + + # Ensure filesystem flush visibility + await asyncio.sleep(0) + localsize = tmp_path.stat().st_size if localsize != remotesize: - raise RuntimeError(f"Download of {object_name} failed") + # tmp_path.unlink(missing_ok=True) + raise RuntimeError(f"Download of {object_name} failed: local size - {localsize}, remote size - {remotesize}") + + tmp_path.replace(path) + + # await s3.download_file( + # Bucket=self.bucket, + # Key=object_name, + # Filename=path.as_posix(), + # Config=_transferconfig, + # ) + # localsize = path.stat().st_size + # if localsize != remotesize: + # raise RuntimeError(f"Download of {object_name} failed") return path.resolve() @retry( From d037e741a095d35d1cf8b188471bd9b13ab1d457 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 20:44:08 -0600 Subject: [PATCH 03/16] increase flush time --- servicex/minio_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 95a9cf62..dd8f5e1f 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -150,7 +150,7 @@ async def download_file( ) # Ensure filesystem flush visibility - await asyncio.sleep(0) + await asyncio.sleep(0.5) localsize = tmp_path.stat().st_size if localsize != remotesize: # tmp_path.unlink(missing_ok=True) From 9f6863719636264bb02c261b0a8ad98344275f00 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 20:53:14 -0600 Subject: [PATCH 04/16] Reduce boto3 concurrency --- servicex/minio_adapter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index dd8f5e1f..94bb0fa3 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -39,7 +39,7 @@ from servicex.models import ResultFile, TransformStatus # Maximum five simultaneous streams per individual file download -_transferconfig = TransferConfig(max_concurrency=5) +_transferconfig = TransferConfig(max_concurrency=1) # Maximum ten files simultaneously being downloaded (configurable with init_s3_config) _file_transfer_sem = asyncio.Semaphore(10) # Maximum five buckets being queried at once @@ -150,7 +150,7 @@ async def download_file( ) # Ensure filesystem flush visibility - await asyncio.sleep(0.5) + await asyncio.sleep(0.05) localsize = tmp_path.stat().st_size if localsize != remotesize: # tmp_path.unlink(missing_ok=True) From d2415683718c95b94dd4c46df9db1a63a0750d0b Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 21:06:35 -0600 Subject: [PATCH 05/16] Internal retry --- servicex/minio_adapter.py | 42 +++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 94bb0fa3..0fa4431c 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -107,9 +107,9 @@ async def list_bucket(self) -> List[ResultFile]: ] return rv - @retry( - stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True - ) + # @retry( + # stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True + # ) async def download_file( self, object_name: str, @@ -142,21 +142,25 @@ async def download_file( return path.resolve() tmp_path = path.with_suffix(path.suffix + ".part") - await s3.download_file( - Bucket=self.bucket, - Key=object_name, - Filename=tmp_path.as_posix(), - Config=_transferconfig, - ) - - # Ensure filesystem flush visibility - await asyncio.sleep(0.05) - localsize = tmp_path.stat().st_size - if localsize != remotesize: - # tmp_path.unlink(missing_ok=True) - raise RuntimeError(f"Download of {object_name} failed: local size - {localsize}, remote size - {remotesize}") - - tmp_path.replace(path) + for attempt in range(1, 4): + if tmp_path.exists(): + tmp_path.unlink() + + await s3.download_file( + Bucket=self.bucket, + Key=object_name, + Filename=tmp_path.as_posix(), + Config=_transferconfig, + ) + + # Ensure filesystem flush visibility + await asyncio.sleep(0) + localsize = tmp_path.stat().st_size + if localsize == remotesize: + tmp_path.replace(path) + return path.resolve() + # tmp_path.unlink(missing_ok=True) + raise RuntimeError(f"Download of {object_name} failed: local size - {localsize}, remote size - {remotesize}") # await s3.download_file( # Bucket=self.bucket, @@ -167,7 +171,7 @@ async def download_file( # localsize = path.stat().st_size # if localsize != remotesize: # raise RuntimeError(f"Download of {object_name} failed") - return path.resolve() + # return path.resolve() @retry( stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True From 9403c1c08e51b0e70405dad7ecab06f25fb0f1fa Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 21:14:49 -0600 Subject: [PATCH 06/16] Change boto3 setting --- servicex/minio_adapter.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 0fa4431c..9cc99874 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -39,7 +39,11 @@ from servicex.models import ResultFile, TransformStatus # Maximum five simultaneous streams per individual file download -_transferconfig = TransferConfig(max_concurrency=1) +_transferconfig = TransferConfig( + multipart_chunksize=16 * 1024 * 1024, + max_concurrency=4, + max_io_queue=20, +) # Maximum ten files simultaneously being downloaded (configurable with init_s3_config) _file_transfer_sem = asyncio.Semaphore(10) # Maximum five buckets being queried at once From eda843cd1fe604dc4879f7a972f03baf7778fdaf Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 21:55:08 -0600 Subject: [PATCH 07/16] check etag --- servicex/minio_adapter.py | 48 ++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 9cc99874..db0f0692 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -39,11 +39,7 @@ from servicex.models import ResultFile, TransformStatus # Maximum five simultaneous streams per individual file download -_transferconfig = TransferConfig( - multipart_chunksize=16 * 1024 * 1024, - max_concurrency=4, - max_io_queue=20, -) +_transferconfig = TransferConfig(max_concurrency=5) # Maximum ten files simultaneously being downloaded (configurable with init_s3_config) _file_transfer_sem = asyncio.Semaphore(10) # Maximum five buckets being queried at once @@ -111,9 +107,9 @@ async def list_bucket(self) -> List[ResultFile]: ] return rv - # @retry( - # stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True - # ) + @retry( + stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True + ) async def download_file( self, object_name: str, @@ -146,25 +142,21 @@ async def download_file( return path.resolve() tmp_path = path.with_suffix(path.suffix + ".part") - for attempt in range(1, 4): - if tmp_path.exists(): - tmp_path.unlink() - - await s3.download_file( - Bucket=self.bucket, - Key=object_name, - Filename=tmp_path.as_posix(), - Config=_transferconfig, - ) - - # Ensure filesystem flush visibility - await asyncio.sleep(0) - localsize = tmp_path.stat().st_size - if localsize == remotesize: - tmp_path.replace(path) - return path.resolve() - # tmp_path.unlink(missing_ok=True) - raise RuntimeError(f"Download of {object_name} failed: local size - {localsize}, remote size - {remotesize}") + await s3.download_file( + Bucket=self.bucket, + Key=object_name, + Filename=tmp_path.as_posix(), + Config=_transferconfig, + ) + + # Ensure filesystem flush visibility + await asyncio.sleep(0.05) + localsize = tmp_path.stat().st_size + if localsize != remotesize: + # tmp_path.unlink(missing_ok=True) + raise RuntimeError(f"Download of {object_name} failed: local size - {localsize}, remote size - {remotesize}, etag - {info["ETag"].strip('"')}") + + tmp_path.replace(path) # await s3.download_file( # Bucket=self.bucket, @@ -175,7 +167,7 @@ async def download_file( # localsize = path.stat().st_size # if localsize != remotesize: # raise RuntimeError(f"Download of {object_name} failed") - # return path.resolve() + return path.resolve() @retry( stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True From 038a08b2aa6f3691e3cc9e03c638f64054734c6a Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 22:19:35 -0600 Subject: [PATCH 08/16] delete if size doesn't match --- servicex/minio_adapter.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index db0f0692..9249e2e8 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -153,20 +153,10 @@ async def download_file( await asyncio.sleep(0.05) localsize = tmp_path.stat().st_size if localsize != remotesize: - # tmp_path.unlink(missing_ok=True) + tmp_path.unlink(missing_ok=True) raise RuntimeError(f"Download of {object_name} failed: local size - {localsize}, remote size - {remotesize}, etag - {info["ETag"].strip('"')}") - tmp_path.replace(path) - - # await s3.download_file( - # Bucket=self.bucket, - # Key=object_name, - # Filename=path.as_posix(), - # Config=_transferconfig, - # ) - # localsize = path.stat().st_size - # if localsize != remotesize: - # raise RuntimeError(f"Download of {object_name} failed") + tmp_path.replace(path) return path.resolve() @retry( From f2217413892d903cba71977ebcc0b0da92d2bd2b Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 22:25:11 -0600 Subject: [PATCH 09/16] temp change --- servicex/minio_adapter.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 9249e2e8..005a1b9b 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -152,11 +152,16 @@ async def download_file( # Ensure filesystem flush visibility await asyncio.sleep(0.05) localsize = tmp_path.stat().st_size + + # compare file size if localsize != remotesize: - tmp_path.unlink(missing_ok=True) - raise RuntimeError(f"Download of {object_name} failed: local size - {localsize}, remote size - {remotesize}, etag - {info["ETag"].strip('"')}") + print(f"Download of {object_name} failed: \ + local size - {localsize}, remote size - {remotesize}") + # tmp_path.unlink(missing_ok=True) + # raise RuntimeError(f"Download of {object_name} failed: \ + # local size - {localsize}, remote size - {remotesize}") - tmp_path.replace(path) + tmp_path.replace(path) return path.resolve() @retry( From 4ff75127d7b44dd8fdb8ccb1f3422c30cb67a961 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Mon, 29 Dec 2025 22:38:25 -0600 Subject: [PATCH 10/16] Reduce boto3 max concurrency --- servicex/minio_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 005a1b9b..64832f95 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -39,7 +39,7 @@ from servicex.models import ResultFile, TransformStatus # Maximum five simultaneous streams per individual file download -_transferconfig = TransferConfig(max_concurrency=5) +_transferconfig = TransferConfig(max_concurrency=2) # Maximum ten files simultaneously being downloaded (configurable with init_s3_config) _file_transfer_sem = asyncio.Semaphore(10) # Maximum five buckets being queried at once From 4ff9971ab24debfafe4f458368d14937867901b2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 30 Dec 2025 18:55:44 +0000 Subject: [PATCH 11/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/minio_adapter.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 64832f95..3508896c 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -155,8 +155,10 @@ async def download_file( # compare file size if localsize != remotesize: - print(f"Download of {object_name} failed: \ - local size - {localsize}, remote size - {remotesize}") + print( + f"Download of {object_name} failed: \ + local size - {localsize}, remote size - {remotesize}" + ) # tmp_path.unlink(missing_ok=True) # raise RuntimeError(f"Download of {object_name} failed: \ # local size - {localsize}, remote size - {remotesize}") From 4c6adb3f9c6decc50d2742f0163038b322754923 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Tue, 30 Dec 2025 16:47:42 -0600 Subject: [PATCH 12/16] temp check asyncio tasks --- servicex/query_core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/servicex/query_core.py b/servicex/query_core.py index fb056703..88778444 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -714,6 +714,7 @@ async def get_signed_url( break # Now just wait until all of our tasks complete + print(f"Total tasks alive before gather: {len(asyncio.all_tasks())}") await asyncio.gather(*download_tasks) return result_uris From 7eb6ee20bf05982fabff6592b87abafb9ef5eb07 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Fri, 2 Jan 2026 14:49:10 -0600 Subject: [PATCH 13/16] Max asyncio tasks --- servicex/minio_adapter.py | 14 +++++--------- servicex/query_core.py | 9 +++++++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 3508896c..d045b35e 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -39,7 +39,7 @@ from servicex.models import ResultFile, TransformStatus # Maximum five simultaneous streams per individual file download -_transferconfig = TransferConfig(max_concurrency=2) +_transferconfig = TransferConfig(max_concurrency=5) # Maximum ten files simultaneously being downloaded (configurable with init_s3_config) _file_transfer_sem = asyncio.Semaphore(10) # Maximum five buckets being queried at once @@ -155,14 +155,10 @@ async def download_file( # compare file size if localsize != remotesize: - print( - f"Download of {object_name} failed: \ - local size - {localsize}, remote size - {remotesize}" - ) - # tmp_path.unlink(missing_ok=True) - # raise RuntimeError(f"Download of {object_name} failed: \ - # local size - {localsize}, remote size - {remotesize}") - + tmp_path.unlink(missing_ok=True) + raise RuntimeError(f"Download of {object_name} failed:\n" + f" Local size - {localsize}\n" + f" Remote size - {remotesize}") tmp_path.replace(path) return path.resolve() diff --git a/servicex/query_core.py b/servicex/query_core.py index 88778444..826eafca 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -714,8 +714,13 @@ async def get_signed_url( break # Now just wait until all of our tasks complete - print(f"Total tasks alive before gather: {len(asyncio.all_tasks())}") - await asyncio.gather(*download_tasks) + MAX_INFLIGHT = 100 + if len(download_tasks) >= MAX_INFLIGHT: + await asyncio.gather(*download_tasks) + download_tasks.clear() + + # print(f"Total tasks alive before gather: {len(asyncio.all_tasks())}") + # await asyncio.gather(*download_tasks) return result_uris async def as_files_async( From c7960058af5656873e3f34466155b55847522060 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 2 Jan 2026 20:49:23 +0000 Subject: [PATCH 14/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/minio_adapter.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index d045b35e..a311b6e9 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -156,9 +156,11 @@ async def download_file( # compare file size if localsize != remotesize: tmp_path.unlink(missing_ok=True) - raise RuntimeError(f"Download of {object_name} failed:\n" - f" Local size - {localsize}\n" - f" Remote size - {remotesize}") + raise RuntimeError( + f"Download of {object_name} failed:\n" + f" Local size - {localsize}\n" + f" Remote size - {remotesize}" + ) tmp_path.replace(path) return path.resolve() From 1d3838cbd713852477c1f3f1066f9881d3808e90 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Wed, 18 Feb 2026 21:16:49 -0600 Subject: [PATCH 15/16] Disable expected size --- servicex/query_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 826eafca..b11919ff 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -691,7 +691,7 @@ async def get_signed_url( progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 - expected_size=expected_size, + # expected_size=expected_size, ) ) ) # NOQA 501 From 60e11e8a43235cc66f058b1aa7bba43dcfc1fd29 Mon Sep 17 00:00:00 2001 From: KyungEon Choi Date: Wed, 18 Feb 2026 21:25:17 -0600 Subject: [PATCH 16/16] temporary update version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1e050d01..2f8f3de9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "servicex" -version = "3.3.0" +version = "3.3.1b" description = "Python SDK and CLI Client for ServiceX" readme = "README.md" license = { text = "BSD-3-Clause" } # SPDX short identifier