Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): augment reconnection logic (#6609)
Browse files Browse the repository at this point in the history
This PR augments the reconnection logic to include the grpc transport
stream drain error as a condition where we should force reconnect,
rather the waiting for the io.EOF of the connection fully closing.

Related: #6595
  • Loading branch information
shollyman committed Sep 2, 2022
1 parent 9f3c334 commit 6b0ac0c
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
5 changes: 1 addition & 4 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -16,7 +16,6 @@ package managedwriter

import (
"context"
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -316,9 +315,7 @@ func (ms *ManagedStream) lockingAppend(requestCtx context.Context, pw *pendingWr
err = (*arc).Send(pw.request)
}
if err != nil {
// Transient connection loss. If we got io.EOF from a send, we want subsequent appends to
// reconnect the network connection for the stream.
if errors.Is(err, io.EOF) {
if shouldReconnect(err) {
ms.reconnect = true
}
return 0, err
Expand Down
17 changes: 17 additions & 0 deletions bigquery/storage/managedwriter/retry.go
Expand Up @@ -17,6 +17,7 @@ package managedwriter
import (
"context"
"errors"
"io"
"time"

"github.com/googleapis/gax-go/v2"
Expand Down Expand Up @@ -47,3 +48,19 @@ func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool
return r.bo.Pause(), false
}
}

// shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force
// our bidi stream to close/reopen based on the responses error. Errors here signal that no
// further appends will succeed.
func shouldReconnect(err error) bool {
var knownErrors = []error{
io.EOF,
status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport
}
for _, ke := range knownErrors {
if errors.Is(err, ke) {
return true
}
}
return false
}
64 changes: 64 additions & 0 deletions bigquery/storage/managedwriter/retry_test.go
@@ -0,0 +1,64 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
"fmt"
"io"
"testing"

"github.com/googleapis/gax-go/v2/apierror"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestManagedStream_ShouldReconnect(t *testing.T) {

testCases := []struct {
err error
want bool
}{
{
err: fmt.Errorf("random error"),
want: false,
},
{
err: io.EOF,
want: true,
},
{
err: status.Error(codes.Unavailable, "nope"),
want: false,
},
{
err: status.Error(codes.Unavailable, "the connection is draining"),
want: true,
},
{
err: func() error {
// wrap the underlying error in a gax apierror
ai, _ := apierror.FromError(status.Error(codes.Unavailable, "the connection is draining"))
return ai
}(),
want: true,
},
}

for _, tc := range testCases {
if got := shouldReconnect(tc.err); got != tc.want {
t.Errorf("got %t, want %t for error: %+v", got, tc.want, tc.err)
}
}
}

0 comments on commit 6b0ac0c

Please sign in to comment.