From 6c470ff02072d7af32ee07a772c5d0796b545a45 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 3 Jun 2022 13:03:53 -0700 Subject: [PATCH] fix(pubsub): fix iterator distribution bound calculations (#6125) * fix iterator distribution bounds * add comments to test * run gofmt --- pubsub/iterator.go | 4 ++-- pubsub/iterator_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 012c9f455c1..c8c2f8099af 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -161,8 +161,8 @@ func (it *messageIterator) checkDrained() { // min/maxDurationPerLeaseExtension. func (it *messageIterator) addToDistribution(receiveTime time.Time) { d := time.Since(receiveTime) - d = minDuration(d, minDurationPerLeaseExtension) - d = maxDuration(d, maxDurationPerLeaseExtension) + d = maxDuration(d, minDurationPerLeaseExtension) + d = minDuration(d, maxDurationPerLeaseExtension) it.ackTimeDist.Record(int(d / time.Second)) } diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index 09cd18526c2..9f2aec19d35 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -496,3 +496,44 @@ func TestIterator_BoundedDuration(t *testing.T) { }) } } + +func TestAddToDistribution(t *testing.T) { + srv := pstest.NewServer() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) + + _, client, err := initConn(ctx, srv.Addr) + if err != nil { + t.Fatal(err) + } + iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{}) + + // Start with a datapoint that's too small that should be bounded to 10s. + receiveTime := time.Now().Add(time.Duration(-1) * time.Second) + iter.addToDistribution(receiveTime) + deadline := iter.ackTimeDist.Percentile(.99) + want := 10 + if deadline != want { + t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) + } + + // The next datapoint should not be bounded. + receiveTime = time.Now().Add(time.Duration(-300) * time.Second) + iter.addToDistribution(receiveTime) + deadline = iter.ackTimeDist.Percentile(.99) + want = 300 + if deadline != want { + t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) + } + + // Lastly, add a datapoint that should be bounded to 600s + receiveTime = time.Now().Add(time.Duration(-1000) * time.Second) + iter.addToDistribution(receiveTime) + deadline = iter.ackTimeDist.Percentile(.99) + want = 600 + if deadline != want { + t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) + } +}