From 016248636579acb45d906610468f688c49721e0f Mon Sep 17 00:00:00 2001 From: Jacob <44193366+Jacob-bzx@users.noreply.github.com> Date: Tue, 4 Oct 2022 16:12:56 +0800 Subject: [PATCH] fix(producer): replace time.After with time.Timer to avoid high memory usage (#2355) --- async_producer.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/async_producer.go b/async_producer.go index ea1cb3f5e..5c23ac775 100644 --- a/async_producer.go +++ b/async_producer.go @@ -891,7 +891,7 @@ type brokerProducer struct { abandoned chan struct{} buffer *produceSet - timer <-chan time.Time + timer *time.Timer timerFired bool closing error @@ -900,6 +900,7 @@ type brokerProducer struct { func (bp *brokerProducer) run() { var output chan<- *produceSet + var timerChan <-chan time.Time Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID()) for { @@ -969,12 +970,14 @@ func (bp *brokerProducer) run() { } if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { - bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) + bp.timer = time.NewTimer(bp.parent.conf.Producer.Flush.Frequency) + timerChan = bp.timer.C } - case <-bp.timer: + case <-timerChan: bp.timerFired = true case output <- bp.buffer: bp.rollOver() + timerChan = nil case response, ok := <-bp.responses: if ok { bp.handleResponse(response) @@ -1034,6 +1037,9 @@ func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) } func (bp *brokerProducer) rollOver() { + if bp.timer != nil { + bp.timer.Stop() + } bp.timer = nil bp.timerFired = false bp.buffer = newProduceSet(bp.parent)