192 changes: 192 additions & 0 deletions workflow/artifacts/gcs/client.go
@@ -0,0 +1,192 @@
package gcs

import (
"context"
"io"
"os"
"path"
"path/filepath"
"strings"

"cloud.google.com/go/storage"
"github.com/argoproj/pkg/s3"
"github.com/cyrusbiotechnology/argo/util"
log "github.com/sirupsen/logrus"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

// This is modeled heavily on https://github.com/argoproj/pkg/blob/master/s3/s3.go and
// should probably be moved into argoproj/pkg at some point.

// GCSClient mirrors the s3.S3Client interface under a GCS-specific name so the type names read naturally here
type GCSClient s3.S3Client

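// GCSClientOpts holds the options used to construct a client; CredsJSONData is the raw credentials JSON passed to the GCS API client.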
type GCSClientOpts struct {
CredsJSONData []byte
}

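// gcsClient implements the GCSClient interface on top of cloud.google.com/go/storage.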
type gcsClient struct {
GCSClientOpts
context context.Context
client *storage.Client
}


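// uploadTask pairs a destination object key with the local file path to upload.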
type uploadTask struct {
key string
path string
}


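// NewGCSClient returns a GCSClient authenticated with the provided credentials JSON.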
func NewGCSClient(opts GCSClientOpts) (client GCSClient, err error) {
gcs := gcsClient{
GCSClientOpts: opts,
}

gcs.context = context.Background()

gcs.client, err = storage.NewClient(gcs.context, option.WithCredentialsJSON(gcs.CredsJSONData))
if err != nil {
return
}

client = &gcs
return
}

// generatePutTasks is adapted from github.com/argoproj/pkg/s3; it walks rootPath and emits one upload task per regular file.
func generatePutTasks(keyPrefix, rootPath string) chan uploadTask {
rootPath = filepath.Clean(rootPath) + "/"
uploadTasks := make(chan uploadTask)
visit := func(localPath string, fi os.FileInfo, err error) error {
// Propagate walk errors; fi may be nil when err is non-nil.
if err != nil {
return err
}
relPath := strings.TrimPrefix(localPath, rootPath)
if fi.IsDir() {
return nil
}
if fi.Mode()&os.ModeSymlink != 0 {
return nil
}
t := uploadTask{
key: path.Join(keyPrefix, relPath),
path: localPath,
}
uploadTasks <- t
return nil
}
go func() {
if err := filepath.Walk(rootPath, visit); err != nil {
log.Warnf("Error walking %s for upload: %v", rootPath, err)
}
close(uploadTasks)
}()
return uploadTasks
}

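// PutFile uploads a single local file to the given bucket and object key.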
func (g *gcsClient) PutFile(bucket, key, path string) error {
inputFile, err := os.Open(path)
if err != nil {
return err
}

defer util.Close(inputFile)

bucketHandle := g.client.Bucket(bucket)
object := bucketHandle.Object(key)

w := object.NewWriter(g.context)
_, err = io.Copy(w, inputFile)
if err != nil {
return err
}

err = w.Close()
if err != nil {
return err
}
return nil
}

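// PutDirectory walks a local directory and uploads each regular file under the given key prefix.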
func (g *gcsClient) PutDirectory(bucket, key, path string) error {
for putTask := range generatePutTasks(key, path) {
err := g.PutFile(bucket, putTask.key, putTask.path)
if err != nil {
return err
}
}
return nil
}

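// GetFile downloads a single object to a local path, creating any missing parent directories.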
func (g *gcsClient) GetFile(bucket, key, path string) error {
log.Infof("Getting from GCS (bucket: %s, key: %s) to %s", bucket, key, path)

// Extract top level directory.
objectDir, _ := filepath.Split(path)
if objectDir != "" {
// Create any missing top level directories.
if err := os.MkdirAll(objectDir, 0700); err != nil {
return err
}
}

outputFile, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
if err != nil {
return err
}

bucketHandle := g.client.Bucket(bucket)
object := bucketHandle.Object(key)

r, err := object.NewReader(g.context)
if err != nil {
return err
}
defer util.Close(r)

_, err = io.Copy(outputFile, r)
if err != nil {
return err
}

err = outputFile.Close()
if err != nil {
return err
}
return nil
}

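// GetDirectory downloads every object under keyPrefix into path, preserving object names as relative paths.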
func (g *gcsClient) GetDirectory(bucket, keyPrefix, path string) error {
log.Infof("Getting directory from gcs (bucket: %s, key: %s) to %s", bucket, keyPrefix, path)
bucketHandle := g.client.Bucket(bucket)
it := bucketHandle.Objects(g.context, &storage.Query{Prefix: keyPrefix})
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
localPath := filepath.Join(path, objAttrs.Name)
err = g.GetFile(bucket, objAttrs.Name, localPath)
if err != nil {
return err
}
}
return nil


}

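// IsDirectory reports whether key refers to a directory-like prefix rather than a single object.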
func (g *gcsClient) IsDirectory(bucket, key string) (bool, error) {
bucketHandle := g.client.Bucket(bucket)
// If the first object under the prefix has a name equal to the key, the key is a file, not a directory.
it := bucketHandle.Objects(g.context, &storage.Query{Prefix: key})
objectAttrs, err := it.Next()
if err == iterator.Done {
// Nothing exists under this prefix, so it cannot be a directory.
return false, nil
}
if err != nil {
return false, err
}
if objectAttrs.Name == key {
return false, nil
}
return true, nil
}
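For context, a minimal sketch of how this client might be exercised from calling code. The import path follows the file's location in this repo; the credentials path, bucket name, and object key are hypothetical.

package main

import (
	"io/ioutil"
	"log"

	"github.com/cyrusbiotechnology/argo/workflow/artifacts/gcs"
)

func main() {
	// Hypothetical path to a credentials JSON file.
	credsData, err := ioutil.ReadFile("/etc/creds/gcs.json")
	if err != nil {
		log.Fatal(err)
	}

	client, err := gcs.NewGCSClient(gcs.GCSClientOpts{CredsJSONData: credsData})
	if err != nil {
		log.Fatal(err)
	}

	// Download a single object to a local file (bucket and key are illustrative).
	if err := client.GetFile("my-artifact-bucket", "artifacts/output.tgz", "/tmp/output.tgz"); err != nil {
		log.Fatal(err)
	}
}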
171 changes: 69 additions & 102 deletions workflow/artifacts/gcs/gcs.go
@@ -1,130 +1,97 @@
package gcs

import (
"cloud.google.com/go/storage"
"context"
"errors"
"github.com/argoproj/pkg/file"
argoErrors "github.com/cyrusbiotechnology/argo/errors"
wfv1 "github.com/cyrusbiotechnology/argo/pkg/apis/workflow/v1alpha1"
"github.com/cyrusbiotechnology/argo/util"
log "github.com/sirupsen/logrus"
"google.golang.org/api/option"
"io"
"os"
"k8s.io/apimachinery/pkg/util/wait"
"time"
)

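// GCSArtifactDriver loads and saves Argo workflow artifacts from and to Google Cloud Storage.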
type GCSArtifactDriver struct {
Context context.Context
CredsJSONData []byte
}

func (gcsDriver *GCSArtifactDriver) newGcsClient() (client GCSClient, err error) {
client, err = NewGCSClient(GCSClientOpts{CredsJSONData: gcsDriver.CredsJSONData})
if err != nil {
return nil, argoErrors.InternalWrapError(err)
}
return
}

func (gcsDriver *GCSArtifactDriver) saveToFile(inputArtifact *wfv1.Artifact, filePath string) error {

log.Infof("Loading from GCS (gs://%s/%s) to %s",
inputArtifact.GCS.Bucket, inputArtifact.GCS.Key, filePath)

stat, err := os.Stat(filePath)
if err != nil && !os.IsNotExist(err) {
return err
}

if stat != nil && stat.IsDir() {
return errors.New("output artifact path is a directory")
}

outputFile, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
if err != nil {
return err
}

gcsClient, err := gcsDriver.newGcsClient()
if err != nil {
return err
}

bucket := gcsClient.Bucket(inputArtifact.GCS.Bucket)
object := bucket.Object(inputArtifact.GCS.Key)

r, err := object.NewReader(gcsDriver.Context)
if err != nil {
return err
}
defer util.Close(r)

_, err = io.Copy(outputFile, r)
if err != nil {
return err
}

err = outputFile.Close()
if err != nil {
return err
}
return nil
}

func (gcsDriver *GCSArtifactDriver) saveToGCS(outputArtifact *wfv1.Artifact, filePath string) error {

log.Infof("Saving to GCS (gs://%s/%s)",
outputArtifact.GCS.Bucket, outputArtifact.GCS.Key)

gcsClient, err := gcsDriver.newGcsClient()
if err != nil {
return err
}

inputFile, err := os.Open(filePath)
if err != nil {
return err
}

stat, err := os.Stat(filePath)
if err != nil {
return err
}

if stat.IsDir() {
return errors.New("only single files can be saved to GCS, not entire directories")
}

defer util.Close(inputFile)

bucket := gcsClient.Bucket(outputArtifact.GCS.Bucket)
object := bucket.Object(outputArtifact.GCS.Key)

w := object.NewWriter(gcsDriver.Context)
_, err = io.Copy(w, inputFile)
if err != nil {
return err
}

err = w.Close()
if err != nil {
return err
}
return nil

}

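// Load downloads the artifact's GCS object (or object prefix) to path, retrying with exponential backoff.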
func (gcsDriver *GCSArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {

bucketName := inputArtifact.GCS.Bucket
key := inputArtifact.GCS.Key

err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
func() (bool, error) {
log.Infof("GCS Load path: %s, key: %s", path, key)
gcsClient, err := gcsDriver.newGcsClient()
if err != nil {
log.Warnf("Failed to create new GCS client: %v", err)
return false, nil
}

isDir, err := gcsClient.IsDirectory(bucketName, key)
if err != nil {
log.Warnf("Failed to test if %s is a directory: %v", bucketName, err)
return false, nil
}

if isDir {
if err = gcsClient.GetDirectory(bucketName, key, path); err != nil {
log.Warnf("Failed to get directory: %v", err)
return false, nil
}
} else {
if err := gcsClient.GetFile(bucketName, key, path); err != nil {
log.Warnf("Failed to get file: %v", err)
return false, nil
}
}

return true, nil
})

return err
}

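// Save uploads path, which may be a single file or a directory, to the artifact's GCS bucket and key, retrying with exponential backoff.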
func (gcsDriver *GCSArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {

bucketName := outputArtifact.GCS.Bucket
key := outputArtifact.GCS.Key

err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
func() (bool, error) {
log.Infof("S3 Save path: %s, key: %s", path, key)
gcsClient, err := gcsDriver.newGcsClient()
if err != nil {
log.Warnf("Failed to create new S3 client: %v", err)
return false, nil
}
isDir, err := file.IsDirectory(path)
if err != nil {
log.Warnf("Failed to test if %s is a directory: %v", path, err)
return false, nil
}
if isDir {
if err = gcsClient.PutDirectory(bucketName, key, path); err != nil {
log.Warnf("Failed to put directory: %v", err)
return false, nil
}
} else {
if err = gcsClient.PutFile(bucketName, key, path); err != nil {
log.Warnf("Failed to put file: %v", err)
return false, nil
}
}
return true, nil
})
return err
}