Skip to content

Commit

Permalink
feat(pubsub): report publisher outstanding metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
schallert committed Jun 15, 2022
1 parent b394500 commit f979acc
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 6 deletions.
49 changes: 45 additions & 4 deletions pubsub/flow_controller.go
Expand Up @@ -35,6 +35,16 @@ const (
FlowControlSignalError
)

// flowControllerPurpose indicates whether a flowController is for a topic or a
// subscription.
type flowControllerPurpose int

const (
flowControllerPurposeUnknown flowControllerPurpose = iota
flowControllerPurposeTopic
flowControllerPurposeSubscription
)

// FlowControlSettings controls flow control for messages while publishing or subscribing.
type FlowControlSettings struct {
// MaxOutstandingMessages is the maximum number of bufered messages to be published.
Expand Down Expand Up @@ -73,6 +83,7 @@ type flowController struct {
// Number of outstanding bytes remaining. Atomic.
bytesRemaining int64
limitBehavior LimitExceededBehavior
purpose flowControllerPurpose
}

// newFlowController creates a new flowController that ensures no more than
Expand All @@ -96,6 +107,18 @@ func newFlowController(fc FlowControlSettings) flowController {
return f
}

func newTopicFlowController(fc FlowControlSettings) flowController {
f := newFlowController(fc)
f.purpose = flowControllerPurposeTopic
return f
}

func newSubscriptionFlowController(fc FlowControlSettings) flowController {
f := newFlowController(fc)
f.purpose = flowControllerPurposeSubscription
return f
}

// acquire allocates space for a message: the message count and its size.
//
// In FlowControlSignalError mode, large messages greater than maxSize
Expand Down Expand Up @@ -135,13 +158,15 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
}
}
}

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.recordOutstandingMessages(ctx, outstandingMessages)
}

if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.recordOutstandingBytes(ctx, outstandingBytes)
}
return nil
}
Expand All @@ -154,12 +179,12 @@ func (f *flowController) release(ctx context.Context, size int) {

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.recordOutstandingMessages(ctx, outstandingMessages)
f.semCount.Release(1)
}
if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.recordOutstandingBytes(ctx, outstandingBytes)
f.semSize.Release(f.bound(size))
}
}
Expand All @@ -176,3 +201,19 @@ func (f *flowController) bound(size int) int64 {
func (f *flowController) count() int {
return int(atomic.LoadInt64(&f.countRemaining))
}

func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) {
if f.purpose == flowControllerPurposeTopic {
recordStat(ctx, PublisherOutstandingMessages, n)
} else {
recordStat(ctx, OutstandingMessages, n)
}
}

func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) {
if f.purpose == flowControllerPurposeTopic {
recordStat(ctx, PublisherOutstandingBytes, n)
} else {
recordStat(ctx, OutstandingBytes, n)
}
}
2 changes: 1 addition & 1 deletion pubsub/subscription.go
Expand Up @@ -903,7 +903,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
}
fc := newFlowController(FlowControlSettings{
fc := newSubscriptionFlowController(FlowControlSettings{
MaxOutstandingMessages: maxCount,
MaxOutstandingBytes: maxBytes,
LimitExceededBehavior: FlowControlBlock,
Expand Down
2 changes: 1 addition & 1 deletion pubsub/topic.go
Expand Up @@ -651,7 +651,7 @@ func (t *Topic) initBundler() {
fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
}

t.flowController = newFlowController(fcs)
t.flowController = newTopicFlowController(fcs)

bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
if t.PublishSettings.BufferedByteLimit > 0 {
Expand Down
20 changes: 20 additions & 0 deletions pubsub/trace.go
Expand Up @@ -92,6 +92,14 @@ var (
// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)

// PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)

// PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless)
)

var (
Expand Down Expand Up @@ -146,11 +154,21 @@ var (
// OutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytesView *view.View

// PublisherOutstandingMessagesView is the last value of OutstandingMessages
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessagesView *view.View

// PublisherOutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytesView *view.View
)

func init() {
PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError)
PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError)
PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic)
PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic)
PullCountView = createCountView(PullCount, keySubscription)
AckCountView = createCountView(AckCount, keySubscription)
NackCountView = createCountView(NackCount, keySubscription)
Expand All @@ -166,6 +184,8 @@ func init() {
DefaultPublishViews = []*view.View{
PublishedMessagesView,
PublishLatencyView,
PublisherOutstandingMessagesView,
PublisherOutstandingBytesView,
}

DefaultSubscribeViews = []*view.View{
Expand Down

0 comments on commit f979acc

Please sign in to comment.