From ac54eb3e443fb8e188f9ffcb50335be8f91593dd Mon Sep 17 00:00:00 2001 From: Michael Weibel Date: Tue, 22 Nov 2022 17:23:37 +0100 Subject: [PATCH] fix: reconcile wf when taskresult is added/updated Re-adds an EventHandler to trigger an update when a WorkflowTaskResult got added/updated. Without this, a workflow progress only gets updated upon completion or every 20mins. Fixes #10096, bug introduced in #8135 Reverts #8459 to re-add the progress test Signed-off-by: Michael Weibel --- test/e2e/progress_test.go | 5 +++-- test/e2e/testdata/progress-workflow.yaml | 2 +- workflow/controller/taskresult.go | 17 ++++++++++++++++- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/test/e2e/progress_test.go b/test/e2e/progress_test.go index 6ff006fdde5a..5749440db321 100644 --- a/test/e2e/progress_test.go +++ b/test/e2e/progress_test.go @@ -34,7 +34,6 @@ func (s *ProgressSuite) TestDefaultProgress() { } func (s *ProgressSuite) TestLoggedProgress() { - s.T().SkipNow() toHaveProgress := func(p wfv1.Progress) fixtures.Condition { return func(wf *wfv1.Workflow) (bool, string) { return wf.Status.Nodes[wf.Name].Progress == p && @@ -46,7 +45,9 @@ func (s *ProgressSuite) TestLoggedProgress() { Workflow("@testdata/progress-workflow.yaml"). When(). SubmitWorkflow(). - WaitForWorkflow(toHaveProgress("50/100"), time.Minute). // ARGO_PROGRESS_PATCH_TICK_DURATION=1m + WaitForWorkflow(fixtures.ToBeRunning). + WaitForWorkflow(toHaveProgress("0/100"), 20*time.Second). + WaitForWorkflow(toHaveProgress("50/100"), 20*time.Second). WaitForWorkflow(). Then(). ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { diff --git a/test/e2e/testdata/progress-workflow.yaml b/test/e2e/testdata/progress-workflow.yaml index fb893dd2b371..80e31f25bc93 100644 --- a/test/e2e/testdata/progress-workflow.yaml +++ b/test/e2e/testdata/progress-workflow.yaml @@ -17,4 +17,4 @@ spec: image: argoproj/argosay:v2 command: ["/bin/sh", "-c"] args: - - /argosay echo 50/100 $ARGO_PROGRESS_FILE && /argosay sleep 1m + - /argosay echo 0/100 $ARGO_PROGRESS_FILE && /argosay sleep 10s && /argosay echo 50/100 $ARGO_PROGRESS_FILE && /argosay sleep 10s diff --git a/workflow/controller/taskresult.go b/workflow/controller/taskresult.go index 0af6383b3b7c..6a584cea2055 100644 --- a/workflow/controller/taskresult.go +++ b/workflow/controller/taskresult.go @@ -11,6 +11,7 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" ) @@ -21,7 +22,7 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex String() log.WithField("labelSelector", labelSelector). Info("Watching task results") - return wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer( + informer := wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer( wfc.wfclientset, wfc.GetManagedNamespace(), 20*time.Minute, @@ -32,6 +33,20 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex options.LabelSelector = labelSelector }, ) + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(new interface{}) { + result := new.(*wfv1.WorkflowTaskResult) + workflow := result.Labels[common.LabelKeyWorkflow] + wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow) + }, + UpdateFunc: func(old, new interface{}) { + result := new.(*wfv1.WorkflowTaskResult) + workflow := result.Labels[common.LabelKeyWorkflow] + wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow) + }, + }) + return informer } func (woc *wfOperationCtx) taskResultReconciliation() {