Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .github/scripts/s3/assets/cloudformation-s3cli-iam.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@
"Effect": "Allow",
"Resource": "arn:aws:logs:*:*:*"
},
{
"Action": ["s3:CreateBucket", "s3:HeadBucket", "s3:DeleteBucket"],
"Effect": "Allow",
"Resource": {
"Fn::Join": [
"",
[
"arn:aws:s3:::",
{ "Ref": "S3Bucket" },
"*"
]
]
}
},
{
"Action": [
"s3:GetObject*",
Expand Down Expand Up @@ -67,6 +81,26 @@
"/*"
]
]
},
{
"Fn::Join": [
"",
[
"arn:aws:s3:::",
{ "Ref": "S3Bucket" },
"*"
]
]
},
{
"Fn::Join": [
"",
[
"arn:aws:s3:::",
{ "Ref": "S3Bucket" },
"*/*"
]
]
}
]
}
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ Key points
- additional endpoints needed by CAPI still missing
- [Gcs](./gcs/README.md)
- [S3](./s3/README.md)
- additional endpoints needed by CAPI still missing


## Build
Expand Down
184 changes: 184 additions & 0 deletions s3/client/aws_s3_blobstore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -228,3 +229,186 @@ func (b *awsS3Client) putSigned(objectID string, expiration time.Duration) (stri

return req.URL, nil
}

func (b *awsS3Client) EnsureStorageExists() error {
slog.Info("Ensuring bucket exists", "bucket", b.s3cliConfig.BucketName)
_, err := b.s3Client.HeadBucket(context.TODO(), &s3.HeadBucketInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
})

if err == nil {
slog.Info("Bucket exists", "bucket", b.s3cliConfig.BucketName)
return nil
}

var apiErr smithy.APIError
if !errors.As(err, &apiErr) || apiErr.ErrorCode() != "NotFound" {
return fmt.Errorf("failed to check if bucket exists: %w", err)
}

slog.Info("Bucket does not exist, creating it", "bucket", b.s3cliConfig.BucketName)
createBucketInput := &s3.CreateBucketInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
}

// For GCS and AWS region 'us-east-1', LocationConstraint must be omitted
if !b.s3cliConfig.IsGoogle() && b.s3cliConfig.Region != "us-east-1" {
createBucketInput.CreateBucketConfiguration = &types.CreateBucketConfiguration{
LocationConstraint: types.BucketLocationConstraint(b.s3cliConfig.Region),
}
}

_, err = b.s3Client.CreateBucket(context.TODO(), createBucketInput)
if err != nil {
var alreadyOwned *types.BucketAlreadyOwnedByYou
var alreadyExists *types.BucketAlreadyExists
if errors.As(err, &alreadyOwned) || errors.As(err, &alreadyExists) {
slog.Warn("Bucket got created by another process", "bucket", b.s3cliConfig.BucketName)
return nil
}
return fmt.Errorf("failed to create bucket: %w", err)
}

slog.Info("Bucket created successfully", "bucket", b.s3cliConfig.BucketName)
return nil
}

func (b *awsS3Client) Copy(srcBlob string, dstBlob string) error {
slog.Info("Copying object within s3 bucket", "bucket", b.s3cliConfig.BucketName, "source_blob", srcBlob, "destination_blob", dstBlob)

copySource := fmt.Sprintf("%s/%s", b.s3cliConfig.BucketName, *b.key(srcBlob))

_, err := b.s3Client.CopyObject(context.TODO(), &s3.CopyObjectInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
CopySource: aws.String(copySource),
Key: b.key(dstBlob),
})
if err != nil {
return fmt.Errorf("failed to copy object: %w", err)
}

waiter := s3.NewObjectExistsWaiter(b.s3Client)
err = waiter.Wait(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
Key: b.key(dstBlob),
}, 15*time.Minute)

if err != nil {
return fmt.Errorf("failed waiting for object to exist after copy: %w", err)
}

return nil
}

