diff --git a/README.md b/README.md index 3e1d8d35e..0d38a35f5 100644 --- a/README.md +++ b/README.md @@ -285,6 +285,55 @@ if (!response.ok) throw new Error(`unexpected response ${response.statusText}`); await streamPipeline(response.body, createWriteStream('./octocat.png')); ``` +In Node.js 14 you can also use async iterators to read `body`; however, be careful to catch +errors -- the longer a response runs, the more likely it is to encounter an error. + +```js +const fetch = require('node-fetch'); + +const response = await fetch('https://httpbin.org/stream/3'); + +try { + for await (const chunk of response.body) { + console.dir(JSON.parse(chunk.toString())); + } +} catch (err) { + console.error(err.stack); +} +``` + +In Node.js 12 you can also use async iterators to read `body`; however, async iterators with streams +did not mature until Node.js 14, so you need to do some extra work to ensure you handle errors +directly from the stream and wait on it response to fully close. + +```js +const fetch = require('node-fetch'); + +const read = async body => { + let error; + body.on('error', err => { + error = err; + }); + + for await (const chunk of body) { + console.dir(JSON.parse(chunk.toString())); + } + + return new Promise((resolve, reject) => { + body.on('close', () => { + error ? reject(error) : resolve(); + }); + }); +}; + +try { + const response = await fetch('https://httpbin.org/stream/3'); + await read(response.body); +} catch (err) { + console.error(err.stack); +} +``` + ### Buffer If you prefer to cache binary data in full, use buffer(). (NOTE: buffer() is a `node-fetch` only API) diff --git a/src/index.js b/src/index.js index 34679ebc5..a46e65f1e 100644 --- a/src/index.js +++ b/src/index.js @@ -95,6 +95,30 @@ export default async function fetch(url, options_) { finalize(); }); + fixResponseChunkedTransferBadEnding(request_, err => { + response.body.destroy(err); + }); + + /* c8 ignore next 18 */ + if (process.version < 'v14') { + // Before Node.js 14, pipeline() does not fully support async iterators and does not always + // properly handle when the socket close/end events are out of order. + request_.on('socket', s => { + let endedWithEventsCount; + s.prependListener('end', () => { + endedWithEventsCount = s._eventsCount; + }); + s.prependListener('close', hadError => { + // if end happened before close but the socket didn't emit an error, do it now + if (response && endedWithEventsCount < s._eventsCount && !hadError) { + const err = new Error('Premature close'); + err.code = 'ERR_STREAM_PREMATURE_CLOSE'; + response.body.emit('error', err); + } + }); + }); + } + request_.on('response', response_ => { request_.setTimeout(0); const headers = fromRawHeaders(response_.rawHeaders); @@ -265,3 +289,31 @@ export default async function fetch(url, options_) { writeToStream(request_, request); }); } + +function fixResponseChunkedTransferBadEnding(request, errorCallback) { + const LAST_CHUNK = Buffer.from('0\r\n'); + let socket; + + request.on('socket', s => { + socket = s; + }); + + request.on('response', response => { + const {headers} = response; + if (headers['transfer-encoding'] === 'chunked' && !headers['content-length']) { + let properLastChunkReceived = false; + + socket.on('data', buf => { + properLastChunkReceived = Buffer.compare(buf.slice(-3), LAST_CHUNK) === 0; + }); + + socket.prependListener('close', () => { + if (!properLastChunkReceived) { + const err = new Error('Premature close'); + err.code = 'ERR_STREAM_PREMATURE_CLOSE'; + errorCallback(err); + } + }); + } + }); +} diff --git a/test/main.js b/test/main.js index 35d7df1ca..550e4f422 100644 --- a/test/main.js +++ b/test/main.js @@ -613,6 +613,74 @@ describe('node-fetch', () => { }); }); + it('should handle network-error in chunked response', () => { + const url = `${base}error/premature/chunked`; + return fetch(url).then(res => { + expect(res.status).to.equal(200); + expect(res.ok).to.be.true; + + return expect(new Promise((resolve, reject) => { + res.body.on('error', reject); + res.body.on('close', resolve); + })).to.eventually.be.rejectedWith(Error, 'Premature close') + .and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE'); + }); + }); + + it('should handle network-error in chunked response async iterator', () => { + const url = `${base}error/premature/chunked`; + return fetch(url).then(res => { + expect(res.status).to.equal(200); + expect(res.ok).to.be.true; + + const read = async body => { + const chunks = []; + + if (process.version < 'v14') { + // In Node.js 12, some errors don't come out in the async iterator; we have to pick + // them up from the event-emitter and then throw them after the async iterator + let error; + body.on('error', err => { + error = err; + }); + + for await (const chunk of body) { + chunks.push(chunk); + } + + if (error) { + throw error; + } + + return new Promise(resolve => { + body.on('close', () => resolve(chunks)); + }); + } + + for await (const chunk of body) { + chunks.push(chunk); + } + + return chunks; + }; + + return expect(read(res.body)) + .to.eventually.be.rejectedWith(Error, 'Premature close') + .and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE'); + }); + }); + + it('should handle network-error in chunked response in consumeBody', () => { + const url = `${base}error/premature/chunked`; + return fetch(url).then(res => { + expect(res.status).to.equal(200); + expect(res.ok).to.be.true; + + return expect(res.text()) + .to.eventually.be.rejectedWith(Error, 'Premature close'); + }); + }); + it('should handle DNS-error response', () => { const url = 'http://domain.invalid'; return expect(fetch(url)).to.eventually.be.rejected diff --git a/test/utils/server.js b/test/utils/server.js index a06b9ab5b..4d58ac4f5 100644 --- a/test/utils/server.js +++ b/test/utils/server.js @@ -323,6 +323,23 @@ export default class TestServer { }, 100); } + if (p === '/error/premature/chunked') { + res.writeHead(200, { + 'Content-Type': 'application/json', + 'Transfer-Encoding': 'chunked' + }); + + res.write(`${JSON.stringify({data: 'hi'})}\n`); + + setTimeout(() => { + res.write(`${JSON.stringify({data: 'bye'})}\n`); + }, 200); + + setTimeout(() => { + res.destroy(); + }, 400); + } + if (p === '/error/json') { res.statusCode = 200; res.setHeader('Content-Type', 'application/json');