Skip to content

Commit

Permalink
Remove zstdpool-syncpool
Browse files Browse the repository at this point in the history
It was initially created due to goroutine leakage as described in
klauspost/compress#264, but that has now been
fixed in the zstd package. So it's safe to use zstd Encoders/Decoders in
sync.Pool directly, as long as concurrency is disabled.
  • Loading branch information
danw committed Apr 12, 2024
1 parent c1bc825 commit 62e53fc
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 24 deletions.
3 changes: 1 addition & 2 deletions go.mod
Expand Up @@ -6,8 +6,7 @@ require (
github.com/bazelbuild/remote-apis v0.0.0-20230411132548-35aee1c4a425
github.com/golang/glog v1.1.0
github.com/google/go-cmp v0.5.9
github.com/klauspost/compress v1.12.3
github.com/mostynb/zstdpool-syncpool v0.0.7
github.com/klauspost/compress v1.17.8
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -51,8 +51,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU=
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/mostynb/zstdpool-syncpool v0.0.7 h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls=
github.com/mostynb/zstdpool-syncpool v0.0.7/go.mod h1:YpzqIpN8xvRZZvemem7CMLPWkjuaKR37MnkQruSj6aw=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
1 change: 0 additions & 1 deletion go/pkg/client/BUILD.bazel
Expand Up @@ -30,7 +30,6 @@ go_library(
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
"@com_github_mostynb_zstdpool_syncpool//:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
Expand Down
19 changes: 14 additions & 5 deletions go/pkg/client/cas_download.go
Expand Up @@ -18,7 +18,6 @@ import (
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
"github.com/klauspost/compress/zstd"
syncpool "github.com/mostynb/zstdpool-syncpool"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -536,11 +535,19 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
r, nw := io.Pipe()

decoderInit.Do(func() {
decoders = syncpool.NewDecoderPool(zstd.WithDecoderConcurrency(1))
decoders = &sync.Pool{
New: func() interface{} {
d, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1))
if err != nil {
return nil
}
return d
},
}
})

decdIntf := decoders.Get()
decoderW, ok := decdIntf.(*syncpool.DecoderWrapper)
decoderW, ok := decdIntf.(*zstd.Decoder)
if !ok || decoderW == nil {
return nil, nil, fmt.Errorf("failed creating new decoder")
}
Expand All @@ -562,8 +569,10 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
// have to go somewhere or they'll block execution.
io.Copy(io.Discard, r)
}
// DecoderWrapper.Close moves the decoder back to the Pool.
decoderW.Close()
// Reset and move the decoder back to the Pool.
if decoderW.Reset(nil) == nil {
decoders.Put(decoderW)
}
done <- err
}()

Expand Down
1 change: 0 additions & 1 deletion go/pkg/reader/BUILD.bazel
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"@com_github_klauspost_compress//zstd:go_default_library",
"@com_github_mostynb_zstdpool_syncpool//:go_default_library",
],
)

Expand Down
17 changes: 12 additions & 5 deletions go/pkg/reader/reader.go
Expand Up @@ -10,7 +10,6 @@ import (
"sync"

"github.com/klauspost/compress/zstd"
syncpool "github.com/mostynb/zstdpool-syncpool"
)

// errNotInitialized is the error returned from Read() by a ReedSeeker that
Expand Down Expand Up @@ -156,7 +155,7 @@ func (sb *syncedBuffer) Reset() {

type compressedSeeker struct {
fs ReadSeeker
encdW *syncpool.EncoderWrapper
encdW *zstd.Encoder
// This keeps the compressed data
buf *syncedBuffer
}
Expand All @@ -176,14 +175,22 @@ func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) {
}

encoderInit.Do(func() {
encoders = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1))
encoders = &sync.Pool{
New: func() interface{} {
e, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
return nil
}
return e
},
}
})

buf := bytes.NewBuffer(nil)
sb := &syncedBuffer{buf: buf}

encdIntf := encoders.Get()
encdW, ok := encdIntf.(*syncpool.EncoderWrapper)
encdW, ok := encdIntf.(*zstd.Encoder)
if !ok || encdW == nil {
return nil, errors.New("failed creating new encoder")
}
Expand Down Expand Up @@ -245,7 +252,7 @@ func (cfs *compressedSeeker) SeekOffset(offset int64) error {
if cfs.encdW == nil {
encdIntf := encoders.Get()
var ok bool
cfs.encdW, ok = encdIntf.(*syncpool.EncoderWrapper)
cfs.encdW, ok = encdIntf.(*zstd.Encoder)
if !ok || cfs.encdW == nil {
return errors.New("failed to get a new encoder")
}
Expand Down
10 changes: 2 additions & 8 deletions go_deps.bzl
Expand Up @@ -118,14 +118,8 @@ def remote_apis_sdks_go_deps():
go_repository(
name = "com_github_klauspost_compress",
importpath = "github.com/klauspost/compress",
sum = "h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU=",
version = "v1.12.3",
)
go_repository(
name = "com_github_mostynb_zstdpool_syncpool",
importpath = "github.com/mostynb/zstdpool-syncpool",
sum = "h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls=",
version = "v0.0.7",
sum = "h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=",
version = "v1.17.8",
)
go_repository(
name = "com_github_pborman_uuid",
Expand Down

0 comments on commit 62e53fc

Please sign in to comment.