From 7320ae8012ec30731bb5ad0781e1de54c64c8154 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 15 Jun 2022 22:35:09 +0000 Subject: [PATCH] feat(pubsub): report publisher outstanding metrics Fixes https://github.com/googleapis/google-cloud-go/issues/6180. --- pubsub/flow_controller.go | 49 +++++++++++++++++++++++++++++++++++---- pubsub/subscription.go | 2 +- pubsub/topic.go | 2 +- pubsub/trace.go | 20 ++++++++++++++++ 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 4fa86b67146..a7ab60e3682 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -35,6 +35,16 @@ const ( FlowControlSignalError ) +// flowControllerPurpose indicates whether a flowController is for a topic or a +// subscription. +type flowControllerPurpose int + +const ( + flowControllerPurposeUnknown flowControllerPurpose = iota + flowControllerPurposeTopic + flowControllerPurposeSubscription +) + // FlowControlSettings controls flow control for messages while publishing or subscribing. type FlowControlSettings struct { // MaxOutstandingMessages is the maximum number of bufered messages to be published. @@ -73,6 +83,7 @@ type flowController struct { // Number of outstanding bytes remaining. Atomic. bytesRemaining int64 limitBehavior LimitExceededBehavior + purpose flowControllerPurpose } // newFlowController creates a new flowController that ensures no more than @@ -96,6 +107,18 @@ func newFlowController(fc FlowControlSettings) flowController { return f } +func newTopicFlowController(fc FlowControlSettings) flowController { + f := newFlowController(fc) + f.purpose = flowControllerPurposeTopic + return f +} + +func newSubscriptionFlowController(fc FlowControlSettings) flowController { + f := newFlowController(fc) + f.purpose = flowControllerPurposeSubscription + return f +} + // acquire allocates space for a message: the message count and its size. // // In FlowControlSignalError mode, large messages greater than maxSize @@ -135,13 +158,15 @@ func (f *flowController) acquire(ctx context.Context, size int) error { } } } + if f.semCount != nil { outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) - recordStat(ctx, OutstandingMessages, outstandingMessages) + f.recordOutstandingMessages(ctx, outstandingMessages) } + if f.semSize != nil { outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) + f.recordOutstandingBytes(ctx, outstandingBytes) } return nil } @@ -154,12 +179,12 @@ func (f *flowController) release(ctx context.Context, size int) { if f.semCount != nil { outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) - recordStat(ctx, OutstandingMessages, outstandingMessages) + f.recordOutstandingMessages(ctx, outstandingMessages) f.semCount.Release(1) } if f.semSize != nil { outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) + f.recordOutstandingBytes(ctx, outstandingBytes) f.semSize.Release(f.bound(size)) } } @@ -176,3 +201,19 @@ func (f *flowController) bound(size int) int64 { func (f *flowController) count() int { return int(atomic.LoadInt64(&f.countRemaining)) } + +func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) { + if f.purpose == flowControllerPurposeTopic { + recordStat(ctx, PublisherOutstandingMessages, n) + } else { + recordStat(ctx, OutstandingMessages, n) + } +} + +func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) { + if f.purpose == flowControllerPurposeTopic { + recordStat(ctx, PublisherOutstandingBytes, n) + } else { + recordStat(ctx, OutstandingBytes, n) + } +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 17736544507..416c521e9bf 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -889,7 +889,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes maxOutstandingBytes: maxBytes, useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } - fc := newFlowController(FlowControlSettings{ + fc := newSubscriptionFlowController(FlowControlSettings{ MaxOutstandingMessages: maxCount, MaxOutstandingBytes: maxBytes, LimitExceededBehavior: FlowControlBlock, diff --git a/pubsub/topic.go b/pubsub/topic.go index a90503cc8a6..8b7aa060dcd 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -641,7 +641,7 @@ func (t *Topic) initBundler() { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages } - t.flowController = newFlowController(fcs) + t.flowController = newTopicFlowController(fcs) bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit if t.PublishSettings.BufferedByteLimit > 0 { diff --git a/pubsub/trace.go b/pubsub/trace.go index 84cab3cd3cf..cadc3eb6d50 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -92,6 +92,14 @@ var ( // OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up. // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless) + + // PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed. + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless) + + // PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up. + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless) ) var ( @@ -146,11 +154,21 @@ var ( // OutstandingBytesView is the last value of OutstandingBytes // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytesView *view.View + + // PublisherOutstandingMessagesView is the last value of OutstandingMessages + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingMessagesView *view.View + + // PublisherOutstandingBytesView is the last value of OutstandingBytes + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingBytesView *view.View ) func init() { PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError) PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError) + PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic) + PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic) PullCountView = createCountView(PullCount, keySubscription) AckCountView = createCountView(AckCount, keySubscription) NackCountView = createCountView(NackCount, keySubscription) @@ -166,6 +184,8 @@ func init() { DefaultPublishViews = []*view.View{ PublishedMessagesView, PublishLatencyView, + PublisherOutstandingMessagesView, + PublisherOutstandingBytesView, } DefaultSubscribeViews = []*view.View{