Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): report publisher outstanding metrics #6187

Merged
50 changes: 46 additions & 4 deletions pubsub/flow_controller.go
Expand Up @@ -35,6 +35,15 @@ const (
FlowControlSignalError
)

// flowControllerPurpose indicates whether a flowController is for a topic or a
// subscription.
type flowControllerPurpose int

const (
flowControllerPurposeSubscription flowControllerPurpose = iota
flowControllerPurposeTopic
)

// FlowControlSettings controls flow control for messages while publishing or subscribing.
type FlowControlSettings struct {
// MaxOutstandingMessages is the maximum number of bufered messages to be published.
Expand Down Expand Up @@ -73,6 +82,7 @@ type flowController struct {
// Number of outstanding bytes remaining. Atomic.
bytesRemaining int64
limitBehavior LimitExceededBehavior
purpose flowControllerPurpose
}

// newFlowController creates a new flowController that ensures no more than
Expand All @@ -96,6 +106,18 @@ func newFlowController(fc FlowControlSettings) flowController {
return f
}

func newTopicFlowController(fc FlowControlSettings) flowController {
f := newFlowController(fc)
f.purpose = flowControllerPurposeTopic
return f
}

func newSubscriptionFlowController(fc FlowControlSettings) flowController {
f := newFlowController(fc)
f.purpose = flowControllerPurposeSubscription
return f
}

// acquire allocates space for a message: the message count and its size.
//
// In FlowControlSignalError mode, large messages greater than maxSize
Expand Down Expand Up @@ -135,13 +157,15 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
}
}
}

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.recordOutstandingMessages(ctx, outstandingMessages)
}

if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.recordOutstandingBytes(ctx, outstandingBytes)
}
return nil
}
Expand All @@ -154,12 +178,12 @@ func (f *flowController) release(ctx context.Context, size int) {

if f.semCount != nil {
outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
recordStat(ctx, OutstandingMessages, outstandingMessages)
f.recordOutstandingMessages(ctx, outstandingMessages)
f.semCount.Release(1)
}
if f.semSize != nil {
outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
recordStat(ctx, OutstandingBytes, outstandingBytes)
f.recordOutstandingBytes(ctx, outstandingBytes)
f.semSize.Release(f.bound(size))
}
}
Expand All @@ -176,3 +200,21 @@ func (f *flowController) bound(size int) int64 {
func (f *flowController) count() int {
return int(atomic.LoadInt64(&f.countRemaining))
}

func (f *flowController) recordOutstandingMessages(ctx context.Context, n int64) {
if f.purpose == flowControllerPurposeTopic {
recordStat(ctx, PublisherOutstandingMessages, n)
return
}

recordStat(ctx, OutstandingMessages, n)
}

func (f *flowController) recordOutstandingBytes(ctx context.Context, n int64) {
if f.purpose == flowControllerPurposeTopic {
recordStat(ctx, PublisherOutstandingBytes, n)
return
}

recordStat(ctx, OutstandingBytes, n)
}
2 changes: 1 addition & 1 deletion pubsub/subscription.go
Expand Up @@ -1032,7 +1032,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
}
fc := newFlowController(FlowControlSettings{
fc := newSubscriptionFlowController(FlowControlSettings{
MaxOutstandingMessages: maxCount,
MaxOutstandingBytes: maxBytes,
LimitExceededBehavior: FlowControlBlock,
Expand Down
9 changes: 7 additions & 2 deletions pubsub/topic.go
Expand Up @@ -529,6 +529,11 @@ type PublishResult = ipubsub.PublishResult
// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
// will immediately return a PublishResult with an error.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
if err != nil {
log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
}

r := ipubsub.NewPublishResult()
if !t.EnableMessageOrdering && msg.OrderingKey != "" {
ipubsub.SetPublishResult(r, "", errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering"))
Expand Down Expand Up @@ -557,7 +562,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
ipubsub.SetPublishResult(r, "", err)
return r
}
err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
if err != nil {
fmt.Printf("got err: %v\n", err)
t.scheduler.Pause(msg.OrderingKey)
Expand Down Expand Up @@ -651,7 +656,7 @@ func (t *Topic) initBundler() {
fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
}

t.flowController = newFlowController(fcs)
t.flowController = newTopicFlowController(fcs)

bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
if t.PublishSettings.BufferedByteLimit > 0 {
Expand Down
20 changes: 20 additions & 0 deletions pubsub/trace.go
Expand Up @@ -92,6 +92,14 @@ var (
// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)

// PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)

// PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless)
)

var (
Expand Down Expand Up @@ -146,11 +154,21 @@ var (
// OutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytesView *view.View

// PublisherOutstandingMessagesView is the last value of OutstandingMessages
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessagesView *view.View

// PublisherOutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytesView *view.View
)

func init() {
PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError)
PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError)
PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic)
PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic)
PullCountView = createCountView(PullCount, keySubscription)
AckCountView = createCountView(AckCount, keySubscription)
NackCountView = createCountView(NackCount, keySubscription)
Expand All @@ -166,6 +184,8 @@ func init() {
DefaultPublishViews = []*view.View{
PublishedMessagesView,
PublishLatencyView,
PublisherOutstandingMessagesView,
PublisherOutstandingBytesView,
}

DefaultSubscribeViews = []*view.View{
Expand Down