Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zstd: Tweak decoder allocs. #680

Merged
merged 1 commit into from Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion zstd/blockdec.go
Expand Up @@ -232,7 +232,7 @@ func (b *blockDec) decodeBuf(hist *history) error {
if b.lowMem {
b.dst = make([]byte, b.RLESize)
} else {
b.dst = make([]byte, maxBlockSize)
b.dst = make([]byte, maxCompressedBlockSize)
}
}
b.dst = b.dst[:b.RLESize]
Expand Down
2 changes: 1 addition & 1 deletion zstd/decoder.go
Expand Up @@ -770,7 +770,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
if block.lowMem {
block.dst = make([]byte, block.RLESize)
} else {
block.dst = make([]byte, maxBlockSize)
block.dst = make([]byte, maxCompressedBlockSize)
}
}
block.dst = block.dst[:block.RLESize]
Expand Down
63 changes: 61 additions & 2 deletions zstd/decoder_test.go
Expand Up @@ -1249,7 +1249,7 @@ func BenchmarkDecoder_DecoderReset(b *testing.B) {
defer dec.Close()
bench := func(name string, b *testing.B, opts []DOption, in, want []byte) {
b.Helper()
buf := &bytesReader{Reader: bytes.NewReader(in), buf: in}
buf := newBytesReader(in)
dec, err := NewReader(nil, opts...)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -1298,6 +1298,11 @@ func BenchmarkDecoder_DecoderReset(b *testing.B) {
}
}

// newBytesReader returns a *bytes.Reader that also supports Bytes() []byte
func newBytesReader(b []byte) *bytesReader {
return &bytesReader{Reader: bytes.NewReader(b), buf: b}
}

type bytesReader struct {
*bytes.Reader
buf []byte
Expand Down Expand Up @@ -1328,7 +1333,7 @@ func BenchmarkDecoder_DecoderNewNoRead(b *testing.B) {
bench := func(name string, b *testing.B, opts []DOption, in, want []byte) {
b.Helper()
b.Run(name, func(b *testing.B) {
buf := &bytesReader{Reader: bytes.NewReader(in), buf: in}
buf := newBytesReader(in)
b.SetBytes(1)
b.ReportAllocs()
b.ResetTimer()
Expand Down Expand Up @@ -1372,6 +1377,60 @@ func BenchmarkDecoder_DecoderNewNoRead(b *testing.B) {
}
}

func BenchmarkDecoder_DecoderNewSomeRead(b *testing.B) {
var buf [1 << 20]byte
bench := func(name string, b *testing.B, opts []DOption, in *os.File) {
b.Helper()
b.Run(name, func(b *testing.B) {
//b.ReportAllocs()
b.ResetTimer()
var heapTotal int64
var m runtime.MemStats
for i := 0; i < b.N; i++ {
runtime.GC()
runtime.ReadMemStats(&m)
heapTotal -= int64(m.HeapInuse)
_, err := in.Seek(io.SeekStart, 0)
if err != nil {
b.Fatal(err)
}

dec, err := NewReader(in, opts...)
if err != nil {
b.Fatal(err)
}
// Read 16 MB
_, err = io.CopyBuffer(io.Discard, io.LimitReader(dec, 16<<20), buf[:])
if err != nil {
b.Fatal(err)
}
runtime.GC()
runtime.ReadMemStats(&m)
heapTotal += int64(m.HeapInuse)

dec.Close()
}
b.ReportMetric(float64(heapTotal)/float64(b.N), "b/op")
})
}
files := []string{"testdata/000002.map.win32K.zst", "testdata/000002.map.win1MB.zst", "testdata/000002.map.win8MB.zst"}
for _, file := range files {
if !strings.HasSuffix(file, ".zst") {
continue
}
r, err := os.Open(file)
if err != nil {
b.Fatal(err)
}
defer r.Close()

b.Run(file, func(b *testing.B) {
bench("stream-single", b, []DOption{WithDecodeBuffersBelow(0), WithDecoderConcurrency(1)}, r)
bench("stream-single-himem", b, []DOption{WithDecodeBuffersBelow(0), WithDecoderConcurrency(1), WithDecoderLowmem(false)}, r)
})
}
}

func BenchmarkDecoder_DecodeAll(b *testing.B) {
zr := testCreateZipReader("testdata/benchdecoder.zip", b)
dec, err := NewReader(nil, WithDecoderConcurrency(1))
Expand Down
2 changes: 1 addition & 1 deletion zstd/enc_fast.go
Expand Up @@ -304,7 +304,7 @@ func (e *fastEncoder) EncodeNoHist(blk *blockEnc, src []byte) {
minNonLiteralBlockSize = 1 + 1 + inputMargin
)
if debugEncoder {
if len(src) > maxBlockSize {
if len(src) > maxCompressedBlockSize {
panic("src too big")
}
}
Expand Down
11 changes: 8 additions & 3 deletions zstd/framedec.go
Expand Up @@ -261,11 +261,16 @@ func (d *frameDec) reset(br byteBuffer) error {
}
d.history.windowSize = int(d.WindowSize)
if !d.o.lowMem || d.history.windowSize < maxBlockSize {
// Alloc 2x window size if not low-mem, or very small window size.
// Alloc 2x window size if not low-mem, or window size below 2MB.
d.history.allocFrameBuffer = d.history.windowSize * 2
} else {
// Alloc with one additional block
d.history.allocFrameBuffer = d.history.windowSize + maxBlockSize
if d.o.lowMem {
// Alloc with 1MB extra.
d.history.allocFrameBuffer = d.history.windowSize + maxBlockSize/2
} else {
// Alloc with 2MB extra.
d.history.allocFrameBuffer = d.history.windowSize + maxBlockSize
}
}

if debugDecoder {
Expand Down
23 changes: 18 additions & 5 deletions zstd/fuzz_test.go
Expand Up @@ -50,7 +50,7 @@ func FuzzDecodeAll(f *testing.F) {
})
}

func FuzzDecodeAllNoBMI2(f *testing.F) {
func FuzzDecAllNoBMI2(f *testing.F) {
if !cpuinfo.HasBMI2() {
f.Skip("No BMI, so already tested")
return
Expand All @@ -62,18 +62,20 @@ func FuzzDecodeAllNoBMI2(f *testing.F) {
func FuzzDecoder(f *testing.F) {
fuzz.AddFromZip(f, "testdata/fuzz/decode-corpus-raw.zip", true, testing.Short())
fuzz.AddFromZip(f, "testdata/fuzz/decode-corpus-encoded.zip", false, testing.Short())
decLow, err := NewReader(nil, WithDecoderLowmem(true), WithDecoderConcurrency(2), WithDecoderMaxMemory(20<<20), WithDecoderMaxWindow(1<<20), IgnoreChecksum(true))
decLow, err := NewReader(nil, WithDecoderLowmem(true), WithDecoderConcurrency(2), WithDecoderMaxMemory(20<<20), WithDecoderMaxWindow(1<<20), IgnoreChecksum(true), WithDecodeBuffersBelow(8<<10))
if err != nil {
f.Fatal(err)
}
defer decLow.Close()
// Test with high memory, but sync decoding
decHi, err := NewReader(nil, WithDecoderLowmem(false), WithDecoderConcurrency(1), WithDecoderMaxMemory(20<<20), WithDecoderMaxWindow(1<<20), IgnoreChecksum(true))
decHi, err := NewReader(nil, WithDecoderLowmem(false), WithDecoderConcurrency(1), WithDecoderMaxMemory(20<<20), WithDecoderMaxWindow(1<<20), IgnoreChecksum(true), WithDecodeBuffersBelow(8<<10))
if err != nil {
f.Fatal(err)
}
defer decHi.Close()

brLow := newBytesReader(nil)
brHi := newBytesReader(nil)
f.Fuzz(func(t *testing.T, b []byte) {
// Just test if we crash...
defer func() {
Expand All @@ -82,11 +84,13 @@ func FuzzDecoder(f *testing.F) {
t.Fatal(r)
}
}()
err := decLow.Reset(io.NopCloser(bytes.NewReader(b)))
brLow.Reset(b)
brHi.Reset(b)
err := decLow.Reset(brLow)
if err != nil {
t.Fatal(err)
}
err = decHi.Reset(io.NopCloser(bytes.NewReader(b)))
err = decHi.Reset(brHi)
if err != nil {
t.Fatal(err)
}
Expand All @@ -104,6 +108,15 @@ func FuzzDecoder(f *testing.F) {
})
}

func FuzzNoBMI2Dec(f *testing.F) {
if !cpuinfo.HasBMI2() {
f.Skip("No BMI, so already tested")
return
}
defer cpuinfo.DisableBMI2()()
FuzzDecoder(f)
}

func FuzzEncoding(f *testing.F) {
fuzz.AddFromZip(f, "testdata/fuzz/encode-corpus-raw.zip", true, testing.Short())
fuzz.AddFromZip(f, "testdata/comp-crashers.zip", true, false)
Expand Down
Binary file modified zstd/testdata/fuzz/decode-corpus-encoded.zip
Binary file not shown.