Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zstd: Use 1 less goroutine for stream decoding #588

Merged
merged 1 commit into from May 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 41 additions & 52 deletions zstd/decoder.go
Expand Up @@ -637,60 +637,18 @@ func (d *Decoder) startSyncDecoder(r io.Reader) error {

// Create Decoder:
// ASYNC:
// Spawn 4 go routines.
// 0: Read frames and decode blocks.
// 1: Decode block and literals. Receives hufftree and seqdecs, returns seqdecs and huff tree.
// 2: Wait for recentOffsets if needed. Decode sequences, send recentOffsets.
// 3: Wait for stream history, execute sequences, send stream history.
// Spawn 3 go routines.
// 0: Read frames and decode block literals.
// 1: Decode sequences.
// 2: Execute sequences, send to output.
func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
defer d.streamWg.Done()
br := readerWrapper{r: r}

var seqPrepare = make(chan *blockDec, d.o.concurrent)
var seqDecode = make(chan *blockDec, d.o.concurrent)
var seqExecute = make(chan *blockDec, d.o.concurrent)

// Async 1: Prepare blocks...
go func() {
var hist history
var hasErr bool
for block := range seqPrepare {
if hasErr {
if block != nil {
seqDecode <- block
}
continue
}
if block.async.newHist != nil {
if debugDecoder {
println("Async 1: new history")
}
hist.reset()
if block.async.newHist.dict != nil {
hist.setDict(block.async.newHist.dict)
}
}
if block.err != nil || block.Type != blockTypeCompressed {
hasErr = block.err != nil
seqDecode <- block
continue
}

remain, err := block.decodeLiterals(block.data, &hist)
block.err = err
hasErr = block.err != nil
if err == nil {
block.async.literals = hist.decoders.literals
block.async.seqData = remain
} else if debugDecoder {
println("decodeLiterals error:", err)
}
seqDecode <- block
}
close(seqDecode)
}()

// Async 2: Decode sequences...
// Async 1: Decode sequences...
go func() {
var hist history
var hasErr bool
Expand All @@ -704,7 +662,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
}
if block.async.newHist != nil {
if debugDecoder {
println("Async 2: new history, recent:", block.async.newHist.recentOffsets)
println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
}
hist.decoders = block.async.newHist.decoders
hist.recentOffsets = block.async.newHist.recentOffsets
Expand Down Expand Up @@ -758,7 +716,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
}
if block.async.newHist != nil {
if debugDecoder {
println("Async 3: new history")
println("Async 2: new history")
}
hist.windowSize = block.async.newHist.windowSize
hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
Expand Down Expand Up @@ -845,6 +803,33 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch

decodeStream:
for {
var hist history
var hasErr bool

decodeBlock := func(block *blockDec) {
if hasErr {
if block != nil {
seqDecode <- block
}
return
}
if block.err != nil || block.Type != blockTypeCompressed {
hasErr = block.err != nil
seqDecode <- block
return
}

remain, err := block.decodeLiterals(block.data, &hist)
block.err = err
hasErr = block.err != nil
if err == nil {
block.async.literals = hist.decoders.literals
block.async.seqData = remain
} else if debugDecoder {
println("decodeLiterals error:", err)
}
seqDecode <- block
}
frame := d.frame
if debugDecoder {
println("New frame...")
Expand All @@ -871,7 +856,7 @@ decodeStream:
case <-ctx.Done():
case dec := <-d.decoders:
dec.sendErr(err)
seqPrepare <- dec
decodeBlock(dec)
}
break decodeStream
}
Expand All @@ -891,6 +876,10 @@ decodeStream:
if debugDecoder {
println("Alloc History:", h.allocFrameBuffer)
}
hist.reset()
if h.dict != nil {
hist.setDict(h.dict)
}
dec.async.newHist = &h
dec.async.fcs = frame.FrameContentSize
historySent = true
Expand All @@ -917,7 +906,7 @@ decodeStream:
}
err = dec.err
last := dec.Last
seqPrepare <- dec
decodeBlock(dec)
if err != nil {
break decodeStream
}
Expand All @@ -926,7 +915,7 @@ decodeStream:
}
}
}
close(seqPrepare)
close(seqDecode)
wg.Wait()
d.frame.history.b = frameHistCache
}