From 4b7b49b06049109d599d5db3d0ba37554bb99ccd Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Fri, 26 Sep 2014 08:35:47 -0400 Subject: [PATCH 1/4] PLAT-692: implemented abortion of multipart uploads after an exception --- service/s3.cc | 49 ++++++++++++++++++++++++++++++++++++------------- service/s3.h | 16 +++++++++++++++- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/service/s3.cc b/service/s3.cc index e33e8af3..a83dc3d5 100644 --- a/service/s3.cc +++ b/service/s3.cc @@ -889,6 +889,19 @@ finishMultiPartUpload(const std::string & bucket, } } +void +S3Api:: +abortMultiPartUpload(const std::string & bucket, + const std::string & resource, + const std::string & uploadId) + const +{ + auto result = erase(bucket, resource, "uploadId=" + uploadId); + if (result.code_ != 204) { + throw ML::Exception("error aborting multipart upload: " + uploadId); + } +} + void S3Api::MultiPartUploadPart:: fromXml(tinyxml2::XMLElement * element) @@ -2129,8 +2142,6 @@ struct StreamingUploadSource { void finish() { - if (exc) - std::rethrow_exception(exc); // cerr << "pushing last chunk " << chunkIndex << endl; flush(); @@ -2147,23 +2158,26 @@ struct StreamingUploadSource { // Make sure that an exception in uploading the last chunk doesn't // lead to a corrupt (truncated) file - if (exc) + if (exc) { + if (!metadata.commitOnThrow) { + owner->abortMultiPartUpload(bucket, "/" + object, + uploadId); + } std::rethrow_exception(exc); + } - string etag; try { - etag = owner->finishMultiPartUpload(bucket, "/" + object, - uploadId, - etags); + owner->finishMultiPartUpload(bucket, "/" + object, + uploadId, etags); } catch (...) { + if (!metadata.commitOnThrow) { + owner->abortMultiPartUpload(bucket, "/" + object, + uploadId); + } onException(); throw; } - //cerr << "final etag is " << etag << endl; - - if (exc) - std::rethrow_exception(exc); // double elapsed = Date::now().secondsSince(startDate); @@ -2195,6 +2209,10 @@ struct StreamingUploadSource { S3Api::Content(chunk.data, chunk.size, md5)); + if (chunk.index == metadata.throwChunk) { + throw ML::Exception("deterministic upload" + " exception"); + } if (putResult.code_ != 200) { cerr << putResult.bodyXmlStr() << endl; @@ -2537,10 +2555,15 @@ struct RegisterS3Handler { throw ML::Exception("unknown aws option " + name + "=" + value + " opening S3 object " + resource); } - else if(name == "num-threads") - { + else if (name == "num-threads") { md.numThreads = std::stoi(value); } + else if (name == "commit-on-throw") { + md.commitOnThrow = std::stoi(value); + } + else if (name == "throw-chunk") { + md.throwChunk = std::stoi(value); + } else { cerr << "warning: skipping unknown S3 option " << name << "=" << value << endl; diff --git a/service/s3.h b/service/s3.h index d4eab6c7..6215e0b8 100644 --- a/service/s3.h +++ b/service/s3.h @@ -245,7 +245,7 @@ struct S3Api : public AwsApi { ObjectMetadata(const Redundancy & redundancy) : redundancy(redundancy), serverSideEncryption(SSE_NONE), - numThreads(8) + numThreads(8), commitOnThrow(false), throwChunk(-1) { } @@ -259,6 +259,15 @@ struct S3Api : public AwsApi { std::map metadata; std::string acl; unsigned int numThreads; + + /* By default, exceptions thrown during the writing of one multipart + upload part cancels the upload. This flag enables to keep it in its + final state instead. */ + bool commitOnThrow; + + /* Index of the chunk during a write operation after which to emulate + an HTTP exception. */ + int throwChunk; }; /** Signed request that can be executed. */ @@ -558,6 +567,11 @@ struct S3Api : public AwsApi { const std::string & uploadId, const std::vector & etags) const; + void + abortMultiPartUpload(const std::string & bucket, + const std::string & resource, + const std::string & uploadId) const; + void uploadRecursive(std::string dirSrc, std::string bucketDest, bool includeDir); From 7b80902cc4d5f9ce3271155b37b7707a11cafdfa Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Fri, 26 Sep 2014 12:22:11 -0400 Subject: [PATCH 2/4] s3: throw deterministic exceptions before sending the specified chunk --- service/s3.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/service/s3.cc b/service/s3.cc index a83dc3d5..dbac3323 100644 --- a/service/s3.cc +++ b/service/s3.cc @@ -2187,6 +2187,7 @@ struct StreamingUploadSource { // << "MB/s" << " to " << etag << endl; } + /* upload threads */ void runThread() { while (!shutdown) { @@ -2199,6 +2200,11 @@ struct StreamingUploadSource { // << " with " << chunk.size << " bytes at index " // << chunk.index << endl; + if (chunk.index == metadata.throwChunk) { + throw ML::Exception("deterministic upload" + " exception at chunk %d", + chunk.index); + } // Upload the data string md5 = md5HashToHex(chunk.data, chunk.size); @@ -2209,10 +2215,6 @@ struct StreamingUploadSource { S3Api::Content(chunk.data, chunk.size, md5)); - if (chunk.index == metadata.throwChunk) { - throw ML::Exception("deterministic upload" - " exception"); - } if (putResult.code_ != 200) { cerr << putResult.bodyXmlStr() << endl; From 8d3aac396b3066d0c727a82235b45cc25e68d391 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Fri, 26 Sep 2014 15:39:10 -0400 Subject: [PATCH 3/4] s3: reworked exception handling --- service/s3.cc | 58 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/service/s3.cc b/service/s3.cc index dbac3323..019c6576 100644 --- a/service/s3.cc +++ b/service/s3.cc @@ -1978,8 +1978,8 @@ struct StreamingUploadSource { struct Impl { Impl() - : offset(0), chunkIndex(0), shutdown(false), - chunks(16) + : offset(0), chunkIndex(0), shutdown(false), chunks(16), + exceptionThrown(false), uploadAborted(false) { } @@ -2075,6 +2075,8 @@ struct StreamingUploadSource { std::mutex etagsLock; std::vector etags; std::exception_ptr exc; + bool exceptionThrown; + bool uploadAborted; ML::OnUriHandlerException onException; void start() @@ -2108,8 +2110,10 @@ struct StreamingUploadSource { std::streamsize write(const char_type* s, std::streamsize n) { - if (exc) - std::rethrow_exception(exc); + if (exc) { + handleException(); + return 0; + } size_t done = current.append(s, n); offset += done; @@ -2121,8 +2125,10 @@ struct StreamingUploadSource { //cerr << "writing " << n << " characters returned " // << done << endl; - if (exc) - std::rethrow_exception(exc); + if (exc) { + handleException(); + return 0; + } return done; } @@ -2142,6 +2148,11 @@ struct StreamingUploadSource { void finish() { + if (exc) { + handleException(); + return; + } + // cerr << "pushing last chunk " << chunkIndex << endl; flush(); @@ -2159,22 +2170,16 @@ struct StreamingUploadSource { // Make sure that an exception in uploading the last chunk doesn't // lead to a corrupt (truncated) file if (exc) { - if (!metadata.commitOnThrow) { - owner->abortMultiPartUpload(bucket, "/" + object, - uploadId); - } - std::rethrow_exception(exc); + handleException(); + return; } try { - owner->finishMultiPartUpload(bucket, "/" + object, - uploadId, etags); + owner->finishMultiPartUpload(bucket, "/" + object, uploadId, + etags); } catch (...) { - if (!metadata.commitOnThrow) { - owner->abortMultiPartUpload(bucket, "/" + object, - uploadId); - } + owner->abortMultiPartUpload(bucket, "/" + object, uploadId); onException(); throw; } @@ -2187,14 +2192,28 @@ struct StreamingUploadSource { // << "MB/s" << " to " << etag << endl; } + void handleException() + { + if (!uploadAborted) { + owner->abortMultiPartUpload(bucket, "/" + object, uploadId); + uploadAborted = true; + } + if (!exceptionThrown) { + exceptionThrown = true; + std::rethrow_exception(exc); + } + } + /* upload threads */ void runThread() { while (!shutdown) { Chunk chunk; if (chunks.tryPop(chunk, 0.01)) { - if (exc) + if (exc) { + while (chunks.tryPop(chunk)); return; + } try { //cerr << "got chunk " << chunk.index // << " with " << chunk.size << " bytes at index " @@ -2560,9 +2579,6 @@ struct RegisterS3Handler { else if (name == "num-threads") { md.numThreads = std::stoi(value); } - else if (name == "commit-on-throw") { - md.commitOnThrow = std::stoi(value); - } else if (name == "throw-chunk") { md.throwChunk = std::stoi(value); } From bf6c1b455736cd3bbd56983aea295801dd4df75c Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Fri, 26 Sep 2014 16:32:30 -0400 Subject: [PATCH 4/4] leftover --- service/s3.h | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/service/s3.h b/service/s3.h index 6215e0b8..b3222a18 100644 --- a/service/s3.h +++ b/service/s3.h @@ -245,7 +245,7 @@ struct S3Api : public AwsApi { ObjectMetadata(const Redundancy & redundancy) : redundancy(redundancy), serverSideEncryption(SSE_NONE), - numThreads(8), commitOnThrow(false), throwChunk(-1) + numThreads(8), throwChunk(-1) { } @@ -260,11 +260,6 @@ struct S3Api : public AwsApi { std::string acl; unsigned int numThreads; - /* By default, exceptions thrown during the writing of one multipart - upload part cancels the upload. This flag enables to keep it in its - final state instead. */ - bool commitOnThrow; - /* Index of the chunk during a write operation after which to emulate an HTTP exception. */ int throwChunk;