From f979acca1781c3924850ec5b1acc01d79b03b428 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 15 Jun 2022 22:35:09 +0000 Subject: [PATCH 1/4] feat(pubsub): report publisher outstanding metrics Fixes https://github.com/googleapis/google-cloud-go/issues/6180. --- pubsub/flow_controller.go | 49 +++++++++++++++++++++++++++++++++++---- pubsub/subscription.go | 2 +- pubsub/topic.go | 2 +- pubsub/trace.go | 20 ++++++++++++++++ 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 9b0a19f2327..2770d36733f 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -35,6 +35,16 @@ const ( FlowControlSignalError ) +// flowControllerPurpose indicates whether a flowController is for a topic or a +// subscription. +type flowControllerPurpose int + +const ( + flowControllerPurposeUnknown flowControllerPurpose = iota + flowControllerPurposeTopic + flowControllerPurposeSubscription +) + // FlowControlSettings controls flow control for messages while publishing or subscribing. type FlowControlSettings struct { // MaxOutstandingMessages is the maximum number of bufered messages to be published. @@ -73,6 +83,7 @@ type flowController struct { // Number of outstanding bytes remaining. Atomic. bytesRemaining int64 limitBehavior LimitExceededBehavior + purpose flowControllerPurpose } // newFlowController creates a new flowController that ensures no more than @@ -96,6 +107,18 @@ func newFlowController(fc FlowControlSettings) flowController { return f } +func newTopicFlowController(fc FlowControlSettings) flowController { + f := newFlowController(fc) + f.purpose = flowControllerPurposeTopic + return f +} + +func newSubscriptionFlowController(fc FlowControlSettings) flowController { + f := newFlowController(fc) + f.purpose = flowControllerPurposeSubscription + return f +} + // acquire allocates space for a message: the message count and its size. // // In FlowControlSignalError mode, large messages greater than maxSize @@ -135,13 +158,15 @@ func (f *flowController) acquire(ctx context.Context, size int) error { } } } + if f.semCount != nil { outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) - recordStat(ctx, OutstandingMessages, outstandingMessages) + f.recordOutstandingMessages(ctx, outstandingMessages) } + if f.semSize != nil { outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) + f.recordOutstandingBytes(ctx, outstandingBytes) } return nil } @@ -154,12 +179,12 @@ func (f *flowController) release(ctx context.Context, size int) { if f.semCount != nil { outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) - recordStat(ctx, OutstandingMessages, outstandingMessages) + f.recordOutstandingMessages(ctx, outstandingMessages) f.semCount.Release(1) } if f.semSize != nil { outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) + f.recordOutstandingBytes(ctx, outstandingBytes) f.semSize.Release(f.bound(size)) } } @@ -176,3 +201,19 @@ func (f *flowController) bound(size int) int64 { func (f *flowController) count() int { return int(atomic.LoadInt64(&f.countRemaining)) } + +func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) { + if f.purpose == flowControllerPurposeTopic { + recordStat(ctx, PublisherOutstandingMessages, n) + } else { + recordStat(ctx, OutstandingMessages, n) + } +} + +func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) { + if f.purpose == flowControllerPurposeTopic { + recordStat(ctx, PublisherOutstandingBytes, n) + } else { + recordStat(ctx, OutstandingBytes, n) + } +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 0eb37487238..b4867050594 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -903,7 +903,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes maxOutstandingBytes: maxBytes, useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } - fc := newFlowController(FlowControlSettings{ + fc := newSubscriptionFlowController(FlowControlSettings{ MaxOutstandingMessages: maxCount, MaxOutstandingBytes: maxBytes, LimitExceededBehavior: FlowControlBlock, diff --git a/pubsub/topic.go b/pubsub/topic.go index cadb91a42f7..9488ef0a620 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -651,7 +651,7 @@ func (t *Topic) initBundler() { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages } - t.flowController = newFlowController(fcs) + t.flowController = newTopicFlowController(fcs) bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit if t.PublishSettings.BufferedByteLimit > 0 { diff --git a/pubsub/trace.go b/pubsub/trace.go index 84cab3cd3cf..cadc3eb6d50 100644 --- a/pubsub/trace.go +++ b/pubsub/trace.go @@ -92,6 +92,14 @@ var ( // OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up. // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless) + + // PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed. + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless) + + // PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up. + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless) ) var ( @@ -146,11 +154,21 @@ var ( // OutstandingBytesView is the last value of OutstandingBytes // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytesView *view.View + + // PublisherOutstandingMessagesView is the last value of OutstandingMessages + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingMessagesView *view.View + + // PublisherOutstandingBytesView is the last value of OutstandingBytes + // It is EXPERIMENTAL and subject to change or removal without notice. + PublisherOutstandingBytesView *view.View ) func init() { PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError) PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError) + PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic) + PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic) PullCountView = createCountView(PullCount, keySubscription) AckCountView = createCountView(AckCount, keySubscription) NackCountView = createCountView(NackCount, keySubscription) @@ -166,6 +184,8 @@ func init() { DefaultPublishViews = []*view.View{ PublishedMessagesView, PublishLatencyView, + PublisherOutstandingMessagesView, + PublisherOutstandingBytesView, } DefaultSubscribeViews = []*view.View{ From 2808de0f15ffca823703d01c163688174aee4df2 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Thu, 16 Jun 2022 17:06:23 +0000 Subject: [PATCH 2/4] fix topic tag --- pubsub/topic.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index 9488ef0a620..26d05947025 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -529,6 +529,11 @@ type PublishResult = ipubsub.PublishResult // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { + ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name)) + if err != nil { + log.Printf("pubsub: cannot create context with tag in Publish: %v", err) + } + r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")) @@ -557,7 +562,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { ipubsub.SetPublishResult(r, "", err) return r } - err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) + err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) if err != nil { fmt.Printf("got err: %v\n", err) t.scheduler.Pause(msg.OrderingKey) From 1732d312fea338038d3d6dc685a8e2d41af56ea8 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Thu, 16 Jun 2022 19:16:14 +0000 Subject: [PATCH 3/4] flowcontroller: check all values on recordStat --- pubsub/flow_controller.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 2770d36733f..ced27af664a 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -203,17 +203,23 @@ func (f *flowController) count() int { } func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) { - if f.purpose == flowControllerPurposeTopic { + switch f.purpose { + case flowControllerPurposeTopic: recordStat(ctx, PublisherOutstandingMessages, n) - } else { + case flowControllerPurposeSubscription: + fallthrough + default: recordStat(ctx, OutstandingMessages, n) } } func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) { - if f.purpose == flowControllerPurposeTopic { + switch f.purpose { + case flowControllerPurposeTopic: recordStat(ctx, PublisherOutstandingBytes, n) - } else { + case flowControllerPurposeSubscription: + fallthrough + default: recordStat(ctx, OutstandingBytes, n) } } From 31bae5299d45ae5789302cd002ca753c363e27ca Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Thu, 23 Jun 2022 00:58:45 +0000 Subject: [PATCH 4/4] remove unknown value --- pubsub/flow_controller.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index ced27af664a..ad7e0907bfe 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -40,9 +40,8 @@ const ( type flowControllerPurpose int const ( - flowControllerPurposeUnknown flowControllerPurpose = iota + flowControllerPurposeSubscription flowControllerPurpose = iota flowControllerPurposeTopic - flowControllerPurposeSubscription ) // FlowControlSettings controls flow control for messages while publishing or subscribing. @@ -203,23 +202,19 @@ func (f *flowController) count() int { } func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) { - switch f.purpose { - case flowControllerPurposeTopic: + if f.purpose == flowControllerPurposeTopic { recordStat(ctx, PublisherOutstandingMessages, n) - case flowControllerPurposeSubscription: - fallthrough - default: - recordStat(ctx, OutstandingMessages, n) + return } + + recordStat(ctx, OutstandingMessages, n) } func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) { - switch f.purpose { - case flowControllerPurposeTopic: + if f.purpose == flowControllerPurposeTopic { recordStat(ctx, PublisherOutstandingBytes, n) - case flowControllerPurposeSubscription: - fallthrough - default: - recordStat(ctx, OutstandingBytes, n) + return } + + recordStat(ctx, OutstandingBytes, n) }