Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix zstd decoder leak #543

Merged
merged 10 commits into from Oct 21, 2020
Merged

fix zstd decoder leak #543

merged 10 commits into from Oct 21, 2020

Conversation

achille-roussel
Copy link
Contributor

This PR aims to resolve a memory leak that would occur under some circumstances when reading from partitions containing batches compressed with zstd.

The zstd encoders and decoders are retained by the goroutines they start internally, which causes their finalizers to never go called. The solution implemented here consists in adding a layer of indirection by wrapping the encoders and decoders with an intermediary object on which we install the finalizers.

Related issue klauspost/compress#264

Copy link
Contributor

@nlsun nlsun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haven't looked at the test code yet

compress/zstd/zstd.go Outdated Show resolved Hide resolved
}
err := w.enc.Close()
w.c.encPool.Put(w.enc)
w.c = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't set this nil anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's accessory, we're not making decisions based on this field so setting it to nil or not.

w.c.encPool.Put(w.enc)
w.c = nil
w.enc = nil
w.err = io.ErrClosedPipe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come we store err instead of io.ErrClosedPipe now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed this, I'll fix.

}
})

t.Run("decode with "+codec.Name(), func(t *testing.T) {
if r1 == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is r1 only nil if the first test fails? in that case does the t.Fatal cover this already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r1 may be nil if the first test is not run, which is what this check is intended to cover (for example if -run is used in the go test invocation).

func makeTopic() string {
return fmt.Sprintf("kafka-go-%016x", rand.Int63())
}

func createTopic(t *testing.T, partitions int) string {
func newLocalClientAndTopic() (*kafka.Client, string, func()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to mostly be a copy of what's in client_test.go, would it help to move this to the testing package too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately it would result in a cycle between the top level kafka package and kafka/testing, we'd have to refactor the tests to all be in the kafka_test package for that to work.

Achille and others added 2 commits October 20, 2020 20:51
Co-authored-by: Nicholas Sun <olassun2@gmail.com>
@achille-roussel
Copy link
Contributor Author

Thanks for the reviews @nlsun !

@achille-roussel achille-roussel merged commit e6b8599 into 0.4 Oct 21, 2020
@achille-roussel achille-roussel deleted the fix-zstd-decoder-leak branch October 21, 2020 23:54
achille-roussel pushed a commit that referenced this pull request Dec 4, 2020
* add protocol package

* add some documentation

* fix

* make ByteSequence more generic + add more benchmarks

* WIP: add support for record batches

* finish support for record batches

* add support for recort set compression

* backward-compatible compression codec imports

* fix compress tests

* make it possible for the transport to connect to multiple clusters + enhance kafka.Client to expose methods for creating and deleting topics

* support responding to metadata requests with cached response

* manage proper shutdown of client transport in tests

* WIP: test Produce API

* WIP: massive cleanup + track down CRC32 validation issue

* functional Produce and Fetch implementations

* add metadata request/response

* add listoffsets API

* expose cluster id and controller in metadata response

* remove bufio.Writer from the protocol API

* remove bufio.Reader from the protocol API

* add back deprecated Client methods

* fixes for kafka 0.10

* cleanup comment in protocol/record.go

* add more comments

* reduce size of bufio.Reader buffer on kafka connections

* refactor transport internals to support splitting requests and dispatching them across multiple brokers

* avoid contention on connection pool mutex in most cases

* cleanup

* add kafka.(*Client).MultiFetch API

* close records in produce request

* refactor record batch APIs to fully support streaming

* remove io.Closer from protocol.RecordBatch

* never return nil record batches

* record batch fixes

* remove unused variable

* fix reading of multiple topic partitions in produce and fetch messages

* alias compress.Compression in the kafka package

* expose compression constants in the kafka package

* exposes kafka.Request and kafka.Response interfaces

* simplify the protocol.Bytes interface

* simplify error management in protocol package

* wait for topic creation to propagate + fix request dispatching in multi-broker clusters

* simplify kafka.(*Client).CreateTopics API

* improve error handling + wait for metadata propagation after topic creation

* revisit connection pool implementation to remove multiplexing

* fix panic when referencing truncated page buffer

* fix unexpected EOF errors reading kafka messages

* revisit record reader API

* fix panic type asserting nil response into *metadata.Response

* optimize allocation of broker ids in cluster metadata

* unify sync.Pool usage

* reduce memory footprint of protocol.(*RecordSet).readFromVersion2

* fix panic accessing optimized record reader with a nil headers slice

* add APIs for marshaling and unmarshaling kafka values

* [skip ci] fix README example

* investigate-multi-fetch-issues

* remove MultiFetch API

* simplify protocol tests

* add benchmarks for kafka.Marshal and kafka.Unmarshal

* fix crash on cluster layout changes

* add more error codes

* remove partial support for flexible message format

* downgrade metadata test from v9 to v8

* test against kafka 2.5.0

* Update offsetfetch.go

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>

* Update offsetfetch.go

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>

* Update offsetfetch.go

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>

* fix typos

* fix more typos

* set pprof labels on transport goroutines (#458)

* change tests to run against 2.4.1 instead of 2.5.0

* support up to 2.3.1 (TestConn/nettest/PingPong fails with 2.4 and above)

* Update README.md

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* Update client.go

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* comment on why we devide the timeout by 2

* protocol.Reducer => protocol.Merger

* cleanup docker-compose.yml

* protocol.Mapper => protocol.Splitter

* propagate the caller's context to the dial function (#460)

* fix backward compatiblity with kafka-go v0.3.x

* fix record offsets when fetching messages with version 1

* default record timestamps to current timestamp

* revert changes to docker-compose.yml

* fix tests

* fix tests (2)

* 0.4: kafka.Writer (#461)

* 0.4: kafka.Writer

* update README

* disable some parallel tests

* disable global parallelism in tests

* fix typo

* disable parallelism in sub-packages tests

* properly seed random sources + delete test topics

* cleanup build

* run all tests

* fix tests

* enable more SASL mechanisms on CI

* try to fix the CI config

* try testing the sasl package with 2.3.1 only

* inline configuration for kafka 2.3.1 in CI

* fix zookeeper hostname in CI

* cleanup CI config

* keep the kafka 0.10 configuration separate + test against more kafka versions

* fix kafka 0.11 image tag

* try caching dependencies

* support multiple broker addresses

* uncomment max attempt test

* fix typos

* guard against empty kafka.MultiAddr in kafka.Transport

* don't export new APIs for network addresses + adapt to any multi-addr implementation

* add comment about the transport caching the metadata responses

* 0.4 fix tls address panic (#478)

* 0.4: fix panic when TLS is enabled

* 0.4: fix panic when establishing TLS connections

* cleanup

* Update transport_test.go

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* validate that an error is returned

Co-authored-by: Steve van Loben Sels <steve@segment.com>

* 0.4: fix short writes (#479)

* 0.4: modify protocol.Bytes to expose the number of remaining bytes instead of the full size of the sequence (#485)

* modify protocol.Bytes to expose the number of remaining bytes instead of the full size of the sequence

* add test for pageRef.ReadByte + fix pageRef.scan

* reuse contiguousPages.scan

* fix(writer): set correct balancer (#489)

Sets the correct balancer as passed through in the config on the writer

Co-authored-by: Steve van Loben Sels <steve@segment.com>
Co-authored-by: Artur <artur.kronenberg@askattest.com>

* Fix for panic when RequiredAcks is set to RequireNone (#504)

* Fix panic in async wait() method when RequiredAcks is None

When RequiredAcks is None, the producer does not wait for a
response from the broker, therefore the response is nil.
The async wait() method was not handling this case, leading
to a panic.

* Add regression test for RequiredAcks == RequireNone

This new test is required because all the other Writer tests use
NewWriter() to create Writers, which sets RequiredAcks to
RequireAll when 0 (None) was specified.

* fix: writer test for RequiredAcks=None

* fix: writer tests for RequiredAcks=None (2)

* 0.4 broker resolver (#526)

* 0.4: kafka.BrokerResolver

* add kafka.Transport.Context

* inline network and address fields in conn type

* Fix sasl authentication on writer (#541)

The authenticateSASL was called before getting api version.
This resulted incorrect apiversion (0 instead of 1) when
calling saslHandshakeRoundTrip request

* Remove deprecated function (NewWriter) usages (#528)

* fix zstd decoder leak (#543)

* fix zstd decoder leak

* fix tests

* fix panic

* fix tests (2)

* fix tests (3)

* fix tests (4)

* move ConnWaitGroup to testing package

* fix zstd codec

* Update compress/zstd/zstd.go

Co-authored-by: Nicholas Sun <olassun2@gmail.com>

* PR feedback

Co-authored-by: Nicholas Sun <olassun2@gmail.com>

* improve custom resolver support by allowing port to be overridden (#545)

* 0.4: reduce memory footprint (#547)

* Bring over flexible message changes

* Add docker-compose config for kafka 2.4.1

* Misc. cleanups

* Add protocol tests and fix issues

* Misc. fixes; run circleci on v2.4.1

* Skip conntest for v2.4.1

* Disable nettests for kafka 2.4.1

* Revert formatting changes

* Misc. fixes

* Update comments

* Make create topics test more interesting

* feat(writer): add support for writing messages to multiple topics (#561)

* Add comments on failing nettests

* Fix spacing

* Update var int sizing

* Simplify writeVarInt implementation

* Revert encoding change

* Simplify varint encoding functions and expand tests

* Also test sizeOf functions in protocol test

* chore: merge master and resolve conflicts (#570)

Co-authored-by: Jeremy Jackins <jeremyjackins@gmail.com>
Co-authored-by: Steve van Loben Sels <steve@segment.com>
Co-authored-by: Artur <artur.kronenberg@askattest.com>
Co-authored-by: Neil Cook <neil.cook@noware.co.uk>
Co-authored-by: Ahmy Yulrizka <yulrizka@users.noreply.github.com>
Co-authored-by: Turfa Auliarachman <t.auliarachman@gmail.com>
Co-authored-by: Nicholas Sun <olassun2@gmail.com>
Co-authored-by: Dominic Barnes <dominic@segment.com>
Co-authored-by: Benjamin Yolken <benjamin.yolken@segment.com>
Co-authored-by: Benjamin Yolken <54862872+yolken-segment@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants