Skip to content

Commit

Permalink
feat(pubsub): support exactly once delivery (#6506)
Browse files Browse the repository at this point in the history
* feat(pubsub): prepare iterator for exactly once (#6040)

* feat(pubsub): read exactly once for SubscriptionProperties

* rename vars to be specific this is exactly once delivery

* feat(pubsub): send stream ack deadline seconds on exactly once change #6157 (#6162)

* add RWMutex for guarding exactly once bool

* feat(pubsub): send stream ack deadline seconds on exactly once change

* remove extra test

* feat(pubsub): add AckWithResult and NackWithResult to message (#6201)

* add AckResult and related methods

* feat(pubsub): add AckWithResult and NackWithResult to message

* feat(pubsub): add AckWithResult and NackWithResult to message

* add comments for AckResult and bring over AcknowledgeStatus from internal

* update function definition for IgnoreExported in tests

* temporarily update internal/pubsub for samples test

* change enum naming to AcknowledgeStatus

* remove extra enums in temp internal message.go

* remove internal/pubsub/message.go

* fix style issues with variadic function options

* add back comment format to exported const

* keep track of AckResults if exactly once is enabled

* feat(pubsub): add helper method for parsing ErrorInfos (#6281)

* add AckResult and related methods

* feat(pubsub): add AckWithResult and NackWithResult to message

* feat(pubsub): add AckWithResult and NackWithResult to message

* add comments for AckResult and bring over AcknowledgeStatus from internal

* update function definition for IgnoreExported in tests

* temporarily update internal/pubsub for samples test

* add process results

* change enum naming to AcknowledgeStatus

* remove extra enums in temp internal message.go

* remove internal/pubsub/message.go

* add process results

* update process info with new enum names

* add tests to process error info

* add process results

* update process info with new enum names

* add process results

* add tests to process error info

* clean up iterator from merge

* cleanup comments

* add list of retriable errors to test

* simplify testing of completed/retry slice lengths

* remove getStatus/ackErrors methods

* address code review comments

* remove error string conversion step

* feat(pubsub): complete AckResult for exactly once (#6387)

* refactor sendAck to pipe errors to AckResult map

* rewrite sendAck/sendModAck for exactly once

* add AckResult to list of uncompared methods

* use ackResultWithID in all locations

* feat(pubsub): retry temporary failures for ack/modacks (#6485)

* retry acks in goroutine

* retry acks/modacks with transient errors

* add retry test

* add nack tests and support shorter timeouts

* add integration tests

* remove extra comment

* add commnets to ack/modack methods in iterator

* remove transient invalid ack id error string

* reduce number of mutex locks

* pass in StreamAckDeadline seconds for streaming pull requests in fake_test

* fix lint issues

* add changes to internal/pubsub/message

* implement default ack handler functions in lite

* use pubsub package ack result

* use pinned library for pubsublite

* resolve all lite Ack/NackWithResult to success
  • Loading branch information
hongalex committed Aug 22, 2022
1 parent 57c2a9e commit 74da335
Show file tree
Hide file tree
Showing 16 changed files with 1,410 additions and 326 deletions.
124 changes: 124 additions & 0 deletions internal/pubsub/message.go
Expand Up @@ -14,6 +14,7 @@
package pubsub

import (
"context"
"time"
)

Expand All @@ -24,6 +25,14 @@ type AckHandler interface {

// OnNack processes a message nack.
OnNack()

// OnAckWithResult processes a message ack and returns
// a result that shows if it succeeded.
OnAckWithResult() *AckResult

// OnNackWithResult processes a message nack and returns
// a result that shows if it succeeded.
OnNackWithResult() *AckResult
}

// Message represents a Pub/Sub message.
Expand Down Expand Up @@ -85,6 +94,121 @@ func (m *Message) Nack() {
}
}

// AcknowledgeStatus represents the status of an Ack or Nack request.
type AcknowledgeStatus int

const (
// AcknowledgeStatusSuccess indicates the request was a success.
AcknowledgeStatusSuccess AcknowledgeStatus = iota
// AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions.
AcknowledgeStatusPermissionDenied
// AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error.
AcknowledgeStatusFailedPrecondition
// AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid.
AcknowledgeStatusInvalidAckID
// AcknowledgeStatusOther indicates another unknown error was returned.
AcknowledgeStatusOther
)

// AckResult holds the result from a call to Ack or Nack.
type AckResult struct {
ready chan struct{}
res AcknowledgeStatus
err error
}

// Ready returns a channel that is closed when the result is ready.
// When the Ready channel is closed, Get is guaranteed not to block.
func (r *AckResult) Ready() <-chan struct{} { return r.ready }

// Get returns the status and/or error result of a Ack, Nack, or Modack call.
// Get blocks until the Ack/Nack completes or the context is done.
func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) {
// If the result is already ready, return it even if the context is done.
select {
case <-r.Ready():
return r.res, r.err
default:
}
select {
case <-ctx.Done():
// Explicitly return AcknowledgeStatusOther for context cancelled cases,
// since the default is success.
return AcknowledgeStatusOther, ctx.Err()
case <-r.Ready():
return r.res, r.err
}
}

// NewAckResult creates a AckResult.
func NewAckResult() *AckResult {
return &AckResult{
ready: make(chan struct{}),
}
}

// SetAckResult sets the ack response and error for a ack result and closes
// the Ready channel. Any call after the first for the same AckResult
// is a no-op.
func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) {
select {
case <-r.Ready():
return
default:
r.res = res
r.err = err
close(r.ready)
}
}

// AckWithResult acknowledges a message in Pub/Sub and it will not be
// delivered to this subscription again.
//
// You should avoid acknowledging messages until you have
// *finished* processing them, so that in the event of a failure,
// you receive the message again.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of acknowledgement
// operation. If the operation completes successfully, the message is
// guaranteed NOT to be re-delivered. Otherwise, the result will
// contain an error with more details about the failure and the
// message may be re-delivered.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
// Since acks in Cloud Pub/Sub are best effort when exactly-once
// delivery is disabled, the message may be re-delivered. Because
// re-deliveries are possible, you should ensure that your processing
// code is idempotent, as you may receive any given message more than
// once.
func (m *Message) AckWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnAckWithResult()
}
return nil
}

// NackWithResult declines to acknowledge the message which indicates that
// the client will not or cannot process a Message. This will cause the message
// to be re-delivered to subscribers. Re-deliveries may take place immediately
// or after a delay.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of nack
// operation. If the operation completes successfully, the result will
// contain AckResponse.Success. Otherwise, the result will contain an error
// with more details about the failure.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
func (m *Message) NackWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnNackWithResult()
}
return nil
}

// NewMessage creates a message with an AckHandler implementation, which should
// not be nil.
func NewMessage(ackh AckHandler) *Message {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/go.mod
Expand Up @@ -3,7 +3,7 @@ module cloud.google.com/go/pubsub
go 1.17

require (
cloud.google.com/go v0.102.1
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e
cloud.google.com/go/iam v0.3.0
cloud.google.com/go/kms v1.4.0
github.com/golang/protobuf v1.5.2
Expand Down
5 changes: 3 additions & 2 deletions pubsub/go.sum
Expand Up @@ -29,8 +29,8 @@ cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2Z
cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw1U=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
cloud.google.com/go v0.102.1 h1:vpK6iQWv/2uUeFJth4/cBHsQAGjn1iIE6AAlxipRaA0=
cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU=
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e h1:GZ9rHNbN2TY+p6/dTeU0EADYrOc3BCqy/KwGPZHLsdA=
cloud.google.com/go v0.102.1-0.20220708235547-f3d2cc2c987e/go.mod h1:mqs3bFXrt/gPc6aOZpchX8DEdQhuJluA/7LZNutd2Nc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
Expand Down Expand Up @@ -594,6 +594,7 @@ google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP
google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
Expand Down

0 comments on commit 74da335

Please sign in to comment.