diff --git a/send_queue.go b/send_queue.go index 1fc8c1bf893..9eafcd374b0 100644 --- a/send_queue.go +++ b/send_queue.go @@ -36,6 +36,13 @@ func newSendQueue(conn sendConn) sender { func (h *sendQueue) Send(p *packetBuffer) { select { case h.queue <- p: + // clear available channel if we've reached capacity + if len(h.queue) == sendQueueCapacity { + select { + case <-h.available: + default: + } + } case <-h.runStopped: default: panic("sendQueue.Send would have blocked") diff --git a/send_queue_test.go b/send_queue_test.go index 275be06bc59..ce4279ac9c7 100644 --- a/send_queue_test.go +++ b/send_queue_test.go @@ -73,6 +73,48 @@ var _ = Describe("Send Queue", func() { Eventually(done).Should(BeClosed()) }) + It("signals when sending is possible again, when the first write succeeded", func() { + write := make(chan struct{}, 1) + written := make(chan struct{}, 100) + // now start sending out packets. This should free up queue space. + c.EXPECT().Write(gomock.Any()).DoAndReturn(func(b []byte) error { + <-write + written <- struct{}{} + return nil + }).AnyTimes() + // allow the first packet to be sent immediately + write <- struct{}{} + + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + q.Run() + close(done) + }() + + q.Send(getPacket([]byte("foobar"))) + <-written + + // now fill up the send queue + for i := 0; i < sendQueueCapacity+1; i++ { + Expect(q.WouldBlock()).To(BeFalse()) + q.Send(getPacket([]byte("foobar"))) + } + + Expect(q.WouldBlock()).To(BeTrue()) + Consistently(q.Available()).ShouldNot(Receive()) + write <- struct{}{} + Eventually(q.Available()).Should(Receive()) + + // test shutdown + for i := 0; i < sendQueueCapacity; i++ { + write <- struct{}{} + } + + q.Close() + Eventually(done).Should(BeClosed()) + }) + It("does not block pending send after the queue has stopped running", func() { done := make(chan struct{}) go func() {