diff --git a/index.js b/index.js index 0193621..c42f996 100644 --- a/index.js +++ b/index.js @@ -116,7 +116,7 @@ class Pipeline extends Minipass { this.emit('drain') } write (chunk, enc, cb) { - return this[_head].write(chunk, enc, cb) + return this[_head].write(chunk, enc, cb) && this.flowing } end (chunk, enc, cb) { this[_head].end(chunk, enc, cb) diff --git a/test/backpressure.js b/test/backpressure.js new file mode 100644 index 0000000..b7ef0c2 --- /dev/null +++ b/test/backpressure.js @@ -0,0 +1,37 @@ +const Pipeline = require('../') +const Minipass = require('minipass') +const t = require('tap') + +t.test('verify that pipelines exert backpressure properly', t => { + const tail = new class extends Minipass { + write (chunk, encoding, cb) { + const ret = super.write(chunk, encoding, cb) + t.equal(ret, true, 'tail write should return true') + return ret + } + } + + const head = new class extends Minipass { + write (chunk, encoding, cb) { + const ret = super.write(chunk, encoding, cb) + t.equal(ret, true, 'head write should return true') + return ret + } + } + + const pipe = new Pipeline({ encoding: 'utf8' }, head, tail) + + for (let i = 0; i < 5; i++) { + t.equal(pipe.write('' + i), false, 'write is false until flowing') + } + + const p = pipe.concat().then(d => t.equal(d, '0123456789', 'got expected data')) + + for (let i = 5; i < 10; i++) { + t.equal(pipe.write('' + i), true, 'write is true when pipeline is flowing') + } + + pipe.end() + + return p +})