Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions lib/resourceutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ func GetConfigureWorkflowCompletedConditionStatus(obj *unstructured.Unstructured
return condition.Status
}

func GetDeleteWorkflowCompletedConditionStatus(obj *unstructured.Unstructured) v1.ConditionStatus {
condition := GetCondition(obj, DeleteWorkflowCompletedCondition)
if condition == nil {
return v1.ConditionUnknown
}
return condition.Status
}

func MarkConfigureWorkflowAsRunning(logger logr.Logger, obj *unstructured.Unstructured) {
SetCondition(obj, &clusterv1.Condition{
Type: ConfigureWorkflowCompletedCondition,
Expand All @@ -65,6 +73,17 @@ func MarkConfigureWorkflowAsFailed(logger logr.Logger, obj *unstructured.Unstruc
logging.Warn(logger, "marking configure workflow as failed", "condition", ConfigureWorkflowCompletedCondition, "value", v1.ConditionFalse, "reason", ConfigureWorkflowCompletedFailedReason)
}

func MarkDeleteWorkflowAsRunning(logger logr.Logger, obj *unstructured.Unstructured) {
SetCondition(obj, &clusterv1.Condition{
Type: DeleteWorkflowCompletedCondition,
Status: v1.ConditionFalse,
Message: "Pipelines are still in progress",
Reason: "PipelinesInProgress",
LastTransitionTime: metav1.NewTime(time.Now()),
})
logging.Info(logger, "marking delete workflow as running", "condition", DeleteWorkflowCompletedCondition, "value", v1.ConditionFalse, "reason", "PipelinesInProgress")
}

func MarkResourceRequestAsWorksFailed(obj *unstructured.Unstructured, works []string) {
SetCondition(obj, &clusterv1.Condition{
Type: WorksSucceededCondition,
Expand Down
25 changes: 25 additions & 0 deletions lib/workflow/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func ReconcileDelete(opts Opts) (bool, error) {
}

func createDeletePipeline(opts Opts, pipeline v1alpha1.PipelineJobResources) (abort bool, err error) {
updated, err := setDeleteWorkflowCompletedConditionStatus(opts, opts.parentObject)
if err != nil || updated {
return updated, err
}

logging.Debug(opts.logger, "creating delete pipeline; execution will commence")
if isManualReconciliation(opts.parentObject.GetLabels()) {
if err := removeManualReconciliationLabel(opts); err != nil {
Expand Down Expand Up @@ -507,6 +512,26 @@ func setConfigureWorkflowCompletedConditionStatus(opts Opts, isTheFirstPipeline
}
}

func setDeleteWorkflowCompletedConditionStatus(opts Opts, obj *unstructured.Unstructured) (bool, error) {
if opts.SkipConditions {
return false, nil
}
switch resourceutil.GetDeleteWorkflowCompletedConditionStatus(obj) {
case v1.ConditionTrue:
fallthrough
case v1.ConditionUnknown:
resourceutil.MarkDeleteWorkflowAsRunning(opts.logger, obj)
err := opts.client.Status().Update(opts.ctx, obj)
if err != nil {
logging.Error(opts.logger, err, "failed to update object status")
return false, err
}
return true, nil
default:
return false, nil
}
}

func getMostRecentDeletePipelineJob(opts Opts, namespace string, pipeline v1alpha1.PipelineJobResources) (*batchv1.Job, error) {
labels := getLabelsForPipelineJob(pipeline)
jobs, err := getJobsWithLabels(opts, labels, namespace)
Expand Down
36 changes: 32 additions & 4 deletions lib/workflow/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1863,15 +1863,38 @@ var _ = Describe("Workflow Reconciler", func() {
})

When("there are pipelines to reconcile", func() {
It("reconciles the first pipeline", func() {
It("updates the DeleteWorkflowCompleted condition to indicate pipelines are in progress", func() {
opts := workflow.NewOpts(ctx, fakeK8sClient, eventRecorder, logger, uPromise, workflowPipelines, "promise", 5, namespace)
requeue, err := workflow.ReconcileDelete(opts)
Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(BeTrue())
jobList := listJobs(namespace)
Expect(jobList).To(HaveLen(1))

Expect(findByName(jobList, workflowPipelines[0].Job.Name)).To(BeTrue())
Expect(fakeK8sClient.Get(ctx, types.NamespacedName{Name: promise.Name}, &promise)).To(Succeed())
deleteWorkflowCond := apimeta.FindStatusCondition(promise.Status.Conditions, string(resourceutil.DeleteWorkflowCompletedCondition))
Expect(deleteWorkflowCond).NotTo(BeNil())
Expect(deleteWorkflowCond.Message).To(Equal("Pipelines are still in progress"))
Expect(deleteWorkflowCond.Reason).To(Equal("PipelinesInProgress"))
Expect(string(deleteWorkflowCond.Status)).To(Equal("False"))
})

It("reconciles the first pipeline", func() {
opts := workflow.NewOpts(ctx, fakeK8sClient, eventRecorder, logger, uPromise, workflowPipelines, "promise", 5, namespace)

By("first updating the delete workflow condition", func() {
requeue, err := workflow.ReconcileDelete(opts)
Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(BeTrue())
})

By("then creating the job on subsequent reconcile", func() {
requeue, err := workflow.ReconcileDelete(opts)
Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(BeTrue())
jobList := listJobs(namespace)
Expect(jobList).To(HaveLen(1))

Expect(findByName(jobList, workflowPipelines[0].Job.Name)).To(BeTrue())
})

By("not returning completed until the job is marked as completed", func() {
requeue, err := workflow.ReconcileDelete(opts)
Expand Down Expand Up @@ -1960,9 +1983,14 @@ var _ = Describe("Workflow Reconciler", func() {
labelPromiseForManualReconciliation("redis")
newWorkflowPipelines, uPromise = setupTest(promise, pipelines)
opts := workflow.NewOpts(ctx, fakeK8sClient, eventRecorder, logger, uPromise, newWorkflowPipelines, "promise", 5, namespace)
// First call updates the delete workflow condition
abort, err := workflow.ReconcileDelete(opts)
Expect(abort).To(BeTrue())
Expect(err).NotTo(HaveOccurred())
// Second call creates the job
abort, err = workflow.ReconcileDelete(opts)
Expect(abort).To(BeTrue())
Expect(err).NotTo(HaveOccurred())
})

It("re-triggers the pipeline in the workflow", func() {
Expand Down