Skip to content

Commit

Permalink
fix data races
Browse files Browse the repository at this point in the history
  • Loading branch information
birneee committed Jan 11, 2024
1 parent 8eddef2 commit dec7c06
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
30 changes: 24 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ type baseServer struct {
protocol.VersionNumber,
) quicConn

closeOnce sync.Once
errorChan chan struct{} // is closed when the server is closed
closeErr error
running chan struct{} // closed as soon as run() returns
closeOnce sync.Once
errorChan chan struct{} // is closed when the server is closed
closeErr error
running chan struct{} // closed as soon as run() returns
handshakeWaitGroup sync.WaitGroup // connections that are started but not passed to connQueue yet

versionNegotiationQueue chan receivedPacket
invalidTokenQueue chan rejectedPacket
Expand Down Expand Up @@ -340,6 +341,19 @@ func (s *baseServer) close(e error, notifyOnClose bool) {
close(s.errorChan)

<-s.running
s.handshakeWaitGroup.Wait()

loop: // drain connQueue
for {
select {
case conn := <-s.connQueue:
conn.destroy(&qerr.TransportError{ErrorCode: ConnectionRefused})
atomic.AddInt32(&s.connQueueLen, -1)
default:
break loop
}
}

if notifyOnClose {
s.onClose()
}
Expand Down Expand Up @@ -681,6 +695,7 @@ func (s *baseServer) handleInitialImpl(p receivedPacket, hdr *wire.Header) error
return nil
}
go conn.run()
s.handshakeWaitGroup.Add(1)
go s.handleNewConn(conn)
if conn == nil {
p.buffer.Release()
Expand All @@ -690,6 +705,7 @@ func (s *baseServer) handleInitialImpl(p receivedPacket, hdr *wire.Header) error
}

func (s *baseServer) handleNewConn(conn quicConn) {
defer s.handshakeWaitGroup.Done()
connCtx := conn.Context()
if s.acceptEarlyConns {
// wait until the early connection is ready, the handshake fails, or the server is closed
Expand All @@ -699,7 +715,6 @@ func (s *baseServer) handleNewConn(conn quicConn) {
return
case <-conn.earlyConnReady():
case <-connCtx.Done():
conn.destroy(context.Canceled)
return
}
} else {
Expand All @@ -710,7 +725,6 @@ func (s *baseServer) handleNewConn(conn quicConn) {
return
case <-conn.HandshakeComplete():
case <-connCtx.Done():
conn.destroy(context.Canceled)
return
}
}
Expand All @@ -722,6 +736,10 @@ func (s *baseServer) handleNewConn(conn quicConn) {
case <-connCtx.Done():
atomic.AddInt32(&s.connQueueLen, -1)
// don't pass connections that were already closed to Accept()
case <-s.errorChan:
conn.destroy(&qerr.TransportError{ErrorCode: ConnectionRefused})
atomic.AddInt32(&s.connQueueLen, -1)
// don't pass connections to Accept() if server already closed
}
}

Expand Down
3 changes: 2 additions & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ var _ = Describe("Server", func() {
c := make(chan struct{})
close(c)
conn.EXPECT().HandshakeComplete().Return(c)
conn.EXPECT().destroy(gomock.Any())
return conn
}

Expand Down Expand Up @@ -1276,6 +1277,7 @@ var _ = Describe("Server", func() {
conn.EXPECT().run()
conn.EXPECT().earlyConnReady().Return(ready)
conn.EXPECT().Context().Return(context.Background())
conn.EXPECT().destroy(gomock.Any())
return conn
}

Expand Down Expand Up @@ -1337,7 +1339,6 @@ var _ = Describe("Server", func() {
conn.EXPECT().run()
conn.EXPECT().earlyConnReady()
conn.EXPECT().Context().Return(ctx)
conn.EXPECT().destroy(gomock.Any())
close(connCreated)
return conn
}
Expand Down

0 comments on commit dec7c06

Please sign in to comment.