fix(producer): replace time.After with time.Timer to avoid high memory usage #2355

Merged
merged 1 commit into from Oct 4, 2022

Conversation

napallday
Contributor

Sarama offers several options to control how often a batch of messages is flushed to the brokers. It uses time.After to enforce the flush frequency; however, this can cause high memory usage in some cases.

The following example, configured with Flush.Messages = 1 and Flush.Frequency = time.Second * 60, demonstrates the problem. In practice the situation can be even worse, since a single producer may hold multiple broker producers, and a single instance/machine may run multiple producers.

// Runs inside async_producer_test.go (package sarama), so NewConfig,
// NewAsyncProducer, ProducerMessage, and StringEncoder are in scope.
import (
	"runtime"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestProducerMemory(t *testing.T) {
	var mem runtime.MemStats
	topic := "mem-test-topic"
	config := NewConfig()
	config.Producer.Flush.Messages = 1 // every message triggers a flush
	config.Producer.Flush.Frequency = time.Second * 60

	// Log heap usage every 5 seconds while the producer runs.
	go func() {
		for {
			runtime.ReadMemStats(&mem)
			runtime.GC()
			t.Logf("heap memory: %dMB", mem.HeapAlloc/1000/1000)
			time.Sleep(time.Second * 5)
		}
	}()

	p, err := NewAsyncProducer([]string{"127.0.0.1:9094"}, config)
	assert.NoError(t, err)

	// Produce as fast as possible; each flush arms a fresh 60s timer
	// via time.After.
	for {
		p.Input() <- &ProducerMessage{
			Topic: topic,
			Value: StringEncoder("test"),
		}
	}
}

Since messages are flushed very quickly (with Flush.Messages = 1, every message triggers a flush), time.After creates a brand-new timer on every loop iteration. Each of those timers is not garbage collected until it expires 60 seconds later, so they accumulate and cause high memory usage.
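
For illustration, the leaky pattern looks roughly like the self-contained sketch below; input, the message loop, and the 5-iteration bound are hypothetical stand-ins, not Sarama's actual broker-producer code.

package main

import (
	"fmt"
	"time"
)

func main() {
	input := make(chan int)
	go func() {
		for i := 0; ; i++ {
			input <- i
		}
	}()

	for i := 0; i < 5; i++ {
		select {
		case msg := <-input:
			// Every message "flushes" immediately, as with Flush.Messages = 1.
			fmt.Println("flush message", msg)
		case <-time.After(60 * time.Second):
			// time.After allocates a brand-new runtime timer on every
			// iteration; each one lives until it fires 60s later, even
			// though this branch is almost never taken.
			fmt.Println("periodic flush")
		}
	}
}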

logs before optimization

=== RUN   TestProducerMemory
    async_producer_test.go:2279: heap memory: 0MB
    async_producer_test.go:2279: heap memory: 9MB
    async_producer_test.go:2279: heap memory: 36MB
    async_producer_test.go:2279: heap memory: 30MB
    async_producer_test.go:2279: heap memory: 62MB
    async_producer_test.go:2279: heap memory: 80MB
    async_producer_test.go:2279: heap memory: 94MB
    async_producer_test.go:2279: heap memory: 78MB
    async_producer_test.go:2279: heap memory: 130MB
    async_producer_test.go:2279: heap memory: 139MB
    async_producer_test.go:2279: heap memory: 116MB
    async_producer_test.go:2279: heap memory: 138MB
    async_producer_test.go:2279: heap memory: 133MB
    async_producer_test.go:2279: heap memory: 202MB
    async_producer_test.go:2279: heap memory: 111MB
    async_producer_test.go:2279: heap memory: 135MB
    async_producer_test.go:2279: heap memory: 146MB

Following the guidance in https://pkg.go.dev/time#After, I simply replaced time.After with time.NewTimer and manually stop the timer each time bp.rollOver is called. As the logs below show, memory usage decreases significantly, as expected.
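
The reusable-timer pattern from the time package docs looks roughly like the sketch below; again, input and the loop shape are simplified stand-ins for the broker producer's run loop, not the exact change in this PR.

package main

import (
	"fmt"
	"time"
)

func main() {
	input := make(chan int)
	go func() {
		for i := 0; ; i++ {
			input <- i
		}
	}()

	frequency := 60 * time.Second
	timer := time.NewTimer(frequency)
	defer timer.Stop()

	for i := 0; i < 5; i++ {
		select {
		case msg := <-input:
			fmt.Println("flush message", msg)
			// Reuse the single timer: stop it, drain the channel if it
			// already fired, then reset it for the next period.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(frequency)
		case <-timer.C:
			fmt.Println("periodic flush")
			timer.Reset(frequency)
		}
	}
}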

logs after optimization

=== RUN   TestProducerMemory
    async_producer_test.go:2279: heap memory: 0MB
    async_producer_test.go:2279: heap memory: 1MB
    async_producer_test.go:2279: heap memory: 0MB
    async_producer_test.go:2279: heap memory: 0MB
    async_producer_test.go:2279: heap memory: 1MB
    async_producer_test.go:2279: heap memory: 1MB
    async_producer_test.go:2279: heap memory: 1MB
    async_producer_test.go:2279: heap memory: 1MB
    async_producer_test.go:2279: heap memory: 1MB
    async_producer_test.go:2279: heap memory: 0MB

dnwe added the fix label on Oct 4, 2022
dnwe merged commit 0162486 into IBM:main on Oct 4, 2022