Skip to content

Commit

Permalink
Exert backpressure when pipeline is not flowing
Browse files Browse the repository at this point in the history
If the pipeline as a whole is not flowing, then it should return `false`
from any write operation.  Since the Pipeline listens to the tail
stream's `data` event, the streams in the pipeline always are in flowing
mode.  However, the Pipeline itself may not be, so it would return
`true` from writes inappropriately, allowing data to be buffered up in
the Pipeline excessively.

This would not cause any significant issues in most cases, except
excess memory usage.

Discovered while debugging npm/npm-registry-fetch#23
  • Loading branch information
isaacs committed May 13, 2020
1 parent aa9c520 commit 331ec69
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
2 changes: 1 addition & 1 deletion index.js
Expand Up @@ -116,7 +116,7 @@ class Pipeline extends Minipass {
this.emit('drain')
}
write (chunk, enc, cb) {
return this[_head].write(chunk, enc, cb)
return this[_head].write(chunk, enc, cb) && this.flowing
}
end (chunk, enc, cb) {
this[_head].end(chunk, enc, cb)
Expand Down
37 changes: 37 additions & 0 deletions test/backpressure.js
@@ -0,0 +1,37 @@
const Pipeline = require('../')
const Minipass = require('minipass')
const t = require('tap')

t.test('verify that pipelines exert backpressure properly', t => {
const tail = new class extends Minipass {
write (chunk, encoding, cb) {
const ret = super.write(chunk, encoding, cb)
t.equal(ret, true, 'tail write should return true')
return ret
}
}

const head = new class extends Minipass {
write (chunk, encoding, cb) {
const ret = super.write(chunk, encoding, cb)
t.equal(ret, true, 'head write should return true')
return ret
}
}

const pipe = new Pipeline({ encoding: 'utf8' }, head, tail)

for (let i = 0; i < 5; i++) {
t.equal(pipe.write('' + i), false, 'write is false until flowing')
}

const p = pipe.concat().then(d => t.equal(d, '0123456789', 'got expected data'))

for (let i = 5; i < 10; i++) {
t.equal(pipe.write('' + i), true, 'write is true when pipeline is flowing')
}

pipe.end()

return p
})

0 comments on commit 331ec69

Please sign in to comment.