Skip to content

Commit

Permalink
Merge pull request #106827 from tkashem/apf-exempt-note
Browse files Browse the repository at this point in the history
apf: ensure exempt request is noted with classification
  • Loading branch information
k8s-ci-robot committed Dec 11, 2021
2 parents 8b9e1d7 + 8b2dd74 commit 0ae6ef6
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 19 deletions.
Expand Up @@ -101,7 +101,7 @@ func WithPriorityAndFairness(
}

var classification *PriorityAndFairnessClassification
estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate {
noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
Expand All @@ -111,7 +111,19 @@ func WithPriorityAndFairness(
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher))
return workEstimator(r, fs.Name, pl.Name)
}
// estimateWork is called, if at all, after noteFn
estimateWork := func() flowcontrolrequest.WorkEstimate {
if classification == nil {
// workEstimator is being invoked before classification of
// the request has completed, we should never be here though.
klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)

return workEstimator(r, "", "")
}

return workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
}

var served bool
Expand Down Expand Up @@ -235,7 +247,7 @@ func WithPriorityAndFairness(
// Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function.
fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute)
fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
}()

select {
Expand Down Expand Up @@ -266,7 +278,7 @@ func WithPriorityAndFairness(
handler.ServeHTTP(w, r)
}

fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute)
fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
}

if !served {
Expand Down
Expand Up @@ -87,14 +87,15 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {

func (t fakeApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func(),
) {
if t.mockDecision == decisionSkipFilter {
panic("Handle should not be invoked")
}
workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
switch t.mockDecision {
case decisionNoQueuingExecute:
execFn()
Expand Down Expand Up @@ -390,7 +391,8 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {

func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
_ func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn,
execFn func(),
) {
Expand Down Expand Up @@ -640,11 +642,13 @@ type fakeFilterRequestDigest struct {

func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn, _ func(),
) {
f.requestDigestGot = &requestDigest
f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
noteFn(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
f.workEstimateGot = workEstimator()
}

func TestApfWithRequestDigest(t *testing.T) {
Expand Down
Expand Up @@ -799,7 +799,10 @@ func (immediateRequest) Finish(execute func()) bool {
// The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock()
Expand Down Expand Up @@ -830,6 +833,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
noteFn(selectedFlowSchema, plState.pl, "")
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
}
Expand All @@ -843,7 +847,10 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
}
workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher)

noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
workEstimate := workEstimator()

startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
Expand Down
20 changes: 13 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
Expand Up @@ -42,10 +42,14 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-
// Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface {
// Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `workEstimator` will be
// invoked with the results of request classification and must return the
// work parameters for the request. If the
// request is queued then `queueNoteFn` will be called twice,
// characterized by the given digest. The given `noteFn` will be
// invoked with the results of request classification.
// The given `workEstimator` is called, if at all, after noteFn.
// `workEstimator` will be invoked only when the request
// is classified as non 'exempt'.
// 'workEstimator', when invoked, must return the
// work parameters for the request.
// If the request is queued then `queueNoteFn` will be called twice,
// first with `true` and then with `false`; otherwise
// `queueNoteFn` will not be called at all. If Handle decides
// that the request should be executed then `execute()` will be
Expand All @@ -55,7 +59,8 @@ type Interface interface {
// ctx is cancelled or times out.
Handle(ctx context.Context,
requestDigest RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func(),
)
Expand Down Expand Up @@ -150,10 +155,11 @@ func NewTestable(config TestableConfig) Interface {
}

func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func()) {
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn)
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, noteFn, workEstimator, queueNoteFn)
queued := startWaitingTime != time.Time{}
if req == nil {
if queued {
Expand Down
Expand Up @@ -462,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
startWG.Add(1)
go func(matches, isResource bool, rdu RequestDigest) {
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate {
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
if testDebugLogs {
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
Expand All @@ -475,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
}
}
}, func() fcrequest.WorkEstimate {
return fcrequest.WorkEstimate{InitialSeats: 1}
}, func(inQueue bool) {
}, func() {
Expand Down

0 comments on commit 0ae6ef6

Please sign in to comment.