Skip to content

Commit

Permalink
Merge pull request #112298 from MikeSpreitzer/automated-cherry-pick-o…
Browse files Browse the repository at this point in the history
…f-#112198-upstream-release-1.23

Automated cherry pick of #112198: Call queueSet::boundNextDispatchLocked enough

Kubernetes-commit: 0bf187e7cb5c89be17db79f68b009f2a391c4b19
  • Loading branch information
k8s-publishing-bot committed Dec 12, 2022
2 parents 1c37dd0 + 16fa483 commit a75e1b4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
50 changes: 33 additions & 17 deletions pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ type queueSetCompleter struct {
// described in this package's doc, and a pointer to one implements
// the QueueSet interface. The fields listed before the lock
// should not be changed; the fields listed after the
// lock must be accessed only while holding the lock. The methods of
// this type follow the naming convention that the suffix "Locked"
// means the caller must hold the lock; for a method whose name does
// not end in "Locked" either acquires the lock or does not care about
// locking.
// lock must be accessed only while holding the lock.
//
// The methods of this type follow the naming convention that the
// suffix "Locked" means the caller must hold the lock; for a method
// whose name does not end in "Locked" either acquires the lock or
// does not care about locking.
//
// The methods of this type also follow the convention that the suffix
// "ToBoundLocked" means that the caller may have to follow up with a
// call to `boundNextDispatchLocked`. This is so for a method that
// changes what request is oldest in a queue, because that change means
// that the anti-windup hack in boundNextDispatchLocked needs to be
// applied wrt the revised oldest request in the queue.
type queueSet struct {
clock eventclock.Interface
estimatedServiceDuration time.Duration
Expand Down Expand Up @@ -393,7 +401,9 @@ func (req *request) wait() (bool, bool) {
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
// remove the request from the queue as it has timed out
queue := req.queue
if req.removeFromQueueLocked() != nil {
defer qs.boundNextDispatchLocked(queue)
qs.totRequestsWaiting--
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
Expand Down Expand Up @@ -518,7 +528,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
qs.removeTimedOutRequestsFromQueueLocked(queue, fsName)
qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName)
// NOTE: currently timeout is only checked for each new request. This means that there can be
// requests that are in the queue longer than the timeout if there are no new requests
// We prefer the simplicity over the promptness, at least for now.
Expand All @@ -540,7 +550,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
queueNoteFn: queueNoteFn,
workEstimate: qs.completeWorkEstimate(workEstimate),
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
Expand Down Expand Up @@ -580,9 +590,9 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
return bestQueueIdx
}

// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
// removeTimedOutRequestsFromQueueToBoundLocked rejects old requests that have been enqueued
// past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, fsName string) {
timeoutCount := 0
now := qs.clock.Now()
reqs := queue.requests
Expand Down Expand Up @@ -613,11 +623,11 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
}
}

// rejectOrEnqueueLocked rejects or enqueues the newly arrived
// rejectOrEnqueueToBoundLocked rejects or enqueues the newly arrived
// request, which has been assigned to a queue. If up against the
// queue length limit and the concurrency limit then returns false.
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
queue := request.queue
curQueueLength := queue.requests.Length()
// rejects the newly arrived request if resource criteria not met
Expand All @@ -626,12 +636,12 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
return false
}

qs.enqueueLocked(request)
qs.enqueueToBoundLocked(request)
return true
}

// enqueues a request into its queue.
func (qs *queueSet) enqueueLocked(request *request) {
func (qs *queueSet) enqueueToBoundLocked(request *request) {
queue := request.queue
now := qs.clock.Now()
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
Expand Down Expand Up @@ -688,7 +698,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
// be false when either all queues are empty or the request at the head
// of the next queue cannot be dispatched.
func (qs *queueSet) dispatchLocked() bool {
queue, request := qs.findDispatchQueueLocked()
queue, request := qs.findDispatchQueueToBoundLocked()
if queue == nil {
return false
}
Expand Down Expand Up @@ -723,6 +733,11 @@ func (qs *queueSet) dispatchLocked() bool {
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
}
// When a request is dequeued for service -> qs.virtualStart += G * width
if request.totalWork() > rDecrement/100 { // A single increment should never be so big
klog.Errorf("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v with implausibly high work %v from queue %d with start R %v",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.nextDispatchR)
}
queue.nextDispatchR += request.totalWork()
return true
}
Expand Down Expand Up @@ -750,11 +765,12 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
return true
}

// findDispatchQueueLocked examines the queues in round robin order and
// findDispatchQueueToBoundLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now.
func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
// Returns nils if the head of the selected queue can not be dispatched now,
// in which case the caller does not need to follow up with`qs.boundNextDispatchLocked`.
func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
minVirtualFinish := fqrequest.MaxSeatSeconds
sMin := fqrequest.MaxSeatSeconds
dsMin := fqrequest.MaxSeatSeconds
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
minQueueExpected = test.queues[queueIdx]
}

minQueueGot, reqGot := qs.findDispatchQueueLocked()
minQueueGot, reqGot := qs.findDispatchQueueToBoundLocked()
if minQueueExpected != minQueueGot {
t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
}
Expand Down

0 comments on commit a75e1b4

Please sign in to comment.