Skip to content

Commit

Permalink
fix(pubsublite): retry Cancelled stream errors (#5943)
Browse files Browse the repository at this point in the history
Add `CANCELLED` to retryable error codes.

Updated integration tests (CancelPublisherContext, CancelSubscriberContext). Fixed an unrelated flaky integration test (IncreasePartitions).

Release-As: 1.3.1
  • Loading branch information
tmdiep committed Apr 27, 2022
1 parent a6ef51a commit bbee3d5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pubsublite/internal/wire/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func isRetryableSendCode(code codes.Code) bool {
func isRetryableRecvCode(code codes.Code) bool {
switch code {
// Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java
case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown:
case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown, codes.Canceled:
return true
default:
return false
Expand Down
43 changes: 39 additions & 4 deletions pubsublite/internal/wire/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -38,6 +37,7 @@ var errInvalidInitialResponse = errors.New("invalid initial response")
// testStreamHandler is a simplified publisher service that owns a
// retryableStream.
type testStreamHandler struct {
CancelCtx context.CancelFunc
Topic topicPartition
InitialReq *pb.PublishRequest
Stream *retryableStream
Expand All @@ -49,14 +49,15 @@ type testStreamHandler struct {
}

func newTestStreamHandler(t *testing.T, connectTimeout, idleTimeout time.Duration) *testStreamHandler {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}

topic := topicPartition{Path: "path/to/topic", Partition: 1}
sh := &testStreamHandler{
CancelCtx: cancel,
Topic: topic,
InitialReq: initPubReq(topic),
t: t,
Expand Down Expand Up @@ -226,7 +227,7 @@ func TestRetryableStreamConnectRetries(t *testing.T) {
verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream1)

stream2 := test.NewRPCVerifier(t)
stream2.Push(pub.InitialReq, nil, status.Error(codes.Internal, "internal"))
stream2.Push(pub.InitialReq, nil, status.Error(codes.Canceled, "canceled"))
verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream2)

// Third stream should succeed.
Expand All @@ -251,6 +252,40 @@ func TestRetryableStreamConnectRetries(t *testing.T) {
}
}

// TestRetryableStreamContextCanceledNotRetried checks that cancelling the
// context the test stream handler was created with ends with the stream in the
// streamTerminated state and context.Canceled as its final error.
func TestRetryableStreamContextCanceledNotRetried(t *testing.T) {
	pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout)

	// Only one publish stream is registered with the mock server, and its
	// initial request is answered successfully.
	verifiers := test.NewVerifiers(t)
	initStream := test.NewRPCVerifier(t)
	initStream.Push(pub.InitialReq, initPubResp(), nil)
	verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, initStream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	// Starting the stream should report reconnecting, then connected.
	pub.Stream.Start()
	if status := pub.NextStatus(); status != streamReconnecting {
		t.Errorf("Stream status change: got %d, want %d", status, streamReconnecting)
	}
	if status := pub.NextStatus(); status != streamConnected {
		t.Errorf("Stream status change: got %d, want %d", status, streamConnected)
	}

	// Once the parent context is cancelled, the active gRPC stream fails with
	// a Canceled error, which is retryable, so a reconnect is attempted first.
	pub.CancelCtx()
	if status := pub.NextStatus(); status != streamReconnecting {
		t.Errorf("Stream status change: got %d, want %d", status, streamReconnecting)
	}
	// The reconnect attempt fails, which terminates the stream.
	if status := pub.NextStatus(); status != streamTerminated {
		t.Errorf("Stream status change: got %d, want %d", status, streamTerminated)
	}

	finalErr := pub.Stream.Error()
	if !test.ErrorEqual(finalErr, context.Canceled) {
		t.Errorf("Stream final err: got (%v), want (%v)", finalErr, context.Canceled)
	}
}

func TestRetryableStreamConnectPermanentFailure(t *testing.T) {
pub := newTestStreamHandler(t, streamTestTimeout, streamTestTimeout)
permanentErr := status.Error(codes.PermissionDenied, "denied")
Expand Down Expand Up @@ -310,7 +345,7 @@ func TestRetryableStreamConnectTimeout(t *testing.T) {
if pub.Stream.currentStream() != nil {
t.Error("Client stream should be nil")
}
if gotErr := pub.Stream.Error(); !xerrors.Is(gotErr, ErrBackendUnavailable) {
if gotErr := pub.Stream.Error(); !test.ErrorEqual(gotErr, ErrBackendUnavailable) {
t.Errorf("Stream final err: got (%v), want (%v)", gotErr, ErrBackendUnavailable)
}
}
Expand Down
44 changes: 25 additions & 19 deletions pubsublite/pscompat/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import (
"cloud.google.com/go/pubsublite/internal/wire"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
Expand Down Expand Up @@ -218,18 +216,16 @@ func waitForPublishResults(t *testing.T, pubResults []*pubsub.PublishResult) {
cancel()
}

func parseMessageMetadata(ctx context.Context, t *testing.T, result *pubsub.PublishResult) *MessageMetadata {
func parseMessageMetadata(ctx context.Context, t *testing.T, result *pubsub.PublishResult) (*MessageMetadata, error) {
id, err := result.Get(ctx)
if err != nil {
t.Fatalf("Failed to publish message: %v", err)
return nil
return nil, err
}
metadata, err := ParseMessageMetadata(id)
if err != nil {
t.Fatalf("Failed to parse message metadata: %v", err)
return nil
}
return metadata
return metadata, err
}

func makeMsgTracker(msgs []string) *test.MsgTracker {
Expand Down Expand Up @@ -586,15 +582,15 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {

cancel()

wantCode := codes.Canceled
wantErr := context.Canceled
result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("cancel_publisher_context")})
if _, err := result.Get(ctx); !test.ErrorHasCode(err, wantCode) {
t.Errorf("Publish() got err: %v, want code: %v", err, wantCode)
if _, gotErr := result.Get(ctx); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Publish() got err: %v, want err: %v", gotErr, wantErr)
}

publisher.Stop()
if err := xerrors.Unwrap(publisher.Error()); !test.ErrorHasCode(err, wantCode) {
t.Errorf("Error() got err: %v, want code: %v", err, wantCode)
if gotErr := publisher.Error(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Error() got err: %v, want err: %v", gotErr, wantErr)
}
})

Expand All @@ -607,13 +603,13 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
cctx, cancel := context.WithCancel(context.Background())
subscriber := subscriberClient(cctx, t, recvSettings, subscriptionPath)

subsErr := subscriber.Receive(context.Background(), func(ctx context.Context, got *pubsub.Message) {
gotErr := subscriber.Receive(context.Background(), func(ctx context.Context, got *pubsub.Message) {
got.Ack()
cancel()
})

if err, wantCode := xerrors.Unwrap(subsErr), codes.Canceled; !test.ErrorHasCode(err, wantCode) {
t.Errorf("Receive() got err: %v, want code: %v", err, wantCode)
if wantErr := context.Canceled; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Receive() got err: %v, want err: %v", gotErr, wantErr)
}
})

Expand All @@ -622,7 +618,7 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
// correctly by publishers.
t.Run("IncreasePartitions", func(t *testing.T) {
// Create the publisher client with the initial single partition.
const pollPeriod = 5 * time.Second
const pollPeriod = 10 * time.Second
pubSettings := DefaultPublishSettings
pubSettings.configPollPeriod = pollPeriod // Poll updates more frequently
publisher := publisherClient(context.Background(), t, pubSettings, topicPath)
Expand All @@ -637,15 +633,25 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
t.Errorf("Failed to increase partitions: %v", err)
}

// Wait for the publisher client to receive the updated partition count.
// Wait for the new config to propagate.
time.Sleep(3 * pollPeriod)

// Publish 2 messages, which should be routed to different partitions
// (round robin).
result1 := publisher.Publish(ctx, &pubsub.Message{Data: []byte("increase-partitions-1")})
result2 := publisher.Publish(ctx, &pubsub.Message{Data: []byte("increase-partitions-2")})
metadata1 := parseMessageMetadata(ctx, t, result1)
metadata2 := parseMessageMetadata(ctx, t, result2)
// Tolerate test flakiness, as the new config may not have propagated on the
// server.
metadata1, err := parseMessageMetadata(ctx, t, result1)
if err != nil {
t.Logf("Warning: failed to publish message: %v. Publisher error: %v", err, publisher.Error())
return
}
metadata2, err := parseMessageMetadata(ctx, t, result2)
if err != nil {
t.Logf("Warning: failed to publish message: %v. Publisher error: %v", err, publisher.Error())
return
}
if metadata1.Partition == metadata2.Partition {
t.Errorf("Messages were published to the same partition = %d. Expected different partitions", metadata1.Partition)
}
Expand Down

0 comments on commit bbee3d5

Please sign in to comment.