From b5bef63730f9b8f01092a2588c4505196b0c80d0 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Fri, 9 Aug 2019 13:05:06 -0400 Subject: [PATCH 01/10] rewriting gcs client to handle directories --- workflow/artifacts/gcs/client.go | 181 +++++++++++++++++++++++++++++++ workflow/artifacts/gcs/gcs.go | 103 ++++++++++++++---- 2 files changed, 260 insertions(+), 24 deletions(-) create mode 100644 workflow/artifacts/gcs/client.go diff --git a/workflow/artifacts/gcs/client.go b/workflow/artifacts/gcs/client.go new file mode 100644 index 000000000000..1fc5474aece8 --- /dev/null +++ b/workflow/artifacts/gcs/client.go @@ -0,0 +1,181 @@ +package gcs + +import ( + "cloud.google.com/go/storage" + "context" + "github.com/argoproj/pkg/s3" + "github.com/cyrusbiotechnology/argo/util" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + "io" + "os" + "path" + "path/filepath" + "strings" + log "github.com/sirupsen/logrus" +) + +// This is modeled heavily on https://github.com/argoproj/pkg/blob/master/s3/s3.go and +// Should probably be moved into argoproj/pkg at some point + +// S3Client is a totally reasonable interface, using an alias so the typenames make sense +type GCSClient s3.S3Client + +type GCSClientOpts struct { + CredsJSONData []byte +} + +type gcsClient struct { + GCSClientOpts + context context.Context + client *storage.Client +} + + +type uploadTask struct { + key string + path string +} + + +func NewGCSClient(opts GCSClientOpts) (client GCSClient, err error) { + gcs := gcsClient{ + GCSClientOpts: opts, + } + + gcs.context = context.Background() + + gcs.client, err = storage.NewClient(gcs.context, option.WithCredentialsJSON(gcs.CredsJSONData)) + if err != nil { + return + } + return +} + + +//plagiarized from github.com/argoproj/pkg/s3 +func generatePutTasks(keyPrefix, rootPath string) chan uploadTask { + rootPath = filepath.Clean(rootPath) + "/" + uploadTasks := make(chan uploadTask) + visit := func(localPath string, fi os.FileInfo, err error) error { + relPath := strings.TrimPrefix(localPath, rootPath) + if fi.IsDir() { + return nil + } + if fi.Mode()&os.ModeSymlink != 0 { + return nil + } + t := uploadTask{ + key: path.Join(keyPrefix, relPath), + path: localPath, + } + uploadTasks <- t + return nil + } + go func() { + _ = filepath.Walk(rootPath, visit) + close(uploadTasks) + }() + return uploadTasks +} + +func (g *gcsClient) PutFile(bucket, key, path string) error { + inputFile, err := os.Open(path) + if err != nil { + return err + } + + defer util.Close(inputFile) + + bucketHandle := g.client.Bucket(bucket) + object := bucketHandle.Object(key) + + w := object.NewWriter(g.Context) + _, err = io.Copy(w, inputFile) + if err != nil { + return err + } + + err = w.Close() + if err != nil { + return err + } + return nil +} + +func (g *gcsClient) PutDirectory(bucket, key, path string) error { + for putTask := range generatePutTasks(key, path) { + err := g.PutFile(bucket, putTask.key, putTask.path) + if err != nil { + return err + } + } + return nil +} + +func (g *gcsClient) GetFile(bucket, key, path string) error { + log.Infof("Getting from GCS (bucket: %s, key: %s) to %s", bucket, key, path) + outputFile, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + return err + } + + bucketHandle := g.client.Bucket(bucket) + object := bucketHandle.Object(key) + + r, err := object.NewReader(g.Context) + if err != nil { + return err + } + defer util.Close(r) + + _, err = io.Copy(outputFile, r) + if err != nil { + return err + } + + err = outputFile.Close() + 
if err != nil { + return err + } + return nil +} + +func (g *gcsClient) GetDirectory(bucket, keyPrefix, path string) error { + log.Infof("Getting directory from gcs (bucket: %s, key: %s) to %s", bucket, keyPrefix, path) + bucketHandle := g.client.Bucket(bucket) + it := bucketHandle.Objects(g.Context, &storage.Query{Prefix: keyPrefix}) + for { + objAttrs, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + localPath := filepath.Join(path, objAttrs.Name) + err = g.GetFile(bucket, objAttrs.Name, localPath) + if err != nil { + return err + } + } + return nil + + +} + +func (g *gcsClient) IsDirectory(bucket, key string) (bool, error) { + bucketHandle := g.client.Bucket(bucket) + // If the item in the query result has a name that matches the key, this is a file not a directory + it := bucketHandle.Objects(g.Context, &storage.Query{Prefix: key}) + objectAttrs, err := it.Next() + if err != nil { + return false, err + } + if objectAttrs.Name == key { + return false, nil + } + return true, nil + +} + diff --git a/workflow/artifacts/gcs/gcs.go b/workflow/artifacts/gcs/gcs.go index d0404e70ddef..61f09bbb9d6c 100644 --- a/workflow/artifacts/gcs/gcs.go +++ b/workflow/artifacts/gcs/gcs.go @@ -4,24 +4,29 @@ import ( "cloud.google.com/go/storage" "context" "errors" + "github.com/argoproj/pkg/file" argoErrors "github.com/cyrusbiotechnology/argo/errors" wfv1 "github.com/cyrusbiotechnology/argo/pkg/apis/workflow/v1alpha1" "github.com/cyrusbiotechnology/argo/util" log "github.com/sirupsen/logrus" "google.golang.org/api/option" "io" + "k8s.io/apimachinery/pkg/util/wait" "os" + "path" + "path/filepath" + "strings" + "time" ) type GCSArtifactDriver struct { - Context context.Context CredsJSONData []byte } -func (gcsDriver *GCSArtifactDriver) newGcsClient() (client *storage.Client, err error) { - gcsDriver.Context = context.Background() +func (gcsDriver *GCSArtifactDriver) newGcsClient() (client GCSClient, err error) { + + client, err = NewGCSClient(GCSClientOpts{CredsJSONData: gcsDriver.CredsJSONData}) - client, err = storage.NewClient(gcsDriver.Context, option.WithCredentialsJSON(gcsDriver.CredsJSONData)) if err != nil { return nil, argoErrors.InternalWrapError(err) } @@ -34,20 +39,6 @@ func (gcsDriver *GCSArtifactDriver) saveToFile(inputArtifact *wfv1.Artifact, fil log.Infof("Loading from GCS (gs://%s/%s) to %s", inputArtifact.GCS.Bucket, inputArtifact.GCS.Key, filePath) - stat, err := os.Stat(filePath) - if err != nil && !os.IsNotExist(err) { - return err - } - - if stat != nil && stat.IsDir() { - return errors.New("output artifact path is a directory") - } - - outputFile, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) - if err != nil { - return err - } - gcsClient, err := gcsDriver.newGcsClient() if err != nil { return err @@ -74,6 +65,7 @@ func (gcsDriver *GCSArtifactDriver) saveToFile(inputArtifact *wfv1.Artifact, fil return nil } + func (gcsDriver *GCSArtifactDriver) saveToGCS(outputArtifact *wfv1.Artifact, filePath string) error { log.Infof("Saving to GCS (gs://%s/%s)", @@ -94,9 +86,12 @@ func (gcsDriver *GCSArtifactDriver) saveToGCS(outputArtifact *wfv1.Artifact, fil return err } - if stat.IsDir() { - return errors.New("only single files can be saved to GCS, not entire directories") - } + //if stat.IsDir() { + // for putTask := range generatePutTasks(outputArtifact.GCS.Bucket, outputArtifact.GCS.Key, filePath) { + // err := + // } + // return errors.New("only single files can be saved to GCS, not entire directories") + //} defer 
util.Close(inputFile) @@ -119,12 +114,72 @@ func (gcsDriver *GCSArtifactDriver) saveToGCS(outputArtifact *wfv1.Artifact, fil func (gcsDriver *GCSArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error { - err := gcsDriver.saveToFile(inputArtifact, path) + bucketName := inputArtifact.GCS.Bucket + key := inputArtifact.GCS.Key + + err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}, + func() (bool, error) { + log.Infof("GCS Load path: %s, key: %s", path, key) + gcsClient, err := gcsDriver.newGcsClient() + if err != nil { + log.Warnf("Failed to create new GCS client: %v", err) + return false, nil + } + + isDir, err := gcsClient.IsDirectory(bucketName, key) + if err != nil { + log.Warnf("Failed to test if %s is a directory: %v", bucketName, err) + return false, nil + } + + if isDir { + if err = gcsClient.GetDirectory(bucketName, key, path) + err != nil { + log.Warnf("Failed get directory: %v", err) + return false, nil + } + } else { + err := gcsClient.GetFile(bucketName, key, path) + if err != nil { + return false, nil + } + } + + return true, nil + }) + return err } func (gcsDriver *GCSArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error { - - err := gcsDriver.saveToGCS(outputArtifact, path) + bucketName := outputArtifact.GCS.Bucket + key := outputArtifact.GCS.Key + + err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}, + func() (bool, error) { + log.Infof("S3 Save path: %s, key: %s", path, key) + gcsClient, err := gcsDriver.newGcsClient() + if err != nil { + log.Warnf("Failed to create new S3 client: %v", err) + return false, nil + } + isDir, err := file.IsDirectory(path) + if err != nil { + log.Warnf("Failed to test if %s is a directory: %v", path, err) + return false, nil + } + if isDir { + if err = gcsClient.PutDirectory(bucketName, key, path); err != nil { + log.Warnf("Failed to put directory: %v", err) + return false, nil + } + } else { + if err = gcsClient.PutFile(bucketName, key, path); err != nil { + log.Warnf("Failed to put file: %v", err) + return false, nil + } + } + return true, nil + }) return err } From 3179339ccc3aef7e3c20f82c5b23ba7dad769383 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Fri, 9 Aug 2019 13:14:30 -0400 Subject: [PATCH 02/10] fixing build issues --- workflow/artifacts/gcs/client.go | 8 ++++---- workflow/artifacts/gcs/gcs.go | 7 ------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/workflow/artifacts/gcs/client.go b/workflow/artifacts/gcs/client.go index 1fc5474aece8..057f78056bd4 100644 --- a/workflow/artifacts/gcs/client.go +++ b/workflow/artifacts/gcs/client.go @@ -90,7 +90,7 @@ func (g *gcsClient) PutFile(bucket, key, path string) error { bucketHandle := g.client.Bucket(bucket) object := bucketHandle.Object(key) - w := object.NewWriter(g.Context) + w := object.NewWriter(g.context) _, err = io.Copy(w, inputFile) if err != nil { return err @@ -123,7 +123,7 @@ func (g *gcsClient) GetFile(bucket, key, path string) error { bucketHandle := g.client.Bucket(bucket) object := bucketHandle.Object(key) - r, err := object.NewReader(g.Context) + r, err := object.NewReader(g.context) if err != nil { return err } @@ -144,7 +144,7 @@ func (g *gcsClient) GetFile(bucket, key, path string) error { func (g *gcsClient) GetDirectory(bucket, keyPrefix, path string) error { log.Infof("Getting directory from gcs (bucket: %s, key: %s) to %s", bucket, keyPrefix, path) bucketHandle := g.client.Bucket(bucket) - it := 
bucketHandle.Objects(g.Context, &storage.Query{Prefix: keyPrefix}) + it := bucketHandle.Objects(g.context, &storage.Query{Prefix: keyPrefix}) for { objAttrs, err := it.Next() if err == iterator.Done { @@ -167,7 +167,7 @@ func (g *gcsClient) GetDirectory(bucket, keyPrefix, path string) error { func (g *gcsClient) IsDirectory(bucket, key string) (bool, error) { bucketHandle := g.client.Bucket(bucket) // If the item in the query result has a name that matches the key, this is a file not a directory - it := bucketHandle.Objects(g.Context, &storage.Query{Prefix: key}) + it := bucketHandle.Objects(g.context, &storage.Query{Prefix: key}) objectAttrs, err := it.Next() if err != nil { return false, err diff --git a/workflow/artifacts/gcs/gcs.go b/workflow/artifacts/gcs/gcs.go index 61f09bbb9d6c..ae15734f8fe0 100644 --- a/workflow/artifacts/gcs/gcs.go +++ b/workflow/artifacts/gcs/gcs.go @@ -1,21 +1,14 @@ package gcs import ( - "cloud.google.com/go/storage" - "context" - "errors" "github.com/argoproj/pkg/file" argoErrors "github.com/cyrusbiotechnology/argo/errors" wfv1 "github.com/cyrusbiotechnology/argo/pkg/apis/workflow/v1alpha1" "github.com/cyrusbiotechnology/argo/util" log "github.com/sirupsen/logrus" - "google.golang.org/api/option" "io" "k8s.io/apimachinery/pkg/util/wait" "os" - "path" - "path/filepath" - "strings" "time" ) From a27a6077bafc42baba07967917dfd2cc3d1bcef4 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Fri, 9 Aug 2019 13:21:35 -0400 Subject: [PATCH 03/10] removing unused func --- workflow/artifacts/gcs/gcs.go | 81 ----------------------------------- 1 file changed, 81 deletions(-) diff --git a/workflow/artifacts/gcs/gcs.go b/workflow/artifacts/gcs/gcs.go index ae15734f8fe0..8c3628eb12ec 100644 --- a/workflow/artifacts/gcs/gcs.go +++ b/workflow/artifacts/gcs/gcs.go @@ -4,11 +4,8 @@ import ( "github.com/argoproj/pkg/file" argoErrors "github.com/cyrusbiotechnology/argo/errors" wfv1 "github.com/cyrusbiotechnology/argo/pkg/apis/workflow/v1alpha1" - "github.com/cyrusbiotechnology/argo/util" log "github.com/sirupsen/logrus" - "io" "k8s.io/apimachinery/pkg/util/wait" - "os" "time" ) @@ -27,84 +24,6 @@ func (gcsDriver *GCSArtifactDriver) newGcsClient() (client GCSClient, err error) } -func (gcsDriver *GCSArtifactDriver) saveToFile(inputArtifact *wfv1.Artifact, filePath string) error { - - log.Infof("Loading from GCS (gs://%s/%s) to %s", - inputArtifact.GCS.Bucket, inputArtifact.GCS.Key, filePath) - - gcsClient, err := gcsDriver.newGcsClient() - if err != nil { - return err - } - - bucket := gcsClient.Bucket(inputArtifact.GCS.Bucket) - object := bucket.Object(inputArtifact.GCS.Key) - - r, err := object.NewReader(gcsDriver.Context) - if err != nil { - return err - } - defer util.Close(r) - - _, err = io.Copy(outputFile, r) - if err != nil { - return err - } - - err = outputFile.Close() - if err != nil { - return err - } - return nil -} - - -func (gcsDriver *GCSArtifactDriver) saveToGCS(outputArtifact *wfv1.Artifact, filePath string) error { - - log.Infof("Saving to GCS (gs://%s/%s)", - outputArtifact.GCS.Bucket, outputArtifact.GCS.Key) - - gcsClient, err := gcsDriver.newGcsClient() - if err != nil { - return err - } - - inputFile, err := os.Open(filePath) - if err != nil { - return err - } - - stat, err := os.Stat(filePath) - if err != nil { - return err - } - - //if stat.IsDir() { - // for putTask := range generatePutTasks(outputArtifact.GCS.Bucket, outputArtifact.GCS.Key, filePath) { - // err := - // } - // return errors.New("only single files can be saved to GCS, not entire directories") 
- //} - - defer util.Close(inputFile) - - bucket := gcsClient.Bucket(outputArtifact.GCS.Bucket) - object := bucket.Object(outputArtifact.GCS.Key) - - w := object.NewWriter(gcsDriver.Context) - _, err = io.Copy(w, inputFile) - if err != nil { - return err - } - - err = w.Close() - if err != nil { - return err - } - return nil - -} - func (gcsDriver *GCSArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error { bucketName := inputArtifact.GCS.Bucket From 0ac45e81df3880d2166d5dabcf65fd02f4c0c676 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Fri, 9 Aug 2019 14:29:27 -0400 Subject: [PATCH 04/10] fixing pointer issue --- workflow/artifacts/gcs/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/workflow/artifacts/gcs/client.go b/workflow/artifacts/gcs/client.go index 057f78056bd4..37f82a50d380 100644 --- a/workflow/artifacts/gcs/client.go +++ b/workflow/artifacts/gcs/client.go @@ -49,6 +49,8 @@ func NewGCSClient(opts GCSClientOpts) (client GCSClient, err error) { if err != nil { return } + + client = &gcs return } From 2508cee27f3150804024a0bf9e1e9c0844e49842 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Fri, 9 Aug 2019 16:34:40 -0400 Subject: [PATCH 05/10] logging --- workflow/artifacts/gcs/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/artifacts/gcs/client.go b/workflow/artifacts/gcs/client.go index 37f82a50d380..887a4bfe4548 100644 --- a/workflow/artifacts/gcs/client.go +++ b/workflow/artifacts/gcs/client.go @@ -54,7 +54,6 @@ func NewGCSClient(opts GCSClientOpts) (client GCSClient, err error) { return } - //plagiarized from github.com/argoproj/pkg/s3 func generatePutTasks(keyPrefix, rootPath string) chan uploadTask { rootPath = filepath.Clean(rootPath) + "/" @@ -177,6 +176,7 @@ func (g *gcsClient) IsDirectory(bucket, key string) (bool, error) { if objectAttrs.Name == key { return false, nil } + log.Infof("%s != %s", objectAttrs.Name, key) return true, nil } From e92b9756782ced9a9d530673e46bb619f9edd659 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Mon, 12 Aug 2019 09:46:48 -0400 Subject: [PATCH 06/10] create subdirectories --- workflow/artifacts/gcs/client.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/workflow/artifacts/gcs/client.go b/workflow/artifacts/gcs/client.go index 887a4bfe4548..0e40f95508ec 100644 --- a/workflow/artifacts/gcs/client.go +++ b/workflow/artifacts/gcs/client.go @@ -116,6 +116,16 @@ func (g *gcsClient) PutDirectory(bucket, key, path string) error { func (g *gcsClient) GetFile(bucket, key, path string) error { log.Infof("Getting from GCS (bucket: %s, key: %s) to %s", bucket, key, path) + + // Extract top level directory. + objectDir, _ := filepath.Split(path) + if objectDir != "" { + // Create any missing top level directories. 
+ if err := os.MkdirAll(objectDir, 0700); err != nil { + return err + } + } + outputFile, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) if err != nil { return err @@ -181,3 +191,4 @@ func (g *gcsClient) IsDirectory(bucket, key string) (bool, error) { } +@tr \ No newline at end of file From 87c83612c84dd7635dd1750b5a393981aadd2b3c Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Mon, 12 Aug 2019 09:52:38 -0400 Subject: [PATCH 07/10] typo --- workflow/artifacts/gcs/client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/workflow/artifacts/gcs/client.go b/workflow/artifacts/gcs/client.go index 0e40f95508ec..dbb138c44d99 100644 --- a/workflow/artifacts/gcs/client.go +++ b/workflow/artifacts/gcs/client.go @@ -190,5 +190,3 @@ func (g *gcsClient) IsDirectory(bucket, key string) (bool, error) { return true, nil } - -@tr \ No newline at end of file From 80c92e370b53c445dcb8274804202746cc4e84e5 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Mon, 12 Aug 2019 11:00:38 -0400 Subject: [PATCH 08/10] attempting to add support for artifact aggregation --- workflow/controller/operator.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index e5dd72663b75..612f70ba0d5e 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1479,6 +1479,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(templateName string, scop sort.Sort(loopNodes(childNodes)) paramList := make([]map[string]string, 0) resultsList := make([]wfv1.Item, 0) + artifactsList := make([]map[string]wfv1.Artifact, 0) for _, node := range childNodes { if node.Outputs == nil { continue @@ -1500,6 +1501,14 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(templateName string, scop resultsList = append(resultsList, wfv1.Item{Value: *node.Outputs.Result}) } } + + if len(node.Outputs.Artifacts) > 0 { + artifact := make(map[string]wfv1.Artifact) + for _, a := range node.Outputs.Artifacts { + artifact[a.Name] = a + } + + } } tmplType := woc.wf.GetTemplate(templateName).GetType() if tmplType == wfv1.TemplateTypeScript { @@ -1510,6 +1519,10 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(templateName string, scop outputsJSON, _ := json.Marshal(paramList) key := fmt.Sprintf("%s.outputs.parameters", prefix) scope.addParamToScope(key, string(outputsJSON)) + + artifactJSON, _ := json.Marshal(artifactsList) + artifactKey := fmt.Sprintf("%s.outputs.artifacts", prefix) + scope.addParamToScope(artifactKey, string(artifactJSON)) } // addParamToGlobalScope exports any desired node outputs to the global scope, and adds it to the global outputs. From dbd4ca2ee27dfca36ec8e1c7883e6810f2f0f908 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Mon, 12 Aug 2019 11:11:30 -0400 Subject: [PATCH 09/10] logs --- workflow/controller/operator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 612f70ba0d5e..6ccb1a8421d9 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1523,6 +1523,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(templateName string, scop artifactJSON, _ := json.Marshal(artifactsList) artifactKey := fmt.Sprintf("%s.outputs.artifacts", prefix) scope.addParamToScope(artifactKey, string(artifactJSON)) + log.Infof("artifact to scope %s => %s", artifactKey, artifactJSON) } // addParamToGlobalScope exports any desired node outputs to the global scope, and adds it to the global outputs. 
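Note on the artifact aggregation introduced above (and completed by the follow-up fix below): each child node's output artifacts are collected into a map keyed by artifact name, the maps are appended to a list, and the JSON-marshalled list is stored in scope under "<prefix>.outputs.artifacts", mirroring how aggregated parameters are exposed via addParamToScope. A minimal standalone sketch of the resulting shape follows; the artifact names, paths, and the exact JSON field names are illustrative assumptions, not taken from the patches.

package main

import (
	"encoding/json"
	"fmt"

	wfv1 "github.com/cyrusbiotechnology/argo/pkg/apis/workflow/v1alpha1"
)

func main() {
	// Mimic processAggregateNodeOutputs: one map per child node, keyed by artifact name.
	artifactList := []map[string]wfv1.Artifact{
		{"result": {Name: "result", Path: "/tmp/out-0"}},
		{"result": {Name: "result", Path: "/tmp/out-1"}},
	}
	artifactJSON, err := json.Marshal(artifactList)
	if err != nil {
		panic(err)
	}
	// Roughly: [{"result":{"name":"result","path":"/tmp/out-0"}},{"result":{"name":"result","path":"/tmp/out-1"}}]
	fmt.Println(string(artifactJSON))
}
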
From 8d89dda4fd235d040c04f4f23a959db8bb3ad4c5 Mon Sep 17 00:00:00 2001 From: Sam DeLuca Date: Mon, 12 Aug 2019 11:18:23 -0400 Subject: [PATCH 10/10] fixing typo --- workflow/controller/operator.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 6ccb1a8421d9..64d3a2d60fba 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1479,7 +1479,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(templateName string, scop sort.Sort(loopNodes(childNodes)) paramList := make([]map[string]string, 0) resultsList := make([]wfv1.Item, 0) - artifactsList := make([]map[string]wfv1.Artifact, 0) + artifactList := make([]map[string]wfv1.Artifact, 0) for _, node := range childNodes { if node.Outputs == nil { continue @@ -1507,6 +1507,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(templateName string, scop for _, a := range node.Outputs.Artifacts { artifact[a.Name] = a } + artifactList = append(artifactList, artifact) } } @@ -1520,7 +1521,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(templateName string, scop key := fmt.Sprintf("%s.outputs.parameters", prefix) scope.addParamToScope(key, string(outputsJSON)) - artifactJSON, _ := json.Marshal(artifactsList) + artifactJSON, _ := json.Marshal(artifactList) artifactKey := fmt.Sprintf("%s.outputs.artifacts", prefix) scope.addParamToScope(artifactKey, string(artifactJSON)) log.Infof("artifact to scope %s => %s", artifactKey, artifactJSON)
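
For reference, a minimal sketch of how the GCSClient added in this series could be exercised on its own, outside the artifact driver. The bucket name, object key, local paths, and credentials file below are illustrative assumptions; only the constructor and methods defined in workflow/artifacts/gcs/client.go are used.

package main

import (
	"io/ioutil"
	"log"

	"github.com/cyrusbiotechnology/argo/workflow/artifacts/gcs"
)

func main() {
	// The service-account JSON is passed straight through to option.WithCredentialsJSON.
	credsJSON, err := ioutil.ReadFile("/path/to/service-account.json") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}

	client, err := gcs.NewGCSClient(gcs.GCSClientOpts{CredsJSONData: credsJSON})
	if err != nil {
		log.Fatal(err)
	}

	// Same file-vs-directory decision the driver's Load makes before fetching.
	isDir, err := client.IsDirectory("my-bucket", "outputs/run-1") // illustrative bucket/key
	if err != nil {
		log.Fatal(err)
	}
	if isDir {
		err = client.GetDirectory("my-bucket", "outputs/run-1", "/tmp/run-1")
	} else {
		err = client.GetFile("my-bucket", "outputs/run-1", "/tmp/run-1")
	}
	if err != nil {
		log.Fatal(err)
	}

	// Uploading works the same way in reverse: PutFile for a single object,
	// PutDirectory to walk a local tree and upload each regular file.
	if err := client.PutDirectory("my-bucket", "outputs/run-1-copy", "/tmp/run-1"); err != nil {
		log.Fatal(err)
	}
}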