diff --git a/pkg/source/source.go b/pkg/source/source.go index 72e71e852a..c4a74af02a 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -274,7 +274,13 @@ func (cs *Channel) syncLoop(ctx context.Context) { // Close destination channels cs.doStop() return - case evt := <-cs.Source: + case evt, stillOpen := <-cs.Source: + if !stillOpen { + // if the source channel is closed, we're never gonna get + // anything more on it, so stop & bail + cs.doStop() + return + } cs.distribute(evt) } } diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 1a0c6146d6..2726b6cfd0 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -446,6 +446,50 @@ var _ = Describe("Source", func() { close(done) }) + It("should stop when the source channel is closed", func() { + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + // if we didn't stop, we'd start spamming the queue with empty + // messages as we "received" a zero-valued GenericEvent from + // the source channel + + By("creating a channel with one element, then closing it") + ch := make(chan event.GenericEvent, 1) + evt := event.GenericEvent{} + ch <- evt + close(ch) + + By("feeding that channel to a channel source") + src := &source.Channel{Source: ch} + Expect(inject.StopChannelInto(ctx.Done(), src)).To(BeTrue()) + + processed := make(chan struct{}) + defer close(processed) + + err := src.Start(ctx, handler.Funcs{ + CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected CreateEvent") + }, + UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected UpdateEvent") + }, + DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected DeleteEvent") + }, + GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + + processed <- struct{}{} + }, + }, q) + Expect(err).NotTo(HaveOccurred()) + + By("expecting to only get one event") + Eventually(processed).Should(Receive()) + Consistently(processed).ShouldNot(Receive()) + }) It("should get error if no source specified", func(done Done) { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{ /*no source specified*/ } @@ -461,7 +505,6 @@ var _ = Describe("Source", func() { Expect(err).To(Equal(fmt.Errorf("must call InjectStop on Channel before calling Start"))) close(done) }) - }) Context("for multi sources (handlers)", func() { It("should provide GenericEvents for all handlers", func(done Done) {