diff --git a/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json b/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json index e836b65..89fb729 100644 --- a/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json +++ b/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json @@ -40,6 +40,20 @@ "Effect": "Allow", "Resource": "arn:aws:logs:*:*:*" }, + { + "Action": ["s3:CreateBucket", "s3:HeadBucket", "s3:DeleteBucket"], + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*" + ] + ] + } + }, { "Action": [ "s3:GetObject*", @@ -67,6 +81,26 @@ "/*" ] ] + }, + { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*" + ] + ] + }, + { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*/*" + ] + ] } ] } diff --git a/.github/scripts/s3/run-integration-aws-iam.sh b/.github/scripts/s3/run-integration-aws-iam.sh index 498acb9..471e7b9 100755 --- a/.github/scripts/s3/run-integration-aws-iam.sh +++ b/.github/scripts/s3/run-integration-aws-iam.sh @@ -49,7 +49,8 @@ pushd "${repo_root}" > /dev/null --function-name "${lambda_function_name}" \ --zip-file fileb://payload.zip \ --role "${iam_role_arn}" \ - --timeout 300 \ + --timeout 600 \ + --memory-size 512 \ --handler lambda_function.test_runner_handler \ --runtime python3.9 @@ -58,8 +59,8 @@ pushd "${repo_root}" > /dev/null tries=0 get_function_status_command="aws lambda get-function --region ${region_name} --function-name ${lambda_function_name}" function_status=$(${get_function_status_command}) - while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 5 ) ]] ; do - sleep 2 + while [[ ( $(echo "${function_status}" | jq -r ".Configuration.State") != "Active" ) && ( $tries -ne 15 ) ]] ; do + sleep 3 echo "Checking for function readiness; attempt: $tries" tries=$((tries + 1)) function_status=$(${get_function_status_command}) diff --git a/README.md b/README.md index ebcb763..bbc9995 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,6 @@ Key points - additional endpoints needed by CAPI still missing - [Gcs](./gcs/README.md) - [S3](./s3/README.md) - - additional endpoints needed by CAPI still missing ## Build diff --git a/s3/README.md b/s3/README.md index f3a30c4..9fdfa65 100644 --- a/s3/README.md +++ b/s3/README.md @@ -12,23 +12,25 @@ The S3 client requires a JSON configuration file with the following structure: ``` json { - "bucket_name": " (required)", - "credentials_source": " [static|env_or_profile|none]", - "access_key_id": " (required if credentials_source = 'static')", - "secret_access_key": " (required if credentials_source = 'static')", - "region": " (optional - default: 'us-east-1')", - "host": " (optional)", - "port": (optional), - "ssl_verify_peer": (optional - default: true), - "use_ssl": (optional - default: true), - "signature_version": " (optional)", - "server_side_encryption": " (optional)", - "sse_kms_key_id": " (optional)", - "multipart_upload": (optional - default: true), - "download_concurrency": (optional - default: 5), - "download_part_size": (optional - default: 5242880), # 5 MB - "upload_concurrency": (optional - default: 5), - "upload_part_size": (optional - default: 5242880) # 5 MB + "bucket_name": " (required)", + "credentials_source": " [static|env_or_profile|none]", + "access_key_id": " (required if credentials_source = 'static')", + "secret_access_key": " (required if credentials_source = 'static')", + "region": " (optional - default: 
'us-east-1')", + "host": " (optional)", + "port": (optional), + "ssl_verify_peer": (optional - default: true), + "use_ssl": (optional - default: true), + "signature_version": " (optional)", + "server_side_encryption": " (optional)", + "sse_kms_key_id": " (optional)", + "multipart_upload": (optional - default: true), + "download_concurrency": (optional - default: 5), + "download_part_size": (optional - default: 5242880), # 5 MB + "upload_concurrency": (optional - default: 5), + "upload_part_size": (optional - default: 5242880) # 5 MB + "multipart_copy_threshold": (optional - default: 5368709120) # default 5 GB + "multipart_copy_part_size": (optional - default: 104857600) # default 100 MB - must be at least 5 MB } ``` > Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host' diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index 1e2f5e1..06d61c2 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -1,6 +1,7 @@ package client import ( + "encoding/json" "errors" "fmt" "io" @@ -28,6 +29,10 @@ var oneTB = int64(1000 * 1024 * 1024 * 1024) const ( defaultTransferConcurrency = 5 defaultTransferPartSize = int64(5 * 1024 * 1024) // 5 MB + // For copy operations: use multipart copy only when necessary (>5GB) + // AWS CopyObject limit is 5GB, use 100MB parts for multipart copy + defaultMultipartCopyThreshold = int64(5 * 1024 * 1024 * 1024) // 5 GB + defaultMultipartCopyPartSize = int64(100 * 1024 * 1024) // 100 MB ) // awsS3Client encapsulates AWS S3 blobstore interactions @@ -228,3 +233,314 @@ func (b *awsS3Client) putSigned(objectID string, expiration time.Duration) (stri return req.URL, nil } + +func (b *awsS3Client) EnsureStorageExists() error { + slog.Info("Ensuring bucket exists", "bucket", b.s3cliConfig.BucketName) + _, err := b.s3Client.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + }) + + if err == nil { + slog.Info("Bucket exists", "bucket", b.s3cliConfig.BucketName) + return nil + } + + var notFoundErr *types.NotFound + if !errors.As(err, ¬FoundErr) { + return fmt.Errorf("failed to check if bucket exists: %w", err) + } + + slog.Info("Bucket does not exist, creating it", "bucket", b.s3cliConfig.BucketName) + createBucketInput := &s3.CreateBucketInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + // For GCS and AWS region 'us-east-1', LocationConstraint must be omitted + if !b.s3cliConfig.IsGoogle() && b.s3cliConfig.Region != "us-east-1" { + createBucketInput.CreateBucketConfiguration = &types.CreateBucketConfiguration{ + LocationConstraint: types.BucketLocationConstraint(b.s3cliConfig.Region), + } + } + + _, err = b.s3Client.CreateBucket(context.TODO(), createBucketInput) + if err != nil { + var alreadyOwned *types.BucketAlreadyOwnedByYou + var alreadyExists *types.BucketAlreadyExists + if errors.As(err, &alreadyOwned) || errors.As(err, &alreadyExists) { + slog.Warn("Bucket got created by another process", "bucket", b.s3cliConfig.BucketName) + return nil + } + return fmt.Errorf("failed to create bucket: %w", err) + } + + slog.Info("Bucket created successfully", "bucket", b.s3cliConfig.BucketName) + return nil +} + +func (b *awsS3Client) Copy(srcBlob string, dstBlob string) error { + cfg := b.s3cliConfig + + copyThreshold := defaultMultipartCopyThreshold + if cfg.MultipartCopyThreshold > 0 { + copyThreshold = cfg.MultipartCopyThreshold + } + copyPartSize := defaultMultipartCopyPartSize + if cfg.MultipartCopyPartSize 
> 0 { + copyPartSize = cfg.MultipartCopyPartSize + } + + headOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(cfg.BucketName), + Key: b.key(srcBlob), + }) + if err != nil { + return fmt.Errorf("failed to get object metadata: %w", err) + } + if headOutput.ContentLength == nil { + return errors.New("object content length is nil") + } + + objectSize := *headOutput.ContentLength + copySource := fmt.Sprintf("%s/%s", cfg.BucketName, *b.key(srcBlob)) + + // Use simple copy if file is below threshold + if objectSize < copyThreshold { + slog.Info("Copying object", "source", srcBlob, "destination", dstBlob, "size", objectSize) + return b.simpleCopy(copySource, dstBlob) + } + + // For large files, try multipart copy first (works for AWS, MinIO, Ceph, AliCloud) + // Fall back to simple copy if provider doesn't support UploadPartCopy (e.g., GCS) + slog.Info("Copying large object using multipart copy", "source", srcBlob, "destination", dstBlob, "size", objectSize) + + err = b.multipartCopy(copySource, dstBlob, objectSize, copyPartSize) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotImplemented" { + slog.Info("Multipart copy not supported by provider, falling back to simple copy", "source", srcBlob, "destination", dstBlob) + return b.simpleCopy(copySource, dstBlob) + } + return err + } + + return nil +} + +// simpleCopy performs a single CopyObject request +func (b *awsS3Client) simpleCopy(copySource string, dstBlob string) error { + cfg := b.s3cliConfig + + copyInput := &s3.CopyObjectInput{ + Bucket: aws.String(cfg.BucketName), + CopySource: aws.String(copySource), + Key: b.key(dstBlob), + } + if cfg.ServerSideEncryption != "" { + copyInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption) + } + if cfg.SSEKMSKeyID != "" { + copyInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID) + } + + _, err := b.s3Client.CopyObject(context.TODO(), copyInput) + if err != nil { + return fmt.Errorf("failed to copy object: %w", err) + } + return nil +} + +// multipartCopy performs a multipart copy using CreateMultipartUpload, UploadPartCopy, and CompleteMultipartUpload +func (b *awsS3Client) multipartCopy(copySource string, dstBlob string, objectSize int64, copyPartSize int64) error { + cfg := b.s3cliConfig + numParts := int((objectSize + copyPartSize - 1) / copyPartSize) + + createInput := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(cfg.BucketName), + Key: b.key(dstBlob), + } + if cfg.ServerSideEncryption != "" { + createInput.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption) + } + if cfg.SSEKMSKeyID != "" { + createInput.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID) + } + + createOutput, err := b.s3Client.CreateMultipartUpload(context.TODO(), createInput) + if err != nil { + return fmt.Errorf("failed to create multipart upload: %w", err) + } + + uploadID := *createOutput.UploadId + + var completed bool + defer func() { + if !completed { + _, err := b.s3Client.AbortMultipartUpload(context.TODO(), &s3.AbortMultipartUploadInput{ + Bucket: aws.String(cfg.BucketName), + Key: b.key(dstBlob), + UploadId: aws.String(uploadID), + }) + if err != nil { + slog.Warn("Failed to abort multipart upload", "uploadId", uploadID, "error", err) + } + } + }() + + completedParts := make([]types.CompletedPart, 0, numParts) + for i := 0; i < numParts; i++ { + partNumber := int32(i + 1) + start := int64(i) * copyPartSize + end := start + copyPartSize - 1 + if end >= objectSize { + end = 
objectSize - 1 + } + byteRange := fmt.Sprintf("bytes=%d-%d", start, end) + + output, err := b.s3Client.UploadPartCopy(context.TODO(), &s3.UploadPartCopyInput{ + Bucket: aws.String(cfg.BucketName), + CopySource: aws.String(copySource), + CopySourceRange: aws.String(byteRange), + Key: b.key(dstBlob), + PartNumber: aws.Int32(partNumber), + UploadId: aws.String(uploadID), + }) + if err != nil { + return fmt.Errorf("failed to copy part %d: %w", partNumber, err) + } + + completedParts = append(completedParts, types.CompletedPart{ + ETag: output.CopyPartResult.ETag, + PartNumber: aws.Int32(partNumber), + }) + slog.Debug("Copied part", "part", partNumber, "range", byteRange) + } + + _, err = b.s3Client.CompleteMultipartUpload(context.TODO(), &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(cfg.BucketName), + Key: b.key(dstBlob), + UploadId: aws.String(uploadID), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedParts, + }, + }) + if err != nil { + return fmt.Errorf("failed to complete multipart upload: %w", err) + } + + completed = true + slog.Debug("Multipart copy completed successfully", "parts", numParts) + return nil +} + +type BlobProperties struct { + ETag string `json:"etag,omitempty"` + LastModified time.Time `json:"last_modified,omitempty"` + ContentLength int64 `json:"content_length,omitempty"` +} + +func (b *awsS3Client) Properties(dest string) error { + slog.Info("fetching blob properties", "bucket", b.s3cliConfig.BucketName, "blob", dest) + + headObjectOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: b.key(dest), + }) + + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotFound" { + fmt.Println(`{}`) + return nil + } + return fmt.Errorf("failed to fetch blob properties: %w", err) + } + + properties := BlobProperties{} + if headObjectOutput.ETag != nil { + properties.ETag = strings.Trim(*headObjectOutput.ETag, `"`) + } + if headObjectOutput.LastModified != nil { + properties.LastModified = *headObjectOutput.LastModified + } + if headObjectOutput.ContentLength != nil { + properties.ContentLength = *headObjectOutput.ContentLength + } + + output, err := json.MarshalIndent(properties, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal blob properties: %w", err) + } + + fmt.Println(string(output)) + + return nil +} + +func (b *awsS3Client) List(prefix string) ([]string, error) { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + if prefix != "" { + slog.Info("Listing all objects in bucket with prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix) + input.Prefix = b.key(prefix) + } else { + slog.Info("Listing all objects in bucket", "bucket", b.s3cliConfig.BucketName) + } + + var names []string + objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input) + for objectPaginator.HasMorePages() { + page, err := objectPaginator.NextPage(context.TODO()) + if err != nil { + return nil, fmt.Errorf("failed to list objects: %w", err) + } + + if len(page.Contents) == 0 { + continue + } + + for _, obj := range page.Contents { + names = append(names, *obj.Key) + } + } + + return names, nil +} + +func (b *awsS3Client) DeleteRecursive(prefix string) error { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + if prefix != "" { + slog.Info("Deleting all objects in bucket with given prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix) 
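+		// Scope the listing below (and therefore the deletion) to keys under the given prefix.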
+ input.Prefix = b.key(prefix) + } else { + slog.Info("Deleting all objects in bucket", "bucket", b.s3cliConfig.BucketName) + } + + objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input) + for objectPaginator.HasMorePages() { + page, err := objectPaginator.NextPage(context.TODO()) + if err != nil { + return fmt.Errorf("failed to list objects for deletion: %w", err) + } + + for _, obj := range page.Contents { + slog.Debug("Deleting object", "key", *obj.Key) + _, err := b.s3Client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: obj.Key, + }) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && (apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey") { + continue // Object already deleted, which is fine + } + return fmt.Errorf("failed to delete object '%s': %w", *obj.Key, err) + } + } + } + return nil +} diff --git a/s3/client/client.go b/s3/client/client.go index f5853cc..40383b0 100644 --- a/s3/client/client.go +++ b/s3/client/client.go @@ -1,7 +1,6 @@ package client import ( - "errors" "os" "time" @@ -65,25 +64,24 @@ func (c *S3CompatibleClient) Sign(objectID string, action string, expiration tim } func (c *S3CompatibleClient) EnsureStorageExists() error { - return errors.New("not implemented") - + return c.awsS3BlobstoreClient.EnsureStorageExists() } func (c *S3CompatibleClient) Copy(srcBlob string, dstBlob string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.Copy(srcBlob, dstBlob) } func (c *S3CompatibleClient) Properties(dest string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.Properties(dest) } func (c *S3CompatibleClient) List(prefix string) ([]string, error) { - return nil, errors.New("not implemented") + return c.awsS3BlobstoreClient.List(prefix) } func (c *S3CompatibleClient) DeleteRecursive(prefix string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.DeleteRecursive(prefix) } diff --git a/s3/config/config.go b/s3/config/config.go index 2241e02..b174f4d 100644 --- a/s3/config/config.go +++ b/s3/config/config.go @@ -33,11 +33,19 @@ type S3Cli struct { // Optional knobs to tune transfer performance. // If zero, the client will apply sensible defaults (handled by the S3 client layer). // Part size values are provided in bytes. - DownloadConcurrency int `json:"download_concurrency"` - DownloadPartSize int64 `json:"download_part_size"` - UploadConcurrency int `json:"upload_concurrency"` - UploadPartSize int64 `json:"upload_part_size"` -} + DownloadConcurrency int `json:"download_concurrency"` + DownloadPartSize int64 `json:"download_part_size"` + UploadConcurrency int `json:"upload_concurrency"` + UploadPartSize int64 `json:"upload_part_size"` + MultipartCopyThreshold int64 `json:"multipart_copy_threshold"` // Default: 5GB - files larger than this use multipart copy + MultipartCopyPartSize int64 `json:"multipart_copy_part_size"` // Default: 100MB - size of each part in multipart copy +} + +const ( + // multipartCopyMinPartSize is the AWS minimum part size for multipart operations. + // Other providers may have different limits - users should consult their provider's documentation. 
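+	// A configured value of 0 means "use the default"; NewFromReader enforces this floor only when an explicit part size is set.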
+ multipartCopyMinPartSize = 5 * 1024 * 1024 // 5MB - AWS minimum part size +) const defaultAWSRegion = "us-east-1" //nolint:unused @@ -98,6 +106,19 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative") } + // Validate multipart copy settings (0 means "use defaults") + // Note: Default threshold is 5GB (AWS limit), but users can configure higher values for providers + // that support larger simple copies (e.g., GCS has no limit). Users should consult their provider's documentation. + if c.MultipartCopyThreshold < 0 { + return S3Cli{}, errors.New("multipart_copy_threshold must be non-negative (0 means use default)") + } + if c.MultipartCopyPartSize < 0 { + return S3Cli{}, errors.New("multipart_copy_part_size must be non-negative (0 means use default)") + } + if c.MultipartCopyPartSize > 0 && c.MultipartCopyPartSize < multipartCopyMinPartSize { + return S3Cli{}, fmt.Errorf("multipart_copy_part_size must be at least %d bytes (5MB - AWS minimum)", multipartCopyMinPartSize) + } + switch c.CredentialsSource { case StaticCredentialsSource: if c.AccessKeyID == "" || c.SecretAccessKey == "" { diff --git a/s3/config/config_test.go b/s3/config/config_test.go index 75b2c8c..70eca8c 100644 --- a/s3/config/config_test.go +++ b/s3/config/config_test.go @@ -327,6 +327,93 @@ var _ = Describe("BlobstoreClient configuration", func() { _, err = config.NewFromReader(dummyJSONReader) Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative")) }) + + Describe("multipart copy tuning fields", func() { + It("rejects negative multipart copy threshold", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_threshold": -1 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("multipart_copy_threshold must be non-negative (0 means use default)")) + }) + + It("rejects negative multipart copy part size", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_part_size": -1 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("multipart_copy_part_size must be non-negative (0 means use default)")) + }) + + It("rejects multipart copy part size below AWS minimum", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_part_size": 1048576 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("multipart_copy_part_size must be at least 5242880 bytes (5MB - AWS minimum)")) + }) + + It("accepts zero values (use defaults)", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_threshold": 0, + "multipart_copy_part_size": 0 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.MultipartCopyThreshold).To(Equal(int64(0))) + Expect(c.MultipartCopyPartSize).To(Equal(int64(0))) + }) + + It("accepts valid custom values", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + 
"bucket_name":"some-bucket", + "multipart_copy_threshold": 1073741824, + "multipart_copy_part_size": 104857600 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.MultipartCopyThreshold).To(Equal(int64(1073741824))) // 1GB + Expect(c.MultipartCopyPartSize).To(Equal(int64(104857600))) // 100MB + }) + + It("accepts threshold above AWS limit for providers with higher limits", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "multipart_copy_threshold": 10737418240 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.MultipartCopyThreshold).To(Equal(int64(10737418240))) // 10GB + }) + }) }) Describe("returning the S3 endpoint", func() { diff --git a/s3/integration/assertions.go b/s3/integration/assertions.go index fb05003..4fe8feb 100644 --- a/s3/integration/assertions.go +++ b/s3/integration/assertions.go @@ -2,6 +2,7 @@ package integration import ( "context" + "errors" "fmt" "io" "log" @@ -90,6 +91,39 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { Expect(err).ToNot(HaveOccurred()) Expect(string(gottenBytes)).To(Equal(expectedString)) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring(fmt.Sprintf("\"content_length\": %d", len(expectedString)))) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("\"etag\":")) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("\"last_modified\":")) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "copy", s3Filename, s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + tmpCopiedFile, err := os.CreateTemp("", "s3cli-download-copy") + Expect(err).ToNot(HaveOccurred()) + err = tmpCopiedFile.Close() + Expect(err).ToNot(HaveOccurred()) + defer os.Remove(tmpCopiedFile.Name()) //nolint:errcheck + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "get", s3Filename+"_copy", tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + copiedBytes, err := os.ReadFile(tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(string(copiedBytes)).To(Equal(expectedString)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename) Expect(err).ToNot(HaveOccurred()) Expect(s3CLISession.ExitCode()).To(BeZero()) @@ -97,6 +131,175 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename) Expect(err).ToNot(HaveOccurred()) Expect(s3CLISession.ExitCode()).To(Equal(3)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + 
Expect(s3CLISession.Out.Contents()).To(ContainSubstring("{}")) + +} + +// AssertMultipartCopyWorks tests multipart copy functionality by setting a low threshold +// This forces a small file to be copied using multipart copy (2 parts) +func AssertMultipartCopyWorks(s3CLIPath string, cfg *config.S3Cli) { + storageType := "s3" + s3Filename := GenerateRandomString() + + // Create a 15MB file content (will result in 2-3 parts with 5MB minimum part size) + // We use 15MB to ensure we get at least 2 parts when threshold is set to 10MB + contentSize := 15 * 1024 * 1024 // 15 MB + expectedContent := GenerateRandomString(contentSize) + + // Configure low multipart copy threshold to force multipart copy + // Threshold: 10MB - files larger than this use multipart copy + // Part size: 5MB (AWS minimum) - so our 15MB file will be split into 3 parts + cfg.MultipartCopyThreshold = 10 * 1024 * 1024 // 10 MB + cfg.MultipartCopyPartSize = 5 * 1024 * 1024 // 5 MB (AWS minimum) + + configPath := MakeConfigFile(cfg) + defer os.Remove(configPath) //nolint:errcheck + + contentFile := MakeContentFile(expectedContent) + defer os.Remove(contentFile) //nolint:errcheck + + // Upload the test file + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "put", contentFile, s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Copy the file - this should trigger multipart copy since file size (15MB) > threshold (10MB) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "copy", s3Filename, s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Verify the copied file exists + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Download and verify content matches + tmpCopiedFile, err := os.CreateTemp("", "s3cli-download-multipart-copy") + Expect(err).ToNot(HaveOccurred()) + err = tmpCopiedFile.Close() + Expect(err).ToNot(HaveOccurred()) + defer os.Remove(tmpCopiedFile.Name()) //nolint:errcheck + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "get", s3Filename+"_multipart_copy", tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + copiedBytes, err := os.ReadFile(tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(string(copiedBytes)).To(Equal(expectedContent)) + + // Verify file size matches + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring(fmt.Sprintf("\"content_length\": %d", contentSize))) + + // Clean up + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename+"_multipart_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) +} + +func AssertOnBulkOperations(s3CLIPath string, cfg *config.S3Cli) { + storageType := "s3" + numFiles := 5 + s3FilenamePrefix := GenerateRandomString() + localFile := MakeContentFile(GenerateRandomString()) + defer os.Remove(localFile) //nolint:errcheck + + configPath := MakeConfigFile(cfg) + 
defer os.Remove(configPath) //nolint:errcheck + + for i := 0; i < numFiles; i++ { + suffix := strings.Repeat("A", i) + s3Filename := fmt.Sprintf("%s%s", s3FilenamePrefix, suffix) + + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "put", localFile, s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + } + + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output := strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(strings.Split(output, "\n")).To(HaveLen(numFiles)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete-recursive", fmt.Sprintf("%sAAA", s3FilenamePrefix)) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output = strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(strings.Split(output, "\n")).To(HaveLen(3)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete-recursive") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output = strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(output).To(BeEmpty()) +} + +func AssertOnStorageExists(s3CLIPath string, cfg *config.S3Cli) { + cfgCopy := *cfg + cfgCopy.BucketName = fmt.Sprintf("%s-%s", cfg.BucketName, strings.ToLower(GenerateRandomString(4))) + + configPath := MakeConfigFile(&cfgCopy) + defer os.Remove(configPath) //nolint:errcheck + + // Create a single verification/cleanup client from the config file. + // This ensures it has the exact same settings as the CLI will use. + verificationCfgFile, err := os.Open(configPath) + Expect(err).ToNot(HaveOccurred()) + defer verificationCfgFile.Close() //nolint:errcheck + + verificationCfg, err := config.NewFromReader(verificationCfgFile) + Expect(err).ToNot(HaveOccurred()) + + verificationClient, err := client.NewAwsS3Client(&verificationCfg) + Expect(err).ToNot(HaveOccurred()) + + // Defer the cleanup to ensure the bucket is deleted after the test. + defer func() { + _, err := verificationClient.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + Bucket: aws.String(cfgCopy.BucketName), + }) + // A NoSuchBucket error is acceptable if the bucket was never created or already cleaned up. + var noSuchBucketErr *types.NoSuchBucket + if err != nil && !errors.As(err, &noSuchBucketErr) { + Expect(err).ToNot(HaveOccurred(), "Failed to clean up test bucket") + } + }() + + // --- Scenario 1: Bucket does not exist, should be created --- + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, "s3", "ensure-storage-exists") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Verify the bucket now exists using the client created earlier. 
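+	// HeadBucket succeeds only when the bucket exists and is reachable with the configured credentials.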
+ _, headBucketErr := verificationClient.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(cfgCopy.BucketName), + }) + Expect(headBucketErr).ToNot(HaveOccurred(), "Bucket should have been created by 'ensure-storage-exists'") + + // --- Scenario 2: Bucket already exists, command should still succeed (idempotency) --- + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, "s3", "ensure-storage-exists") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) } func AssertOnPutFailures(cfg *config.S3Cli, content, errorMessage string) { diff --git a/s3/integration/aws_iam_role_test.go b/s3/integration/aws_iam_role_test.go index e11251c..a4f22ad 100644 --- a/s3/integration/aws_iam_role_test.go +++ b/s3/integration/aws_iam_role_test.go @@ -46,6 +46,14 @@ var _ = Describe("Testing inside an AWS compute resource with an IAM role", func func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, @@ -54,5 +62,9 @@ var _ = Describe("Testing inside an AWS compute resource with an IAM role", func func(cfg *config.S3Cli) { integration.AssertDeleteNonexistentWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) }) }) diff --git a/s3/integration/aws_us_east_test.go b/s3/integration/aws_us_east_test.go index db81450..cd50570 100644 --- a/s3/integration/aws_us_east_test.go +++ b/s3/integration/aws_us_east_test.go @@ -46,6 +46,14 @@ var _ = Describe("Testing only in us-east-1", func() { func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, @@ -54,5 +62,9 @@ var _ = Describe("Testing only in us-east-1", func() { func(cfg *config.S3Cli) { integration.AssertDeleteNonexistentWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) }) }) diff --git a/s3/integration/general_aws_test.go b/s3/integration/general_aws_test.go index f114375..2871901 100644 --- a/s3/integration/general_aws_test.go +++ b/s3/integration/general_aws_test.go @@ -58,6 +58,15 @@ var _ = Describe("General testing for all AWS regions", func() { func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + 
configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, @@ -70,6 +79,10 @@ var _ = Describe("General testing for all AWS regions", func() { func(cfg *config.S3Cli) { integration.AssertOnSignedURLs(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) configurations = []TableEntry{ Entry("with encryption", &config.S3Cli{ diff --git a/s3/integration/s3_compatible_test.go b/s3/integration/s3_compatible_test.go index 965b41c..698ed74 100644 --- a/s3/integration/s3_compatible_test.go +++ b/s3/integration/s3_compatible_test.go @@ -65,6 +65,14 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, @@ -81,5 +89,9 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() func(cfg *config.S3Cli) { integration.AssertOnSignedURLs(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Multipart copy works with low threshold", + func(cfg *config.S3Cli) { integration.AssertMultipartCopyWorks(s3CLIPath, cfg) }, + configurations, + ) }) })
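
For reference, a minimal sketch of a client configuration that exercises the new copy tuning fields documented in `s3/README.md` above; the bucket name is a placeholder and the numeric values are purely illustrative:

``` json
{
  "bucket_name": "example-bucket",
  "credentials_source": "env_or_profile",
  "region": "us-east-1",
  "multipart_copy_threshold": 1073741824,
  "multipart_copy_part_size": 104857600
}
```

With these values, `Copy` issues a single `CopyObject` request for objects smaller than 1 GB and switches to `CreateMultipartUpload`/`UploadPartCopy` with 100 MB parts at or above that size. Omitting the two fields (or setting them to 0) keeps the defaults of 5 GB and 100 MB, and an explicit `multipart_copy_part_size` below 5 MB is rejected during config validation.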