Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(platform): shutdown hooks not firing caused by open http connections #10345

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 79 additions & 0 deletions integration/nest-application/sse/e2e/express.spec.ts
@@ -0,0 +1,79 @@
import { NestExpressApplication } from '@nestjs/platform-express';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as EventSource from 'eventsource';
import { AppModule } from '../src/app.module';

describe('Sse (Express Application)', () => {
let app: NestExpressApplication;
let eventSource: EventSource;

describe('without forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestExpressApplication>();

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

// The order of actions is very important here. When not using `forceCloseConnections`,
// the SSe eventsource should close the connections in order to signal the server that
// the keep-alive connection can be ended.
afterEach(async () => {
eventSource.close();

await app.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});

describe('with forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestExpressApplication>({
forceCloseConnections: true,
});

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

afterEach(async () => {
await app.close();

eventSource.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
});
86 changes: 86 additions & 0 deletions integration/nest-application/sse/e2e/fastify.spec.ts
@@ -0,0 +1,86 @@
import {
FastifyAdapter,
NestFastifyApplication,
} from '@nestjs/platform-fastify';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as EventSource from 'eventsource';
import { AppModule } from '../src/app.module';

describe('Sse (Fastify Application)', () => {
let app: NestFastifyApplication;
let eventSource: EventSource;

describe('without forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestFastifyApplication>(
new FastifyAdapter(),
);

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

// The order of actions is very important here. When not using `forceCloseConnections`,
// the SSe eventsource should close the connections in order to signal the server that
// the keep-alive connection can be ended.
afterEach(async () => {
eventSource.close();

await app.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});

describe('with forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestFastifyApplication>(
new FastifyAdapter({
forceCloseConnections: true,
}),
);

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

afterEach(async () => {
await app.close();

eventSource.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
});
12 changes: 12 additions & 0 deletions integration/nest-application/sse/src/app.controller.ts
@@ -0,0 +1,12 @@
import { Controller, MessageEvent, Sse } from '@nestjs/common';
import { interval, map, Observable } from 'rxjs';

@Controller()
export class AppController {
@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(1000).pipe(
map(_ => ({ data: { hello: 'world' } } as MessageEvent)),
);
}
}
7 changes: 7 additions & 0 deletions integration/nest-application/sse/src/app.module.ts
@@ -0,0 +1,7 @@
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';

@Module({
controllers: [AppController],
})
export class AppModule {}
23 changes: 23 additions & 0 deletions integration/nest-application/sse/tsconfig.json
@@ -0,0 +1,23 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": false,
"noImplicitAny": false,
"removeComments": true,
"lib": ["dom"],
"noLib": false,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"target": "es6",
"sourceMap": true,
"allowJs": true,
"outDir": "./dist"
},
"include": [
"src/**/*",
"e2e/**/*"
],
"exclude": [
"node_modules"
]
}
Expand Up @@ -25,4 +25,9 @@ export interface NestApplicationOptions extends NestApplicationContextOptions {
* Whether to register the raw request body on the request. Use `req.rawBody`.
*/
rawBody?: boolean;
/**
* Force close open HTTP connections. Useful if restarting your application hangs due to
* keep-alive connections in the HTTP adapter.
*/
forceCloseConnections?: boolean;
}
28 changes: 25 additions & 3 deletions packages/platform-express/adapters/express-adapter.ts
Expand Up @@ -32,7 +32,7 @@ import * as cors from 'cors';
import * as express from 'express';
import * as http from 'http';
import * as https from 'https';
import { pipeline } from 'stream';
import { Duplex, pipeline } from 'stream';
import { ServeStaticOptions } from '../interfaces/serve-static-options.interface';
import { getBodyParserOptions } from './utils/get-body-parser-options.util';

Expand All @@ -48,6 +48,7 @@ type VersionedRoute = <
export class ExpressAdapter extends AbstractHttpAdapter {
private readonly routerMethodFactory = new RouterMethodFactory();
private readonly logger = new Logger(ExpressAdapter.name);
private readonly openConnections = new Set<Duplex>();

constructor(instance?: any) {
super(instance || express());
Expand Down Expand Up @@ -134,6 +135,8 @@ export class ExpressAdapter extends AbstractHttpAdapter {
}

public close() {
this.closeOpenConnections();

if (!this.httpServer) {
return undefined;
}
Expand Down Expand Up @@ -202,9 +205,13 @@ export class ExpressAdapter extends AbstractHttpAdapter {
options.httpsOptions,
this.getInstance(),
);
return;
} else {
this.httpServer = http.createServer(this.getInstance());
}

if (options?.forceCloseConnections) {
this.trackOpenConnections();
}
this.httpServer = http.createServer(this.getInstance());
}

public registerParserMiddleware(prefix?: string, rawBody?: boolean) {
Expand Down Expand Up @@ -377,6 +384,21 @@ export class ExpressAdapter extends AbstractHttpAdapter {
}
}

private trackOpenConnections() {
this.httpServer.on('connection', (socket: Duplex) => {
this.openConnections.add(socket);

socket.on('close', () => this.openConnections.delete(socket));
});
}

private closeOpenConnections() {
for (const socket of this.openConnections) {
socket.destroy();
this.openConnections.delete(socket);
}
}

private isMiddlewareApplied(name: string): boolean {
const app = this.getInstance();
return (
Expand Down
5 changes: 4 additions & 1 deletion sample/28-sse/src/main.ts
Expand Up @@ -2,7 +2,10 @@ import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

async function bootstrap() {
const app = await NestFactory.create(AppModule);
const app = await NestFactory.create(AppModule, {
forceCloseConnections: true,
});
app.enableShutdownHooks();
await app.listen(3000);
console.log(`Application is running on: ${await app.getUrl()}`);
}
Expand Down