Skip to content

Commit

Permalink
Add stateless gzip/deflate (#176)
Browse files Browse the repository at this point in the history
* Add stateless gzip/deflate

StatelessCompression will do compression but without maintaining any state between Write calls.

There will be no memory kept between Write calls, but compression and speed will be suboptimal.

Because of this, the size of actual Write calls will affect output size.
  • Loading branch information
klauspost committed Nov 12, 2019
1 parent c0145a2 commit ba5daf5
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 8 deletions.
2 changes: 0 additions & 2 deletions flate/deflate.go
Expand Up @@ -322,8 +322,6 @@ func (d *compressor) writeStoredBlock(buf []byte) error {
return d.w.err
}

const hashmul = 0x1e35a7bd

// hash4 returns a hash representation of the first 4 bytes
// of the supplied slice.
// The caller must ensure that len(b) >= 4.
Expand Down
252 changes: 252 additions & 0 deletions flate/stateless.go
@@ -0,0 +1,252 @@
package flate

import (
"io"
"math"
)

const (
maxStatelessBlock = math.MaxInt16

slTableBits = 13
slTableSize = 1 << slTableBits
slTableShift = 32 - slTableBits
)

type statelessWriter struct {
dst io.Writer
closed bool
}

func (s *statelessWriter) Close() error {
if s.closed {
return nil
}
s.closed = true
// Emit EOF block
return StatelessDeflate(s.dst, nil, true)
}

func (s *statelessWriter) Write(p []byte) (n int, err error) {
err = StatelessDeflate(s.dst, p, false)
if err != nil {
return 0, err
}
return len(p), nil
}

func (s *statelessWriter) Reset(w io.Writer) {
s.dst = w
s.closed = false
}

// NewStatelessWriter will do compression but without maintaining any state
// between Write calls.
// There will be no memory kept between Write calls,
// but compression and speed will be suboptimal.
// Because of this, the size of actual Write calls will affect output size.
func NewStatelessWriter(dst io.Writer) io.WriteCloser {
return &statelessWriter{dst: dst}
}

// StatelessDeflate allows to compress directly to a Writer without retaining state.
// When returning everything will be flushed.
func StatelessDeflate(out io.Writer, in []byte, eof bool) error {
var dst tokens
bw := newHuffmanBitWriter(out)
if eof && len(in) == 0 {
// Just write an EOF block.
// Could be faster...
bw.writeStoredHeader(0, true)
bw.flush()
return bw.err
}

for len(in) > 0 {
todo := in
if len(todo) > maxStatelessBlock {
todo = todo[:maxStatelessBlock]
}
in = in[len(todo):]
// Compress
statelessEnc(&dst, todo)
isEof := eof && len(in) == 0

if dst.n == 0 {
bw.writeStoredHeader(len(todo), isEof)
if bw.err != nil {
return bw.err
}
bw.writeBytes(todo)
} else if int(dst.n) > len(todo)-len(todo)>>4 {
// If we removed less than 1/16th, huffman compress the block.
bw.writeBlockHuff(isEof, todo, false)
} else {
bw.writeBlockDynamic(&dst, isEof, todo, false)
}
if bw.err != nil {
return bw.err
}
dst.Reset()
}
if !eof {
// Align.
bw.writeStoredHeader(0, false)
}
bw.flush()
return bw.err
}

func hashSL(u uint32) uint32 {
return (u * 0x1e35a7bd) >> slTableShift
}

func load3216(b []byte, i int16) uint32 {
// Help the compiler eliminate bounds checks on the read so it can be done in a single read.
b = b[i:]
b = b[:4]
return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
}

func load6416(b []byte, i int16) uint64 {
// Help the compiler eliminate bounds checks on the read so it can be done in a single read.
b = b[i:]
b = b[:8]
return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 |
uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56
}

func statelessEnc(dst *tokens, src []byte) {
const (
inputMargin = 12 - 1
minNonLiteralBlockSize = 1 + 1 + inputMargin
)

type tableEntry struct {
offset int16
}

var table [slTableSize]tableEntry

// This check isn't in the Snappy implementation, but there, the caller
// instead of the callee handles this case.
if len(src) < minNonLiteralBlockSize {
// We do not fill the token table.
// This will be picked up by caller.
dst.n = uint16(len(src))
return
}

s := int16(1)
nextEmit := int16(0)
// sLimit is when to stop looking for offset/length copies. The inputMargin
// lets us use a fast path for emitLiteral in the main loop, while we are
// looking for copies.
sLimit := int16(len(src) - inputMargin)

// nextEmit is where in src the next emitLiteral should start from.
cv := load3216(src, s)

for {
const skipLog = 5
const doEvery = 2

nextS := s
var candidate tableEntry
for {
nextHash := hashSL(cv)
candidate = table[nextHash]
nextS = s + doEvery + (s-nextEmit)>>skipLog
if nextS > sLimit || nextS <= 0 {
goto emitRemainder
}

now := load6416(src, nextS)
table[nextHash] = tableEntry{offset: s}
nextHash = hashSL(uint32(now))

if cv == load3216(src, candidate.offset) {
table[nextHash] = tableEntry{offset: nextS}
break
}

// Do one right away...
cv = uint32(now)
s = nextS
nextS++
candidate = table[nextHash]
now >>= 8
table[nextHash] = tableEntry{offset: s}

if cv == load3216(src, candidate.offset) {
table[nextHash] = tableEntry{offset: nextS}
break
}
cv = uint32(now)
s = nextS
}

// A 4-byte match has been found. We'll later see if more than 4 bytes
// match. But, prior to the match, src[nextEmit:s] are unmatched. Emit
// them as literal bytes.
for {
// Invariant: we have a 4-byte match at s, and no need to emit any
// literal bytes prior to s.

// Extend the 4-byte match as long as possible.
t := candidate.offset
l := int16(matchLen(src[s+4:], src[t+4:]) + 4)

// Extend backwards
for t > 0 && s > nextEmit && src[t-1] == src[s-1] {
s--
t--
l++
}
if nextEmit < s {
emitLiteral(dst, src[nextEmit:s])
}

// Save the match found
dst.AddMatchLong(int32(l), uint32(s-t-baseMatchOffset))
s += l
nextEmit = s
if nextS >= s {
s = nextS + 1
}
if s >= sLimit {
goto emitRemainder
}

// We could immediately start working at s now, but to improve
// compression we first update the hash table at s-2 and at s. If
// another emitCopy is not our next move, also calculate nextHash
// at s+1. At least on GOARCH=amd64, these three hash calculations
// are faster as one load64 call (with some shifts) instead of
// three load32 calls.
x := load6416(src, s-2)
o := s - 2
prevHash := hashSL(uint32(x))
table[prevHash] = tableEntry{offset: o}
x >>= 16
currHash := hashSL(uint32(x))
candidate = table[currHash]
table[currHash] = tableEntry{offset: o + 2}

if uint32(x) != load3216(src, candidate.offset) {
cv = uint32(x >> 8)
s++
break
}
}
}

emitRemainder:
if int(nextEmit) < len(src) {
// If nothing was added, don't encode literals.
if dst.n == 0 {
return
}
emitLiteral(dst, src[nextEmit:])
}
}
39 changes: 39 additions & 0 deletions flate/writer_test.go
Expand Up @@ -144,6 +144,9 @@ func BenchmarkEncodeDigitsDefault1e6(b *testing.B) { benchmarkEncoder(b, digits
func BenchmarkEncodeDigitsCompress1e4(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e4) }
func BenchmarkEncodeDigitsCompress1e5(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e5) }
func BenchmarkEncodeDigitsCompress1e6(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e6) }
func BenchmarkEncodeDigitsSL1e4(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e4) }
func BenchmarkEncodeDigitsSL1e5(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e5) }
func BenchmarkEncodeDigitsSL1e6(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e6) }
func BenchmarkEncodeTwainConstant1e4(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e4) }
func BenchmarkEncodeTwainConstant1e5(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e5) }
func BenchmarkEncodeTwainConstant1e6(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e6) }
Expand All @@ -156,6 +159,42 @@ func BenchmarkEncodeTwainDefault1e6(b *testing.B) { benchmarkEncoder(b, twain,
func BenchmarkEncodeTwainCompress1e4(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e4) }
func BenchmarkEncodeTwainCompress1e5(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e5) }
func BenchmarkEncodeTwainCompress1e6(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e6) }
func BenchmarkEncodeTwainSL1e4(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e4) }
func BenchmarkEncodeTwainSL1e5(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e5) }
func BenchmarkEncodeTwainSL1e6(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e6) }

