Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 77 additions & 58 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2469,29 +2469,35 @@ def _upload_chunk(self, final: bool = False) -> bool:
self.buffer = io.BytesIO()

self.buffer.seek(0)
content = self.buffer.read()
part = retryable_func_executor(
lambda: self.fs.tos_client.upload_part(
self.bucket,
self.key,
self.upload_id,
self.part_number,
content=content,
),
max_retry_num=self.fs.max_retry_num,
)
self.parts.append(
PartInfo(
part_number=self.part_number,
etag=part.etag,
part_size=len(content),
offset=None,
hash_crc64_ecma=None,
is_completed=None,
content = None
try:
content = self.buffer.read()
part = retryable_func_executor(
lambda: self.fs.tos_client.upload_part(
self.bucket,
self.key,
self.upload_id,
self.part_number,
content=content,
),
max_retry_num=self.fs.max_retry_num,
)
)
self.part_number += 1
self.buffer = io.BytesIO()
self.parts.append(
PartInfo(
part_number=self.part_number,
etag=part.etag,
part_size=len(content),
offset=None,
hash_crc64_ecma=None,
is_completed=None,
)
)
self.part_number += 1
self.buffer = io.BytesIO()
finally:
content = None
self.buffer.seek(0)
self.buffer.truncate()

if self.autocommit and final:
self.commit()
Expand Down Expand Up @@ -2543,47 +2549,60 @@ def fetch() -> bytes:
def commit(self) -> None:
"""Complete multipart upload or PUT."""
logger.debug("Commit %s", self)
if self.tell() == 0 and self.upload_id is not None:
if self.buffer is not None:
logger.debug("Empty file committed %s", self)
data = None
try:
if self.tell() == 0 and self.upload_id is not None:
if self.buffer is not None:
logger.debug("Empty file committed %s", self)
retryable_func_executor(
lambda: self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.upload_id
),
max_retry_num=self.fs.max_retry_num,
)
self.fs.touch(self.path, **self.kwargs)
elif self.upload_id is None and self.buffer is not None:
logger.debug("One-shot upload of %s", self)
self.buffer.seek(0)
data = self.buffer.read()
retryable_func_executor(
lambda: self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.upload_id
lambda: self.fs.tos_client.put_object(
self.bucket, self.key, content=data
),
max_retry_num=self.fs.max_retry_num,
)
elif self.upload_id is not None:
logger.debug("Complete multi-part upload for %s ", self)
retryable_func_executor(
lambda: self.fs.tos_client.complete_multipart_upload(
self.bucket,
self.key,
upload_id=self.upload_id,
parts=self.parts,
),
max_retry_num=self.fs.max_retry_num,
)
self.fs.touch(self.path, **self.kwargs)
elif self.upload_id is None and self.buffer is not None:
logger.debug("One-shot upload of %s", self)
self.buffer.seek(0)
data = self.buffer.read()
retryable_func_executor(
lambda: self.fs.tos_client.put_object(
self.bucket, self.key, content=data
),
max_retry_num=self.fs.max_retry_num,
)
elif self.upload_id is not None:
logger.debug("Complete multi-part upload for %s ", self)
retryable_func_executor(
lambda: self.fs.tos_client.complete_multipart_upload(
self.bucket,
self.key,
upload_id=self.upload_id,
parts=self.parts,
),
max_retry_num=self.fs.max_retry_num,
)

self.buffer = None
finally:
self.closed = True
self.buffer = None
self.upload_id = None
self.parts = []
data = None

def discard(self) -> None:
"""Close the file without writing."""
if self.upload_id:
retryable_func_executor(
lambda: self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.upload_id
),
max_retry_num=self.fs.max_retry_num,
)
self.buffer = None
try:
if self.upload_id:
retryable_func_executor(
lambda: self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.upload_id
),
max_retry_num=self.fs.max_retry_num,
)
self.buffer = None
finally:
self.closed = True
self.buffer = None
self.upload_id = None
self.parts = []