From b4953f16c5d90e12ed97b6c3c53ad5859f14ddee Mon Sep 17 00:00:00 2001 From: Adam Richardson Date: Tue, 11 Aug 2020 19:12:10 -0400 Subject: [PATCH 1/2] implement ListParts endpoint --- src/main/scala/io/findify/s3mock/S3Mock.scala | 1 + .../s3mock/error/NoSuchUploadException.scala | 11 +++ .../s3mock/provider/FileProvider.scala | 34 ++++++++-- .../s3mock/provider/InMemoryProvider.scala | 34 +++++++++- .../io/findify/s3mock/provider/Provider.scala | 2 +- .../findify/s3mock/response/ListParts.scala | 33 +++++++++ .../io/findify/s3mock/route/ListParts.scala | 68 +++++++++++++++++++ 7 files changed, 174 insertions(+), 9 deletions(-) create mode 100644 src/main/scala/io/findify/s3mock/error/NoSuchUploadException.scala create mode 100644 src/main/scala/io/findify/s3mock/response/ListParts.scala create mode 100644 src/main/scala/io/findify/s3mock/route/ListParts.scala diff --git a/src/main/scala/io/findify/s3mock/S3Mock.scala b/src/main/scala/io/findify/s3mock/S3Mock.scala index 86fc07f..0efa1e9 100644 --- a/src/main/scala/io/findify/s3mock/S3Mock.scala +++ b/src/main/scala/io/findify/s3mock/S3Mock.scala @@ -44,6 +44,7 @@ class S3Mock(port:Int, provider:Provider)(implicit system:ActorSystem = ActorSys } ~ parameterMap { params => path(RemainingPath) { key => concat( + ListParts().route(bucket, key.toString()), GetObject().route(bucket, key.toString(), params), CopyObjectMultipart().route(bucket, key.toString()), CopyObject().route(bucket, key.toString()), diff --git a/src/main/scala/io/findify/s3mock/error/NoSuchUploadException.scala b/src/main/scala/io/findify/s3mock/error/NoSuchUploadException.scala new file mode 100644 index 0000000..7b425f9 --- /dev/null +++ b/src/main/scala/io/findify/s3mock/error/NoSuchUploadException.scala @@ -0,0 +1,11 @@ +package io.findify.s3mock.error + +case class NoSuchUploadException(bucket: String, key: String, uploadId: String) + extends Exception(s"upload id does not exist: $uploadId") { + def toXML = + + NoSuchUpload + The specified multipart upload does not exist + /{bucket}/{key} + +} diff --git a/src/main/scala/io/findify/s3mock/provider/FileProvider.scala b/src/main/scala/io/findify/s3mock/provider/FileProvider.scala index f9a3341..c2bf3e1 100644 --- a/src/main/scala/io/findify/s3mock/provider/FileProvider.scala +++ b/src/main/scala/io/findify/s3mock/provider/FileProvider.scala @@ -7,7 +7,7 @@ import better.files.File import better.files.File.OpenOptions import com.amazonaws.services.s3.model.ObjectMetadata import com.typesafe.scalalogging.LazyLogging -import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException} +import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException, NoSuchUploadException} import io.findify.s3mock.provider.metadata.{MapMetadataStore, MetadataStore} import io.findify.s3mock.request.{CompleteMultipartUpload, CreateBucketConfiguration} import io.findify.s3mock.response._ @@ -114,12 +114,11 @@ class FileProvider(dir:String) extends Provider with LazyLogging { override def putObjectMultipartComplete(bucket:String, key:String, uploadId:String, request:CompleteMultipartUpload): CompleteMultipartUploadResult = { val bucketFile = File(s"$dir/$bucket") if (!bucketFile.exists) throw NoSuchBucketException(bucket) - val files = request.parts.map(part => File(s"$dir/.mp/$bucket/$key/$uploadId/${part.partNumber}")) - val parts = files.map(f => f.byteArray) + val partFiles = request.parts.map(part => File(s"$dir/.mp/$bucket/$key/$uploadId/${part.partNumber}")) val file = File(s"$dir/$bucket/$key") file.createIfNotExists(createParents = true) - val data = parts.fold(Array[Byte]())(_ ++ _) - file.writeBytes(data.toIterator) + file.clear() + partFiles.foreach(partFile => file.appendByteArray(partFile.byteArray)) File(s"$dir/.mp/$bucket/$key").delete() val hash = file.md5 metadataStore.get(bucket, key).foreach {m => @@ -130,6 +129,31 @@ class FileProvider(dir:String) extends Provider with LazyLogging { CompleteMultipartUploadResult(bucket, key, hash) } + override def listParts(bucket: String, key: String, uploadId: String, markerOpt: Option[Int], countOpt: Option[Int]): ListParts = { + val marker = markerOpt.filter(0.to(10000).contains(_)).getOrElse(0) + val count = countOpt.filter(1.to(10000).contains(_)).getOrElse(10000) + + logger.debug(s"listing parts for upload $uploadId to s3://$bucket/$key (marker $marker, count $count)") + + val bucketDir = File(s"$dir/$bucket") + if (!bucketDir.exists) throw NoSuchBucketException(bucket) + + val uploadDir = File(s"$dir/.mp/$bucket/$key/$uploadId") + if (!uploadDir.exists) throw NoSuchUploadException(bucket, key, uploadId) + + val partFiles = uploadDir.list.flatMap(file => file.name.toIntOption.map { partNo => (partNo, file) }).toSeq.sortBy(_._1) + val highestNumber = partFiles.lastOption.map(_._1).getOrElse(0) + + val parts = partFiles + .filter { case (partNo, _) => partNo >= marker } + .take(count) + .map { case (partNo, file) => Part(partNo, file.md5.toLowerCase, file.size, file.lastModifiedTime) } + + val nextMarker = parts.lastOption.map(_.partNo).filter(_ < highestNumber) + + ListParts(bucket, key, uploadId, markerOpt.getOrElse(0), nextMarker, count, parts.toList) + } + override def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult = { val sourceBucketFile = File(s"$dir/$sourceBucket") val destBucketFile = File(s"$dir/$destBucket") diff --git a/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala b/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala index a9f4ecd..7b55b51 100644 --- a/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala +++ b/src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala @@ -6,7 +6,7 @@ import java.util.{Date, UUID} import akka.http.scaladsl.model.DateTime import com.amazonaws.services.s3.model.ObjectMetadata import com.typesafe.scalalogging.LazyLogging -import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException} +import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException, NoSuchUploadException} import io.findify.s3mock.provider.metadata.{InMemoryMetadataStore, MetadataStore} import io.findify.s3mock.request.{CompleteMultipartUpload, CreateBucketConfiguration} import io.findify.s3mock.response._ @@ -25,7 +25,7 @@ class InMemoryProvider extends Provider with LazyLogging { private case class KeyContents(lastModificationTime: DateTime, data: Array[Byte]) - private case class MultipartChunk(partNo: Int, data: Array[Byte]) extends Ordered[MultipartChunk] { + private case class MultipartChunk(partNo: Int, data: Array[Byte], etag: String, lastModifed: Instant) extends Ordered[MultipartChunk] { override def compare(that: MultipartChunk): Int = partNo compareTo that.partNo } @@ -117,7 +117,8 @@ class InMemoryProvider extends Provider with LazyLogging { bucketDataStore.get(bucket) match { case Some(_) => logger.debug(s"uploading multipart chunk $partNumber for s3://$bucket/$key") - multipartTempStore.getOrElseUpdate(uploadId, new mutable.TreeSet).add(MultipartChunk(partNumber, data)) + val chunk = MultipartChunk(partNumber, data, DigestUtils.md5Hex(data), Instant.now()) + multipartTempStore.getOrElseUpdate(uploadId, new mutable.TreeSet).add(chunk) case None => throw NoSuchBucketException(bucket) } } @@ -139,6 +140,33 @@ class InMemoryProvider extends Provider with LazyLogging { } } + override def listParts(bucket: String, key: String, uploadId: String, markerOpt: Option[Int], countOpt: Option[Int]): ListParts = { + val marker = markerOpt.filter(0.to(10000).contains(_)).getOrElse(0) + val count = countOpt.filter(1.to(10000).contains(_)).getOrElse(10000) + + logger.debug(s"listing parts for upload $uploadId to s3://$bucket/$key (marker $marker, count $count)") + + if(!bucketDataStore.contains(bucket)) throw NoSuchBucketException(bucket) + + multipartTempStore.get(uploadId) match { + case Some(chunks) => { + val sorted = chunks.toSeq.sortBy(_.partNo) + val highestNumber = sorted.lastOption.map(_.partNo).getOrElse(0) + + val parts = sorted + .filter(_.partNo >= marker) + .take(count) + .map(chunk => Part(chunk.partNo, chunk.etag, chunk.data.length, chunk.lastModifed)) + .toList + + val nextMarker = parts.lastOption.map(_.partNo).filter(_ < highestNumber) + + ListParts(bucket, key, uploadId, marker, nextMarker, count, parts) + } + case None => throw NoSuchUploadException(bucket, key, uploadId) + } + } + override def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult = { (bucketDataStore.get(sourceBucket), bucketDataStore.get(destBucket)) match { case (Some(srcBucketContent), Some(dstBucketContent)) => diff --git a/src/main/scala/io/findify/s3mock/provider/Provider.scala b/src/main/scala/io/findify/s3mock/provider/Provider.scala index cef2e68..2fb3d1a 100644 --- a/src/main/scala/io/findify/s3mock/provider/Provider.scala +++ b/src/main/scala/io/findify/s3mock/provider/Provider.scala @@ -7,7 +7,6 @@ import io.findify.s3mock.response._ case class GetObjectData(bytes: Array[Byte], metadata: Option[ObjectMetadata]) - /** * Interface for provider implementations. */ @@ -21,6 +20,7 @@ trait Provider { def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult def putObjectMultipartPart(bucket:String, key:String, partNumber:Int, uploadId:String, data:Array[Byte]):Unit def putObjectMultipartComplete(bucket:String, key:String, uploadId:String, request:CompleteMultipartUpload):CompleteMultipartUploadResult + def listParts(bucket: String, key: String, uploadId: String, marker: Option[Int], count: Option[Int]): ListParts def deleteObject(bucket:String, key:String):Unit def deleteBucket(bucket:String):Unit def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult diff --git a/src/main/scala/io/findify/s3mock/response/ListParts.scala b/src/main/scala/io/findify/s3mock/response/ListParts.scala new file mode 100644 index 0000000..f5c54f3 --- /dev/null +++ b/src/main/scala/io/findify/s3mock/response/ListParts.scala @@ -0,0 +1,33 @@ +package io.findify.s3mock.response + +import java.time.Instant + +case class Part(partNo: Int, etag: String, size: Long, lastModified: Instant) { + def toXML = + + {partNo} + {etag} + {size} + {lastModified} + +} + +case class ListParts(bucket: String, + key: String, + uploadId: String, + marker: Int, + nextMarker: Option[Int], + maxParts: Int, + parts: List[Part]) { + def toXml = + + {bucket} + {key} + {uploadId} + {marker} + {nextMarker.map(partNo => {partNo}).toSeq} + {maxParts} + {nextMarker.isDefined} + {parts.map(_.toXML)} + +} diff --git a/src/main/scala/io/findify/s3mock/route/ListParts.scala b/src/main/scala/io/findify/s3mock/route/ListParts.scala new file mode 100644 index 0000000..3cc182d --- /dev/null +++ b/src/main/scala/io/findify/s3mock/route/ListParts.scala @@ -0,0 +1,68 @@ +package io.findify.s3mock.route + +import akka.http.scaladsl.common.{NameOptionReceptacle, NameReceptacle} +import akka.http.scaladsl.model.{ + ContentType, + HttpCharsets, + HttpEntity, + HttpResponse, + MediaTypes, + StatusCodes +} +import akka.http.scaladsl.server.Directives.{complete, get, parameters} +import com.typesafe.scalalogging.LazyLogging +import io.findify.s3mock.error.{ + InternalErrorException, + NoSuchBucketException, + NoSuchKeyException, + NoSuchUploadException +} +import io.findify.s3mock.provider.Provider +import akka.http.scaladsl.model.HttpEntity + +import scala.util.{Failure, Success, Try} + +case class ListParts()(implicit provider: Provider) extends LazyLogging { + def route(bucket: String, path: String) = get { + parameters( + Symbol("uploadId"), + new NameOptionReceptacle[Int]("part-number-marker"), + new NameOptionReceptacle[Int]("max-parts") + ) { (uploadId: String, marker: Option[Int], count: Option[Int]) => + { + complete { + logger.debug(s"list parts bucket=$bucket path=$path uploadId=$uploadId") + val (status, response) = Try( + provider.listParts(bucket, path, uploadId, marker, count) + ) match { + case Success(listParts) => + (StatusCodes.OK, listParts.toXml) + case Failure(e) => + val (message, status, response) = e match { + case e: NoSuchBucketException => + ("no such bucket", StatusCodes.NotFound, e.toXML) + case e: NoSuchUploadException => + ("no such upload", StatusCodes.NotFound, e.toXML) + case e => + ( + e.toString, + StatusCodes.InternalServerError, + InternalErrorException(e).toXML + ) + } + logger.info(s"cannot list parts of upload $uploadId in $bucket/$path: $message") + (status, response) + } + + HttpResponse( + status, + entity = HttpEntity( + ContentType(MediaTypes.`application/xml`, HttpCharsets.`UTF-8`), + response.toString + ) + ) + } + } + } + } +} From 6379e12b29db6d744ffb0cff5dce536d2eaa5e99 Mon Sep 17 00:00:00 2001 From: Adam Richardson Date: Tue, 11 Aug 2020 19:12:21 -0400 Subject: [PATCH 2/2] test ListParts endpoint --- .../io/findify/s3mock/GetPutObjectTest.scala | 4 +- .../findify/s3mock/MultipartUploadTest.scala | 24 +++++++++- .../transfermanager/MultipartUploadTest.scala | 48 +++++++++++++++++++ 3 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 src/test/scala/io/findify/s3mock/transfermanager/MultipartUploadTest.scala diff --git a/src/test/scala/io/findify/s3mock/GetPutObjectTest.scala b/src/test/scala/io/findify/s3mock/GetPutObjectTest.scala index 4fd034b..1667f75 100644 --- a/src/test/scala/io/findify/s3mock/GetPutObjectTest.scala +++ b/src/test/scala/io/findify/s3mock/GetPutObjectTest.scala @@ -111,10 +111,10 @@ class GetPutObjectTest extends S3MockTest { it should "work with = in path" in { s3.createBucket("urlencoded") - s3.listBuckets().exists(_.getName == "urlencoded") shouldBe true + s3.listBuckets().asScala.exists(_.getName == "urlencoded") shouldBe true s3.putObject("urlencoded", "path/with=123/foo", "bar=") s3.putObject("urlencoded", "path/withoutequals/foo", "bar") - val result = s3.listObjects("urlencoded").getObjectSummaries.toList.map(_.getKey) + val result = s3.listObjects("urlencoded").getObjectSummaries.asScala.map(_.getKey) result shouldBe List("path/with=123/foo", "path/withoutequals/foo") getContent(s3.getObject("urlencoded", "path/with=123/foo")) shouldBe "bar=" getContent(s3.getObject("urlencoded", "path/withoutequals/foo")) shouldBe "bar" diff --git a/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala b/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala index 7c0f132..cda0a1a 100644 --- a/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala +++ b/src/test/scala/io/findify/s3mock/MultipartUploadTest.scala @@ -2,6 +2,7 @@ package io.findify.s3mock import java.io.ByteArrayInputStream import java.nio.charset.Charset +import java.time.Instant import akka.actor.ActorSystem import akka.http.scaladsl.Http @@ -37,6 +38,25 @@ class MultipartUploadTest extends S3MockTest { response2.status.intValue() shouldBe 200 val response3 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://127.0.0.1:$port/getput/foo2?partNumber=2&uploadId=$uploadId", entity = "boo")), 10.minutes) response3.status.intValue() shouldBe 200 + + val response4 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.GET, uri = s"http://127.0.0.1:$port/getput/foo2?uploadId=$uploadId")), 10.minutes) + response4.status.intValue() shouldBe 200 + val data2 = Await.result(response4.entity.dataBytes.fold(ByteString(""))(_ ++ _).runWith(Sink.head), 10.seconds) + val listParts = scala.xml.XML.loadString(data2.utf8String) + (listParts \ "Bucket").text shouldBe "getput" + (listParts \ "Key").text shouldBe "foo2" + (listParts \ "UploadId").text shouldBe uploadId + (listParts \ "IsTruncated").text shouldBe "false" + val parts = (listParts \ "Part").map(part => { + Instant.parse((part \ "LastModified").text) + ( + (part \ "PartNumber").text.toInt, + (part \ "ETag").text, + (part \ "Size").text.toInt, + ) + }) + parts shouldBe Seq((1, DigestUtils.md5Hex("foo"), 3), (2, DigestUtils.md5Hex("boo"), 3)) + val commit = """ | | 1 @@ -47,8 +67,8 @@ class MultipartUploadTest extends S3MockTest { | ETag | |""".stripMargin - val response4 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://127.0.0.1:$port/getput/foo2?uploadId=$uploadId", entity = commit)), 10.minutes) - response4.status.intValue() shouldBe 200 + val response5 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://127.0.0.1:$port/getput/foo2?uploadId=$uploadId", entity = commit)), 10.minutes) + response5.status.intValue() shouldBe 200 getContent(s3.getObject("getput", "foo2")) shouldBe "fooboo" } diff --git a/src/test/scala/io/findify/s3mock/transfermanager/MultipartUploadTest.scala b/src/test/scala/io/findify/s3mock/transfermanager/MultipartUploadTest.scala new file mode 100644 index 0000000..1f79f47 --- /dev/null +++ b/src/test/scala/io/findify/s3mock/transfermanager/MultipartUploadTest.scala @@ -0,0 +1,48 @@ +package io.findify.s3mock.transfermanager + +import java.io.ByteArrayInputStream + +import com.amazonaws.services.s3.model.ObjectMetadata +import com.amazonaws.services.s3.transfer.TransferManagerBuilder +import io.findify.s3mock.S3MockTest + +class MultipartUploadTest extends S3MockTest { + override def behaviour(fixture: => Fixture) = { + val s3 = fixture.client + + val partSize = 1024L + val tm = TransferManagerBuilder + .standard() + .withMultipartUploadThreshold(partSize) + .withMinimumUploadPartSize(partSize) + .withMultipartCopyThreshold(partSize) + .withMultipartCopyPartSize(partSize) + .withS3Client(s3) + .build() + + def zeroes(size: Long): (ByteArrayInputStream, ObjectMetadata) = { + val stream = new ByteArrayInputStream(new Array[Byte](size.toInt)) + val meta = new ObjectMetadata() + meta.setContentLength(size) + (stream, meta) + } + + it should "multipart-upload files with TransferManager" in { + val (stream, meta) = zeroes(100 * partSize) + s3.createBucket("tm1") + val upload = tm.upload("tm1", "hello1", stream, meta) + val result = upload.waitForUploadResult() + result.getKey shouldBe "hello1" + } + + it should "multipart-copy files with TransferManager" in { + val (stream, meta) = zeroes(100 * partSize) + s3.createBucket("tm2") + tm.upload("tm2", "hello2", stream, meta).waitForUploadResult() + val copy = tm.copy("tm2", "hello2", "tm2", "hello2-copy") + val res = copy.waitForCopyResult() + res.getSourceKey shouldBe "hello2" + res.getDestinationKey shouldBe "hello2-copy" + } + } +}