diff --git a/.github/scripts/s3/run-integration-aws-iam.sh b/.github/scripts/s3/run-integration-aws-iam.sh
index 498acb9..471e7b9 100755
--- a/.github/scripts/s3/run-integration-aws-iam.sh
+++ b/.github/scripts/s3/run-integration-aws-iam.sh
@@ -49,7 +49,8 @@ pushd "${repo_root}" > /dev/null
     --function-name "${lambda_function_name}" \
     --zip-file fileb://payload.zip \
     --role "${iam_role_arn}" \
-    --timeout 300 \
+    --timeout 600 \
+    --memory-size 512 \
     --handler lambda_function.test_runner_handler \
     --runtime python3.9
@@ -58,8 +59,8 @@ pushd "${repo_root}" > /dev/null
   tries=0
   get_function_status_command="aws lambda get-function --region ${region_name} --function-name ${lambda_function_name}"
   function_status=$(${get_function_status_command})
-  while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 5 ) ]] ; do
-    sleep 2
+  while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 15 ) ]] ; do
+    sleep 3
     echo "Checking for function readiness; attempt: $tries"
     tries=$((tries + 1))
     function_status=$(${get_function_status_command})
diff --git a/s3/README.md b/s3/README.md
index f3a30c4..9fdfa65 100644
--- a/s3/README.md
+++ b/s3/README.md
@@ -12,23 +12,25 @@ The S3 client requires a JSON configuration file with the following structure:
 
 ``` json
 {
-  "bucket_name": " (required)",
-  "credentials_source": " [static|env_or_profile|none]",
-  "access_key_id": " (required if credentials_source = 'static')",
-  "secret_access_key": " (required if credentials_source = 'static')",
-  "region": " (optional - default: 'us-east-1')",
-  "host": " (optional)",
-  "port": (optional),
-  "ssl_verify_peer": (optional - default: true),
-  "use_ssl": (optional - default: true),
-  "signature_version": " (optional)",
-  "server_side_encryption": " (optional)",
-  "sse_kms_key_id": " (optional)",
-  "multipart_upload": (optional - default: true),
-  "download_concurrency": (optional - default: 5),
-  "download_part_size": (optional - default: 5242880), # 5 MB
-  "upload_concurrency": (optional - default: 5),
-  "upload_part_size": (optional - default: 5242880) # 5 MB
+  "bucket_name": " (required)",
+  "credentials_source": " [static|env_or_profile|none]",
+  "access_key_id": " (required if credentials_source = 'static')",
+  "secret_access_key": " (required if credentials_source = 'static')",
+  "region": " (optional - default: 'us-east-1')",
+  "host": " (optional)",
+  "port": (optional),
+  "ssl_verify_peer": (optional - default: true),
+  "use_ssl": (optional - default: true),
+  "signature_version": " (optional)",
+  "server_side_encryption": " (optional)",
+  "sse_kms_key_id": " (optional)",
+  "multipart_upload": (optional - default: true),
+  "download_concurrency": (optional - default: 5),
+  "download_part_size": (optional - default: 5242880), # 5 MB
+  "upload_concurrency": (optional - default: 5),
+  "upload_part_size": (optional - default: 5242880), # 5 MB
+  "multipart_copy_threshold": (optional - default: 5368709120), # 5 GB
+  "multipart_copy_part_size": (optional - default: 104857600) # 100 MB - must be at least 5 MB
 }
 ```
 > Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host'
diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go
index f50a6d3..3c9d89e 100644
--- a/s3/client/aws_s3_blobstore.go
+++ b/s3/client/aws_s3_blobstore.go
@@ -29,6 +29,10 @@ var oneTB = int64(1000 * 1024 * 1024 * 1024)
 const (
     defaultTransferConcurrency = 5
     defaultTransferPartSize    = int64(5 * 1024 * 1024) // 5 MB
+    // For copy operations: use multipart copy only when necessary (>5GB).
+    // AWS CopyObject is limited to 5GB; use 100MB parts for multipart copy.
+    defaultMultipartCopyThreshold = int64(5 * 1024 * 1024 * 1024) // 5 GB
+    defaultMultipartCopyPartSize  = int64(100 * 1024 * 1024)      // 100 MB
 )
 
 // awsS3Client encapsulates AWS S3 blobstore interactions
@@ -274,29 +278,157 @@ func (b *awsS3Client) EnsureStorageExists() error {
 }
 
 func (b *awsS3Client) Copy(srcBlob string, dstBlob string) error {
-    slog.Info("Copying object within s3 bucket", "bucket", b.s3cliConfig.BucketName, "source_blob", srcBlob, "destination_blob", dstBlob)
+    cfg := b.s3cliConfig
 
-    copySource := fmt.Sprintf("%s/%s", b.s3cliConfig.BucketName, *b.key(srcBlob))
+    copyThreshold := defaultMultipartCopyThreshold
+    if cfg.MultipartCopyThreshold > 0 {
+        copyThreshold = cfg.MultipartCopyThreshold
+    }
+    copyPartSize := defaultMultipartCopyPartSize
+    if cfg.MultipartCopyPartSize > 0 {
+        copyPartSize = cfg.MultipartCopyPartSize
+    }
 
-    _, err := b.s3Client.CopyObject(context.TODO(), &s3.CopyObjectInput{
-        Bucket:     aws.String(b.s3cliConfig.BucketName),
+    headOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{
+        Bucket: aws.String(cfg.BucketName),
+        Key:    b.key(srcBlob),
+    })
+    if err != nil {
+        return fmt.Errorf("failed to get object metadata: %w", err)
+    }
+    if headOutput.ContentLength == nil {
+        return errors.New("object content length is nil")
+    }
+
+    objectSize := *headOutput.ContentLength
+    copySource := fmt.Sprintf("%s/%s", cfg.BucketName, *b.key(srcBlob))
+
+    // Use simple copy if the object is below the threshold
+    if objectSize < copyThreshold {
+        slog.Info("Copying object", "source", srcBlob, "destination", dstBlob, "size", objectSize)
+        return b.simpleCopy(copySource, dstBlob)
+    }
+
+    // For large files, try multipart copy first (works for AWS, MinIO, Ceph, AliCloud).
+    // Fall back to simple copy if the provider doesn't support UploadPartCopy (e.g., GCS).
+    slog.Info("Copying large object using multipart copy", "source", srcBlob, "destination", dstBlob, "size", objectSize)
+
+    err = b.multipartCopy(copySource, dstBlob, objectSize, copyPartSize)
+    if err != nil {
+        var apiErr smithy.APIError
+        if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotImplemented" {
+            slog.Info("Multipart copy not supported by provider, falling back to simple copy", "source", srcBlob, "destination", dstBlob)
+            return b.simpleCopy(copySource, dstBlob)
+        }
+        return err
+    }
+
+    return nil
+}
+
+// simpleCopy performs a single CopyObject request
+func (b *awsS3Client) simpleCopy(copySource string, dstBlob string) error {
+    cfg := b.s3cliConfig
+
+    copyInput := &s3.CopyObjectInput{
+        Bucket:     aws.String(cfg.BucketName),
         CopySource: aws.String(copySource),
         Key:        b.key(dstBlob),
-    })
+    }
+    if cfg.ServerSideEncryption != "" {
+        copyInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption)
+    }
+    if cfg.SSEKMSKeyID != "" {
+        copyInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID)
+    }
+
+    _, err := b.s3Client.CopyObject(context.TODO(), copyInput)
     if err != nil {
         return fmt.Errorf("failed to copy object: %w", err)
     }
+    return nil
+}
 
-    waiter := s3.NewObjectExistsWaiter(b.s3Client)
-    err = waiter.Wait(context.TODO(), &s3.HeadObjectInput{
-        Bucket: aws.String(b.s3cliConfig.BucketName),
+// multipartCopy performs a multipart copy using CreateMultipartUpload, UploadPartCopy, and CompleteMultipartUpload
+func (b *awsS3Client) multipartCopy(copySource string, dstBlob string, objectSize int64, copyPartSize int64) error {
+    cfg := b.s3cliConfig
+    numParts := int((objectSize + copyPartSize - 1) / copyPartSize)
+
+    createInput := &s3.CreateMultipartUploadInput{
+        Bucket: aws.String(cfg.BucketName),
         Key:    b.key(dstBlob),
-    }, 15*time.Minute)
+    }
+    if cfg.ServerSideEncryption != "" {
+        createInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption)
+    }
+    if cfg.SSEKMSKeyID != "" {
+        createInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID)
+    }
+
+    createOutput, err := b.s3Client.CreateMultipartUpload(context.TODO(), createInput)
+    if err != nil {
+        return fmt.Errorf("failed to create multipart upload: %w", err)
+    }
+
+    uploadID := *createOutput.UploadId
+
+    var completed bool
+    defer func() {
+        if !completed {
+            _, err := b.s3Client.AbortMultipartUpload(context.TODO(), &s3.AbortMultipartUploadInput{
+                Bucket:   aws.String(cfg.BucketName),
+                Key:      b.key(dstBlob),
+                UploadId: aws.String(uploadID),
+            })
+            if err != nil {
+                slog.Warn("Failed to abort multipart upload", "uploadId", uploadID, "error", err)
+            }
+        }
+    }()
+
+    completedParts := make([]types.CompletedPart, 0, numParts)
+    for i := 0; i < numParts; i++ {
+        partNumber := int32(i + 1)
+        start := int64(i) * copyPartSize
+        end := start + copyPartSize - 1
+        if end >= objectSize {
+            end = objectSize - 1
+        }
+        byteRange := fmt.Sprintf("bytes=%d-%d", start, end)
+
+        output, err := b.s3Client.UploadPartCopy(context.TODO(), &s3.UploadPartCopyInput{
+            Bucket:          aws.String(cfg.BucketName),
+            CopySource:      aws.String(copySource),
+            CopySourceRange: aws.String(byteRange),
+            Key:             b.key(dstBlob),
+            PartNumber:      aws.Int32(partNumber),
+            UploadId:        aws.String(uploadID),
+        })
+        if err != nil {
+            return fmt.Errorf("failed to copy part %d: %w", partNumber, err)
+        }
+        completedParts = append(completedParts, types.CompletedPart{
+            ETag:       output.CopyPartResult.ETag,
+            PartNumber: aws.Int32(partNumber),
+        })
+        slog.Debug("Copied part", "part", partNumber, "range", byteRange)
+    }
+
+    _, err = b.s3Client.CompleteMultipartUpload(context.TODO(), &s3.CompleteMultipartUploadInput{
+        Bucket:   aws.String(cfg.BucketName),
+        Key:      b.key(dstBlob),
+        UploadId: aws.String(uploadID),
+        MultipartUpload: &types.CompletedMultipartUpload{
+            Parts: completedParts,
+        },
+    })
     if err != nil {
-        return fmt.Errorf("failed waiting for object to exist after copy: %w", err)
+        return fmt.Errorf("failed to complete multipart upload: %w", err)
     }
+    completed = true
+    slog.Debug("Multipart copy completed successfully", "parts", numParts)
 
     return nil
 }
diff --git a/s3/config/config.go b/s3/config/config.go
index 2241e02..b174f4d 100644
--- a/s3/config/config.go
+++ b/s3/config/config.go
@@ -33,11 +33,19 @@ type S3Cli struct {
     // Optional knobs to tune transfer performance.
     // If zero, the client will apply sensible defaults (handled by the S3 client layer).
     // Part size values are provided in bytes.
-    DownloadConcurrency int   `json:"download_concurrency"`
-    DownloadPartSize    int64 `json:"download_part_size"`
-    UploadConcurrency   int   `json:"upload_concurrency"`
-    UploadPartSize      int64 `json:"upload_part_size"`
-}
+    DownloadConcurrency    int   `json:"download_concurrency"`
+    DownloadPartSize       int64 `json:"download_part_size"`
+    UploadConcurrency      int   `json:"upload_concurrency"`
+    UploadPartSize         int64 `json:"upload_part_size"`
+    MultipartCopyThreshold int64 `json:"multipart_copy_threshold"` // Default: 5GB - files larger than this use multipart copy
+    MultipartCopyPartSize  int64 `json:"multipart_copy_part_size"` // Default: 100MB - size of each part in multipart copy
+}
+
+const (
+    // multipartCopyMinPartSize is the AWS minimum part size for multipart operations.
+    // Other providers may have different limits - users should consult their provider's documentation.
+    multipartCopyMinPartSize = 5 * 1024 * 1024 // 5MB - AWS minimum part size
+)
 
 const defaultAWSRegion = "us-east-1"
 
 //nolint:unused
@@ -98,6 +106,19 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
         return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative")
     }
 
+    // Validate multipart copy settings (0 means "use defaults").
+    // Note: the default threshold is 5GB (AWS limit), but users can configure higher values for providers
+    // that support larger simple copies (e.g., GCS has no limit). Users should consult their provider's documentation.
+    if c.MultipartCopyThreshold < 0 {
+        return S3Cli{}, errors.New("multipart_copy_threshold must be non-negative (0 means use default)")
+    }
+    if c.MultipartCopyPartSize < 0 {
+        return S3Cli{}, errors.New("multipart_copy_part_size must be non-negative (0 means use default)")
+    }
+    if c.MultipartCopyPartSize > 0 && c.MultipartCopyPartSize < multipartCopyMinPartSize {
+        return S3Cli{}, fmt.Errorf("multipart_copy_part_size must be at least %d bytes (5MB - AWS minimum)", multipartCopyMinPartSize)
+    }
+
     switch c.CredentialsSource {
     case StaticCredentialsSource:
         if c.AccessKeyID == "" || c.SecretAccessKey == "" {
diff --git a/s3/config/config_test.go b/s3/config/config_test.go
index 75b2c8c..70eca8c 100644
--- a/s3/config/config_test.go
+++ b/s3/config/config_test.go
@@ -327,6 +327,93 @@ var _ = Describe("BlobstoreClient configuration", func() {
             _, err = config.NewFromReader(dummyJSONReader)
             Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative"))
         })
+
+        Describe("multipart copy tuning fields", func() {
+            It("rejects negative multipart copy threshold", func() {
+                dummyJSONBytes := []byte(`{
+                    "access_key_id":"id",
+                    "secret_access_key":"key",
+                    "bucket_name":"some-bucket",
+                    "multipart_copy_threshold": -1
+                }`)
+                dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+                _, err := config.NewFromReader(dummyJSONReader)
+                Expect(err).To(MatchError("multipart_copy_threshold must be non-negative (0 means use default)"))
+            })
+
+            It("rejects negative multipart copy part size", func() {
+                dummyJSONBytes := []byte(`{
+                    "access_key_id":"id",
+                    "secret_access_key":"key",
+                    "bucket_name":"some-bucket",
+                    "multipart_copy_part_size": -1
+                }`)
+                dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+                _, err := config.NewFromReader(dummyJSONReader)
+                Expect(err).To(MatchError("multipart_copy_part_size must be non-negative (0 means use default)"))
+            })
+
+            It("rejects multipart copy part size below AWS minimum", func() {
+                dummyJSONBytes := []byte(`{
+                    "access_key_id":"id",
+                    "secret_access_key":"key",
+                    "bucket_name":"some-bucket",
+                    "multipart_copy_part_size": 1048576
+                }`)
+                dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+                _, err := config.NewFromReader(dummyJSONReader)
+                Expect(err).To(MatchError("multipart_copy_part_size must be at least 5242880 bytes (5MB - AWS minimum)"))
+            })
+
+            It("accepts zero values (use defaults)", func() {
+                dummyJSONBytes := []byte(`{
+                    "access_key_id":"id",
+                    "secret_access_key":"key",
+                    "bucket_name":"some-bucket",
+                    "multipart_copy_threshold": 0,
+                    "multipart_copy_part_size": 0
+                }`)
+                dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+                c, err := config.NewFromReader(dummyJSONReader)
+                Expect(err).ToNot(HaveOccurred())
+                Expect(c.MultipartCopyThreshold).To(Equal(int64(0)))
+                Expect(c.MultipartCopyPartSize).To(Equal(int64(0)))
+            })
+
+            It("accepts valid custom values", func() {
+                dummyJSONBytes := []byte(`{
+                    "access_key_id":"id",
+                    "secret_access_key":"key",
+                    "bucket_name":"some-bucket",
+                    "multipart_copy_threshold": 1073741824,
+                    "multipart_copy_part_size": 104857600
+                }`)
+                dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+                c, err := config.NewFromReader(dummyJSONReader)
+                Expect(err).ToNot(HaveOccurred())
+                Expect(c.MultipartCopyThreshold).To(Equal(int64(1073741824))) // 1GB
+                Expect(c.MultipartCopyPartSize).To(Equal(int64(104857600)))   // 100MB
+            })
+
+            It("accepts threshold above AWS limit for providers with higher limits", func() {
+                dummyJSONBytes := []byte(`{
+                    "access_key_id":"id",
+                    "secret_access_key":"key",
+                    "bucket_name":"some-bucket",
+                    "multipart_copy_threshold": 10737418240
+                }`)
+                dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+                c, err := config.NewFromReader(dummyJSONReader)
+                Expect(err).ToNot(HaveOccurred())
+                Expect(c.MultipartCopyThreshold).To(Equal(int64(10737418240))) // 10GB
+            })
+        })
     })
 
     Describe("returning the S3 endpoint", func() {
diff --git a/s3/integration/assertions.go b/s3/integration/assertions.go
index 44d8c9d..4fe8feb 100644
--- a/s3/integration/assertions.go
+++ b/s3/integration/assertions.go
@@ -139,6 +139,75 @@
 
 }
 
+// AssertMultipartCopyWorks tests multipart copy functionality by setting a low threshold.
+// This forces a small file to be copied using multipart copy (3 parts).
+func AssertMultipartCopyWorks(s3CLIPath string, cfg *config.S3Cli) {
+    storageType := "s3"
+    s3Filename := GenerateRandomString()
+
+    // Create 15MB of file content (3 parts with the 5MB minimum part size).
+    // We use 15MB to ensure we get at least 2 parts when the threshold is set to 10MB.
+    contentSize := 15 * 1024 * 1024 // 15 MB
+    expectedContent := GenerateRandomString(contentSize)
+
+    // Configure a low multipart copy threshold to force multipart copy:
+    // threshold: 10MB - files larger than this use multipart copy
+    // part size: 5MB (AWS minimum) - so our 15MB file will be split into 3 parts
+    cfg.MultipartCopyThreshold = 10 * 1024 * 1024 // 10 MB
+    cfg.MultipartCopyPartSize = 5 * 1024 * 1024   // 5 MB (AWS minimum)
+
+    configPath := MakeConfigFile(cfg)
+    defer os.Remove(configPath) //nolint:errcheck
+
+    contentFile := MakeContentFile(expectedContent)
+    defer os.Remove(contentFile) //nolint:errcheck
+
+    // Upload the test file
+    s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "put", contentFile, s3Filename)
+    Expect(err).ToNot(HaveOccurred())
+    Expect(s3CLISession.ExitCode()).To(BeZero())
+
+    // Copy the file - this should trigger multipart copy since the file size (15MB) exceeds the threshold (10MB)
+    s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "copy", s3Filename, s3Filename+"_multipart_copy")
+    Expect(err).ToNot(HaveOccurred())
+    Expect(s3CLISession.ExitCode()).To(BeZero())
+
+    // Verify the copied file exists
+    s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename+"_multipart_copy")
+    Expect(err).ToNot(HaveOccurred())
+    Expect(s3CLISession.ExitCode()).To(BeZero())
+
+    // Download and verify content matches
+    tmpCopiedFile, err := os.CreateTemp("", "s3cli-download-multipart-copy")
+    Expect(err).ToNot(HaveOccurred())
+    err = tmpCopiedFile.Close()
+    Expect(err).ToNot(HaveOccurred())
+    defer os.Remove(tmpCopiedFile.Name()) //nolint:errcheck
+
+    s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "get", s3Filename+"_multipart_copy", tmpCopiedFile.Name())
+    Expect(err).ToNot(HaveOccurred())
+    Expect(s3CLISession.ExitCode()).To(BeZero())
+
+    copiedBytes, err := os.ReadFile(tmpCopiedFile.Name())
+    Expect(err).ToNot(HaveOccurred())
+    Expect(string(copiedBytes)).To(Equal(expectedContent))
+
+    // Verify the file size matches
+    s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename+"_multipart_copy")
+    Expect(err).ToNot(HaveOccurred())
+    Expect(s3CLISession.ExitCode()).To(BeZero())
+    Expect(s3CLISession.Out.Contents()).To(ContainSubstring(fmt.Sprintf("\"content_length\": %d", contentSize)))
+
+    // Clean up
+    s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename+"_multipart_copy")
+    Expect(err).ToNot(HaveOccurred())
+    Expect(s3CLISession.ExitCode()).To(BeZero())
+
+    s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename)
+    Expect(err).ToNot(HaveOccurred())
+    Expect(s3CLISession.ExitCode()).To(BeZero())
+}
+
 func AssertOnBulkOperations(s3CLIPath string, cfg *config.S3Cli) {
     storageType := "s3"
     numFiles := 5
diff --git a/s3/integration/aws_iam_role_test.go b/s3/integration/aws_iam_role_test.go
index 9e711ce..a4f22ad 100644
--- a/s3/integration/aws_iam_role_test.go
+++ b/s3/integration/aws_iam_role_test.go
@@ -62,5 +62,9 @@ var _ = Describe("Testing inside an AWS compute resource with an IAM role", func
         func(cfg *config.S3Cli) { integration.AssertDeleteNonexistentWorks(s3CLIPath, cfg) },
         configurations,
     )
+    DescribeTable("Multipart copy works with low threshold",
+        func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) },
+        configurations,
+    )
 })
 })
diff --git a/s3/integration/aws_us_east_test.go b/s3/integration/aws_us_east_test.go
index dc27777..cd50570 100644
--- a/s3/integration/aws_us_east_test.go
+++ b/s3/integration/aws_us_east_test.go
@@ -62,5 +62,9 @@ var _ = Describe("Testing only in us-east-1", func() {
         func(cfg *config.S3Cli) { integration.AssertDeleteNonexistentWorks(s3CLIPath, cfg) },
         configurations,
     )
+    DescribeTable("Multipart copy works with low threshold",
+        func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) },
+        configurations,
+    )
 })
 })
diff --git a/s3/integration/general_aws_test.go b/s3/integration/general_aws_test.go
index 954f5f2..2871901 100644
--- a/s3/integration/general_aws_test.go
+++ b/s3/integration/general_aws_test.go
@@ -79,6 +79,10 @@ var _ = Describe("General testing for all AWS regions", func() {
         func(cfg *config.S3Cli) { integration.AssertOnSignedURLs(s3CLIPath, cfg) },
         configurations,
     )
+    DescribeTable("Multipart copy works with low threshold",
+        func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) },
+        configurations,
+    )
 
     configurations = []TableEntry{
         Entry("with encryption", &config.S3Cli{
diff --git a/s3/integration/s3_compatible_test.go b/s3/integration/s3_compatible_test.go
index d63c64a..698ed74 100644
--- a/s3/integration/s3_compatible_test.go
+++ b/s3/integration/s3_compatible_test.go
@@ -89,5 +89,9 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func()
         func(cfg *config.S3Cli) { integration.AssertOnSignedURLs(s3CLIPath, cfg) },
         configurations,
     )
+    DescribeTable("Multipart copy works with low threshold",
+        func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) },
+        configurations,
+    )
 })
 })
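
Reviewer note: for anyone wiring this up, the README changes above imply a client configuration along the following lines. This is an illustrative example, not part of the patch — the bucket name and credentials source are placeholders, and the two copy fields simply spell out the documented defaults (5 GB threshold, 100 MB parts). Multipart copy is used only for objects at or above the threshold; lowering the threshold (as the integration test does with 10 MB) forces multipart copy on small objects.

``` json
{
  "bucket_name": "my-blobstore-bucket",
  "credentials_source": "env_or_profile",
  "region": "us-east-1",
  "multipart_copy_threshold": 5368709120,
  "multipart_copy_part_size": 104857600
}
```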
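Also for reference, a small standalone sketch of the part-range arithmetic used by `multipartCopy` — not part of the patch, it just mirrors the ceiling division and inclusive `bytes=start-end` ranges from the diff, so the integration test's expectation (a 15 MB object with 5 MB parts yields exactly 3 parts) is easy to verify by hand.

``` go
package main

import "fmt"

// partRanges mirrors the range computation in multipartCopy: ceiling division
// for the part count, and inclusive byte ranges with a shorter final part.
func partRanges(objectSize, partSize int64) []string {
	numParts := int((objectSize + partSize - 1) / partSize)
	ranges := make([]string, 0, numParts)
	for i := 0; i < numParts; i++ {
		start := int64(i) * partSize
		end := start + partSize - 1
		if end >= objectSize {
			end = objectSize - 1
		}
		ranges = append(ranges, fmt.Sprintf("bytes=%d-%d", start, end))
	}
	return ranges
}

func main() {
	// 15 MB object with 5 MB parts, as in AssertMultipartCopyWorks: prints 3 ranges,
	// the last one ending at byte 15728639.
	for _, r := range partRanges(15*1024*1024, 5*1024*1024) {
		fmt.Println(r)
	}
}
```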