Skip to content

Commit

Permalink
Fix premature close with chunked transfer encoding and for async iter…
Browse files Browse the repository at this point in the history
…ators in Node 12 (#1064)

Co-authored-by: Irakli Gozalishvili <contact@gozala.io>
  • Loading branch information
tekwiz and Gozala committed Feb 23, 2021
1 parent 6ee9d31 commit 8eeeec1
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 0 deletions.
49 changes: 49 additions & 0 deletions README.md
Expand Up @@ -285,6 +285,55 @@ if (!response.ok) throw new Error(`unexpected response ${response.statusText}`);
await streamPipeline(response.body, createWriteStream('./octocat.png'));
```

In Node.js 14 you can also use async iterators to read `body`; however, be careful to catch
errors -- the longer a response runs, the more likely it is to encounter an error.

```js
const fetch = require('node-fetch');

const response = await fetch('https://httpbin.org/stream/3');

try {
for await (const chunk of response.body) {
console.dir(JSON.parse(chunk.toString()));
}
} catch (err) {
console.error(err.stack);
}
```

In Node.js 12 you can also use async iterators to read `body`; however, async iterators with streams
did not mature until Node.js 14, so you need to do some extra work to ensure you handle errors
directly from the stream and wait on it response to fully close.

```js
const fetch = require('node-fetch');

const read = async body => {
let error;
body.on('error', err => {
error = err;
});

for await (const chunk of body) {
console.dir(JSON.parse(chunk.toString()));
}

return new Promise((resolve, reject) => {
body.on('close', () => {
error ? reject(error) : resolve();
});
});
};

try {
const response = await fetch('https://httpbin.org/stream/3');
await read(response.body);
} catch (err) {
console.error(err.stack);
}
```

### Buffer

If you prefer to cache binary data in full, use buffer(). (NOTE: buffer() is a `node-fetch` only API)
Expand Down
52 changes: 52 additions & 0 deletions src/index.js
Expand Up @@ -95,6 +95,30 @@ export default async function fetch(url, options_) {
finalize();
});

fixResponseChunkedTransferBadEnding(request_, err => {
response.body.destroy(err);
});

/* c8 ignore next 18 */
if (process.version < 'v14') {
// Before Node.js 14, pipeline() does not fully support async iterators and does not always
// properly handle when the socket close/end events are out of order.
request_.on('socket', s => {
let endedWithEventsCount;
s.prependListener('end', () => {
endedWithEventsCount = s._eventsCount;
});
s.prependListener('close', hadError => {
// if end happened before close but the socket didn't emit an error, do it now
if (response && endedWithEventsCount < s._eventsCount && !hadError) {
const err = new Error('Premature close');
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
response.body.emit('error', err);
}
});
});
}

request_.on('response', response_ => {
request_.setTimeout(0);
const headers = fromRawHeaders(response_.rawHeaders);
Expand Down Expand Up @@ -265,3 +289,31 @@ export default async function fetch(url, options_) {
writeToStream(request_, request);
});
}

function fixResponseChunkedTransferBadEnding(request, errorCallback) {
const LAST_CHUNK = Buffer.from('0\r\n');
let socket;

request.on('socket', s => {
socket = s;
});

request.on('response', response => {
const {headers} = response;
if (headers['transfer-encoding'] === 'chunked' && !headers['content-length']) {
let properLastChunkReceived = false;

socket.on('data', buf => {
properLastChunkReceived = Buffer.compare(buf.slice(-3), LAST_CHUNK) === 0;
});

socket.prependListener('close', () => {
if (!properLastChunkReceived) {
const err = new Error('Premature close');
err.code = 'ERR_STREAM_PREMATURE_CLOSE';
errorCallback(err);
}
});
}
});
}
68 changes: 68 additions & 0 deletions test/main.js
Expand Up @@ -613,6 +613,74 @@ describe('node-fetch', () => {
});
});

it('should handle network-error in chunked response', () => {
const url = `${base}error/premature/chunked`;
return fetch(url).then(res => {
expect(res.status).to.equal(200);
expect(res.ok).to.be.true;

return expect(new Promise((resolve, reject) => {
res.body.on('error', reject);
res.body.on('close', resolve);
})).to.eventually.be.rejectedWith(Error, 'Premature close')
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
});
});

it('should handle network-error in chunked response async iterator', () => {
const url = `${base}error/premature/chunked`;
return fetch(url).then(res => {
expect(res.status).to.equal(200);
expect(res.ok).to.be.true;

const read = async body => {
const chunks = [];

if (process.version < 'v14') {
// In Node.js 12, some errors don't come out in the async iterator; we have to pick
// them up from the event-emitter and then throw them after the async iterator
let error;
body.on('error', err => {
error = err;
});

for await (const chunk of body) {
chunks.push(chunk);
}

if (error) {
throw error;
}

return new Promise(resolve => {
body.on('close', () => resolve(chunks));
});
}

for await (const chunk of body) {
chunks.push(chunk);
}

return chunks;
};

return expect(read(res.body))
.to.eventually.be.rejectedWith(Error, 'Premature close')
.and.have.property('code', 'ERR_STREAM_PREMATURE_CLOSE');
});
});

it('should handle network-error in chunked response in consumeBody', () => {
const url = `${base}error/premature/chunked`;
return fetch(url).then(res => {
expect(res.status).to.equal(200);
expect(res.ok).to.be.true;

return expect(res.text())
.to.eventually.be.rejectedWith(Error, 'Premature close');
});
});

it('should handle DNS-error response', () => {
const url = 'http://domain.invalid';
return expect(fetch(url)).to.eventually.be.rejected
Expand Down
17 changes: 17 additions & 0 deletions test/utils/server.js
Expand Up @@ -323,6 +323,23 @@ export default class TestServer {
}, 100);
}

if (p === '/error/premature/chunked') {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked'
});

res.write(`${JSON.stringify({data: 'hi'})}\n`);

setTimeout(() => {
res.write(`${JSON.stringify({data: 'bye'})}\n`);
}, 200);

setTimeout(() => {
res.destroy();
}, 400);
}

if (p === '/error/json') {
res.statusCode = 200;
res.setHeader('Content-Type', 'application/json');
Expand Down

0 comments on commit 8eeeec1

Please sign in to comment.