From 3690e90db582d2ef76ff6ed28580e97a700a6313 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Sun, 25 Sep 2022 10:37:12 +0200 Subject: [PATCH] Fix+reduce allocations (#668) Adds `WithDecodeBuffersBelow` to tweak buffer switch-over. Fixes #666 and generally reduces Reader allocations. --- zstd/decoder.go | 21 +++++++++++----- zstd/decoder_options.go | 34 +++++++++++++++++-------- zstd/decoder_test.go | 55 ++++++++++++++++++++++++++++++++--------- zstd/framedec.go | 4 +-- zstd/history.go | 21 +++++++--------- zstd/seqdec.go | 22 +++++++++++++++-- zstd/seqdec_amd64.go | 7 +++--- zstd/seqdec_generic.go | 4 +-- 8 files changed, 119 insertions(+), 49 deletions(-) diff --git a/zstd/decoder.go b/zstd/decoder.go index 6104eb7936..78c10755f8 100644 --- a/zstd/decoder.go +++ b/zstd/decoder.go @@ -35,6 +35,7 @@ type Decoder struct { br readerWrapper enabled bool inFrame bool + dstBuf []byte } frame *frameDec @@ -187,21 +188,23 @@ func (d *Decoder) Reset(r io.Reader) error { } // If bytes buffer and < 5MB, do sync decoding anyway. - if bb, ok := r.(byter); ok && bb.Len() < 5<<20 { + if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap { bb2 := bb if debugDecoder { println("*bytes.Buffer detected, doing sync decode, len:", bb.Len()) } b := bb2.Bytes() var dst []byte - if cap(d.current.b) > 0 { - dst = d.current.b + if cap(d.syncStream.dstBuf) > 0 { + dst = d.syncStream.dstBuf[:0] } - dst, err := d.DecodeAll(b, dst[:0]) + dst, err := d.DecodeAll(b, dst) if err == nil { err = io.EOF } + // Save output buffer + d.syncStream.dstBuf = dst d.current.b = dst d.current.err = err d.current.flushed = true @@ -216,6 +219,7 @@ func (d *Decoder) Reset(r io.Reader) error { d.current.err = nil d.current.flushed = false d.current.d = nil + d.syncStream.dstBuf = nil // Ensure no-one else is still running... d.streamWg.Wait() @@ -680,6 +684,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch if debugDecoder { println("Async 1: new history, recent:", block.async.newHist.recentOffsets) } + hist.reset() hist.decoders = block.async.newHist.decoders hist.recentOffsets = block.async.newHist.recentOffsets hist.windowSize = block.async.newHist.windowSize @@ -711,6 +716,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch seqExecute <- block } close(seqExecute) + hist.reset() }() var wg sync.WaitGroup @@ -734,6 +740,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch if debugDecoder { println("Async 2: new history") } + hist.reset() hist.windowSize = block.async.newHist.windowSize hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer if block.async.newHist.dict != nil { @@ -815,13 +822,14 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch if debugDecoder { println("decoder goroutines finished") } + hist.reset() }() + var hist history decodeStream: for { - var hist history var hasErr bool - + hist.reset() decodeBlock := func(block *blockDec) { if hasErr { if block != nil { @@ -937,5 +945,6 @@ decodeStream: } close(seqDecode) wg.Wait() + hist.reset() d.frame.history.b = frameHistCache } diff --git a/zstd/decoder_options.go b/zstd/decoder_options.go index 666c2715fe..f42448e69c 100644 --- a/zstd/decoder_options.go +++ b/zstd/decoder_options.go @@ -14,21 +14,23 @@ type DOption func(*decoderOptions) error // options retains accumulated state of multiple options. type decoderOptions struct { - lowMem bool - concurrent int - maxDecodedSize uint64 - maxWindowSize uint64 - dicts []dict - ignoreChecksum bool - limitToCap bool + lowMem bool + concurrent int + maxDecodedSize uint64 + maxWindowSize uint64 + dicts []dict + ignoreChecksum bool + limitToCap bool + decodeBufsBelow int } func (o *decoderOptions) setDefault() { *o = decoderOptions{ // use less ram: true for now, but may change. - lowMem: true, - concurrent: runtime.GOMAXPROCS(0), - maxWindowSize: MaxWindowSize, + lowMem: true, + concurrent: runtime.GOMAXPROCS(0), + maxWindowSize: MaxWindowSize, + decodeBufsBelow: 128 << 10, } if o.concurrent > 4 { o.concurrent = 4 @@ -126,6 +128,18 @@ func WithDecodeAllCapLimit(b bool) DOption { } } +// WithDecodeBuffersBelow will fully decode readers that have a +// `Bytes() []byte` and `Len() int` interface similar to bytes.Buffer. +// This typically uses less allocations but will have the full decompressed object in memory. +// Note that DecodeAllCapLimit will disable this, as well as giving a size of 0 or less. +// Default is 128KiB. +func WithDecodeBuffersBelow(size int) DOption { + return func(o *decoderOptions) error { + o.decodeBufsBelow = size + return nil + } +} + // IgnoreChecksum allows to forcibly ignore checksum checking. func IgnoreChecksum(b bool) DOption { return func(o *decoderOptions) error { diff --git a/zstd/decoder_test.go b/zstd/decoder_test.go index e3f0a2ffc3..e9c7203487 100644 --- a/zstd/decoder_test.go +++ b/zstd/decoder_test.go @@ -1157,12 +1157,18 @@ func testDecoderFileBad(t *testing.T, fn string, newDec func() (*Decoder, error) func BenchmarkDecoder_DecoderSmall(b *testing.B) { zr := testCreateZipReader("testdata/benchdecoder.zip", b) - dec, err := NewReader(nil) + dec, err := NewReader(nil, WithDecodeBuffersBelow(1<<30)) if err != nil { b.Fatal(err) return } defer dec.Close() + dec2, err := NewReader(nil, WithDecodeBuffersBelow(0)) + if err != nil { + b.Fatal(err) + return + } + defer dec2.Close() for _, tt := range zr.File { if !strings.HasSuffix(tt.Name, ".zst") { continue @@ -1183,6 +1189,7 @@ func BenchmarkDecoder_DecoderSmall(b *testing.B) { in = append(in, in...) // 8x in = append(in, in...) + err = dec.Reset(bytes.NewBuffer(in)) if err != nil { b.Fatal(err) @@ -1191,19 +1198,43 @@ func BenchmarkDecoder_DecoderSmall(b *testing.B) { if err != nil { b.Fatal(err) } - b.SetBytes(int64(len(got))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err = dec.Reset(bytes.NewBuffer(in)) - if err != nil { - b.Fatal(err) + b.Run("buffered", func(b *testing.B) { + b.SetBytes(int64(len(got))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = dec.Reset(bytes.NewBuffer(in)) + if err != nil { + b.Fatal(err) + } + n, err := io.Copy(io.Discard, dec) + if err != nil { + b.Fatal(err) + } + if int(n) != len(got) { + b.Fatalf("want %d, got %d", len(got), n) + } + } - _, err := io.Copy(io.Discard, dec) - if err != nil { - b.Fatal(err) + }) + b.Run("unbuffered", func(b *testing.B) { + b.SetBytes(int64(len(got))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = dec2.Reset(bytes.NewBuffer(in)) + if err != nil { + b.Fatal(err) + } + n, err := io.Copy(io.Discard, dec2) + if err != nil { + b.Fatal(err) + } + if int(n) != len(got) { + b.Fatalf("want %d, got %d", len(got), n) + } } - } + }) }) } } diff --git a/zstd/framedec.go b/zstd/framedec.go index 1559a20386..b6c5054176 100644 --- a/zstd/framedec.go +++ b/zstd/framedec.go @@ -343,7 +343,7 @@ func (d *frameDec) consumeCRC() error { return nil } -// runDecoder will create a sync decoder that will decode a block of data. +// runDecoder will run the decoder for the remainder of the frame. func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) { saved := d.history.b @@ -369,7 +369,7 @@ func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) { if debugDecoder { println("maxSyncLen:", d.history.decoders.maxSyncLen) } - if !d.o.limitToCap && uint64(cap(dst)-len(dst)) < d.history.decoders.maxSyncLen { + if !d.o.limitToCap && uint64(cap(dst)) < d.history.decoders.maxSyncLen { // Alloc for output dst2 := make([]byte, len(dst), d.history.decoders.maxSyncLen+compressedBlockOverAlloc) copy(dst2, dst) diff --git a/zstd/history.go b/zstd/history.go index 28b40153cc..09164856d2 100644 --- a/zstd/history.go +++ b/zstd/history.go @@ -37,24 +37,21 @@ func (h *history) reset() { h.ignoreBuffer = 0 h.error = false h.recentOffsets = [3]int{1, 4, 8} - if f := h.decoders.litLengths.fse; f != nil && !f.preDefined { - fseDecoderPool.Put(f) - } - if f := h.decoders.offsets.fse; f != nil && !f.preDefined { - fseDecoderPool.Put(f) - } - if f := h.decoders.matchLengths.fse; f != nil && !f.preDefined { - fseDecoderPool.Put(f) - } + h.decoders.freeDecoders() h.decoders = sequenceDecs{br: h.decoders.br} + h.freeHuffDecoder() + h.huffTree = nil + h.dict = nil + //printf("history created: %+v (l: %d, c: %d)", *h, len(h.b), cap(h.b)) +} + +func (h *history) freeHuffDecoder() { if h.huffTree != nil { if h.dict == nil || h.dict.litEnc != h.huffTree { huffDecoderPool.Put(h.huffTree) + h.huffTree = nil } } - h.huffTree = nil - h.dict = nil - //printf("history created: %+v (l: %d, c: %d)", *h, len(h.b), cap(h.b)) } func (h *history) setDict(dict *dict) { diff --git a/zstd/seqdec.go b/zstd/seqdec.go index df04472030..f833d1541f 100644 --- a/zstd/seqdec.go +++ b/zstd/seqdec.go @@ -99,6 +99,21 @@ func (s *sequenceDecs) initialize(br *bitReader, hist *history, out []byte) erro return nil } +func (s *sequenceDecs) freeDecoders() { + if f := s.litLengths.fse; f != nil && !f.preDefined { + fseDecoderPool.Put(f) + s.litLengths.fse = nil + } + if f := s.offsets.fse; f != nil && !f.preDefined { + fseDecoderPool.Put(f) + s.offsets.fse = nil + } + if f := s.matchLengths.fse; f != nil && !f.preDefined { + fseDecoderPool.Put(f) + s.matchLengths.fse = nil + } +} + // execute will execute the decoded sequence with the provided history. // The sequence must be evaluated before being sent. func (s *sequenceDecs) execute(seqs []seqVals, hist []byte) error { @@ -299,7 +314,10 @@ func (s *sequenceDecs) decodeSync(hist []byte) error { } size := ll + ml + len(out) if size-startSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", size-startSize, maxBlockSize) + if size-startSize == 424242 { + panic("here") + } + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } if size > cap(out) { // Not enough size, which can happen under high volume block streaming conditions @@ -411,7 +429,7 @@ func (s *sequenceDecs) decodeSync(hist []byte) error { // Check if space for literals if size := len(s.literals) + len(s.out) - startSize; size > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", size, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } // Add final literals diff --git a/zstd/seqdec_amd64.go b/zstd/seqdec_amd64.go index 1c704d30c9..191384adfd 100644 --- a/zstd/seqdec_amd64.go +++ b/zstd/seqdec_amd64.go @@ -139,7 +139,7 @@ func (s *sequenceDecs) decodeSyncSimple(hist []byte) (bool, error) { if debugDecoder { println("msl:", s.maxSyncLen, "cap", cap(s.out), "bef:", startSize, "sz:", size-startSize, "mbs:", maxBlockSize, "outsz:", cap(s.out)-startSize) } - return true, fmt.Errorf("output (%d) bigger than max block size (%d)", size-startSize, maxBlockSize) + return true, fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) default: return true, fmt.Errorf("sequenceDecs_decode returned erronous code %d", errCode) @@ -147,7 +147,8 @@ func (s *sequenceDecs) decodeSyncSimple(hist []byte) (bool, error) { s.seqSize += ctx.litRemain if s.seqSize > maxBlockSize { - return true, fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return true, fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) + } err := br.close() if err != nil { @@ -289,7 +290,7 @@ func (s *sequenceDecs) decode(seqs []seqVals) error { s.seqSize += ctx.litRemain if s.seqSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } err := br.close() if err != nil { diff --git a/zstd/seqdec_generic.go b/zstd/seqdec_generic.go index c3452bc3a9..ac2a80d291 100644 --- a/zstd/seqdec_generic.go +++ b/zstd/seqdec_generic.go @@ -111,7 +111,7 @@ func (s *sequenceDecs) decode(seqs []seqVals) error { } s.seqSize += ll + ml if s.seqSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } litRemain -= ll if litRemain < 0 { @@ -149,7 +149,7 @@ func (s *sequenceDecs) decode(seqs []seqVals) error { } s.seqSize += litRemain if s.seqSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } err := br.close() if err != nil {