7 changes: 4 additions & 3 deletions .github/scripts/s3/run-integration-aws-iam.sh
@@ -49,7 +49,8 @@ pushd "${repo_root}" > /dev/null
--function-name "${lambda_function_name}" \
--zip-file fileb://payload.zip \
--role "${iam_role_arn}" \
--timeout 300 \
--timeout 600 \
--memory-size 512 \
--handler lambda_function.test_runner_handler \
--runtime python3.9

@@ -58,8 +59,8 @@ pushd "${repo_root}" > /dev/null
tries=0
get_function_status_command="aws lambda get-function --region ${region_name} --function-name ${lambda_function_name}"
function_status=$(${get_function_status_command})
while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 5 ) ]] ; do
sleep 2
while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 15 ) ]] ; do
sleep 3
echo "Checking for function readiness; attempt: $tries"
tries=$((tries + 1))
function_status=$(${get_function_status_command})
36 changes: 19 additions & 17 deletions s3/README.md
@@ -12,23 +12,25 @@ The S3 client requires a JSON configuration file with the following structure:

``` json
{
"bucket_name": "<string> (required)",
"credentials_source": "<string> [static|env_or_profile|none]",
"access_key_id": "<string> (required if credentials_source = 'static')",
"secret_access_key": "<string> (required if credentials_source = 'static')",
"region": "<string> (optional - default: 'us-east-1')",
"host": "<string> (optional)",
"port": <int> (optional),
"ssl_verify_peer": <bool> (optional - default: true),
"use_ssl": <bool> (optional - default: true),
"signature_version": "<string> (optional)",
"server_side_encryption": "<string> (optional)",
"sse_kms_key_id": "<string> (optional)",
"multipart_upload": <bool> (optional - default: true),
"download_concurrency": <int> (optional - default: 5),
"download_part_size": <int64> (optional - default: 5242880), # 5 MB
"upload_concurrency": <int> (optional - default: 5),
"upload_part_size": <int64> (optional - default: 5242880) # 5 MB
"bucket_name": "<string> (required)",
"credentials_source": "<string> [static|env_or_profile|none]",
"access_key_id": "<string> (required if credentials_source = 'static')",
"secret_access_key": "<string> (required if credentials_source = 'static')",
"region": "<string> (optional - default: 'us-east-1')",
"host": "<string> (optional)",
"port": <int> (optional),
"ssl_verify_peer": <bool> (optional - default: true),
"use_ssl": <bool> (optional - default: true),
"signature_version": "<string> (optional)",
"server_side_encryption": "<string> (optional)",
"sse_kms_key_id": "<string> (optional)",
"multipart_upload": <bool> (optional - default: true),
"download_concurrency": <int> (optional - default: 5),
"download_part_size": <int64> (optional - default: 5242880), # 5 MB
"upload_concurrency": <int> (optional - default: 5),
"upload_part_size": <int64> (optional - default: 5242880) # 5 MB
"multipart_copy_threshold": <int64> (optional - default: 5368709120) # default 5 GB
"multipart_copy_part_size": <int64> (optional - default: 104857600) # default 100 MB - must be at least 5 MB
}
```
> Note: **multipart_upload** is not supported by Google; it is automatically set to false when a Google endpoint is detected from the provided 'host'
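
For illustration, a minimal configuration using the documented defaults (the bucket name is a placeholder; the two copy-tuning values are simply the defaults listed above spelled out):

``` json
{
  "bucket_name": "my-bucket",
  "credentials_source": "env_or_profile",
  "region": "us-east-1",
  "multipart_copy_threshold": 5368709120,
  "multipart_copy_part_size": 104857600
}
```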
152 changes: 142 additions & 10 deletions s3/client/aws_s3_blobstore.go
@@ -29,6 +29,10 @@ var oneTB = int64(1000 * 1024 * 1024 * 1024)
const (
defaultTransferConcurrency = 5
defaultTransferPartSize = int64(5 * 1024 * 1024) // 5 MB
// For copy operations: use multipart copy only when necessary (>5GB)
// AWS CopyObject limit is 5GB, use 100MB parts for multipart copy
defaultMultipartCopyThreshold = int64(5 * 1024 * 1024 * 1024) // 5 GB
defaultMultipartCopyPartSize = int64(100 * 1024 * 1024) // 100 MB
)

// awsS3Client encapsulates AWS S3 blobstore interactions
@@ -274,29 +278,157 @@ func (b *awsS3Client) EnsureStorageExists() error {
}

func (b *awsS3Client) Copy(srcBlob string, dstBlob string) error {
slog.Info("Copying object within s3 bucket", "bucket", b.s3cliConfig.BucketName, "source_blob", srcBlob, "destination_blob", dstBlob)
cfg := b.s3cliConfig

copySource := fmt.Sprintf("%s/%s", b.s3cliConfig.BucketName, *b.key(srcBlob))
copyThreshold := defaultMultipartCopyThreshold
if cfg.MultipartCopyThreshold > 0 {
copyThreshold = cfg.MultipartCopyThreshold
}
copyPartSize := defaultMultipartCopyPartSize
if cfg.MultipartCopyPartSize > 0 {
copyPartSize = cfg.MultipartCopyPartSize
}

_, err := b.s3Client.CopyObject(context.TODO(), &s3.CopyObjectInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
headOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(cfg.BucketName),
Key: b.key(srcBlob),
})
if err != nil {
return fmt.Errorf("failed to get object metadata: %w", err)
}
if headOutput.ContentLength == nil {
return errors.New("object content length is nil")
}

objectSize := *headOutput.ContentLength
@serdarozerr (Contributor) commented on Feb 13, 2026:
It could be better to return early if objectSize is 0.
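
One way to read this suggestion, as a hedged sketch (whether an empty source should be an error or simply fall through to simpleCopy is an open design choice, and the error text below is assumed):

``` go
// Hypothetical guard per the review comment above: bail out early when the
// source object has zero length instead of starting a copy.
if objectSize == 0 {
	return fmt.Errorf("source object %q has zero length, nothing to copy", srcBlob)
}
```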

copySource := fmt.Sprintf("%s/%s", cfg.BucketName, *b.key(srcBlob))

// Use simple copy if file is below threshold
if objectSize < copyThreshold {
slog.Info("Copying object", "source", srcBlob, "destination", dstBlob, "size", objectSize)
return b.simpleCopy(copySource, dstBlob)
}

// For large files, try multipart copy first (works for AWS, MinIO, Ceph, AliCloud)
// Fall back to simple copy if provider doesn't support UploadPartCopy (e.g., GCS)
slog.Info("Copying large object using multipart copy", "source", srcBlob, "destination", dstBlob, "size", objectSize)

err = b.multipartCopy(copySource, dstBlob, objectSize, copyPartSize)
if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotImplemented" {
slog.Info("Multipart copy not supported by provider, falling back to simple copy", "source", srcBlob, "destination", dstBlob)
return b.simpleCopy(copySource, dstBlob)
}
return err
}

return nil
}

// simpleCopy performs a single CopyObject request
func (b *awsS3Client) simpleCopy(copySource string, dstBlob string) error {
cfg := b.s3cliConfig

copyInput := &s3.CopyObjectInput{
Bucket: aws.String(cfg.BucketName),
CopySource: aws.String(copySource),
Key: b.key(dstBlob),
})
}
if cfg.ServerSideEncryption != "" {
copyInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption)
}
if cfg.SSEKMSKeyID != "" {
copyInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID)
}

_, err := b.s3Client.CopyObject(context.TODO(), copyInput)
if err != nil {
return fmt.Errorf("failed to copy object: %w", err)
}
return nil
}

waiter := s3.NewObjectExistsWaiter(b.s3Client)
err = waiter.Wait(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
// multipartCopy performs a multipart copy using CreateMultipartUpload, UploadPartCopy, and CompleteMultipartUpload
func (b *awsS3Client) multipartCopy(copySource string, dstBlob string, objectSize int64, copyPartSize int64) error {
cfg := b.s3cliConfig
numParts := int((objectSize + copyPartSize - 1) / copyPartSize)
Contributor review comment:
Maybe add a comment here to make it clearer how the number of parts is calculated, e.g. that this is the ceiling-division formula, and note the equivalent alternative ((objectSize - 1) / o.copyPartSize) + 1.
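
As a small illustration of the formula this comment refers to (the example sizes are assumptions, not values from the PR):

``` go
// Ceiling division: the number of copyPartSize-sized parts needed to cover
// objectSize, rounding up so a partial final part is still counted.
// Example with assumed sizes: objectSize = 10 GiB (10737418240 bytes),
// copyPartSize = 100 MiB (104857600 bytes):
//   (10737418240 + 104857600 - 1) / 104857600 = 103  // 102 full parts + 1 partial part
// Equivalent alternative form suggested above:
//   ((10737418240 - 1) / 104857600) + 1 = 103
numParts := int((objectSize + copyPartSize - 1) / copyPartSize)
```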


createInput := &s3.CreateMultipartUploadInput{
Bucket: aws.String(cfg.BucketName),
Key: b.key(dstBlob),
}, 15*time.Minute)
}
if cfg.ServerSideEncryption != "" {
createInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption)
}
if cfg.SSEKMSKeyID != "" {
createInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID)
}

createOutput, err := b.s3Client.CreateMultipartUpload(context.TODO(), createInput)
if err != nil {
return fmt.Errorf("failed to create multipart upload: %w", err)
}

uploadID := *createOutput.UploadId

var completed bool
defer func() {
if !completed {
_, err := b.s3Client.AbortMultipartUpload(context.TODO(), &s3.AbortMultipartUploadInput{
Bucket: aws.String(cfg.BucketName),
Key: b.key(dstBlob),
UploadId: aws.String(uploadID),
})
if err != nil {
slog.Warn("Failed to abort multipart upload", "uploadId", uploadID, "error", err)
}
}
}()

completedParts := make([]types.CompletedPart, 0, numParts)
for i := 0; i < numParts; i++ {
partNumber := int32(i + 1)
start := int64(i) * copyPartSize
end := start + copyPartSize - 1
if end >= objectSize {
end = objectSize - 1
}
byteRange := fmt.Sprintf("bytes=%d-%d", start, end)

output, err := b.s3Client.UploadPartCopy(context.TODO(), &s3.UploadPartCopyInput{
Bucket: aws.String(cfg.BucketName),
CopySource: aws.String(copySource),
CopySourceRange: aws.String(byteRange),
Key: b.key(dstBlob),
PartNumber: aws.Int32(partNumber),
UploadId: aws.String(uploadID),
})
if err != nil {
return fmt.Errorf("failed to copy part %d: %w", partNumber, err)
}

completedParts = append(completedParts, types.CompletedPart{
ETag: output.CopyPartResult.ETag,
PartNumber: aws.Int32(partNumber),
})
slog.Debug("Copied part", "part", partNumber, "range", byteRange)
}

_, err = b.s3Client.CompleteMultipartUpload(context.TODO(), &s3.CompleteMultipartUploadInput{
Bucket: aws.String(cfg.BucketName),
Key: b.key(dstBlob),
UploadId: aws.String(uploadID),
MultipartUpload: &types.CompletedMultipartUpload{
Parts: completedParts,
},
})
if err != nil {
return fmt.Errorf("failed waiting for object to exist after copy: %w", err)
return fmt.Errorf("failed to complete multipart upload: %w", err)
}

completed = true
slog.Debug("Multipart copy completed successfully", "parts", numParts)
return nil
}

31 changes: 26 additions & 5 deletions s3/config/config.go
@@ -33,11 +33,19 @@ type S3Cli struct {
// Optional knobs to tune transfer performance.
// If zero, the client will apply sensible defaults (handled by the S3 client layer).
// Part size values are provided in bytes.
DownloadConcurrency int `json:"download_concurrency"`
DownloadPartSize int64 `json:"download_part_size"`
UploadConcurrency int `json:"upload_concurrency"`
UploadPartSize int64 `json:"upload_part_size"`
}
DownloadConcurrency int `json:"download_concurrency"`
DownloadPartSize int64 `json:"download_part_size"`
UploadConcurrency int `json:"upload_concurrency"`
UploadPartSize int64 `json:"upload_part_size"`
MultipartCopyThreshold int64 `json:"multipart_copy_threshold"` // Default: 5GB - files larger than this use multipart copy
MultipartCopyPartSize int64 `json:"multipart_copy_part_size"` // Default: 100MB - size of each part in multipart copy
}

const (
// multipartCopyMinPartSize is the AWS minimum part size for multipart operations.
// Other providers may have different limits - users should consult their provider's documentation.
multipartCopyMinPartSize = 5 * 1024 * 1024 // 5MB - AWS minimum part size
)

const defaultAWSRegion = "us-east-1" //nolint:unused

@@ -98,6 +106,19 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative")
}

// Validate multipart copy settings (0 means "use defaults")
// Note: Default threshold is 5GB (AWS limit), but users can configure higher values for providers
// that support larger simple copies (e.g., GCS has no limit). Users should consult their provider's documentation.
if c.MultipartCopyThreshold < 0 {
return S3Cli{}, errors.New("multipart_copy_threshold must be non-negative (0 means use default)")
}
if c.MultipartCopyPartSize < 0 {
return S3Cli{}, errors.New("multipart_copy_part_size must be non-negative (0 means use default)")
}
if c.MultipartCopyPartSize > 0 && c.MultipartCopyPartSize < multipartCopyMinPartSize {
return S3Cli{}, fmt.Errorf("multipart_copy_part_size must be at least %d bytes (5MB - AWS minimum)", multipartCopyMinPartSize)
}

switch c.CredentialsSource {
case StaticCredentialsSource:
if c.AccessKeyID == "" || c.SecretAccessKey == "" {
87 changes: 87 additions & 0 deletions s3/config/config_test.go
@@ -327,6 +327,93 @@ var _ = Describe("BlobstoreClient configuration", func() {
_, err = config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative"))
})

Describe("multipart copy tuning fields", func() {
It("rejects negative multipart copy threshold", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"multipart_copy_threshold": -1
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

_, err := config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError("multipart_copy_threshold must be non-negative (0 means use default)"))
})

It("rejects negative multipart copy part size", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"multipart_copy_part_size": -1
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

_, err := config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError("multipart_copy_part_size must be non-negative (0 means use default)"))
})

It("rejects multipart copy part size below AWS minimum", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"multipart_copy_part_size": 1048576
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

_, err := config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError("multipart_copy_part_size must be at least 5242880 bytes (5MB - AWS minimum)"))
})

It("accepts zero values (use defaults)", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"multipart_copy_threshold": 0,
"multipart_copy_part_size": 0
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.MultipartCopyThreshold).To(Equal(int64(0)))
Expect(c.MultipartCopyPartSize).To(Equal(int64(0)))
})

It("accepts valid custom values", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"multipart_copy_threshold": 1073741824,
"multipart_copy_part_size": 104857600
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.MultipartCopyThreshold).To(Equal(int64(1073741824))) // 1GB
Expect(c.MultipartCopyPartSize).To(Equal(int64(104857600))) // 100MB
})

It("accepts threshold above AWS limit for providers with higher limits", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"multipart_copy_threshold": 10737418240
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.MultipartCopyThreshold).To(Equal(int64(10737418240))) // 10GB
})
})
})

Describe("returning the S3 endpoint", func() {