dev5c32373043/nestjs-pulsar

A progressive Node.js framework for building efficient and scalable server-side applications.

Description

Apache Pulsar module for Nest.

Based on https://www.npmjs.com/package/nest-pulsar with dependency updates and small improvements.

Installation

$ npm i @dev5c32373043/nestjs-pulsar

Note

The Pulsar Node.js client library is built on the C++ client library, so you must install the Pulsar C++ client library before installing the Node.js client. For more details, see pulsar-client-node on GitHub or the Node.js client page in the Apache Pulsar documentation.

Getting started

Once the installation process (npm install) is complete, we can import the PulsarModule into the root AppModule.

import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRoot({
      serviceUrl: 'pulsar://localhost:6650',
    }),
  ],
})
export class AppModule {}

The forRoot() method supports all the configuration properties exposed by the Client class constructor from the pulsar-client package.

Note

forRoot() injects the Pulsar Client provider globally.

Next, let's look at another module, say the UsersModule.

Once the Pulsar client is configured, you can inject the Producer, Consumer or Reader you need using the forFeature() method:

import { Module } from '@nestjs/common';
import { PulsarModule, MessageId } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forFeature('producer', 'my-producer', {
      topic: 'my-topic',
    }),
    PulsarModule.forFeature('consumer', 'my-consumer', {
      topic: 'my-topic',
      subscription: 'my-sub',
    }),
    PulsarModule.forFeature('reader', 'my-reader', {
      topic: 'my-topic',
      startMessageId: MessageId.earliest(),
    }),
  ],
})
export class UsersModule {}

Warning

The producer, consumer or reader name (second parameter) is mandatory. Do not register multiple producers, consumers or readers under the same name; otherwise they will be overridden.
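
Because a clashing name silently overrides an earlier registration, it can help to check the declared names before wiring them into forFeature(). The following is a minimal sketch, not part of the package: FeatureRef and findDuplicateFeatures are hypothetical names, and it assumes that names only clash within the same feature type (producer, consumer or reader), which the warning above does not state explicitly.

```typescript
// Hypothetical helper, not part of @dev5c32373043/nestjs-pulsar.
// Assumption: two features clash only when both type and name match.
type FeatureType = 'producer' | 'consumer' | 'reader';

interface FeatureRef {
  type: FeatureType;
  name: string;
}

// Returns every "type:name" key that is declared more than once.
function findDuplicateFeatures(features: FeatureRef[]): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();
  for (const { type, name } of features) {
    const key = `${type}:${name}`;
    if (seen.has(key)) {
      duplicates.add(key);
    }
    seen.add(key);
  }
  return Array.from(duplicates);
}
```

Calling it on the list of features you plan to register and failing fast on a non-empty result turns a silent override into a visible startup error.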

The third parameter of forFeature() supports all the configuration properties exposed by the following Pulsar Client factory methods:

  • producer feature configuration object corresponds to client.createProducer() configuration object.
  • consumer feature configuration object corresponds to client.subscribe() configuration object.
  • reader feature configuration object corresponds to client.createReader() configuration object.

This module uses the forFeature() method to define which features (producer, consumer or reader) are registered in the current scope. With that in place, we can inject the Producer, Consumer and Reader Pulsar objects into the UsersService using the @InjectPulsar() decorator:

import { Injectable } from '@nestjs/common';
import { InjectPulsar, Producer, Consumer, Reader } from '@dev5c32373043/nestjs-pulsar';

@Injectable()
export class UsersService {
  constructor(
    @InjectPulsar('producer', 'my-producer')
    private readonly producer: Producer,
    @InjectPulsar('consumer', 'my-consumer')
    private readonly consumer: Consumer,
    @InjectPulsar('reader', 'my-reader')
    private readonly reader: Reader,
  ) {}

  async publish(data: any) {
    await this.producer.send({ data: Buffer.from(JSON.stringify(data)) });
  }

  async consume(timeout: number = 1000) {
    const rawMessage = await this.consumer.receive(timeout); // timeout is optional
    const data = JSON.parse(rawMessage.getData().toString());
    await this.consumer.acknowledge(rawMessage);

    return data;
  }

  async read() {
    if (!this.reader.hasNext()) return;

    const rawMessage = await this.reader.readNext();
    const data = JSON.parse(rawMessage.getData().toString());

    return data;
  }
}
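
The service above serializes and parses JSON inline in each method. A minimal sketch of pulling that into two small helpers is shown below; encodePayload and decodePayload are hypothetical names that are not exported by the package, and returning undefined for malformed payloads is just one possible policy.

```typescript
// Hypothetical helpers, not part of @dev5c32373043/nestjs-pulsar.

// Serializes a value to the Buffer expected in producer.send({ data }).
function encodePayload(value: unknown): Buffer {
  const json = JSON.stringify(value);
  // JSON.stringify yields undefined for values such as undefined or functions.
  if (json === undefined) {
    throw new TypeError('Payload is not JSON-serializable');
  }
  return Buffer.from(json, 'utf8');
}

// Parses the Buffer returned by message.getData().
// Returns undefined instead of throwing when the payload is not valid JSON.
function decodePayload<T = unknown>(raw: Buffer): T | undefined {
  try {
    return JSON.parse(raw.toString('utf8')) as T;
  } catch {
    return undefined;
  }
}
```

With these in place, publish() could call this.producer.send({ data: encodePayload(data) }), and consume() or read() could call decodePayload(rawMessage.getData()), so a single malformed message does not throw inside the service.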

If you want to use the producer, consumer or reader outside of the module which imports PulsarModule.forFeature(), you'll need to re-export the providers generated by it. You can do this by exporting the whole module, like this:

import { Module } from '@nestjs/common';
import { PulsarModule, MessageId } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forFeature('producer', 'my-producer', {
      topic: 'my-topic',
    }),
    PulsarModule.forFeature('consumer', 'my-consumer', {
      topic: 'my-topic',
      subscription: 'my-sub',
    }),
    PulsarModule.forFeature('reader', 'my-reader', {
      topic: 'my-topic',
      // A reader needs a start position, as in the earlier example.
      startMessageId: MessageId.earliest(),
    }),
  ],
  exports: [PulsarModule],
})
export class UsersModule {}

Async configuration

You may want to pass your module options asynchronously instead of statically. In this case, use the forRootAsync() method:

import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRootAsync({
      useFactory: () => ({
        serviceUrl: 'pulsar://localhost:6650',
      }),
    }),
  ],
})
export class AppModule {}

Our factory behaves like any other asynchronous provider (e.g., it can be async and it's able to inject dependencies through inject):

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        serviceUrl: config.get('SERVICE_URL'),
      }),
    }),
  ],
})
export class AppModule {}
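
When the service URL comes from configuration, a missing value is easier to diagnose at startup than at the first connection attempt. The sketch below shows one way to build the options object that a factory returns; buildClientOptions and the ClientOptions interface are hypothetical names used for illustration, and only the serviceUrl property and the SERVICE_URL key come from the examples above.

```typescript
// Hypothetical helper, not part of @dev5c32373043/nestjs-pulsar.
// ClientOptions is a local stand-in that models only serviceUrl.
interface ClientOptions {
  serviceUrl: string;
}

function buildClientOptions(
  env: Record<string, string | undefined>,
  fallbackUrl?: string,
): ClientOptions {
  // Treat blank values the same as missing ones.
  const configured = env.SERVICE_URL?.trim();
  const serviceUrl = configured ? configured : fallbackUrl;
  if (!serviceUrl) {
    throw new Error('SERVICE_URL is not set and no fallback was provided');
  }
  return { serviceUrl };
}
```

Inside forRootAsync(), the factory could then be written as useFactory: (config: ConfigService) => buildClientOptions({ SERVICE_URL: config.get('SERVICE_URL') }).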

Multiple clients

Some projects require multiple Pulsar clients, which this module also supports. To work with multiple clients, first create them. In this case, naming each additional client becomes mandatory.

import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRoot({
      serviceUrl: 'pulsar://localhost:6650',
    }),
    PulsarModule.forRoot(
      {
        serviceUrl: 'pulsar://other.client:6650',
      },
      'other-client', // client name
    ),
  ],
})
export class AppModule {}

Warning

If you don't set the name for a client, its name is set to default. Please note that you shouldn't have multiple clients without a name, or with the same name, otherwise they will get overridden.

If you are using PulsarModule.forRootAsync(), you have to also set the client name the same way:

import { Module } from '@nestjs/common';
import { PulsarModule } from '@dev5c32373043/nestjs-pulsar';

@Module({
  imports: [
    PulsarModule.forRootAsync({
      useFactory: () => ({
        serviceUrl: 'pulsar://localhost:6650',
      }),
    }),
    PulsarModule.forRootAsync(
      {
        useFactory: () => ({
          serviceUrl: 'pulsar://other.client:6650',
        }),
      },
      'other-client',
    ),
  ],
})
export class AppModule {}

Testing

When it comes to unit testing an application, we usually want to avoid making a real Pulsar connection, keeping our test suites independent and their execution as fast as possible. But our classes might depend on producers, consumers or readers that are created from the client instance. How do we handle that? The solution is to create mocks. To achieve that, we set up custom providers. Each registered producer, consumer or reader is automatically represented by an auto-generated token.

The @dev5c32373043/nestjs-pulsar package exposes the getFeatureToken() function which returns a prepared token based on a given feature type and name.

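import { Module } from '@nestjs/common';
import { getFeatureToken } from '@dev5c32373043/nestjs-pulsar';

// mockConsumer is a test double that you define yourself.
@Module({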
  providers: [
    UsersService,
    {
      provide: getFeatureToken('consumer', 'my-consumer'),
      useValue: mockConsumer,
    },
  ],
})
export class UsersModule {}

Now the substitute mockConsumer will be used as the Consumer named my-consumer. Whenever any class asks for my-consumer using the @InjectPulsar() decorator, Nest will use the registered mockConsumer object.
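
The example above leaves mockConsumer undefined. One possible shape for it is sketched below, under the assumption that the code under test only calls receive() and acknowledge() and reads messages through getData(), as UsersService.consume() does. MessageLike, ConsumerLike and createMockConsumer are hypothetical local names, not types or functions exported by the package.

```typescript
// Local stand-ins covering only the members used by UsersService.consume().
interface MessageLike {
  getData(): Buffer;
}

interface ConsumerLike {
  receive(timeout?: number): Promise<MessageLike>;
  acknowledge(message: MessageLike): Promise<void>;
}

// Builds an in-memory consumer that serves the given payloads in order
// and records every acknowledged message for later assertions.
function createMockConsumer(payloads: unknown[]) {
  const queue = [...payloads];
  const acknowledged: MessageLike[] = [];

  const consumer: ConsumerLike = {
    async receive() {
      if (queue.length === 0) {
        // Stands in for a receive timeout on an empty topic.
        throw new Error('receive timed out (mock queue is empty)');
      }
      const data = Buffer.from(JSON.stringify(queue.shift()));
      return { getData: () => data };
    },
    async acknowledge(message) {
      acknowledged.push(message);
    },
  };

  return { consumer, acknowledged };
}

// Same logic as UsersService.consume(), written against the stand-in type.
async function consume(consumer: ConsumerLike, timeout = 1000): Promise<unknown> {
  const rawMessage = await consumer.receive(timeout);
  const data = JSON.parse(rawMessage.getData().toString());
  await consumer.acknowledge(rawMessage);
  return data;
}
```

In a Nest test, the consumer returned by createMockConsumer() would be the value passed as useValue for getFeatureToken('consumer', 'my-consumer'), and the acknowledged array lets the test check that each message was acknowledged.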

License

Nest Pulsar is MIT licensed.