Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "hatchling.build"

[project]
name = "servicex"
version = "3.3.0"
version = "3.3.1b"
description = "Python SDK and CLI Client for ServiceX"
readme = "README.md"
license = { text = "BSD-3-Clause" } # SPDX short identifier
Expand Down
19 changes: 16 additions & 3 deletions servicex/minio_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,28 @@ async def download_file(
localsize = path.stat().st_size
if localsize == remotesize:
return path.resolve()

tmp_path = path.with_suffix(path.suffix + ".part")
await s3.download_file(
Bucket=self.bucket,
Key=object_name,
Filename=path.as_posix(),
Filename=tmp_path.as_posix(),
Config=_transferconfig,
)
localsize = path.stat().st_size

# Ensure filesystem flush visibility
await asyncio.sleep(0.05)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a magic number and might not work in general. Would it be enough to download to the .part file, rename, then stat the new file?

localsize = tmp_path.stat().st_size

# compare file size
if localsize != remotesize:
raise RuntimeError(f"Download of {object_name} failed")
tmp_path.unlink(missing_ok=True)
raise RuntimeError(
f"Download of {object_name} failed:\n"
f" Local size - {localsize}\n"
f" Remote size - {remotesize}"
)
tmp_path.replace(path)
return path.resolve()

@retry(
Expand Down
10 changes: 8 additions & 2 deletions servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ async def get_signed_url(
progress,
download_progress,
shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501
expected_size=expected_size,
# expected_size=expected_size,
)
)
) # NOQA 501
Expand All @@ -714,7 +714,13 @@ async def get_signed_url(
break

# Now just wait until all of our tasks complete
await asyncio.gather(*download_tasks)
MAX_INFLIGHT = 100
if len(download_tasks) >= MAX_INFLIGHT:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we already control this via a semaphore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of downloads is controlled by a semaphore in minio_adapter.py, but these new lines control the number of concurrent asyncio tasks. Maybe it's okay to await thousands of tasks while only downloading a small number at a time — but at least without this the problem isn't fixed.

await asyncio.gather(*download_tasks)
download_tasks.clear()

# print(f"Total tasks alive before gather: {len(asyncio.all_tasks())}")
# await asyncio.gather(*download_tasks)
return result_uris

async def as_files_async(
Expand Down
Loading