Skip to content

Commit

Permalink
Merge pull request DataDog#2 from remicalixte/v1.30.1-patched+telemetry
Browse files Browse the repository at this point in the history
measure input channel fullness
  • Loading branch information
remicalixte committed Jan 18, 2023
2 parents 5a3d94c + b4d9d5f commit 1d57ef5
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions async_producer.go
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/eapache/go-resiliency/breaker"
"github.com/eapache/queue"
"github.com/rcrowley/go-metrics"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
Expand Down Expand Up @@ -393,6 +394,8 @@ type topicProducer struct {
breaker *breaker.Breaker
handlers map[int32]chan<- *ProducerMessage
partitioner Partitioner

inputChanHistogram metrics.Histogram
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
Expand All @@ -404,13 +407,16 @@ func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
breaker: breaker.New(3, 1, 10*time.Second),
handlers: make(map[int32]chan<- *ProducerMessage),
partitioner: p.conf.Producer.Partitioner(topic),

inputChanHistogram: getOrRegisterTopicHistogram("topic-producer-input-chan", topic, p.conf.MetricRegistry),
}
go withRecover(tp.dispatch)
return input
}

func (tp *topicProducer) dispatch() {
for msg := range tp.input {
tp.inputChanHistogram.Update(int64((len(tp.input) * 100) / cap(tp.input)))
if msg.retries == 0 {
if err := tp.partitionMessage(msg); err != nil {
tp.parent.returnError(msg, err)
Expand Down Expand Up @@ -492,6 +498,8 @@ type partitionProducer struct {
// therefore whether our buffer is complete and safe to flush)
highWatermark int
retryState []partitionRetryState

inputChanHistogram metrics.Histogram
}

type partitionRetryState struct {
Expand All @@ -509,6 +517,8 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan

breaker: breaker.New(3, 1, 10*time.Second),
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),

inputChanHistogram: getOrRegisterTopicHistogram("partition-producer-input-chan", topic, p.conf.MetricRegistry),
}
go withRecover(pp.dispatch)
return input
Expand Down Expand Up @@ -544,6 +554,7 @@ func (pp *partitionProducer) dispatch() {
}()

for msg := range pp.input {
pp.inputChanHistogram.Update(int64((len(pp.input) * 100) / cap(pp.input)))
if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
select {
case <-pp.brokerProducer.abandoned:
Expand Down

0 comments on commit 1d57ef5

Please sign in to comment.