Skip to content
This repository was archived by the owner on Dec 4, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/scala/io/findify/s3mock/S3Mock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class S3Mock(port:Int, provider:Provider)(implicit system:ActorSystem = ActorSys
} ~ parameterMap { params =>
path(RemainingPath) { key =>
concat(
ListParts().route(bucket, key.toString()),
GetObject().route(bucket, key.toString(), params),
CopyObjectMultipart().route(bucket, key.toString()),
CopyObject().route(bucket, key.toString()),
Expand Down
11 changes: 11 additions & 0 deletions src/main/scala/io/findify/s3mock/error/NoSuchUploadException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.findify.s3mock.error

case class NoSuchUploadException(bucket: String, key: String, uploadId: String)
extends Exception(s"upload id does not exist: $uploadId") {
def toXML =
<Error>
<Code>NoSuchUpload</Code>
<Message>The specified multipart upload does not exist</Message>
<Resource>/{bucket}/{key}</Resource>
</Error>
}
34 changes: 29 additions & 5 deletions src/main/scala/io/findify/s3mock/provider/FileProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import better.files.File
import better.files.File.OpenOptions
import com.amazonaws.services.s3.model.ObjectMetadata
import com.typesafe.scalalogging.LazyLogging
import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException}
import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException, NoSuchUploadException}
import io.findify.s3mock.provider.metadata.{MapMetadataStore, MetadataStore}
import io.findify.s3mock.request.{CompleteMultipartUpload, CreateBucketConfiguration}
import io.findify.s3mock.response._
Expand Down Expand Up @@ -114,12 +114,11 @@ class FileProvider(dir:String) extends Provider with LazyLogging {
override def putObjectMultipartComplete(bucket:String, key:String, uploadId:String, request:CompleteMultipartUpload): CompleteMultipartUploadResult = {
val bucketFile = File(s"$dir/$bucket")
if (!bucketFile.exists) throw NoSuchBucketException(bucket)
val files = request.parts.map(part => File(s"$dir/.mp/$bucket/$key/$uploadId/${part.partNumber}"))
val parts = files.map(f => f.byteArray)
val partFiles = request.parts.map(part => File(s"$dir/.mp/$bucket/$key/$uploadId/${part.partNumber}"))
val file = File(s"$dir/$bucket/$key")
file.createIfNotExists(createParents = true)
val data = parts.fold(Array[Byte]())(_ ++ _)
file.writeBytes(data.toIterator)
file.clear()
partFiles.foreach(partFile => file.appendByteArray(partFile.byteArray))
File(s"$dir/.mp/$bucket/$key").delete()
val hash = file.md5
metadataStore.get(bucket, key).foreach {m =>
Expand All @@ -130,6 +129,31 @@ class FileProvider(dir:String) extends Provider with LazyLogging {
CompleteMultipartUploadResult(bucket, key, hash)
}

override def listParts(bucket: String, key: String, uploadId: String, markerOpt: Option[Int], countOpt: Option[Int]): ListParts = {
val marker = markerOpt.filter(0.to(10000).contains(_)).getOrElse(0)
val count = countOpt.filter(1.to(10000).contains(_)).getOrElse(10000)

logger.debug(s"listing parts for upload $uploadId to s3://$bucket/$key (marker $marker, count $count)")

val bucketDir = File(s"$dir/$bucket")
if (!bucketDir.exists) throw NoSuchBucketException(bucket)

val uploadDir = File(s"$dir/.mp/$bucket/$key/$uploadId")
if (!uploadDir.exists) throw NoSuchUploadException(bucket, key, uploadId)

val partFiles = uploadDir.list.flatMap(file => file.name.toIntOption.map { partNo => (partNo, file) }).toSeq.sortBy(_._1)
val highestNumber = partFiles.lastOption.map(_._1).getOrElse(0)

val parts = partFiles
.filter { case (partNo, _) => partNo >= marker }
.take(count)
.map { case (partNo, file) => Part(partNo, file.md5.toLowerCase, file.size, file.lastModifiedTime) }

val nextMarker = parts.lastOption.map(_.partNo).filter(_ < highestNumber)

ListParts(bucket, key, uploadId, markerOpt.getOrElse(0), nextMarker, count, parts.toList)
}

override def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult = {
val sourceBucketFile = File(s"$dir/$sourceBucket")
val destBucketFile = File(s"$dir/$destBucket")
Expand Down
34 changes: 31 additions & 3 deletions src/main/scala/io/findify/s3mock/provider/InMemoryProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.{Date, UUID}
import akka.http.scaladsl.model.DateTime
import com.amazonaws.services.s3.model.ObjectMetadata
import com.typesafe.scalalogging.LazyLogging
import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException}
import io.findify.s3mock.error.{NoSuchBucketException, NoSuchKeyException, NoSuchUploadException}
import io.findify.s3mock.provider.metadata.{InMemoryMetadataStore, MetadataStore}
import io.findify.s3mock.request.{CompleteMultipartUpload, CreateBucketConfiguration}
import io.findify.s3mock.response._
Expand All @@ -25,7 +25,7 @@ class InMemoryProvider extends Provider with LazyLogging {

private case class KeyContents(lastModificationTime: DateTime, data: Array[Byte])

private case class MultipartChunk(partNo: Int, data: Array[Byte]) extends Ordered[MultipartChunk] {
private case class MultipartChunk(partNo: Int, data: Array[Byte], etag: String, lastModifed: Instant) extends Ordered[MultipartChunk] {
override def compare(that: MultipartChunk): Int = partNo compareTo that.partNo
}

Expand Down Expand Up @@ -117,7 +117,8 @@ class InMemoryProvider extends Provider with LazyLogging {
bucketDataStore.get(bucket) match {
case Some(_) =>
logger.debug(s"uploading multipart chunk $partNumber for s3://$bucket/$key")
multipartTempStore.getOrElseUpdate(uploadId, new mutable.TreeSet).add(MultipartChunk(partNumber, data))
val chunk = MultipartChunk(partNumber, data, DigestUtils.md5Hex(data), Instant.now())
multipartTempStore.getOrElseUpdate(uploadId, new mutable.TreeSet).add(chunk)
case None => throw NoSuchBucketException(bucket)
}
}
Expand All @@ -139,6 +140,33 @@ class InMemoryProvider extends Provider with LazyLogging {
}
}

override def listParts(bucket: String, key: String, uploadId: String, markerOpt: Option[Int], countOpt: Option[Int]): ListParts = {
val marker = markerOpt.filter(0.to(10000).contains(_)).getOrElse(0)
val count = countOpt.filter(1.to(10000).contains(_)).getOrElse(10000)

logger.debug(s"listing parts for upload $uploadId to s3://$bucket/$key (marker $marker, count $count)")

if(!bucketDataStore.contains(bucket)) throw NoSuchBucketException(bucket)

multipartTempStore.get(uploadId) match {
case Some(chunks) => {
val sorted = chunks.toSeq.sortBy(_.partNo)
val highestNumber = sorted.lastOption.map(_.partNo).getOrElse(0)

val parts = sorted
.filter(_.partNo >= marker)
.take(count)
.map(chunk => Part(chunk.partNo, chunk.etag, chunk.data.length, chunk.lastModifed))
.toList

val nextMarker = parts.lastOption.map(_.partNo).filter(_ < highestNumber)

ListParts(bucket, key, uploadId, marker, nextMarker, count, parts)
}
case None => throw NoSuchUploadException(bucket, key, uploadId)
}
}

override def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult = {
(bucketDataStore.get(sourceBucket), bucketDataStore.get(destBucket)) match {
case (Some(srcBucketContent), Some(dstBucketContent)) =>
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/findify/s3mock/provider/Provider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import io.findify.s3mock.response._


case class GetObjectData(bytes: Array[Byte], metadata: Option[ObjectMetadata])

/**
* Interface for provider implementations.
*/
Expand All @@ -21,6 +20,7 @@ trait Provider {
def putObjectMultipartStart(bucket:String, key:String, metadata: ObjectMetadata):InitiateMultipartUploadResult
def putObjectMultipartPart(bucket:String, key:String, partNumber:Int, uploadId:String, data:Array[Byte]):Unit
def putObjectMultipartComplete(bucket:String, key:String, uploadId:String, request:CompleteMultipartUpload):CompleteMultipartUploadResult
def listParts(bucket: String, key: String, uploadId: String, marker: Option[Int], count: Option[Int]): ListParts
def deleteObject(bucket:String, key:String):Unit
def deleteBucket(bucket:String):Unit
def copyObject(sourceBucket: String, sourceKey: String, destBucket: String, destKey: String, newMeta: Option[ObjectMetadata] = None): CopyObjectResult
Expand Down
33 changes: 33 additions & 0 deletions src/main/scala/io/findify/s3mock/response/ListParts.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.findify.s3mock.response

import java.time.Instant

case class Part(partNo: Int, etag: String, size: Long, lastModified: Instant) {
def toXML =
<Part>
<PartNumber>{partNo}</PartNumber>
<ETag>{etag}</ETag>
<Size>{size}</Size>
<LastModified>{lastModified}</LastModified>
</Part>
}

case class ListParts(bucket: String,
key: String,
uploadId: String,
marker: Int,
nextMarker: Option[Int],
maxParts: Int,
parts: List[Part]) {
def toXml =
<ListPartsOutput xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Bucket>{bucket}</Bucket>
<Key>{key}</Key>
<UploadId>{uploadId}</UploadId>
<PartNumberMarker>{marker}</PartNumberMarker>
{nextMarker.map(partNo => <NextPartNumberMarker>{partNo}</NextPartNumberMarker>).toSeq}
<MaxParts>{maxParts}</MaxParts>
<IsTruncated>{nextMarker.isDefined}</IsTruncated>
{parts.map(_.toXML)}
</ListPartsOutput>
}
68 changes: 68 additions & 0 deletions src/main/scala/io/findify/s3mock/route/ListParts.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.findify.s3mock.route

import akka.http.scaladsl.common.{NameOptionReceptacle, NameReceptacle}
import akka.http.scaladsl.model.{
ContentType,
HttpCharsets,
HttpEntity,
HttpResponse,
MediaTypes,
StatusCodes
}
import akka.http.scaladsl.server.Directives.{complete, get, parameters}
import com.typesafe.scalalogging.LazyLogging
import io.findify.s3mock.error.{
InternalErrorException,
NoSuchBucketException,
NoSuchKeyException,
NoSuchUploadException
}
import io.findify.s3mock.provider.Provider
import akka.http.scaladsl.model.HttpEntity

import scala.util.{Failure, Success, Try}

case class ListParts()(implicit provider: Provider) extends LazyLogging {
def route(bucket: String, path: String) = get {
parameters(
Symbol("uploadId"),
new NameOptionReceptacle[Int]("part-number-marker"),
new NameOptionReceptacle[Int]("max-parts")
) { (uploadId: String, marker: Option[Int], count: Option[Int]) =>
{
complete {
logger.debug(s"list parts bucket=$bucket path=$path uploadId=$uploadId")
val (status, response) = Try(
provider.listParts(bucket, path, uploadId, marker, count)
) match {
case Success(listParts) =>
(StatusCodes.OK, listParts.toXml)
case Failure(e) =>
val (message, status, response) = e match {
case e: NoSuchBucketException =>
("no such bucket", StatusCodes.NotFound, e.toXML)
case e: NoSuchUploadException =>
("no such upload", StatusCodes.NotFound, e.toXML)
case e =>
(
e.toString,
StatusCodes.InternalServerError,
InternalErrorException(e).toXML
)
}
logger.info(s"cannot list parts of upload $uploadId in $bucket/$path: $message")
(status, response)
}

HttpResponse(
status,
entity = HttpEntity(
ContentType(MediaTypes.`application/xml`, HttpCharsets.`UTF-8`),
response.toString
)
)
}
}
}
}
}
4 changes: 2 additions & 2 deletions src/test/scala/io/findify/s3mock/GetPutObjectTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class GetPutObjectTest extends S3MockTest {

it should "work with = in path" in {
s3.createBucket("urlencoded")
s3.listBuckets().exists(_.getName == "urlencoded") shouldBe true
s3.listBuckets().asScala.exists(_.getName == "urlencoded") shouldBe true
s3.putObject("urlencoded", "path/with=123/foo", "bar=")
s3.putObject("urlencoded", "path/withoutequals/foo", "bar")
val result = s3.listObjects("urlencoded").getObjectSummaries.toList.map(_.getKey)
val result = s3.listObjects("urlencoded").getObjectSummaries.asScala.map(_.getKey)
result shouldBe List("path/with=123/foo", "path/withoutequals/foo")
getContent(s3.getObject("urlencoded", "path/with=123/foo")) shouldBe "bar="
getContent(s3.getObject("urlencoded", "path/withoutequals/foo")) shouldBe "bar"
Expand Down
24 changes: 22 additions & 2 deletions src/test/scala/io/findify/s3mock/MultipartUploadTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.findify.s3mock

import java.io.ByteArrayInputStream
import java.nio.charset.Charset
import java.time.Instant

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
Expand Down Expand Up @@ -37,6 +38,25 @@ class MultipartUploadTest extends S3MockTest {
response2.status.intValue() shouldBe 200
val response3 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://127.0.0.1:$port/getput/foo2?partNumber=2&uploadId=$uploadId", entity = "boo")), 10.minutes)
response3.status.intValue() shouldBe 200

val response4 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.GET, uri = s"http://127.0.0.1:$port/getput/foo2?uploadId=$uploadId")), 10.minutes)
response4.status.intValue() shouldBe 200
val data2 = Await.result(response4.entity.dataBytes.fold(ByteString(""))(_ ++ _).runWith(Sink.head), 10.seconds)
val listParts = scala.xml.XML.loadString(data2.utf8String)
(listParts \ "Bucket").text shouldBe "getput"
(listParts \ "Key").text shouldBe "foo2"
(listParts \ "UploadId").text shouldBe uploadId
(listParts \ "IsTruncated").text shouldBe "false"
val parts = (listParts \ "Part").map(part => {
Instant.parse((part \ "LastModified").text)
(
(part \ "PartNumber").text.toInt,
(part \ "ETag").text,
(part \ "Size").text.toInt,
)
})
parts shouldBe Seq((1, DigestUtils.md5Hex("foo"), 3), (2, DigestUtils.md5Hex("boo"), 3))

val commit = """<CompleteMultipartUpload>
| <Part>
| <PartNumber>1</PartNumber>
Expand All @@ -47,8 +67,8 @@ class MultipartUploadTest extends S3MockTest {
| <ETag>ETag</ETag>
| </Part>
|</CompleteMultipartUpload>""".stripMargin
val response4 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://127.0.0.1:$port/getput/foo2?uploadId=$uploadId", entity = commit)), 10.minutes)
response4.status.intValue() shouldBe 200
val response5 = Await.result(http.singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://127.0.0.1:$port/getput/foo2?uploadId=$uploadId", entity = commit)), 10.minutes)
response5.status.intValue() shouldBe 200

getContent(s3.getObject("getput", "foo2")) shouldBe "fooboo"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.findify.s3mock.transfermanager

import java.io.ByteArrayInputStream

import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import io.findify.s3mock.S3MockTest

class MultipartUploadTest extends S3MockTest {
override def behaviour(fixture: => Fixture) = {
val s3 = fixture.client

val partSize = 1024L
val tm = TransferManagerBuilder
.standard()
.withMultipartUploadThreshold(partSize)
.withMinimumUploadPartSize(partSize)
.withMultipartCopyThreshold(partSize)
.withMultipartCopyPartSize(partSize)
.withS3Client(s3)
.build()

def zeroes(size: Long): (ByteArrayInputStream, ObjectMetadata) = {
val stream = new ByteArrayInputStream(new Array[Byte](size.toInt))
val meta = new ObjectMetadata()
meta.setContentLength(size)
(stream, meta)
}

it should "multipart-upload files with TransferManager" in {
val (stream, meta) = zeroes(100 * partSize)
s3.createBucket("tm1")
val upload = tm.upload("tm1", "hello1", stream, meta)
val result = upload.waitForUploadResult()
result.getKey shouldBe "hello1"
}

it should "multipart-copy files with TransferManager" in {
val (stream, meta) = zeroes(100 * partSize)
s3.createBucket("tm2")
tm.upload("tm2", "hello2", stream, meta).waitForUploadResult()
val copy = tm.copy("tm2", "hello2", "tm2", "hello2-copy")
val res = copy.waitForCopyResult()
res.getSourceKey shouldBe "hello2"
res.getDestinationKey shouldBe "hello2-copy"
}
}
}