Skip to content

Commit

Permalink
feat: implement spec-compliant body mixins (nodejs#1694)
Browse files Browse the repository at this point in the history
* feat: implement spec-compliant body mixins

* fix: skip tests on v16.8
  • Loading branch information
KhafraDev authored and metcoder95 committed Dec 26, 2022
1 parent 806760e commit 4937381
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 123 deletions.
289 changes: 184 additions & 105 deletions lib/fetch/body.js
Expand Up @@ -13,6 +13,8 @@ const assert = require('assert')
const { isErrored } = require('../core/util')
const { isUint8Array, isArrayBuffer } = require('util/types')
const { File } = require('./file')
const { StringDecoder } = require('string_decoder')
const { parseMIMEType } = require('./dataURL')

let ReadableStream

Expand Down Expand Up @@ -301,117 +303,28 @@ function throwIfAborted (state) {

function bodyMixinMethods (instance) {
const methods = {
async blob () {
if (!(this instanceof instance)) {
throw new TypeError('Illegal invocation')
}

throwIfAborted(this[kState])

const chunks = []

for await (const chunk of consumeBody(this[kState].body)) {
if (!isUint8Array(chunk)) {
throw new TypeError('Expected Uint8Array chunk')
}

// Assemble one final large blob with Uint8Array's can exhaust memory.
// That's why we create create multiple blob's and using references
chunks.push(new Blob([chunk]))
}

return new Blob(chunks, { type: this.headers.get('Content-Type') || '' })
blob () {
// The blob() method steps are to return the result of
// running consume body with this and Blob.
return specConsumeBody(this, 'Blob', instance)
},

async arrayBuffer () {
if (!(this instanceof instance)) {
throw new TypeError('Illegal invocation')
}

throwIfAborted(this[kState])

const contentLength = this.headers.get('content-length')
const encoded = this.headers.has('content-encoding')

// if we have content length and no encoding, then we can
// pre allocate the buffer and just read the data into it
if (!encoded && contentLength) {
const buffer = new Uint8Array(contentLength)
let offset = 0

for await (const chunk of consumeBody(this[kState].body)) {
if (!isUint8Array(chunk)) {
throw new TypeError('Expected Uint8Array chunk')
}

buffer.set(chunk, offset)
offset += chunk.length
}

return buffer.buffer
}

// if we don't have content length, then we have to allocate 2x the
// size of the body, once for consumed data, and once for the final buffer

// This could be optimized by using growable ArrayBuffer, but it's not
// implemented yet. https://github.com/tc39/proposal-resizablearraybuffer

const chunks = []
let size = 0

for await (const chunk of consumeBody(this[kState].body)) {
if (!isUint8Array(chunk)) {
throw new TypeError('Expected Uint8Array chunk')
}

chunks.push(chunk)
size += chunk.byteLength
}

const buffer = new Uint8Array(size)
let offset = 0

for (const chunk of chunks) {
buffer.set(chunk, offset)
offset += chunk.byteLength
}

return buffer.buffer
arrayBuffer () {
// The arrayBuffer() method steps are to return the
// result of running consume body with this and ArrayBuffer.
return specConsumeBody(this, 'ArrayBuffer', instance)
},

async text () {
if (!(this instanceof instance)) {
throw new TypeError('Illegal invocation')
}

throwIfAborted(this[kState])

let result = ''
const textDecoder = new TextDecoder()

for await (const chunk of consumeBody(this[kState].body)) {
if (!isUint8Array(chunk)) {
throw new TypeError('Expected Uint8Array chunk')
}

result += textDecoder.decode(chunk, { stream: true })
}

// flush
result += textDecoder.decode()

return result
text () {
// The text() method steps are to return the result of
// running consume body with this and text.
return specConsumeBody(this, 'text', instance)
},

async json () {
if (!(this instanceof instance)) {
throw new TypeError('Illegal invocation')
}

throwIfAborted(this[kState])

return JSON.parse(await this.text())
json () {
// The json() method steps are to return the result of
// running consume body with this and JSON.
return specConsumeBody(this, 'JSON', instance)
},

async formData () {
Expand Down Expand Up @@ -534,6 +447,172 @@ function mixinBody (prototype) {
Object.assign(prototype.prototype, bodyMixinMethods(prototype))
}

// https://fetch.spec.whatwg.org/#concept-body-consume-body
async function specConsumeBody (object, type, instance) {
if (!(object instanceof instance)) {
throw new TypeError('Illegal invocation')
}

// TODO: why is this needed?
throwIfAborted(object[kState])

// 1. If object is unusable, then return a promise rejected
// with a TypeError.
if (bodyUnusable(object[kState].body)) {
throw new TypeError('Body is unusable')
}

// 2. Let promise be a promise resolved with an empty byte
// sequence.
let promise

// 3. If object’s body is non-null, then set promise to the
// result of fully reading body as promise given object’s
// body.
if (object[kState].body != null) {
promise = await fullyReadBodyAsPromise(object[kState].body)
} else {
// step #2
promise = { size: 0, bytes: [new Uint8Array()] }
}

// 4. Let steps be to return the result of package data with
// the first argument given, type, and object’s MIME type.
const mimeType = type === 'Blob' || type === 'FormData'
? bodyMimeType(object)
: undefined

// 5. Return the result of upon fulfillment of promise given
// steps.
return packageData(promise, type, mimeType)
}

/**
* @see https://fetch.spec.whatwg.org/#concept-body-package-data
* @param {{ size: number, bytes: Uint8Array[] }} bytes
* @param {string} type
* @param {ReturnType<typeof parseMIMEType>|undefined} mimeType
*/
function packageData ({ bytes, size }, type, mimeType) {
switch (type) {
case 'ArrayBuffer': {
// Return a new ArrayBuffer whose contents are bytes.
const uint8 = new Uint8Array(size)
let offset = 0

for (const chunk of bytes) {
uint8.set(chunk, offset)
offset += chunk.byteLength
}

return uint8.buffer
}
case 'Blob': {
// Return a Blob whose contents are bytes and type attribute
// is mimeType.
return new Blob(bytes, { type: mimeType?.essence })
}
case 'JSON': {
// Return the result of running parse JSON from bytes on bytes.
return JSON.parse(utf8DecodeBytes(bytes))
}
case 'text': {
// 1. Return the result of running UTF-8 decode on bytes.
return utf8DecodeBytes(bytes)
}
}
}

// https://fetch.spec.whatwg.org/#body-unusable
function bodyUnusable (body) {
// An object including the Body interface mixin is
// said to be unusable if its body is non-null and
// its body’s stream is disturbed or locked.
return body != null && (body.stream.locked || util.isDisturbed(body.stream))
}

// https://fetch.spec.whatwg.org/#fully-reading-body-as-promise
async function fullyReadBodyAsPromise (body) {
// 1. Let reader be the result of getting a reader for body’s
// stream. If that threw an exception, then return a promise
// rejected with that exception.
const reader = body.stream.getReader()

// 2. Return the result of reading all bytes from reader.
/** @type {Uint8Array[]} */
const bytes = []
let size = 0

while (true) {
const { done, value } = await reader.read()

if (done) {
break
}

// https://streams.spec.whatwg.org/#read-loop
// If chunk is not a Uint8Array object, reject promise with
// a TypeError and abort these steps.
if (!isUint8Array(value)) {
throw new TypeError('Value is not a Uint8Array.')
}

bytes.push(value)
size += value.byteLength
}

return { size, bytes }
}

/**
* @see https://encoding.spec.whatwg.org/#utf-8-decode
* @param {Uint8Array[]} ioQueue
*/
function utf8DecodeBytes (ioQueue) {
if (ioQueue.length === 0) {
return ''
}

// 1. Let buffer be the result of peeking three bytes
// from ioQueue, converted to a byte sequence.
const buffer = ioQueue[0]

// 2. If buffer is 0xEF 0xBB 0xBF, then read three
// bytes from ioQueue. (Do nothing with those bytes.)
if (buffer[0] === 0xEF && buffer[1] === 0xBB && buffer[2] === 0xBF) {
ioQueue[0] = ioQueue[0].subarray(3)
}

// 3. Process a queue with an instance of UTF-8’s
// decoder, ioQueue, output, and "replacement".
const decoder = new StringDecoder('utf-8')
let output = ''

for (const chunk of ioQueue) {
output += decoder.write(chunk)
}

output += decoder.end()

// 4. Return output.
return output
}

/**
* @see https://fetch.spec.whatwg.org/#concept-body-mime-type
* @param {import('./response').Response|import('./request').Request} object
*/
function bodyMimeType (object) {
const { headersList } = object[kState]
const contentType = headersList.get('content-type')

if (contentType === null) {
return 'failure'
}

return parseMIMEType(contentType)
}

module.exports = {
extractBody,
safelyExtractBody,
Expand Down
6 changes: 5 additions & 1 deletion lib/fetch/dataURL.js
Expand Up @@ -321,7 +321,11 @@ function parseMIMEType (input) {
type: type.toLowerCase(),
subtype: subtype.toLowerCase(),
/** @type {Map<string, string>} */
parameters: new Map()
parameters: new Map(),
// https://mimesniff.spec.whatwg.org/#mime-type-essence
get essence () {
return `${this.type}/${this.subtype}`
}
}

// 11. While position is not past the end of input:
Expand Down
4 changes: 2 additions & 2 deletions test/fetch/client-fetch.js
Expand Up @@ -316,7 +316,7 @@ test('locked blob body', (t) => {
const res = await fetch(`http://localhost:${server.address().port}`)
const reader = res.body.getReader()
res.blob().catch(err => {
t.equal(err.message, 'The stream is locked.')
t.equal(err.message, 'Body is unusable')
reader.cancel()
})
})
Expand All @@ -336,7 +336,7 @@ test('disturbed blob body', (t) => {
t.pass(2)
})
res.blob().catch(err => {
t.equal(err.message, 'The body has already been consumed.')
t.equal(err.message, 'Body is unusable')
})
})
})
Expand Down
9 changes: 6 additions & 3 deletions test/fetch/data-uri.js
Expand Up @@ -113,19 +113,22 @@ test('https://mimesniff.spec.whatwg.org/#parse-a-mime-type', (t) => {
t.same(parseMIMEType('text/plain'), {
type: 'text',
subtype: 'plain',
parameters: new Map()
parameters: new Map(),
essence: 'text/plain'
})

t.same(parseMIMEType('text/html;charset="shift_jis"iso-2022-jp'), {
type: 'text',
subtype: 'html',
parameters: new Map([['charset', 'shift_jis']])
parameters: new Map([['charset', 'shift_jis']]),
essence: 'text/html'
})

t.same(parseMIMEType('application/javascript'), {
type: 'application',
subtype: 'javascript',
parameters: new Map()
parameters: new Map(),
essence: 'application/javascript'
})

t.end()
Expand Down
4 changes: 2 additions & 2 deletions test/fetch/response.js
Expand Up @@ -171,7 +171,7 @@ test('Modifying headers using Headers.prototype.set', (t) => {
})

// https://github.com/nodejs/node/issues/43838
test('constructing a Response with a ReadableStream body', async (t) => {
test('constructing a Response with a ReadableStream body', { skip: process.version.startsWith('v16.') }, async (t) => {
const text = '{"foo":"bar"}'
const uint8 = new TextEncoder().encode(text)

Expand Down Expand Up @@ -209,7 +209,7 @@ test('constructing a Response with a ReadableStream body', async (t) => {
t.end()
})

t.test('Readable with ArrayBuffer chunk still throws', async (t) => {
t.test('Readable with ArrayBuffer chunk still throws', { skip: process.version.startsWith('v16.') }, async (t) => {
const readable = new ReadableStream({
start (controller) {
controller.enqueue(uint8.buffer)
Expand Down

0 comments on commit 4937381

Please sign in to comment.