Skip to content

Commit

Permalink
Fix resource leak by removing abort event listeners on destroy
Browse files Browse the repository at this point in the history
Not sure that this fix is entirely correct because I may have missed clean
up points. Also, needs a test still.

Fixes #2160
  • Loading branch information
marcelmeulemans committed Nov 16, 2022
1 parent 623229f commit 30f1f0d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 18 deletions.
26 changes: 18 additions & 8 deletions source/core/index.ts
Expand Up @@ -176,6 +176,7 @@ export default class Request extends Duplex implements RequestEvents<Request> {
private _triggerRead: boolean;
declare private _jobs: Array<() => void>;
private _cancelTimeouts: () => void;
private readonly _removeListeners: () => void;
private _nativeResponse?: IncomingMessageWithTimings;
private _flushed: boolean;
private _aborted: boolean;
Expand All @@ -199,6 +200,7 @@ export default class Request extends Duplex implements RequestEvents<Request> {
this._unproxyEvents = noop;
this._triggerRead = false;
this._cancelTimeouts = noop;
this._removeListeners = noop;
this._jobs = [];
this._flushed = false;
this._requestInitialized = false;
Expand Down Expand Up @@ -247,14 +249,6 @@ export default class Request extends Duplex implements RequestEvents<Request> {
return;
}

if (this.options.signal?.aborted) {
this.destroy(new AbortError(this));
}

this.options.signal?.addEventListener('abort', () => {
this.destroy(new AbortError(this));
});

// Important! If you replace `body` in a handler with another stream, make sure it's readable first.
// The below is run only once.
const {body} = this.options;
Expand All @@ -271,6 +265,21 @@ export default class Request extends Duplex implements RequestEvents<Request> {
}
});
}

if (this.options.signal) {
const abort = () => {
this.destroy(new AbortError(this));
};

if (this.options.signal.aborted) {
abort();
} else {
this.options.signal.addEventListener('abort', abort);
this._removeListeners = () => {
this.options.signal.removeEventListener('abort', abort);
};
}
}
}

async flush() {
Expand Down Expand Up @@ -508,6 +517,7 @@ export default class Request extends Duplex implements RequestEvents<Request> {
// Prevent further retries
this._stopRetry();
this._cancelTimeouts();
this._removeListeners();

if (this.options) {
const {body} = this.options;
Expand Down
57 changes: 47 additions & 10 deletions test/abort.ts
Expand Up @@ -5,6 +5,7 @@ import test from 'ava';
import delay from 'delay';
import {pEvent} from 'p-event';
import type {Handler} from 'express';
import {createSandbox} from 'sinon';
import got from '../source/index.js';
import slowDataStream from './helpers/slow-data-stream.js';
import type {GlobalClock} from './helpers/types.js';
Expand Down Expand Up @@ -64,9 +65,25 @@ if (globalThis.AbortController !== undefined) {
);
};

const sandbox = createSandbox();

const createAbortController = (): {controller: AbortController; signalHandlersRemoved: () => boolean} => {
const controller = new AbortController();
sandbox.spy(controller.signal);
// @ts-expect-error AbortSignal type definition issue: https://github.com/DefinitelyTyped/DefinitelyTyped/discussions/57805
const signalHandlersRemoved = () => controller.signal.addEventListener.callCount === controller.signal.removeEventListener.callCount;
return {
controller, signalHandlersRemoved,
};
};

test.afterEach(() => {
sandbox.restore();
});

test.serial('does not retry after abort', withServerAndFakeTimers, async (t, server, got, clock) => {
const {emitter, promise} = prepareServer(server, clock);
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const gotPromise = got('redirect', {
signal: controller.signal,
Expand All @@ -88,12 +105,14 @@ if (globalThis.AbortController !== undefined) {
});

await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test.serial('abort request timeouts', withServer, async (t, server, got) => {
server.get('/', () => {});

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const gotPromise = got({
signal: controller.signal,
Expand Down Expand Up @@ -121,14 +140,16 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');

// Wait for unhandled errors
await delay(40);
});

test.serial('aborts in-progress request', withServerAndFakeTimers, async (t, server, got, clock) => {
const {emitter, promise} = prepareServer(server, clock);

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const body = new ReadableStream({
read() {},
Expand All @@ -148,12 +169,14 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});
await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test.serial('aborts in-progress request with timeout', withServerAndFakeTimers, async (t, server, got, clock) => {
const {emitter, promise} = prepareServer(server, clock);

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const body = new ReadableStream({
read() {},
Expand All @@ -173,10 +196,12 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});
await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test.serial('abort immediately', withServerAndFakeTimers, async (t, server, got, clock) => {
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const promise = new Promise<void>((resolve, reject) => {
// We won't get an abort or even a connection
Expand All @@ -198,11 +223,13 @@ if (globalThis.AbortController !== undefined) {
message: 'This operation was aborted.',
});
await t.notThrowsAsync(promise, 'Request finished instead of aborting.');

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('recover from abort using abortable promise attribute', async t => {
// Abort before connection started
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const p = got('http://example.com', {signal: controller.signal});
const recover = p.catch((error: Error) => {
Expand All @@ -216,10 +243,12 @@ if (globalThis.AbortController !== undefined) {
controller.abort();

await t.notThrowsAsync(recover);

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('recover from abort using error instance', async t => {
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const p = got('http://example.com', {signal: controller.signal});
const recover = p.catch((error: Error) => {
Expand All @@ -233,13 +262,15 @@ if (globalThis.AbortController !== undefined) {
controller.abort();

await t.notThrowsAsync(recover);

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

// TODO: Use `fakeTimers` here
test.serial('throws on incomplete (aborted) response', withServer, async (t, server, got) => {
server.get('/', downloadHandler());

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const promise = got('', {signal: controller.signal});

Expand All @@ -251,6 +282,8 @@ if (globalThis.AbortController !== undefined) {
code: 'ERR_ABORTED',
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('throws when aborting cached request', withServer, async (t, server, got) => {
Expand All @@ -263,18 +296,20 @@ if (globalThis.AbortController !== undefined) {

await got({cache});

const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();
const promise = got({cache, signal: controller.signal});
controller.abort();

await t.throwsAsync(promise, {
code: 'ERR_ABORTED',
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});

test('support setting the signal as a default option', async t => {
const controller = new AbortController();
const {controller, signalHandlersRemoved} = createAbortController();

const got2 = got.extend({signal: controller.signal});
const p = got2('http://example.com', {signal: controller.signal});
Expand All @@ -284,6 +319,8 @@ if (globalThis.AbortController !== undefined) {
code: 'ERR_ABORTED',
message: 'This operation was aborted.',
});

t.true(signalHandlersRemoved(), 'Abort signal event handlers not removed');
});
} else {
test('x', t => {
Expand Down

0 comments on commit 30f1f0d

Please sign in to comment.