From 331ec6973f8d031724f5d831b932d5a4b40bdca7 Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 13 May 2020 10:22:32 -0700 Subject: [PATCH] Exert backpressure when pipeline is not flowing If the pipeline as a whole is not flowing, then it should return `false` from any write operation. Since the Pipeline listens to the tail stream's `data` event, the streams in the pipeline always are in flowing mode. However, the Pipeline itself may not be, so it would return `true` from writes inappropriately, allowing data to be buffered up in the Pipeline excessively. This would not cause any significant issues in most cases, except excess memory usage. Discovered while debugging npm/npm-registry-fetch#23 --- index.js | 2 +- test/backpressure.js | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 test/backpressure.js diff --git a/index.js b/index.js index 0193621..c42f996 100644 --- a/index.js +++ b/index.js @@ -116,7 +116,7 @@ class Pipeline extends Minipass { this.emit('drain') } write (chunk, enc, cb) { - return this[_head].write(chunk, enc, cb) + return this[_head].write(chunk, enc, cb) && this.flowing } end (chunk, enc, cb) { this[_head].end(chunk, enc, cb) diff --git a/test/backpressure.js b/test/backpressure.js new file mode 100644 index 0000000..b7ef0c2 --- /dev/null +++ b/test/backpressure.js @@ -0,0 +1,37 @@ +const Pipeline = require('../') +const Minipass = require('minipass') +const t = require('tap') + +t.test('verify that pipelines exert backpressure properly', t => { + const tail = new class extends Minipass { + write (chunk, encoding, cb) { + const ret = super.write(chunk, encoding, cb) + t.equal(ret, true, 'tail write should return true') + return ret + } + } + + const head = new class extends Minipass { + write (chunk, encoding, cb) { + const ret = super.write(chunk, encoding, cb) + t.equal(ret, true, 'head write should return true') + return ret + } + } + + const pipe = new Pipeline({ encoding: 'utf8' }, head, tail) + + for (let i = 0; i < 5; i++) { + t.equal(pipe.write('' + i), false, 'write is false until flowing') + } + + const p = pipe.concat().then(d => t.equal(d, '0123456789', 'got expected data')) + + for (let i = 5; i < 10; i++) { + t.equal(pipe.write('' + i), true, 'write is true when pipeline is flowing') + } + + pipe.end() + + return p +})