Skip to content

Commit

Permalink
zstd: Add stream encoding without goroutines
Browse files Browse the repository at this point in the history
Do not use goroutines when encoder concurrency is 1.

Fixes #264

Can probably be clean up a bit.
  • Loading branch information
klauspost committed Feb 26, 2022
1 parent 308a751 commit 2574faf
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
8 changes: 6 additions & 2 deletions zstd/decoder.go
Expand Up @@ -238,7 +238,9 @@ func (d *Decoder) Reset(r io.Reader) error {
// drainOutput will drain the output until errEndOfStream is sent.
func (d *Decoder) drainOutput() {
if d.current.cancel != nil {
println("cancelling current")
if debugDecoder {
println("cancelling current")
}
d.current.cancel()
d.current.cancel = nil
}
Expand Down Expand Up @@ -816,7 +818,9 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
do.err = ErrFrameSizeMismatch
hasErr = true
} else {
println("fcs ok", block.Last, fcs, decodedFrame)
if debugDecoder {
println("fcs ok", block.Last, fcs, decodedFrame)
}
}
}
output <- do
Expand Down
40 changes: 40 additions & 0 deletions zstd/encoder.go
Expand Up @@ -258,6 +258,46 @@ func (e *Encoder) nextBlock(final bool) error {
return s.err
}

// SYNC:
if e.o.concurrent == 1 {
src := s.filling
s.nInput += int64(len(s.filling))
if debugEncoder {
println("Adding sync block,", len(src), "bytes, final:", final)
}
enc := s.encoder
blk := enc.Block()
blk.reset(nil)
enc.Encode(blk, src)
blk.last = final
if final {
s.eofWritten = true
}

err := errIncompressible
// If we got the exact same number of literals as input,
// assume the literals cannot be compressed.
if len(src) != len(blk.literals) || len(src) != e.o.blockSize {
err = blk.encode(src, e.o.noEntropy, !e.o.allLitEntropy)
}
switch err {
case errIncompressible:
if debugEncoder {
println("Storing incompressible block as raw")
}
blk.encodeRaw(src)
// In fast mode, we do not transfer offsets, so we don't have to deal with changing the.
case nil:
default:
s.err = err
return err
}
_, s.err = s.w.Write(blk.output)
s.nWritten += int64(len(blk.output))
s.filling = s.filling[:0]
return s.err
}

// Move blocks forward.
s.filling, s.current, s.previous = s.previous[:0], s.filling, s.current
s.nInput += int64(len(s.current))
Expand Down

0 comments on commit 2574faf

Please sign in to comment.