From 1f6a9ecafd9dbe55833469300c0d574fdf949fb9 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 11 Oct 2022 12:25:17 +0300 Subject: [PATCH] use a Peek / Pop API for the datagram queue (#3582) --- connection.go | 2 +- datagram_queue.go | 52 ++++++++++++++++++------------------------ datagram_queue_test.go | 38 ++++++++++++++++++++---------- packet_packer.go | 21 ++++++++--------- packet_packer_test.go | 3 +-- 5 files changed, 60 insertions(+), 56 deletions(-) diff --git a/connection.go b/connection.go index d9ee5fd7baa..9227beb2a13 100644 --- a/connection.go +++ b/connection.go @@ -542,7 +542,7 @@ func (s *connection) preSetup() { s.creationTime = now s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame) - s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger, s.version) + s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger) } // run the connection main loop diff --git a/datagram_queue.go b/datagram_queue.go index acc7e1a184d..7bedfe6556c 100644 --- a/datagram_queue.go +++ b/datagram_queue.go @@ -1,18 +1,14 @@ package quic import ( - "sync" - "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" ) type datagramQueue struct { - mx sync.Mutex - nextFrameSize protocol.ByteCount - sendQueue chan *wire.DatagramFrame + nextFrame *wire.DatagramFrame rcvQueue chan []byte closeErr error @@ -22,20 +18,17 @@ type datagramQueue struct { dequeued chan struct{} - logger utils.Logger - version protocol.VersionNumber + logger utils.Logger } -func newDatagramQueue(hasData func(), logger utils.Logger, v protocol.VersionNumber) *datagramQueue { +func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue { return &datagramQueue{ - hasData: hasData, - sendQueue: make(chan *wire.DatagramFrame, 1), - nextFrameSize: protocol.InvalidByteCount, - rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), - dequeued: make(chan struct{}), - closed: make(chan struct{}), - logger: logger, - version: v, + hasData: hasData, + sendQueue: make(chan *wire.DatagramFrame, 1), + rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen), + dequeued: make(chan struct{}), + closed: make(chan struct{}), + logger: logger, } } @@ -44,9 +37,6 @@ func newDatagramQueue(hasData func(), logger utils.Logger, v protocol.VersionNum func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { select { case h.sendQueue <- f: - h.mx.Lock() - h.nextFrameSize = f.Length(h.version) - h.mx.Unlock() h.hasData() case <-h.closed: return h.closeErr @@ -60,24 +50,26 @@ func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error { } } -// Get dequeues a DATAGRAM frame for sending. -func (h *datagramQueue) Get() *wire.DatagramFrame { +// Peek gets the next DATAGRAM frame for sending. +// If actually sent out, Pop needs to be called before the next call to Peek. +func (h *datagramQueue) Peek() *wire.DatagramFrame { + if h.nextFrame != nil { + return h.nextFrame + } select { - case f := <-h.sendQueue: - h.mx.Lock() - h.nextFrameSize = protocol.InvalidByteCount - h.mx.Unlock() + case h.nextFrame = <-h.sendQueue: h.dequeued <- struct{}{} - return f default: return nil } + return h.nextFrame } -func (h *datagramQueue) NextFrameSize() protocol.ByteCount { - h.mx.Lock() - defer h.mx.Unlock() - return h.nextFrameSize +func (h *datagramQueue) Pop() { + if h.nextFrame == nil { + panic("datagramQueue BUG: Pop called for nil frame") + } + h.nextFrame = nil } // HandleDatagramFrame handles a received DATAGRAM frame. diff --git a/datagram_queue_test.go b/datagram_queue_test.go index 4de5c1bf086..755a7dc78e2 100644 --- a/datagram_queue_test.go +++ b/datagram_queue_test.go @@ -3,7 +3,6 @@ package quic import ( "errors" - "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/utils" "github.com/lucas-clemente/quic-go/internal/wire" @@ -17,15 +16,12 @@ var _ = Describe("Datagram Queue", func() { BeforeEach(func() { queued = make(chan struct{}, 100) - queue = newDatagramQueue(func() { - queued <- struct{}{} - }, utils.DefaultLogger, protocol.Version1) + queue = newDatagramQueue(func() { queued <- struct{}{} }, utils.DefaultLogger) }) Context("sending", func() { It("returns nil when there's no datagram to send", func() { - Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount)) - Expect(queue.Get()).To(BeNil()) + Expect(queue.Peek()).To(BeNil()) }) It("queues a datagram", func() { @@ -39,14 +35,32 @@ var _ = Describe("Datagram Queue", func() { Eventually(queued).Should(HaveLen(1)) Consistently(done).ShouldNot(BeClosed()) - l := queue.NextFrameSize() - f := queue.Get() - Expect(l).To(Equal(f.Length(protocol.Version1))) - Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount)) - Expect(f).ToNot(BeNil()) + f := queue.Peek() Expect(f.Data).To(Equal([]byte("foobar"))) Eventually(done).Should(BeClosed()) - Expect(queue.Get()).To(BeNil()) + queue.Pop() + Expect(queue.Peek()).To(BeNil()) + }) + + It("returns the same datagram multiple times, when Pop isn't called", func() { + sent := make(chan struct{}, 1) + go func() { + defer GinkgoRecover() + Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed()) + sent <- struct{}{} + Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("bar")})).To(Succeed()) + sent <- struct{}{} + }() + + Eventually(queued).Should(HaveLen(1)) + f := queue.Peek() + Expect(f.Data).To(Equal([]byte("foo"))) + Eventually(sent).Should(Receive()) + Expect(queue.Peek()).To(Equal(f)) + Expect(queue.Peek()).To(Equal(f)) + queue.Pop() + f = queue.Peek() + Expect(f.Data).To(Equal([]byte("bar"))) }) It("closes", func() { diff --git a/packet_packer.go b/packet_packer.go index 8b1d4772351..378e57665db 100644 --- a/packet_packer.go +++ b/packet_packer.go @@ -592,18 +592,17 @@ func (p *packetPacker) composeNextPacket(maxFrameSize protocol.ByteCount, onlyAc } if p.datagramQueue != nil { - size := p.datagramQueue.NextFrameSize() - if size > 0 && size <= maxFrameSize-payload.length { - datagram := p.datagramQueue.Get() - if datagram == nil || datagram.Length(p.version) != size { - panic("packet packer BUG: inconsistent DATAGRAM frame length") + if f := p.datagramQueue.Peek(); f != nil { + size := f.Length(p.version) + if size <= maxFrameSize-payload.length { + payload.frames = append(payload.frames, ackhandler.Frame{ + Frame: f, + // set it to a no-op. Then we won't set the default callback, which would retransmit the frame. + OnLost: func(wire.Frame) {}, + }) + payload.length += size + p.datagramQueue.Pop() } - payload.frames = append(payload.frames, ackhandler.Frame{ - Frame: datagram, - // set it to a no-op. Then we won't set the default callback, which would retransmit the frame. - OnLost: func(wire.Frame) {}, - }) - payload.length += datagram.Length(p.version) } } diff --git a/packet_packer_test.go b/packet_packer_test.go index e1e9cb95125..34656c9004a 100644 --- a/packet_packer_test.go +++ b/packet_packer_test.go @@ -91,7 +91,7 @@ var _ = Describe("Packet packer", func() { ackFramer = NewMockAckFrameSource(mockCtrl) sealingManager = NewMockSealingManager(mockCtrl) pnManager = mockackhandler.NewMockSentPacketHandler(mockCtrl) - datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger, version) + datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger) packer = newPacketPacker( protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}), @@ -634,7 +634,6 @@ var _ = Describe("Packet packer", func() { Expect(p.ack).ToNot(BeNil()) Expect(p.frames).To(BeEmpty()) Expect(p.buffer.Data).ToNot(BeEmpty()) - Expect(done).ToNot(BeClosed()) datagramQueue.CloseWithError(nil) Eventually(done).Should(BeClosed()) })