From cc1528b2bfebbb48d49bcacd639abf2cf3468c96 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Thu, 23 Jun 2022 09:45:19 -0700 Subject: [PATCH] feat(pubsub): report publisher outstanding metrics (#6187) * feat(pubsub): report publisher outstanding metrics Fixes https://github.com/googleapis/google-cloud-go/issues/6180. * fix topic tag * flowcontroller: check all values on recordStat * remove unknown value Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com> --- pubsub/flow_controller.go | 50 +++++++++++++++++++++++++++++++++++---- pubsub/subscription.go | 2 +- pubsub/topic.go | 9 +++++-- pubsub/trace.go | 20 ++++++++++++++++ 4 files changed, 74 insertions(+), 7 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 9b0a19f2327..ad7e0907bfe 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -35,6 +35,15 @@ const ( FlowControlSignalError ) +// flowControllerPurpose indicates whether a flowController is for a topic or a +// subscription. +type flowControllerPurpose int + +const ( + flowControllerPurposeSubscription flowControllerPurpose = iota + flowControllerPurposeTopic +) + // FlowControlSettings controls flow control for messages while publishing or subscribing. type FlowControlSettings struct { // MaxOutstandingMessages is the maximum number of bufered messages to be published. @@ -73,6 +82,7 @@ type flowController struct { // Number of outstanding bytes remaining. Atomic. bytesRemaining int64 limitBehavior LimitExceededBehavior + purpose flowControllerPurpose } // newFlowController creates a new flowController that ensures no more than @@ -96,6 +106,18 @@ func newFlowController(fc FlowControlSettings) flowController { return f } +func newTopicFlowController(fc FlowControlSettings) flowController { + f := newFlowController(fc) + f.purpose = flowControllerPurposeTopic + return f +} + +func newSubscriptionFlowController(fc FlowControlSettings) flowController { + f := newFlowController(fc) + f.purpose = flowControllerPurposeSubscription + return f +} + // acquire allocates space for a message: the message count and its size. // // In FlowControlSignalError mode, large messages greater than maxSize @@ -135,13 +157,15 @@ func (f *flowController) acquire(ctx context.Context, size int) error { } } } + if f.semCount != nil { outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) - recordStat(ctx, OutstandingMessages, outstandingMessages) + f.recordOutstandingMessages(ctx, outstandingMessages) } + if f.semSize != nil { outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) + f.recordOutstandingBytes(ctx, outstandingBytes) } return nil } @@ -154,12 +178,12 @@ func (f *flowController) release(ctx context.Context, size int) { if f.semCount != nil { outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) - recordStat(ctx, OutstandingMessages, outstandingMessages) + f.recordOutstandingMessages(ctx, outstandingMessages) f.semCount.Release(1) } if f.semSize != nil { outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) + f.recordOutstandingBytes(ctx, outstandingBytes) f.semSize.Release(f.bound(size)) } } @@ -176,3 +200,21 @@ func (f *flowController) bound(size int) int64 { func (f *flowController) count() int { return int(atomic.LoadInt64(&f.countRemaining)) } + +func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) { + if f.purpose == flowControllerPurposeTopic { + recordStat(ctx, PublisherOutstandingMessages, n) + return + } + + recordStat(ctx, OutstandingMessages, n) +} + +func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) { + if f.purpose == flowControllerPurposeTopic { + recordStat(ctx, PublisherOutstandingBytes, n) + return + } + + recordStat(ctx, OutstandingBytes, n) +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index daaf6748df9..7b5339a3279 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -1032,7 +1032,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes maxOutstandingBytes: maxBytes, useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } - fc := newFlowController(FlowControlSettings{ + fc := newSubscriptionFlowController(FlowControlSettings{ MaxOutstandingMessages: maxCount, MaxOutstandingBytes: maxBytes, LimitExceededBehavior: FlowControlBlock, diff --git a/pubsub/topic.go b/pubsub/topic.go index cadb91a42f7..26d05947025 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -529,6 +529,11 @@ type PublishResult = ipubsub.PublishResult // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { + ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name)) + if err != nil { + log.Printf("pubsub: cannot create context with tag in Publish: %v", err) + } + r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) @@ -557,7 +562,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { ipubsub.SetPublishResult(r, "", err) return r } - err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) + err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) if err != nil { fmt.Printf("got err: %v\n", err) t.scheduler.Pause(msg.OrderingKey) @@ -651,7 +656,7 @@ func (t *Topic) initBundler() { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages } - t.flowController = newFlowController(fcs) + t.flowController = newTopicFlowController(fcs) bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit if t.PublishSettings.BufferedByteLimit > 0 { diff --git a/pubsub/trace.go b/pubsub/trace.go index 84cab3cd3cf..cadc3eb6d50 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -92,6 +92,14 @@ var ( // OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up. // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless) + + // PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed. + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless) + + // PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up. + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless) ) var ( @@ -146,11 +154,21 @@ var ( // OutstandingBytesView is the last value of OutstandingBytes // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytesView *view.View + + // PublisherOutstandingMessagesView is the last value of OutstandingMessages + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingMessagesView *view.View + + // PublisherOutstandingBytesView is the last value of OutstandingBytes + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingBytesView *view.View ) func init() { PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError) PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError) + PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic) + PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic) PullCountView = createCountView(PullCount, keySubscription) AckCountView = createCountView(AckCount, keySubscription) NackCountView = createCountView(NackCount, keySubscription) @@ -166,6 +184,8 @@ func init() { DefaultPublishViews = []*view.View{ PublishedMessagesView, PublishLatencyView, + PublisherOutstandingMessagesView, + PublisherOutstandingBytesView, } DefaultSubscribeViews = []*view.View{