zstd: Copy literals in 16 byte blocks when possible #592

Merged
merged 1 commit on May 12, 2022
zstd/_generate/gen.go: 28 changes (27 additions, 1 deletion)
@@ -66,6 +66,8 @@ func main() {
safeMem: false,
}
exec.generateProcedure("sequenceDecs_executeSimple_amd64")
exec.safeMem = true
exec.generateProcedure("sequenceDecs_executeSimple_safe_amd64")

decodeSync := decodeSync{}
decodeSync.setBMI2(false)
@@ -1032,7 +1034,11 @@ func (e executeSimple) executeSingleTriple(c *executeSingleTripleContext, handle
TESTQ(ll, ll)
JZ(LabelRef("check_offset"))
// TODO: Investigate if it is possible to consistently overallocate literals.
e.copyMemoryPrecise("1", c.literals, c.outBase, ll)
if e.safeMem {
e.copyMemoryPrecise("1", c.literals, c.outBase, ll)
} else {
e.copyMemoryND("1", c.literals, c.outBase, ll)
}
ADDQ(ll, c.literals)
ADDQ(ll, c.outBase)
ADDQ(ll, c.outPosition)
@@ -1188,6 +1194,26 @@ func (e executeSimple) copyMemory(suffix string, src, dst, length reg.GPVirtual)
JHI(LabelRef(label))
}

// copyMemoryND will copy memory in blocks of 16 bytes,
// overwriting up to 15 extra bytes.
// All parameters are preserved.
func (e executeSimple) copyMemoryND(suffix string, src, dst, length reg.GPVirtual) {
label := "copy_" + suffix

ofs := GP64()
s := Mem{Base: src, Index: ofs, Scale: 1}
d := Mem{Base: dst, Index: ofs, Scale: 1}

XORQ(ofs, ofs)
Label(label)
t := XMM()
MOVUPS(s, t)
MOVUPS(t, d)
ADDQ(U8(16), ofs)
CMPQ(ofs, length)
JB(LabelRef(label))
}

// copyMemoryPrecise will copy memory in blocks of 16 bytes,
// without overwriting nor overreading.
func (e executeSimple) copyMemoryPrecise(suffix string, src, dst, length reg.GPVirtual) {
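The new copyMemoryND is the core of this PR: instead of copying exactly `length` bytes with precise tail handling, it copies whole 16-byte MOVUPS blocks and simply tolerates reading and writing up to 15 bytes past the end. Below is a minimal plain-Go sketch of the same idea, not the generated assembly; `overCopy16` and its padding are illustrative only, and assume both buffers carry the required slack:

```go
package main

import "fmt"

// overCopy16 sketches the copyMemoryND strategy in plain Go: copy whole
// 16-byte blocks until n bytes are covered, reading and writing up to
// 15 bytes past n. Both src and dst must therefore be padded; the zstd
// decoder guarantees this by reserving extra capacity in its buffers.
func overCopy16(dst, src []byte, n int) {
	for ofs := 0; ofs < n; ofs += 16 {
		copy(dst[ofs:ofs+16], src[ofs:ofs+16]) // one MOVUPS-sized block
	}
}

func main() {
	const pad = 15 // slack for the largest possible overshoot
	src := append([]byte("hello"), make([]byte, pad)...)
	dst := make([]byte, 5+pad)
	overCopy16(dst, src, 5)
	fmt.Printf("%q\n", dst[:5]) // "hello"
}
```

This is also why the generator gains a `safeMem` switch above: the over-writing variant is only legal when the decoder can guarantee the slack; otherwise copyMemoryPrecise must be used.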
zstd/blockdec.go: 30 changes (12 additions, 18 deletions)
@@ -49,11 +49,8 @@ const (
// Maximum possible block size (all Raw+Uncompressed).
maxBlockSize = (1 << 21) - 1

// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#literals_section_header
maxCompressedLiteralSize = 1 << 18
maxRLELiteralSize = 1 << 20
maxMatchLen = 131074
maxSequences = 0x7f00 + 0xffff
maxMatchLen = 131074
maxSequences = 0x7f00 + 0xffff

// We support slightly less than the reference decoder to be able to
// use ints on 32 bit archs.
@@ -368,14 +365,9 @@ func (b *blockDec) decodeLiterals(in []byte, hist *history) (remain []byte, err
}
if cap(b.literalBuf) < litRegenSize {
if b.lowMem {
b.literalBuf = make([]byte, litRegenSize)
b.literalBuf = make([]byte, litRegenSize, litRegenSize+compressedBlockOverAlloc)
} else {
if litRegenSize > maxCompressedLiteralSize {
// Exceptional
b.literalBuf = make([]byte, litRegenSize)
} else {
b.literalBuf = make([]byte, litRegenSize, maxCompressedLiteralSize)
}
b.literalBuf = make([]byte, litRegenSize, maxCompressedBlockSize+compressedBlockOverAlloc)
}
}
literals = b.literalBuf[:litRegenSize]
@@ -405,14 +397,14 @@ func (b *blockDec) decodeLiterals(in []byte, hist *history) (remain []byte, err
// Ensure we have space to store it.
if cap(b.literalBuf) < litRegenSize {
if b.lowMem {
b.literalBuf = make([]byte, 0, litRegenSize)
b.literalBuf = make([]byte, 0, litRegenSize+compressedBlockOverAlloc)
} else {
b.literalBuf = make([]byte, 0, maxCompressedLiteralSize)
b.literalBuf = make([]byte, 0, maxCompressedBlockSize+compressedBlockOverAlloc)
}
}
var err error
// Use our out buffer.
huff.MaxDecodedSize = maxCompressedBlockSize
huff.MaxDecodedSize = litRegenSize
if fourStreams {
literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals)
} else {
@@ -437,9 +429,9 @@ func (b *blockDec) decodeLiterals(in []byte, hist *history) (remain []byte, err
// Ensure we have space to store it.
if cap(b.literalBuf) < litRegenSize {
if b.lowMem {
b.literalBuf = make([]byte, 0, litRegenSize)
b.literalBuf = make([]byte, 0, litRegenSize+compressedBlockOverAlloc)
} else {
b.literalBuf = make([]byte, 0, maxCompressedBlockSize)
b.literalBuf = make([]byte, 0, maxCompressedBlockSize+compressedBlockOverAlloc)
}
}
huff := hist.huffTree
@@ -456,7 +448,7 @@ func (b *blockDec) decodeLiterals(in []byte, hist *history) (remain []byte, err
return in, err
}
hist.huffTree = huff
huff.MaxDecodedSize = maxCompressedBlockSize
huff.MaxDecodedSize = litRegenSize
// Use our out buffer.
if fourStreams {
literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals)
@@ -471,6 +463,8 @@ func (b *blockDec) decodeLiterals(in []byte, hist *history) (remain []byte, err
if len(literals) != litRegenSize {
return in, fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals))
}
// Re-cap to get extra size.
literals = b.literalBuf[:len(literals)]
if debugDecoder {
printf("Decompressed %d literals into %d bytes\n", litCompSize, litRegenSize)
}
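The allocation changes above all reserve extra capacity (`compressedBlockOverAlloc`) so the assembly's 16-byte copies stay inside the buffer. The three-index slice handed to the Huffman decoder, `b.literalBuf[:0:litRegenSize]`, hides that slack during decompression, and the final `literals = b.literalBuf[:len(literals)]` re-slices from the parent buffer to expose it again. A small standalone sketch of that round trip; `overAlloc` here is a stand-in, not the real constant:

```go
package main

import "fmt"

// overAlloc stands in for compressedBlockOverAlloc; only its role
// (extra capacity beyond the regenerated size) matters here.
const overAlloc = 16

func main() {
	litRegenSize := 100
	literalBuf := make([]byte, 0, litRegenSize+overAlloc)

	// A three-index slice caps what the Huffman decoder may write.
	capped := literalBuf[:0:litRegenSize]
	fmt.Println(cap(capped)) // 100: the slack is invisible to the callee

	// "Re-cap to get extra size": reslice from the parent buffer, as
	// decodeLiterals does with literals = b.literalBuf[:len(literals)].
	literals := literalBuf[:litRegenSize]
	fmt.Println(cap(literals)) // 116: full capacity restored
}
```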
zstd/decoder_test.go: 71 changes (58 additions, 13 deletions)
@@ -1402,12 +1402,7 @@ func benchmarkDecoderWithFile(path string, b *testing.B) {
if err != nil {
b.Fatal(err)
}
dec, err := NewReader(nil, WithDecoderLowmem(false))
if err != nil {
b.Fatal(err)
}
defer dec.Close()
err = dec.Reset(bytes.NewBuffer(data))
dec, err := NewReader(bytes.NewBuffer(data), WithDecoderLowmem(false), WithDecoderConcurrency(1))
if err != nil {
b.Fatal(err)
}
@@ -1416,19 +1411,69 @@ func benchmarkDecoderWithFile(path string, b *testing.B) {
b.Fatal(err)
}

b.SetBytes(n)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = dec.Reset(bytes.NewBuffer(data))
b.Run("multithreaded-writer", func(b *testing.B) {
dec, err := NewReader(nil)
if err != nil {
b.Fatal(err)
}
_, err := io.CopyN(ioutil.Discard, dec, n)

b.SetBytes(n)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = dec.Reset(bytes.NewBuffer(data))
if err != nil {
b.Fatal(err)
}
_, err := io.CopyN(ioutil.Discard, dec, n)
if err != nil {
b.Fatal(err)
}
}
})

b.Run("singlethreaded-writer", func(b *testing.B) {
dec, err := NewReader(nil, WithDecoderConcurrency(1))
if err != nil {
b.Fatal(err)
}
}

b.SetBytes(n)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = dec.Reset(bytes.NewBuffer(data))
if err != nil {
b.Fatal(err)
}
_, err := io.CopyN(ioutil.Discard, dec, n)
if err != nil {
b.Fatal(err)
}
}
})

b.Run("singlethreaded-writerto", func(b *testing.B) {
dec, err := NewReader(nil, WithDecoderConcurrency(1))
if err != nil {
b.Fatal(err)
}

b.SetBytes(n)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = dec.Reset(bytes.NewBuffer(data))
if err != nil {
b.Fatal(err)
}
// io.Copy will use io.WriterTo
_, err := io.Copy(ioutil.Discard, dec)
if err != nil {
b.Fatal(err)
}
}
})
}

func BenchmarkDecoderSilesia(b *testing.B) {
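The benchmark is split into subtests because the copy mechanism matters: `io.CopyN` wraps its source in an `io.LimitedReader`, which forces the generic `Read` loop, while `io.Copy` type-asserts `io.WriterTo` on the source and delegates the whole transfer when the assertion succeeds. A toy example that makes the dispatch visible; `loudReader` is illustrative, not part of the package:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// loudReader reports which path io.Copy takes for it.
type loudReader struct{ r io.Reader }

func (l loudReader) Read(p []byte) (int, error) { return l.r.Read(p) }

// WriteTo makes loudReader an io.WriterTo, so io.Copy hands the whole
// transfer to it instead of looping over Read.
func (l loudReader) WriteTo(w io.Writer) (int64, error) {
	fmt.Println("WriteTo fast path")
	return io.Copy(w, l.r)
}

func main() {
	src := loudReader{strings.NewReader("zstd")}
	io.Copy(io.Discard, src) // prints "WriteTo fast path"
	// io.CopyN(io.Discard, src, 4) would wrap src in an io.LimitedReader,
	// which has no WriteTo, and would fall back to the Read loop.
}
```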
zstd/seqdec_amd64.go: 16 changes (14 additions, 2 deletions)
@@ -62,6 +62,10 @@ func (s *sequenceDecs) decodeSyncSimple(hist []byte) (bool, error) {
if s.maxSyncLen > 0 && cap(s.out)-len(s.out)-compressedBlockOverAlloc < int(s.maxSyncLen) {
useSafe = true
}
if cap(s.literals) < len(s.literals)+compressedBlockOverAlloc {
useSafe = true
}

br := s.br

maxBlockSize := maxCompressedBlockSize
@@ -301,6 +305,10 @@ type executeAsmContext struct {
//go:noescape
func sequenceDecs_executeSimple_amd64(ctx *executeAsmContext) bool

// Same as above, but with safe memcopies
//go:noescape
func sequenceDecs_executeSimple_safe_amd64(ctx *executeAsmContext) bool

// executeSimple handles cases when dictionary is not used.
func (s *sequenceDecs) executeSimple(seqs []seqVals, hist []byte) error {
// Ensure we have enough output size...
@@ -327,8 +335,12 @@ func (s *sequenceDecs) executeSimple(seqs []seqVals, hist []byte) error {
literals: s.literals,
windowSize: s.windowSize,
}

ok := sequenceDecs_executeSimple_amd64(&ctx)
var ok bool
if cap(s.literals) < len(s.literals)+compressedBlockOverAlloc {
ok = sequenceDecs_executeSimple_safe_amd64(&ctx)
} else {
ok = sequenceDecs_executeSimple_amd64(&ctx)
}
if !ok {
return fmt.Errorf("match offset (%d) bigger than current history (%d)",
seqs[ctx.seqIndex].mo, ctx.outPosition+len(hist))
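The choice between the two generated procedures comes down to a single capacity check: the fast variant may touch up to 15 bytes past the logical end of `s.literals`, so it is only selected when the over-allocation slack is actually present. A hedged sketch of that predicate; the constant's value below is a placeholder for the one defined in blockdec.go:

```go
package main

import "fmt"

// Placeholder for compressedBlockOverAlloc from blockdec.go; the real
// value just has to cover the 15-byte overshoot of 16-byte copies.
const compressedBlockOverAlloc = 16

// chooseExecute mirrors the dispatch in executeSimple above.
func chooseExecute(literals []byte) string {
	if cap(literals) < len(literals)+compressedBlockOverAlloc {
		return "sequenceDecs_executeSimple_safe_amd64"
	}
	return "sequenceDecs_executeSimple_amd64"
}

func main() {
	padded := make([]byte, 100, 100+compressedBlockOverAlloc)
	tight := make([]byte, 100)
	fmt.Println(chooseExecute(padded)) // fast, over-writing variant
	fmt.Println(chooseExecute(tight))  // safe, precise variant
}
```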