type BlobProperties struct {
ETag string `json:"etag,omitempty"`
LastModified time.Time `json:"last_modified,omitempty"`
ContentLength int64 `json:"content_length,omitempty"`
}

func (b *awsS3Client) Properties(dest string) error {
slog.Info("Fetching blob properties", "bucket", b.s3cliConfig.BucketName, "blob", dest)

headObjectOutput, err := b.s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
Key: b.key(dest),
})

if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) && apiErr.ErrorCode() == "NotFound" {
fmt.Println(`{}`)
return nil
}
return fmt.Errorf("failed to fetch blob properties: %w", err)
}

properties := BlobProperties{}
if headObjectOutput.ETag != nil {
properties.ETag = strings.Trim(*headObjectOutput.ETag, `"`)
}
if headObjectOutput.LastModified != nil {
properties.LastModified = *headObjectOutput.LastModified
}
if headObjectOutput.ContentLength != nil {
properties.ContentLength = *headObjectOutput.ContentLength
}

output, err := json.MarshalIndent(properties, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal blob properties: %w", err)
}

fmt.Println(string(output))

return nil
}

func (b *awsS3Client) List(prefix string) ([]string, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(b.s3cliConfig.BucketName),
}

if prefix != "" {
slog.Info("Listing all objects in bucket with prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix)
input.Prefix = b.key(prefix)
} else {
slog.Info("Listing all objects in bucket", "bucket", b.s3cliConfig.BucketName)
}

var names []string
objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input)
for objectPaginator.HasMorePages() {
page, err := objectPaginator.NextPage(context.TODO())
if err != nil {
return nil, fmt.Errorf("failed to list objects: %w", err)
}

if len(page.Contents) == 0 {
continue
}

for _, obj := range page.Contents {
names = append(names, *obj.Key)
}
}

return names, nil
}

func (b *awsS3Client) DeleteRecursive(prefix string) error {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(b.s3cliConfig.BucketName),
}

if prefix != "" {
slog.Info("Deleting all objects in bucket with given prefix", "bucket", b.s3cliConfig.BucketName, "prefix", prefix)
input.Prefix = b.key(prefix)
} else {
slog.Info("Deleting all objects in bucket", "bucket", b.s3cliConfig.BucketName)
}

objectPaginator := s3.NewListObjectsV2Paginator(b.s3Client, input)
for objectPaginator.HasMorePages() {
page, err := objectPaginator.NextPage(context.TODO())
if err != nil {
return fmt.Errorf("failed to list objects for deletion: %w", err)
}

for _, obj := range page.Contents {
slog.Debug("Deleting object", "key", *obj.Key)
_, err := b.s3Client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
Key: obj.Key,
})
if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) && (apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey") {
continue // Object already deleted, which is fine
}
return fmt.Errorf("failed to delete object '%s': %w", *obj.Key, err)
}
}
}
return nil
}
12 changes: 5 additions & 7 deletions s3/client/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package client

import (
"errors"
"os"
"time"

Expand Down Expand Up @@ -65,25 +64,24 @@ func (c *S3CompatibleClient) Sign(objectID string, action string, expiration tim
}

func (c *S3CompatibleClient) EnsureStorageExists() error {
return errors.New("not implemented")

return c.awsS3BlobstoreClient.EnsureStorageExists()
}

func (c *S3CompatibleClient) Copy(srcBlob string, dstBlob string) error {
return errors.New("not implemented")
return c.awsS3BlobstoreClient.Copy(srcBlob, dstBlob)

}

func (c *S3CompatibleClient) Properties(dest string) error {
return errors.New("not implemented")
return c.awsS3BlobstoreClient.Properties(dest)

}

func (c *S3CompatibleClient) List(prefix string) ([]string, error) {
return nil, errors.New("not implemented")
return c.awsS3BlobstoreClient.List(prefix)

}

func (c *S3CompatibleClient) DeleteRecursive(prefix string) error {
return errors.New("not implemented")
return c.awsS3BlobstoreClient.DeleteRecursive(prefix)
}
Loading
Loading