Skip to content

Commit

Permalink
Merge pull request #10345 from tolgap/fix/9517-keep-alive-connections…
Browse files Browse the repository at this point in the history
…-blocking

fix(platform): shutdown hooks not firing caused by open http connections
  • Loading branch information
kamilmysliwiec committed Nov 7, 2022
2 parents dc2582a + 1f376fe commit fc6b34b
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 4 deletions.
79 changes: 79 additions & 0 deletions integration/nest-application/sse/e2e/express.spec.ts
@@ -0,0 +1,79 @@
import { NestExpressApplication } from '@nestjs/platform-express';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as EventSource from 'eventsource';
import { AppModule } from '../src/app.module';

describe('Sse (Express Application)', () => {
let app: NestExpressApplication;
let eventSource: EventSource;

describe('without forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestExpressApplication>();

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

// The order of actions is very important here. When not using `forceCloseConnections`,
// the SSe eventsource should close the connections in order to signal the server that
// the keep-alive connection can be ended.
afterEach(async () => {
eventSource.close();

await app.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});

describe('with forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestExpressApplication>({
forceCloseConnections: true,
});

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

afterEach(async () => {
await app.close();

eventSource.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
});
86 changes: 86 additions & 0 deletions integration/nest-application/sse/e2e/fastify.spec.ts
@@ -0,0 +1,86 @@
import {
FastifyAdapter,
NestFastifyApplication,
} from '@nestjs/platform-fastify';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as EventSource from 'eventsource';
import { AppModule } from '../src/app.module';

describe('Sse (Fastify Application)', () => {
let app: NestFastifyApplication;
let eventSource: EventSource;

describe('without forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestFastifyApplication>(
new FastifyAdapter(),
);

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

// The order of actions is very important here. When not using `forceCloseConnections`,
// the SSe eventsource should close the connections in order to signal the server that
// the keep-alive connection can be ended.
afterEach(async () => {
eventSource.close();

await app.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});

describe('with forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleFixture.createNestApplication<NestFastifyApplication>(
new FastifyAdapter({
forceCloseConnections: true,
}),
);

await app.listen(3000);
const url = await app.getUrl();

eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});

afterEach(async () => {
await app.close();

eventSource.close();
});

it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
});
12 changes: 12 additions & 0 deletions integration/nest-application/sse/src/app.controller.ts
@@ -0,0 +1,12 @@
import { Controller, MessageEvent, Sse } from '@nestjs/common';
import { interval, map, Observable } from 'rxjs';

@Controller()
export class AppController {
@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(1000).pipe(
map(_ => ({ data: { hello: 'world' } } as MessageEvent)),
);
}
}
7 changes: 7 additions & 0 deletions integration/nest-application/sse/src/app.module.ts
@@ -0,0 +1,7 @@
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';

@Module({
controllers: [AppController],
})
export class AppModule {}
23 changes: 23 additions & 0 deletions integration/nest-application/sse/tsconfig.json
@@ -0,0 +1,23 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": false,
"noImplicitAny": false,
"removeComments": true,
"lib": ["dom"],
"noLib": false,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"target": "es6",
"sourceMap": true,
"allowJs": true,
"outDir": "./dist"
},
"include": [
"src/**/*",
"e2e/**/*"
],
"exclude": [
"node_modules"
]
}
Expand Up @@ -25,4 +25,9 @@ export interface NestApplicationOptions extends NestApplicationContextOptions {
* Whether to register the raw request body on the request. Use `req.rawBody`.
*/
rawBody?: boolean;
/**
* Force close open HTTP connections. Useful if restarting your application hangs due to
* keep-alive connections in the HTTP adapter.
*/
forceCloseConnections?: boolean;
}
28 changes: 25 additions & 3 deletions packages/platform-express/adapters/express-adapter.ts
Expand Up @@ -33,7 +33,7 @@ import * as cors from 'cors';
import * as express from 'express';
import * as http from 'http';
import * as https from 'https';
import { pipeline } from 'stream';
import { Duplex, pipeline } from 'stream';
import { ServeStaticOptions } from '../interfaces/serve-static-options.interface';
import { getBodyParserOptions } from './utils/get-body-parser-options.util';

Expand All @@ -49,6 +49,7 @@ type VersionedRoute = <
export class ExpressAdapter extends AbstractHttpAdapter {
private readonly routerMethodFactory = new RouterMethodFactory();
private readonly logger = new Logger(ExpressAdapter.name);
private readonly openConnections = new Set<Duplex>();

constructor(instance?: any) {
super(instance || express());
Expand Down Expand Up @@ -139,6 +140,8 @@ export class ExpressAdapter extends AbstractHttpAdapter {
}

public close() {
this.closeOpenConnections();

if (!this.httpServer) {
return undefined;
}
Expand Down Expand Up @@ -207,9 +210,13 @@ export class ExpressAdapter extends AbstractHttpAdapter {
options.httpsOptions,
this.getInstance(),
);
return;
} else {
this.httpServer = http.createServer(this.getInstance());
}

if (options?.forceCloseConnections) {
this.trackOpenConnections();
}
this.httpServer = http.createServer(this.getInstance());
}

public registerParserMiddleware(prefix?: string, rawBody?: boolean) {
Expand Down Expand Up @@ -382,6 +389,21 @@ export class ExpressAdapter extends AbstractHttpAdapter {
}
}

private trackOpenConnections() {
this.httpServer.on('connection', (socket: Duplex) => {
this.openConnections.add(socket);

socket.on('close', () => this.openConnections.delete(socket));
});
}

private closeOpenConnections() {
for (const socket of this.openConnections) {
socket.destroy();
this.openConnections.delete(socket);
}
}

private isMiddlewareApplied(name: string): boolean {
const app = this.getInstance();
return (
Expand Down
5 changes: 4 additions & 1 deletion sample/28-sse/src/main.ts
Expand Up @@ -2,7 +2,10 @@ import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

async function bootstrap() {
const app = await NestFactory.create(AppModule);
const app = await NestFactory.create(AppModule, {
forceCloseConnections: true,
});
app.enableShutdownHooks();
await app.listen(3000);
console.log(`Application is running on: ${await app.getUrl()}`);
}
Expand Down

0 comments on commit fc6b34b

Please sign in to comment.