Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use ringbuffer in the framer #3874

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/quic-go/quic-go/internal/ackhandler"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/utils/ringbuffer"
"github.com/quic-go/quic-go/internal/wire"
"github.com/quic-go/quic-go/quicvarint"
)
Expand All @@ -28,7 +29,7 @@ type framerI struct {
streamGetter streamGetter

activeStreams map[protocol.StreamID]struct{}
streamQueue []protocol.StreamID
streamQueue ringbuffer.RingBuffer[protocol.StreamID]

controlFrameMutex sync.Mutex
controlFrames []wire.Frame
Expand All @@ -45,7 +46,7 @@ func newFramer(streamGetter streamGetter) framer {

func (f *framerI) HasData() bool {
f.mutex.Lock()
hasData := len(f.streamQueue) > 0
hasData := !f.streamQueue.Empty()
f.mutex.Unlock()
if hasData {
return true
Expand Down Expand Up @@ -84,7 +85,7 @@ func (f *framerI) AppendControlFrames(frames []*ackhandler.Frame, maxLen protoco
func (f *framerI) AddActiveStream(id protocol.StreamID) {
f.mutex.Lock()
if _, ok := f.activeStreams[id]; !ok {
f.streamQueue = append(f.streamQueue, id)
f.streamQueue.PushBack(id)
f.activeStreams[id] = struct{}{}
}
f.mutex.Unlock()
Expand All @@ -95,13 +96,12 @@ func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol
var lastFrame *ackhandler.Frame
f.mutex.Lock()
// pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet
numActiveStreams := len(f.streamQueue)
numActiveStreams := f.streamQueue.Len()
for i := 0; i < numActiveStreams; i++ {
if protocol.MinStreamFrameSize+length > maxLen {
break
}
id := f.streamQueue[0]
f.streamQueue = f.streamQueue[1:]
id := f.streamQueue.PopFront()
// This should never return an error. Better check it anyway.
// The stream will only be in the streamQueue, if it enqueued itself there.
str, err := f.streamGetter.GetOrOpenSendStream(id)
Expand All @@ -117,7 +117,7 @@ func (f *framerI) AppendStreamFrames(frames []*ackhandler.Frame, maxLen protocol
remainingLen += quicvarint.Len(uint64(remainingLen))
frame, hasMoreData := str.popStreamFrame(remainingLen, v)
if hasMoreData { // put the stream back in the queue (at the end)
f.streamQueue = append(f.streamQueue, id)
f.streamQueue.PushBack(id)
} else { // no more data to send. Stream is not active any more
delete(f.activeStreams, id)
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func (f *framerI) Handle0RTTRejection() error {
defer f.mutex.Unlock()

f.controlFrameMutex.Lock()
f.streamQueue = f.streamQueue[:0]
f.streamQueue.Clear()
for id := range f.activeStreams {
delete(f.activeStreams, id)
}
Expand Down
75 changes: 75 additions & 0 deletions internal/utils/ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ringbuffer

type RingBuffer[T any] struct {
ring []T
headPos, tailPos int
full bool
}

func (r *RingBuffer[T]) Init(size int) {
r.ring = make([]T, size)
}

func (r *RingBuffer[T]) Len() int {
if r.full {
return len(r.ring)
}
if r.tailPos >= r.headPos {
return r.tailPos - r.headPos
}
return r.tailPos - r.headPos + len(r.ring)
}

func (r *RingBuffer[T]) Empty() bool {
return !r.full && r.headPos == r.tailPos
}

func (r *RingBuffer[T]) PushBack(t T) {
if r.full || len(r.ring) == 0 {
r.grow()
}
r.ring[r.tailPos] = t
r.tailPos++
if r.tailPos == len(r.ring) {
r.tailPos = 0
}
if r.tailPos == r.headPos {
r.full = true
}
}

func (r *RingBuffer[T]) PopFront() T {
if r.Empty() {
panic("github.com/quic-go/quic-go/internal/utils/ringbuffer: pop from an empty queue")
}
r.full = false
t := r.ring[r.headPos]
r.ring[r.headPos] = *new(T)
r.headPos++
if r.headPos == len(r.ring) {
r.headPos = 0
}
return t
}

// Grow the maximum size of the queue.
// This method assume the queue is full.
func (r *RingBuffer[T]) grow() {
oldRing := r.ring
newSize := len(oldRing) * 2
if newSize == 0 {
newSize = 1
}
r.ring = make([]T, newSize)
headLen := copy(r.ring, oldRing[r.headPos:])
copy(r.ring[headLen:], oldRing[:r.headPos])
r.headPos, r.tailPos, r.full = 0, len(oldRing), false
}

func (r *RingBuffer[T]) Clear() {
var zeroValue T
for i := range r.ring {
r.ring[i] = zeroValue
}
r.headPos, r.tailPos, r.full = 0, 0, false
}
12 changes: 12 additions & 0 deletions internal/utils/ringbuffer/ringbuffer_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package ringbuffer

import "testing"

func BenchmarkRingBuffer(b *testing.B) {
r := RingBuffer[int]{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
r.PushBack(i)
r.PopFront()
}
}
13 changes: 13 additions & 0 deletions internal/utils/ringbuffer/ringbuffer_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ringbuffer

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestTestdata(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ringbuffer suite")
}
38 changes: 38 additions & 0 deletions internal/utils/ringbuffer/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ringbuffer

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("RingBuffer", func() {
It("push and pop", func() {
r := RingBuffer[int]{}
Expect(len(r.ring)).To(Equal(0))
Expect(func() { r.PopFront() }).To(Panic())
r.PushBack(1)
r.PushBack(2)
r.PushBack(3)
Expect(r.PopFront()).To(Equal(1))
Expect(r.PopFront()).To(Equal(2))
r.PushBack(4)
r.PushBack(5)
Expect(r.Len()).To(Equal(3))
r.PushBack(6)
Expect(r.Len()).To(Equal(4))
Expect(r.PopFront()).To(Equal(3))
Expect(r.PopFront()).To(Equal(4))
Expect(r.PopFront()).To(Equal(5))
Expect(r.PopFront()).To(Equal(6))
})
It("clear", func() {
r := RingBuffer[int]{}
r.Init(2)
r.PushBack(1)
r.PushBack(2)
Expect(r.full).To(BeTrue())
r.Clear()
Expect(r.full).To(BeFalse())
Expect(r.Len()).To(Equal(0))
})
})