diff --git a/async_producer.go b/async_producer.go index ea1cb3f5e..5c23ac775 100644 --- a/async_producer.go +++ b/async_producer.go @@ -891,7 +891,7 @@ type brokerProducer struct { abandoned chan struct{} buffer *produceSet - timer <-chan time.Time + timer *time.Timer timerFired bool closing error @@ -900,6 +900,7 @@ type brokerProducer struct { func (bp *brokerProducer) run() { var output chan<- *produceSet + var timerChan <-chan time.Time Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID()) for { @@ -969,12 +970,14 @@ func (bp *brokerProducer) run() { } if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { - bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) + bp.timer = time.NewTimer(bp.parent.conf.Producer.Flush.Frequency) + timerChan = bp.timer.C } - case <-bp.timer: + case <-timerChan: bp.timerFired = true case output <- bp.buffer: bp.rollOver() + timerChan = nil case response, ok := <-bp.responses: if ok { bp.handleResponse(response) @@ -1034,6 +1037,9 @@ func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) } func (bp *brokerProducer) rollOver() { + if bp.timer != nil { + bp.timer.Stop() + } bp.timer = nil bp.timerFired = false bp.buffer = newProduceSet(bp.parent)