Skip to content

Commit

Permalink
Merge pull request #1362 from DirectXMan12/bug/channel-source
Browse files Browse the repository at this point in the history
🐛 Don't hotloop on channel source closure
  • Loading branch information
k8s-ci-robot committed Jan 27, 2021
2 parents ad730f8 + 3b9d201 commit a763c9a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
8 changes: 7 additions & 1 deletion pkg/source/source.go
Expand Up @@ -274,7 +274,13 @@ func (cs *Channel) syncLoop(ctx context.Context) {
// Close destination channels
cs.doStop()
return
case evt := <-cs.Source:
case evt, stillOpen := <-cs.Source:
if !stillOpen {
// if the source channel is closed, we're never gonna get
// anything more on it, so stop & bail
cs.doStop()
return
}
cs.distribute(evt)
}
}
Expand Down
45 changes: 44 additions & 1 deletion pkg/source/source_test.go
Expand Up @@ -446,6 +446,50 @@ var _ = Describe("Source", func() {

close(done)
})
It("should stop when the source channel is closed", func() {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
// if we didn't stop, we'd start spamming the queue with empty
// messages as we "received" a zero-valued GenericEvent from
// the source channel

By("creating a channel with one element, then closing it")
ch := make(chan event.GenericEvent, 1)
evt := event.GenericEvent{}
ch <- evt
close(ch)

By("feeding that channel to a channel source")
src := &source.Channel{Source: ch}
Expect(inject.StopChannelInto(ctx.Done(), src)).To(BeTrue())

processed := make(chan struct{})
defer close(processed)

err := src.Start(ctx, handler.Funcs{
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected CreateEvent")
},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected UpdateEvent")
},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected DeleteEvent")
},
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()

processed <- struct{}{}
},
}, q)
Expect(err).NotTo(HaveOccurred())

By("expecting to only get one event")
Eventually(processed).Should(Receive())
Consistently(processed).ShouldNot(Receive())
})
It("should get error if no source specified", func(done Done) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Channel{ /*no source specified*/ }
Expand All @@ -461,7 +505,6 @@ var _ = Describe("Source", func() {
Expect(err).To(Equal(fmt.Errorf("must call InjectStop on Channel before calling Start")))
close(done)
})

})
Context("for multi sources (handlers)", func() {
It("should provide GenericEvents for all handlers", func(done Done) {
Expand Down

0 comments on commit a763c9a

Please sign in to comment.