Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(fetch): implement fully read body algorithm #1597

Merged
merged 2 commits into from Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 4 additions & 11 deletions lib/fetch/index.js
Expand Up @@ -34,7 +34,8 @@ const {
sameOrigin,
isCancelled,
isAborted,
isErrorLike
isErrorLike,
fullyReadBody
} = require('./util')
const { kState, kHeaders, kGuard, kRealm } = require('./symbols')
const assert = require('assert')
Expand Down Expand Up @@ -738,11 +739,7 @@ async function mainFetch (fetchParams, recursive = false) {
}

// 4. Fully read response’s body given processBody and processBodyError.
try {
processBody(await response.arrayBuffer())
} catch (err) {
processBodyError(err)
}
await fullyReadBody(response.body, processBody, processBodyError)
} else {
// 21. Otherwise, run fetch finale given fetchParams and response.
fetchFinale(fetchParams, response)
Expand Down Expand Up @@ -974,11 +971,7 @@ async function fetchFinale (fetchParams, response) {
} else {
// 4. Otherwise, fully read response’s body given processBody, processBodyError,
// and fetchParams’s task destination.
try {
processBody(await response.body.stream.arrayBuffer())
} catch (err) {
processBodyError(err)
}
await fullyReadBody(response.body, processBody, processBodyError)
}
}
}
Expand Down
51 changes: 50 additions & 1 deletion lib/fetch/util.js
Expand Up @@ -4,6 +4,7 @@ const { redirectStatus } = require('./constants')
const { performance } = require('perf_hooks')
const { isBlobLike, toUSVString, ReadableStreamFrom } = require('../core/util')
const assert = require('assert')
const { isUint8Array } = require('util/types')

let File

Expand Down Expand Up @@ -438,6 +439,53 @@ function makeIterator (iterator, name) {
return Object.setPrototypeOf({}, i)
}

/**
* @see https://fetch.spec.whatwg.org/#body-fully-read
*/
async function fullyReadBody (body, processBody, processBodyError) {
// 1. If taskDestination is null, then set taskDestination to
// the result of starting a new parallel queue.

// 2. Let promise be the result of fully reading body as promise
// given body.
try {
/** @type {Uint8Array[]} */
const chunks = []
let length = 0

const reader = body.stream.getReader()

while (true) {
const { done, value } = await reader.read()

if (done === true) {
break
}

// read-loop chunk steps
assert(isUint8Array(value))

chunks.push(value)
length += value.byteLength
}

// 3. Let fulfilledSteps given a byte sequence bytes be to queue
// a fetch task to run processBody given bytes, with
// taskDestination.
const fulfilledSteps = (bytes) => queueMicrotask(() => {
processBody(bytes)
})

fulfilledSteps(Buffer.concat(chunks, length))
} catch {
// 4. Let rejectedSteps be to queue a fetch task to run
// processBodyError, with taskDestination.
queueMicrotask(() => processBodyError())
KhafraDev marked this conversation as resolved.
Show resolved Hide resolved
}

// 5. React to promise with fulfilledSteps and rejectedSteps.
}

/**
* Fetch supports node >= 16.8.0, but Object.hasOwn was added in v16.9.0.
*/
Expand Down Expand Up @@ -477,5 +525,6 @@ module.exports = {
isValidHeaderName,
isValidHeaderValue,
hasOwn,
isErrorLike
isErrorLike,
fullyReadBody
}
25 changes: 25 additions & 0 deletions test/fetch/client-fetch.js
Expand Up @@ -536,3 +536,28 @@ test('Receiving non-Latin1 headers', async (t) => {
t.same(lengths, [30, 34, 94, 104, 90])
t.end()
})

// https://github.com/nodejs/undici/issues/1594
// TODO(@KhafraDev): this test fails because of an integrity-mismatch check
// that hasn't been implemented when this comment was written. Enable this
// check once a resource's integrity is checked.
// test('with RequestInit.integrity set', async (t) => {
// const body = 'Hello world'
// const hash = require('crypto').createHash('sha256').update(body).digest('hex')
//
// const server = createServer((req, res) => {
// res.write(body)
// res.end()
// }).listen(0)
//
// t.teardown(server.close.bind(server))
// await once(server, 'listening')
//
// const response = await fetch(`http://localhost:${server.address().port}`, {
// integrity: `sha256-${hash}`
// })
//
// const ab = await response.arrayBuffer()
//
// t.same(new Uint8Array(ab), new TextEncoder().encode(body))
// })