Skip to content

Commit

Permalink
feat(pubsub): report publisher outstanding metrics (#6187)
Browse files Browse the repository at this point in the history
* feat(pubsub): report publisher outstanding metrics

Fixes #6180.

* fix topic tag

* flowcontroller: check all values on recordStat

* remove unknown value

Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
  • Loading branch information
schallert and hongalex committed Jun 23, 2022
1 parent f01bf32 commit cc1528b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 7 deletions.
50 changes: 46 additions & 4 deletions pubsub/flow_controller.go
Expand Up @@ -35,6 +35,15 @@ const (
FlowControlSignalError
)

// flowControllerPurpose indicates whether a flowController is for a topic or a
// subscription.
type flowControllerPurpose int

const (
flowControllerPurposeSubscription flowControllerPurpose = iota
flowControllerPurposeTopic
)

// FlowControlSettings controls flow control for messages while publishing or subscribing.
type FlowControlSettings struct {
// MaxOutstandingMessages is the maximum number of bufered messages to be published.
Expand Down Expand Up @@ -73,6 +82,7 @@ type flowController struct {
// Number of outstanding bytes remaining. Atomic.
bytesRemaining int64
limitBehavior LimitExceededBehavior
purpose flowControllerPurpose
}

// newFlowController creates a new flowController that ensures no more than
Expand All @@ -96,6 +106,18 @@ func newFlowController(fc FlowControlSettings) flowController {
return f
}

func newTopicFlowController(fc FlowControlSettings) flowController {
f := newFlowController(fc)
f.purpose = flowControllerPurposeTopic
return f
}

func newSubscriptionFlowController(fc FlowControlSettings) flowController {
f := newFlowController(fc)
f.purpose = flowControllerPurposeSubscription
return f
}

// acquire allocates space for a message: the message count and its size.
//
// In FlowControlSignalError mode, large messages greater than maxSize
Expand Down Expand Up @@ -135,13 +157,15 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
}
}
}

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.recordOutstandingMessages(ctx, outstandingMessages)
}

if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.recordOutstandingBytes(ctx, outstandingBytes)
}
return nil
}
Expand All @@ -154,12 +178,12 @@ func (f *flowController) release(ctx context.Context, size int) {

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.recordOutstandingMessages(ctx, outstandingMessages)
f.semCount.Release(1)
}
if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.recordOutstandingBytes(ctx, outstandingBytes)
f.semSize.Release(f.bound(size))
}
}
Expand All @@ -176,3 +200,21 @@ func (f *flowController) bound(size int) int64 {
func (f *flowController) count() int {
return int(atomic.LoadInt64(&f.countRemaining))
}

func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) {
if f.purpose == flowControllerPurposeTopic {
recordStat(ctx, PublisherOutstandingMessages, n)
return
}

recordStat(ctx, OutstandingMessages, n)
}

func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) {
if f.purpose == flowControllerPurposeTopic {
recordStat(ctx, PublisherOutstandingBytes, n)
return
}

recordStat(ctx, OutstandingBytes, n)
}
2 changes: 1 addition & 1 deletion pubsub/subscription.go
Expand Up @@ -1032,7 +1032,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
}
fc := newFlowController(FlowControlSettings{
fc := newSubscriptionFlowController(FlowControlSettings{
MaxOutstandingMessages: maxCount,
MaxOutstandingBytes: maxBytes,
LimitExceededBehavior: FlowControlBlock,
Expand Down
9 changes: 7 additions & 2 deletions pubsub/topic.go
Expand Up @@ -529,6 +529,11 @@ type PublishResult = ipubsub.PublishResult
// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
// will immediately return a PublishResult with an error.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
if err != nil {
log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
}

r := ipubsub.NewPublishResult()
if !t.EnableMessageOrdering && msg.OrderingKey != "" {
ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering"))
Expand Down Expand Up @@ -557,7 +562,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
ipubsub.SetPublishResult(r, "", err)
return r
}
err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
if err != nil {
fmt.Printf("got err: %v\n", err)
t.scheduler.Pause(msg.OrderingKey)
Expand Down Expand Up @@ -651,7 +656,7 @@ func (t *Topic) initBundler() {
fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
}

t.flowController = newFlowController(fcs)
t.flowController = newTopicFlowController(fcs)

bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
if t.PublishSettings.BufferedByteLimit > 0 {
Expand Down
20 changes: 20 additions & 0 deletions pubsub/trace.go
Expand Up @@ -92,6 +92,14 @@ var (
// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)

// PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)

// PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless)
)

var (
Expand Down Expand Up @@ -146,11 +154,21 @@ var (
// OutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytesView *view.View

// PublisherOutstandingMessagesView is the last value of OutstandingMessages
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessagesView *view.View

// PublisherOutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytesView *view.View
)

func init() {
PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError)
PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError)
PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic)
PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic)
PullCountView = createCountView(PullCount, keySubscription)
AckCountView = createCountView(AckCount, keySubscription)
NackCountView = createCountView(NackCount, keySubscription)
Expand All @@ -166,6 +184,8 @@ func init() {
DefaultPublishViews = []*view.View{
PublishedMessagesView,
PublishLatencyView,
PublisherOutstandingMessagesView,
PublisherOutstandingBytesView,
}

DefaultSubscribeViews = []*view.View{
Expand Down

0 comments on commit cc1528b

Please sign in to comment.