Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stateless gzip/deflate #176

Merged
merged 3 commits into from Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions flate/deflate.go
Expand Up @@ -322,8 +322,6 @@ func (d *compressor) writeStoredBlock(buf []byte) error {
return d.w.err
}

const hashmul = 0x1e35a7bd

// hash4 returns a hash representation of the first 4 bytes
// of the supplied slice.
// The caller must ensure that len(b) >= 4.
Expand Down
252 changes: 252 additions & 0 deletions flate/stateless.go
@@ -0,0 +1,252 @@
package flate

import (
"io"
"math"
)

const (
maxStatelessBlock = math.MaxInt16

slTableBits = 13
slTableSize = 1 << slTableBits
slTableShift = 32 - slTableBits
)

type statelessWriter struct {
dst io.Writer
closed bool
}

func (s *statelessWriter) Close() error {
if s.closed {
return nil
}
s.closed = true
// Emit EOF block
return StatelessDeflate(s.dst, nil, true)
}

func (s *statelessWriter) Write(p []byte) (n int, err error) {
err = StatelessDeflate(s.dst, p, false)
if err != nil {
return 0, err
}
return len(p), nil
}

func (s *statelessWriter) Reset(w io.Writer) {
s.dst = w
s.closed = false
}

// NewStatelessWriter will do compression but without maintaining any state
// between Write calls.
// There will be no memory kept between Write calls,
// but compression and speed will be suboptimal.
// Because of this, the size of actual Write calls will affect output size.
func NewStatelessWriter(dst io.Writer) io.WriteCloser {
return &statelessWriter{dst: dst}
}

// StatelessDeflate allows to compress directly to a Writer without retaining state.
// When returning everything will be flushed.
func StatelessDeflate(out io.Writer, in []byte, eof bool) error {
var dst tokens
bw := newHuffmanBitWriter(out)
if eof && len(in) == 0 {
// Just write an EOF block.
// Could be faster...
bw.writeStoredHeader(0, true)
bw.flush()
return bw.err
}

for len(in) > 0 {
todo := in
if len(todo) > maxStatelessBlock {
todo = todo[:maxStatelessBlock]
}
in = in[len(todo):]
// Compress
statelessEnc(&dst, todo)
isEof := eof && len(in) == 0

if dst.n == 0 {
bw.writeStoredHeader(len(todo), isEof)
if bw.err != nil {
return bw.err
}
bw.writeBytes(todo)
} else if int(dst.n) > len(todo)-len(todo)>>4 {
// If we removed less than 1/16th, huffman compress the block.
bw.writeBlockHuff(isEof, todo, false)
} else {
bw.writeBlockDynamic(&dst, isEof, todo, false)
}
if bw.err != nil {
return bw.err
}
dst.Reset()
}
if !eof {
// Align.
bw.writeStoredHeader(0, false)
}
bw.flush()
return bw.err
}

func hashSL(u uint32) uint32 {
return (u * 0x1e35a7bd) >> slTableShift
}

func load3216(b []byte, i int16) uint32 {
// Help the compiler eliminate bounds checks on the read so it can be done in a single read.
b = b[i:]
b = b[:4]
return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
}

func load6416(b []byte, i int16) uint64 {
// Help the compiler eliminate bounds checks on the read so it can be done in a single read.
b = b[i:]
b = b[:8]
return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 |
uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56
}

