diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 012c9f455c1..c8c2f8099af 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -161,8 +161,8 @@ func (it *messageIterator) checkDrained() { // min/maxDurationPerLeaseExtension. func (it *messageIterator) addToDistribution(receiveTime time.Time) { d := time.Since(receiveTime) - d = minDuration(d, minDurationPerLeaseExtension) - d = maxDuration(d, maxDurationPerLeaseExtension) + d = maxDuration(d, minDurationPerLeaseExtension) + d = minDuration(d, maxDurationPerLeaseExtension) it.ackTimeDist.Record(int(d / time.Second)) } diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index 09cd18526c2..9f2aec19d35 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -496,3 +496,44 @@ func TestIterator_BoundedDuration(t *testing.T) { }) } } + +func TestAddToDistribution(t *testing.T) { + srv := pstest.NewServer() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) + + _, client, err := initConn(ctx, srv.Addr) + if err != nil { + t.Fatal(err) + } + iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{}) + + // Start with a datapoint that's too small that should be bounded to 10s. + receiveTime := time.Now().Add(time.Duration(-1) * time.Second) + iter.addToDistribution(receiveTime) + deadline := iter.ackTimeDist.Percentile(.99) + want := 10 + if deadline != want { + t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) + } + + // The next datapoint should not be bounded. + receiveTime = time.Now().Add(time.Duration(-300) * time.Second) + iter.addToDistribution(receiveTime) + deadline = iter.ackTimeDist.Percentile(.99) + want = 300 + if deadline != want { + t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) + } + + // Lastly, add a datapoint that should be bounded to 600s + receiveTime = time.Now().Add(time.Duration(-1000) * time.Second) + iter.addToDistribution(receiveTime) + deadline = iter.ackTimeDist.Percentile(.99) + want = 600 + if deadline != want { + t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) + } +}