Flush buffer before emitting 'data' during write()
It takes quite a convoluted set of streams to reproduce this error, but
it's possible to end up in a state where there is a chunk in the buffer
AND we're in a flowing state during a write() call. Since the code
assumed that a flowing state means the buffer must be empty, this
resulted in data being emitted out of order, breaking the streaming in
make-fetch-happen and npm-registry-fetch: cacache was proxying writes to
a slow tmp file writer, and make-fetch-happen was creating a Pipeline to
produce the caching response body.

Fix: npm/npm-registry-fetch#23
Re: npm/make-fetch-happen@7f896be
isaacs committed May 13, 2020
1 parent 846701a commit 4c5a106
Showing 3 changed files with 96 additions and 13 deletions.
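
The commit message above describes an invariant that can be shown in isolation: when a stream is flowing, anything still sitting in its buffer has to go out before the chunk handed to the current write() call, or consumers see data out of order. The sketch below is a made-up TinyStream, not minipass; the buffer and flowing properties merely mirror the internals in the index.js diff that follows.

// Minimal sketch of the ordering rule this commit enforces. TinyStream is an
// illustration only, not minipass.
const { EventEmitter } = require('events')

class TinyStream extends EventEmitter {
  constructor () {
    super()
    this.buffer = []
    this.flowing = false
  }
  write (chunk) {
    if (this.flowing) {
      // flush anything buffered first, so chunks never come out of order;
      // note that no 'drain' is emitted here, since we are mid-write
      while (this.buffer.length)
        this.emit('data', this.buffer.shift())
      this.emit('data', chunk)
    } else
      this.buffer.push(chunk)
    return this.flowing
  }
}

const s = new TinyStream()
s.on('data', c => console.log(c))
s.write('a')        // not flowing yet: buffered, returns false
s.flowing = true    // stand-in for a consumer resuming the stream
s.write('b')        // flushes 'a' first, then emits 'b' -> logs 'a', then 'b'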
index.js (33 changes: 20 additions & 13 deletions)
@@ -134,12 +134,11 @@ module.exports = class Minipass extends Stream {
     // this ensures at this point that the chunk is a buffer or string
     // don't buffer it up or send it to the decoder
     if (!this.objectMode && !chunk.length) {
-      const ret = this.flowing
       if (this[BUFFERLENGTH] !== 0)
         this.emit('readable')
       if (cb)
         cb()
-      return ret
+      return this.flowing
     }
 
     // fast-path writing strings of same encoding to a stream with
@@ -153,16 +152,24 @@ module.exports = class Minipass extends Stream {
     if (Buffer.isBuffer(chunk) && this[ENCODING])
       chunk = this[DECODER].write(chunk)
 
-    try {
-      return this.flowing
-        ? (this.emit('data', chunk), this.flowing)
-        : (this[BUFFERPUSH](chunk), false)
-    } finally {
+    if (this.flowing) {
+      // if we somehow have something in the buffer, but we think we're
+      // flowing, then we need to flush all that out first, or we get
+      // chunks coming in out of order. Can't emit 'drain' here though,
+      // because we're mid-write, so that'd be bad.
       if (this[BUFFERLENGTH] !== 0)
-        this.emit('readable')
-      if (cb)
-        cb()
-    }
+        this[FLUSH](true)
+      this.emit('data', chunk)
+    } else
+      this[BUFFERPUSH](chunk)
+
+    if (this[BUFFERLENGTH] !== 0)
+      this.emit('readable')
+
+    if (cb)
+      cb()
+
+    return this.flowing
   }
 
   read (n) {
@@ -286,10 +293,10 @@ module.exports = class Minipass extends Stream {
     return this.buffer.shift()
   }
 
-  [FLUSH] () {
+  [FLUSH] (noDrain) {
     do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
 
-    if (!this.buffer.length && !this[EOF])
+    if (!noDrain && !this.buffer.length && !this[EOF])
      this.emit('drain')
   }

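The noDrain flag added to [FLUSH] above exists because this particular flush happens in the middle of a write() call. A hedged illustration of the hazard, using a bare EventEmitter rather than minipass code: 'drain' listeners typically respond by writing more, so emitting 'drain' synchronously while a write is still on the stack re-enters the stream.

// Hypothetical illustration of why emitting 'drain' mid-write is a problem;
// this is not minipass code, just a bare EventEmitter with a fake write().
const { EventEmitter } = require('events')

const stream = new EventEmitter()
let writing = false

stream.write = function (chunk) {
  if (writing)
    console.log('re-entered write() while still writing:', chunk)
  writing = true
  // pretend the buffer is flushed here; emitting 'drain' now is the hazard
  this.emit('drain')
  writing = false
  return true
}

// a typical backpressure-aware producer: write more as soon as 'drain' fires
const queued = ['b', 'c']
stream.on('drain', () => {
  const next = queued.shift()
  if (next !== undefined)
    stream.write(next)
})

stream.write('a')
// => re-entered write() while still writing: b
// => re-entered write() while still writing: c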
test/flush-buffer-before-flowing.js (44 changes: 44 additions & 0 deletions)
@@ -0,0 +1,44 @@
// this is a minimal reproduction of a pretty complex interaction between
// minipass-pipeline and a slow-draining proxy stream, which occurred in
// make-fetch-happen. https://github.com/npm/npm-registry-fetch/issues/23
// The pipeline in question was a wrapper that tee'd data into the cache,
// which is a slow-draining sink stream. When multiple chunks come through,
// the Pipeline's buffer is holding a chunk, but the Pipeline itself is in
// flowing mode. The solution is to always drain the buffer before emitting
// 'data', if there is other data waiting to be emitted.
const Minipass = require('../')
const t = require('tap')

const src = new Minipass({ encoding: 'utf8' })
const mid = new Minipass({ encoding: 'utf8' })
const proxy = new Minipass({ encoding: 'utf8' })
// mid tees every chunk into a second, slow-draining proxy stream and takes
// the proxy's return value as its own backpressure signal (mirroring how
// cacache proxied writes to a slow tmp file writer)
mid.write = function (chunk, encoding, cb) {
  Minipass.prototype.write.call(this, chunk, encoding, cb)
  return proxy.write(chunk, encoding, cb)
}
// the proxy only reads on a timer, so it drains slowly; its 'drain' events
// are forwarded back to mid
proxy.on('drain', chunk => mid.emit('drain'))
proxy.on('readable', () => setTimeout(() => proxy.read()))

const dest = new Minipass({ encoding: 'utf8' })
src.write('a')
src.write('b')

// a stand-in for minipass-pipeline: it re-emits dest's output as its own,
// and resumes dest whenever it is itself resumed
const pipeline = new (class Pipeline extends Minipass {
  constructor (opt) {
    super(opt)
    dest.on('data', c => super.write(c))
    dest.on('end', () => super.end())
  }
  emit (ev, ...args) {
    if (ev === 'resume')
      dest.resume()
    return super.emit(ev, ...args)
  }
})({ encoding: 'utf8' })

mid.pipe(dest)
src.pipe(mid)
t.test('get all data', t => pipeline.concat().then(d => t.equal(d, 'abcd')))
src.write('c')
src.write('d')
src.end()
test/write-returns-true-when-readable-triggers-flow.js (32 changes: 32 additions & 0 deletions)
@@ -0,0 +1,32 @@
// if you do s.on('readable', s => s.pipe(d)), then s.write() should return
// true, because even though s is not flowing at the START of the write(),
// it IS flowing by the END of the write call.

const Minipass = require('../')
const t = require('tap')

t.test('empty write', async t => {
  const s = new Minipass({ encoding: 'utf8' })
  const dest = new Minipass({ encoding: 'utf8' })
  const p = dest.concat().then(d => t.equal(d, 'a', 'got data'))
  t.equal(s.write('a'), false, 'first write returns false')
  t.equal(s.write(''), false, 'empty write returns false')
  s.on('readable', () => s.pipe(dest))
  t.equal(s.flowing, false, 'src is not flowing yet')
  t.equal(s.write(''), true, 'return true, now flowing')
  s.end()
  await p
})

t.test('non-empty write', async t => {
  const s = new Minipass({ encoding: 'utf8' })
  const dest = new Minipass({ encoding: 'utf8' })
  const p = dest.concat().then(d => t.equal(d, 'ab', 'got data'))
  t.equal(s.write('a'), false, 'first write returns false')
  t.equal(s.write(''), false, 'empty write returns false')
  s.on('readable', () => s.pipe(dest))
  t.equal(s.flowing, false, 'src is not flowing yet')
  t.equal(s.write('b'), true, 'return true, now flowing')
  s.end()
  await p
})
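
These return-value tests matter because the boolean from write() is the backpressure signal callers act on: false means the chunk was buffered and the writer should wait for 'drain'. A small usage sketch, assuming minipass is installed and using Node's events.once; the pump helper is invented for illustration.

const { once } = require('events')
const Minipass = require('minipass')

// write() returning false means the chunk was buffered; wait for 'drain'
// before pushing more. This is why the return value has to reflect the
// flowing state at the end of the write() call, not the start.
async function pump (chunks, dest) {
  for (const chunk of chunks) {
    if (!dest.write(chunk))
      await once(dest, 'drain')
  }
  dest.end()
}

const dest = new Minipass({ encoding: 'utf8' })
dest.on('data', c => process.stdout.write(c))  // consume, keeping dest flowing
pump(['a', 'b', 'c'], dest)                    // prints 'abc'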