func statelessEnc(dst *tokens, src []byte) {
const (
inputMargin = 12 - 1
minNonLiteralBlockSize = 1 + 1 + inputMargin
)

type tableEntry struct {
offset int16
}

var table [slTableSize]tableEntry

// This check isn't in the Snappy implementation, but there, the caller
// instead of the callee handles this case.
if len(src) < minNonLiteralBlockSize {
// We do not fill the token table.
// This will be picked up by caller.
dst.n = uint16(len(src))
return
}

s := int16(1)
nextEmit := int16(0)
// sLimit is when to stop looking for offset/length copies. The inputMargin
// lets us use a fast path for emitLiteral in the main loop, while we are
// looking for copies.
sLimit := int16(len(src) - inputMargin)

// nextEmit is where in src the next emitLiteral should start from.
cv := load3216(src, s)

for {
const skipLog = 5
const doEvery = 2

nextS := s
var candidate tableEntry
for {
nextHash := hashSL(cv)
candidate = table[nextHash]
nextS = s + doEvery + (s-nextEmit)>>skipLog
if nextS > sLimit || nextS <= 0 {
goto emitRemainder
}

now := load6416(src, nextS)
table[nextHash] = tableEntry{offset: s}
nextHash = hashSL(uint32(now))

if cv == load3216(src, candidate.offset) {
table[nextHash] = tableEntry{offset: nextS}
break
}

// Do one right away...
cv = uint32(now)
s = nextS
nextS++
candidate = table[nextHash]
now >>= 8
table[nextHash] = tableEntry{offset: s}

if cv == load3216(src, candidate.offset) {
table[nextHash] = tableEntry{offset: nextS}
break
}
cv = uint32(now)
s = nextS
}

// A 4-byte match has been found. We'll later see if more than 4 bytes
// match. But, prior to the match, src[nextEmit:s] are unmatched. Emit
// them as literal bytes.
for {
// Invariant: we have a 4-byte match at s, and no need to emit any
// literal bytes prior to s.

// Extend the 4-byte match as long as possible.
t := candidate.offset
l := int16(matchLen(src[s+4:], src[t+4:]) + 4)

// Extend backwards
for t > 0 && s > nextEmit && src[t-1] == src[s-1] {
s--
t--
l++
}
if nextEmit < s {
emitLiteral(dst, src[nextEmit:s])
}

// Save the match found
dst.AddMatchLong(int32(l), uint32(s-t-baseMatchOffset))
s += l
nextEmit = s
if nextS >= s {
s = nextS + 1
}
if s >= sLimit {
goto emitRemainder
}

// We could immediately start working at s now, but to improve
// compression we first update the hash table at s-2 and at s. If
// another emitCopy is not our next move, also calculate nextHash
// at s+1. At least on GOARCH=amd64, these three hash calculations
// are faster as one load64 call (with some shifts) instead of
// three load32 calls.
x := load6416(src, s-2)
o := s - 2
prevHash := hashSL(uint32(x))
table[prevHash] = tableEntry{offset: o}
x >>= 16
currHash := hashSL(uint32(x))
candidate = table[currHash]
table[currHash] = tableEntry{offset: o + 2}

if uint32(x) != load3216(src, candidate.offset) {
cv = uint32(x >> 8)
s++
break
}
}
}

emitRemainder:
if int(nextEmit) < len(src) {
// If nothing was added, don't encode literals.
if dst.n == 0 {
return
}
emitLiteral(dst, src[nextEmit:])
}
}
39 changes: 39 additions & 0 deletions flate/writer_test.go
Expand Up @@ -144,6 +144,9 @@ func BenchmarkEncodeDigitsDefault1e6(b *testing.B) { benchmarkEncoder(b, digits
func BenchmarkEncodeDigitsCompress1e4(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e4) }
func BenchmarkEncodeDigitsCompress1e5(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e5) }
func BenchmarkEncodeDigitsCompress1e6(b *testing.B) { benchmarkEncoder(b, digits, compress, 1e6) }
func BenchmarkEncodeDigitsSL1e4(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e4) }
func BenchmarkEncodeDigitsSL1e5(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e5) }
func BenchmarkEncodeDigitsSL1e6(b *testing.B) { benchmarkStatelessEncoder(b, digits, 1e6) }
func BenchmarkEncodeTwainConstant1e4(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e4) }
func BenchmarkEncodeTwainConstant1e5(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e5) }
func BenchmarkEncodeTwainConstant1e6(b *testing.B) { benchmarkEncoder(b, twain, constant, 1e6) }
Expand All @@ -156,6 +159,42 @@ func BenchmarkEncodeTwainDefault1e6(b *testing.B) { benchmarkEncoder(b, twain,
func BenchmarkEncodeTwainCompress1e4(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e4) }
func BenchmarkEncodeTwainCompress1e5(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e5) }
func BenchmarkEncodeTwainCompress1e6(b *testing.B) { benchmarkEncoder(b, twain, compress, 1e6) }
func BenchmarkEncodeTwainSL1e4(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e4) }
func BenchmarkEncodeTwainSL1e5(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e5) }
func BenchmarkEncodeTwainSL1e6(b *testing.B) { benchmarkStatelessEncoder(b, twain, 1e6) }

