Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix availability signaling of the send queue #3597

Merged
merged 1 commit into from Oct 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions send_queue.go
Expand Up @@ -36,6 +36,13 @@ func newSendQueue(conn sendConn) sender {
func (h *sendQueue) Send(p *packetBuffer) {
select {
case h.queue <- p:
// clear available channel if we've reached capacity
if len(h.queue) == sendQueueCapacity {
select {
case <-h.available:
default:
}
}
case <-h.runStopped:
default:
panic("sendQueue.Send would have blocked")
Expand Down
42 changes: 42 additions & 0 deletions send_queue_test.go
Expand Up @@ -73,6 +73,48 @@ var _ = Describe("Send Queue", func() {
Eventually(done).Should(BeClosed())
})

It("signals when sending is possible again, when the first write succeeded", func() {
write := make(chan struct{}, 1)
written := make(chan struct{}, 100)
// now start sending out packets. This should free up queue space.
c.EXPECT().Write(gomock.Any()).DoAndReturn(func(b []byte) error {
<-write
written <- struct{}{}
return nil
}).AnyTimes()
// allow the first packet to be sent immediately
write <- struct{}{}

done := make(chan struct{})
go func() {
defer GinkgoRecover()
q.Run()
close(done)
}()

q.Send(getPacket([]byte("foobar")))
<-written

// now fill up the send queue
for i := 0; i < sendQueueCapacity+1; i++ {
Expect(q.WouldBlock()).To(BeFalse())
q.Send(getPacket([]byte("foobar")))
}

Expect(q.WouldBlock()).To(BeTrue())
Consistently(q.Available()).ShouldNot(Receive())
write <- struct{}{}
Eventually(q.Available()).Should(Receive())

// test shutdown
for i := 0; i < sendQueueCapacity; i++ {
write <- struct{}{}
}

q.Close()
Eventually(done).Should(BeClosed())
})

It("does not block pending send after the queue has stopped running", func() {
done := make(chan struct{})
go func() {
Expand Down