From faa5ebe8ffb737dde8b619aca344f1ec452b6671 Mon Sep 17 00:00:00 2001 From: johha Date: Wed, 4 Feb 2026 10:30:44 +0100 Subject: [PATCH 1/5] Implement missing operations for s3 Adds s3 implementation for commands `ensure-bucket-exists`, `copy`, `properties`, `list` & `delete-recursive` including integration tests. --- README.md | 1 - s3/client/aws_s3_blobstore.go | 178 +++++++++++++++++++++++++++ s3/client/client.go | 12 +- s3/integration/assertions.go | 128 +++++++++++++++++++ s3/integration/aws_iam_role_test.go | 8 ++ s3/integration/aws_us_east_test.go | 8 ++ s3/integration/general_aws_test.go | 9 ++ s3/integration/s3_compatible_test.go | 8 ++ 8 files changed, 344 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index ebcb763..bbc9995 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,6 @@ Key points - additional endpoints needed by CAPI still missing - [Gcs](./gcs/README.md) - [S3](./s3/README.md) - - additional endpoints needed by CAPI still missing ## Build diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index 1e2f5e1..95446ae 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -1,6 +1,7 @@ package client import ( + "encoding/json" "errors" "fmt" "io" @@ -228,3 +229,180 @@ func (b *awsS3Client) putSigned(objectID string, expiration time.Duration) (stri return req.URL, nil } + +func (b *awsS3Client) EnsureStorageExists() error { + slog.Info("Ensuring bucket exists", "bucket", b.s3cliConfig.BucketName) + _, err := b.s3Client.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + }) + + if err == nil { + slog.Info("Bucket exists", "bucket", b.s3cliConfig.BucketName) + return nil + } + + var notFoundErr *types.NotFound + if !errors.As(err, ¬FoundErr) { + return fmt.Errorf("failed to check if bucket exists: %w", err) + } + + slog.Info("Bucket does not exist, creating it", "bucket", b.s3cliConfig.BucketName) + createBucketInput := &s3.CreateBucketInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + // GCS raises an error if LocationConstraint is set + if !b.s3cliConfig.IsGoogle() { + createBucketInput.CreateBucketConfiguration = &types.CreateBucketConfiguration{ + LocationConstraint: types.BucketLocationConstraint(b.s3cliConfig.Region), + } + } + + _, err = b.s3Client.CreateBucket(context.TODO(), createBucketInput) + if err != nil { + var alreadyOwned *types.BucketAlreadyOwnedByYou + var alreadyExists *types.BucketAlreadyExists + if errors.As(err, &alreadyOwned) || errors.As(err, &alreadyExists) { + slog.Warn("Bucket got created by another process", "bucket", b.s3cliConfig.BucketName) + return nil + } + return fmt.Errorf("failed to create bucket: %w", err) + } + + slog.Info("Bucket created successfully", "bucket", b.s3cliConfig.BucketName) + return nil +} + +func (b *awsS3Client) Copy(srcBlob string, dstBlob string) error { + slog.Info("Copying object within s3 bucket", "bucket", b.s3cliConfig.BucketName, "source_blob", srcBlob, "destination_blob", dstBlob) + + copySource := fmt.Sprintf("%s/%s", b.s3cliConfig.BucketName, *b.key(srcBlob)) + + _, err := b.s3Client.CopyObject(context.TODO(), &s3.CopyObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + CopySource: aws.String(copySource), + Key: b.key(dstBlob), + }) + if err != nil { + return fmt.Errorf("failed to copy object: %w", err) + } + + waiter := s3.NewObjectExistsWaiter(b.s3Client) + err = waiter.Wait(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: b.key(dstBlob), + }, 15*time.Minute) + + if err != nil { + return fmt.Errorf("failed waiting for object to exist after copy: %w", err) + } + + return nil +} + +type BlobProperties struct { + ETag string `json:"etag,omitempty"` + LastModified time.Time `json:"last_modified,omitempty"` + ContentLength int64 `json:"content_length,omitempty"` +} + +func (b *awsS3Client) Properties(dest string) error { + slog.Info("fetching blob properties", "bucket", b.s3cliConfig.BucketName, "blob", dest) + + headObjectOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: b.key(dest), + }) + if err != nil { + return fmt.Errorf("failed to fetch blob properties: %w", err) + } + + properties := BlobProperties{} + if headObjectOutput.ETag != nil { + properties.ETag = strings.Trim(*headObjectOutput.ETag, `"`) + } + if headObjectOutput.LastModified != nil { + properties.LastModified = *headObjectOutput.LastModified + } + if headObjectOutput.ContentLength != nil { + properties.ContentLength = *headObjectOutput.ContentLength + } + + output, err := json.MarshalIndent(properties, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal blob properties: %w", err) + } + + fmt.Println(string(output)) + + return nil +} + +func (b *awsS3Client) List(prefix string) ([]string, error) { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + if prefix != "" { + slog.Info("Listing all objects in bucket with prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix) + input.Prefix = b.key(prefix) + } else { + slog.Info("Listing all objects in bucket", "bucket", b.s3cliConfig.BucketName) + } + + var names []string + objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input) + for objectPaginator.HasMorePages() { + page, err := objectPaginator.NextPage(context.TODO()) + if err != nil { + return nil, fmt.Errorf("failed to list objects for deletion: %w", err) + } + + if len(page.Contents) == 0 { + continue + } + + for _, obj := range page.Contents { + names = append(names, *obj.Key) + } + } + + return names, nil +} + +func (b *awsS3Client) DeleteRecursive(prefix string) error { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + if prefix != "" { + slog.Info("Deleting all objects in bucket with given prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix) + input.Prefix = b.key(prefix) + } else { + slog.Info("Deleting all objects in bucket", "bucket", b.s3cliConfig.BucketName) + } + + objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input) + for objectPaginator.HasMorePages() { + page, err := objectPaginator.NextPage(context.TODO()) + if err != nil { + return fmt.Errorf("failed to list objects for deletion: %w", err) + } + + for _, obj := range page.Contents { + slog.Debug("Deleting object", "key", *obj.Key) + _, err := b.s3Client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: obj.Key, + }) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && (apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey") { + continue // Object already deleted, which is fine + } + return fmt.Errorf("failed to delete object '%s': %w", *obj.Key, err) + } + } + } + return nil +} diff --git a/s3/client/client.go b/s3/client/client.go index f5853cc..40383b0 100644 --- a/s3/client/client.go +++ b/s3/client/client.go @@ -1,7 +1,6 @@ package client import ( - "errors" "os" "time" @@ -65,25 +64,24 @@ func (c *S3CompatibleClient) Sign(objectID string, action string, expiration tim } func (c *S3CompatibleClient) EnsureStorageExists() error { - return errors.New("not implemented") - + return c.awsS3BlobstoreClient.EnsureStorageExists() } func (c *S3CompatibleClient) Copy(srcBlob string, dstBlob string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.Copy(srcBlob, dstBlob) } func (c *S3CompatibleClient) Properties(dest string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.Properties(dest) } func (c *S3CompatibleClient) List(prefix string) ([]string, error) { - return nil, errors.New("not implemented") + return c.awsS3BlobstoreClient.List(prefix) } func (c *S3CompatibleClient) DeleteRecursive(prefix string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.DeleteRecursive(prefix) } diff --git a/s3/integration/assertions.go b/s3/integration/assertions.go index fb05003..4688ed7 100644 --- a/s3/integration/assertions.go +++ b/s3/integration/assertions.go @@ -2,6 +2,7 @@ package integration import ( "context" + "errors" "fmt" "io" "log" @@ -90,6 +91,39 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { Expect(err).ToNot(HaveOccurred()) Expect(string(gottenBytes)).To(Equal(expectedString)) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring(fmt.Sprintf("\"content_length\": %d", len(expectedString)))) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("\"etag\":")) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("\"last_modified\":")) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "copy", s3Filename, s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + tmpCopiedFile, err := os.CreateTemp("", "s3cli-download-copy") + Expect(err).ToNot(HaveOccurred()) + err = tmpCopiedFile.Close() + Expect(err).ToNot(HaveOccurred()) + defer os.Remove(tmpCopiedFile.Name()) //nolint:errcheck + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "get", s3Filename+"_copy", tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + copiedBytes, err := os.ReadFile(tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(string(copiedBytes)).To(Equal(expectedString)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename) Expect(err).ToNot(HaveOccurred()) Expect(s3CLISession.ExitCode()).To(BeZero()) @@ -99,6 +133,100 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { Expect(s3CLISession.ExitCode()).To(Equal(3)) } +func AssertOnBulkOperations(s3CLIPath string, cfg *config.S3Cli) { + storageType := "s3" + numFiles := 5 + s3FilenamePrefix := GenerateRandomString() + localFile := MakeContentFile(GenerateRandomString()) + defer os.Remove(localFile) //nolint:errcheck + + configPath := MakeConfigFile(cfg) + defer os.Remove(configPath) //nolint:errcheck + + for i := 0; i < numFiles; i++ { + suffix := strings.Repeat("A", i) + s3Filename := fmt.Sprintf("%s%s", s3FilenamePrefix, suffix) + + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "put", localFile, s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + } + + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output := strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(strings.Split(output, "\n")).To(HaveLen(numFiles)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete-recursive", fmt.Sprintf("%sAAA", s3FilenamePrefix)) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output = strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(strings.Split(output, "\n")).To(HaveLen(3)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete-recursive") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output = strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(output).To(BeEmpty()) +} + +func AssertOnStorageExists(s3CLIPath string, cfg *config.S3Cli) { + cfgCopy := *cfg + cfgCopy.BucketName = fmt.Sprintf("%s-%s", cfg.BucketName, strings.ToLower(GenerateRandomString(4))) + + configPath := MakeConfigFile(&cfgCopy) + defer os.Remove(configPath) //nolint:errcheck + + // Create a single verification/cleanup client from the config file. + // This ensures it has the exact same settings as the CLI will use. + verificationCfgFile, err := os.Open(configPath) + Expect(err).ToNot(HaveOccurred()) + defer verificationCfgFile.Close() //nolint:errcheck + + verificationCfg, err := config.NewFromReader(verificationCfgFile) + Expect(err).ToNot(HaveOccurred()) + + verificationClient, err := client.NewAwsS3Client(&verificationCfg) + Expect(err).ToNot(HaveOccurred()) + + // Defer the cleanup to ensure the bucket is deleted after the test. + defer func() { + _, err := verificationClient.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + Bucket: aws.String(cfgCopy.BucketName), + }) + // A NoSuchBucket error is acceptable if the bucket was never created or already cleaned up. + var noSuchBucketErr *types.NoSuchBucket + if err != nil && !errors.As(err, &noSuchBucketErr) { + Expect(err).ToNot(HaveOccurred(), "Failed to clean up test bucket") + } + }() + + // --- Scenario 1: Bucket does not exist, should be created --- + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, "s3", "ensure-storage-exists") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Verify the bucket now exists using the client created earlier. + _, headBucketErr := verificationClient.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(cfgCopy.BucketName), + }) + Expect(headBucketErr).ToNot(HaveOccurred(), "Bucket should have been created by 'ensure-storage-exists'") + + // --- Scenario 2: Bucket already exists, command should still succeed (idempotency) --- + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, "s3", "ensure-storage-exists") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) +} + func AssertOnPutFailures(cfg *config.S3Cli, content, errorMessage string) { s3Filename := GenerateRandomString() sourceFile := MakeContentFile(content) diff --git a/s3/integration/aws_iam_role_test.go b/s3/integration/aws_iam_role_test.go index e11251c..9e711ce 100644 --- a/s3/integration/aws_iam_role_test.go +++ b/s3/integration/aws_iam_role_test.go @@ -46,6 +46,14 @@ var _ = Describe("Testing inside an AWS compute resource with an IAM role", func func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, diff --git a/s3/integration/aws_us_east_test.go b/s3/integration/aws_us_east_test.go index db81450..dc27777 100644 --- a/s3/integration/aws_us_east_test.go +++ b/s3/integration/aws_us_east_test.go @@ -46,6 +46,14 @@ var _ = Describe("Testing only in us-east-1", func() { func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, diff --git a/s3/integration/general_aws_test.go b/s3/integration/general_aws_test.go index f114375..954f5f2 100644 --- a/s3/integration/general_aws_test.go +++ b/s3/integration/general_aws_test.go @@ -58,6 +58,15 @@ var _ = Describe("General testing for all AWS regions", func() { func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, diff --git a/s3/integration/s3_compatible_test.go b/s3/integration/s3_compatible_test.go index 965b41c..d63c64a 100644 --- a/s3/integration/s3_compatible_test.go +++ b/s3/integration/s3_compatible_test.go @@ -65,6 +65,14 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, From 3efd48727a24569a7ce3d71e252cb5e06a6a10b4 Mon Sep 17 00:00:00 2001 From: johha Date: Wed, 4 Feb 2026 11:52:04 +0100 Subject: [PATCH 2/5] Remove location constraint for AWS us-east-1 --- s3/client/aws_s3_blobstore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index 95446ae..fc2c4f2 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -251,8 +251,8 @@ func (b *awsS3Client) EnsureStorageExists() error { Bucket: aws.String(b.s3cliConfig.BucketName), } - // GCS raises an error if LocationConstraint is set - if !b.s3cliConfig.IsGoogle() { + // For GCS and AWS region 'us-east-1', LocationConstraint must be omitted + if !b.s3cliConfig.IsGoogle() && b.s3cliConfig.Region != "us-east-1" { createBucketInput.CreateBucketConfiguration = &types.CreateBucketConfiguration{ LocationConstraint: types.BucketLocationConstraint(b.s3cliConfig.Region), } From 606fddc1222efa150833dd14b5d114ea418b5b3c Mon Sep 17 00:00:00 2001 From: johha Date: Tue, 10 Feb 2026 09:43:57 +0100 Subject: [PATCH 3/5] Print empty properties json when blob does not exist Also adjust IAM S3 permissions to allow bucket creation. --- .../cloudformation-s3cli-iam.template.json | 34 +++++++++++++++++++ s3/client/aws_s3_blobstore.go | 6 ++++ s3/integration/assertions.go | 6 ++++ 3 files changed, 46 insertions(+) diff --git a/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json b/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json index e836b65..89fb729 100644 --- a/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json +++ b/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json @@ -40,6 +40,20 @@ "Effect": "Allow", "Resource": "arn:aws:logs:*:*:*" }, + { + "Action": ["s3:CreateBucket", "s3:HeadBucket", "s3:DeleteBucket"], + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*" + ] + ] + } + }, { "Action": [ "s3:GetObject*", @@ -67,6 +81,26 @@ "/*" ] ] + }, + { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*" + ] + ] + }, + { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*/*" + ] + ] } ] } diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index fc2c4f2..4842181 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -313,7 +313,13 @@ func (b *awsS3Client) Properties(dest string) error { Bucket: aws.String(b.s3cliConfig.BucketName), Key: b.key(dest), }) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotFound" { + fmt.Println(`{}`) + return nil + } return fmt.Errorf("failed to fetch blob properties: %w", err) } diff --git a/s3/integration/assertions.go b/s3/integration/assertions.go index 4688ed7..44d8c9d 100644 --- a/s3/integration/assertions.go +++ b/s3/integration/assertions.go @@ -131,6 +131,12 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename) Expect(err).ToNot(HaveOccurred()) Expect(s3CLISession.ExitCode()).To(Equal(3)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("{}")) + } func AssertOnBulkOperations(s3CLIPath string, cfg *config.S3Cli) { From 95402eefca849774cb03208466568ca555c2b5d5 Mon Sep 17 00:00:00 2001 From: johha Date: Tue, 10 Feb 2026 13:34:20 +0100 Subject: [PATCH 4/5] fix error message --- s3/client/aws_s3_blobstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index 4842181..54b22f7 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -361,7 +361,7 @@ func (b *awsS3Client) List(prefix string) ([]string, error) { for objectPaginator.HasMorePages() { page, err := objectPaginator.NextPage(context.TODO()) if err != nil { - return nil, fmt.Errorf("failed to list objects for deletion: %w", err) + return nil, fmt.Errorf("failed to list objects: %w", err) } if len(page.Contents) == 0 { From d19fc9cd5bebc9b17326a3bff5ab280666f6339f Mon Sep 17 00:00:00 2001 From: johha Date: Wed, 11 Feb 2026 14:53:09 +0100 Subject: [PATCH 5/5] wip: implement multipart copy --- .github/scripts/s3/run-integration-aws-iam.sh | 7 +- s3/README.md | 36 +++-- s3/client/aws_s3_blobstore.go | 152 ++++++++++++++++-- s3/config/config.go | 31 +++- s3/config/config_test.go | 87 ++++++++++ s3/integration/assertions.go | 69 ++++++++ s3/integration/aws_iam_role_test.go | 4 + s3/integration/aws_us_east_test.go | 4 + s3/integration/general_aws_test.go | 4 + s3/integration/s3_compatible_test.go | 4 + 10 files changed, 363 insertions(+), 35 deletions(-) diff --git a/.github/scripts/s3/run-integration-aws-iam.sh b/.github/scripts/s3/run-integration-aws-iam.sh index 498acb9..471e7b9 100755 --- a/.github/scripts/s3/run-integration-aws-iam.sh +++ b/.github/scripts/s3/run-integration-aws-iam.sh @@ -49,7 +49,8 @@ pushd "${repo_root}" > /dev/null --function-name "${lambda_function_name}" \ --zip-file fileb://payload.zip \ --role "${iam_role_arn}" \ - --timeout 300 \ + --timeout 600 \ + --memory-size 512 \ --handler lambda_function.test_runner_handler \ --runtime python3.9 @@ -58,8 +59,8 @@ pushd "${repo_root}" > /dev/null tries=0 get_function_status_command="aws lambda get-function --region ${region_name} --function-name ${lambda_function_name}" function_status=$(${get_function_status_command}) - while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 5 ) ]] ; do - sleep 2 + while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 15 ) ]] ; do + sleep 3 echo "Checking for function readiness; attempt: $tries" tries=$((tries + 1)) function_status=$(${get_function_status_command}) diff --git a/s3/README.md b/s3/README.md index f3a30c4..9fdfa65 100644 --- a/s3/README.md +++ b/s3/README.md @@ -12,23 +12,25 @@ The S3 client requires a JSON configuration file with the following structure: ``` json { - "bucket_name": " (required)", - "credentials_source": " [static|env_or_profile|none]", - "access_key_id": " (required if credentials_source = 'static')", - "secret_access_key": " (required if credentials_source = 'static')", - "region": " (optional - default: 'us-east-1')", - "host": " (optional)", - "port": (optional), - "ssl_verify_peer": (optional - default: true), - "use_ssl": (optional - default: true), - "signature_version": " (optional)", - "server_side_encryption": " (optional)", - "sse_kms_key_id": " (optional)", - "multipart_upload": (optional - default: true), - "download_concurrency": (optional - default: 5), - "download_part_size": (optional - default: 5242880), # 5 MB - "upload_concurrency": (optional - default: 5), - "upload_part_size": (optional - default: 5242880) # 5 MB + "bucket_name": " (required)", + "credentials_source": " [static|env_or_profile|none]", + "access_key_id": " (required if credentials_source = 'static')", + "secret_access_key": " (required if credentials_source = 'static')", + "region": " (optional - default: 'us-east-1')", + "host": " (optional)", + "port": (optional), + "ssl_verify_peer": (optional - default: true), + "use_ssl": (optional - default: true), + "signature_version": " (optional)", + "server_side_encryption": " (optional)", + "sse_kms_key_id": " (optional)", + "multipart_upload": (optional - default: true), + "download_concurrency": (optional - default: 5), + "download_part_size": (optional - default: 5242880), # 5 MB + "upload_concurrency": (optional - default: 5), + "upload_part_size": (optional - default: 5242880) # 5 MB + "multipart_copy_threshold": (optional - default: 5368709120) # default 5 GB + "multipart_copy_part_size": (optional - default: 104857600) # default 100 MB - must be at least 5 MB } ``` > Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host' diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index 54b22f7..06d61c2 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -29,6 +29,10 @@ var oneTB = int64(1000 * 1024 * 1024 * 1024) const ( defaultTransferConcurrency = 5 defaultTransferPartSize = int64(5 * 1024 * 1024) // 5 MB + // For copy operations: use multipart copy only when necessary (>5GB) + // AWS CopyObject limit is 5GB, use 100MB parts for multipart copy + defaultMultipartCopyThreshold = int64(5 * 1024 * 1024 * 1024) // 5 GB + defaultMultipartCopyPartSize = int64(100 * 1024 * 1024) // 100 MB ) // awsS3Client encapsulates AWS S3 blobstore interactions @@ -274,29 +278,157 @@ func (b *awsS3Client) EnsureStorageExists() error { } func (b *awsS3Client) Copy(srcBlob string, dstBlob string) error { - slog.Info("Copying object within s3 bucket", "bucket", b.s3cliConfig.BucketName, "source_blob", srcBlob, "destination_blob", dstBlob) + cfg := b.s3cliConfig - copySource := fmt.Sprintf("%s/%s", b.s3cliConfig.BucketName, *b.key(srcBlob)) + copyThreshold := defaultMultipartCopyThreshold + if cfg.MultipartCopyThreshold > 0 { + copyThreshold = cfg.MultipartCopyThreshold + } + copyPartSize := defaultMultipartCopyPartSize + if cfg.MultipartCopyPartSize > 0 { + copyPartSize = cfg.MultipartCopyPartSize + } - _, err := b.s3Client.CopyObject(context.TODO(), &s3.CopyObjectInput{ - Bucket: aws.String(b.s3cliConfig.BucketName), + headOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(cfg.BucketName), + Key: b.key(srcBlob), + }) + if err != nil { + return fmt.Errorf("failed to get object metadata: %w", err) + } + if headOutput.ContentLength == nil { + return errors.New("object content length is nil") + } + + objectSize := *headOutput.ContentLength + copySource := fmt.Sprintf("%s/%s", cfg.BucketName, *b.key(srcBlob)) + + // Use simple copy if file is below threshold + if objectSize < copyThreshold { + slog.Info("Copying object", "source", srcBlob, "destination", dstBlob, "size", objectSize) + return b.simpleCopy(copySource, dstBlob) + } + + // For large files, try multipart copy first (works for AWS, MinIO, Ceph, AliCloud) + // Fall back to simple copy if provider doesn't support UploadPartCopy (e.g., GCS) + slog.Info("Copying large object using multipart copy", "source", srcBlob, "destination", dstBlob, "size", objectSize) + + err = b.multipartCopy(copySource, dstBlob, objectSize, copyPartSize) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotImplemented" { + slog.Info("Multipart copy not supported by provider, falling back to simple copy", "source", srcBlob, "destination", dstBlob) + return b.simpleCopy(copySource, dstBlob) + } + return err + } + + return nil +} + +// simpleCopy performs a single CopyObject request +func (b *awsS3Client) simpleCopy(copySource string, dstBlob string) error { + cfg := b.s3cliConfig + + copyInput := &s3.CopyObjectInput{ + Bucket: aws.String(cfg.BucketName), CopySource: aws.String(copySource), Key: b.key(dstBlob), - }) + } + if cfg.ServerSideEncryption != "" { + copyInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption) + } + if cfg.SSEKMSKeyID != "" { + copyInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID) + } + + _, err := b.s3Client.CopyObject(context.TODO(), copyInput) if err != nil { return fmt.Errorf("failed to copy object: %w", err) } + return nil +} - waiter := s3.NewObjectExistsWaiter(b.s3Client) - err = waiter.Wait(context.TODO(), &s3.HeadObjectInput{ - Bucket: aws.String(b.s3cliConfig.BucketName), +// multipartCopy performs a multipart copy using CreateMultipartUpload, UploadPartCopy, and CompleteMultipartUpload +func (b *awsS3Client) multipartCopy(copySource string, dstBlob string, objectSize int64, copyPartSize int64) error { + cfg := b.s3cliConfig + numParts := int((objectSize + copyPartSize - 1) / copyPartSize) + + createInput := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(cfg.BucketName), Key: b.key(dstBlob), - }, 15*time.Minute) + } + if cfg.ServerSideEncryption != "" { + createInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption) + } + if cfg.SSEKMSKeyID != "" { + createInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID) + } + + createOutput, err := b.s3Client.CreateMultipartUpload(context.TODO(), createInput) + if err != nil { + return fmt.Errorf("failed to create multipart upload: %w", err) + } + + uploadID := *createOutput.UploadId + + var completed bool + defer func() { + if !completed { + _, err := b.s3Client.AbortMultipartUpload(context.TODO(), &s3.AbortMultipartUploadInput{ + Bucket: aws.String(cfg.BucketName), + Key: b.key(dstBlob), + UploadId: aws.String(uploadID), + }) + if err != nil { + slog.Warn("Failed to abort multipart upload", "uploadId", uploadID, "error", err) + } + } + }() + + completedParts := make([]types.CompletedPart, 0, numParts) + for i := 0; i < numParts; i++ { + partNumber := int32(i + 1) + start := int64(i) * copyPartSize + end := start + copyPartSize - 1 + if end >= objectSize { + end = objectSize - 1 + } + byteRange := fmt.Sprintf("bytes=%d-%d", start, end) + + output, err := b.s3Client.UploadPartCopy(context.TODO(), &s3.UploadPartCopyInput{ + Bucket: aws.String(cfg.BucketName), + CopySource: aws.String(copySource), + CopySourceRange: aws.String(byteRange), + Key: b.key(dstBlob), + PartNumber: aws.Int32(partNumber), + UploadId: aws.String(uploadID), + }) + if err != nil { + return fmt.Errorf("failed to copy part %d: %w", partNumber, err) + } + completedParts = append(completedParts, types.CompletedPart{ + ETag: output.CopyPartResult.ETag, + PartNumber: aws.Int32(partNumber), + }) + slog.Debug("Copied part", "part", partNumber, "range", byteRange) + } + + _, err = b.s3Client.CompleteMultipartUpload(context.TODO(), &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(cfg.BucketName), + Key: b.key(dstBlob), + UploadId: aws.String(uploadID), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedParts, + }, + }) if err != nil { - return fmt.Errorf("failed waiting for object to exist after copy: %w", err) + return fmt.Errorf("failed to complete multipart upload: %w", err) } + completed = true + slog.Debug("Multipart copy completed successfully", "parts", numParts) return nil } diff --git a/s3/config/config.go b/s3/config/config.go index 2241e02..b174f4d 100644 --- a/s3/config/config.go +++ b/s3/config/config.go @@ -33,11 +33,19 @@ type S3Cli struct { // Optional knobs to tune transfer performance. // If zero, the client will apply sensible defaults (handled by the S3 client layer). // Part size values are provided in bytes. - DownloadConcurrency int `json:"download_concurrency"` - DownloadPartSize int64 `json:"download_part_size"` - UploadConcurrency int `json:"upload_concurrency"` - UploadPartSize int64 `json:"upload_part_size"` -} + DownloadConcurrency int `json:"download_concurrency"` + DownloadPartSize int64 `json:"download_part_size"` + UploadConcurrency int `json:"upload_concurrency"` + UploadPartSize int64 `json:"upload_part_size"` + MultipartCopyThreshold int64 `json:"multipart_copy_threshold"` // Default: 5GB - files larger than this use multipart copy + MultipartCopyPartSize int64 `json:"multipart_copy_part_size"` // Default: 100MB - size of each part in multipart copy +} + +const ( + // multipartCopyMinPartSize is the AWS minimum part size for multipart operations. + // Other providers may have different limits - users should consult their provider's documentation. + multipartCopyMinPartSize = 5 * 1024 * 1024 // 5MB - AWS minimum part size +) const defaultAWSRegion = "us-east-1" //nolint:unused @@ -98,6 +106,19 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative") } + // Validate multipart copy settings (0 means "use defaults") + // Note: Default threshold is 5GB (AWS limit), but users can configure higher values for providers + // that support larger simple copies (e.g., GCS has no limit). Users should consult their provider's documentation. + if c.MultipartCopyThreshold < 0 { + return S3Cli{}, errors.New("multipart_copy_threshold must be non-negative (0 means use default)") + } + if c.MultipartCopyPartSize < 0 { + return S3Cli{}, errors.New("multipart_copy_part_size must be non-negative (0 means use default)") + } + if c.MultipartCopyPartSize > 0 && c.MultipartCopyPartSize < multipartCopyMinPartSize { + return S3Cli{}, fmt.Errorf("multipart_copy_part_size must be at least %d bytes (5MB - AWS minimum)", multipartCopyMinPartSize) + } + switch c.CredentialsSource { case StaticCredentialsSource: if c.AccessKeyID == "" || c.SecretAccessKey == "" { diff --git a/s3/config/config_test.go b/s3/config/config_test.go index 75b2c8c..70eca8c 100644 --- a/s3/config/config_test.go +++ b/s3/config/config_test.go @@ -327,6 +327,93 @@ var _ = Describe("BlobstoreClient configuration", func() { _, err = config.NewFromReader(dummyJSONReader) Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative")) }) + + Describe("multipart copy tuning fields", func() { + It("rejects negative multipart copy threshold", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_threshold": -1 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("multipart_copy_threshold must be non-negative (0 means use default)")) + }) + + It("rejects negative multipart copy part size", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_part_size": -1 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("multipart_copy_part_size must be non-negative (0 means use default)")) + }) + + It("rejects multipart copy part size below AWS minimum", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_part_size": 1048576 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("multipart_copy_part_size must be at least 5242880 bytes (5MB - AWS minimum)")) + }) + + It("accepts zero values (use defaults)", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_threshold": 0, + "multipart_copy_part_size": 0 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.MultipartCopyThreshold).To(Equal(int64(0))) + Expect(c.MultipartCopyPartSize).To(Equal(int64(0))) + }) + + It("accepts valid custom values", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_threshold": 1073741824, + "multipart_copy_part_size": 104857600 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.MultipartCopyThreshold).To(Equal(int64(1073741824))) // 1GB + Expect(c.MultipartCopyPartSize).To(Equal(int64(104857600))) // 100MB + }) + + It("accepts threshold above AWS limit for providers with higher limits", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_threshold": 10737418240 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.MultipartCopyThreshold).To(Equal(int64(10737418240))) // 10GB + }) + }) }) Describe("returning the S3 endpoint", func() { diff --git a/s3/integration/assertions.go b/s3/integration/assertions.go index 44d8c9d..4fe8feb 100644 --- a/s3/integration/assertions.go +++ b/s3/integration/assertions.go @@ -139,6 +139,75 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { } +// AssertMultipartCopyWorks tests multipart copy functionality by setting a low threshold +// This forces a small file to be copied using multipart copy (2 parts) +func AssertMultipartCopyWorks(s3CLIPath string, cfg *config.S3Cli) { + storageType := "s3" + s3Filename := GenerateRandomString() + + // Create a 15MB file content (will result in 2-3 parts with 5MB minimum part size) + // We use 15MB to ensure we get at least 2 parts when threshold is set to 10MB + contentSize := 15 * 1024 * 1024 // 15 MB + expectedContent := GenerateRandomString(contentSize) + + // Configure low multipart copy threshold to force multipart copy + // Threshold: 10MB - files larger than this use multipart copy + // Part size: 5MB (AWS minimum) - so our 15MB file will be split into 3 parts + cfg.MultipartCopyThreshold = 10 * 1024 * 1024 // 10 MB + cfg.MultipartCopyPartSize = 5 * 1024 * 1024 // 5 MB (AWS minimum) + + configPath := MakeConfigFile(cfg) + defer os.Remove(configPath) //nolint:errcheck + + contentFile := MakeContentFile(expectedContent) + defer os.Remove(contentFile) //nolint:errcheck + + // Upload the test file + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "put", contentFile, s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Copy the file - this should trigger multipart copy since file size (15MB) > threshold (10MB) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "copy", s3Filename, s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Verify the copied file exists + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Download and verify content matches + tmpCopiedFile, err := os.CreateTemp("", "s3cli-download-multipart-copy") + Expect(err).ToNot(HaveOccurred()) + err = tmpCopiedFile.Close() + Expect(err).ToNot(HaveOccurred()) + defer os.Remove(tmpCopiedFile.Name()) //nolint:errcheck + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "get", s3Filename+"_multipart_copy", tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + copiedBytes, err := os.ReadFile(tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(string(copiedBytes)).To(Equal(expectedContent)) + + // Verify file size matches + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring(fmt.Sprintf("\"content_length\": %d", contentSize))) + + // Clean up + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) +} + func AssertOnBulkOperations(s3CLIPath string, cfg *config.S3Cli) { storageType := "s3" numFiles := 5 diff --git a/s3/integration/aws_iam_role_test.go b/s3/integration/aws_iam_role_test.go index 9e711ce..a4f22ad 100644 --- a/s3/integration/aws_iam_role_test.go +++ b/s3/integration/aws_iam_role_test.go @@ -62,5 +62,9 @@ var _ = Describe("Testing inside an AWS compute resource with an IAM role", func func(cfg *config.S3Cli) { integration.AssertDeleteNonexistentWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) }) }) diff --git a/s3/integration/aws_us_east_test.go b/s3/integration/aws_us_east_test.go index dc27777..cd50570 100644 --- a/s3/integration/aws_us_east_test.go +++ b/s3/integration/aws_us_east_test.go @@ -62,5 +62,9 @@ var _ = Describe("Testing only in us-east-1", func() { func(cfg *config.S3Cli) { integration.AssertDeleteNonexistentWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) }) }) diff --git a/s3/integration/general_aws_test.go b/s3/integration/general_aws_test.go index 954f5f2..2871901 100644 --- a/s3/integration/general_aws_test.go +++ b/s3/integration/general_aws_test.go @@ -79,6 +79,10 @@ var _ = Describe("General testing for all AWS regions", func() { func(cfg *config.S3Cli) { integration.AssertOnSignedURLs(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) configurations = []TableEntry{ Entry("with encryption", &config.S3Cli{ diff --git a/s3/integration/s3_compatible_test.go b/s3/integration/s3_compatible_test.go index d63c64a..698ed74 100644 --- a/s3/integration/s3_compatible_test.go +++ b/s3/integration/s3_compatible_test.go @@ -89,5 +89,9 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() func(cfg *config.S3Cli) { integration.AssertOnSignedURLs(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) }) })