func benchmarkStatelessEncoder(b *testing.B, testfile, n int) {
b.SetBytes(int64(n))
buf0, err := ioutil.ReadFile(testfiles[testfile])
if err != nil {
b.Fatal(err)
}
if len(buf0) == 0 {
b.Fatalf("test file %q has no data", testfiles[testfile])
}
buf1 := make([]byte, n)
for i := 0; i < n; i += len(buf0) {
if len(buf0) > n-i {
buf0 = buf0[:n-i]
}
copy(buf1[i:], buf0)
}
buf0 = nil
runtime.GC()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
w := NewStatelessWriter(ioutil.Discard)
_, err = w.Write(buf1)
if err != nil {
b.Fatal(err)
}
err = w.Close()
if err != nil {
b.Fatal(err)
}
}
}

// A writer that fails after N writes.
type errorWriter struct {
Expand Down
30 changes: 24 additions & 6 deletions gzip/gzip.go
Expand Up @@ -22,6 +22,13 @@ const (
DefaultCompression = flate.DefaultCompression
ConstantCompression = flate.ConstantCompression
HuffmanOnly = flate.HuffmanOnly

// StatelessCompression will do compression but without maintaining any state
// between Write calls.
// There will be no memory kept between Write calls,
// but compression and speed will be suboptimal.
// Because of this, the size of actual Write calls will affect output size.
StatelessCompression = -3
)

// A Writer is an io.WriteCloser.
Expand Down Expand Up @@ -59,7 +66,7 @@ func NewWriter(w io.Writer) *Writer {
// integer value between BestSpeed and BestCompression inclusive. The error
// returned will be nil if the level is valid.
func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
if level < HuffmanOnly || level > BestCompression {
if level < StatelessCompression || level > BestCompression {
return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
}
z := new(Writer)
Expand All @@ -69,9 +76,12 @@ func NewWriterLevel(w io.Writer, level int) (*Writer, error) {

func (z *Writer) init(w io.Writer, level int) {
compressor := z.compressor
if compressor != nil {
compressor.Reset(w)
if level != StatelessCompression {
if compressor != nil {
compressor.Reset(w)
}
}

*z = Writer{
Header: Header{
OS: 255, // unknown
Expand Down Expand Up @@ -189,12 +199,16 @@ func (z *Writer) Write(p []byte) (int, error) {
return n, z.err
}
}
if z.compressor == nil {

if z.compressor == nil && z.level != StatelessCompression {
z.compressor, _ = flate.NewWriter(z.w, z.level)
}
}
z.size += uint32(len(p))
z.digest = crc32.Update(z.digest, crc32.IEEETable, p)
if z.level == StatelessCompression {
return len(p), flate.StatelessDeflate(z.w, p, false)
}
n, z.err = z.compressor.Write(p)
return n, z.err
}
Expand All @@ -211,7 +225,7 @@ func (z *Writer) Flush() error {
if z.err != nil {
return z.err
}
if z.closed {
if z.closed || z.level == StatelessCompression {
return nil
}
if !z.wroteHeader {
Expand Down Expand Up @@ -240,7 +254,11 @@ func (z *Writer) Close() error {
return z.err
}
}
z.err = z.compressor.Close()
if z.level == StatelessCompression {
z.err = flate.StatelessDeflate(z.w, nil, true)
} else {
z.err = z.compressor.Close()
}
if z.err != nil {
return z.err
}
Expand Down