Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use a Peek / Pop API for the datagram queue #3582

Merged
merged 1 commit into from Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion connection.go
Expand Up @@ -542,7 +542,7 @@ func (s *connection) preSetup() {
s.creationTime = now

s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger, s.version)
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger)
}

// run the connection main loop
Expand Down
52 changes: 22 additions & 30 deletions datagram_queue.go
@@ -1,18 +1,14 @@
package quic

import (
"sync"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"
)

type datagramQueue struct {
mx sync.Mutex
nextFrameSize protocol.ByteCount

sendQueue chan *wire.DatagramFrame
nextFrame *wire.DatagramFrame
rcvQueue chan []byte

closeErr error
Expand All @@ -22,20 +18,17 @@ type datagramQueue struct {

dequeued chan struct{}

logger utils.Logger
version protocol.VersionNumber
logger utils.Logger
}

func newDatagramQueue(hasData func(), logger utils.Logger, v protocol.VersionNumber) *datagramQueue {
func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
return &datagramQueue{
hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame, 1),
nextFrameSize: protocol.InvalidByteCount,
rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen),
dequeued: make(chan struct{}),
closed: make(chan struct{}),
logger: logger,
version: v,
hasData: hasData,
sendQueue: make(chan *wire.DatagramFrame, 1),
rcvQueue: make(chan []byte, protocol.DatagramRcvQueueLen),
dequeued: make(chan struct{}),
closed: make(chan struct{}),
logger: logger,
}
}

Expand All @@ -44,9 +37,6 @@ func newDatagramQueue(hasData func(), logger utils.Logger, v protocol.VersionNum
func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
select {
case h.sendQueue <- f:
h.mx.Lock()
h.nextFrameSize = f.Length(h.version)
h.mx.Unlock()
h.hasData()
case <-h.closed:
return h.closeErr
Expand All @@ -60,24 +50,26 @@ func (h *datagramQueue) AddAndWait(f *wire.DatagramFrame) error {
}
}

// Get dequeues a DATAGRAM frame for sending.
func (h *datagramQueue) Get() *wire.DatagramFrame {
// Peek gets the next DATAGRAM frame for sending.
// If actually sent out, Pop needs to be called before the next call to Peek.
func (h *datagramQueue) Peek() *wire.DatagramFrame {
if h.nextFrame != nil {
return h.nextFrame
}
select {
case f := <-h.sendQueue:
h.mx.Lock()
h.nextFrameSize = protocol.InvalidByteCount
h.mx.Unlock()
case h.nextFrame = <-h.sendQueue:
h.dequeued <- struct{}{}
return f
default:
return nil
}
return h.nextFrame
}

func (h *datagramQueue) NextFrameSize() protocol.ByteCount {
h.mx.Lock()
defer h.mx.Unlock()
return h.nextFrameSize
func (h *datagramQueue) Pop() {
if h.nextFrame == nil {
panic("datagramQueue BUG: Pop called for nil frame")
}
h.nextFrame = nil
}

// HandleDatagramFrame handles a received DATAGRAM frame.
Expand Down
38 changes: 26 additions & 12 deletions datagram_queue_test.go
Expand Up @@ -3,7 +3,6 @@ package quic
import (
"errors"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"

Expand All @@ -17,15 +16,12 @@ var _ = Describe("Datagram Queue", func() {

BeforeEach(func() {
queued = make(chan struct{}, 100)
queue = newDatagramQueue(func() {
queued <- struct{}{}
}, utils.DefaultLogger, protocol.Version1)
queue = newDatagramQueue(func() { queued <- struct{}{} }, utils.DefaultLogger)
})

Context("sending", func() {
It("returns nil when there's no datagram to send", func() {
Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount))
Expect(queue.Get()).To(BeNil())
Expect(queue.Peek()).To(BeNil())
})

It("queues a datagram", func() {
Expand All @@ -39,14 +35,32 @@ var _ = Describe("Datagram Queue", func() {

Eventually(queued).Should(HaveLen(1))
Consistently(done).ShouldNot(BeClosed())
l := queue.NextFrameSize()
f := queue.Get()
Expect(l).To(Equal(f.Length(protocol.Version1)))
Expect(queue.NextFrameSize()).To(Equal(protocol.InvalidByteCount))
Expect(f).ToNot(BeNil())
f := queue.Peek()
Expect(f.Data).To(Equal([]byte("foobar")))
Eventually(done).Should(BeClosed())
Expect(queue.Get()).To(BeNil())
queue.Pop()
Expect(queue.Peek()).To(BeNil())
})

It("returns the same datagram multiple times, when Pop isn't called", func() {
sent := make(chan struct{}, 1)
go func() {
defer GinkgoRecover()
Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed())
sent <- struct{}{}
Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("bar")})).To(Succeed())
sent <- struct{}{}
}()

Eventually(queued).Should(HaveLen(1))
f := queue.Peek()
Expect(f.Data).To(Equal([]byte("foo")))
Eventually(sent).Should(Receive())
Expect(queue.Peek()).To(Equal(f))
Expect(queue.Peek()).To(Equal(f))
queue.Pop()
f = queue.Peek()
Expect(f.Data).To(Equal([]byte("bar")))
})

It("closes", func() {
Expand Down
21 changes: 10 additions & 11 deletions packet_packer.go
Expand Up @@ -592,18 +592,17 @@ func (p *packetPacker) composeNextPacket(maxFrameSize protocol.ByteCount, onlyAc
}

if p.datagramQueue != nil {
size := p.datagramQueue.NextFrameSize()
if size > 0 && size <= maxFrameSize-payload.length {
datagram := p.datagramQueue.Get()
if datagram == nil || datagram.Length(p.version) != size {
panic("packet packer BUG: inconsistent DATAGRAM frame length")
if f := p.datagramQueue.Peek(); f != nil {
size := f.Length(p.version)
if size <= maxFrameSize-payload.length {
payload.frames = append(payload.frames, ackhandler.Frame{
Frame: f,
// set it to a no-op. Then we won't set the default callback, which would retransmit the frame.
OnLost: func(wire.Frame) {},
})
payload.length += size
p.datagramQueue.Pop()
}
payload.frames = append(payload.frames, ackhandler.Frame{
Frame: datagram,
// set it to a no-op. Then we won't set the default callback, which would retransmit the frame.
OnLost: func(wire.Frame) {},
})
payload.length += datagram.Length(p.version)
}
}

Expand Down
3 changes: 1 addition & 2 deletions packet_packer_test.go
Expand Up @@ -91,7 +91,7 @@ var _ = Describe("Packet packer", func() {
ackFramer = NewMockAckFrameSource(mockCtrl)
sealingManager = NewMockSealingManager(mockCtrl)
pnManager = mockackhandler.NewMockSentPacketHandler(mockCtrl)
datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger, version)
datagramQueue = newDatagramQueue(func() {}, utils.DefaultLogger)

packer = newPacketPacker(
protocol.ParseConnectionID([]byte{1, 2, 3, 4, 5, 6, 7, 8}),
Expand Down Expand Up @@ -634,7 +634,6 @@ var _ = Describe("Packet packer", func() {
Expect(p.ack).ToNot(BeNil())
Expect(p.frames).To(BeEmpty())
Expect(p.buffer.Data).ToNot(BeEmpty())
Expect(done).ToNot(BeClosed())
datagramQueue.CloseWithError(nil)
Eventually(done).Should(BeClosed())
})
Expand Down