func benchmarkStatelessEncoder(b *testing.B, testfile, n int) {
b.SetBytes(int64(n))
buf0, err := ioutil.ReadFile(testfiles[testfile])
if err != nil {
b.Fatal(err)
}
if len(buf0) == 0 {
b.Fatalf("test file %q has no data", testfiles[testfile])
}
buf1 := make([]byte, n)
for i := 0; i < n; i += len(buf0) {
if len(buf0) > n-i {
buf0 = buf0[:n-i]
}
copy(buf1[i:], buf0)
}
buf0 = nil
runtime.GC()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
w := NewStatelessWriter(ioutil.Discard)
_, err = w.Write(buf1)
if err != nil {
b.Fatal(err)
}
err = w.Close()
if err != nil {
b.Fatal(err)
}
}
}

// A writer that fails after N writes.
type errorWriter struct {
Expand Down
30 changes: 24 additions & 6 deletions gzip/gzip.go
Expand Up @@ -22,6 +22,13 @@ const (
DefaultCompression = flate.DefaultCompression
ConstantCompression = flate.ConstantCompression
HuffmanOnly = flate.HuffmanOnly

// StatelessCompression will do compression but without maintaining any state
// between Write calls.
// There will be no memory kept between Write calls,
// but compression and speed will be suboptimal.
// Because of this, the size of actual Write calls will affect output size.
StatelessCompression = -3
)

// A Writer is an io.WriteCloser.
Expand Down Expand Up @@ -59,7 +66,7 @@ func NewWriter(w io.Writer) *Writer {
// integer value between BestSpeed and BestCompression inclusive. The error
// returned will be nil if the level is valid.
func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
if level < HuffmanOnly || level > BestCompression {
if level < StatelessCompression || level > BestCompression {
return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
}
z := new(Writer)
Expand All @@ -69,9 +76,12 @@ func NewWriterLevel(w io.Writer, level int) (*Writer, error) {

func (z *Writer) init(w io.Writer, level int) {
compressor := z.compressor
if compressor != nil {
compressor.Reset(w)
if level != StatelessCompression {
if compressor != nil {
compressor.Reset(w)
}
}

*z = Writer{
Header: Header{
OS: 255, // unknown
Expand Down Expand Up @@ -189,12 +199,16 @@ func (z *Writer) Write(p []byte) (int, error) {
return n, z.err
}
}
if z.compressor == nil {

if z.compressor == nil && z.level != StatelessCompression {
z.compressor, _ = flate.NewWriter(z.w, z.level)
}
}
z.size += uint32(len(p))
z.digest = crc32.Update(z.digest, crc32.IEEETable, p)
if z.level == StatelessCompression {
return len(p), flate.StatelessDeflate(z.w, p, false)
}
n, z.err = z.compressor.Write(p)
return n, z.err
}
Expand All @@ -211,7 +225,7 @@ func (z *Writer) Flush() error {
if z.err != nil {
return z.err
}
if z.closed {
if z.closed || z.level == StatelessCompression {
return nil
}
if !z.wroteHeader {
Expand Down Expand Up @@ -240,7 +254,11 @@ func (z *Writer) Close() error {
return z.err
}
}
z.err = z.compressor.Close()
if z.level == StatelessCompression {
z.err = flate.StatelessDeflate(z.w, nil, true)
} else {
z.err = z.compressor.Close()
}
if z.err != nil {
return z.err
}
Expand Down

0 comments on commit ba5daf5

Please sign in to comment.