Skip to content

Commit

Permalink
Remove zstdpool-syncpool (#551)
Browse files Browse the repository at this point in the history
It was initially created due to goroutine leakage as described in
klauspost/compress#264, but that has now been
fixed in the zstd package. So it's safe to use zstd Encoders/Decoders in
sync.Pool directly, as long as concurrency is disabled.
  • Loading branch information
danw committed Apr 25, 2024
1 parent c09a88c commit c2fef0a
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 29 deletions.
4 changes: 1 addition & 3 deletions go.mod
Expand Up @@ -6,8 +6,7 @@ require (
github.com/bazelbuild/remote-apis v0.0.0-20230411132548-35aee1c4a425
github.com/golang/glog v1.1.0
github.com/google/go-cmp v0.5.9
github.com/klauspost/compress v1.12.3
github.com/mostynb/zstdpool-syncpool v0.0.7
github.com/klauspost/compress v1.17.8
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.4
Expand All @@ -26,7 +25,6 @@ require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/longrunning v0.5.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/uuid v1.3.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Expand Up @@ -35,8 +35,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand All @@ -49,10 +47,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU=
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/mostynb/zstdpool-syncpool v0.0.7 h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls=
github.com/mostynb/zstdpool-syncpool v0.0.7/go.mod h1:YpzqIpN8xvRZZvemem7CMLPWkjuaKR37MnkQruSj6aw=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
1 change: 0 additions & 1 deletion go/pkg/client/BUILD.bazel
Expand Up @@ -30,7 +30,6 @@ go_library(
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
"@com_github_mostynb_zstdpool_syncpool//:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
Expand Down
22 changes: 17 additions & 5 deletions go/pkg/client/cas_download.go
Expand Up @@ -18,7 +18,6 @@ import (
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
"github.com/klauspost/compress/zstd"
syncpool "github.com/mostynb/zstdpool-syncpool"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -536,11 +535,20 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
r, nw := io.Pipe()

decoderInit.Do(func() {
decoders = syncpool.NewDecoderPool(zstd.WithDecoderConcurrency(1))
decoders = &sync.Pool{
New: func() interface{} {
d, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1))
if err != nil {
log.Errorf("Error creating new decoder: %v", err)
return nil
}
return d
},
}
})

decdIntf := decoders.Get()
decoderW, ok := decdIntf.(*syncpool.DecoderWrapper)
decoderW, ok := decdIntf.(*zstd.Decoder)
if !ok || decoderW == nil {
return nil, nil, fmt.Errorf("failed creating new decoder")
}
Expand All @@ -562,8 +570,12 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
// have to go somewhere or they'll block execution.
io.Copy(io.Discard, r)
}
// DecoderWrapper.Close moves the decoder back to the Pool.
decoderW.Close()
// Reset and move the decoder back to the Pool.
if rerr := decoderW.Reset(nil); rerr == nil {
decoders.Put(decoderW)
} else {
log.Warningf("Error resetting decoder: %v", rerr)
}
done <- err
}()

Expand Down
2 changes: 1 addition & 1 deletion go/pkg/reader/BUILD.bazel
Expand Up @@ -6,8 +6,8 @@ go_library(
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/reader",
visibility = ["//visibility:public"],
deps = [
"@com_github_golang_glog//:go_default_library",
"@com_github_klauspost_compress//zstd:go_default_library",
"@com_github_mostynb_zstdpool_syncpool//:go_default_library",
],
)

Expand Down
19 changes: 14 additions & 5 deletions go/pkg/reader/reader.go
Expand Up @@ -9,8 +9,8 @@ import (
"os"
"sync"

log "github.com/golang/glog"
"github.com/klauspost/compress/zstd"
syncpool "github.com/mostynb/zstdpool-syncpool"
)

// errNotInitialized is the error returned from Read() by a ReedSeeker that
Expand Down Expand Up @@ -156,7 +156,7 @@ func (sb *syncedBuffer) Reset() {

type compressedSeeker struct {
fs ReadSeeker
encdW *syncpool.EncoderWrapper
encdW *zstd.Encoder
// This keeps the compressed data
buf *syncedBuffer
}
Expand All @@ -176,14 +176,23 @@ func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) {
}

encoderInit.Do(func() {
encoders = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1))
encoders = &sync.Pool{
New: func() interface{} {
e, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
log.Errorf("Error creating new encoder: %v", err)
return nil
}
return e
},
}
})

buf := bytes.NewBuffer(nil)
sb := &syncedBuffer{buf: buf}

encdIntf := encoders.Get()
encdW, ok := encdIntf.(*syncpool.EncoderWrapper)
encdW, ok := encdIntf.(*zstd.Encoder)
if !ok || encdW == nil {
return nil, errors.New("failed creating new encoder")
}
Expand Down Expand Up @@ -245,7 +254,7 @@ func (cfs *compressedSeeker) SeekOffset(offset int64) error {
if cfs.encdW == nil {
encdIntf := encoders.Get()
var ok bool
cfs.encdW, ok = encdIntf.(*syncpool.EncoderWrapper)
cfs.encdW, ok = encdIntf.(*zstd.Encoder)
if !ok || cfs.encdW == nil {
return errors.New("failed to get a new encoder")
}
Expand Down
10 changes: 2 additions & 8 deletions go_deps.bzl
Expand Up @@ -118,14 +118,8 @@ def remote_apis_sdks_go_deps():
go_repository(
name = "com_github_klauspost_compress",
importpath = "github.com/klauspost/compress",
sum = "h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU=",
version = "v1.12.3",
)
go_repository(
name = "com_github_mostynb_zstdpool_syncpool",
importpath = "github.com/mostynb/zstdpool-syncpool",
sum = "h1:meYfUODlzmtOCrFmbJsUVEIt5rbmNUsz+Bu+Vnr95ls=",
version = "v0.0.7",
sum = "h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=",
version = "v1.17.8",
)
go_repository(
name = "com_github_pborman_uuid",
Expand Down

0 comments on commit c2fef0a

Please sign in to comment.