From 48aa1ce16971d223db372c0c25b8e74787343163 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 9 Dec 2020 14:21:04 +1100 Subject: [PATCH] refactor(pubsub): use Message and PublishResult in internal/pubsub (#3231) Replaces Message and PublishResult with aliases of types from internal/pubsub. Refactors all usage. --- pubsub/go.mod | 2 +- pubsub/go.sum | 7 ++ pubsub/integration_test.go | 18 ++--- pubsub/iterator.go | 16 ++-- pubsub/message.go | 145 +++++++++++++--------------------- pubsub/service.go | 14 ---- pubsub/streaming_pull_test.go | 14 ++-- pubsub/subscription.go | 2 +- pubsub/subscription_test.go | 4 +- pubsub/topic.go | 61 +++++--------- 10 files changed, 106 insertions(+), 177 deletions(-) diff --git a/pubsub/go.mod b/pubsub/go.mod index 22e6697a6ac..e0399a6148d 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -3,7 +3,7 @@ module cloud.google.com/go/pubsub go 1.11 require ( - cloud.google.com/go v0.72.0 + cloud.google.com/go v0.73.0 github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.4 github.com/googleapis/gax-go/v2 v2.0.5 diff --git a/pubsub/go.sum b/pubsub/go.sum index 518c64ae340..eb387598238 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -19,6 +19,8 @@ cloud.google.com/go v0.65.0 h1:Dg9iHVQfrhq82rUNu9ZxUDrJLaxFUe/HlCVaLyRruq8= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww= cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= +cloud.google.com/go v0.73.0 h1:sGvc4e0Cmm4+DKQR76a9VwNukpacQK8TOl5pDl0Pcn0= +cloud.google.com/go v0.73.0/go.mod h1:BkDh9dFvGjCitVw03TNjKbBxXNKULXXIq6orU6HrJ4Q= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -119,6 +121,7 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20201117184057-ae444373da19/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -230,6 +233,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjN golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -352,6 +357,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201202200335-bef1c476418a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201204162204-73cf035baebf h1:LJkCozzIEY51bepolJQN3tP938NA5mMucF2dDJ9AMNA= golang.org/x/tools v0.0.0-20201204162204-73cf035baebf/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -420,6 +426,7 @@ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20201203001206-6486ece9c497/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201204160425-06b3db808446 h1:65ppmIPdaZE+BO34gntwqexoTYr30IRNGmS0OGOHu3A= google.golang.org/genproto v0.0.0-20201204160425-06b3db808446/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index f2c9c21e834..8ebe7bd4fa9 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1149,9 +1149,8 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) { OrderingKey: orderingKey, }) go func() { - <-r.ready - if r.err != nil { - t.Error(r.err) + if _, err := r.Get(ctx); err != nil { + t.Error(err) } }() } @@ -1320,8 +1319,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { Data: bytes.Repeat([]byte("A"), 1000), OrderingKey: orderingKey, }) - <-r.ready - if r.err == nil { + if _, err := r.Get(ctx); err == nil { t.Fatalf("expected bundle byte limit error, got nil") } // Publish a normal sized message now, which should fail @@ -1330,9 +1328,8 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { Data: []byte("should fail"), OrderingKey: orderingKey, }) - <-r.ready - if r.err == nil || !strings.Contains(r.err.Error(), "pubsub: Publishing for ordering key") { - t.Fatalf("expected ordering keys publish error, got %v", r.err) + if _, err := r.Get(ctx); err == nil || !strings.Contains(err.Error(), "pubsub: Publishing for ordering key") { + t.Fatalf("expected ordering keys publish error, got %v", err) } // Lastly, call ResumePublish and make sure subsequent publishes succeed. @@ -1341,9 +1338,8 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { Data: []byte("should succeed"), OrderingKey: orderingKey, }) - <-r.ready - if r.err != nil { - t.Fatalf("got error while publishing message: %v", r.err) + if _, err := r.Get(ctx); err != nil { + t.Fatalf("got error while publishing message: %v", err) } } diff --git a/pubsub/iterator.go b/pubsub/iterator.go index f63f60d5aaa..dcca633aae4 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -205,7 +205,8 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { return nil, it.fail(err) } recordStat(it.ctx, PullCount, int64(len(rmsgs))) - msgs, err := convertMessages(rmsgs) + now := time.Now() + msgs, err := convertMessages(rmsgs, now, it.done) if err != nil { return nil, it.fail(err) } @@ -214,17 +215,14 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { maxExt := time.Now().Add(it.po.maxExtension) ackIDs := map[string]bool{} it.mu.Lock() - now := time.Now() for _, m := range msgs { - ackh, _ := m.ackHandler() - ackh.receiveTime = now - addRecv(m.ID, ackh.ackID, now) - ackh.doneFunc = it.done - it.keepAliveDeadlines[ackh.ackID] = maxExt + ackID := msgAckID(m) + addRecv(m.ID, ackID, now) + it.keepAliveDeadlines[ackID] = maxExt // Don't change the mod-ack if the message is going to be nacked. This is // possible if there are retries. - if !it.pendingNacks[ackh.ackID] { - ackIDs[ackh.ackID] = true + if !it.pendingNacks[ackID] { + ackIDs[ackID] = true } } deadline := it.ackDeadline() diff --git a/pubsub/message.go b/pubsub/message.go index c09a7e51c40..4aa815ed7f8 100644 --- a/pubsub/message.go +++ b/pubsub/message.go @@ -15,55 +15,61 @@ package pubsub import ( + "fmt" "time" + ipubsub "cloud.google.com/go/internal/pubsub" "github.com/golang/protobuf/ptypes" pb "google.golang.org/genproto/googleapis/pubsub/v1" ) // Message represents a Pub/Sub message. -type Message struct { - // ID identifies this message. This ID is assigned by the server and is - // populated for Messages obtained from a subscription. - // - // This field is read-only. - ID string - - // Data is the actual data in the message. - Data []byte - - // Attributes represents the key-value pairs the current message is - // labelled with. - Attributes map[string]string - - // PublishTime is the time at which the message was published. This is - // populated by the server for Messages obtained from a subscription. - // - // This field is read-only. - PublishTime time.Time - - // DeliveryAttempt is the number of times a message has been delivered. - // This is part of the dead lettering feature that forwards messages that - // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. - // If dead lettering is enabled, this will be set on all attempts, starting - // with value 1. Otherwise, the value will be nil. - // This field is read-only. - DeliveryAttempt *int - - // size is the approximate size of the message's data and attributes. - size int - - // OrderingKey identifies related messages for which publish order should - // be respected. If empty string is used, message will be sent unordered. - OrderingKey string - - // ackh handles Ack() or Nack(). - ackh ackHandler +// +// Message can be passed to Topic.Publish for publishing. +// +// If received in the callback passed to Subscription.Receive, client code must +// call Message.Ack or Message.Nack when finished processing the Message. Calls +// to Ack or Nack have no effect after the first call. +// +// Ack indicates successful processing of a Message. If message acknowledgement +// fails, the Message will be redelivered. Nack indicates that the client will +// not or cannot process a Message. Nack will result in the Message being +// redelivered more quickly than if it were allowed to expire. +type Message = ipubsub.Message + +// msgAckHandler performs a safe cast of the message's ack handler to psAckHandler. +func msgAckHandler(m *Message) (*psAckHandler, bool) { + ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler) + return ackh, ok +} + +func msgAckID(m *Message) string { + if ackh, ok := msgAckHandler(m); ok { + return ackh.ackID + } + return "" } -func toMessage(resp *pb.ReceivedMessage) (*Message, error) { +// The done method of the iterator that created a Message. +type iterDoneFunc func(string, bool, time.Time) + +func convertMessages(rms []*pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) ([]*Message, error) { + msgs := make([]*Message, 0, len(rms)) + for i, m := range rms { + msg, err := toMessage(m, receiveTime, doneFunc) + if err != nil { + return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m) + } + msgs = append(msgs, msg) + } + return msgs, nil +} + +func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) (*Message, error) { + ackh := &psAckHandler{ackID: resp.AckId} + msg := ipubsub.NewMessage(ackh) if resp.Message == nil { - return &Message{ackh: &psAckHandler{ackID: resp.AckId}}, nil + return msg, nil } pubTime, err := ptypes.Timestamp(resp.Message.PublishTime) @@ -77,56 +83,15 @@ func toMessage(resp *pb.ReceivedMessage) (*Message, error) { deliveryAttempt = &da } - return &Message{ - Data: resp.Message.Data, - Attributes: resp.Message.Attributes, - ID: resp.Message.MessageId, - PublishTime: pubTime, - DeliveryAttempt: deliveryAttempt, - OrderingKey: resp.Message.OrderingKey, - ackh: &psAckHandler{ackID: resp.AckId}, - }, nil -} - -// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback. -// It should not be called on any other Message value. -// If message acknowledgement fails, the Message will be redelivered. -// Client code must call Ack or Nack when finished for each received Message. -// Calls to Ack or Nack have no effect after the first call. -func (m *Message) Ack() { - if m.ackh != nil { - m.ackh.OnAck() - } -} - -// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback. -// It should not be called on any other Message value. -// Nack will result in the Message being redelivered more quickly than if it were allowed to expire. -// Client code must call Ack or Nack when finished for each received Message. -// Calls to Ack or Nack have no effect after the first call. -func (m *Message) Nack() { - if m.ackh != nil { - m.ackh.OnNack() - } -} - -// ackHandler performs a safe cast of the message's ack handler to psAckHandler. -func (m *Message) ackHandler() (*psAckHandler, bool) { - ackh, ok := m.ackh.(*psAckHandler) - return ackh, ok -} - -func (m *Message) ackID() string { - if ackh, ok := m.ackh.(*psAckHandler); ok { - return ackh.ackID - } - return "" -} - -// ackHandler implements ack/nack handling. -type ackHandler interface { - OnAck() - OnNack() + msg.Data = resp.Message.Data + msg.Attributes = resp.Message.Attributes + msg.ID = resp.Message.MessageId + msg.PublishTime = pubTime + msg.DeliveryAttempt = deliveryAttempt + msg.OrderingKey = resp.Message.OrderingKey + ackh.receiveTime = receiveTime + ackh.doneFunc = doneFunc + return msg, nil } // psAckHandler handles ack/nack for the pubsub package. @@ -140,7 +105,7 @@ type psAckHandler struct { calledDone bool // The done method of the iterator that created this Message. - doneFunc func(string, bool, time.Time) + doneFunc iterDoneFunc } func (ah *psAckHandler) OnAck() { diff --git a/pubsub/service.go b/pubsub/service.go index a22b9147fcc..d9a00021eb0 100644 --- a/pubsub/service.go +++ b/pubsub/service.go @@ -15,13 +15,11 @@ package pubsub import ( - "fmt" "math" "strings" "time" gax "github.com/googleapis/gax-go/v2" - pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -37,18 +35,6 @@ const ( maxSendRecvBytes = 20 * 1024 * 1024 // 20M ) -func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) { - msgs := make([]*Message, 0, len(rms)) - for i, m := range rms { - msg, err := toMessage(m) - if err != nil { - return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m) - } - msgs = append(msgs, msg) - } - return msgs, nil -} - func trunc32(i int64) int32 { if i > math.MaxInt32 { i = math.MaxInt32 diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index e70d2e7abff..3bf571a9516 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -67,7 +67,7 @@ func TestStreamingPullMultipleFetches(t *testing.T) { func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer, msgs []*pb.ReceivedMessage) { sub := client.Subscription("S") gotMsgs, err := pullN(context.Background(), sub, len(msgs), func(_ context.Context, m *Message) { - id, err := strconv.Atoi(m.ackID()) + id, err := strconv.Atoi(msgAckID(m)) if err != nil { panic(err) } @@ -83,14 +83,14 @@ func testStreamingPullIteration(t *testing.T, client *Client, server *mockServer } gotMap := map[string]*Message{} for _, m := range gotMsgs { - gotMap[m.ackID()] = m + gotMap[msgAckID(m)] = m } for i, msg := range msgs { - want, err := toMessage(msg) + want, err := toMessage(msg, time.Time{}, nil) if err != nil { t.Fatal(err) } - wantAckh, _ := want.ackHandler() + wantAckh, _ := msgAckHandler(want) wantAckh.calledDone = true got := gotMap[wantAckh.ackID] if got == nil { @@ -236,10 +236,10 @@ func TestStreamingPullConcurrent(t *testing.T) { } seen := map[string]bool{} for _, gm := range gotMsgs { - if seen[gm.ackID()] { - t.Fatalf("duplicate ID %q", gm.ackID()) + if seen[msgAckID(gm)] { + t.Fatalf("duplicate ID %q", msgAckID(gm)) } - seen[gm.ackID()] = true + seen[msgAckID(gm)] = true } if len(seen) != nMessages { t.Fatalf("got %d messages, want %d", len(seen), nMessages) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 729e34f351f..d1dee3ca9a1 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -916,7 +916,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes // Return nil if the context is done, not err. return nil } - ackh, _ := msg.ackHandler() + ackh, _ := msgAckHandler(msg) old := ackh.doneFunc msgLen := len(msg.Data) ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 76102068245..6b6033ccdf3 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -342,7 +342,7 @@ func TestDeadLettering_toMessage(t *testing.T) { PublishTime: ptypes.TimestampNow(), }, } - got, err := toMessage(receivedMsg) + got, err := toMessage(receivedMsg, time.Time{}, nil) if err != nil { t.Errorf("toMessage failed: %v", err) } @@ -352,7 +352,7 @@ func TestDeadLettering_toMessage(t *testing.T) { // If dead lettering is enabled, toMessage should properly pass through the DeliveryAttempt field. receivedMsg.DeliveryAttempt = 10 - got, err = toMessage(receivedMsg) + got, err = toMessage(receivedMsg, time.Time{}, nil) if err != nil { t.Errorf("toMessage failed: %v", err) } diff --git a/pubsub/topic.go b/pubsub/topic.go index 07e392e0869..91bcaf93428 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -25,6 +25,7 @@ import ( "time" "cloud.google.com/go/iam" + ipubsub "cloud.google.com/go/internal/pubsub" "cloud.google.com/go/pubsub/internal/scheduler" "github.com/golang/protobuf/proto" gax "github.com/googleapis/gax-go/v2" @@ -400,6 +401,16 @@ func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator { var errTopicStopped = errors.New("pubsub: Stop has been called for this topic") +// A PublishResult holds the result from a call to Publish. +// +// Call Get to obtain the result of the Publish call. Example: +// // Get blocks until Publish completes or ctx is done. +// id, err := r.Get(ctx) +// if err != nil { +// // TODO: Handle error. +// } +type PublishResult = ipubsub.PublishResult + // Publish publishes msg to the topic asynchronously. Messages are batched and // sent according to the topic's PublishSettings. Publish never blocks. // @@ -410,9 +421,9 @@ var errTopicStopped = errors.New("pubsub: Stop has been called for this topic") // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { - r := &PublishResult{ready: make(chan struct{})} + r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { - r.set("", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) + ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) return r } @@ -421,7 +432,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { // encoded proto message by accounting for the length of an individual // PubSubMessage and Data/Attributes field. // TODO(hongalex): if this turns out to take significant time, try to approximate it. - msg.size = proto.Size(&pb.PublishRequest{ + msgSize := proto.Size(&pb.PublishRequest{ Messages: []*pb.PubsubMessage{ { Data: msg.Data, @@ -435,16 +446,16 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { defer t.mu.RUnlock() // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here if t.stopped { - r.set("", errTopicStopped) + ipubsub.SetPublishResult(r, "", errTopicStopped) return r } // TODO(jba) [from bcmills] consider using a shared channel per bundle // (requires Bundler API changes; would reduce allocations) - err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r}, msg.size) + err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r}, msgSize) if err != nil { t.scheduler.Pause(msg.OrderingKey) - r.set("", err) + ipubsub.SetPublishResult(r, "", err) } return r } @@ -463,40 +474,6 @@ func (t *Topic) Stop() { t.scheduler.FlushAndStop() } -// A PublishResult holds the result from a call to Publish. -type PublishResult struct { - ready chan struct{} - serverID string - err error -} - -// Ready returns a channel that is closed when the result is ready. -// When the Ready channel is closed, Get is guaranteed not to block. -func (r *PublishResult) Ready() <-chan struct{} { return r.ready } - -// Get returns the server-generated message ID and/or error result of a Publish call. -// Get blocks until the Publish call completes or the context is done. -func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) { - // If the result is already ready, return it even if the context is done. - select { - case <-r.Ready(): - return r.serverID, r.err - default: - } - select { - case <-ctx.Done(): - return "", ctx.Err() - case <-r.Ready(): - return r.serverID, r.err - } -} - -func (r *PublishResult) set(sid string, err error) { - r.serverID = sid - r.err = err - close(r.ready) -} - type bundledMessage struct { msg *Message res *PublishResult @@ -591,9 +568,9 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) PublishedMessages.M(int64(len(bms)))) for i, bm := range bms { if err != nil { - bm.res.set("", err) + ipubsub.SetPublishResult(bm.res, "", err) } else { - bm.res.set(res.MessageIds[i], nil) + ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil) } } }