Skip to content

Commit

Permalink
Add shared zstd decoder pools.
Browse files Browse the repository at this point in the history
Like #267, but for decoders.
  • Loading branch information
rubensf committed Feb 19, 2021
1 parent 3fb4183 commit c3d4dae
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
1 change: 1 addition & 0 deletions go/pkg/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
"@com_github_klauspost_compress//zstd:go_default_library",
"@com_github_mostynb_zstdpool_syncpool//:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@go_googleapis//google/bytestream:bytestream_go_proto",
Expand Down
23 changes: 18 additions & 5 deletions go/pkg/client/cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
"github.com/mostynb/zstdpool-syncpool"
"github.com/pborman/uuid"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -859,6 +860,9 @@ func (c *Client) ReadBlobToFile(ctx context.Context, d digest.Digest, fpath stri
return c.readBlobStreamed(ctx, d, 0, 0, &fileWriter{f})
}

var decoderInit sync.Once
var decoders *sync.Pool

// NewCompressedWriteBuffer creates wraps a io.Writer contained compressed contents to write
// decompressed contents.
func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
Expand All @@ -868,9 +872,17 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
// the writer data.
r, nw := io.Pipe()

// TODO(rubensf): Reuse decoders when possible to save the effort of starting/closing goroutines.
decoder, err := zstd.NewReader(r)
if err != nil {
decoderInit.Do(func() {
decoders = syncpool.NewDecoderPool(zstd.WithDecoderConcurrency(1))
})

decdIntf := decoders.Get()
decoderW, ok := decdIntf.(*syncpool.DecoderWrapper)
if !ok || decoderW == nil {
return nil, nil, fmt.Errorf("failed creating new decoder")
}

if err := decoderW.Reset(r); err != nil {
return nil, nil, err
}

Expand All @@ -881,13 +893,14 @@ func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error) {
// separate thread. As such, we also need a way to signal the main
// thread that the decoding has finished - which will have some delay
// from the last Write call.
_, err := decoder.WriteTo(w)
_, err := decoderW.WriteTo(w)
if err != nil {
// Because WriteTo returned early, the pipe writers still
// have to go somewhere or they'll block execution.
io.Copy(ioutil.Discard, r)
}
decoder.Close()
// DecoderWrapper.Close moves the decoder back to the Pool.
decoderW.Close()
done <- err
}()

Expand Down

0 comments on commit c3d4dae

Please sign in to comment.