diff --git a/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json b/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json index e836b65..89fb729 100644 --- a/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json +++ b/.github/scripts/s3/assets/cloudformation-s3cli-iam.template.json @@ -40,6 +40,20 @@ "Effect": "Allow", "Resource": "arn:aws:logs:*:*:*" }, + { + "Action": ["s3:CreateBucket", "s3:ListBucket", "s3:DeleteBucket"], + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*" + ] + ] + } + }, { "Action": [ "s3:GetObject*", @@ -67,6 +81,26 @@ "/*" ] ] + }, + { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*" + ] + ] + }, + { + "Fn::Join": [ + "", + [ + "arn:aws:s3:::", + { "Ref": "S3Bucket" }, + "*/*" + ] + ] } ] } diff --git a/README.md b/README.md index ebcb763..bbc9995 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,6 @@ Key points - additional endpoints needed by CAPI still missing - [Gcs](./gcs/README.md) - [S3](./s3/README.md) - - additional endpoints needed by CAPI still missing ## Build diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go index 1e2f5e1..b5bebb9 100644 --- a/s3/client/aws_s3_blobstore.go +++ b/s3/client/aws_s3_blobstore.go @@ -1,6 +1,7 @@ package client import ( + "encoding/json" "errors" "fmt" "io" @@ -228,3 +229,186 @@ func (b *awsS3Client) putSigned(objectID string, expiration time.Duration) (stri return req.URL, nil } + +func (b *awsS3Client) EnsureStorageExists() error { + slog.Info("Ensuring bucket exists", "bucket", b.s3cliConfig.BucketName) + _, err := b.s3Client.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + }) + + if err == nil { + slog.Info("Bucket exists", "bucket", b.s3cliConfig.BucketName) + return nil + } + + var apiErr smithy.APIError + if !errors.As(err, &apiErr) || apiErr.ErrorCode() != "NotFound" { + return fmt.Errorf("failed 
to check if bucket exists: %w", err) + } + + slog.Info("Bucket does not exist, creating it", "bucket", b.s3cliConfig.BucketName) + createBucketInput := &s3.CreateBucketInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + // For GCS and AWS region 'us-east-1', LocationConstraint must be omitted + if !b.s3cliConfig.IsGoogle() && b.s3cliConfig.Region != "us-east-1" { + createBucketInput.CreateBucketConfiguration = &types.CreateBucketConfiguration{ + LocationConstraint: types.BucketLocationConstraint(b.s3cliConfig.Region), + } + } + + _, err = b.s3Client.CreateBucket(context.TODO(), createBucketInput) + if err != nil { + var alreadyOwned *types.BucketAlreadyOwnedByYou + var alreadyExists *types.BucketAlreadyExists + if errors.As(err, &alreadyOwned) || errors.As(err, &alreadyExists) { + slog.Warn("Bucket got created by another process", "bucket", b.s3cliConfig.BucketName) + return nil + } + return fmt.Errorf("failed to create bucket: %w", err) + } + + slog.Info("Bucket created successfully", "bucket", b.s3cliConfig.BucketName) + return nil +} + +func (b *awsS3Client) Copy(srcBlob string, dstBlob string) error { + slog.Info("Copying object within s3 bucket", "bucket", b.s3cliConfig.BucketName, "source_blob", srcBlob, "destination_blob", dstBlob) + + copySource := fmt.Sprintf("%s/%s", b.s3cliConfig.BucketName, *b.key(srcBlob)) + + _, err := b.s3Client.CopyObject(context.TODO(), &s3.CopyObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + CopySource: aws.String(copySource), + Key: b.key(dstBlob), + }) + if err != nil { + return fmt.Errorf("failed to copy object: %w", err) + } + + waiter := s3.NewObjectExistsWaiter(b.s3Client) + err = waiter.Wait(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: b.key(dstBlob), + }, 15*time.Minute) + + if err != nil { + return fmt.Errorf("failed waiting for object to exist after copy: %w", err) + } + + return nil +} + +type BlobProperties struct { + ETag string 
`json:"etag,omitempty"` + LastModified time.Time `json:"last_modified,omitempty"` + ContentLength int64 `json:"content_length,omitempty"` +} + +func (b *awsS3Client) Properties(dest string) error { + slog.Info("Fetching blob properties", "bucket", b.s3cliConfig.BucketName, "blob", dest) + + headObjectOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: b.key(dest), + }) + + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotFound" { + fmt.Println(`{}`) + return nil + } + return fmt.Errorf("failed to fetch blob properties: %w", err) + } + + properties := BlobProperties{} + if headObjectOutput.ETag != nil { + properties.ETag = strings.Trim(*headObjectOutput.ETag, `"`) + } + if headObjectOutput.LastModified != nil { + properties.LastModified = *headObjectOutput.LastModified + } + if headObjectOutput.ContentLength != nil { + properties.ContentLength = *headObjectOutput.ContentLength + } + + output, err := json.MarshalIndent(properties, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal blob properties: %w", err) + } + + fmt.Println(string(output)) + + return nil +} + +func (b *awsS3Client) List(prefix string) ([]string, error) { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + if prefix != "" { + slog.Info("Listing all objects in bucket with prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix) + input.Prefix = b.key(prefix) + } else { + slog.Info("Listing all objects in bucket", "bucket", b.s3cliConfig.BucketName) + } + + var names []string + objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input) + for objectPaginator.HasMorePages() { + page, err := objectPaginator.NextPage(context.TODO()) + if err != nil { + return nil, fmt.Errorf("failed to list objects: %w", err) + } + + if len(page.Contents) == 0 { + continue + } + + for _, obj := range page.Contents { + names = 
append(names, *obj.Key) + } + } + + return names, nil +} + +func (b *awsS3Client) DeleteRecursive(prefix string) error { + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(b.s3cliConfig.BucketName), + } + + if prefix != "" { + slog.Info("Deleting all objects in bucket with given prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix) + input.Prefix = b.key(prefix) + } else { + slog.Info("Deleting all objects in bucket", "bucket", b.s3cliConfig.BucketName) + } + + objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input) + for objectPaginator.HasMorePages() { + page, err := objectPaginator.NextPage(context.TODO()) + if err != nil { + return fmt.Errorf("failed to list objects for deletion: %w", err) + } + + for _, obj := range page.Contents { + slog.Debug("Deleting object", "key", *obj.Key) + _, err := b.s3Client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(b.s3cliConfig.BucketName), + Key: obj.Key, + }) + if err != nil { + var apiErr smithy.APIError + if errors.As(err, &apiErr) && (apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey") { + continue // Object already deleted, which is fine + } + return fmt.Errorf("failed to delete object '%s': %w", *obj.Key, err) + } + } + } + return nil +} diff --git a/s3/client/client.go b/s3/client/client.go index f5853cc..40383b0 100644 --- a/s3/client/client.go +++ b/s3/client/client.go @@ -1,7 +1,6 @@ package client import ( - "errors" "os" "time" @@ -65,25 +64,24 @@ func (c *S3CompatibleClient) Sign(objectID string, action string, expiration tim } func (c *S3CompatibleClient) EnsureStorageExists() error { - return errors.New("not implemented") - + return c.awsS3BlobstoreClient.EnsureStorageExists() } func (c *S3CompatibleClient) Copy(srcBlob string, dstBlob string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.Copy(srcBlob, dstBlob) } func (c *S3CompatibleClient) Properties(dest string) error { - return errors.New("not 
implemented") + return c.awsS3BlobstoreClient.Properties(dest) } func (c *S3CompatibleClient) List(prefix string) ([]string, error) { - return nil, errors.New("not implemented") + return c.awsS3BlobstoreClient.List(prefix) } func (c *S3CompatibleClient) DeleteRecursive(prefix string) error { - return errors.New("not implemented") + return c.awsS3BlobstoreClient.DeleteRecursive(prefix) } diff --git a/s3/integration/assertions.go b/s3/integration/assertions.go index fb05003..44d8c9d 100644 --- a/s3/integration/assertions.go +++ b/s3/integration/assertions.go @@ -2,6 +2,7 @@ package integration import ( "context" + "errors" "fmt" "io" "log" @@ -90,6 +91,39 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { Expect(err).ToNot(HaveOccurred()) Expect(string(gottenBytes)).To(Equal(expectedString)) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring(fmt.Sprintf("\"content_length\": %d", len(expectedString)))) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("\"etag\":")) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("\"last_modified\":")) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "copy", s3Filename, s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + tmpCopiedFile, err := os.CreateTemp("", "s3cli-download-copy") + Expect(err).ToNot(HaveOccurred()) + err = tmpCopiedFile.Close() + Expect(err).ToNot(HaveOccurred()) + defer os.Remove(tmpCopiedFile.Name()) //nolint:errcheck + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "get", s3Filename+"_copy", tmpCopiedFile.Name()) + 
Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + copiedBytes, err := os.ReadFile(tmpCopiedFile.Name()) + Expect(err).ToNot(HaveOccurred()) + Expect(string(copiedBytes)).To(Equal(expectedString)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename+"_copy") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete", s3Filename) Expect(err).ToNot(HaveOccurred()) Expect(s3CLISession.ExitCode()).To(BeZero()) @@ -97,6 +131,106 @@ func AssertLifecycleWorks(s3CLIPath string, cfg *config.S3Cli) { s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "exists", s3Filename) Expect(err).ToNot(HaveOccurred()) Expect(s3CLISession.ExitCode()).To(Equal(3)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "properties", s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + Expect(s3CLISession.Out.Contents()).To(ContainSubstring("{}")) + +} + +func AssertOnBulkOperations(s3CLIPath string, cfg *config.S3Cli) { + storageType := "s3" + numFiles := 5 + s3FilenamePrefix := GenerateRandomString() + localFile := MakeContentFile(GenerateRandomString()) + defer os.Remove(localFile) //nolint:errcheck + + configPath := MakeConfigFile(cfg) + defer os.Remove(configPath) //nolint:errcheck + + for i := 0; i < numFiles; i++ { + suffix := strings.Repeat("A", i) + s3Filename := fmt.Sprintf("%s%s", s3FilenamePrefix, suffix) + + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "put", localFile, s3Filename) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + } + + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output := strings.TrimSpace(string(s3CLISession.Out.Contents())) + 
Expect(strings.Split(output, "\n")).To(HaveLen(numFiles)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete-recursive", fmt.Sprintf("%sAAA", s3FilenamePrefix)) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output = strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(strings.Split(output, "\n")).To(HaveLen(3)) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "delete-recursive") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, storageType, "list", s3FilenamePrefix) + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + output = strings.TrimSpace(string(s3CLISession.Out.Contents())) + Expect(output).To(BeEmpty()) +} + +func AssertOnStorageExists(s3CLIPath string, cfg *config.S3Cli) { + cfgCopy := *cfg + cfgCopy.BucketName = fmt.Sprintf("%s-%s", cfg.BucketName, strings.ToLower(GenerateRandomString(4))) + + configPath := MakeConfigFile(&cfgCopy) + defer os.Remove(configPath) //nolint:errcheck + + // Create a single verification/cleanup client from the config file. + // This ensures it has the exact same settings as the CLI will use. + verificationCfgFile, err := os.Open(configPath) + Expect(err).ToNot(HaveOccurred()) + defer verificationCfgFile.Close() //nolint:errcheck + + verificationCfg, err := config.NewFromReader(verificationCfgFile) + Expect(err).ToNot(HaveOccurred()) + + verificationClient, err := client.NewAwsS3Client(&verificationCfg) + Expect(err).ToNot(HaveOccurred()) + + // Defer the cleanup to ensure the bucket is deleted after the test. 
+ defer func() { + _, err := verificationClient.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{ + Bucket: aws.String(cfgCopy.BucketName), + }) + // A NoSuchBucket error is acceptable if the bucket was never created or already cleaned up. + var noSuchBucketErr *types.NoSuchBucket + if err != nil && !errors.As(err, &noSuchBucketErr) { + Expect(err).ToNot(HaveOccurred(), "Failed to clean up test bucket") + } + }() + + // --- Scenario 1: Bucket does not exist, should be created --- + s3CLISession, err := RunS3CLI(s3CLIPath, configPath, "s3", "ensure-storage-exists") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) + + // Verify the bucket now exists using the client created earlier. + _, headBucketErr := verificationClient.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(cfgCopy.BucketName), + }) + Expect(headBucketErr).ToNot(HaveOccurred(), "Bucket should have been created by 'ensure-storage-exists'") + + // --- Scenario 2: Bucket already exists, command should still succeed (idempotency) --- + s3CLISession, err = RunS3CLI(s3CLIPath, configPath, "s3", "ensure-storage-exists") + Expect(err).ToNot(HaveOccurred()) + Expect(s3CLISession.ExitCode()).To(BeZero()) } func AssertOnPutFailures(cfg *config.S3Cli, content, errorMessage string) { diff --git a/s3/integration/aws_iam_role_test.go b/s3/integration/aws_iam_role_test.go index e11251c..9e711ce 100644 --- a/s3/integration/aws_iam_role_test.go +++ b/s3/integration/aws_iam_role_test.go @@ -46,6 +46,14 @@ var _ = Describe("Testing inside an AWS compute resource with an IAM role", func func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, 
+ configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, diff --git a/s3/integration/aws_us_east_test.go b/s3/integration/aws_us_east_test.go index db81450..dc27777 100644 --- a/s3/integration/aws_us_east_test.go +++ b/s3/integration/aws_us_east_test.go @@ -46,6 +46,14 @@ var _ = Describe("Testing only in us-east-1", func() { func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, diff --git a/s3/integration/general_aws_test.go b/s3/integration/general_aws_test.go index f114375..954f5f2 100644 --- a/s3/integration/general_aws_test.go +++ b/s3/integration/general_aws_test.go @@ -58,6 +58,15 @@ var _ = Describe("General testing for all AWS regions", func() { func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations, diff --git a/s3/integration/s3_compatible_test.go b/s3/integration/s3_compatible_test.go index 
965b41c..d63c64a 100644 --- a/s3/integration/s3_compatible_test.go +++ b/s3/integration/s3_compatible_test.go @@ -65,6 +65,14 @@ var _ = Describe("Testing in any non-AWS, S3 compatible storage service", func() func(cfg *config.S3Cli) { integration.AssertLifecycleWorks(s3CLIPath, cfg) }, configurations, ) + DescribeTable("Invoking `ensure-storage-exists` works", + func(cfg *config.S3Cli) { integration.AssertOnStorageExists(s3CLIPath, cfg) }, + configurations, + ) + DescribeTable("Blobstore bulk operations work", + func(cfg *config.S3Cli) { integration.AssertOnBulkOperations(s3CLIPath, cfg) }, + configurations, + ) DescribeTable("Invoking `s3cli get` on a non-existent-key fails", func(cfg *config.S3Cli) { integration.AssertGetNonexistentFails(s3CLIPath, cfg) }, configurations,