Skip to content

Commit

Permalink
pubsub/gcppubsub: Expose ReceivedMessage in As (for AckId) (#3131)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent committed May 18, 2022
1 parent 60d7929 commit eecd4f0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
22 changes: 14 additions & 8 deletions pubsub/gcppubsub/gcppubsub.go
Expand Up @@ -45,7 +45,7 @@
// - Subscription: *raw.SubscriberClient
// - Message.BeforeSend: *pb.PubsubMessage
// - Message.AfterSend: *string for the pb.PublishResponse.MessageIds entry corresponding to the message.
// - Message: *pb.PubsubMessage
// - Message: *pb.PubsubMessage, *pb.ReceivedMessage
// - Error: *google.golang.org/grpc/status.Status
package gcppubsub // import "gocloud.dev/pubsub/gcppubsub"

Expand Down Expand Up @@ -497,27 +497,33 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr

ms := make([]*driver.Message, 0, len(resp.ReceivedMessages))
for _, rm := range resp.ReceivedMessages {
rm := rm
rmm := rm.Message
m := &driver.Message{
LoggableID: rmm.MessageId,
Body: rmm.Data,
Metadata: rmm.Attributes,
AckID: rm.AckId,
AsFunc: messageAsFunc(rmm),
AsFunc: messageAsFunc(rmm, rm),
}
ms = append(ms, m)
}
return ms, nil
}

func messageAsFunc(pm *pb.PubsubMessage) func(interface{}) bool {
func messageAsFunc(pm *pb.PubsubMessage, rm *pb.ReceivedMessage) func(interface{}) bool {
return func(i interface{}) bool {
p, ok := i.(**pb.PubsubMessage)
if !ok {
return false
ip, ok := i.(**pb.PubsubMessage)
if ok {
*ip = pm
return true
}
*p = pm
return true
rp, ok := i.(**pb.ReceivedMessage)
if ok {
*rp = rm
return true
}
return false
}
}

Expand Down
4 changes: 4 additions & 0 deletions pubsub/gcppubsub/gcppubsub_test.go
Expand Up @@ -255,6 +255,10 @@ func (gcpAsTest) MessageCheck(m *pubsub.Message) error {
if !m.As(&ppm) {
return fmt.Errorf("cast failed for %T", &ppm)
}
var prm *pubsubpb.ReceivedMessage
if !m.As(&prm) {
return fmt.Errorf("cast failed for %T", &prm)
}
return nil
}

Expand Down

0 comments on commit eecd4f0

Please sign in to comment.