Skip to content

Commit

Permalink
use FinalizationRegistry to cancel the body if response is collected (#…
Browse files Browse the repository at this point in the history
…3199)

* use FinalizationRegistry to cancel the body if response is collected

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* fixup

Signed-off-by: Matteo Collina <hello@matteocollina.com>

* Update lib/dispatcher/client-h2.js

Co-authored-by: Aras Abbasi <aras.abbasi@googlemail.com>

---------

Signed-off-by: Matteo Collina <hello@matteocollina.com>
Co-authored-by: Aras Abbasi <aras.abbasi@googlemail.com>
  • Loading branch information
mcollina and Uzlopak committed May 7, 2024
1 parent c0dc3dd commit 63b7794
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 10 deletions.
34 changes: 25 additions & 9 deletions lib/web/fetch/index.js
Expand Up @@ -120,12 +120,16 @@ class Fetch extends EE {
}
}

function handleFetchDone (response) {
finalizeAndReportTiming(response, 'fetch')
}

// https://fetch.spec.whatwg.org/#fetch-method
function fetch (input, init = undefined) {
webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch')

// 1. Let p be a new promise.
const p = createDeferredPromise()
let p = createDeferredPromise()

// 2. Let requestObject be the result of invoking the initial value of
// Request as constructor with input and init as arguments. If this throws
Expand Down Expand Up @@ -185,16 +189,17 @@ function fetch (input, init = undefined) {
// 3. Abort controller with requestObject’s signal’s abort reason.
controller.abort(requestObject.signal.reason)

const realResponse = responseObject?.deref()

// 4. Abort the fetch() call with p, request, responseObject,
// and requestObject’s signal’s abort reason.
abortFetch(p, request, responseObject, requestObject.signal.reason)
abortFetch(p, request, realResponse, requestObject.signal.reason)
}
)

// 12. Let handleFetchDone given response response be to finalize and
// report timing with response, globalObject, and "fetch".
const handleFetchDone = (response) =>
finalizeAndReportTiming(response, 'fetch')
// see function handleFetchDone

// 13. Set controller to the result of calling fetch given request,
// with processResponseEndOfBody set to handleFetchDone, and processResponse
Expand Down Expand Up @@ -228,10 +233,11 @@ function fetch (input, init = undefined) {

// 4. Set responseObject to the result of creating a Response object,
// given response, "immutable", and relevantRealm.
responseObject = fromInnerResponse(response, 'immutable')
responseObject = new WeakRef(fromInnerResponse(response, 'immutable'))

// 5. Resolve p with responseObject.
p.resolve(responseObject)
p.resolve(responseObject.deref())
p = null
}

