diff --git a/integration/nest-application/sse/e2e/express.spec.ts b/integration/nest-application/sse/e2e/express.spec.ts new file mode 100644 index 00000000000..363dc807f42 --- /dev/null +++ b/integration/nest-application/sse/e2e/express.spec.ts @@ -0,0 +1,79 @@ +import { NestExpressApplication } from '@nestjs/platform-express'; +import { Test } from '@nestjs/testing'; +import { expect } from 'chai'; +import * as EventSource from 'eventsource'; +import { AppModule } from '../src/app.module'; + +describe('Sse (Express Application)', () => { + let app: NestExpressApplication; + let eventSource: EventSource; + + describe('without forceCloseConnections', () => { + beforeEach(async () => { + const moduleFixture = await Test.createTestingModule({ + imports: [AppModule], + }).compile(); + + app = moduleFixture.createNestApplication(); + + await app.listen(3000); + const url = await app.getUrl(); + + eventSource = new EventSource(url + '/sse', { + headers: { connection: 'keep-alive' }, + }); + }); + + // The order of actions is very important here. When not using `forceCloseConnections`, + // the SSe eventsource should close the connections in order to signal the server that + // the keep-alive connection can be ended. + afterEach(async () => { + eventSource.close(); + + await app.close(); + }); + + it('receives events from server', done => { + eventSource.addEventListener('message', event => { + expect(JSON.parse(event.data)).to.eql({ + hello: 'world', + }); + done(); + }); + }); + }); + + describe('with forceCloseConnections', () => { + beforeEach(async () => { + const moduleFixture = await Test.createTestingModule({ + imports: [AppModule], + }).compile(); + + app = moduleFixture.createNestApplication({ + forceCloseConnections: true, + }); + + await app.listen(3000); + const url = await app.getUrl(); + + eventSource = new EventSource(url + '/sse', { + headers: { connection: 'keep-alive' }, + }); + }); + + afterEach(async () => { + await app.close(); + + eventSource.close(); + }); + + it('receives events from server', done => { + eventSource.addEventListener('message', event => { + expect(JSON.parse(event.data)).to.eql({ + hello: 'world', + }); + done(); + }); + }); + }); +}); diff --git a/integration/nest-application/sse/e2e/fastify.spec.ts b/integration/nest-application/sse/e2e/fastify.spec.ts new file mode 100644 index 00000000000..4fc1ebf2029 --- /dev/null +++ b/integration/nest-application/sse/e2e/fastify.spec.ts @@ -0,0 +1,86 @@ +import { + FastifyAdapter, + NestFastifyApplication, +} from '@nestjs/platform-fastify'; +import { Test } from '@nestjs/testing'; +import { expect } from 'chai'; +import * as EventSource from 'eventsource'; +import { AppModule } from '../src/app.module'; + +describe('Sse (Fastify Application)', () => { + let app: NestFastifyApplication; + let eventSource: EventSource; + + describe('without forceCloseConnections', () => { + beforeEach(async () => { + const moduleFixture = await Test.createTestingModule({ + imports: [AppModule], + }).compile(); + + app = moduleFixture.createNestApplication( + new FastifyAdapter(), + ); + + await app.listen(3000); + const url = await app.getUrl(); + + eventSource = new EventSource(url + '/sse', { + headers: { connection: 'keep-alive' }, + }); + }); + + // The order of actions is very important here. When not using `forceCloseConnections`, + // the SSe eventsource should close the connections in order to signal the server that + // the keep-alive connection can be ended. + afterEach(async () => { + eventSource.close(); + + await app.close(); + }); + + it('receives events from server', done => { + eventSource.addEventListener('message', event => { + expect(JSON.parse(event.data)).to.eql({ + hello: 'world', + }); + done(); + }); + }); + }); + + describe('with forceCloseConnections', () => { + beforeEach(async () => { + const moduleFixture = await Test.createTestingModule({ + imports: [AppModule], + }).compile(); + + app = moduleFixture.createNestApplication( + new FastifyAdapter({ + forceCloseConnections: true, + }), + ); + + await app.listen(3000); + const url = await app.getUrl(); + + eventSource = new EventSource(url + '/sse', { + headers: { connection: 'keep-alive' }, + }); + }); + + afterEach(async () => { + await app.close(); + + eventSource.close(); + }); + + it('receives events from server', done => { + eventSource.addEventListener('message', event => { + expect(JSON.parse(event.data)).to.eql({ + hello: 'world', + }); + done(); + }); + }); + }); +}); diff --git a/integration/nest-application/sse/src/app.controller.ts b/integration/nest-application/sse/src/app.controller.ts new file mode 100644 index 00000000000..2e36fa3d3df --- /dev/null +++ b/integration/nest-application/sse/src/app.controller.ts @@ -0,0 +1,12 @@ +import { Controller, MessageEvent, Sse } from '@nestjs/common'; +import { interval, map, Observable } from 'rxjs'; + +@Controller() +export class AppController { + @Sse('sse') + sse(): Observable { + return interval(1000).pipe( + map(_ => ({ data: { hello: 'world' } } as MessageEvent)), + ); + } +} diff --git a/integration/nest-application/sse/src/app.module.ts b/integration/nest-application/sse/src/app.module.ts new file mode 100644 index 00000000000..848d4aaa7fe --- /dev/null +++ b/integration/nest-application/sse/src/app.module.ts @@ -0,0 +1,7 @@ +import { Module } from '@nestjs/common'; +import { AppController } from './app.controller'; + +@Module({ + controllers: [AppController], +}) +export class AppModule {} diff --git a/integration/nest-application/sse/tsconfig.json b/integration/nest-application/sse/tsconfig.json new file mode 100644 index 00000000000..261f9bff228 --- /dev/null +++ b/integration/nest-application/sse/tsconfig.json @@ -0,0 +1,23 @@ +{ + "compilerOptions": { + "module": "commonjs", + "declaration": false, + "noImplicitAny": false, + "removeComments": true, + "lib": ["dom"], + "noLib": false, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "target": "es6", + "sourceMap": true, + "allowJs": true, + "outDir": "./dist" + }, + "include": [ + "src/**/*", + "e2e/**/*" + ], + "exclude": [ + "node_modules" + ] +} diff --git a/packages/common/interfaces/nest-application-options.interface.ts b/packages/common/interfaces/nest-application-options.interface.ts index 5c67cd9d2dc..5d4c89cbc38 100644 --- a/packages/common/interfaces/nest-application-options.interface.ts +++ b/packages/common/interfaces/nest-application-options.interface.ts @@ -25,4 +25,9 @@ export interface NestApplicationOptions extends NestApplicationContextOptions { * Whether to register the raw request body on the request. Use `req.rawBody`. */ rawBody?: boolean; + /** + * Force close open HTTP connections. Useful if restarting your application hangs due to + * keep-alive connections in the HTTP adapter. + */ + forceCloseConnections?: boolean; } diff --git a/packages/platform-express/adapters/express-adapter.ts b/packages/platform-express/adapters/express-adapter.ts index 718a3f4fd13..9d02e8b1fa6 100644 --- a/packages/platform-express/adapters/express-adapter.ts +++ b/packages/platform-express/adapters/express-adapter.ts @@ -33,7 +33,7 @@ import * as cors from 'cors'; import * as express from 'express'; import * as http from 'http'; import * as https from 'https'; -import { pipeline } from 'stream'; +import { Duplex, pipeline } from 'stream'; import { ServeStaticOptions } from '../interfaces/serve-static-options.interface'; import { getBodyParserOptions } from './utils/get-body-parser-options.util'; @@ -49,6 +49,7 @@ type VersionedRoute = < export class ExpressAdapter extends AbstractHttpAdapter { private readonly routerMethodFactory = new RouterMethodFactory(); private readonly logger = new Logger(ExpressAdapter.name); + private readonly openConnections = new Set(); constructor(instance?: any) { super(instance || express()); @@ -139,6 +140,8 @@ export class ExpressAdapter extends AbstractHttpAdapter { } public close() { + this.closeOpenConnections(); + if (!this.httpServer) { return undefined; } @@ -207,9 +210,13 @@ export class ExpressAdapter extends AbstractHttpAdapter { options.httpsOptions, this.getInstance(), ); - return; + } else { + this.httpServer = http.createServer(this.getInstance()); + } + + if (options?.forceCloseConnections) { + this.trackOpenConnections(); } - this.httpServer = http.createServer(this.getInstance()); } public registerParserMiddleware(prefix?: string, rawBody?: boolean) { @@ -382,6 +389,21 @@ export class ExpressAdapter extends AbstractHttpAdapter { } } + private trackOpenConnections() { + this.httpServer.on('connection', (socket: Duplex) => { + this.openConnections.add(socket); + + socket.on('close', () => this.openConnections.delete(socket)); + }); + } + + private closeOpenConnections() { + for (const socket of this.openConnections) { + socket.destroy(); + this.openConnections.delete(socket); + } + } + private isMiddlewareApplied(name: string): boolean { const app = this.getInstance(); return ( diff --git a/sample/28-sse/src/main.ts b/sample/28-sse/src/main.ts index e1e10d96995..4d8008da574 100644 --- a/sample/28-sse/src/main.ts +++ b/sample/28-sse/src/main.ts @@ -2,7 +2,10 @@ import { NestFactory } from '@nestjs/core'; import { AppModule } from './app.module'; async function bootstrap() { - const app = await NestFactory.create(AppModule); + const app = await NestFactory.create(AppModule, { + forceCloseConnections: true, + }); + app.enableShutdownHooks(); await app.listen(3000); console.log(`Application is running on: ${await app.getUrl()}`); }