Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix datagram support detection #3511

Merged
merged 3 commits into from Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 13 additions & 4 deletions connection.go
Expand Up @@ -314,6 +314,8 @@ var newConnection = func(
}
if s.config.EnableDatagrams {
params.MaxDatagramFrameSize = protocol.MaxDatagramFrameSize
} else {
params.MaxDatagramFrameSize = protocol.InvalidByteCount
}
if s.tracer != nil {
s.tracer.SentTransportParameters(params)
Expand Down Expand Up @@ -438,6 +440,8 @@ var newClientConnection = func(
}
if s.config.EnableDatagrams {
params.MaxDatagramFrameSize = protocol.MaxDatagramFrameSize
} else {
params.MaxDatagramFrameSize = protocol.InvalidByteCount
}
if s.tracer != nil {
s.tracer.SentTransportParameters(params)
Expand Down Expand Up @@ -532,9 +536,7 @@ func (s *connection) preSetup() {
s.creationTime = now

s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
if s.config.EnableDatagrams {
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger)
}
s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger)
}

// run the connection main loop
Expand Down Expand Up @@ -724,7 +726,7 @@ func (s *connection) Context() context.Context {
}

func (s *connection) supportsDatagrams() bool {
return s.peerParams.MaxDatagramFrameSize != protocol.InvalidByteCount
return s.peerParams.MaxDatagramFrameSize > 0
}

func (s *connection) ConnectionState() ConnectionState {
Expand Down Expand Up @@ -1975,6 +1977,10 @@ func (s *connection) onStreamCompleted(id protocol.StreamID) {
}

func (s *connection) SendMessage(p []byte) error {
if !s.supportsDatagrams() {
return errors.New("datagram support disabled")
}

f := &wire.DatagramFrame{DataLenPresent: true}
if protocol.ByteCount(len(p)) > f.MaxDataLen(s.peerParams.MaxDatagramFrameSize, s.version) {
return errors.New("message too large")
Expand All @@ -1985,6 +1991,9 @@ func (s *connection) SendMessage(p []byte) error {
}

func (s *connection) ReceiveMessage() ([]byte, error) {
if !s.config.EnableDatagrams {
return nil, errors.New("datagram support disabled")
}
return s.datagramQueue.Receive()
}

Expand Down
84 changes: 68 additions & 16 deletions integrationtests/self/datagram_test.go
Expand Up @@ -32,7 +32,7 @@ var _ = Describe("Datagram test", func() {
dropped, total int32
)

startServerAndProxy := func() {
startServerAndProxy := func(enableDatagram, expectDatagramSupport bool) {
addr, err := net.ResolveUDPAddr("udp", "localhost:0")
Expect(err).ToNot(HaveOccurred())
serverConn, err = net.ListenUDP("udp", addr)
Expand All @@ -41,30 +41,39 @@ var _ = Describe("Datagram test", func() {
serverConn,
getTLSConfig(),
getQuicConfig(&quic.Config{
EnableDatagrams: true,
EnableDatagrams: enableDatagram,
Versions: []protocol.VersionNumber{version},
}),
)
Expect(err).ToNot(HaveOccurred())

go func() {
defer GinkgoRecover()
conn, err := ln.Accept(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(conn.ConnectionState().SupportsDatagrams).To(BeTrue())

var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(i int) {
defer GinkgoRecover()
defer wg.Done()
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(i))
Expect(conn.SendMessage(b)).To(Succeed())
}(i)

if expectDatagramSupport {
Expect(conn.ConnectionState().SupportsDatagrams).To(BeTrue())

if enableDatagram {
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(i int) {
defer GinkgoRecover()
defer wg.Done()
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(i))
Expect(conn.SendMessage(b)).To(Succeed())
}(i)
}
wg.Wait()
}
} else {
Expect(conn.ConnectionState().SupportsDatagrams).To(BeFalse())
}
wg.Wait()
}()

serverPort := ln.Addr().(*net.UDPAddr).Port
proxy, err = quicproxy.NewQuicProxy("localhost:0", &quicproxy.Opts{
RemoteAddr: fmt.Sprintf("localhost:%d", serverPort),
Expand Down Expand Up @@ -100,7 +109,7 @@ var _ = Describe("Datagram test", func() {
})

It("sends datagrams", func() {
startServerAndProxy()
startServerAndProxy(true, true)
raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", proxy.LocalPort()))
Expect(err).ToNot(HaveOccurred())
conn, err := quic.Dial(
Expand Down Expand Up @@ -137,6 +146,49 @@ var _ = Describe("Datagram test", func() {
BeNumerically("<", num),
))
})

It("server can disable datagram", func() {
startServerAndProxy(false, true)
raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", proxy.LocalPort()))
Expect(err).ToNot(HaveOccurred())
conn, err := quic.Dial(
clientConn,
raddr,
fmt.Sprintf("localhost:%d", proxy.LocalPort()),
getTLSClientConfig(),
getQuicConfig(&quic.Config{
EnableDatagrams: true,
Versions: []protocol.VersionNumber{version},
}),
)
Expect(err).ToNot(HaveOccurred())
Expect(conn.ConnectionState().SupportsDatagrams).To(BeFalse())

conn.CloseWithError(0, "")
<-time.After(10 * time.Millisecond)
})

It("client can disable datagram", func() {
startServerAndProxy(false, true)
raddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", proxy.LocalPort()))
Expect(err).ToNot(HaveOccurred())
conn, err := quic.Dial(
clientConn,
raddr,
fmt.Sprintf("localhost:%d", proxy.LocalPort()),
getTLSClientConfig(),
getQuicConfig(&quic.Config{
EnableDatagrams: true,
Versions: []protocol.VersionNumber{version},
}),
)
Expect(err).ToNot(HaveOccurred())
Expect(conn.ConnectionState().SupportsDatagrams).To(BeFalse())

Expect(conn.SendMessage([]byte{0})).To(HaveOccurred())
conn.CloseWithError(0, "")
<-time.After(10 * time.Millisecond)
})
})
}
})