From 37d09c53ab98a416589b15a49f44291636a9a472 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Fri, 1 Mar 2024 22:41:28 +0900 Subject: [PATCH] WIP --- lib/dispatcher/client-h1.js | 2 +- lib/web/fetch/body.js | 303 +++++++++++++++++++++++++++++------- lib/web/fetch/constants.js | 1 + lib/web/fetch/index.js | 16 +- lib/web/fetch/request.js | 36 +++-- lib/web/fetch/response.js | 52 ++++++- lib/web/fetch/util.js | 5 +- 7 files changed, 327 insertions(+), 88 deletions(-) diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 46b63087826..043ce6e6b78 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -857,7 +857,7 @@ function writeH1 (client, request) { extractBody = require('../web/fetch/body.js').extractBody } - const [bodyStream, contentType] = extractBody(body) + const { 0: bodyStream, 1: contentType } = extractBody(body) if (request.contentType == null) { headers.push('content-type', contentType) } diff --git a/lib/web/fetch/body.js b/lib/web/fetch/body.js index 932df3e6532..d55659b5fbe 100644 --- a/lib/web/fetch/body.js +++ b/lib/web/fetch/body.js @@ -5,7 +5,6 @@ const util = require('../../core/util') const { ReadableStreamFrom, isBlobLike, - isReadableStreamLike, readableStreamClose, createDeferredPromise, fullyReadBody, @@ -27,6 +26,199 @@ const File = NativeFile ?? UndiciFile const textEncoder = new TextEncoder() const textDecoder = new TextDecoder() +// https://github.com/nodejs/node/issues/44985 +// fix-patch: https://github.com/nodejs/node/pull/51526 +const needReadableStreamTee = util.nodeMajor <= 20 || (util.nodeMajor === 21 && util.nodeMinor <= 6) + +class InnerBody { + /** + * @type {ReadableStream | null} + */ + #stream + /** + * @type {string | Uint8Array | null} + */ + #staticStream + /** + * @type {string | Uint8Array | Blob | FormData | null} + */ + #source + /** + * @type {number | null} + */ + #length + /** + * @type {boolean} + */ + #consumed + /** + * @param {ReadableStream | string | Uint8Array} stream + * @param {string | Uint8Array | Blob | FormData | null} source + * @param {number | null} length + */ + constructor (stream, source, length) { + if (stream instanceof ReadableStream) { + this.#stream = stream + this.#staticStream = null + } else { + this.#stream = null + this.#staticStream = stream + } + this.#source = source ?? null + this.#length = length ?? null + this.#consumed = false + } + + /** + * @returns {ReadableStream | null} + */ + get stream () { + if (this.#stream !== null) { + return this.#stream + } + + const source = this.#staticStream + + if (source === null) { + throw new TypeError('Unreachable') + } + + this.#staticStream = null + + // static body consumed + if (this.#consumed) { + this.#stream = new ReadableStream({ type: 'bytes' }) + // TODO: Is this the best way to force a lock? + this.#stream.getReader() // Ensure stream is locked. + } else { + this.#stream = new ReadableStream({ + async pull (controller) { + const buffer = typeof source === 'string' ? textEncoder.encode(source) : source + if (buffer.byteLength > 0) { + controller.enqueue(buffer) + } + + queueMicrotask(() => readableStreamClose(controller)) + }, + start () {}, + type: 'bytes' + }) + this.#consumed = true + } + return this.#stream + } + + set stream (stream) { + this.#stream = stream + this.#staticStream = null + } + + get source () { + return this.#source + } + + set source (source) { + this.#source = source ?? null + } + + get length () { + // does not work + // if (this.#length === null && this.#staticStream !== null) { + // return (this.#length = Buffer.byteLength(this.#staticStream)) + // } + return this.#length + } + + set length (length) { + this.#length = length ?? null + } + + /** + * @returns {boolean} + */ + consumed () { + if (this.#stream !== null) { + return util.isDisturbed(this.#stream) + } + return this.#consumed + } + + /** + * @see https://fetch.spec.whatwg.org/#concept-body-consume-body + * @param {(value: unknown) => unknown} convertBytesToJSValue + */ + async consume (convertBytesToJSValue) { + // TODO + } + + /** + * @see https://fetch.spec.whatwg.org/#body-unusable + */ + unusable () { + if (this.#stream !== null) { + return this.#stream.locked || util.isDisturbed(this.#stream) + } + return this.#consumed + } + + /** + * @see https://fetch.spec.whatwg.org/#concept-body-clone + * @returns {InnerBody} + */ + clone () { + // fast-path: staticStream + if (this.#staticStream !== null) { + if (this.unusable()) { + throw new TypeError('The body has already been consumed.') + } + const body = new InnerBody(this.#staticStream, this.#source, this.#length) + return body + } + + // To clone a body body, run these steps: + + // 1. Let « out1, out2 » be the result of teeing body’s stream. + const { 0: out1, 1: out2 } = this.stream.tee() + const out2Clone = structuredClone(out2, { transfer: [out2] }) + /** @type {ReadableStream} */ + let streamClone + if (needReadableStreamTee) { + // This, for whatever reasons, unrefs out2Clone which allows + // the process to exit by itself. + const { 1: finalClone } = out2Clone.tee() + streamClone = finalClone + } else { + streamClone = out2Clone + } + // 2. Set body’s stream to out1. + this.#stream = out1 + // clear static stream + this.#staticStream = null + + // 3. Return a body whose stream is out2 and other members are copied from body. + return new InnerBody(streamClone, this.#source, this.#length) + } + + trySyncReadAsBuffer () { + if (this.unusable()) { + throw new TypeError('The body has already been consumed.') + } + const staticBody = this.#staticStream + if (staticBody === null) { + return null + } + this.#consumed = true + if (typeof staticBody === 'string') { + return textEncoder.encode(staticBody) + } + return staticBody + } + + canSyncReadAsBuffer () { + return this.#staticStream !== null + } +} + // https://fetch.spec.whatwg.org/#concept-bodyinit-extract function extractBody (object, keepalive = false) { // 1. Let stream be null. @@ -42,23 +234,23 @@ function extractBody (object, keepalive = false) { } else { // 4. Otherwise, set stream to a new ReadableStream object, and set // up stream with byte reading support. - stream = new ReadableStream({ - async pull (controller) { - const buffer = typeof source === 'string' ? textEncoder.encode(source) : source - - if (buffer.byteLength) { - controller.enqueue(buffer) - } - - queueMicrotask(() => readableStreamClose(controller)) - }, - start () {}, - type: 'bytes' - }) + // stream = new ReadableStream({ + // async pull (controller) { + // const buffer = typeof source === 'string' ? textEncoder.encode(source) : source + // + // if (buffer.byteLength) { + // controller.enqueue(buffer) + // } + // + // queueMicrotask(() => readableStreamClose(controller)) + // }, + // start () {}, + // type: 'bytes' + // }) } // 5. Assert: stream is a ReadableStream object. - assert(isReadableStreamLike(stream)) + // assert(isReadableStreamLike(stream)) // 6. Let action be null. let action = null @@ -125,17 +317,21 @@ function extractBody (object, keepalive = false) { for (const [name, value] of object) { if (typeof value === 'string') { - const chunk = textEncoder.encode(prefix + - `; name="${escape(normalizeLinefeeds(name))}"` + - `\r\n\r\n${normalizeLinefeeds(value)}\r\n`) + const chunk = textEncoder.encode( + `${prefix}; name="${escape( + normalizeLinefeeds(name) + )}"\r\n\r\n${normalizeLinefeeds(value)}\r\n` + ) blobParts.push(chunk) length += chunk.byteLength } else { - const chunk = textEncoder.encode(`${prefix}; name="${escape(normalizeLinefeeds(name))}"` + - (value.name ? `; filename="${escape(value.name)}"` : '') + '\r\n' + - `Content-Type: ${ + const chunk = textEncoder.encode( + `${prefix}; name="${escape(normalizeLinefeeds(name))}"${ + value.name ? `; filename="${escape(value.name)}"` : '' + }\r\nContent-Type: ${ value.type || 'application/octet-stream' - }\r\n\r\n`) + }\r\n\r\n` + ) blobParts.push(chunk, value, rn) if (typeof value.size === 'number') { length += chunk.byteLength + value.size + rn.byteLength @@ -203,7 +399,9 @@ function extractBody (object, keepalive = false) { // 11. If source is a byte sequence, then set action to a // step that returns source and length to source’s length. if (typeof source === 'string' || util.isBuffer(source)) { + // TODO: lazy init length = Buffer.byteLength(source) + stream = source } // 12. If action is non-null, then run these steps in in parallel: @@ -228,7 +426,7 @@ function extractBody (object, keepalive = false) { // bytes into stream. if (!isErrored(stream)) { const buffer = new Uint8Array(value) - if (buffer.byteLength) { + if (buffer.byteLength > 0) { controller.enqueue(buffer) } } @@ -241,10 +439,13 @@ function extractBody (object, keepalive = false) { type: 'bytes' }) } + // else if (typeof source === 'string' || util.isBuffer(source)) { + // stream = source + // } // 13. Let body be a body whose stream is stream, source is source, // and length is length. - const body = { stream, source, length } + const body = new InnerBody(stream, source, length) // 14. Return (body, type). return [body, type] @@ -268,29 +469,6 @@ function safelyExtractBody (object, keepalive = false) { return extractBody(object, keepalive) } -function cloneBody (body) { - // To clone a body body, run these steps: - - // https://fetch.spec.whatwg.org/#concept-body-clone - - // 1. Let « out1, out2 » be the result of teeing body’s stream. - const [out1, out2] = body.stream.tee() - const out2Clone = structuredClone(out2, { transfer: [out2] }) - // This, for whatever reasons, unrefs out2Clone which allows - // the process to exit by itself. - const [, finalClone] = out2Clone.tee() - - // 2. Set body’s stream to out1. - body.stream = out1 - - // 3. Return a body whose stream is out2 and other members are copied from body. - return { - stream: finalClone, - length: body.length, - source: body.source - } -} - function throwIfAborted (state) { if (state.aborted) { throw new DOMException('The operation was aborted.', 'AbortError') @@ -435,7 +613,7 @@ function bodyMixinMethods (instance) { // 3. Return a new FormData object whose entries are entries. const formData = new FormData() - for (const [name, value] of entries) { + for (const { 0: name, 1: value } of entries) { formData.append(name, value) } return formData @@ -473,9 +651,15 @@ async function consumeBody (object, convertBytesToJSValue, instance) { throwIfAborted(object[kState]) + const body = object[kState].body + // 1. If object is unusable, then return a promise rejected // with a TypeError. - if (bodyUnusable(object[kState].body)) { + + // An object including the Body interface mixin is + // said to be unusable if its body is non-null and + // its body’s stream is disturbed or locked. + if (body != null && body.unusable()) { throw new TypeError('Body is unusable') } @@ -499,27 +683,26 @@ async function consumeBody (object, convertBytesToJSValue, instance) { // 5. If object’s body is null, then run successSteps with an // empty byte sequence. - if (object[kState].body == null) { + if (body == null) { successSteps(new Uint8Array()) return promise.promise } + // fast-path + // https://github.com/nodejs/undici/issues/2164 + if (body.canSyncReadAsBuffer()) { + successSteps(body.trySyncReadAsBuffer()) + return promise.promise + } + // 6. Otherwise, fully read object’s body given successSteps, // errorSteps, and object’s relevant global object. - await fullyReadBody(object[kState].body, successSteps, errorSteps) + await fullyReadBody(body, successSteps, errorSteps) // 7. Return promise. return promise.promise } -// https://fetch.spec.whatwg.org/#body-unusable -function bodyUnusable (body) { - // An object including the Body interface mixin is - // said to be unusable if its body is non-null and - // its body’s stream is disturbed or locked. - return body != null && (body.stream.locked || util.isDisturbed(body.stream)) -} - /** * @see https://encoding.spec.whatwg.org/#utf-8-decode * @param {Buffer} buffer @@ -578,8 +761,8 @@ function bodyMimeType (requestOrResponse) { } module.exports = { + InnerBody, extractBody, safelyExtractBody, - cloneBody, mixinBody } diff --git a/lib/web/fetch/constants.js b/lib/web/fetch/constants.js index ada622feed5..c30c3716644 100644 --- a/lib/web/fetch/constants.js +++ b/lib/web/fetch/constants.js @@ -53,6 +53,7 @@ const requestCache = [ ] // https://fetch.spec.whatwg.org/#request-body-header-name +// Note: The header names are should be lowercase. const requestBodyHeader = [ 'content-encoding', 'content-language', diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 37e269fbc93..4cb21e163b7 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -49,7 +49,7 @@ const { } = require('./util') const { kState, kDispatcher } = require('./symbols') const assert = require('node:assert') -const { safelyExtractBody, extractBody } = require('./body') +const { safelyExtractBody, extractBody, InnerBody } = require('./body') const { redirectStatusSet, nullBodyStatus, @@ -1313,8 +1313,8 @@ function httpRedirectFetch (fetchParams, response) { // 2. For each headerName of request-body-header name, delete headerName from // request’s header list. - for (const headerName of requestBodyHeader) { - request.headersList.delete(headerName) + for (let i = 0; i < requestBodyHeader.length; ++i) { + request.headersList.delete(requestBodyHeader[i], true) } } @@ -1475,7 +1475,7 @@ async function httpNetworkOrCacheFetch ( // user agents should append `User-Agent`/default `User-Agent` value to // httpRequest’s header list. if (!httpRequest.headersList.contains('user-agent', true)) { - httpRequest.headersList.append('user-agent', defaultUserAgent) + httpRequest.headersList.append('user-agent', defaultUserAgent, true) } // 15. If httpRequest’s cache mode is "default" and httpRequest’s header @@ -1942,7 +1942,7 @@ async function httpNetworkFetch ( // 17. Run these steps, but abort when the ongoing fetch is terminated: // 1. Set response’s body to a new body whose stream is stream. - response.body = { stream } + response.body = new InnerBody(stream, null, null) // 2. If response is not a network error and request’s cache mode is // not "no-store", then update response in httpCache for request. @@ -2078,7 +2078,11 @@ async function httpNetworkFetch ( path: url.pathname + url.search, origin: url.origin, method: request.method, - body: agent.isMockActive ? request.body && (request.body.source || request.body.stream) : body, + // https://github.com/nodejs/undici/issues/2418 + body: agent.isMockActive + // FIXME: Why prioritize source? + ? request.body && (request.body.source || request.body.stream) + : body, headers: request.headersList.entries, maxRedirections: 0, upgrade: request.mode === 'websocket' ? 'websocket' : undefined diff --git a/lib/web/fetch/request.js b/lib/web/fetch/request.js index afe92499267..fc8b65a903f 100644 --- a/lib/web/fetch/request.js +++ b/lib/web/fetch/request.js @@ -2,7 +2,7 @@ 'use strict' -const { extractBody, mixinBody, cloneBody } = require('./body') +const { extractBody, mixinBody, InnerBody } = require('./body') const { Headers, fill: fillHeaders, HeadersList } = require('./headers') const { FinalizationRegistry } = require('./dispatcher-weakref')() const util = require('../../core/util') @@ -50,7 +50,7 @@ class Request { webidl.argumentLengthCheck(arguments, 1, { header: 'Request constructor' }) - input = webidl.converters.RequestInfo(input) + input = webidl.converters.RequestInfo_DOMString(input) init = webidl.converters.RequestInit(init) // https://html.spec.whatwg.org/multipage/webappapis.html#environment-settings-object @@ -467,7 +467,8 @@ class Request { // list, append header’s name/header’s value to this’s headers. if (headers instanceof HeadersList) { for (const [key, val] of headers) { - headersList.append(key, val) + // Note: The header names are already in lowercase. + headersList.append(key, val, true) } // Note: Copy the `set-cookie` meta-data. headersList.cookies = headers.cookies @@ -499,7 +500,7 @@ class Request { // 1. Let Content-Type be null. // 2. Set initBody and Content-Type to the result of extracting // init["body"], with keepalive set to request’s keepalive. - const [extractedBody, contentType] = extractBody( + const { 0: extractedBody, 1: contentType } = extractBody( init.body, request.keepalive ) @@ -554,11 +555,7 @@ class Request { // https://streams.spec.whatwg.org/#readablestream-create-a-proxy const identityTransform = new TransformStream() inputBody.stream.pipeThrough(identityTransform) - finalBody = { - source: inputBody.source, - length: inputBody.length, - stream: identityTransform.readable - } + finalBody = new InnerBody(identityTransform.readable, inputBody.source, inputBody.length) } // 41. Set this’s request’s body to finalBody. @@ -726,13 +723,13 @@ class Request { get body () { webidl.brandCheck(this, Request) - return this[kState].body ? this[kState].body.stream : null + return this[kState].body !== null ? this[kState].body.stream : null } get bodyUsed () { webidl.brandCheck(this, Request) - return !!this[kState].body && util.isDisturbed(this[kState].body.stream) + return this[kState].body !== null && this[kState].body.unusable() } get duplex () { @@ -832,7 +829,7 @@ function cloneRequest (request) { // 2. If request’s body is non-null, set newRequest’s body to the // result of cloning request’s body. if (request.body != null) { - newRequest.body = cloneBody(request.body) + newRequest.body = request.body.clone() } // 3. Return newRequest. @@ -904,6 +901,19 @@ webidl.converters.RequestInfo = function (V) { return webidl.converters.USVString(V) } +// DOMString is used because the value is converted to a USVString in `new URL()`. +webidl.converters.RequestInfo_DOMString = function (V) { + if (typeof V === 'string') { + return webidl.converters.DOMString(V) + } + + if (V instanceof Request) { + return webidl.converters.Request(V) + } + + return webidl.converters.DOMString(V) +} + webidl.converters.AbortSignal = webidl.interfaceConverter( AbortSignal ) @@ -921,7 +931,7 @@ webidl.converters.RequestInit = webidl.dictionaryConverter([ { key: 'body', converter: webidl.nullableConverter( - webidl.converters.BodyInit + webidl.converters.BodyInit_DOMString ) }, { diff --git a/lib/web/fetch/response.js b/lib/web/fetch/response.js index e31f619590f..d5c450e892c 100644 --- a/lib/web/fetch/response.js +++ b/lib/web/fetch/response.js @@ -1,7 +1,7 @@ 'use strict' const { Headers, HeadersList, fill } = require('./headers') -const { extractBody, cloneBody, mixinBody } = require('./body') +const { extractBody, mixinBody } = require('./body') const util = require('../../core/util') const { kEnumerableProperty } = util const { @@ -77,7 +77,8 @@ class Response { webidl.argumentLengthCheck(arguments, 1, { header: 'Response.redirect' }) - url = webidl.converters.USVString(url) + // DOMString is used because the value is converted to a USVString in `new URL()`. + url = webidl.converters.DOMString(url) status = webidl.converters['unsigned short'](status) // 1. Let parsedURL be the result of parsing url with current settings @@ -120,7 +121,7 @@ class Response { } if (body !== null) { - body = webidl.converters.BodyInit(body) + body = webidl.converters.BodyInit_DOMString(body) } init = webidl.converters.ResponseInit(init) @@ -224,13 +225,13 @@ class Response { get body () { webidl.brandCheck(this, Response) - return this[kState].body ? this[kState].body.stream : null + return this[kState].body !== null ? this[kState].body.stream : null } get bodyUsed () { webidl.brandCheck(this, Response) - return !!this[kState].body && util.isDisturbed(this[kState].body.stream) + return this[kState].body !== null && this[kState].body.unusable() } // Returns a clone of response. @@ -299,7 +300,7 @@ function cloneResponse (response) { // 3. If response’s body is non-null, then set newResponse’s body to the // result of cloning response’s body. if (response.body != null) { - newResponse.body = cloneBody(response.body) + newResponse.body = response.body.clone() } // 4. Return newResponse. @@ -540,6 +541,30 @@ webidl.converters.XMLHttpRequestBodyInit = function (V) { return webidl.converters.DOMString(V) } +webidl.converters.XMLHttpRequestBodyInit_DOMString = function (V) { + if (typeof V === 'string') { + return webidl.converters.USVString(V) + } + + if (isBlobLike(V)) { + return webidl.converters.Blob(V, { strict: false }) + } + + if (ArrayBuffer.isView(V) || types.isArrayBuffer(V)) { + return webidl.converters.BufferSource(V) + } + + if (util.isFormDataLike(V)) { + return webidl.converters.FormData(V, { strict: false }) + } + + if (V instanceof URLSearchParams) { + return webidl.converters.URLSearchParams(V) + } + + return webidl.converters.DOMString(V) +} + // https://fetch.spec.whatwg.org/#bodyinit webidl.converters.BodyInit = function (V) { if (V instanceof ReadableStream) { @@ -555,6 +580,21 @@ webidl.converters.BodyInit = function (V) { return webidl.converters.XMLHttpRequestBodyInit(V) } +// https://fetch.spec.whatwg.org/#bodyinit +webidl.converters.BodyInit_DOMString = function (V) { + if (V instanceof ReadableStream) { + return webidl.converters.ReadableStream(V) + } + + // Note: the spec doesn't include async iterables, + // this is an undici extension. + if (V?.[Symbol.asyncIterator]) { + return V + } + + return webidl.converters.XMLHttpRequestBodyInit_DOMString(V) +} + webidl.converters.ResponseInit = webidl.dictionaryConverter([ { key: 'status', diff --git a/lib/web/fetch/util.js b/lib/web/fetch/util.js index 92bcb6cb202..eefe372e785 100644 --- a/lib/web/fetch/util.js +++ b/lib/web/fetch/util.js @@ -1322,8 +1322,9 @@ function extractMimeType (headers) { // 6.4.2. If mimeType’s parameters["charset"] exists, then set charset to // mimeType’s parameters["charset"]. - if (mimeType.parameters.has('charset')) { - charset = mimeType.parameters.get('charset') + const maybeCharset = mimeType.parameters.get('charset') + if (maybeCharset !== undefined) { + charset = maybeCharset } // 6.4.3. Set essence to mimeType’s essence.