diff --git a/go.mod b/go.mod index a8387e65..a20ecd43 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,7 @@ require ( github.com/bazelbuild/remote-apis v0.0.0-20230411132548-35aee1c4a425 github.com/golang/glog v1.1.0 github.com/google/go-cmp v0.5.9 - github.com/klauspost/compress v1.12.3 - github.com/mostynb/zstdpool-syncpool v0.0.7 + github.com/klauspost/compress v1.17.8 github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.9.1 github.com/pkg/xattr v0.4.4 diff --git a/go.sum b/go.sum index b5a0c42e..233371dc 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/mostynb/zstdpool-syncpool v0.0.7 h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls= -github.com/mostynb/zstdpool-syncpool v0.0.7/go.mod h1:YpzqIpN8xvRZZvemem7CMLPWkjuaKR37MnkQruSj6aw= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/go/pkg/client/BUILD.bazel b/go/pkg/client/BUILD.bazel index f7800590..2fbeec98 100644 --- a/go/pkg/client/BUILD.bazel +++ b/go/pkg/client/BUILD.bazel @@ -30,7 +30,6 @@ go_library( "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@com_github_golang_glog//:go_default_library", "@com_github_klauspost_compress//zstd:go_default_library", - "@com_github_mostynb_zstdpool_syncpool//:go_default_library", "@com_github_pborman_uuid//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@go_googleapis//google/bytestream:bytestream_go_proto", diff --git a/go/pkg/client/cas_download.go b/go/pkg/client/cas_download.go index fccb8f71..6f60b2ca 100644 --- a/go/pkg/client/cas_download.go +++ b/go/pkg/client/cas_download.go @@ -18,7 +18,6 @@ import ( repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" log "github.com/golang/glog" "github.com/klauspost/compress/zstd" - syncpool "github.com/mostynb/zstdpool-syncpool" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -536,11 +535,19 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) { r, nw := io.Pipe() decoderInit.Do(func() { - decoders = syncpool.NewDecoderPool(zstd.WithDecoderConcurrency(1)) + decoders = &sync.Pool{ + New: func() interface{} { + d, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1)) + if err != nil { + return nil + } + return d + }, + } }) decdIntf := decoders.Get() - decoderW, ok := decdIntf.(*syncpool.DecoderWrapper) + decoderW, ok := decdIntf.(*zstd.Decoder) if !ok || decoderW == nil { return nil, nil, fmt.Errorf("failed creating new decoder") } @@ -562,8 +569,10 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) { // have to go somewhere or they'll block execution. io.Copy(io.Discard, r) } - // DecoderWrapper.Close moves the decoder back to the Pool. - decoderW.Close() + // Reset and move the decoder back to the Pool. + if decoderW.Reset(nil) == nil { + decoders.Put(decoderW) + } done <- err }() diff --git a/go/pkg/reader/BUILD.bazel b/go/pkg/reader/BUILD.bazel index d210bef3..f8a6611c 100644 --- a/go/pkg/reader/BUILD.bazel +++ b/go/pkg/reader/BUILD.bazel @@ -7,7 +7,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "@com_github_klauspost_compress//zstd:go_default_library", - "@com_github_mostynb_zstdpool_syncpool//:go_default_library", ], ) diff --git a/go/pkg/reader/reader.go b/go/pkg/reader/reader.go index e35f772a..10c2162a 100644 --- a/go/pkg/reader/reader.go +++ b/go/pkg/reader/reader.go @@ -10,7 +10,6 @@ import ( "sync" "github.com/klauspost/compress/zstd" - syncpool "github.com/mostynb/zstdpool-syncpool" ) // errNotInitialized is the error returned from Read() by a ReedSeeker that @@ -156,7 +155,7 @@ func (sb *syncedBuffer) Reset() { type compressedSeeker struct { fs ReadSeeker - encdW *syncpool.EncoderWrapper + encdW *zstd.Encoder // This keeps the compressed data buf *syncedBuffer } @@ -176,14 +175,22 @@ func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) { } encoderInit.Do(func() { - encoders = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1)) + encoders = &sync.Pool{ + New: func() interface{} { + e, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + if err != nil { + return nil + } + return e + }, + } }) buf := bytes.NewBuffer(nil) sb := &syncedBuffer{buf: buf} encdIntf := encoders.Get() - encdW, ok := encdIntf.(*syncpool.EncoderWrapper) + encdW, ok := encdIntf.(*zstd.Encoder) if !ok || encdW == nil { return nil, errors.New("failed creating new encoder") } @@ -245,7 +252,7 @@ func (cfs *compressedSeeker) SeekOffset(offset int64) error { if cfs.encdW == nil { encdIntf := encoders.Get() var ok bool - cfs.encdW, ok = encdIntf.(*syncpool.EncoderWrapper) + cfs.encdW, ok = encdIntf.(*zstd.Encoder) if !ok || cfs.encdW == nil { return errors.New("failed to get a new encoder") } diff --git a/go_deps.bzl b/go_deps.bzl index ec1326e7..5b2b8c82 100644 --- a/go_deps.bzl +++ b/go_deps.bzl @@ -118,14 +118,8 @@ def remote_apis_sdks_go_deps(): go_repository( name = "com_github_klauspost_compress", importpath = "github.com/klauspost/compress", - sum = "h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU=", - version = "v1.12.3", - ) - go_repository( - name = "com_github_mostynb_zstdpool_syncpool", - importpath = "github.com/mostynb/zstdpool-syncpool", - sum = "h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls=", - version = "v0.0.7", + sum = "h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=", + version = "v1.17.8", ) go_repository( name = "com_github_pborman_uuid",