
zstd: add no-goroutine option to encoding #264

Closed
twmb opened this issue Jun 1, 2020 · 22 comments · Fixed by #505

@twmb

twmb commented Jun 1, 2020

To start, I appreciate that the concurrency allows for greater speed; it's great!

Backstory

I have a project that manages different types of encoding options for thousands of configs. As a rule, when compressing data for an individual config, we only want to compress serially; we have thousands of other concurrent compressors and we do not want one utilizing more CPU than necessary.

Our current use of the zstd encoder is to create one per config and use the WithEncoderConcurrency(1) option. This works for the most part, but we now have to be careful about properly closing the zstd compressor in the face of write errors. For other compressors, we can just drop the writer and have it be garbage collected with no dangling resources. For the zstd compressor, unless things are closed properly, goroutines leak.
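For concreteness, a minimal sketch of that per-config usage (compressConfig is a hypothetical helper; the key point is that Close must run on every path):

```go
package main

import (
	"io"

	"github.com/klauspost/compress/zstd"
)

// compressConfig is a hypothetical helper: one single-goroutine
// encoder per config. Close must run on every path, including the
// error path, or the encoder's goroutines leak.
func compressConfig(dst io.Writer, payload []byte) error {
	enc, err := zstd.NewWriter(dst, zstd.WithEncoderConcurrency(1))
	if err != nil {
		return err
	}
	if _, err := enc.Write(payload); err != nil {
		enc.Close() // still required to stop the encoder's goroutines
		return err
	}
	return enc.Close() // flushes the final frame and releases resources
}
```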

An alternative option for me would be to use a global zstd encoder with the default encoder concurrency and just never close it. I'm not much of a fan of this approach, though, since with poor scheduling, some work could sit in the zstd compression goroutines longer than it needs to and block other configs that need to compress. It's likely not a large risk, but it's one I'm concerned about.

Feature Request

I think it'd be both convenient and faster if there were an option to not spawn goroutines for encoding. Convenient in that when I am done with the compressor, I can just drop it; this would also make the zstd encoder usable in a sync.Pool, which it is not today. Faster in that there would be no goroutine synchronization and message-passing overhead, especially since I know I'm always encoding serially per encoder.

@klauspost
Owner

How much data are you on average compressing per use? It sounds like you should just use a single instance and EncodeAll if you have small payloads.

Alternatively, you can have a var encs chan *zstd.Encoder that contains the maximum number of concurrent encoders you need. When you need one, do e := <-encs. When done, do encs <- e.
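For example, a minimal sketch of that channel-based pool (maxEncoders is an assumed cap; EncodeAll-style use is assumed):

```go
package main

import (
	"log"

	"github.com/klauspost/compress/zstd"
)

const maxEncoders = 4 // assumed cap on concurrent encodes

var encs = make(chan *zstd.Encoder, maxEncoders)

func init() {
	for i := 0; i < maxEncoders; i++ {
		// A nil writer is fine when the encoder is only used via EncodeAll.
		enc, err := zstd.NewWriter(nil)
		if err != nil {
			log.Fatal(err)
		}
		encs <- enc
	}
}

func encode(src []byte) []byte {
	e := <-encs                  // borrow an encoder; blocks if all are in use
	defer func() { encs <- e }() // return it when done
	return e.EncodeAll(src, nil)
}
```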

properly closing the zstd compressor in the face of write errors.

If you are not reusing, defer solves that problem. If you reuse, you can still do that even after an error.

we do not want one utilizing more CPU than necessary

That is in general a wrong assumption. You want your encodes to complete as fast as possible.

If you encode LESS than your number of processors concurrently it will encode that much faster.

If you encode MORE than the number of processors, the concurrent and single-threaded performance will be the SAME since your single-threaded encodes still will have to fight for CPU.

So you gain nothing by doing it single-threaded.

The exception is when doing very small encodes. When using the writer, it will automatically switch to single-threaded if your encode is <16KB, and EncodeAll always just uses a single thread.

That said, streams currently only use 2 goroutines: one for doing the compression and one for encoding the output. That may however change in the future.

@peterbourgon

we do not want one utilizing more CPU than necessary

That is in general a wrong assumption. You want your encodes to complete as fast as possible.

😶

@peterbourgon

peterbourgon commented Jun 1, 2020

As a user, I am capable of creating goroutines to make synchronous APIs asynchronous. It is much more difficult for me to do the reverse, and deterministically manage goroutines created by APIs. It would be a lot more user-friendly if this package left concurrency to the caller, and avoided creating goroutines altogether. (It would also remove the package's no. 1 Gotcha, which is correctly managing Close.) Is there a technical reason this isn't possible?

@klauspost
Owner

I am capable of creating goroutines to make synchronous

@peterbourgon No, in this case you are not. You cannot speed up concurrent encoding unless you implement all parts of the encoding process as separate steps, which will just leave you where you are.

It would also remove the package's no. 1 Gotcha

It is not only about goroutines (which are only really an issue when NOT doing streaming encodes), but also about making re-use a priority. There are several large buffers required, and doing create+throw-away is very inefficient; I don't want to encourage that.

I don't want conditional behavior where you only sometimes have to do something, or several APIs for the same functionality. If you do New you must Close, always. Instead of managing goroutines, manage that; it is not trivial, but also not hard.

I don't know if your smiley attempts to convey some meaning. If you have anything constructive to say to that, just say it. Until then I will ignore it.

@twmb
Author

twmb commented Jun 1, 2020

How much data are you on average compressing per use? It sounds like you should just use a single instance and EncodeAll if you have small payloads.

Alternatively you can have a var encs chan *zstd.Encoder that contains the maximum number of concurrent encoders you need. When you need one, do e := <- encs. When done do encs <- e.

The use case is mixed. For some configs, we do long streams to upload data. For others, the batches are small and we do EncodeAll (although the batches are not quite <16K small). In both use cases, right now we use WithEncoderConcurrency(1).

properly closing the zstd compressor in the face of write errors.

If you are not reusing, defer solves that problem. If you reuse, you can still do that even after an error.

we do not want one utilizing more CPU than necessary

That is in general a wrong assumption. You want your encodes to complete as fast as possible.

If you encode LESS than your number of processors concurrently it will encode that much faster.

If you encode MORE than the number of processors, the concurrent and single-threaded performance will be the SAME since your single-threaded encodes still will have to fight for CPU.

So you gain nothing by doing it single-threaded.

I'm a bit mixed on wanting things to complete as fast as possible. I agree that generally that's a good principle in a system where you have your own program compressing your own data (although, in the past, I've had a program DoS itself by pigzing so hard it consumed all resources and another thread could not heartbeat). However, in a multi-tenant system, I don't want one compressor utilizing more than its fair share of cpu resources. The thousands of configs are analogous to a multi-tenant system here, and I can't set per-goroutine limits.

If I have a zstd.Encoder that is universally shared using GOMAXPROCS goroutines, and 48 configs begin compressing and have their goroutines swapped out while another 48 want to compress, then those other 48 will be blocked from even beginning compression. To avoid one config affecting others, we give each its own zstd encoder.

With each of these encoders, while we could theoretically give them all GOMAXPROCS goroutines, that doesn't really make much sense. Why allow for 48 concurrent goroutines compressing one stream of data when we have ~10K other streams of data to also concurrently worry about?

It is not only about goroutines (which is only really an issue when NOT doing streaming encodes), but also to make re-use a priority. There are several larger buffers required and doing create+throw away is very inefficient and I don't want to encourage that.

I did consider that other buffers could be shared. The only real problem, and it's not that large of a problem, is that goroutines make the pattern of reuse just a little bit trickier. For gzip encoders, I can just use a sync.Pool and reset on Get. This allows me to dynamically scale up and down the number of encoders I am using. With the zstd encoder, there is no dynamic scaling up or down because of the goroutines. I can't just discard whenever convenient.
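For comparison, the gzip reuse pattern looks roughly like this (a sketch; gzipCompress is a hypothetical helper):

```go
package main

import (
	"compress/gzip"
	"io"
	"sync"
)

// The pool can grow and shrink freely: a dropped *gzip.Writer holds
// no goroutines, so the GC can reclaim it with no cleanup needed.
var gzipPool = sync.Pool{
	New: func() any { return gzip.NewWriter(io.Discard) },
}

func gzipCompress(w io.Writer, data []byte) error {
	zw := gzipPool.Get().(*gzip.Writer)
	defer gzipPool.Put(zw)
	zw.Reset(w) // rebind the pooled writer to the new destination
	if _, err := zw.Write(data); err != nil {
		return err
	}
	return zw.Close() // flushes and writes the gzip footer
}
```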

I'd find it easier to reason about using one encoder at a time and dynamically scaling up as necessary, rather than one encoder globally with a fixed number of goroutines.

Again this isn't the largest problem in the world, but I do believe it would make some things easier to reason about and code around.

Out of curiosity, what do C libraries do to avoid these concurrency issues?

@peterbourgon

If you do New you must Close, always.

That's fine, but Close prevents re-use, which is the problem.

@klauspost
Owner

@peterbourgon No, it doesn't. It prevents use of sync.Pool, but you can reuse in many other ways. But if you have something to discuss, feel free to open an issue, unless it is something relevant to this topic.

@achille-roussel

achille-roussel commented Jun 11, 2020

I was going to open an issue on this topic as well, I'm glad there's already a discussion going on :)

One thing that the goroutines make harder is profiling CPU and memory usage, because the stack traces of the zstd encoder and decoder goroutines are disconnected from the rest of the program.

A practical example of where this made things harder for us was in stream processors that were reading from and writing to multiple Kafka topics: all resources utilized by the encoders and decoders are merged into the same stack traces, making it impossible to determine the performance trade-off of compression on different streams.

One way to solve this would be to have the zstd package forward pprof labels in the goroutines it starts, but this also breaks if the encoders and decoders are reused (via a sync.Pool for example).
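For reference, a sketch of the label-forwarding idea (encodeWithLabel is hypothetical; labels propagate only to goroutines started while they are set, which is why reuse breaks this):

```go
package main

import (
	"context"
	"runtime/pprof"

	"github.com/klauspost/compress/zstd"
)

// encodeWithLabel is a hypothetical wrapper. An encoder's worker
// goroutines were started at creation time, before any labels were
// set, so a reused encoder's work is not tagged; only work done on
// the calling goroutine is attributed to "topic" in CPU profiles.
func encodeWithLabel(ctx context.Context, enc *zstd.Encoder, topic string, src []byte) []byte {
	var dst []byte
	pprof.Do(ctx, pprof.Labels("topic", topic), func(context.Context) {
		dst = enc.EncodeAll(src, nil) // tagged only on this goroutine
	})
	return dst
}
```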

@twmb

This would also make the zstd encoder an option to use in sync.Pool's, where it is not an option today.

This has actually bitten us in kafka-go: it caused memory leaks when decoders that were maintained in a sync.Pool were garbage collected, but the goroutines they had started were never stopped and hung forever. We worked around it using a finalizer to close the decoders when the objects are released from the pool, but it's definitely not the most elegant of solutions:
https://github.com/segmentio/kafka-go/blob/master/zstd/zstd.go#L39-L47

We put the same in place for the encoders; however, I don't know if it's effective, because it appears some of the internal goroutines may retain pointers to the encoder's state (we haven't seen leaks so far tho 🤞).
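For reference, the workaround has roughly the following shape (a simplified sketch, not the exact kafka-go code; decoderRef is a hypothetical name):

```go
package main

import (
	"runtime"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// decoderRef is a wrapper the pool holds instead of the decoder.
// The decoder's goroutines keep the decoder itself reachable, but
// the wrapper can still be collected, triggering the finalizer.
type decoderRef struct {
	*zstd.Decoder
}

var decoderPool = sync.Pool{
	New: func() any {
		dec, err := zstd.NewReader(nil) // nil reader: used via DecodeAll
		if err != nil {
			panic(err)
		}
		ref := &decoderRef{dec}
		runtime.SetFinalizer(ref, func(r *decoderRef) {
			r.Close() // stop the decoder's goroutines on collection
		})
		return ref
	},
}
```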

@klauspost
I feel like there would be little downside to not spawning goroutines when concurrency is set to 1? (Maybe concerns about increasing code complexity? But it seems this is already done in EncodeAll in some cases.)

Would you accept a PR to modify the implementation to avoid spawning goroutines when concurrency is 1?

@klauspost
Owner

@achille-roussel It is no problem, it just needs to be done. The main stream decoder is built around being async so it can decode ahead of whatever is reading from it. So it is not a trivial change.

A PR would be welcome. Please make it a separate decoder option. There will be a significant performance regression compared to 1 goroutine, since reads will block much more with a sync decoder.

@twmb
Author

twmb commented Jun 12, 2020

Rather than a separate option, would it make more sense to have two separate types (SerialEncoder and SerialDecoder)? This would address your comment above about not wanting special-case behavior, where concurrency 1 + a new option doesn't need closing whereas anything else does.

@klauspost
Owner

@twmb Yes, that sounds like a good clean way to do it.

@bcmills

bcmills commented Jun 18, 2020

Under what conditions do the goroutines leak? (Would it make sense to, for example, set a finalizer in NewEncoder that panics if Close, or perhaps Flush or Reset, is not called on the returned *Encoder before it becomes unreachable?)

@twmb
Author

twmb commented Jun 18, 2020

The returned encoder never becomes unreachable, because the goroutines it starts ensure that. So goroutines leak if you do not call Close, which is mildly unexpected if you're using this in a context where you're pooling other potential compressors, or are used to pooling compressors. So attaching a finalizer to the current return value would not solve a leak caused by forgetting to call Close.

Another possible change would be for the constructor to return a dedicated struct that the spawned goroutines do not reference, and to attach a finalizer to that struct that calls Close. This would ensure goroutine leaks are avoided with the current encoder even if Close is forgotten. This can be done in a non-API-breaking way, too, so that's a plus.

@mostynb
Contributor

mostynb commented Nov 24, 2020

Since this can be difficult to get right, I started working on a couple of tiny libraries to reuse zstd Encoders and Decoders safely:

  1. https://github.com/mostynb/zstdpool-freelist - Custom freelists, which allow clients to choose when and how many items to drop.
  2. https://github.com/mostynb/zstdpool-syncpool - Using wrappers with finalizers that embed an Encoder/Decoder, and storing the wrappers in a sync.Pool (similar to kafka-go's solution).

Neither has been stress-tested yet, and I'm not sure which I prefer. Opinions/collaborators welcome.

@vmihailenco

vmihailenco commented Jan 18, 2021

I also found https://github.com/tailscale/tailscale/tree/main/smallzstd, which just provides a pre-configured encoder/decoder, but it proves that more people would like a no-goroutine version...

@peterbourgon

peterbourgon commented May 30, 2021

Revisiting this issue, I realized I haven't described my own use case, which is similar to @twmb's but not identical.

I have a service which handles tens of thousands of requests per second per instance. Acceptable and predictable performance requires that the work done by a request is constrained to that request's goroutine — no worker pools or task queues or handoff allowed. When that work includes compressing some outbound data and decompressing the response, I need to be able to take a compressor/decompressor from a pool, use it, and return it, without involving any goroutines beyond my own.

I would absolutely love to use this package, but the goroutines unavoidably spawned by the Encoder and Decoder — even with concurrency 1 — make it impossible.

@klauspost
Owner

Tracking in #477

@klauspost klauspost reopened this Feb 24, 2022
@klauspost
Owner

Just realized this was for encoding.

#498 is now ready for decoding.

klauspost added a commit that referenced this issue Feb 26, 2022
Do not use goroutines when encoder concurrency is 1.

Fixes #264

Can probably be cleaned up a bit.
@klauspost
Owner

Encoding is in #505 - appears to be working. Fuzz testing a bit.

@twmb
Author

twmb commented Feb 27, 2022

Very exciting! Is the intent to merge & tag as v1.14.5 ~soon?

klauspost added a commit that referenced this issue Feb 27, 2022
* zstd: Add stream encoding without goroutines

Does not use goroutines when encoder concurrency is 1.

Fixes #264

Can probably be cleaned up a bit.

* Reduce allocs for concurrent buffers when not used.
@klauspost
Owner

Yes, though it will probably be 1.15.0, since there are decoder differences to the previous version that make it not a 100% trivial update.

Fuzz tests look good. It is merged to master now, feel free to test it out.

@twmb
Author

twmb commented Mar 2, 2022

I tested this locally and saw a ~5% increase in compression throughput (realizing now that I did not check decompression), but I keep the zstd encoders/decoders quite restricted. Thank you again for the work here!

danw added a commit to danw/remote-apis-sdks that referenced this issue Apr 12, 2024
It was initially created due to goroutine leakage as described in
klauspost/compress#264, but that has now been
fixed in the zstd package. So it's safe to use zstd Encoders/Decoders in
sync.Pool directly, as long as concurrency is disabled.
dmitryax pushed a commit to open-telemetry/opentelemetry-collector that referenced this issue Apr 12, 2024
**Description:** zstd benchmark tests added
The goal of this PR is to disable concurrency in zstd compression to
reduce its memory footprint and avoid a known issue with goroutine
leaks. Please see - klauspost/compress#264

**Link to tracking Issue:**
#8216

**Testing:** Benchmark test results below
```
BenchmarkCompression/zstdWithConcurrency/compress-10         	   21392	     55855 ns/op	187732.88 MB/s	 2329164 B/op	      28 allocs/op
BenchmarkCompression/zstdNoConcurrency/compress-10           	   29526	     39902 ns/op	262787.42 MB/s	 1758988 B/op	      15 allocs/op
input => 10.00 MB
```
mrahs pushed a commit to bazelbuild/remote-apis-sdks that referenced this issue Apr 25, 2024
It was initially created due to goroutine leakage as described in
klauspost/compress#264, but that has now been
fixed in the zstd package. So it's safe to use zstd Encoders/Decoders in
sync.Pool directly, as long as concurrency is disabled.