forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
taskresult.go
80 lines (75 loc) · 2.54 KB
/
taskresult.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package controller
import (
"reflect"
"time"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)
// newWorkflowTaskResultInformer builds the shared index informer used to
// watch WorkflowTaskResult objects in the controller's managed namespace.
//
// The list/watch is restricted by a label selector composed of workflowReq
// and the controller's instance-ID requirement, and the informer is indexed
// by indexes.WorkflowIndex. Whenever a task result is added or updated, the
// key "<namespace>/<workflow label value>" is added, rate limited, to the
// workflow queue.
func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndexInformer {
	selector := labels.NewSelector().Add(*workflowReq).Add(wfc.instanceIDReq()).String()
	log.WithField("labelSelector", selector).Info("Watching task results")

	// Applied to every list/watch request so only matching task results are seen.
	tweakListOptions := func(options *metav1.ListOptions) {
		options.LabelSelector = selector
	}
	indexers := cache.Indexers{indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc}
	informer := wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer(
		wfc.wfclientset,
		wfc.GetManagedNamespace(),
		20*time.Minute,
		indexers,
		tweakListOptions,
	)

	// Add and update events are handled identically: enqueue the workflow
	// named by the task result's workflow label.
	enqueueOwner := func(obj interface{}) {
		taskResult := obj.(*wfv1.WorkflowTaskResult)
		key := taskResult.Namespace + "/" + taskResult.Labels[common.LabelKeyWorkflow]
		wfc.wfQueue.AddRateLimited(key)
	}
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueueOwner,
		UpdateFunc: func(_, updated interface{}) { enqueueOwner(updated) },
	})
	return informer
}
// taskResultReconciliation merges the WorkflowTaskResult objects cached for
// this workflow into the matching entries of woc.wf.Status.Nodes.
//
// Task results are looked up in the task-result informer's indexer under the
// key "<workflow namespace>/<workflow name>"; each result's name is used as
// the node ID. For every result whose node is present in the workflow status,
// reported outputs and valid progress are applied to a copy of the node. The
// copy is stored back, and woc.updated is set, only when it differs from the
// node currently recorded.
//
// If the index lookup fails, the error is logged and nothing is reconciled.
// Task results that do not correspond to a node in the status are skipped.
func (woc *wfOperationCtx) taskResultReconciliation() {
	indexKey := woc.wf.Namespace + "/" + woc.wf.Name
	objs, err := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, indexKey)
	if err != nil {
		// Report the failure rather than silently treating it as "no task results".
		woc.log.WithError(err).Error("Task-result reconciliation failed to list task results")
		return
	}
	woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")
	for _, obj := range objs {
		result := obj.(*wfv1.WorkflowTaskResult)
		nodeID := result.Name
		oldNode, ok := woc.wf.Status.Nodes[nodeID]
		if !ok {
			// Without this check a zero-value node would be fabricated and
			// written into the status (and the write would panic on a nil map).
			continue
		}
		newNode := oldNode.DeepCopy()
		if result.Outputs.HasOutputs() {
			if newNode.Outputs == nil {
				newNode.Outputs = &wfv1.Outputs{}
			}
			result.Outputs.DeepCopyInto(newNode.Outputs)
			// Prevent overwriting of ExitCode: keep the node's recorded exit
			// code when the task result does not carry one.
			if oldNode.Outputs != nil && newNode.Outputs.ExitCode == nil {
				newNode.Outputs.ExitCode = oldNode.Outputs.ExitCode
			}
		}
		if result.Progress.IsValid() {
			newNode.Progress = result.Progress
		}
		// Only touch the workflow when the task result actually changed the node.
		if !reflect.DeepEqual(&oldNode, newNode) {
			woc.log.
				WithField("nodeID", nodeID).
				Info("task-result changed")
			woc.wf.Status.Nodes[nodeID] = *newNode
			woc.updated = true
		}
	}
}