Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resource leak by removing abort event listeners on destroy #2162

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 18 additions & 8 deletions source/core/index.ts
Expand Up @@ -176,6 +176,7 @@ export default class Request extends Duplex implements RequestEvents<Request> {
private _triggerRead: boolean;
declare private _jobs: Array<() => void>;
private _cancelTimeouts: () => void;
private readonly _removeListeners: () => void;
marcelmeulemans marked this conversation as resolved.
Show resolved Hide resolved
private _nativeResponse?: IncomingMessageWithTimings;
private _flushed: boolean;
private _aborted: boolean;
Expand All @@ -199,6 +200,7 @@ export default class Request extends Duplex implements RequestEvents<Request> {
this._unproxyEvents = noop;
this._triggerRead = false;
this._cancelTimeouts = noop;
this._removeListeners = noop;
marcelmeulemans marked this conversation as resolved.
Show resolved Hide resolved
this._jobs = [];
this._flushed = false;
this._requestInitialized = false;
Expand Down Expand Up @@ -247,14 +249,6 @@ export default class Request extends Duplex implements RequestEvents<Request> {
return;
}

if (this.options.signal?.aborted) {
this.destroy(new AbortError(this));
}

this.options.signal?.addEventListener('abort', () => {
this.destroy(new AbortError(this));
});

// Important! If you replace `body` in a handler with another stream, make sure it's readable first.
// The below is run only once.
const {body} = this.options;
Expand All @@ -271,6 +265,21 @@ export default class Request extends Duplex implements RequestEvents<Request> {
}
});
}

if (this.options.signal) {
const abort = () => {
this.destroy(new AbortError(this));
};

if (this.options.signal.aborted) {
abort();
} else {
this.options.signal.addEventListener('abort', abort);
this._removeListeners = () => {
this.options.signal.removeEventListener('abort', abort);
};
}
}
}

async flush() {
Expand Down Expand Up @@ -508,6 +517,7 @@ export default class Request extends Duplex implements RequestEvents<Request> {
// Prevent further retries
this._stopRetry();
this._cancelTimeouts();
this._removeListeners();

if (this.options) {
const {body} = this.options;
Expand Down
57 changes: 47 additions & 10 deletions test/abort.ts
Expand Up @@ -5,6 +5,7 @@ import test from 'ava';
import delay from 'delay';
import {pEvent} from 'p-event';
import type {Handler} from 'express';
import {createSandbox} from 'sinon';
import got from '../source/index.js';
import slowDataStream from './helpers/slow-data-stream.js';
import type {GlobalClock} from './helpers/types.js';
Expand Down Expand Up @@ -64,9 +65,25 @@ if (globalThis.AbortController !== undefined) {
);
};

const sandbox = createSandbox();

const createAbortController = (): {controller: AbortController; signalHandlersRemoved: () => boolean} => {
const controller = new AbortController();
sandbox.spy(controller.signal);
// @ts-expect-error AbortSignal type definition issue: https://github.com/DefinitelyTyped/DefinitelyTyped/discussions/57805
const signalHandlersRemoved = () => controller.signal.addEventListener.callCount === controller.signal.removeEventListener.callCount;
return {
controller, signalHandlersRemoved,
};
};

test.afterEach(() => {
sandbox.restore();
});

test.serial('does not retry after abort', withServerAndFakeTimers, async (t, server, got, clock) => {
const {emitter, promise} = prepareServer(server, clock);
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const gotPromise = got('redirect', {
signal: controller.signal,
Expand All @@ -88,12 +105,14 @@ if (globalThis.AbortController !== undefined) {
});

await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test.serial('abort request timeouts', withServer, async (t, server, got) => {
server.get('/', () => {});

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const gotPromise = got({
signal: controller.signal,
Expand Down Expand Up @@ -121,14 +140,16 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');

// Wait for unhandled errors
await delay(40);
});

test.serial('aborts in-progress request', withServerAndFakeTimers, async (t, server, got, clock) => {
const {emitter, promise} = prepareServer(server, clock);

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const body = new ReadableStream({
read() {},
Expand All @@ -148,12 +169,14 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});
await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test.serial('aborts in-progress request with timeout', withServerAndFakeTimers, async (t, server, got, clock) => {
const {emitter, promise} = prepareServer(server, clock);

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const body = new ReadableStream({
read() {},
Expand All @@ -173,10 +196,12 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});
await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test.serial('abort immediately', withServerAndFakeTimers, async (t, server, got, clock) => {
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const promise = new Promise<void>((resolve, reject) => {
// We won't get an abort or even a connection
Expand All @@ -198,11 +223,13 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});
await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('recover from abort using abortable promise attribute', async t => {
// Abort before connection started
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const p = got('http://example.com', {signal: controller.signal});
const recover = p.catch((error: Error) => {
Expand All @@ -216,10 +243,12 @@ if (globalThis.AbortController !== undefined) {
controller.abort();

await t.notThrowsAsync(recover);

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('recover from abort using error instance', async t => {
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const p = got('http://example.com', {signal: controller.signal});
const recover = p.catch((error: Error) => {
Expand All @@ -233,13 +262,15 @@ if (globalThis.AbortController !== undefined) {
controller.abort();

await t.notThrowsAsync(recover);

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

// TODO: Use `fakeTimers` here
test.serial('throws on incomplete (aborted) response', withServer, async (t, server, got) => {
server.get('/', downloadHandler());

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const promise = got('', {signal: controller.signal});

Expand All @@ -251,6 +282,8 @@ if (globalThis.AbortController !== undefined) {
code: 'ERR_ABORTED',
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('throws when aborting cached request', withServer, async (t, server, got) => {
Expand All @@ -263,18 +296,20 @@ if (globalThis.AbortController !== undefined) {

await got({cache});

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();
const promise = got({cache, signal: controller.signal});
controller.abort();

await t.throwsAsync(promise, {
code: 'ERR_ABORTED',
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('support setting the signal as a default option', async t => {
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const got2 = got.extend({signal: controller.signal});
const p = got2('http://example.com', {signal: controller.signal});
Expand All @@ -284,6 +319,8 @@ if (globalThis.AbortController !== undefined) {
code: 'ERR_ABORTED',
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});
} else {
test('x', t => {
Expand Down