Skip to content

Commit

Permalink
Pull request 315: 373 fix goroutines leak
Browse files Browse the repository at this point in the history
Updates #373.

Squashed commit of the following:

commit 0632b4f
Author: Eugene Burkov <E.Burkov@AdGuard.COM>
Date:   Tue Jan 23 16:21:41 2024 +0300

    upstream: imp code, logging

commit cea34d5
Author: Eugene Burkov <E.Burkov@AdGuard.COM>
Date:   Tue Jan 23 15:50:53 2024 +0300

    upstream: use mutex. imp logging
  • Loading branch information
EugeneOne1 committed Jan 23, 2024
1 parent edb394b commit a2506cd
Showing 1 changed file with 44 additions and 30 deletions.
74 changes: 44 additions & 30 deletions upstream/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ type dnsOverQUIC struct {
bytesPool *sync.Pool

// quicConfigMu protects quicConfig.
quicConfigMu sync.Mutex
quicConfigMu *sync.Mutex

// connMu protects conn.
connMu sync.RWMutex
connMu *sync.Mutex

// bytesPoolGuard protects bytesPool.
bytesPoolMu sync.Mutex
bytesPoolMu *sync.Mutex

// timeout is the timeout for the upstream connection.
timeout time.Duration
Expand Down Expand Up @@ -118,7 +118,10 @@ func newDoQ(addr *url.URL, opts *Options) (u Upstream, err error) {
VerifyConnection: opts.VerifyConnection,
NextProtos: compatProtoDQ,
},
timeout: opts.Timeout,
quicConfigMu: &sync.Mutex{},
connMu: &sync.Mutex{},
bytesPoolMu: &sync.Mutex{},
timeout: opts.Timeout,
}

runtime.SetFinalizer(u, (*dnsOverQUIC).Close)
Expand Down Expand Up @@ -159,7 +162,7 @@ func (p *dnsOverQUIC) Exchange(m *dns.Msg) (resp *dns.Msg, err error) {
// connection was closed (due to inactivity for example) AND the server
// refuses to open a 0-RTT connection.
for i := 0; hasConnection && p.shouldRetry(err) && i < 2; i++ {
log.Debug("re-creating the QUIC connection and retrying due to %v", err)
log.Debug("dnsproxy: re-creating the QUIC connection and retrying due to %v", err)

// Close the active connection to make sure we'll try to re-connect.
p.closeConnWithError(err)
Expand Down Expand Up @@ -214,7 +217,7 @@ func (p *dnsOverQUIC) exchangeQUIC(req *dns.Msg) (resp *dns.Msg, err error) {
var stream quic.Stream
stream, err = p.openStream(conn)
if err != nil {
return nil, err
return nil, fmt.Errorf("opening stream: %w", err)
}

_, err = stream.Write(proxyutil.AddPrefix(buf))
Expand All @@ -226,7 +229,10 @@ func (p *dnsOverQUIC) exchangeQUIC(req *dns.Msg) (resp *dns.Msg, err error) {
// indicate through the STREAM FIN mechanism that no further data will
// be sent on that stream. Note, that stream.Close() closes the
// write-direction of the stream, but does not prevent reading from it.
_ = stream.Close()
err = stream.Close()
if err != nil {
log.Debug("dnsproxy: closing quic stream: %s", err)
}

return p.readMsg(stream)
}
Expand Down Expand Up @@ -259,29 +265,30 @@ func (p *dnsOverQUIC) getBytesPool() (pool *sync.Pool) {
// argument controls whether we should try to use the existing cached
// connection. If it is false, we will forcibly create a new connection and
// close the existing one if needed.
func (p *dnsOverQUIC) getConnection(useCached bool) (quic.Connection, error) {
func (p *dnsOverQUIC) getConnection(useCached bool) (c quic.Connection, err error) {
var conn quic.Connection
p.connMu.RLock()
conn = p.conn
if conn != nil && useCached {
p.connMu.RUnlock()

return conn, nil
}
if conn != nil {
// we're recreating the connection, let's create a new one.
_ = conn.CloseWithError(QUICCodeNoError, "")
}
p.connMu.RUnlock()

p.connMu.Lock()
defer p.connMu.Unlock()

var err error
conn = p.conn
if conn != nil {
if useCached {
return conn, nil
}

// We're recreating the connection, let's create a new one.
err = conn.CloseWithError(QUICCodeNoError, "")
if err != nil {
log.Debug("dnsproxy: closing stale connection: %s", err)
}
}

conn, err = p.openConnection()
if err != nil {
return nil, err
}

p.conn = conn

return conn, nil
Expand Down Expand Up @@ -320,7 +327,9 @@ func (p *dnsOverQUIC) openStream(conn quic.Connection) (quic.Stream, error) {
defer cancel()

stream, err := conn.OpenStreamSync(ctx)
if err == nil {
if err != nil {
log.Debug("dnsproxy: opening quic stream: %s", err)
} else {
return stream, nil
}

Expand All @@ -330,30 +339,35 @@ func (p *dnsOverQUIC) openStream(conn quic.Connection) (quic.Stream, error) {
if err != nil {
return nil, err
}

// Open a new stream.
return newConn.OpenStreamSync(ctx)
}

// openConnection opens a new QUIC connection.
// openConnection dials a new QUIC connection.
func (p *dnsOverQUIC) openConnection() (conn quic.Connection, err error) {
dialContext, err := p.getDialer()
if err != nil {
return nil, fmt.Errorf("failed to bootstrap QUIC connection: %w", err)
return nil, fmt.Errorf("bootstrapping %s: %w", p.addr, err)
}

// we're using bootstrapped address instead of what's passed to the function
// it does not create an actual connection, but it helps us determine
// what IP is actually reachable (when there're v4/v6 addresses).
rawConn, err := dialContext(context.Background(), "udp", "")
if err != nil {
return nil, fmt.Errorf("failed to open a QUIC connection: %w", err)
return nil, fmt.Errorf("dialing raw connection to %s: %w", p.addr, err)
}

// It's never actually used.
err = rawConn.Close()
if err != nil {
log.Debug("dnsproxy: closing raw connection for %s: %s", p.addr, err)
}
// It's never actually used
_ = rawConn.Close()

udpConn, ok := rawConn.(*net.UDPConn)
if !ok {
return nil, fmt.Errorf("failed to open connection to %s", p.addr)
return nil, fmt.Errorf("unexpected type %T of connection; should be %T", rawConn, udpConn)
}

addr := udpConn.RemoteAddr().String()
Expand All @@ -363,7 +377,7 @@ func (p *dnsOverQUIC) openConnection() (conn quic.Connection, err error) {

conn, err = quic.DialAddrEarly(ctx, addr, p.tlsConf.Clone(), p.getQUICConfig())
if err != nil {
return nil, fmt.Errorf("opening quic connection to %s: %w", p.addr, err)
return nil, fmt.Errorf("dialing quic connection to %s: %w", p.addr, err)
}

return conn, nil
Expand Down Expand Up @@ -393,7 +407,7 @@ func (p *dnsOverQUIC) closeConnWithError(err error) {

err = p.conn.CloseWithError(code, "")
if err != nil {
log.Error("failed to close the conn: %v", err)
log.Error("dnsproxy: failed to close the conn: %v", err)
}
p.conn = nil
}
Expand Down

0 comments on commit a2506cd

Please sign in to comment.