From 74da335fea6cd70b27808507f2e58ae53f5f4910 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 22 Aug 2022 16:49:59 -0400 Subject: [PATCH] feat(pubsub): support exactly once delivery (#6506) * feat(pubsub): prepare iterator for exactly once (#6040) * feat(pubsub): read exactly once for SubscriptionProperties * rename vars to be specific this is exactly once delivery * feat(pubsub): send stream ack deadline seconds on exactly once change #6157 (#6162) * add RWMutex for guarding exactly once bool * feat(pubsub): send stream ack deadline seconds on exactly once change * remove extra test * feat(pubsub): add AckWithResult and NackWithResult to message (#6201) * add AckResult and related methods * feat(pubsub): add AckWithResult and NackWithResult to message * feat(pubsub): add AckWithResult and NackWithResult to message * add comments for AckResult and bring over AcknowledgeStatus from internal * update function definition for IgnoreExported in tests * temporarily update internal/pubsub for samples test * change enum naming to AcknowledgeStatus * remove extra enums in temp internal message.go * remove internal/pubsub/message.go * fix style issues with variadic function options * add back comment format to exported const * keep track of AckResults if exactly once is enabled * feat(pubsub): add helper method for parsing ErrorInfos (#6281) * add AckResult and related methods * feat(pubsub): add AckWithResult and NackWithResult to message * feat(pubsub): add AckWithResult and NackWithResult to message * add comments for AckResult and bring over AcknowledgeStatus from internal * update function definition for IgnoreExported in tests * temporarily update internal/pubsub for samples test * add process results * change enum naming to AcknowledgeStatus * remove extra enums in temp internal message.go * remove internal/pubsub/message.go * add process results * update process info with new enum names * add tests to process error info * add process results * update process info with new enum names * add process results * add tests to process error info * clean up iterator from merge * cleanup comments * add list of retriable errors to test * simplify testing of completed/retry slice lengths * remove getStatus/ackErrors methods * address code review comments * remove error string conversion step * feat(pubsub): complete AckResult for exactly once (#6387) * refactor sendAck to pipe errors to AckResult map * rewrite sendAck/sendModAck for exactly once * add AckResult to list of uncompared methods * use ackResultWithID in all locations * feat(pubsub): retry temporary failures for ack/modacks (#6485) * retry acks in goroutine * retry acks/modacks with transient errors * add retry test * add nack tests and support shorter timeouts * add integration tests * remove extra comment * add commnets to ack/modack methods in iterator * remove transient invalid ack id error string * reduce number of mutex locks * pass in StreamAckDeadline seconds for streaming pull requests in fake_test * fix lint issues * add changes to internal/pubsub/message * implement default ack handler functions in lite * use pubsub package ack result * use pinned library for pubsublite * resolve all lite Ack/NackWithResult to success --- internal/pubsub/message.go | 124 ++++++++ pubsub/go.mod | 2 +- pubsub/go.sum | 5 +- pubsub/integration_test.go | 201 +++++++------ pubsub/iterator.go | 463 ++++++++++++++++++++---------- pubsub/iterator_test.go | 309 +++++++++++++++++++- pubsub/message.go | 75 ++++- 
pubsub/pstest/fake.go | 40 ++- pubsub/pstest/fake_test.go | 48 +++- pubsub/service.go | 24 ++ pubsub/streaming_pull_test.go | 14 +- pubsub/subscription.go | 66 +++-- pubsub/subscription_test.go | 335 ++++++++++++++++++--- pubsublite/go.mod | 4 +- pubsublite/go.sum | 6 + pubsublite/pscompat/subscriber.go | 20 ++ 16 files changed, 1410 insertions(+), 326 deletions(-) diff --git a/internal/pubsub/message.go b/internal/pubsub/message.go index 53bc12be9df..7d7092b1910 100644 --- a/internal/pubsub/message.go +++ b/internal/pubsub/message.go @@ -14,6 +14,7 @@ package pubsub import ( + "context" "time" ) @@ -24,6 +25,14 @@ type AckHandler interface { // OnNack processes a message nack. OnNack() + + // OnAckWithResult processes a message ack and returns + // a result that shows if it succeeded. + OnAckWithResult() *AckResult + + // OnNackWithResult processes a message nack and returns + // a result that shows if it succeeded. + OnNackWithResult() *AckResult } // Message represents a Pub/Sub message. @@ -85,6 +94,121 @@ func (m *Message) Nack() { } } +// AcknowledgeStatus represents the status of an Ack or Nack request. +type AcknowledgeStatus int + +const ( + // AcknowledgeStatusSuccess indicates the request was a success. + AcknowledgeStatusSuccess AcknowledgeStatus = iota + // AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions. + AcknowledgeStatusPermissionDenied + // AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error. + AcknowledgeStatusFailedPrecondition + // AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid. + AcknowledgeStatusInvalidAckID + // AcknowledgeStatusOther indicates another unknown error was returned. + AcknowledgeStatusOther +) + +// AckResult holds the result from a call to Ack or Nack. +type AckResult struct { + ready chan struct{} + res AcknowledgeStatus + err error +} + +// Ready returns a channel that is closed when the result is ready. +// When the Ready channel is closed, Get is guaranteed not to block. +func (r *AckResult) Ready() <-chan struct{} { return r.ready } + +// Get returns the status and/or error result of a Ack, Nack, or Modack call. +// Get blocks until the Ack/Nack completes or the context is done. +func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) { + // If the result is already ready, return it even if the context is done. + select { + case <-r.Ready(): + return r.res, r.err + default: + } + select { + case <-ctx.Done(): + // Explicitly return AcknowledgeStatusOther for context cancelled cases, + // since the default is success. + return AcknowledgeStatusOther, ctx.Err() + case <-r.Ready(): + return r.res, r.err + } +} + +// NewAckResult creates a AckResult. +func NewAckResult() *AckResult { + return &AckResult{ + ready: make(chan struct{}), + } +} + +// SetAckResult sets the ack response and error for a ack result and closes +// the Ready channel. Any call after the first for the same AckResult +// is a no-op. +func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) { + select { + case <-r.Ready(): + return + default: + r.res = res + r.err = err + close(r.ready) + } +} + +// AckWithResult acknowledges a message in Pub/Sub and it will not be +// delivered to this subscription again. +// +// You should avoid acknowledging messages until you have +// *finished* processing them, so that in the event of a failure, +// you receive the message again. 
+// +// If exactly-once delivery is enabled on the subscription, the +// AckResult returned by this method tracks the state of acknowledgement +// operation. If the operation completes successfully, the message is +// guaranteed NOT to be re-delivered. Otherwise, the result will +// contain an error with more details about the failure and the +// message may be re-delivered. +// +// If exactly-once delivery is NOT enabled on the subscription, or +// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success. +// Since acks in Cloud Pub/Sub are best effort when exactly-once +// delivery is disabled, the message may be re-delivered. Because +// re-deliveries are possible, you should ensure that your processing +// code is idempotent, as you may receive any given message more than +// once. +func (m *Message) AckWithResult() *AckResult { + if m.ackh != nil { + return m.ackh.OnAckWithResult() + } + return nil +} + +// NackWithResult declines to acknowledge the message which indicates that +// the client will not or cannot process a Message. This will cause the message +// to be re-delivered to subscribers. Re-deliveries may take place immediately +// or after a delay. +// +// If exactly-once delivery is enabled on the subscription, the +// AckResult returned by this method tracks the state of nack +// operation. If the operation completes successfully, the result will +// contain AckResponse.Success. Otherwise, the result will contain an error +// with more details about the failure. +// +// If exactly-once delivery is NOT enabled on the subscription, or +// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success. +func (m *Message) NackWithResult() *AckResult { + if m.ackh != nil { + return m.ackh.OnNackWithResult() + } + return nil +} + // NewMessage creates a message with an AckHandler implementation, which should // not be nil. 
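The AckResult plumbing added above (Ready, Get, SetAckResult, AckWithResult/NackWithResult) is what cloud.google.com/go/pubsub re-exports to subscribers later in this patch. A minimal usage sketch, not part of the patch itself; projectID and subID are placeholder values:

package example

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
)

// receiveWithAckResults consumes messages and waits on each acknowledgement
// result, which is only meaningful when exactly-once delivery is enabled.
func receiveWithAckResults(projectID, subID string) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	defer client.Close()

	sub := client.Subscription(subID)
	return sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		r := m.AckWithResult()
		// Get blocks until the server has processed the ack or ctx is done.
		status, err := r.Get(ctx)
		if err != nil {
			log.Printf("ack for %s failed: %v", m.ID, err)
			return
		}
		if status == pubsub.AcknowledgeStatusSuccess {
			log.Printf("message %s will not be redelivered", m.ID)
		}
	})
}

Callers that prefer not to block can select on Ready() instead and call Get only once that channel is closed.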
func NewMessage(ackh AckHandler) *Message { diff --git a/pubsub/go.mod b/pubsub/go.mod index 771f92fbd57..59ad3fdddf6 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -3,7 +3,7 @@ module cloud.google.com/go/pubsub go 1.17 require ( - cloud.google.com/go v0.102.1 + cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e cloud.google.com/go/iam v0.3.0 cloud.google.com/go/kms v1.4.0 github.com/golang/protobuf v1.5.2 diff --git a/pubsub/go.sum b/pubsub/go.sum index 034d6f35dd8..a96dfa407e9 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -29,8 +29,8 @@ cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2Z cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U= cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A= cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc= -cloud.google.com/go v0.102.1 h1:vpK6iQWv/2uUeFJth4/cBHsQAGjn1iIE6AAlxipRaA0= -cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU= +cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e h1:GZ9rHNbN2TY+p6/dTeU0EADYrOc3BCqy/KwGPZHLsdA= +cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e/go.mod h1:mqs3bFXrt/gPc6aOZpchX8DEdQhuJluA/7LZNutd2Nc= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -594,6 +594,7 @@ google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= +google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 30207d2312a..121cc2518ec 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -127,15 +127,6 @@ func TestIntegration_All(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - for _, sync := range []bool{false, true} { - for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited - testPublishAndReceive(t, client, maxMsgs, sync, 10, 0) - } - - // Tests for large messages (larger than the 4MB gRPC limit). 
- testPublishAndReceive(t, client, 0, sync, 1, 5*1024*1024) - } - topic, err := client.CreateTopic(ctx, topicIDs.New()) if err != nil { t.Errorf("CreateTopic error: %v", err) @@ -228,6 +219,20 @@ func TestIntegration_All(t *testing.T) { } } +func TestPublishReceive(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t) + + for _, sync := range []bool{false, true} { + for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited + testPublishAndReceive(t, client, maxMsgs, sync, false, 10, 0) + } + + // Tests for large messages (larger than the 4MB gRPC limit). + testPublishAndReceive(t, client, 0, sync, false, 1, 5*1024*1024) + } +} + // withGoogleClientInfo sets the name and version of the application in // the `x-goog-api-client` header passed on each request and returns the // updated context. @@ -246,94 +251,100 @@ func withGoogleClientInfo(ctx context.Context) context.Context { return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...)) } -func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) { - ctx := context.Background() - topic, err := client.CreateTopic(ctx, topicIDs.New()) - if err != nil { - t.Errorf("CreateTopic error: %v", err) - } - defer topic.Stop() - exists, err := topic.Exists(ctx) - if err != nil { - t.Fatalf("TopicExists error: %v", err) - } - if !exists { - t.Errorf("topic %v should exist, but it doesn't", topic) - } +func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) { + t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) { + t.Parallel() + ctx := context.Background() + topic, err := client.CreateTopic(ctx, topicIDs.New()) + if err != nil { + t.Errorf("CreateTopic error: %v", err) + } + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatalf("TopicExists error: %v", err) + } + if !exists { + t.Errorf("topic %v should exist, but it doesn't", topic) + } - var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { - t.Errorf("CreateSub error: %v", err) - } - exists, err = sub.Exists(ctx) - if err != nil { - t.Fatalf("SubExists error: %v", err) - } - if !exists { - t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) - } - var msgs []*Message - for i := 0; i < numMsgs; i++ { - text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) - attrs := make(map[string]string) - attrs["foo"] = "bar" - msgs = append(msgs, &Message{ - Data: []byte(text), - Attributes: attrs, + sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: exactlyOnceDelivery, }) - } - - // Publish some messages. 
- type pubResult struct { - m *Message - r *PublishResult - } - var rs []pubResult - for _, m := range msgs { - r := topic.Publish(ctx, m) - rs = append(rs, pubResult{m, r}) - } - want := make(map[string]messageData) - for _, res := range rs { - id, err := res.r.Get(ctx) if err != nil { - t.Fatal(err) + t.Errorf("CreateSub error: %v", err) + } + exists, err = sub.Exists(ctx) + if err != nil { + t.Fatalf("SubExists error: %v", err) + } + if !exists { + t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) + } + var msgs []*Message + for i := 0; i < numMsgs; i++ { + text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) + attrs := make(map[string]string) + attrs["foo"] = "bar" + msgs = append(msgs, &Message{ + Data: []byte(text), + Attributes: attrs, + }) } - md := extractMessageData(res.m) - md.ID = id - want[md.ID] = md - } - sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs - sub.ReceiveSettings.Synchronous = synchronous + // Publish some messages. + type pubResult struct { + m *Message + r *PublishResult + } + var rs []pubResult + for _, m := range msgs { + r := topic.Publish(ctx, m) + rs = append(rs, pubResult{m, r}) + } + want := make(map[string]messageData) + for _, res := range rs { + id, err := res.r.Get(ctx) + if err != nil { + t.Fatal(err) + } + md := extractMessageData(res.m) + md.ID = id + want[md.ID] = md + } - // Use a timeout to ensure that Pull does not block indefinitely if there are - // unexpectedly few messages available. - now := time.Now() - timeout := 3 * time.Minute - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { - m.Ack() - }) - if err != nil { - if c := status.Convert(err); c.Code() == codes.Canceled { - if time.Since(now) >= timeout { - t.Fatal("pullN took too long") + sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs + sub.ReceiveSettings.Synchronous = synchronous + + // Use a timeout to ensure that Pull does not block indefinitely if there are + // unexpectedly few messages available. + now := time.Now() + timeout := 3 * time.Minute + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { + m.Ack() + }) + if err != nil { + if c := status.Convert(err); c.Code() == codes.Canceled { + if time.Since(now) >= timeout { + t.Fatal("pullN took too long") + } + } else { + t.Fatalf("Pull: %v", err) } - } else { - t.Fatalf("Pull: %v", err) } - } - got := make(map[string]messageData) - for _, m := range gotMsgs { - md := extractMessageData(m) - got[md.ID] = md - } - if !testutil.Equal(got, want) { - t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", - maxMsgs, synchronous, got, want) - } + got := make(map[string]messageData) + for _, m := range gotMsgs { + md := extractMessageData(m) + got[md.ID] = md + } + if !testutil.Equal(got, want) { + t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", + maxMsgs, synchronous, got, want) + } + }) } // IAM tests. 
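The rewritten test helper above threads an exactlyOnceDelivery flag into SubscriptionConfig. Outside of the test harness, enabling the feature at creation time looks like the following sketch (the topic and subID here are placeholders):

package example

import (
	"context"

	"cloud.google.com/go/pubsub"
)

// createExactlyOnceSub mirrors what testPublishAndReceive does: exactly-once
// delivery is switched on per subscription via EnableExactlyOnceDelivery.
func createExactlyOnceSub(ctx context.Context, client *pubsub.Client, topic *pubsub.Topic, subID string) (*pubsub.Subscription, error) {
	return client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	})
}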
@@ -1979,3 +1990,15 @@ func TestIntegration_TopicRetention(t *testing.T) { t.Fatalf("expected cleared retention duration, got: %v", got) } } + +func TestExactlyOnceDelivery_PublishReceive(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t) + + for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited + testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0) + } + + // Tests for large messages (larger than the 4MB gRPC limit). + testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024) +} diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 4d6aba24d4c..0ebb600f80d 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -16,14 +16,18 @@ package pubsub import ( "context" + "errors" "io" + "log" "strings" "sync" "time" + ipubsub "cloud.google.com/go/internal/pubsub" vkit "cloud.google.com/go/pubsub/apiv1" "cloud.google.com/go/pubsub/internal/distribution" gax "github.com/googleapis/gax-go/v2" + "github.com/googleapis/gax-go/v2/apierror" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -37,11 +41,24 @@ import ( // of the actual deadline. const gracePeriod = 5 * time.Second +// ackIDBatchSize is the maximum number of ACK IDs to send in a single Ack/Modack RPC. +// The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per +// acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164 +// bytes, thus we cannot send more than 524288/176 ~= 2979 ACK IDs in an Ack/ModAc + +// Accounting for some overhead, we should thus only send a maximum of 2500 ACK +// IDs at a time. +// This is a var such that it can be modified for tests. +const ackIDBatchSize int = 2500 + // These are vars so tests can change them. var ( maxDurationPerLeaseExtension = 10 * time.Minute minDurationPerLeaseExtension = 10 * time.Second minDurationPerLeaseExtensionExactlyOnce = 1 * time.Minute + + // The total amount of time to retry acks/modacks with exactly once delivery enabled subscriptions. + exactlyOnceDeliveryRetryDeadline = 600 * time.Second ) type messageIterator struct { @@ -68,12 +85,20 @@ type messageIterator struct { // message arrives, we'll record now+MaxExtension in this table; whenever we have a chance // to update ack deadlines (via modack), we'll consult this table and only include IDs // that are not beyond their deadline. - keepAliveDeadlines map[string]time.Time - pendingAcks map[string]bool - pendingNacks map[string]bool - pendingModAcks map[string]bool // ack IDs whose ack deadline is to be modified - err error // error from stream failure + keepAliveDeadlines map[string]time.Time + pendingAcks map[string]*AckResult + pendingNacks map[string]*AckResult + // ack IDs whose ack deadline is to be modified + // ModAcks don't have AckResults but allows reuse of the SendModAck function. + pendingModAcks map[string]*AckResult + err error // error from stream failure + + eoMu sync.RWMutex enableExactlyOnceDelivery bool + sendNewAckDeadline bool + // This stores pending AckResults for cleaner shutdown when sub.Receive's ctx is cancelled. + // If exactly once delivery is not enabled, this map should not be populated. + pendingAckResults map[string]*AckResult } // newMessageIterator starts and returns a new messageIterator. 
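Spelling out the arithmetic behind ackIDBatchSize from the comment above (the 176-byte figure presumably adds per-entry encoding overhead to the 164-byte maximum ack ID; the constant names below are illustrative only, not part of the patch):

const (
	maxAckRequestBytes = 524288 // 512 KiB limit per Acknowledge/ModifyAckDeadline request
	bytesPerAckIDEntry = 176    // ~164-byte ack ID plus assumed proto field overhead
)

// 524288 / 176 ~= 2979 ack IDs per request; capping batches at 2500 leaves
// headroom for the subscription name and other request fields.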
@@ -116,9 +141,10 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt drained: make(chan struct{}), ackTimeDist: distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1), keepAliveDeadlines: map[string]time.Time{}, - pendingAcks: map[string]bool{}, - pendingNacks: map[string]bool{}, - pendingModAcks: map[string]bool{}, + pendingAcks: map[string]*AckResult{}, + pendingNacks: map[string]*AckResult{}, + pendingModAcks: map[string]*AckResult{}, + pendingAckResults: map[string]*AckResult{}, } it.wg.Add(1) go it.sender() @@ -167,15 +193,16 @@ func (it *messageIterator) addToDistribution(receiveTime time.Time) { } // Called when a message is acked/nacked. -func (it *messageIterator) done(ackID string, ack bool, receiveTime time.Time) { +func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTime time.Time) { it.addToDistribution(receiveTime) it.mu.Lock() defer it.mu.Unlock() delete(it.keepAliveDeadlines, ackID) + delete(it.pendingAckResults, ackID) if ack { - it.pendingAcks[ackID] = true + it.pendingAcks[ackID] = r } else { - it.pendingNacks[ackID] = true + it.pendingNacks[ackID] = r } it.checkDrained() } @@ -232,16 +259,32 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // We received some messages. Remember them so we can keep them alive. Also, // do a receipt mod-ack when streaming. maxExt := time.Now().Add(it.po.maxExtension) - ackIDs := map[string]bool{} + ackIDs := map[string]*AckResult{} it.mu.Lock() + it.eoMu.RLock() + enableExactlyOnceDelivery := it.enableExactlyOnceDelivery + it.eoMu.RUnlock() for _, m := range msgs { ackID := msgAckID(m) addRecv(m.ID, ackID, now) it.keepAliveDeadlines[ackID] = maxExt // Don't change the mod-ack if the message is going to be nacked. This is // possible if there are retries. - if !it.pendingNacks[ackID] { - ackIDs[ackID] = true + if _, ok := it.pendingNacks[ackID]; !ok { + // Don't use the message's AckResult here since these are only for receipt modacks. + // ModAckResults are transparent to the user anyway so these can automatically succeed. + // We can't use an empty AckResult here either since SetAckResult will try to + // close the channel without checking if it exists. + ackIDs[ackID] = newSuccessAckResult() + } + // If exactly once is enabled, keep track of all pending AckResults + // so we can cleanly close them all at shutdown. + if enableExactlyOnceDelivery { + ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler) + if !ok { + it.fail(errors.New("failed to assert type as psAckHandler")) + } + it.pendingAckResults[ackID] = ackh.ackResult } } deadline := it.ackDeadline() @@ -282,6 +325,12 @@ func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) { if err != nil { return nil, err } + it.eoMu.Lock() + if got := res.GetSubscriptionProperties().GetExactlyOnceDeliveryEnabled(); got != it.enableExactlyOnceDelivery { + it.sendNewAckDeadline = true + it.enableExactlyOnceDelivery = got + } + it.eoMu.Unlock() return res.ReceivedMessages, nil } @@ -347,36 +396,30 @@ func (it *messageIterator) sender() { sendPing = !it.po.synchronous } // Lock is held here. 
- var acks, nacks, modAcks map[string]bool + var acks, nacks, modAcks map[string]*AckResult if sendAcks { acks = it.pendingAcks - it.pendingAcks = map[string]bool{} + it.pendingAcks = map[string]*AckResult{} } if sendNacks { nacks = it.pendingNacks - it.pendingNacks = map[string]bool{} + it.pendingNacks = map[string]*AckResult{} } if sendModAcks { modAcks = it.pendingModAcks - it.pendingModAcks = map[string]bool{} + it.pendingModAcks = map[string]*AckResult{} } it.mu.Unlock() // Make Ack and ModAck RPCs. if sendAcks { - if !it.sendAck(acks) { - return - } + it.sendAck(acks) } if sendNacks { // Nack indicated by modifying the deadline to zero. - if !it.sendModAck(nacks, 0) { - return - } + it.sendModAck(nacks, 0) } if sendModAcks { - if !it.sendModAck(modAcks, dl) { - return - } + it.sendModAck(modAcks, dl) } if sendPing { it.pingStream() @@ -398,134 +441,192 @@ func (it *messageIterator) handleKeepAlives() { // https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ. delete(it.keepAliveDeadlines, id) } else { - // This will not conflict with a nack, because nacking removes the ID from keepAliveDeadlines. - it.pendingModAcks[id] = true + // Use a success AckResult since we don't propagate ModAcks back to the user. + it.pendingModAcks[id] = newSuccessAckResult() } } it.checkDrained() } -func (it *messageIterator) sendAck(m map[string]bool) bool { - // Account for the Subscription field. - overhead := calcFieldSizeString(it.subName) - return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error { - recordStat(it.ctx, AckCount, int64(len(ids))) - addAcks(ids) - bo := gax.Backoff{ - Initial: 100 * time.Millisecond, - Max: time.Second, - Multiplier: 2, - } - cctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancel() - for { - // Use context.Background() as the call's context, not it.ctx. We don't - // want to cancel this RPC when the iterator is stopped. - cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ - Subscription: it.subName, - AckIds: ids, - }) - // Retry DeadlineExceeded errors a few times before giving up and - // allowing the message to expire and be redelivered. - // The underlying library handles other retries, currently only - // codes.Unavailable. - switch status.Code(err) { - case codes.DeadlineExceeded: - // Use the outer context with timeout here. Errors from gax, including - // context deadline exceeded should be transparent, as unacked messages - // will be redelivered. - if err := gax.Sleep(cctx, bo.Pause()); err != nil { - return nil - } - default: - // TODO(b/226593754): by default, errors should not be fatal unless exactly once is enabled - // since acks are "fire and forget". Once EOS feature is out, retry these errors - // if exactly-once is enabled, which can be determined from StreamingPull response. - return nil +// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is +// enabled, we'll retry these messages for a short duration in a goroutine. 
+func (it *messageIterator) sendAck(m map[string]*AckResult) { + ackIDs := make([]string, 0, len(m)) + for k := range m { + ackIDs = append(ackIDs, k) + } + it.eoMu.RLock() + exactlyOnceDelivery := it.enableExactlyOnceDelivery + it.eoMu.RUnlock() + + var toSend []string + for len(ackIDs) > 0 { + toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize) + + recordStat(it.ctx, AckCount, int64(len(toSend))) + addAcks(toSend) + // Use context.Background() as the call's context, not it.ctx. We don't + // want to cancel this RPC when the iterator is stopped. + cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ + Subscription: it.subName, + AckIds: toSend, + }) + if exactlyOnceDelivery { + resultsByAckID := make(map[string]*AckResult) + for _, ackID := range toSend { + resultsByAckID[ackID] = m[ackID] + } + st, md := extractMetadata(err) + _, toRetry := processResults(st, resultsByAckID, md) + if len(toRetry) > 0 { + // Retry acks in a separate goroutine. + go func() { + it.retryAcks(toRetry) + }() } } - }) + } } +// sendModAck is used to extend the lease of messages or nack them. // The receipt mod-ack amount is derived from a percentile distribution based // on the time it takes to process messages. The percentile chosen is the 99%th // percentile in order to capture the highest amount of time necessary without -// considering 1% outliers. -func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration) bool { +// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is +// enabled, we retry it in a separate goroutine for a short duration. +func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration) { deadlineSec := int32(deadline / time.Second) - // Account for the Subscription and AckDeadlineSeconds fields. - overhead := calcFieldSizeString(it.subName) + calcFieldSizeInt(int(deadlineSec)) - return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error { + ackIDs := make([]string, 0, len(m)) + for k := range m { + ackIDs = append(ackIDs, k) + } + it.eoMu.RLock() + exactlyOnceDelivery := it.enableExactlyOnceDelivery + it.eoMu.RUnlock() + var toSend []string + for len(ackIDs) > 0 { + toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize) if deadline == 0 { - recordStat(it.ctx, NackCount, int64(len(ids))) + recordStat(it.ctx, NackCount, int64(len(toSend))) } else { - recordStat(it.ctx, ModAckCount, int64(len(ids))) - } - addModAcks(ids, deadlineSec) - // Retry this RPC on Unavailable for a short amount of time, then give up - // without returning a fatal error. The utility of this RPC is by nature - // transient (since the deadline is relative to the current time) and it - // isn't crucial for correctness (since expired messages will just be - // resent). - cctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - bo := gax.Backoff{ - Initial: 100 * time.Millisecond, - Max: time.Second, - Multiplier: 2, + recordStat(it.ctx, ModAckCount, int64(len(toSend))) } - for { - err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{ - Subscription: it.subName, - AckDeadlineSeconds: deadlineSec, - AckIds: ids, - }) - switch status.Code(err) { - case codes.Unavailable: - if err := gax.Sleep(cctx, bo.Pause()); err == nil { - continue - } - // Treat sleep timeout like RPC timeout. - fallthrough - case codes.DeadlineExceeded: - // Timeout. Not a fatal error, but note that it happened. 
- recordStat(it.ctx, ModAckTimeoutCount, 1) - return nil - default: - // This addresses an error where `context deadline exceeded` errors - // not captured by the previous case causes fatal errors. - // See https://github.com/googleapis/google-cloud-go/issues/3060 - if err != nil && strings.Contains(err.Error(), "context deadline exceeded") { - recordStat(it.ctx, ModAckTimeoutCount, 1) - return nil - } - // TODO(b/226593754): by default, errors should not be fatal unless exactly once is enabled - // since modacks are "fire and forget". Once EOS feature is out, retry these errors - // if exactly-once is enabled, which can be determined from StreamingPull response. - return nil + addModAcks(toSend, deadlineSec) + // Use context.Background() as the call's context, not it.ctx. We don't + // want to cancel this RPC when the iterator is stopped. + cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{ + Subscription: it.subName, + AckDeadlineSeconds: deadlineSec, + AckIds: toSend, + }) + if exactlyOnceDelivery { + resultsByAckID := make(map[string]*AckResult) + for _, ackID := range toSend { + resultsByAckID[ackID] = m[ackID] + } + + st, md := extractMetadata(err) + _, toRetry := processResults(st, resultsByAckID, md) + if len(toRetry) > 0 { + // Retry modacks/nacks in a separate goroutine. + go func() { + it.retryModAcks(toRetry, deadlineSec) + }() } } - }) + } } -func (it *messageIterator) sendAckIDRPC(ackIDSet map[string]bool, maxSize int, call func([]string) error) bool { - ackIDs := make([]string, 0, len(ackIDSet)) - for k := range ackIDSet { - ackIDs = append(ackIDs, k) +// retryAcks retries the ack RPC with backoff. This must be called in a goroutine +// in it.sendAck(), with a max of 2500 ackIDs. +func (it *messageIterator) retryAcks(m map[string]*AckResult) { + ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline) + defer cancel() + bo := newExactlyOnceBackoff() + for { + if ctx.Err() != nil { + for _, r := range m { + ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) + } + return + } + // Don't need to split map since this is the retry function and + // there is already a max of 2500 ackIDs here. + ackIDs := make([]string, 0, len(m)) + for k := range m { + ackIDs = append(ackIDs, k) + } + cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second) + defer cancel2() + err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ + Subscription: it.subName, + AckIds: ackIDs, + }) + st, md := extractMetadata(err) + _, toRetry := processResults(st, m, md) + if len(toRetry) == 0 { + return + } + time.Sleep(bo.Pause()) + m = toRetry } - var toSend []string - for len(ackIDs) > 0 { - toSend, ackIDs = splitRequestIDs(ackIDs, maxSize) - if err := call(toSend); err != nil { - // The underlying client handles retries, so any error is fatal to the - // iterator. - it.fail(err) - return false +} + +// retryModAcks retries the modack RPC with backoff. This must be called in a goroutine +// in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times +// since after that, the message will have expired. Nacks are retried up until the default +// deadline of 10 minutes. 
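Both retry loops pace themselves with newExactlyOnceBackoff, which this patch adds in pubsub/service.go (not shown in this excerpt). As a rough sketch of the pattern only — the concrete Initial/Max values below are assumptions, not taken from service.go — it is a gax.Backoff consumed via Pause() between attempts:

package pubsub

import (
	"time"

	gax "github.com/googleapis/gax-go/v2"
)

// Illustrative shape of the backoff helper used by retryAcks/retryModAcks;
// the Initial/Max/Multiplier values are placeholders.
func newExactlyOnceBackoff() gax.Backoff {
	return gax.Backoff{
		Initial:    1 * time.Second,
		Max:        64 * time.Second,
		Multiplier: 2,
	}
}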
+func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32) { + bo := newExactlyOnceBackoff() + retryCount := 0 + ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline) + defer cancel() + for { + // If context is done, complete all remaining Nacks with DeadlineExceeded + // ModAcks are not exposed to the user so these don't need to be modified. + if ctx.Err() != nil { + if deadlineSec == 0 { + for _, r := range m { + ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) + } + } + return + } + // Only retry modack requests up to 3 times. + if deadlineSec != 0 && retryCount > 3 { + ackIDs := make([]string, 0, len(m)) + for k := range m { + ackIDs = append(ackIDs, k) + } + log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) + return } + // Don't need to split map since this is the retry function and + // there is already a max of 2500 ackIDs here. + ackIDs := make([]string, 0, len(m)) + for k := range m { + ackIDs = append(ackIDs, k) + } + cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second) + defer cancel2() + err := it.subc.ModifyAckDeadline(cctx2, &pb.ModifyAckDeadlineRequest{ + Subscription: it.subName, + AckIds: ackIDs, + AckDeadlineSeconds: deadlineSec, + }) + st, md := extractMetadata(err) + _, toRetry := processResults(st, m, md) + if len(toRetry) == 0 { + return + } + time.Sleep(bo.Pause()) + m = toRetry + retryCount++ } - return true } // Send a message to the stream to keep it open. The stream will close if there's no @@ -535,8 +636,14 @@ func (it *messageIterator) sendAckIDRPC(ackIDSet map[string]bool, maxSize int, c // default ack deadline, and if the messages are small enough so that many can fit // into the buffer. func (it *messageIterator) pingStream() { - // Ignore error; if the stream is broken, this doesn't matter anyway. - _ = it.ps.Send(&pb.StreamingPullRequest{}) + spr := &pb.StreamingPullRequest{} + it.eoMu.RLock() + if it.sendNewAckDeadline { + spr.StreamAckDeadlineSeconds = int32(it.ackDeadline()) + it.sendNewAckDeadline = false + } + it.eoMu.RUnlock() + it.ps.Send(spr) } // calcFieldSizeString returns the number of bytes string fields @@ -560,20 +667,12 @@ func calcFieldSizeInt(fields ...int) int { } // splitRequestIDs takes a slice of ackIDs and returns two slices such that the first -// ackID slice can be used in a request where the payload does not exceed maxSize. -func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) { - size := 0 - i := 0 - // TODO(hongalex): Use binary search to find split index, since ackIDs are - // fairly constant. - for size < maxSize && i < len(ids) { - size += calcFieldSizeString(ids[i]) - i++ - } - if size > maxSize { - i-- - } - return ids[:i], ids[i:] +// ackID slice can be used in a request where the payload does not exceed ackIDBatchSize. +func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string) { + if len(ids) < maxBatchSize { + return ids, []string{} + } + return ids[:maxBatchSize], ids[maxBatchSize:] } // The deadline to ack is derived from a percentile distribution based @@ -584,8 +683,10 @@ func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) { // expiration. 
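splitRequestIDs above now splits by count (ackIDBatchSize) rather than by serialized payload size, and sendAck/sendModAck drain it in a loop. A small test sketch, which would sit alongside TestSplitRequestIDs in package pubsub, showing that batching behavior:

package pubsub

import "testing"

// TestSplitRequestIDsBatching checks that a large set of ack IDs is drained
// in at-most-ackIDBatchSize chunks, the way sendAck/sendModAck consume it.
func TestSplitRequestIDsBatching(t *testing.T) {
	ids := make([]string, 6000)
	var batches int
	for len(ids) > 0 {
		var batch []string
		batch, ids = splitRequestIDs(ids, ackIDBatchSize)
		if len(batch) > ackIDBatchSize {
			t.Fatalf("batch too large: %d", len(batch))
		}
		batches++
	}
	if got, want := batches, 3; got != want { // 2500 + 2500 + 1000
		t.Errorf("got %d batches, want %d", got, want)
	}
}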
func (it *messageIterator) ackDeadline() time.Duration { pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second - - return boundedDuration(pt, it.po.minExtensionPeriod, it.po.maxExtensionPeriod, it.enableExactlyOnceDelivery) + it.eoMu.RLock() + enableExactlyOnce := it.enableExactlyOnceDelivery + it.eoMu.RUnlock() + return boundedDuration(pt, it.po.minExtensionPeriod, it.po.maxExtensionPeriod, enableExactlyOnce) } func boundedDuration(ackDeadline, minExtension, maxExtension time.Duration, exactlyOnce bool) time.Duration { @@ -624,3 +725,65 @@ func maxDuration(x, y time.Duration) time.Duration { } return y } + +const ( + transientErrStringPrefix = "TRANSIENT_" + permanentInvalidAckErrString = "PERMANENT_FAILURE_INVALID_ACK_ID" +) + +// extracts information from an API error for exactly once delivery's ack/modack err responses. +func extractMetadata(err error) (*status.Status, map[string]string) { + apiErr, ok := apierror.FromError(err) + if ok { + return apiErr.GRPCStatus(), apiErr.Metadata() + } + return nil, nil +} + +// processResults processes AckResults by referring to errorStatus and errorsMap. +// The errors returned by the server in `errorStatus` or in `errorsByAckID` +// are used to complete the AckResults in `ackResMap` (with a success +// or error) or to return requests for further retries. +// This function returns two maps of ackID to ack results, one for completed results and the other for ones to retry. +// Logic is derived from python-pubsub: https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L161-L220 +func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsByAckID map[string]string) (map[string]*AckResult, map[string]*AckResult) { + completedResults := make(map[string]*AckResult) + retryResults := make(map[string]*AckResult) + for ackID, ar := range ackResMap { + // Handle special errors returned for ack/modack RPCs via the ErrorInfo + // sidecar metadata when exactly-once delivery is enabled. + if errAckID, ok := errorsByAckID[ackID]; ok { + if strings.HasPrefix(errAckID, transientErrStringPrefix) { + retryResults[ackID] = ar + } else { + if errAckID == permanentInvalidAckErrString { + ipubsub.SetAckResult(ar, AcknowledgeStatusInvalidAckID, errors.New(errAckID)) + } else { + ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New(errAckID)) + } + completedResults[ackID] = ar + } + } else if errorStatus != nil && contains(errorStatus.Code(), exactlyOnceDeliveryTemporaryRetryErrors) { + retryResults[ackID] = ar + } else if errorStatus != nil { + // Other gRPC errors are not retried. + switch errorStatus.Code() { + case codes.PermissionDenied: + ipubsub.SetAckResult(ar, AcknowledgeStatusPermissionDenied, errorStatus.Err()) + case codes.FailedPrecondition: + ipubsub.SetAckResult(ar, AcknowledgeStatusFailedPrecondition, errorStatus.Err()) + default: + ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errorStatus.Err()) + } + completedResults[ackID] = ar + } else if ar != nil { + // Since no error occurred, requests with AckResults are completed successfully. + ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil) + completedResults[ackID] = ar + } else { + // All other requests are considered completed. 
+ completedResults[ackID] = ar + } + } + return completedResults, retryResults +} diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index 9f2aec19d35..10ac1df6e64 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -25,19 +25,23 @@ import ( "testing" "time" + ipubsub "cloud.google.com/go/internal/pubsub" "cloud.google.com/go/internal/testutil" "cloud.google.com/go/pubsub/pstest" "google.golang.org/api/option" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" ) var ( - projName = "some-project" + projName = "P" topicName = "some-topic" + subName = "some-sub" fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName) + fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName) ) func TestSplitRequestIDs(t *testing.T) { @@ -47,16 +51,17 @@ func TestSplitRequestIDs(t *testing.T) { ids []string splitIndex int }{ - {[]string{}, 0}, - {ids, 2}, - {ids[:2], 2}, + {[]string{}, 0}, // empty slice, no split + {ids, 2}, // slice of size 5, split at index 2 + {ids[:2], 2}, // slice of size 3, split at index 2 + {ids[:1], 1}, // slice of size 1, split at index 1 } { - got1, got2 := splitRequestIDs(test.ids, 15) + got1, got2 := splitRequestIDs(test.ids, 2) want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:] - if !testutil.Equal(got1, want1) { + if !testutil.Equal(len(got1), len(want1)) { t.Errorf("%v, 1: got %v, want %v", test, got1, want1) } - if !testutil.Equal(got2, want2) { + if !testutil.Equal(len(got2), len(want2)) { t.Errorf("%v, 2: got %v, want %v", test, got2, want2) } } @@ -241,7 +246,7 @@ func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg _, ok := recvd[msgData] if ok { recvdMu.Unlock() - t.Fatalf("already saw \"%s\"\n", msgData) + t.Logf("already saw \"%s\"\n", msgData) return } recvd[msgData] = true @@ -497,18 +502,59 @@ func TestIterator_BoundedDuration(t *testing.T) { } } -func TestAddToDistribution(t *testing.T) { +func TestIterator_StreamingPullExactlyOnce(t *testing.T) { srv := pstest.NewServer() ctx, cancel := context.WithCancel(context.Background()) defer cancel() srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) - _, client, err := initConn(ctx, srv.Addr) + conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatal(err) } - iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{}) + opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn)) + client, err := NewClient(ctx, projName, opts...) + if err != nil { + t.Fatal(err) + } + + topic := client.Topic(topicName) + sc := SubscriptionConfig{ + Topic: topic, + EnableMessageOrdering: true, + EnableExactlyOnceDelivery: true, + } + _, err = client.CreateSubscription(ctx, subName, sc) + if err != nil { + t.Fatal(err) + } + + // Make sure to call publish before constructing the iterator. 
+ srv.Publish(fullyQualifiedTopicName, []byte("msg"), nil) + + iter := newMessageIterator(client.subc, fullyQualifiedSubName, &pullOptions{ + synchronous: false, + maxOutstandingMessages: 100, + maxOutstandingBytes: 1e6, + maxPrefetch: 30, + maxExtension: 1 * time.Minute, + maxExtensionPeriod: 10 * time.Second, + }) + + if _, err := iter.receive(10); err != nil { + t.Fatalf("Got error in recvMessages: %v", err) + } + + if !iter.enableExactlyOnceDelivery { + t.Fatalf("expected iter.enableExactlyOnce=true") + } +} + +func TestAddToDistribution(t *testing.T) { + c, _ := newFake(t) + + iter := newMessageIterator(c.subc, "some-sub", &pullOptions{}) // Start with a datapoint that's too small that should be bounded to 10s. receiveTime := time.Now().Add(time.Duration(-1) * time.Second) @@ -537,3 +583,244 @@ func TestAddToDistribution(t *testing.T) { t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) } } + +func TestPingStreamAckDeadline(t *testing.T) { + c, srv := newFake(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) + topic := c.Topic(topicName) + s, err := c.CreateSubscription(ctx, subName, SubscriptionConfig{Topic: topic}) + if err != nil { + t.Errorf("failed to create subscription: %v", err) + } + + iter := newMessageIterator(c.subc, fullyQualifiedSubName, &pullOptions{}) + defer iter.stop() + + iter.eoMu.RLock() + if iter.enableExactlyOnceDelivery { + t.Error("iter.enableExactlyOnceDelivery should be false") + } + iter.eoMu.RUnlock() + + _, err = s.Update(ctx, SubscriptionConfigToUpdate{ + EnableExactlyOnceDelivery: true, + }) + if err != nil { + t.Error(err) + } + srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) + // Receive one message via the stream to trigger the update to enableExactlyOnceDelivery + iter.receive(1) + iter.eoMu.RLock() + if !iter.enableExactlyOnceDelivery { + t.Error("iter.enableExactlyOnceDelivery should be true") + } + iter.eoMu.RUnlock() +} + +func compareCompletedRetryLengths(t *testing.T, completed, retry map[string]*AckResult, wantCompleted, wantRetry int) { + if l := len(completed); l != wantCompleted { + t.Errorf("completed slice length got %d, want %d", l, wantCompleted) + } + if l := len(retry); l != wantRetry { + t.Errorf("retry slice length got %d, want %d", l, wantRetry) + } +} + +func TestExactlyOnceProcessRequests(t *testing.T) { + ctx := context.Background() + + t.Run("NoResults", func(t *testing.T) { + // If the ackResMap is nil, then the resulting slices should be empty. + // nil maps here behave the same as if they were empty maps. + completed, retry := processResults(nil, nil, nil) + compareCompletedRetryLengths(t, completed, retry, 0, 0) + }) + + t.Run("NoErrorsNilAckResult", func(t *testing.T) { + // No errors so request should be completed even without an AckResult. + ackReqMap := map[string]*AckResult{ + "ackID": nil, + } + completed, retry := processResults(nil, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + }) + + t.Run("NoErrors", func(t *testing.T) { + // No errors so AckResult should be completed with success. + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + completed, retry := processResults(nil, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + + // We can obtain the AckStatus from AckResult if results are completed. 
+ s, err := r.Get(ctx) + if err != nil { + t.Errorf("AckResult err: got %v, want nil", err) + } + if s != AcknowledgeStatusSuccess { + t.Errorf("got %v, want AcknowledgeStatusSuccess", s) + } + }) + + t.Run("PermanentErrorInvalidAckID", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + errorsMap := map[string]string{ + "ackID1": permanentInvalidAckErrString, + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusInvalidAckID { + t.Errorf("got %v, want AcknowledgeStatusSuccess", s) + } + }) + + t.Run("TransientErrorRetry", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + errorsMap := map[string]string{ + "ackID1": transientErrStringPrefix + "_FAILURE", + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 0, 1) + }) + + t.Run("UnknownError", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + errorsMap := map[string]string{ + "ackID1": "unknown_error", + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + + s, err := r.Get(ctx) + if s != AcknowledgeStatusOther { + t.Errorf("got %v, want AcknowledgeStatusOther", s) + } + if err == nil || err.Error() != "unknown_error" { + t.Errorf("AckResult err: got %s, want unknown_error", err.Error()) + } + }) + + t.Run("PermissionDenied", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(codes.PermissionDenied, "permission denied") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusPermissionDenied { + t.Errorf("got %v, want AcknowledgeStatusPermissionDenied", s) + } + }) + + t.Run("FailedPrecondition", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(codes.FailedPrecondition, "failed_precondition") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusFailedPrecondition { + t.Errorf("got %v, want AcknowledgeStatusFailedPrecondition", s) + } + }) + + t.Run("OtherErrorStatus", func(t *testing.T) { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(codes.OutOfRange, "out of range") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 1, 0) + s, err := r.Get(ctx) + if err == nil { + t.Error("AckResult err: got nil, want err") + } + if s != AcknowledgeStatusOther { + t.Errorf("got %v, want AcknowledgeStatusOther", s) + } + }) + + t.Run("MixedSuccessFailureAcks", func(t *testing.T) { + r1 := ipubsub.NewAckResult() + r2 := ipubsub.NewAckResult() + r3 := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r1, + "ackID2": r2, + "ackID3": r3, + } + errorsMap := map[string]string{ + "ackID1": permanentInvalidAckErrString, + 
"ackID2": transientErrStringPrefix + "_FAILURE", + } + completed, retry := processResults(nil, ackReqMap, errorsMap) + compareCompletedRetryLengths(t, completed, retry, 2, 1) + // message with ackID "ackID1" fails + s, err := r1.Get(ctx) + if err == nil { + t.Error("r1: AckResult err: got nil, want err") + } + if s != AcknowledgeStatusInvalidAckID { + t.Errorf("r1: got %v, want AcknowledgeInvalidAckID", s) + } + + // message with ackID "ackID2" is to be retried + ctx2, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + _, err = r2.Get(ctx2) + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("r2: AckResult.Get should timeout, got: %v", err) + } + + // message with ackID "ackID3" succeeds + s, err = r3.Get(ctx) + if err != nil { + t.Errorf("r3: AckResult err: got %v, want nil\n", err) + } + if s != AcknowledgeStatusSuccess { + t.Errorf("r3: got %v, want AcknowledgeStatusSuccess", s) + } + }) + + t.Run("RetriableErrorStatusReturnsRequestForRetrying", func(t *testing.T) { + for c := range exactlyOnceDeliveryTemporaryRetryErrors { + r := ipubsub.NewAckResult() + ackReqMap := map[string]*AckResult{ + "ackID1": r, + } + st := status.New(c, "") + completed, retry := processResults(st, ackReqMap, nil) + compareCompletedRetryLengths(t, completed, retry, 0, 1) + } + }) +} diff --git a/pubsub/message.go b/pubsub/message.go index 760bd4c4236..049d034e386 100644 --- a/pubsub/message.go +++ b/pubsub/message.go @@ -34,23 +34,29 @@ import ( // fails, the Message will be redelivered. Nack indicates that the client will // not or cannot process a Message. Nack will result in the Message being // redelivered more quickly than if it were allowed to expire. +// +// If using exactly once delivery, you should call Message.AckWithResult and +// Message.NackWithResult instead. These methods will return an AckResult, +// which you should wait on to obtain the status of the Ack/Nack to ensure +// these were properly processed by the server. If not, type Message = ipubsub.Message // msgAckHandler performs a safe cast of the message's ack handler to psAckHandler. -func msgAckHandler(m *Message) (*psAckHandler, bool) { +func msgAckHandler(m *Message, eod bool) (*psAckHandler, bool) { ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler) + ackh.exactlyOnceDelivery = eod return ackh, ok } func msgAckID(m *Message) string { - if ackh, ok := msgAckHandler(m); ok { + if ackh, ok := msgAckHandler(m, false); ok { return ackh.ackID } return "" } // The done method of the iterator that created a Message. -type iterDoneFunc func(string, bool, time.Time) +type iterDoneFunc func(string, bool, *AckResult, time.Time) func convertMessages(rms []*pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) ([]*Message, error) { msgs := make([]*Message, 0, len(rms)) @@ -87,9 +93,37 @@ func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDon msg.OrderingKey = resp.Message.OrderingKey ackh.receiveTime = receiveTime ackh.doneFunc = doneFunc + ackh.ackResult = ipubsub.NewAckResult() return msg, nil } +// AckResult holds the result from a call to Ack or Nack. +// +// Call Get to obtain the result of the Ack/NackWithResult call. Example: +// +// // Get blocks until Ack/NackWithResult completes or ctx is done. +// ackStatus, err := r.Get(ctx) +// if err != nil { +// // TODO: Handle error. +// } +type AckResult = ipubsub.AckResult + +// AcknowledgeStatus represents the status of an Ack or Nack request. 
+type AcknowledgeStatus = ipubsub.AcknowledgeStatus + +const ( + // AcknowledgeStatusSuccess indicates the request was a success. + AcknowledgeStatusSuccess AcknowledgeStatus = iota + // AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions. + AcknowledgeStatusPermissionDenied + // AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error. + AcknowledgeStatusFailedPrecondition + // AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid. + AcknowledgeStatusInvalidAckID + // AcknowledgeStatusOther indicates another unknown error was returned. + AcknowledgeStatusOther +) + // psAckHandler handles ack/nack for the pubsub package. type psAckHandler struct { // ackID is the identifier to acknowledge this message. @@ -102,6 +136,14 @@ type psAckHandler struct { // The done method of the iterator that created this Message. doneFunc iterDoneFunc + + // the ack result that will be returned for this ack handler + // if AckWithResult or NackWithResult is called. + ackResult *AckResult + + // exactlyOnceDelivery determines if the message needs to be delivered + // exactly once. + exactlyOnceDelivery bool } func (ah *psAckHandler) OnAck() { @@ -112,12 +154,37 @@ func (ah *psAckHandler) OnNack() { ah.done(false) } +func (ah *psAckHandler) OnAckWithResult() *AckResult { + if !ah.exactlyOnceDelivery { + return newSuccessAckResult() + } + // call done with true to indicate ack. + ah.done(true) + return ah.ackResult +} + +func (ah *psAckHandler) OnNackWithResult() *AckResult { + if !ah.exactlyOnceDelivery { + return newSuccessAckResult() + } + // call done with false to indicate nack. + ah.done(false) + return ah.ackResult +} + func (ah *psAckHandler) done(ack bool) { if ah.calledDone { return } ah.calledDone = true if ah.doneFunc != nil { - ah.doneFunc(ah.ackID, ack, ah.receiveTime) + ah.doneFunc(ah.ackID, ack, ah.ackResult, ah.receiveTime) } } + +// newSuccessAckResult returns an AckResult that resolves to success immediately. +func newSuccessAckResult() *AckResult { + ar := ipubsub.NewAckResult() + ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil) + return ar +} diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index c95444a4d65..cae0f2db1b9 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -631,6 +631,9 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti case "enable_exactly_once_delivery": sub.proto.EnableExactlyOnceDelivery = req.Subscription.EnableExactlyOnceDelivery + for _, st := range sub.streams { + st.enableExactlyOnceDelivery = req.Subscription.EnableExactlyOnceDelivery + } default: return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path) @@ -930,6 +933,7 @@ func (s *GServer) StreamingPull(sps pb.Subscriber_StreamingPullServer) error { } // Create a new stream to handle the pull. 
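The AcknowledgeStatus values aliased above in pubsub/message.go surface to applications through AckResult.Get. A sketch of branching on them (a placeholder helper, not part of the patch):

package example

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

// handleAckResult inspects the outcome of an Ack/NackWithResult call.
func handleAckResult(ctx context.Context, r *pubsub.AckResult) {
	status, err := r.Get(ctx)
	switch status {
	case pubsub.AcknowledgeStatusSuccess:
		// The message is guaranteed not to be redelivered.
	case pubsub.AcknowledgeStatusInvalidAckID:
		log.Printf("invalid ack ID, message may be redelivered: %v", err)
	case pubsub.AcknowledgeStatusPermissionDenied, pubsub.AcknowledgeStatusFailedPrecondition:
		log.Printf("ack rejected: %v", err)
	default:
		log.Printf("ack failed: %v", err)
	}
}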
st := sub.newStream(sps, s.streamTimeout) + st.ackTimeout = time.Duration(req.StreamAckDeadlineSeconds) * time.Second err = st.pull(&s.wg) sub.deleteStream(st) return err @@ -1141,12 +1145,14 @@ func (s *subscription) maintainMessages(now time.Time) { func (s *subscription) newStream(gs pb.Subscriber_StreamingPullServer, timeout time.Duration) *stream { st := &stream{ - sub: s, - done: make(chan struct{}), - msgc: make(chan *pb.ReceivedMessage), - gstream: gs, - ackTimeout: s.ackTimeout, - timeout: timeout, + sub: s, + done: make(chan struct{}), + msgc: make(chan *pb.ReceivedMessage), + gstream: gs, + ackTimeout: s.ackTimeout, + timeout: timeout, + enableExactlyOnceDelivery: s.proto.EnableExactlyOnceDelivery, + enableOrdering: s.proto.EnableMessageOrdering, } s.mu.Lock() s.streams = append(s.streams, st) @@ -1223,12 +1229,14 @@ func (m *message) makeAvailable() { } type stream struct { - sub *subscription - done chan struct{} // closed when the stream is finished - msgc chan *pb.ReceivedMessage - gstream pb.Subscriber_StreamingPullServer - ackTimeout time.Duration - timeout time.Duration + sub *subscription + done chan struct{} // closed when the stream is finished + msgc chan *pb.ReceivedMessage + gstream pb.Subscriber_StreamingPullServer + ackTimeout time.Duration + timeout time.Duration + enableExactlyOnceDelivery bool + enableOrdering bool } // pull manages the StreamingPull interaction for the life of the stream. @@ -1266,7 +1274,13 @@ func (st *stream) sendLoop() error { case <-st.done: return nil case rm := <-st.msgc: - res := &pb.StreamingPullResponse{ReceivedMessages: []*pb.ReceivedMessage{rm}} + res := &pb.StreamingPullResponse{ + ReceivedMessages: []*pb.ReceivedMessage{rm}, + SubscriptionProperties: &pb.StreamingPullResponse_SubscriptionProperties{ + ExactlyOnceDeliveryEnabled: st.enableExactlyOnceDelivery, + MessageOrderingEnabled: st.enableOrdering, + }, + } if err := st.gstream.Send(res); err != nil { return err } diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index d061cc63aab..74efdd61d61 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1171,7 +1171,10 @@ func mustStartStreamingPull(ctx context.Context, t *testing.T, sc pb.SubscriberC if err != nil { t.Fatal(err) } - if err := spc.Send(&pb.StreamingPullRequest{Subscription: sub.Name}); err != nil { + if err := spc.Send(&pb.StreamingPullRequest{ + Subscription: sub.Name, + StreamAckDeadlineSeconds: sub.GetAckDeadlineSeconds(), + }); err != nil { t.Fatal(err) } return spc @@ -1456,6 +1459,49 @@ func TestTopicRetentionAdmin(t *testing.T) { } } +func TestStreaming_SubscriptionProperties(t *testing.T) { + ctx := context.Background() + pc, sc, s, cleanup := newFake(ctx, t) + defer cleanup() + + top := mustCreateTopic(ctx, t, pc, &pb.Topic{ + Name: "projects/P/topics/T", + }) + + sub := mustCreateSubscription(ctx, t, sc, &pb.Subscription{ + AckDeadlineSeconds: 10, + Name: "projects/P/subscriptions/S", + Topic: top.Name, + EnableMessageOrdering: true, + EnableExactlyOnceDelivery: true, + }) + + spc := mustStartStreamingPull(ctx, t, sc, sub) + + s.Publish("projects/P/topics/T", []byte("hello"), nil) + + res, err := spc.Recv() + if err != nil { + t.Fatalf("spc.Recv() got err: %v", err) + } + sp := res.GetSubscriptionProperties() + if !sp.GetExactlyOnceDeliveryEnabled() { + t.Fatalf("expected exactly once delivery to be enabled in StreamingPullResponse") + } + if !sp.GetMessageOrderingEnabled() { + t.Fatalf("expected message ordering to be enabled in StreamingPullResponse") + } + + // 
Close the stream. + if err := spc.CloseSend(); err != nil { + t.Fatal(err) + } + res, err = spc.Recv() + if err != io.EOF { + t.Fatalf("Recv returned <%v> instead of EOF; res = %v", err, res) + } +} + func TestSubscriptionPushPull(t *testing.T) { ctx := context.Background() pclient, sclient, _, cleanup := newFake(ctx, t) diff --git a/pubsub/service.go b/pubsub/service.go index e8d636a01bb..928a77d3b1c 100644 --- a/pubsub/service.go +++ b/pubsub/service.go @@ -106,3 +106,27 @@ func (r *publishRetryer) Retry(err error) (pause time.Duration, shouldRetry bool } return r.defaultRetryer.Retry(err) } + +var ( + exactlyOnceDeliveryTemporaryRetryErrors = map[codes.Code]struct{}{ + codes.DeadlineExceeded: {}, + codes.ResourceExhausted: {}, + codes.Aborted: {}, + codes.Internal: {}, + codes.Unavailable: {}, + } +) + +// contains checks if grpc code v is in t, a set of retryable error codes. +func contains(v codes.Code, t map[codes.Code]struct{}) bool { + _, ok := t[v] + return ok +} + +func newExactlyOnceBackoff() gax.Backoff { + return gax.Backoff{ + Initial: 1 * time.Second, + Max: 64 * time.Second, + Multiplier: 2, + } +} diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index b3af5f4649c..c4290ea641c 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -90,14 +90,24 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer if err != nil { t.Fatal(err) } - wantAckh, _ := msgAckHandler(want) + wantAckh, _ := msgAckHandler(want, false) wantAckh.calledDone = true got := gotMap[wantAckh.ackID] if got == nil { t.Errorf("%d: no message for ackID %q", i, wantAckh.ackID) continue } - if !testutil.Equal(got, want, cmp.AllowUnexported(Message{}, psAckHandler{}), cmpopts.IgnoreTypes(time.Time{}, func(string, bool, time.Time) {})) { + opts := []cmp.Option{ + cmp.AllowUnexported(Message{}, psAckHandler{}), + cmpopts.IgnoreTypes( + time.Time{}, + func(string, bool, + *AckResult, time.Time) { + }, + AckResult{}, + ), + } + if !testutil.Equal(got, want, opts...) { t.Errorf("%d: got\n%#v\nwant\n%#v", i, got, want) } } diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 219272c227f..5afd33a69c4 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -25,6 +25,7 @@ import ( "cloud.google.com/go/iam" "cloud.google.com/go/internal/optional" + ipubsub "cloud.google.com/go/internal/pubsub" "cloud.google.com/go/pubsub/internal/scheduler" gax "github.com/googleapis/gax-go/v2" "golang.org/x/sync/errgroup" @@ -383,6 +384,23 @@ type SubscriptionConfig struct { // and will be ignored if sent in a request. TopicMessageRetentionDuration time.Duration + // EnableExactlyOnceDelivery configures Pub/Sub to provide the following guarantees + // for the delivery of a message with a given MessageID on this subscription: + // + // The message sent to a subscriber is guaranteed not to be resent + // before the message's acknowledgement deadline expires. + // An acknowledged message will not be resent to a subscriber. + // + // Note that subscribers may still receive multiple copies of a message + // when `enable_exactly_once_delivery` is true if the message was published + // multiple times by a publisher client. These copies are considered distinct + // by Pub/Sub and have distinct MessageID values. + // + // Lastly, to guarantee messages have been acked or nacked properly, you must + // call Message.AckWithResult() or Message.NackWithResult().
These return an + // AckResult, which will be ready once the message has been acked (or has failed to be acked). + EnableExactlyOnceDelivery bool + // State indicates whether or not the subscription can receive messages. // This is an output-only field that indicates whether or not the subscription can // receive messages. This field is set only in responses from the server; @@ -432,20 +450,21 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { pbRetryPolicy = cfg.RetryPolicy.toProto() } return &pb.Subscription{ - Name: name, - Topic: cfg.Topic.name, - PushConfig: pbPushConfig, - BigqueryConfig: pbBigQueryConfig, - AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), - RetainAckedMessages: cfg.RetainAckedMessages, - MessageRetentionDuration: retentionDuration, - Labels: cfg.Labels, - ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), - EnableMessageOrdering: cfg.EnableMessageOrdering, - DeadLetterPolicy: pbDeadLetter, - Filter: cfg.Filter, - RetryPolicy: pbRetryPolicy, - Detached: cfg.Detached, + Name: name, + Topic: cfg.Topic.name, + PushConfig: pbPushConfig, + BigqueryConfig: pbBigQueryConfig, + AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), + RetainAckedMessages: cfg.RetainAckedMessages, + MessageRetentionDuration: retentionDuration, + Labels: cfg.Labels, + ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), + EnableMessageOrdering: cfg.EnableMessageOrdering, + DeadLetterPolicy: pbDeadLetter, + Filter: cfg.Filter, + RetryPolicy: pbRetryPolicy, + Detached: cfg.Detached, + EnableExactlyOnceDelivery: cfg.EnableExactlyOnceDelivery, } } @@ -474,6 +493,7 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC RetryPolicy: rp, Detached: pbSub.Detached, TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(), + EnableExactlyOnceDelivery: pbSub.EnableExactlyOnceDelivery, State: SubscriptionState(pbSub.State), } if pc := protoToPushConfig(pbSub.PushConfig); pc != nil { @@ -694,9 +714,10 @@ type ReceiveSettings struct { // Deprecated. // Previously, users might use Synchronous mode since StreamingPull had a limitation // where MaxOutstandingMessages was not always respected with large batches of - // small messsages. With server side flow control, this is no longer an issue + // small messages. With server side flow control, this is no longer an issue // and we recommend switching to the default StreamingPull mode by setting // Synchronous to false. + // Synchronous mode does not work with exactly once delivery. Synchronous bool } @@ -795,6 +816,9 @@ type SubscriptionConfigToUpdate struct { // (to redeliver messages as soon as possible) use a pointer to the zero value // for this struct. RetryPolicy *RetryPolicy + + // If set, EnableExactlyOnceDelivery is changed. + EnableExactlyOnceDelivery optional.Bool } // Update changes an existing subscription according to the fields set in cfg.
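For orientation, here is a minimal usage sketch of the API surface added above (illustrative only, not part of the patch; "my-project", "my-topic", and "my-sub" are placeholder IDs): it creates a subscription with EnableExactlyOnceDelivery set and waits on the AckResult returned by Message.AckWithResult.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Enable exactly once delivery when creating the subscription.
	sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
		Topic:                     client.Topic("my-topic"), // placeholder topic ID
		EnableExactlyOnceDelivery: true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Receive blocks until ctx is done or a non-retryable error occurs.
	// Waiting on the AckResult surfaces ack failures (after the client has
	// retried transient errors) to the caller.
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		res, err := m.AckWithResult().Get(ctx)
		if err != nil {
			log.Printf("ack of message %q failed: %v", m.ID, err)
			return
		}
		if res != pubsub.AcknowledgeStatusSuccess {
			log.Printf("ack of message %q returned status %v", m.ID, res)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
}

The status check mirrors the pattern used in the tests later in this patch, which compare the returned AcknowledgeStatus against AcknowledgeStatusSuccess.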
@@ -855,6 +879,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update psub.RetryPolicy = cfg.RetryPolicy.toProto() paths = append(paths, "retry_policy") } + if cfg.EnableExactlyOnceDelivery != nil { + psub.EnableExactlyOnceDelivery = optional.ToBool(cfg.EnableExactlyOnceDelivery) + paths = append(paths, "enable_exactly_once_delivery") + } return &pb.UpdateSubscriptionRequest{ Subscription: psub, UpdateMask: &fmpb.FieldMask{Paths: paths}, @@ -1134,12 +1162,14 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // Return nil if the context is done, not err. return nil } - ackh, _ := msgAckHandler(msg) + iter.eoMu.RLock() + ackh, _ := msgAckHandler(msg, iter.enableExactlyOnceDelivery) + iter.eoMu.RUnlock() old := ackh.doneFunc msgLen := len(msg.Data) - ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { + ackh.doneFunc = func(ackID string, ack bool, r *ipubsub.AckResult, receiveTime time.Time) { defer fc.release(ctx, msgLen) - old(ackID, ack, receiveTime) + old(ackID, ack, r, receiveTime) } wg.Add(1) // Make sure the subscription has ordering enabled before adding to scheduler. diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 7d4ac9ee28c..ed758c9be25 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -16,7 +16,9 @@ package pubsub import ( "context" + "errors" "fmt" + "sync" "testing" "time" @@ -28,6 +30,7 @@ import ( pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -190,7 +193,8 @@ func TestUpdateSubscription(t *testing.T) { Audience: "client-12345", }, }, - State: SubscriptionStateActive, + EnableExactlyOnceDelivery: false, + State: SubscriptionStateActive, } opt := cmpopts.IgnoreUnexported(SubscriptionConfig{}) if !testutil.Equal(cfg, want, opt) { @@ -209,6 +213,7 @@ func TestUpdateSubscription(t *testing.T) { Audience: "client-12345", }, }, + EnableExactlyOnceDelivery: true, }) if err != nil { t.Fatal(err) @@ -227,7 +232,8 @@ func TestUpdateSubscription(t *testing.T) { Audience: "client-12345", }, }, - State: SubscriptionStateActive, + EnableExactlyOnceDelivery: true, + State: SubscriptionStateActive, } if !testutil.Equal(got, want, opt) { t.Fatalf("\ngot %+v\nwant %+v", got, want) @@ -265,38 +271,59 @@ func TestUpdateSubscription(t *testing.T) { } func TestReceive(t *testing.T) { - testReceive(t, true) - testReceive(t, false) + testReceive(t, true, false) + testReceive(t, false, false) + testReceive(t, false, true) } -func testReceive(t *testing.T, synchronous bool) { - ctx := context.Background() - client, srv := newFake(t) - defer client.Close() - defer srv.Close() - - topic := mustCreateTopic(t, client, "t") - sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{Topic: topic}) - if err != nil { - t.Fatal(err) - } - for i := 0; i < 256; i++ { - srv.Publish(topic.name, []byte{byte(i)}, nil) - } - sub.ReceiveSettings.Synchronous = synchronous - msgs, err := pullN(ctx, sub, 256, func(_ context.Context, m *Message) { m.Ack() }) - if c := status.Convert(err); err != nil && c.Code() != codes.Canceled { - t.Fatalf("Pull: %v", err) - } - var seen [256]bool - for _, m := range msgs { - seen[m.Data[0]] = true - } - for i, saw := range seen { - if !saw { - t.Errorf("sync=%t: did not see message #%d", 
synchronous, i) +func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) { + t.Run(fmt.Sprintf("synchronous:%t,exactlyOnceDelivery:%t", synchronous, exactlyOnceDelivery), func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: exactlyOnceDelivery, + }) + if err != nil { + t.Fatal(err) } - } + for i := 0; i < 256; i++ { + srv.Publish(topic.name, []byte{byte(i)}, nil) + } + sub.ReceiveSettings.Synchronous = synchronous + msgs, err := pullN(ctx, sub, 256, func(_ context.Context, m *Message) { + if exactlyOnceDelivery { + ar := m.AckWithResult() + // Don't use the above ctx here since that will get cancelled. + ackStatus, err := ar.Get(context.Background()) + if err != nil { + t.Fatalf("pullN err for message(%s): %v", m.ID, err) + } + if ackStatus != AcknowledgeStatusSuccess { + t.Fatalf("pullN got non-success AckStatus: %v", ackStatus) + } + } else { + m.Ack() + } + }) + if c := status.Convert(err); err != nil && c.Code() != codes.Canceled { + t.Fatalf("Pull: %v", err) + } + var seen [256]bool + for _, m := range msgs { + seen[m.Data[0]] = true + } + for i, saw := range seen { + if !saw { + t.Errorf("sync=%t, eod=%t: did not see message #%d", synchronous, exactlyOnceDelivery, i) + } + } + }) } func (t1 *Topic) Equal(t2 *Topic) bool { @@ -313,7 +340,7 @@ func (t1 *Topic) Equal(t2 *Topic) bool { func newFake(t *testing.T) (*Client, *pstest.Server) { ctx := context.Background() srv := pstest.NewServer() - client, err := NewClient(ctx, "P", + client, err := NewClient(ctx, projName, option.WithEndpoint(srv.Addr), option.WithoutAuthentication(), option.WithGRPCDialOption(grpc.WithInsecure())) @@ -439,7 +466,8 @@ func TestOrdering_CreateSubscription(t *testing.T) { } func TestBigQuerySubscription(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() client, srv := newFake(t) defer client.Close() defer srv.Close() @@ -469,3 +497,244 @@ func TestBigQuerySubscription(t *testing.T) { t.Fatalf("CreateBQSubscription mismatch: \n%s", diff) } } + +func TestExactlyOnceDelivery_AckSuccess(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + subConfig := SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: true, + } + s, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatalf("create sub err: %v", err) + } + s.ReceiveSettings.NumGoroutines = 1 + r := topic.Publish(ctx, &Message{ + Data: []byte("exactly-once-message"), + }) + if _, err := r.Get(ctx); err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + err = s.Receive(ctx, func(ctx context.Context, msg *Message) { + ar := msg.AckWithResult() + s, err := ar.Get(ctx) + if s != AcknowledgeStatusSuccess { + t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusSuccess) + } + if err != nil { + t.Errorf("AckResult error got %v", err) + } + cancel() + }) + if err != nil { + t.Fatalf("s.Receive err: %v", err) + } +} + +func TestExactlyOnceDelivery_AckFailureErrorPermissionDenied(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + srv := 
pstest.NewServer(pstest.WithErrorInjection("Acknowledge", codes.PermissionDenied, "insufficient permission")) + client, err := NewClient(ctx, projName, + option.WithEndpoint(srv.Addr), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))) + if err != nil { + t.Fatal(err) + } + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + subConfig := SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: true, + } + s, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatalf("create sub err: %v", err) + } + s.ReceiveSettings.NumGoroutines = 1 + r := topic.Publish(ctx, &Message{ + Data: []byte("exactly-once-message"), + }) + if _, err := r.Get(ctx); err != nil { + t.Fatalf("failed to publish message: %v", err) + } + err = s.Receive(ctx, func(ctx context.Context, msg *Message) { + ar := msg.AckWithResult() + s, err := ar.Get(ctx) + if s != AcknowledgeStatusPermissionDenied { + t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusPermissionDenied) + } + wantErr := status.Errorf(codes.PermissionDenied, "insufficient permission") + if !errors.Is(err, wantErr) { + t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr) + } + cancel() + }) + if err != nil { + t.Fatalf("s.Receive err: %v", err) + } +} + +func TestExactlyOnceDelivery_AckRetryDeadlineExceeded(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + srv := pstest.NewServer(pstest.WithErrorInjection("Acknowledge", codes.Internal, "internal error")) + client, err := NewClient(ctx, projName, + option.WithEndpoint(srv.Addr), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))) + if err != nil { + t.Fatal(err) + } + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + subConfig := SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: true, + } + s, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatalf("create sub err: %v", err) + } + r := topic.Publish(ctx, &Message{ + Data: []byte("exactly-once-message"), + }) + if _, err := r.Get(ctx); err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + s.ReceiveSettings = ReceiveSettings{ + NumGoroutines: 1, + // This needs to be greater than total deadline otherwise the message will be redelivered. + MinExtensionPeriod: 2 * time.Minute, + } + // Override the default timeout here so this test doesn't take 10 minutes. 
+ exactlyOnceDeliveryRetryDeadline = 1 * time.Minute + err = s.Receive(ctx, func(ctx context.Context, msg *Message) { + ar := msg.AckWithResult() + s, err := ar.Get(ctx) + if s != AcknowledgeStatusOther { + t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther) + } + wantErr := context.DeadlineExceeded + if !errors.Is(err, wantErr) { + t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr) + } + cancel() + }) + if err != nil { + t.Fatalf("s.Receive err: %v", err) + } +} + +func TestExactlyOnceDelivery_NackSuccess(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + subConfig := SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: true, + } + s, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatalf("create sub err: %v", err) + } + r := topic.Publish(ctx, &Message{ + Data: []byte("exactly-once-message"), + }) + if _, err := r.Get(ctx); err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + s.ReceiveSettings = ReceiveSettings{ + NumGoroutines: 1, + } + err = s.Receive(ctx, func(ctx context.Context, msg *Message) { + ar := msg.NackWithResult() + s, err := ar.Get(ctx) + if s != AcknowledgeStatusSuccess { + t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusSuccess) + } + if err != nil { + t.Errorf("AckResult error got %v", err) + } + cancel() + }) + if err != nil { + t.Fatalf("s.Receive err: %v", err) + } +} + +func TestExactlyOnceDelivery_NackRetry_DeadlineExceeded(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + srv := pstest.NewServer(pstest.WithErrorInjection("ModifyAckDeadline", codes.Internal, "internal error")) + client, err := NewClient(ctx, projName, + option.WithEndpoint(srv.Addr), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))) + if err != nil { + t.Fatal(err) + } + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + subConfig := SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: true, + } + s, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatalf("create sub err: %v", err) + } + r := topic.Publish(ctx, &Message{ + Data: []byte("exactly-once-message"), + }) + if _, err := r.Get(ctx); err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + s.ReceiveSettings = ReceiveSettings{ + NumGoroutines: 1, + // This needs to be greater than total deadline otherwise the message will be redelivered. + MinExtensionPeriod: 2 * time.Minute, + MaxExtensionPeriod: 2 * time.Minute, + } + // Override the default timeout here so this test doesn't take 10 minutes. 
+ exactlyOnceDeliveryRetryDeadline = 1 * time.Minute + var once sync.Once + err = s.Receive(ctx, func(ctx context.Context, msg *Message) { + once.Do(func() { + ar := msg.NackWithResult() + s, err := ar.Get(ctx) + if s != AcknowledgeStatusOther { + t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther) + } + wantErr := context.DeadlineExceeded + if !errors.Is(err, wantErr) { + t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr) + } + cancel() + }) + }) + if err != nil { + t.Fatalf("s.Receive err: %v", err) + } +} diff --git a/pubsublite/go.mod b/pubsublite/go.mod index 19d8708db2d..5d5576b71ec 100644 --- a/pubsublite/go.mod +++ b/pubsublite/go.mod @@ -3,8 +3,8 @@ module cloud.google.com/go/pubsublite go 1.17 require ( - cloud.google.com/go v0.102.1 - cloud.google.com/go/pubsub v1.23.0 + cloud.google.com/go v0.103.1-0.20220811221032-e9a4655716d7 + cloud.google.com/go/pubsub v1.24.1-0.20220812182604-346d154f8951 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.8 github.com/google/uuid v1.3.0 diff --git a/pubsublite/go.sum b/pubsublite/go.sum index f6e3907a805..fb07fb1de5c 100644 --- a/pubsublite/go.sum +++ b/pubsublite/go.sum @@ -31,6 +31,10 @@ cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w9 cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc= cloud.google.com/go v0.102.1 h1:vpK6iQWv/2uUeFJth4/cBHsQAGjn1iIE6AAlxipRaA0= cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU= +cloud.google.com/go v0.103.1-0.20220811221032-e9a4655716d7 h1:7onknDbpSpdyZh7QqM8uAKABsO0BvQp2/xsTa/5Kc7U= +cloud.google.com/go v0.103.1-0.20220811221032-e9a4655716d7/go.mod h1:NLrhOsUNo9AkDWHvqHE5lVuGkPZGr7eTs4thmfrKHyw= +cloud.google.com/go v0.103.1-0.20220812182604-346d154f8951 h1:SiwVjzxhXtsQfcnyuRLiiTsQ1rVZLL66+zvrO4KuDUk= +cloud.google.com/go v0.103.1-0.20220812182604-346d154f8951/go.mod h1:NLrhOsUNo9AkDWHvqHE5lVuGkPZGr7eTs4thmfrKHyw= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -57,6 +61,8 @@ cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIA cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= cloud.google.com/go/pubsub v1.23.0 h1:DAaD+gG3tUkdq/VdOuI9s4g84w8ia7z/CcLkhprcMew= cloud.google.com/go/pubsub v1.23.0/go.mod h1:XUpUURgUDXYVGARZBmwHbfcVdMo4EVtRhSLlzBbmmf0= +cloud.google.com/go/pubsub v1.24.1-0.20220812182604-346d154f8951 h1:tAvVJCJOJEDatSWO1aOCuW2gimXXnhVsUI612HNaD+s= +cloud.google.com/go/pubsub v1.24.1-0.20220812182604-346d154f8951/go.mod h1:41OKucA1m7M3sCzzZ1/cqk90rClYZvF4WrKyFMTMEaI= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= diff --git a/pubsublite/pscompat/subscriber.go b/pubsublite/pscompat/subscriber.go index 9f13ec4d240..3aacf17674e 100644 --- a/pubsublite/pscompat/subscriber.go +++ b/pubsublite/pscompat/subscriber.go @@ -75,6 +75,26 @@ func (ah *pslAckHandler) OnNack() { ah.subInstance = nil } +// OnAckWithResult is required implementation for the ack handler +// for Cloud Pub/Sub's exactly once delivery feature. 
This will +// ack the message and return an AckResult that always resolves to success. +func (ah *pslAckHandler) OnAckWithResult() *ipubsub.AckResult { + ah.OnAck() + ar := ipubsub.NewAckResult() + ipubsub.SetAckResult(ar, ipubsub.AcknowledgeStatusSuccess, nil) + return ar +} + +// OnNackWithResult is required implementation for the ack handler +// for Cloud Pub/Sub's exactly once delivery feature. This will +// nack the message and return an AckResult that always resolves to success. +func (ah *pslAckHandler) OnNackWithResult() *ipubsub.AckResult { + ah.OnNack() + ar := ipubsub.NewAckResult() + ipubsub.SetAckResult(ar, ipubsub.AcknowledgeStatusSuccess, nil) + return ar +} + // wireSubscriberFactory is a factory for creating wire subscribers, which can // be overridden with a mock in unit tests. type wireSubscriberFactory interface {