Add Pod Issue handling for jobs created using Executor API (#2065)
* [WIP] Initial implementation of executor using executor API

* Handle minor api change - using pointers for NodeInfo

* Tidy ingress details

* gofumpt

* Fix unit tests

* goimports

* Limit event sequence size

* Fix for bad merge

* Fix ingress and service count

* Set preempted run id

* Update internal/executor/reporter/event_sender.go

* Fix merge issue

* Merge branch 'master' into executor_pulsar_api_initial

# Conflicts:
#	internal/executor/reporter/fake/job_event_reporter.go
#	internal/executor/reporter/job_event_reporter.go
#	internal/executor/service/cluster_allocation.go
#	internal/executor/service/job_manager.go
#	internal/executor/service/job_manager_test.go

* Add Pod Issue handling for jobs created using Executor API

As part of the migration to the Executor API, we now have two flows through the executor.

The new flow had no handling of stuck pods, which is what this PR introduces.

It was simpler to split this off, as the handling is slightly different from the existing code (see the sketch after this list):
 - The existing code is tied in with lease handling.
 - The existing code has to make additional calls (return lease, report done), whereas the new Executor API flow is purely event based.
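A minimal sketch of that contrast, using purely illustrative names rather than the actual Armada interfaces:

package sketch

import "context"

// Legacy flow: pod issue handling is entangled with lease management and has
// to make extra calls against the lease layer.
type LeaseClient interface {
	ReturnLease(ctx context.Context, jobId string) error
	ReportDone(ctx context.Context, jobId string) error
}

// Executor API flow: pod issue handling only has to report events.
type EventReporter interface {
	ReportRunFailed(ctx context.Context, runId string, reason string) error
}

func resolveIssueLegacy(ctx context.Context, leases LeaseClient, jobId string) error {
	if err := leases.ReturnLease(ctx, jobId); err != nil {
		return err
	}
	return leases.ReportDone(ctx, jobId)
}

func resolveIssueExecutorAPI(ctx context.Context, events EventReporter, runId string) error {
	return events.ReportRunFailed(ctx, runId, "pod issue detected")
}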

* gofumpt

* Remove comments

* Populate runAttempted in return lease event

* Tidy up

* Tidy from code review
JamesMurkin committed Feb 8, 2023
1 parent 99c89e3 commit 13361d8
Showing 13 changed files with 567 additions and 99 deletions.
1 change: 1 addition & 0 deletions config/executor/config.yaml
@@ -10,6 +10,7 @@ task:
utilisationReportingInterval: 1s
missingJobEventReconciliationInterval: 15s
jobLeaseRenewalInterval: 15s
podIssueHandlingInterval: 5s
podDeletionInterval: 5s
resourceCleanupInterval: 15s
allocateSpareClusterCapacityInterval: 5s
6 changes: 6 additions & 0 deletions internal/executor/application.go
@@ -159,6 +159,12 @@ func StartUpWithContext(
clusterUtilisationService,
submitter,
etcdHealthMonitor)
podIssueService := service.NewPodIssueService(
clusterContext,
eventReporter,
pendingPodChecker,
config.Kubernetes.StuckTerminatingPodExpiry)
taskManager.Register(podIssueService.HandlePodIssues, config.Task.PodIssueHandlingInterval, "pod_issue_handling")
} else {
jobLeaseService := service.NewJobLeaseService(
clusterContext,
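The new pod issue service itself is one of the 13 changed files but is not shown in this excerpt. A minimal skeleton inferred from the call site above, assuming the constructor and argument types implied there (reporter.EventReporter and podchecks.PodChecker are assumptions; the real implementation may differ):

package service

import (
	"time"

	executorContext "github.com/armadaproject/armada/internal/executor/context"
	"github.com/armadaproject/armada/internal/executor/podchecks"
	"github.com/armadaproject/armada/internal/executor/reporter"
)

// PodIssueService periodically inspects managed pods for issues (for example
// pods stuck pending or stuck terminating) and reports them as events.
type PodIssueService struct {
	clusterContext            executorContext.ClusterContext
	eventReporter             reporter.EventReporter
	pendingPodChecker         podchecks.PodChecker
	stuckTerminatingPodExpiry time.Duration
}

func NewPodIssueService(
	clusterContext executorContext.ClusterContext,
	eventReporter reporter.EventReporter,
	pendingPodChecker podchecks.PodChecker,
	stuckTerminatingPodExpiry time.Duration,
) *PodIssueService {
	return &PodIssueService{
		clusterContext:            clusterContext,
		eventReporter:             eventReporter,
		pendingPodChecker:         pendingPodChecker,
		stuckTerminatingPodExpiry: stuckTerminatingPodExpiry,
	}
}

// HandlePodIssues is the function registered with the task manager; it runs
// every PodIssueHandlingInterval.
func (p *PodIssueService) HandlePodIssues() {
	// detect new issues, try to resolve existing ones, report failures as events
}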
1 change: 1 addition & 0 deletions internal/executor/configuration/types.go
@@ -78,6 +78,7 @@ type TaskConfiguration struct {
MissingJobEventReconciliationInterval time.Duration
JobLeaseRenewalInterval time.Duration
AllocateSpareClusterCapacityInterval time.Duration
PodIssueHandlingInterval time.Duration
PodDeletionInterval time.Duration
QueueUsageDataRefreshInterval time.Duration
UtilisationEventProcessingInterval time.Duration
2 changes: 1 addition & 1 deletion internal/executor/context/cluster_context.go
@@ -401,7 +401,7 @@ func (c *KubernetesClusterContext) DeleteIngress(ingress *networking.Ingress) er

func (c *KubernetesClusterContext) ProcessPodsToDelete() {
pods := c.podsToDelete.GetAll()
util.ProcessPodsWithThreadPool(pods, c.deleteThreadCount, func(podToDelete *v1.Pod) {
util.ProcessItemsWithThreadPool(context.Background(), c.deleteThreadCount, pods, func(podToDelete *v1.Pod) {
if podToDelete == nil {
return
}
15 changes: 6 additions & 9 deletions internal/executor/job/job_context.go
@@ -1,19 +1,19 @@
package job

import (
"context"
"fmt"
"sync"
"time"

"github.com/armadaproject/armada/pkg/api"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/armadaproject/armada/internal/executor/context"
executorContext "github.com/armadaproject/armada/internal/executor/context"
"github.com/armadaproject/armada/internal/executor/podchecks"
"github.com/armadaproject/armada/internal/executor/util"
"github.com/armadaproject/armada/pkg/api"
)

type IssueType int
@@ -56,7 +56,7 @@ type JobContext interface {
}

type ClusterJobContext struct {
clusterContext context.ClusterContext
clusterContext executorContext.ClusterContext
stuckTerminatingPodExpiry time.Duration
pendingPodChecker podchecks.PodChecker
updateThreadCount int
@@ -66,7 +66,7 @@ type ClusterJobContext struct {
}

func NewClusterJobContext(
clusterContext context.ClusterContext,
clusterContext executorContext.ClusterContext,
pendingPodChecker podchecks.PodChecker,
stuckTerminatingPodExpiry time.Duration,
updateThreadCount int,
@@ -119,9 +119,6 @@ func (c *ClusterJobContext) MarkIssueReported(issue *PodIssue) {
}

func (c *ClusterJobContext) DeleteJobWithCondition(job *RunningJob, condition func(pod *v1.Pod) bool) error {
c.activeJobIdsMutex.Lock()
defer c.activeJobIdsMutex.Unlock()

for _, pod := range job.ActivePods {
err := c.clusterContext.DeletePodWithCondition(pod, condition, true)
if err != nil {
@@ -148,7 +145,7 @@ func (c *ClusterJobContext) AddAnnotation(jobs []*RunningJob, annotations map[st
}
}

util.ProcessPodsWithThreadPool(podsToAnnotate, c.updateThreadCount,
util.ProcessItemsWithThreadPool(context.Background(), c.updateThreadCount, podsToAnnotate,
func(pod *v1.Pod) {
err := c.clusterContext.AddAnnotation(pod, annotations)
if err != nil {
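The import block above renames Armada's executor context package to executorContext because the file now also needs the standard library context package (for the context.Background() passed to the thread pool helper). A tiny illustration of the collision the alias avoids; deleteAllPods is a hypothetical function, not part of the change:

package job

import (
	"context"

	executorContext "github.com/armadaproject/armada/internal/executor/context"
)

// deleteAllPods only shows that, with the alias in place, context.Background()
// refers to the standard library while the cluster context type is reached
// through executorContext.
func deleteAllPods(cluster executorContext.ClusterContext) context.Context {
	// cluster would be used to look up and delete pods in a real implementation
	return context.Background()
}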
33 changes: 1 addition & 32 deletions internal/executor/service/job_manager.go
@@ -3,13 +3,11 @@ package service
import (
"context"
"fmt"
"sync"
"time"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"

commonUtil "github.com/armadaproject/armada/internal/common/util"
context2 "github.com/armadaproject/armada/internal/executor/context"
"github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/internal/executor/job"
@@ -111,7 +109,7 @@ func (m *JobManager) reportTerminated(pods []*v1.Pod) {
}

func (m *JobManager) handlePodIssues(ctx context.Context, allRunningJobs []*job.RunningJob) {
ProcessRunningJobsWithThreadPool(ctx, allRunningJobs, 20, m.handlePodIssue)
util.ProcessItemsWithThreadPool(ctx, 20, allRunningJobs, m.handlePodIssue)
}

func (m *JobManager) handlePodIssue(runningJob *job.RunningJob) {
@@ -237,32 +235,3 @@ func hasIssueSelfResolved(runningJob *job.RunningJob) bool {

return false
}

func ProcessRunningJobsWithThreadPool(ctx context.Context, runningJobs []*job.RunningJob, maxThreadCount int, processPod func(*job.RunningJob)) {
wg := &sync.WaitGroup{}
processChannel := make(chan *job.RunningJob)

for i := 0; i < commonUtil.Min(len(runningJobs), maxThreadCount); i++ {
wg.Add(1)
go threadPoolWorker(ctx, wg, processChannel, processPod)
}

for _, runningJob := range runningJobs {
processChannel <- runningJob
}

close(processChannel)
wg.Wait()
}

func threadPoolWorker(ctx context.Context, wg *sync.WaitGroup, jobsToProcess chan *job.RunningJob, processFunc func(*job.RunningJob)) {
defer wg.Done()

for pod := range jobsToProcess {
// Skip processing once context is finished
if ctx.Err() != nil {
continue
}
processFunc(pod)
}
}
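The removed job-specific thread pool above has been replaced by a generic helper in internal/common/util. Its implementation is not part of this diff; a plausible sketch, reconstructed from the removed code and the new call sites (ctx, thread count, items, process function), might look like this:

package util

import (
	"context"
	"sync"
)

// ProcessItemsWithThreadPool fans items out to at most maxThreadCount workers.
func ProcessItemsWithThreadPool[K any](ctx context.Context, maxThreadCount int, items []K, processFunc func(K)) {
	wg := &sync.WaitGroup{}
	processChannel := make(chan K)

	threadCount := maxThreadCount
	if len(items) < threadCount {
		threadCount = len(items)
	}

	for i := 0; i < threadCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range processChannel {
				// Skip processing once the context is finished, mirroring the
				// behaviour of the removed worker above.
				if ctx.Err() != nil {
					continue
				}
				processFunc(item)
			}
		}()
	}

	for _, item := range items {
		processChannel <- item
	}

	close(processChannel)
	wg.Wait()
}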
5 changes: 3 additions & 2 deletions internal/executor/service/job_manager_test.go
@@ -233,8 +233,9 @@ func makeTestPod(status v1.PodStatus) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
domain.JobId: "job-id-1",
domain.Queue: "queue-id-1",
domain.JobId: "job-id-1",
domain.JobRunId: "job-run-id-1",
domain.Queue: "queue-id-1",
},
Annotations: map[string]string{
domain.JobSetId: "job-set-id-1",