controller = fetching({
Expand Down Expand Up @@ -314,7 +320,10 @@ const markResourceTiming = performance.markResourceTiming
// https://fetch.spec.whatwg.org/#abort-fetch
function abortFetch (p, request, responseObject, error) {
// 1. Reject promise with error.
p.reject(error)
if (p) {
// We might have already resolved the promise at this stage
p.reject(error)
}

// 2. If request’s body is not null and is readable, then cancel request’s
// body with error.
Expand Down Expand Up @@ -1066,7 +1075,10 @@ function fetchFinale (fetchParams, response) {
// 4. If fetchParams’s process response is non-null, then queue a fetch task to run fetchParams’s
// process response given response, with fetchParams’s task destination.
if (fetchParams.processResponse != null) {
queueMicrotask(() => fetchParams.processResponse(response))
queueMicrotask(() => {
fetchParams.processResponse(response)
fetchParams.processResponse = null
})
}

// 5. Let internalResponse be response, if response is a network error; otherwise response’s internal response.
Expand Down Expand Up @@ -1884,7 +1896,11 @@ async function httpNetworkFetch (
// 12. Let cancelAlgorithm be an algorithm that aborts fetchParams’s
// controller with reason, given reason.
const cancelAlgorithm = (reason) => {
fetchParams.controller.abort(reason)
// If the aborted fetch was already terminated, then we do not
// need to do anything.
if (!isCancelled(fetchParams)) {
fetchParams.controller.abort(reason)
}
}

// 13. Let highWaterMark be a non-negative, non-NaN number, chosen by
Expand Down
19 changes: 19 additions & 0 deletions lib/web/fetch/response.js
Expand Up @@ -26,9 +26,23 @@ const { URLSerializer } = require('./data-url')
const { kHeadersList, kConstruct } = require('../../core/symbols')
const assert = require('node:assert')
const { types } = require('node:util')
const { isDisturbed, isErrored } = require('node:stream')

const textEncoder = new TextEncoder('utf-8')

const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0
let registry

if (hasFinalizationRegistry) {
registry = new FinalizationRegistry((stream) => {
if (!stream.locked && !isDisturbed(stream) && !isErrored(stream)) {
stream.cancel('Response object has been garbage collected').catch(noop)
}
})
}

function noop () {}

// https://fetch.spec.whatwg.org/#response-class
class Response {
// Creates network error Response.
Expand Down Expand Up @@ -510,6 +524,11 @@ function fromInnerResponse (innerResponse, guard) {
response[kHeaders] = new Headers(kConstruct)
response[kHeaders][kHeadersList] = innerResponse.headersList
response[kHeaders][kGuard] = guard

if (hasFinalizationRegistry && innerResponse.body?.stream) {
registry.register(response, innerResponse.body.stream)
}

return response
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -77,7 +77,7 @@
"test:eventsource:nobuild": "borp --expose-gc -p \"test/eventsource/*.js\"",
"test:fuzzing": "node test/fuzzing/fuzzing.test.js",
"test:fetch": "npm run build:node && npm run test:fetch:nobuild",
"test:fetch:nobuild": "borp --expose-gc -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy",
"test:fetch:nobuild": "borp --timeout 180000 --expose-gc --concurrency 1 -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy",
"test:interceptors": "borp -p \"test/interceptors/*.js\"",
"test:jest": "cross-env NODE_V8_COVERAGE= jest",
"test:unit": "borp --expose-gc -p \"test/*.js\"",
Expand Down
53 changes: 53 additions & 0 deletions test/fetch/fire-and-forget.js
@@ -0,0 +1,53 @@
'use strict'

const { randomFillSync } = require('node:crypto')
const { setTimeout: sleep } = require('timers/promises')
const { test } = require('node:test')
const { fetch, Agent, setGlobalDispatcher } = require('../..')
const { createServer } = require('node:http')
const { closeServerAsPromise } = require('../utils/node-http')

const blob = randomFillSync(new Uint8Array(1024 * 512))

// Enable when/if FinalizationRegistry in Node.js 18 becomes stable again
const isNode18 = process.version.startsWith('v18')

test('does not need the body to be consumed to continue', { timeout: 180_000, skip: isNode18 }, async (t) => {
const agent = new Agent({
keepAliveMaxTimeout: 10,
keepAliveTimeoutThreshold: 10
})
setGlobalDispatcher(agent)
const server = createServer((req, res) => {
res.writeHead(200)
res.end(blob)
})
t.after(closeServerAsPromise(server))

await new Promise((resolve) => {
server.listen(0, resolve)
})

const url = new URL(`http://127.0.0.1:${server.address().port}`)

const batch = 50
const delay = 0
let total = 0
while (total < 10000) {
// eslint-disable-next-line no-undef
gc(true)
const array = new Array(batch)
for (let i = 0; i < batch; i++) {
array[i] = fetch(url).catch(() => {})
}
await Promise.all(array)
await sleep(delay)

console.log(
'RSS',
(process.memoryUsage.rss() / 1024 / 1024) | 0,
'MB after',
(total += batch) + ' fetch() requests'
)
}
})

0 comments on commit 63b7794

Please sign in to comment.