diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 8b95021c747e..f4eefcbdb1a5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -101,7 +101,7 @@ func WithPriorityAndFairness( } var classification *PriorityAndFairnessClassification - estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate { + noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) { classification = &PriorityAndFairnessClassification{ FlowSchemaName: fs.Name, FlowSchemaUID: fs.UID, @@ -111,7 +111,19 @@ func WithPriorityAndFairness( httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name)) httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name)) httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher)) - return workEstimator(r, fs.Name, pl.Name) + } + // estimateWork is called, if at all, after noteFn + estimateWork := func() flowcontrolrequest.WorkEstimate { + if classification == nil { + // workEstimator is being invoked before classification of + // the request has completed, we should never be here though. + klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"), + "Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI) + + return workEstimator(r, "", "") + } + + return workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName) } var served bool @@ -235,7 +247,7 @@ func WithPriorityAndFairness( // Note that Handle will return irrespective of whether the request // executes or is rejected. In the latter case, the function will return // without calling the passed `execute` function. - fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute) + fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute) }() select { @@ -266,7 +278,7 @@ func WithPriorityAndFairness( handler.ServeHTTP(w, r) } - fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute) + fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute) } if !served { diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index d4619023bba4..42673aae5afd 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -87,14 +87,15 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) { func (t fakeApfFilter) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func() fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn, execFn func(), ) { if t.mockDecision == decisionSkipFilter { panic("Handle should not be invoked") } - workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) + noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) switch t.mockDecision { case decisionNoQueuingExecute: execFn() @@ -390,7 +391,8 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { func (f *fakeWatchApfFilter) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, + _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + _ func() fcrequest.WorkEstimate, _ fq.QueueNoteFn, execFn func(), ) { @@ -640,11 +642,13 @@ type fakeFilterRequestDigest struct { func (f *fakeFilterRequestDigest) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func() fcrequest.WorkEstimate, _ fq.QueueNoteFn, _ func(), ) { f.requestDigestGot = &requestDigest - f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "") + noteFn(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "") + f.workEstimateGot = workEstimator() } func TestApfWithRequestDigest(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 7d53dbee7e90..cf3aeba0c2da 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -799,7 +799,10 @@ func (immediateRequest) Finish(execute func()) bool { // The returned bool indicates whether the request is exempt from // limitation. The startWaitingTime is when the request started // waiting in its queue, or `Time{}` if this did not happen. -func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { +func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func() fcrequest.WorkEstimate, + queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { klog.V(7).Infof("startRequest(%#+v)", rd) cfgCtlr.lock.RLock() defer cfgCtlr.lock.RUnlock() @@ -830,6 +833,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name plState := cfgCtlr.priorityLevelStates[plName] if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { + noteFn(selectedFlowSchema, plState.pl, "") klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName) return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{} } @@ -843,7 +847,10 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod) hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher) } - workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher) + + noteFn(selectedFlowSchema, plState.pl, flowDistinguisher) + workEstimate := workEstimator() + startWaitingTime = time.Now() klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 67149fd836f6..005a95099be5 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -42,10 +42,14 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer- // Interface defines how the API Priority and Fairness filter interacts with the underlying system. type Interface interface { // Handle takes care of queuing and dispatching a request - // characterized by the given digest. The given `workEstimator` will be - // invoked with the results of request classification and must return the - // work parameters for the request. If the - // request is queued then `queueNoteFn` will be called twice, + // characterized by the given digest. The given `noteFn` will be + // invoked with the results of request classification. + // The given `workEstimator` is called, if at all, after noteFn. + // `workEstimator` will be invoked only when the request + // is classified as non 'exempt'. + // 'workEstimator', when invoked, must return the + // work parameters for the request. + // If the request is queued then `queueNoteFn` will be called twice, // first with `true` and then with `false`; otherwise // `queueNoteFn` will not be called at all. If Handle decides // that the request should be executed then `execute()` will be @@ -55,7 +59,8 @@ type Interface interface { // ctx is cancelled or times out. Handle(ctx context.Context, requestDigest RequestDigest, - workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func() fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn, execFn func(), ) @@ -150,10 +155,11 @@ func NewTestable(config TestableConfig) Interface { } func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, - workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), + workEstimator func() fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn, execFn func()) { - fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn) + fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, noteFn, workEstimator, queueNoteFn) queued := startWaitingTime != time.Time{} if req == nil { if queued { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 9a40fb719296..f3eedeb962e0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -462,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes startWG.Add(1) go func(matches, isResource bool, rdu RequestDigest) { expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) - ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate { + ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) { matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt if testDebugLogs { t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt) @@ -475,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name) } } + }, func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} }, func(inQueue bool) { }, func() {