From 172255fa2d0129ae96ae2f568331878c4a0ba1d2 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 22 Jul 2019 17:13:32 -0400 Subject: [PATCH 1/2] vendor: upgrade grpc from 1.13.0 to 1.21.2 This PR upgrades gRPC from 1.13.0 to 1.21.2. The primary motivation for this upgrade is to eliminate the disconnections caused by https://github.com/grpc/grpc-go/issues/1882. These failures manifest themselves as the following set of errors: ``` ajwerner-test-0001> I190722 22:15:01.203008 12054 vendor/github.com/cockroachdb/circuitbreaker/circuitbreaker.go:322 [n1] circuitbreaker: rpc [::]:26257 [n2] tripped: failed to check for ready connection to n2 at ajwerner-test-0002:26257: connection not ready: TRANSIENT_FAILURE ``` These errors then lead to tripped breakers and general badness. I suspect that there are several other good bug fixes in here, including some purported leaks and correctness fixes on shutdown. I have verified that with this upgrade I no longer see connections break in overload scenarios which reliably reproduced the situation in the above log. This commit removes one condition from grpcutil.IsClosedConnection which should be subsumed by the status check above. The `transport` subpackage has not been around for many releases. This does not upgrade to the current release 1.22.0 because the maintainer mentions that it contains a bug (https://github.com/grpc/grpc-go/issues/2663#issuecomment-504129002). This change also unfortunately updates the keepalive behavior to be more spec-compliant (https://github.com/grpc/grpc-go/pull/2642). That change mandates a minimum ping time of 10s for the client. Given https://github.com/grpc/grpc-go/issues/2638 this means that the rpc test for keepalives now takes over 20s. I would be okay with skipping it, but will leave that discussion for review. This commit also updates the acceptance test to look out for an HTTP/2.0 header because grpc now does not send RPCs until after the HTTP handshake has completed (see https://github.com/grpc/grpc-go/issues/2406). 
Release note (bug fix): Upgrade grpc library to fix connection state management bug. --- Gopkg.lock | 16 ++-- Gopkg.toml | 4 + pkg/cli/interactive_tests/netcat.py | 5 +- .../interactive_tests/test_error_hints.tcl | 4 +- pkg/rpc/context.go | 9 +-- pkg/rpc/context_test.go | 74 +++++++++++++------ pkg/rpc/stats_handler_test.go | 3 + pkg/server/server.go | 2 +- pkg/util/grpcutil/grpc_util.go | 4 - pkg/util/grpcutil/grpc_util_test.go | 4 + vendor | 2 +- 11 files changed, 82 insertions(+), 45 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 36ebbfeb5774..f4fd8a5e795f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1767,24 +1767,32 @@ revision = "db91494dd46c1fdcbbde05e5ff5eb56df8f7d79a" [[projects]] - digest = "1:3a98314fd2e43bbd905b33125dad80b10111ba6e5e541db8ed2a953fe01fbb31" + digest = "1:1bdff737fd41a4c48d06265e782e38f7fddb2610ed6877095c42763b8767d195" name = "google.golang.org/grpc" packages = [ ".", "balancer", "balancer/base", "balancer/roundrobin", + "binarylog/grpc_binarylog_v1", "codes", "connectivity", "credentials", + "credentials/internal", "encoding", "encoding/proto", "grpclog", "health/grpc_health_v1", "internal", "internal/backoff", + "internal/balancerload", + "internal/binarylog", "internal/channelz", + "internal/envconfig", "internal/grpcrand", + "internal/grpcsync", + "internal/syscall", + "internal/transport", "keepalive", "metadata", "naming", @@ -1795,11 +1803,10 @@ "stats", "status", "tap", - "transport", ] pruneopts = "UT" - revision = "168a6198bcb0ef175f7dacec0b8691fc141dc9b8" - version = "v1.13.0" + revision = "86af7e80a3703e2400cce4ea455a9abab36bbcf8" + version = "v1.21.2" [[projects]] branch = "v2-encoding-style" @@ -2045,7 +2052,6 @@ "google.golang.org/grpc/peer", "google.golang.org/grpc/stats", "google.golang.org/grpc/status", - "google.golang.org/grpc/transport", "gopkg.in/yaml.v2", "honnef.co/go/tools/cmd/staticcheck", "honnef.co/go/tools/lint", diff --git a/Gopkg.toml b/Gopkg.toml index ac57b020a83b..b30ad4dbbfeb 100644 --- 
a/Gopkg.toml +++ b/Gopkg.toml @@ -124,6 +124,10 @@ ignored = [ name = "google.golang.org/genproto" branch = "master" +[[constraint]] + name = "google.golang.org/grpc" + version = "=v1.21.2" + [prune] go-tests = true unused-packages = true diff --git a/pkg/cli/interactive_tests/netcat.py b/pkg/cli/interactive_tests/netcat.py index 9075913bf65f..e44d594aac46 100644 --- a/pkg/cli/interactive_tests/netcat.py +++ b/pkg/cli/interactive_tests/netcat.py @@ -11,5 +11,6 @@ while True: c = client_socket.recv(1) - sys.stdout.write("%c" % c) - sys.stdout.flush() + if c: + sys.stdout.write("%c" % c) + sys.stdout.flush() diff --git a/pkg/cli/interactive_tests/test_error_hints.tcl b/pkg/cli/interactive_tests/test_error_hints.tcl index bc88f3467649..0bc8e7dab30c 100644 --- a/pkg/cli/interactive_tests/test_error_hints.tcl +++ b/pkg/cli/interactive_tests/test_error_hints.tcl @@ -107,10 +107,10 @@ eexpect "ready" set spawn_id $client_spawn_id send "$argv quit --insecure\r" eexpect "insecure\r\n" -# In the first shell, stop the server. +# Wait to see an HTTP/2.0 header on the fake server, then stop the server. set spawn_id $shell_spawn_id eexpect "connected" -eexpect ":26257" +eexpect "PRI * HTTP/2.0" interrupt eexpect ":/# " # Check that cockroach quit becomes suitably confused. diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 0a9bad33fcec..1b933e394ef5 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -682,23 +682,21 @@ func init() { // ensures that our initial heartbeat (and its version/clusterID // validation) occurs on every new connection. 
type onlyOnceDialer struct { - ctx context.Context syncutil.Mutex dialed bool closed bool redialChan chan struct{} } -func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) { +func (ood *onlyOnceDialer) dial(ctx context.Context, addr string) (net.Conn, error) { ood.Lock() defer ood.Unlock() if !ood.dialed { ood.dialed = true dialer := net.Dialer{ - Timeout: timeout, LocalAddr: sourceAddr, } - return dialer.DialContext(ood.ctx, "tcp", addr) + return dialer.DialContext(ctx, "tcp", addr) } else if !ood.closed { ood.closed = true close(ood.redialChan) @@ -726,10 +724,9 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{ grpc.WithInitialConnWindowSize(initialConnWindowSize)) dialer := onlyOnceDialer{ - ctx: ctx.masterCtx, redialChan: make(chan struct{}), } - dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial)) + dialOpts = append(dialOpts, grpc.WithContextDialer(dialer.dial)) // add testingDialOpts after our dialer because one of our tests // uses a custom dialer (this disables the only-one-connection diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 0b27c286a8c4..e7d9eb654ae5 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1041,8 +1041,8 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase) // PartitionableConns. We'll partition the first opened connection. dialerCh := make(chan *testutils.PartitionableConn, 1) clientCtx.AddTestingDialOpts( - grpc.WithDialer( - func(addr string, timeout time.Duration) (net.Conn, error) { + grpc.WithContextDialer( + func(_ context.Context, addr string) (net.Conn, error) { if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) { // If we allow gRPC to open a 2nd transport connection, then our RPCs // might succeed if they're sent on that one. 
In the spirit of a @@ -1052,7 +1052,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase) return nil, errors.Errorf("No more connections for you. We're partitioned.") } - conn, err := net.DialTimeout("tcp", addr, timeout) + conn, err := net.Dial("tcp", addr) if err != nil { return nil, err } @@ -1116,10 +1116,8 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase) // Now partition either client->server, server->client, or both, and attempt // to perform an RPC. We expect it to fail once the grpc keepalive fails to // get a response from the server. - transportConn := <-dialerCh defer transportConn.Finish() - if c.partitionC2S { log.Infof(ctx, "partition C2S") transportConn.PartitionC2S() @@ -1129,38 +1127,65 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase) transportConn.PartitionS2C() } + // We want to start a goroutine that keeps trying to send requests and reports + // the error from the send call. In cases where there are no keep-alives this + // request may get blocked if flow control blocks it. + errChan := make(chan error) + sendCtx, cancel := context.WithCancel(ctx) + r := retry.StartWithCtx(sendCtx, retry.Options{ + InitialBackoff: 10 * time.Millisecond, + MaxBackoff: 500 * time.Millisecond, + }) + defer cancel() + go func() { + for r.Next() { + err := heartbeatClient.Send(&request) + isClosed := err != nil && grpcutil.IsClosedConnection(err) + log.Infof(ctx, "heartbeat Send got error %+v (closed=%v)", err, isClosed) + select { + case errChan <- err: + case <-sendCtx.Done(): + return + } + if isClosed { + return + } + } + }() // Check whether the connection eventually closes. We may need to // adjust this duration if the test gets flaky. 
- const retryDur = 3 * time.Second - errNotClosed := errors.New("conn not closed") - closedErr := retry.ForDuration(retryDur, func() error { - err := heartbeatClient.Send(&request) - if err == nil { - log.Infof(ctx, "expected send error, got no error") - return errNotClosed - } - if !grpcutil.IsClosedConnection(err) { - newErr := fmt.Errorf("expected closed connection error, found %v", err) - log.Infof(ctx, "%+v", newErr) - return newErr + // This unfortunately massive amount of time is required due to gRPC's + // minimum timeout of 10s and the below issue whereby keepalives are sent + // at half the expected rate. + // https://github.com/grpc/grpc-go/issues/2638 + const timeoutDur = 21 * time.Second + timeout := time.After(timeoutDur) + // sendErr will hold the last error we saw from an attempt to send a + // heartbeat. Initialize it with a dummy error which will fail the test if + // it is not overwritten. + sendErr := fmt.Errorf("not a real error") + for done := false; !done; { + select { + case <-timeout: + cancel() + done = true + case sendErr = <-errChan: } - return nil - }) + } if c.expClose { - if closedErr != nil { - newErr := fmt.Errorf("expected closed connection, found %v", closedErr) + if sendErr == nil || !grpcutil.IsClosedConnection(sendErr) { + newErr := fmt.Errorf("expected closed connection, found %v", sendErr) log.Infof(ctx, "%+v", newErr) return newErr } } else { - if closedErr != errNotClosed { - newErr := fmt.Errorf("expected unclosed connection, found %v", closedErr) + if sendErr != nil { + newErr := fmt.Errorf("expected unclosed connection, found %v", sendErr) log.Infof(ctx, "%+v", newErr) return newErr } } - log.Infof(ctx, "test done") // If the DialOptions we passed to gRPC didn't prevent it from opening new // connections, then next RPCs would succeed since gRPC reconnects the // transport (and that would succeed here since we've only partitioned one @@ -1169,6 +1194,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c 
grpcKeepaliveTestCase) // the (application-level) heartbeats performed by rpc.Context, but the // behavior of our heartbeats in the face of transport failures is // sufficiently tested in TestHeartbeatHealthTransport. + log.Infof(ctx, "test done") return nil } diff --git a/pkg/rpc/stats_handler_test.go b/pkg/rpc/stats_handler_test.go index 2174736e1dde..075a9153bf2e 100644 --- a/pkg/rpc/stats_handler_test.go +++ b/pkg/rpc/stats_handler_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -168,6 +169,8 @@ func TestStatsHandlerWithHeartbeats(t *testing.T) { if s, c := serverVal.(*Stats).Outgoing(), clientVal.(*Stats).Incoming(); s == 0 || c == 0 || s > c { return fmt.Errorf("expected server.outgoing < client.incoming; got %d, %d", s, c) } + log.Infof(context.TODO(), "server incoming = %v, server outgoing = %v, client incoming = %v, client outgoing = %v", + serverVal.(*Stats).Incoming(), serverVal.(*Stats).Outgoing(), clientVal.(*Stats).Incoming(), clientVal.(*Stats).Outgoing()) return nil }) } diff --git a/pkg/server/server.go b/pkg/server/server.go index e802e4d0f00b..a88e0b88c00e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1316,7 +1316,7 @@ func (s *Server) Start(ctx context.Context) error { } conn, err := grpc.DialContext(ctx, s.cfg.AdvertiseAddr, append( dialOpts, - grpc.WithDialer(func(string, time.Duration) (net.Conn, error) { + grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) { return c2, nil }), )...) 
diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go index e8110c16d9f8..8adc240c2e26 100644 --- a/pkg/util/grpcutil/grpc_util.go +++ b/pkg/util/grpcutil/grpc_util.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/status" - "google.golang.org/grpc/transport" ) // ErrCannotReuseClientConn is returned when a failed connection is @@ -64,9 +63,6 @@ func IsClosedConnection(err error) bool { strings.Contains(err.Error(), "node unavailable") { return true } - if streamErr, ok := err.(transport.StreamError); ok && streamErr.Code == codes.Canceled { - return true - } return netutil.IsClosedConnection(err) } diff --git a/pkg/util/grpcutil/grpc_util_test.go b/pkg/util/grpcutil/grpc_util_test.go index dac250759088..410d750023ac 100644 --- a/pkg/util/grpcutil/grpc_util_test.go +++ b/pkg/util/grpcutil/grpc_util_test.go @@ -45,6 +45,10 @@ func (hs healthServer) Check( return nil, errors.New("no one should see this") } +func (hs healthServer) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error { + panic("not implemented") +} + func TestRequestDidNotStart(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/vendor b/vendor index 7c852f2ccd37..7a80ff42b860 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 7c852f2ccd374446039567068281b576886be13b +Subproject commit 7a80ff42b860005744d0d0aa03131a494e519af9 From 03aac6485f82036f4691918ac79120c45f03c275 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 24 Jul 2019 13:19:51 -0400 Subject: [PATCH 2/2] rpc: skip TestGRPCKeepaliveFailureFailsInflightRPCs This test now takes 21 seconds which is too long. 
Release note: None --- pkg/rpc/context_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index e7d9eb654ae5..8aaa6f4f8c9f 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -891,6 +891,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) { // its response stream even if it doesn't get any new requests. func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) { defer leaktest.AfterTest(t)() + t.Skip("Takes too long given https://github.com/grpc/grpc-go/pull/2642") sc := log.Scope(t) defer sc.Close(t)