From c2fef0a45f33f020730fa0da7192740a7c25e68a Mon Sep 17 00:00:00 2001 From: Dan Willemsen Date: Thu, 25 Apr 2024 09:17:13 -0700 Subject: [PATCH] Remove zstdpool-syncpool (#551) It was initially created due to goroutine leakage as described in https://github.com/klauspost/compress/issues/264, but that has now been fixed in the zstd package. So it's safe to use zstd Encoders/Decoders in sync.Pool directly, as long as concurrency is disabled. --- go.mod | 4 +--- go.sum | 8 ++------ go/pkg/client/BUILD.bazel | 1 - go/pkg/client/cas_download.go | 22 +++++++++++++++++----- go/pkg/reader/BUILD.bazel | 2 +- go/pkg/reader/reader.go | 19 ++++++++++++++----- go_deps.bzl | 10 ++-------- 7 files changed, 37 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 8a3c6a73..e19a698a 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,7 @@ require ( github.com/bazelbuild/remote-apis v0.0.0-20230411132548-35aee1c4a425 github.com/golang/glog v1.1.0 github.com/google/go-cmp v0.5.9 - github.com/klauspost/compress v1.12.3 - github.com/mostynb/zstdpool-syncpool v0.0.7 + github.com/klauspost/compress v1.17.8 github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.9.1 github.com/pkg/xattr v0.4.4 @@ -26,7 +25,6 @@ require ( cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/longrunning v0.5.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/golang/snappy v0.0.3 // indirect github.com/google/uuid v1.3.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect diff --git a/go.sum b/go.sum index 25d8b661..c330c833 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -49,10 +47,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= -github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/mostynb/zstdpool-syncpool v0.0.7 h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls= -github.com/mostynb/zstdpool-syncpool v0.0.7/go.mod h1:YpzqIpN8xvRZZvemem7CMLPWkjuaKR37MnkQruSj6aw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/go/pkg/client/BUILD.bazel b/go/pkg/client/BUILD.bazel index f7800590..2fbeec98 100644 --- a/go/pkg/client/BUILD.bazel +++ b/go/pkg/client/BUILD.bazel @@ -30,7 +30,6 @@ go_library( "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@com_github_golang_glog//:go_default_library", "@com_github_klauspost_compress//zstd:go_default_library", - "@com_github_mostynb_zstdpool_syncpool//:go_default_library", "@com_github_pborman_uuid//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@go_googleapis//google/bytestream:bytestream_go_proto", diff --git a/go/pkg/client/cas_download.go b/go/pkg/client/cas_download.go index fccb8f71..9fe5e1fb 100644 --- a/go/pkg/client/cas_download.go +++ b/go/pkg/client/cas_download.go @@ -18,7 +18,6 @@ import ( repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" log "github.com/golang/glog" "github.com/klauspost/compress/zstd" - syncpool "github.com/mostynb/zstdpool-syncpool" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -536,11 +535,20 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) { r, nw := io.Pipe() decoderInit.Do(func() { - decoders = syncpool.NewDecoderPool(zstd.WithDecoderConcurrency(1)) + decoders = &sync.Pool{ + New: func() interface{} { + d, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1)) + if err != nil { + log.Errorf("Error creating new decoder: %v", err) + return nil + } + return d + }, + } }) decdIntf := decoders.Get() - decoderW, ok := decdIntf.(*syncpool.DecoderWrapper) + decoderW, ok := decdIntf.(*zstd.Decoder) if !ok || decoderW == nil { return nil, nil, fmt.Errorf("failed creating new decoder") } @@ -562,8 +570,12 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) { // have to go somewhere or they'll block execution. io.Copy(io.Discard, r) } - // DecoderWrapper.Close moves the decoder back to the Pool. - decoderW.Close() + // Reset and move the decoder back to the Pool. + if rerr := decoderW.Reset(nil); rerr == nil { + decoders.Put(decoderW) + } else { + log.Warningf("Error resetting decoder: %v", rerr) + } done <- err }() diff --git a/go/pkg/reader/BUILD.bazel b/go/pkg/reader/BUILD.bazel index d210bef3..aae5d488 100644 --- a/go/pkg/reader/BUILD.bazel +++ b/go/pkg/reader/BUILD.bazel @@ -6,8 +6,8 @@ go_library( importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/reader", visibility = ["//visibility:public"], deps = [ + "@com_github_golang_glog//:go_default_library", "@com_github_klauspost_compress//zstd:go_default_library", - "@com_github_mostynb_zstdpool_syncpool//:go_default_library", ], ) diff --git a/go/pkg/reader/reader.go b/go/pkg/reader/reader.go index e35f772a..fcf52432 100644 --- a/go/pkg/reader/reader.go +++ b/go/pkg/reader/reader.go @@ -9,8 +9,8 @@ import ( "os" "sync" + log "github.com/golang/glog" "github.com/klauspost/compress/zstd" - syncpool "github.com/mostynb/zstdpool-syncpool" ) // errNotInitialized is the error returned from Read() by a ReedSeeker that @@ -156,7 +156,7 @@ func (sb *syncedBuffer) Reset() { type compressedSeeker struct { fs ReadSeeker - encdW *syncpool.EncoderWrapper + encdW *zstd.Encoder // This keeps the compressed data buf *syncedBuffer } @@ -176,14 +176,23 @@ func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) { } encoderInit.Do(func() { - encoders = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1)) + encoders = &sync.Pool{ + New: func() interface{} { + e, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + if err != nil { + log.Errorf("Error creating new encoder: %v", err) + return nil + } + return e + }, + } }) buf := bytes.NewBuffer(nil) sb := &syncedBuffer{buf: buf} encdIntf := encoders.Get() - encdW, ok := encdIntf.(*syncpool.EncoderWrapper) + encdW, ok := encdIntf.(*zstd.Encoder) if !ok || encdW == nil { return nil, errors.New("failed creating new encoder") } @@ -245,7 +254,7 @@ func (cfs *compressedSeeker) SeekOffset(offset int64) error { if cfs.encdW == nil { encdIntf := encoders.Get() var ok bool - cfs.encdW, ok = encdIntf.(*syncpool.EncoderWrapper) + cfs.encdW, ok = encdIntf.(*zstd.Encoder) if !ok || cfs.encdW == nil { return errors.New("failed to get a new encoder") } diff --git a/go_deps.bzl b/go_deps.bzl index ec1326e7..5b2b8c82 100644 --- a/go_deps.bzl +++ b/go_deps.bzl @@ -118,14 +118,8 @@ def remote_apis_sdks_go_deps(): go_repository( name = "com_github_klauspost_compress", importpath = "github.com/klauspost/compress", - sum = "h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU=", - version = "v1.12.3", - ) - go_repository( - name = "com_github_mostynb_zstdpool_syncpool", - importpath = "github.com/mostynb/zstdpool-syncpool", - sum = "h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls=", - version = "v0.0.7", + sum = "h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=", + version = "v1.17.8", ) go_repository( name = "com_github_pborman_uuid",