Skip to content

Commit

Permalink
http3: reduce usage of bytes.Buffer (#3539)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Sep 1, 2022
1 parent dfd35cb commit 62b8278
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 149 deletions.
9 changes: 4 additions & 5 deletions http3/client.go
@@ -1,7 +1,6 @@
package http3

import (
"bytes"
"context"
"crypto/tls"
"errors"
Expand Down Expand Up @@ -136,11 +135,11 @@ func (c *client) setupConn() error {
if err != nil {
return err
}
buf := &bytes.Buffer{}
quicvarint.Write(buf, streamTypeControlStream)
b := make([]byte, 0, 64)
b = quicvarint.Append(b, streamTypeControlStream)
// send the SETTINGS frame
(&settingsFrame{Datagram: c.opts.EnableDatagram, Other: c.opts.AdditionalSettings}).Write(buf)
_, err = str.Write(buf.Bytes())
b = (&settingsFrame{Datagram: c.opts.EnableDatagram, Other: c.opts.AdditionalSettings}).Append(b)
_, err = str.Write(b)
return err
}

Expand Down
65 changes: 31 additions & 34 deletions http3/client_test.go
Expand Up @@ -455,11 +455,11 @@ var _ = Describe("Client", func() {
})

It("parses the SETTINGS frame", func() {
buf := &bytes.Buffer{}
quicvarint.Write(buf, streamTypeControlStream)
(&settingsFrame{}).Write(buf)
b := quicvarint.Append(nil, streamTypeControlStream)
b = (&settingsFrame{}).Append(b)
r := bytes.NewReader(b)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
conn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
Expand Down Expand Up @@ -521,11 +521,11 @@ var _ = Describe("Client", func() {
})

It("errors when the first frame on the control stream is not a SETTINGS frame", func() {
buf := &bytes.Buffer{}
quicvarint.Write(buf, streamTypeControlStream)
(&dataFrame{}).Write(buf)
b := quicvarint.Append(nil, streamTypeControlStream)
b = (&dataFrame{}).Append(b)
r := bytes.NewReader(b)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
conn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
Expand All @@ -545,13 +545,11 @@ var _ = Describe("Client", func() {
})

It("errors when parsing the frame on the control stream fails", func() {
buf := &bytes.Buffer{}
quicvarint.Write(buf, streamTypeControlStream)
b := &bytes.Buffer{}
(&settingsFrame{}).Write(b)
buf.Write(b.Bytes()[:b.Len()-1])
b := quicvarint.Append(nil, streamTypeControlStream)
b = (&settingsFrame{}).Append(b)
r := bytes.NewReader(b[:len(b)-1])
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
conn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
Expand Down Expand Up @@ -595,11 +593,11 @@ var _ = Describe("Client", func() {

It("errors when the server advertises datagram support (and we enabled support for it)", func() {
client.opts.EnableDatagram = true
buf := &bytes.Buffer{}
quicvarint.Write(buf, streamTypeControlStream)
(&settingsFrame{Datagram: true}).Write(buf)
b := quicvarint.Append(nil, streamTypeControlStream)
b = (&settingsFrame{Datagram: true}).Append(b)
r := bytes.NewReader(b)
controlStr := mockquic.NewMockStream(mockCtrl)
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
controlStr.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
conn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
return controlStr, nil
})
Expand Down Expand Up @@ -631,16 +629,15 @@ var _ = Describe("Client", func() {
testDone := make(chan struct{})

getHeadersFrame := func(headers map[string]string) []byte {
buf := &bytes.Buffer{}
headerBuf := &bytes.Buffer{}
enc := qpack.NewEncoder(headerBuf)
for name, value := range headers {
Expect(enc.WriteField(qpack.HeaderField{Name: name, Value: value})).To(Succeed())
}
Expect(enc.Close()).To(Succeed())
(&headersFrame{Length: uint64(headerBuf.Len())}).Write(buf)
buf.Write(headerBuf.Bytes())
return buf.Bytes()
b := (&headersFrame{Length: uint64(headerBuf.Len())}).Append(nil)
b = append(b, headerBuf.Bytes()...)
return b
}

decodeHeader := func(str io.Reader) map[string]string {
Expand Down Expand Up @@ -805,43 +802,43 @@ var _ = Describe("Client", func() {

It("sets the Content-Length", func() {
done := make(chan struct{})
buf := &bytes.Buffer{}
buf.Write(getHeadersFrame(map[string]string{
b := getHeadersFrame(map[string]string{
":status": "200",
"Content-Length": "1337",
}))
(&dataFrame{Length: 0x6}).Write(buf)
buf.Write([]byte("foobar"))
})
b = (&dataFrame{Length: 0x6}).Append(b)
b = append(b, []byte("foobar")...)
r := bytes.NewReader(b)
str.EXPECT().Close().Do(func() { close(done) })
conn.EXPECT().ConnectionState().Return(quic.ConnectionState{})
str.EXPECT().CancelWrite(gomock.Any()).MaxTimes(1) // when reading the response errors
// the response body is sent asynchronously, while already reading the response
str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
str.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
req, err := client.RoundTripOpt(req, RoundTripOpt{})
Expect(err).ToNot(HaveOccurred())
Expect(req.ContentLength).To(BeEquivalentTo(1337))
Eventually(done).Should(BeClosed())
})

It("closes the connection when the first frame is not a HEADERS frame", func() {
buf := &bytes.Buffer{}
(&dataFrame{Length: 0x42}).Write(buf)
b := (&dataFrame{Length: 0x42}).Append(nil)
conn.EXPECT().CloseWithError(quic.ApplicationErrorCode(errorFrameUnexpected), gomock.Any())
closed := make(chan struct{})
r := bytes.NewReader(b)
str.EXPECT().Close().Do(func() { close(closed) })
str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
str.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
_, err := client.RoundTripOpt(req, RoundTripOpt{})
Expect(err).To(MatchError("expected first frame to be a HEADERS frame"))
Eventually(closed).Should(BeClosed())
})

It("cancels the stream when the HEADERS frame is too large", func() {
buf := &bytes.Buffer{}
(&headersFrame{Length: 1338}).Write(buf)
b := (&headersFrame{Length: 1338}).Append(nil)
r := bytes.NewReader(b)
str.EXPECT().CancelWrite(quic.StreamErrorCode(errorFrameError))
closed := make(chan struct{})
str.EXPECT().Close().Do(func() { close(closed) })
str.EXPECT().Read(gomock.Any()).DoAndReturn(buf.Read).AnyTimes()
str.EXPECT().Read(gomock.Any()).DoAndReturn(r.Read).AnyTimes()
_, err := client.RoundTripOpt(req, RoundTripOpt{})
Expect(err).To(MatchError("HEADERS frame too large: 1338 bytes (max: 1337)"))
Eventually(closed).Should(BeClosed())
Expand Down
27 changes: 14 additions & 13 deletions http3/frames.go
Expand Up @@ -74,18 +74,18 @@ type dataFrame struct {
Length uint64
}

func (f *dataFrame) Write(b *bytes.Buffer) {
quicvarint.Write(b, 0x0)
quicvarint.Write(b, f.Length)
func (f *dataFrame) Append(b []byte) []byte {
b = quicvarint.Append(b, 0x0)
return quicvarint.Append(b, f.Length)
}

type headersFrame struct {
Length uint64
}

func (f *headersFrame) Write(b *bytes.Buffer) {
quicvarint.Write(b, 0x1)
quicvarint.Write(b, f.Length)
func (f *headersFrame) Append(b []byte) []byte {
b = quicvarint.Append(b, 0x1)
return quicvarint.Append(b, f.Length)
}

const settingDatagram = 0xffd277
Expand Down Expand Up @@ -142,22 +142,23 @@ func parseSettingsFrame(r io.Reader, l uint64) (*settingsFrame, error) {
return frame, nil
}

func (f *settingsFrame) Write(b *bytes.Buffer) {
quicvarint.Write(b, 0x4)
func (f *settingsFrame) Append(b []byte) []byte {
b = quicvarint.Append(b, 0x4)
var l protocol.ByteCount
for id, val := range f.Other {
l += quicvarint.Len(id) + quicvarint.Len(val)
}
if f.Datagram {
l += quicvarint.Len(settingDatagram) + quicvarint.Len(1)
}
quicvarint.Write(b, uint64(l))
b = quicvarint.Append(b, uint64(l))
if f.Datagram {
quicvarint.Write(b, settingDatagram)
quicvarint.Write(b, 1)
b = quicvarint.Append(b, settingDatagram)
b = quicvarint.Append(b, 1)
}
for id, val := range f.Other {
quicvarint.Write(b, id)
quicvarint.Write(b, val)
b = quicvarint.Append(b, id)
b = quicvarint.Append(b, val)
}
return b
}
47 changes: 19 additions & 28 deletions http3/frames_test.go
Expand Up @@ -24,12 +24,12 @@ var _ = Describe("Frames", func() {
}

It("skips unknown frames", func() {
data := appendVarInt(nil, 0xdeadbeef) // type byte
data = appendVarInt(data, 0x42)
data = append(data, make([]byte, 0x42)...)
buf := bytes.NewBuffer(data)
(&dataFrame{Length: 0x1234}).Write(buf)
frame, err := parseNextFrame(buf, nil)
b := appendVarInt(nil, 0xdeadbeef) // type byte
b = appendVarInt(b, 0x42)
b = append(b, make([]byte, 0x42)...)
b = (&dataFrame{Length: 0x1234}).Append(b)
r := bytes.NewReader(b)
frame, err := parseNextFrame(r, nil)
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(BeAssignableToTypeOf(&dataFrame{}))
Expect(frame.(*dataFrame).Length).To(Equal(uint64(0x1234)))
Expand All @@ -46,9 +46,8 @@ var _ = Describe("Frames", func() {
})

It("writes", func() {
buf := &bytes.Buffer{}
(&dataFrame{Length: 0xdeadbeef}).Write(buf)
frame, err := parseNextFrame(buf, nil)
b := (&dataFrame{Length: 0xdeadbeef}).Append(nil)
frame, err := parseNextFrame(bytes.NewReader(b), nil)
Expect(err).ToNot(HaveOccurred())
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(BeAssignableToTypeOf(&dataFrame{}))
Expand All @@ -67,9 +66,8 @@ var _ = Describe("Frames", func() {
})

It("writes", func() {
buf := &bytes.Buffer{}
(&headersFrame{Length: 0xdeadbeef}).Write(buf)
frame, err := parseNextFrame(buf, nil)
b := (&headersFrame{Length: 0xdeadbeef}).Append(nil)
frame, err := parseNextFrame(bytes.NewReader(b), nil)
Expect(err).ToNot(HaveOccurred())
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(BeAssignableToTypeOf(&headersFrame{}))
Expand Down Expand Up @@ -112,9 +110,7 @@ var _ = Describe("Frames", func() {
99: 999,
13: 37,
}}
buf := &bytes.Buffer{}
sf.Write(buf)
frame, err := parseNextFrame(buf, nil)
frame, err := parseNextFrame(bytes.NewReader(sf.Append(nil)), nil)
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(Equal(sf))
})
Expand All @@ -124,10 +120,8 @@ var _ = Describe("Frames", func() {
13: 37,
0xdeadbeef: 0xdecafbad,
}}
buf := &bytes.Buffer{}
sf.Write(buf)
data := sf.Append(nil)

data := buf.Bytes()
_, err := parseNextFrame(bytes.NewReader(data), nil)
Expect(err).ToNot(HaveOccurred())

Expand Down Expand Up @@ -177,9 +171,7 @@ var _ = Describe("Frames", func() {

It("writes the H3_DATAGRAM setting", func() {
sf := &settingsFrame{Datagram: true}
buf := &bytes.Buffer{}
sf.Write(buf)
frame, err := parseNextFrame(buf, nil)
frame, err := parseNextFrame(bytes.NewReader(sf.Append(nil)), nil)
Expect(err).ToNot(HaveOccurred())
Expect(frame).To(Equal(sf))
})
Expand Down Expand Up @@ -222,16 +214,15 @@ var _ = Describe("Frames", func() {
})

It("reads a frame without hijacking the stream", func() {
buf := &bytes.Buffer{}
quicvarint.Write(buf, 1337)
b := quicvarint.Append(nil, 1337)
customFrameContents := []byte("custom frame")
quicvarint.Write(buf, uint64(len(customFrameContents)))
buf.Write(customFrameContents)
(&dataFrame{Length: 6}).Write(buf)
buf.WriteString("foobar")
b = quicvarint.Append(b, uint64(len(customFrameContents)))
b = append(b, customFrameContents...)
b = (&dataFrame{Length: 6}).Append(b)
b = append(b, []byte("foobar")...)

var called bool
frame, err := parseNextFrame(buf, func(ft FrameType, e error) (hijacked bool, err error) {
frame, err := parseNextFrame(bytes.NewReader(b), func(ft FrameType, e error) (hijacked bool, err error) {
Expect(e).ToNot(HaveOccurred())
Expect(ft).To(BeEquivalentTo(1337))
called = true
Expand Down
15 changes: 10 additions & 5 deletions http3/http_stream.go
@@ -1,7 +1,6 @@
package http3

import (
"bytes"
"fmt"

"github.com/lucas-clemente/quic-go"
Expand All @@ -16,14 +15,20 @@ type Stream quic.Stream
type stream struct {
quic.Stream

buf []byte

onFrameError func()
bytesRemainingInFrame uint64
}

var _ Stream = &stream{}

func newStream(str quic.Stream, onFrameError func()) *stream {
return &stream{Stream: str, onFrameError: onFrameError}
return &stream{
Stream: str,
onFrameError: onFrameError,
buf: make([]byte, 0, 16),
}
}

func (s *stream) Read(b []byte) (int, error) {
Expand Down Expand Up @@ -62,9 +67,9 @@ func (s *stream) Read(b []byte) (int, error) {
}

func (s *stream) Write(b []byte) (int, error) {
buf := &bytes.Buffer{}
(&dataFrame{Length: uint64(len(b))}).Write(buf)
if _, err := s.Stream.Write(buf.Bytes()); err != nil {
s.buf = s.buf[:0]
s.buf = (&dataFrame{Length: uint64(len(b))}).Append(s.buf)
if _, err := s.Stream.Write(s.buf); err != nil {
return 0, err
}
return s.Stream.Write(b)
Expand Down

0 comments on commit 62b8278

Please sign in to comment.