diff --git a/workflow/controller/taskresult.go b/workflow/controller/taskresult.go index 0af6383b3b7c..6a584cea2055 100644 --- a/workflow/controller/taskresult.go +++ b/workflow/controller/taskresult.go @@ -11,6 +11,7 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/controller/indexes" ) @@ -21,7 +22,7 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex String() log.WithField("labelSelector", labelSelector). Info("Watching task results") - return wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer( + informer := wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer( wfc.wfclientset, wfc.GetManagedNamespace(), 20*time.Minute, @@ -32,6 +33,20 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex options.LabelSelector = labelSelector }, ) + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(new interface{}) { + result := new.(*wfv1.WorkflowTaskResult) + workflow := result.Labels[common.LabelKeyWorkflow] + wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow) + }, + UpdateFunc: func(old, new interface{}) { + result := new.(*wfv1.WorkflowTaskResult) + workflow := result.Labels[common.LabelKeyWorkflow] + wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow) + }, + }) + return informer } func (woc *wfOperationCtx) taskResultReconciliation() {