Skip to content

Commit

Permalink
Do away with draining, discard error if write fails (#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Jul 30, 2021
1 parent c76a3f5 commit 52266f3
Showing 1 changed file with 8 additions and 34 deletions.
42 changes: 8 additions & 34 deletions jwk/refresh.go
Expand Up @@ -26,12 +26,11 @@ import (
// All JWKS objects that are retrieved via the auto-fetch mechanism should be
// treated read-only, as they are shared among the consumers and this object.
type AutoRefresh struct {
errDst chan AutoRefreshError // user-specified error sink
errSink chan AutoRefreshError // AutoRefresh's error sink
errSink chan AutoRefreshError
cache map[string]Set
configureCh chan struct{}
fetching map[string]chan struct{}
muErrDst sync.Mutex
muErrSink sync.Mutex
muCache sync.RWMutex
muFetching sync.Mutex
muRegistry sync.RWMutex
Expand Down Expand Up @@ -111,15 +110,13 @@ type resetTimerReq struct {
// }
func NewAutoRefresh(ctx context.Context) *AutoRefresh {
af := &AutoRefresh{
errSink: make(chan AutoRefreshError, 1),
cache: make(map[string]Set),
configureCh: make(chan struct{}),
fetching: make(map[string]chan struct{}),
registry: make(map[string]*target),
resetTimerCh: make(chan *resetTimerReq),
}
go af.refreshLoop(ctx)
go af.drainErrSink(ctx)
return af
}

Expand Down Expand Up @@ -485,13 +482,11 @@ func (af *AutoRefresh) doRefreshRequest(ctx context.Context, url string, enableB
// At this point if err != nil, we know that there was something wrong
// in either the fetching or the parsing. Send this error to be processed,
// but take the extra mileage to not block regular processing by
// sending the error to a "proxy" sink, and not directly at the user-specified sink
// (see drainErrSink)
if err != nil && af.errSink != nil {
// discarding the error if we fail to send it through the channel
if err != nil {
select {
case af.errSink <- AutoRefreshError{Error: err, URL: url}:
default:
panic("af.errSink is not draining")
}
}

Expand All @@ -514,38 +509,17 @@ func (af *AutoRefresh) doRefreshRequest(ctx context.Context, url string, enableB
return err
}

// drainErrSink is used proxy the errors that were sent to the main
// error sink (af.errSink) to the user specified error sink
func (af *AutoRefresh) drainErrSink(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case err := <-af.errSink:
af.muErrDst.Lock()
dst := af.errDst
af.muErrDst.Unlock()
if dst != nil {
// This will block if the user isn't properly draining the channel.
// It is the user's responsibility to drain it once they
// requested the errors to be streamed
dst <- err
}
}
}
}

// ErrorSink sets a channel to receive JWK fetch errors, if any.
// Only the errors that occurred *after* the channel was set will be sent.
//
// The user is responsible for properly draining the channel. If the channel
// is not drained, the fetch operation will block on repeated errors.
// is not drained properly, errors will be discarded.
//
// To disable, set a nil channel.
func (af *AutoRefresh) ErrorSink(ch chan AutoRefreshError) {
af.muErrDst.Lock()
af.errDst = ch
af.muErrDst.Unlock()
af.muErrSink.Lock()
af.errSink = ch
af.muErrSink.Unlock()
}

func calculateRefreshDuration(res *http.Response, refreshInterval *time.Duration, minRefreshInterval time.Duration) time.Duration {
Expand Down

0 comments on commit 52266f3

Please sign in to comment.