Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Don't hotloop on channel source closure #1362

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/source/source.go
Expand Up @@ -274,7 +274,13 @@ func (cs *Channel) syncLoop(ctx context.Context) {
// Close destination channels
cs.doStop()
return
case evt := <-cs.Source:
case evt, stillOpen := <-cs.Source:
if !stillOpen {
// if the source channel is closed, we're never gonna get
// anything more on it, so stop & bail
cs.doStop()
return
}
cs.distribute(evt)
}
}
Expand Down
45 changes: 44 additions & 1 deletion pkg/source/source_test.go
Expand Up @@ -446,6 +446,50 @@ var _ = Describe("Source", func() {

close(done)
})
It("should stop when the source channel is closed", func() {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
// if we didn't stop, we'd start spamming the queue with empty
// messages as we "received" a zero-valued GenericEvent from
// the source channel

By("creating a channel with one element, then closing it")
ch := make(chan event.GenericEvent, 1)
evt := event.GenericEvent{}
ch <- evt
close(ch)

By("feeding that channel to a channel source")
src := &source.Channel{Source: ch}
Expect(inject.StopChannelInto(ctx.Done(), src)).To(BeTrue())

processed := make(chan struct{})
defer close(processed)

err := src.Start(ctx, handler.Funcs{
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected CreateEvent")
},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected UpdateEvent")
},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected DeleteEvent")
},
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()

processed <- struct{}{}
},
}, q)
Expect(err).NotTo(HaveOccurred())

By("expecting to only get one event")
Eventually(processed).Should(Receive())
Consistently(processed).ShouldNot(Receive())
})
It("should get error if no source specified", func(done Done) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Channel{ /*no source specified*/ }
Expand All @@ -461,7 +505,6 @@ var _ = Describe("Source", func() {
Expect(err).To(Equal(fmt.Errorf("must call InjectStop on Channel before calling Start")))
close(done)
})

})
Context("for multi sources (handlers)", func() {
It("should provide GenericEvents for all handlers", func(done Done) {
Expand Down