From 48fa88eda75a34970d8572916b2c99abf308f22a Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Sat, 9 Nov 2019 15:21:09 +0100
Subject: [PATCH 1/2] Add stateless gzip/deflate

StatelessCompression will do compression without maintaining any state
between Write calls. Because no memory is kept between Write calls,
compression and speed will be suboptimal, and the size of individual
Write calls will affect the output size.
---
 flate/deflate.go   |   2 -
 flate/stateless.go | 247 +++++++++++++++++++++++++++++++++++++++++++++
 gzip/gzip.go       |  30 ++++--
 gzip/gzip_test.go  |   2 +
 4 files changed, 273 insertions(+), 8 deletions(-)
 create mode 100644 flate/stateless.go

diff --git a/flate/deflate.go b/flate/deflate.go
index 50d2ddb38e..20c94f5968 100644
--- a/flate/deflate.go
+++ b/flate/deflate.go
@@ -322,8 +322,6 @@ func (d *compressor) writeStoredBlock(buf []byte) error {
 	return d.w.err
 }
 
-const hashmul = 0x1e35a7bd
-
 // hash4 returns a hash representation of the first 4 bytes
 // of the supplied slice.
 // The caller must ensure that len(b) >= 4.
diff --git a/flate/stateless.go b/flate/stateless.go
new file mode 100644
index 0000000000..0648bb74b9
--- /dev/null
+++ b/flate/stateless.go
@@ -0,0 +1,247 @@
+package flate
+
+import (
+	"io"
+	"math"
+)
+
+const (
+	maxStatelessBlock = math.MaxInt16
+
+	slTableBits  = 13
+	slTableSize  = 1 << slTableBits
+	slTableShift = 32 - slTableBits
+)
+
+type statelessWriter struct {
+	dst    io.Writer
+	closed bool
+}
+
+func (s *statelessWriter) Close() error {
+	if s.closed {
+		return nil
+	}
+	s.closed = true
+	// Emit EOF block
+	return StatelessDeflate(s.dst, nil, true)
+}
+
+func (s *statelessWriter) Write(p []byte) (n int, err error) {
+	err = StatelessDeflate(s.dst, p, false)
+	if err != nil {
+		return 0, err
+	}
+	return len(p), nil
+}
+
+// NewStatelessWriter will do compression without maintaining any state
+// between Write calls.
+// Because no memory is kept between Write calls, compression and speed
+// will be suboptimal, and the size of individual Write calls will
+// affect the output size.
+func NewStatelessWriter(dst io.Writer) io.WriteCloser {
+	return &statelessWriter{dst: dst}
+}
+
+// StatelessDeflate allows compressing directly to a Writer without retaining state.
+// When the function returns, everything will be flushed.
+func StatelessDeflate(out io.Writer, in []byte, eof bool) error {
+	var dst tokens
+	bw := newHuffmanBitWriter(out)
+	if eof && len(in) == 0 {
+		// Just write an EOF block.
+		// Could be faster...
+		bw.writeStoredHeader(0, true)
+		bw.flush()
+		return bw.err
+	}
+
+	for len(in) > 0 {
+		todo := in
+		if len(todo) > maxStatelessBlock {
+			todo = todo[:maxStatelessBlock]
+		}
+		in = in[len(todo):]
+		// Compress
+		statelessEnc(&dst, todo)
+		isEof := eof && len(in) == 0
+
+		if dst.n == 0 {
+			bw.writeStoredHeader(len(todo), isEof)
+			if bw.err != nil {
+				return bw.err
+			}
+			bw.writeBytes(todo)
+		} else if int(dst.n) > len(todo)-len(todo)>>4 {
+			// If we removed less than 1/16th, Huffman compress the block.
+			bw.writeBlockHuff(isEof, todo, false)
+		} else {
+			bw.writeBlockDynamic(&dst, isEof, todo, false)
+		}
+		if bw.err != nil {
+			return bw.err
+		}
+		dst.Reset()
+	}
+	if !eof {
+		// Align.
+		bw.writeStoredHeader(0, false)
+	}
+	bw.flush()
+	return bw.err
+}
+
+func hashSL(u uint32) uint32 {
+	return (u * 0x1e35a7bd) >> slTableShift
+}
+
+func load3216(b []byte, i int16) uint32 {
+	// Help the compiler eliminate bounds checks on the read so it can be done in a single read.
+	b = b[i:]
+	b = b[:4]
+	return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
+}
+
+func load6416(b []byte, i int16) uint64 {
+	// Help the compiler eliminate bounds checks on the read so it can be done in a single read.
+	b = b[i:]
+	b = b[:8]
+	return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 |
+		uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56
+}
+
+func statelessEnc(dst *tokens, src []byte) {
+	const (
+		inputMargin            = 12 - 1
+		minNonLiteralBlockSize = 1 + 1 + inputMargin
+	)
+
+	type tableEntry struct {
+		offset int16
+	}
+
+	var table [slTableSize]tableEntry
+
+	// This check isn't in the Snappy implementation, but there, the caller
+	// instead of the callee handles this case.
+	if len(src) < minNonLiteralBlockSize {
+		// We do not fill the token table.
+		// This will be picked up by the caller.
+		dst.n = uint16(len(src))
+		return
+	}
+
+	s := int16(1)
+	nextEmit := int16(0)
+	// sLimit is when to stop looking for offset/length copies. The inputMargin
+	// lets us use a fast path for emitLiteral in the main loop, while we are
+	// looking for copies.
+	sLimit := int16(len(src) - inputMargin)
+
+	// nextEmit is where in src the next emitLiteral should start from.
+	cv := load3216(src, s)
+
+	for {
+		const skipLog = 5
+		const doEvery = 2
+
+		nextS := s
+		var candidate tableEntry
+		for {
+			nextHash := hashSL(cv)
+			candidate = table[nextHash]
+			nextS = s + doEvery + (s-nextEmit)>>skipLog
+			if nextS > sLimit || nextS <= 0 {
+				goto emitRemainder
+			}
+
+			now := load6416(src, nextS)
+			table[nextHash] = tableEntry{offset: s}
+			nextHash = hashSL(uint32(now))
+
+			if cv == load3216(src, candidate.offset) {
+				table[nextHash] = tableEntry{offset: nextS}
+				break
+			}
+
+			// Do one right away...
+			cv = uint32(now)
+			s = nextS
+			nextS++
+			candidate = table[nextHash]
+			now >>= 8
+			table[nextHash] = tableEntry{offset: s}
+
+			if cv == load3216(src, candidate.offset) {
+				table[nextHash] = tableEntry{offset: nextS}
+				break
+			}
+			cv = uint32(now)
+			s = nextS
+		}
+
+		// A 4-byte match has been found. We'll later see if more than 4 bytes
+		// match. But, prior to the match, src[nextEmit:s] are unmatched. Emit
+		// them as literal bytes.
+		for {
+			// Invariant: we have a 4-byte match at s, and no need to emit any
+			// literal bytes prior to s.
+
+			// Extend the 4-byte match as long as possible.
+			t := candidate.offset
+			l := int16(matchLen(src[s+4:], src[t+4:]) + 4)
+
+			// Extend backwards
+			for t > 0 && s > nextEmit && src[t-1] == src[s-1] {
+				s--
+				t--
+				l++
+			}
+			if nextEmit < s {
+				emitLiteral(dst, src[nextEmit:s])
+			}
+
+			// Save the match found
+			dst.AddMatchLong(int32(l), uint32(s-t-baseMatchOffset))
+			s += l
+			nextEmit = s
+			if nextS >= s {
+				s = nextS + 1
+			}
+			if s >= sLimit {
+				goto emitRemainder
+			}
+
+			// We could immediately start working at s now, but to improve
+			// compression we first update the hash table at s-2 and at s. If
+			// another emitCopy is not our next move, also calculate nextHash
+			// at s+1. At least on GOARCH=amd64, these three hash calculations
+			// are faster as one load64 call (with some shifts) instead of
+			// three load32 calls.
+			x := load6416(src, s-2)
+			o := s - 2
+			prevHash := hashSL(uint32(x))
+			table[prevHash] = tableEntry{offset: o}
+			x >>= 16
+			currHash := hashSL(uint32(x))
+			candidate = table[currHash]
+			table[currHash] = tableEntry{offset: o + 2}
+
+			if uint32(x) != load3216(src, candidate.offset) {
+				cv = uint32(x >> 8)
+				s++
+				break
+			}
+		}
+	}
+
+emitRemainder:
+	if int(nextEmit) < len(src) {
+		// If nothing was added, don't encode literals.
+		if dst.n == 0 {
+			return
+		}
+		emitLiteral(dst, src[nextEmit:])
+	}
+}
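A usage sketch of the API added above (not part of the patch; it assumes the usual github.com/klauspost/compress/flate import path): the writer behaves like any io.WriteCloser, but every Write becomes at least one independent deflate block, so fewer, larger writes compress better.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"log"

	"github.com/klauspost/compress/flate"
)

func main() {
	payload := bytes.Repeat([]byte("stateless deflate keeps no state between writes. "), 1024)

	var compressed bytes.Buffer
	w := flate.NewStatelessWriter(&compressed)
	if _, err := w.Write(payload); err != nil { // one large Write compresses best
		log.Fatal(err)
	}
	if err := w.Close(); err != nil { // emits the final EOF block
		log.Fatal(err)
	}
	size := compressed.Len()

	// The output is plain deflate and inflates with the package's regular reader.
	out, err := ioutil.ReadAll(flate.NewReader(&compressed))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("in=%d out=%d roundtrip=%v\n", len(payload), size, bytes.Equal(out, payload))
}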
diff --git a/gzip/gzip.go b/gzip/gzip.go
index 7da7ee7486..ed0cc148f8 100644
--- a/gzip/gzip.go
+++ b/gzip/gzip.go
@@ -22,6 +22,13 @@ const (
 	DefaultCompression = flate.DefaultCompression
 	ConstantCompression = flate.ConstantCompression
 	HuffmanOnly = flate.HuffmanOnly
+
+	// StatelessCompression will do compression without maintaining any state
+	// between Write calls.
+	// Because no memory is kept between Write calls, compression and speed
+	// will be suboptimal, and the size of individual Write calls will
+	// affect the output size.
+	StatelessCompression = -3
 )
 
 // A Writer is an io.WriteCloser.
@@ -59,7 +66,7 @@ func NewWriter(w io.Writer) *Writer {
 // integer value between BestSpeed and BestCompression inclusive. The error
 // returned will be nil if the level is valid.
 func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
-	if level < HuffmanOnly || level > BestCompression {
+	if level < StatelessCompression || level > BestCompression {
 		return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
 	}
 	z := new(Writer)
@@ -69,9 +76,12 @@ func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
 
 func (z *Writer) init(w io.Writer, level int) {
 	compressor := z.compressor
-	if compressor != nil {
-		compressor.Reset(w)
+	if level != StatelessCompression {
+		if compressor != nil {
+			compressor.Reset(w)
+		}
 	}
+
 	*z = Writer{
 		Header: Header{
 			OS: 255, // unknown
@@ -189,12 +199,16 @@ func (z *Writer) Write(p []byte) (int, error) {
 				return n, z.err
 			}
 		}
-		if z.compressor == nil {
+
+		if z.compressor == nil && z.level != StatelessCompression {
 			z.compressor, _ = flate.NewWriter(z.w, z.level)
 		}
 	}
 	z.size += uint32(len(p))
 	z.digest = crc32.Update(z.digest, crc32.IEEETable, p)
+	if z.level == StatelessCompression {
+		return len(p), flate.StatelessDeflate(z.w, p, false)
+	}
 	n, z.err = z.compressor.Write(p)
 	return n, z.err
 }
@@ -211,7 +225,7 @@ func (z *Writer) Flush() error {
 	if z.err != nil {
 		return z.err
 	}
-	if z.closed {
+	if z.closed || z.level == StatelessCompression {
 		return nil
 	}
 	if !z.wroteHeader {
@@ -240,7 +254,11 @@ func (z *Writer) Close() error {
 			return z.err
 		}
 	}
-	z.err = z.compressor.Close()
+	if z.level == StatelessCompression {
+		z.err = flate.StatelessDeflate(z.w, nil, true)
+	} else {
+		z.err = z.compressor.Close()
+	}
 	if z.err != nil {
 		return z.err
 	}
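From the gzip side, the new level is selected through NewWriterLevel. A minimal sketch (again assuming the github.com/klauspost/compress/gzip import path; the bufio layer is a suggestion, not something the patch requires): since no state is kept between Write calls, batching small writes through a buffer keeps each emitted block reasonably large.

package main

import (
	"bufio"
	"io"
	"log"
	"os"

	"github.com/klauspost/compress/gzip"
)

func main() {
	gw, err := gzip.NewWriterLevel(os.Stdout, gzip.StatelessCompression)
	if err != nil {
		log.Fatal(err)
	}
	// Collect small writes into ~32 KB chunks before they reach the
	// stateless writer, since each Write is compressed independently.
	bw := bufio.NewWriterSize(gw, 32<<10)

	if _, err := io.Copy(bw, os.Stdin); err != nil {
		log.Fatal(err)
	}
	if err := bw.Flush(); err != nil {
		log.Fatal(err)
	}
	if err := gw.Close(); err != nil { // writes the trailer and final EOF block
		log.Fatal(err)
	}
}

Note that, per the Flush hunk above, Flush is a no-op at this level, so only Close terminates the stream.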
diff --git a/gzip/gzip_test.go b/gzip/gzip_test.go
index b18bb54b04..135637f7c1 100644
--- a/gzip/gzip_test.go
+++ b/gzip/gzip_test.go
@@ -276,6 +276,7 @@ func testFile(i, level int, t *testing.T) {
 	}
 }
 
+func TestFile1xM3(t *testing.T) { testFile(1, -3, t) }
 func TestFile1xM2(t *testing.T) { testFile(1, -2, t) }
 func TestFile1xM1(t *testing.T) { testFile(1, -1, t) }
 func TestFile1x0(t *testing.T) { testFile(1, 0, t) }
@@ -444,6 +445,7 @@ func testDeterm(i int, t *testing.T) {
 	}
 }
 
+func BenchmarkGzipLM3(b *testing.B) { benchmarkGzipN(b, -3) }
 func BenchmarkGzipLM2(b *testing.B) { benchmarkGzipN(b, -2) }
 func BenchmarkGzipL1(b *testing.B) { benchmarkGzipN(b, 1) }
 func BenchmarkGzipL2(b *testing.B) { benchmarkGzipN(b, 2) }
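The series adds no flate-level round-trip test; a hypothetical one (the test name, chunk size, and placement in the flate package are the editor's assumptions) would exercise the per-Write block boundaries like this:

package flate

import (
	"bytes"
	"io/ioutil"
	"testing"
)

func TestStatelessRoundTrip(t *testing.T) {
	want := bytes.Repeat([]byte("stateless round trip. "), 4096)

	var comp bytes.Buffer
	w := NewStatelessWriter(&comp)
	// Feed uneven chunks so several independent blocks are emitted.
	for i := 0; i < len(want); i += 999 {
		end := i + 999
		if end > len(want) {
			end = len(want)
		}
		if _, err := w.Write(want[i:end]); err != nil {
			t.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	got, err := ioutil.ReadAll(NewReader(&comp))
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(got, want) {
		t.Fatal("stateless round trip mismatch")
	}
}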
From 5859158fdbce9ce4c5c300e27910b6b23b29e443 Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Sun, 10 Nov 2019 12:19:54 +0100
Subject: [PATCH 2/2] Add Reset + benchmark.

---
 flate/stateless.go   |  5 +++++
 flate/writer_test.go | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/flate/stateless.go b/flate/stateless.go
index 0648bb74b9..524ee0ae37 100644
--- a/flate/stateless.go
+++ b/flate/stateless.go
@@ -35,6 +35,11 @@ func (s *statelessWriter) Write(p []byte) (n int, err error) {
 	return len(p), nil
 }
 
+func (s *statelessWriter) Reset(w io.Writer) {
+	s.dst = w
+	s.closed = false
+}
+
 // NewStatelessWriter will do compression without maintaining any state
 // between Write calls.
 // Because no memory is kept between Write calls, compression and speed
 // will be suboptimal, and the size of individual Write calls will
diff --git a/flate/writer_test.go b/flate/writer_test.go
index cba263c525..7ea0aa708b 100644
--- a/flate/writer_test.go
+++ b/flate/writer_test.go
@@ -144,6 +144,9 @@ func BenchmarkEncodeDigitsDefault1e6(b *testing.B) { benchmarkEncoder(b, digits
 func BenchmarkEncodeDigitsCompress1e4(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e4) }
 func BenchmarkEncodeDigitsCompress1e5(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e5) }
 func BenchmarkEncodeDigitsCompress1e6(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e6) }
+func BenchmarkEncodeDigitsSL1e4(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e4) }
+func BenchmarkEncodeDigitsSL1e5(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e5) }
+func BenchmarkEncodeDigitsSL1e6(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e6) }
 func BenchmarkEncodeTwainConstant1e4(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e4) }
 func BenchmarkEncodeTwainConstant1e5(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e5) }
 func BenchmarkEncodeTwainConstant1e6(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e6) }
@@ -156,6 +159,42 @@ func BenchmarkEncodeTwainDefault1e6(b *testing.B) { benchmarkEncoder(b, twain,
 func BenchmarkEncodeTwainCompress1e4(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e4) }
 func BenchmarkEncodeTwainCompress1e5(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e5) }
 func BenchmarkEncodeTwainCompress1e6(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e6) }
+func BenchmarkEncodeTwainSL1e4(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e4) }
+func BenchmarkEncodeTwainSL1e5(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e5) }
+func BenchmarkEncodeTwainSL1e6(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e6) }
+
+func benchmarkStatelessEncoder(b *testing.B, testfile, n int) {
+	b.SetBytes(int64(n))
+	buf0, err := ioutil.ReadFile(testfiles[testfile])
+	if err != nil {
+		b.Fatal(err)
+	}
+	if len(buf0) == 0 {
+		b.Fatalf("test file %q has no data", testfiles[testfile])
+	}
+	buf1 := make([]byte, n)
+	for i := 0; i < n; i += len(buf0) {
+		if len(buf0) > n-i {
+			buf0 = buf0[:n-i]
+		}
+		copy(buf1[i:], buf0)
+	}
+	buf0 = nil
+	runtime.GC()
+	b.ResetTimer()
+	b.ReportAllocs()
+	for i := 0; i < b.N; i++ {
+		w := NewStatelessWriter(ioutil.Discard)
+		_, err = w.Write(buf1)
+		if err != nil {
+			b.Fatal(err)
+		}
+		err = w.Close()
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
 
 // A writer that fails after N writes.
 type errorWriter struct {
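One caveat with patch 2: Reset is defined on the unexported statelessWriter, while NewStatelessWriter still returns a plain io.WriteCloser, so a caller wanting to reuse the writer has to assert the method. A sketch under that assumption (the anonymous-interface assertion is the editor's illustration, not an API the patch exports):

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/klauspost/compress/flate"
)

func main() {
	var first, second bytes.Buffer

	w := flate.NewStatelessWriter(&first)
	if _, err := w.Write([]byte("stream one")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Reset is not part of io.WriteCloser; assert it structurally.
	if rw, ok := w.(interface{ Reset(io.Writer) }); ok {
		rw.Reset(&second)
		if _, err := w.Write([]byte("stream two")); err != nil {
			log.Fatal(err)
		}
		if err := w.Close(); err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println(first.Len(), second.Len())
}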