Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaJS v2 #4427

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Expand Up @@ -23,3 +23,5 @@

/definitions/npm/sequelize_*/**/* @jedwards1211

/definitions/npm/kafkajs_*/**/* @edoardo-bluframe
/definitions/npm/react-helmet_v6*/**/* @edoardo-bluframe
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check out this to add code owners in the respective definition instead for the service to do it's thing

143 changes: 143 additions & 0 deletions definitions/npm/kafkajs_v2.x.x/flow_v0.83.x-/kafkajs_v2.x.x.js
@@ -0,0 +1,143 @@
// @flow

declare module "kafkajs" {
declare type KafkaOptions = {|
brokers: string[],
clientId?: string,
connectionTimeout?: number,
logLevel?: number,
retry?: {|
initialRetryTime?: number,
maxRetryTime?: number,
retries?: number
|},
sasl?: {|
mechanism: string,
password: string,
username: string
|},
+ssl?:
| boolean
| {|
ca?: string | string[],
cert?: string,
key?: string,
passphrase?: string,
pfx?: string | Buffer,
rejectUnauthorized?: boolean,
secureProtocol?: string,
servername?: string
|}
|}

declare type ConsumerOptions = {|
+groupId: string,
+sessionTimeout?: number,
+heartbeatInterval?: number,
+maxBytesPerPartition?: number,
+minBytes?: number,
+maxBytes?: number,
+maxWaitTimeInMs?: number,
+rebalanceTimeout?: number
|}

declare type Message = {|
+key?: string,
+value: string,
+timestamp?: string
|}

declare type ConsumerEachMessageOptions = {|
+message: Message,
+partition: number,
+topic: string
|}

declare export type ConsumerRunOptions = {|
+eachMessage: (options: ConsumerEachMessageOptions) => Promise<void>
|}

declare export type ConsumerSubscribeOptions = {|
+fromBeginning?: boolean,
+topic: string
|}

declare export interface ConsumerInterface {
connect(): Promise<void>;
disconnect(): Promise<void>;
run(options: ConsumerRunOptions): Promise<void>;
subscribe(options: ConsumerSubscribeOptions): Promise<void>;
}

declare export class Consumer implements ConsumerInterface {
constructor(options: ConsumerOptions): this;

connect: () => Promise<void>;
disconnect: () => Promise<void>;
run: (options: ConsumerRunOptions) => Promise<void>;
subscribe: (options: ConsumerSubscribeOptions) => Promise<void>;
}

declare type ProducerOptions = {|
+allowAutoTopicCreation?: boolean,
+createPartitioner?: Partitioner,
+idempotent?: boolean,
+maxQueueSize?: number,
+retry?: {|
retries?: number
|}
|}

declare export type ProducerSendOptions = {|
+messages: Message[],
+topic: string
|}

declare export interface ProducerInterface {
connect(): Promise<void>;
disconnect(): Promise<void>;
send(options: ProducerSendOptions): Promise<void>;
}

declare export class Producer implements ProducerInterface {
constructor(options: ProducerOptions): this;

connect: () => Promise<void>;
disconnect: () => Promise<void>;
send: (options: ProducerSendOptions) => Promise<void>;
}

declare export class Kafka {
constructor(options: KafkaOptions): this;

consumer: (options: ConsumerOptions) => Consumer;
producer: (options: ProducerOptions) => Producer;
}

declare export type Partitioner = (args: {|
+topic: string,
+partitionMetadata: Array<{|
+partitionId: number,
+leader: number,
+replicas: number[],
+isr: number[],
+isPreferredLeader: boolean,
+isUnderReplicated: boolean
|}>,
+message: Message
|}) => number

declare export var Partitioners: {|
+DefaultPartitioner: Partitioner,
+RoundRobinPartitioner: Partitioner,
+RandomPartitioner: Partitioner
|}

declare export var logLevel: {|
+NOTHING: number,
+ERROR: number,
+WARN: number,
+INFO: number,
+DEBUG: number
|}
}
87 changes: 87 additions & 0 deletions definitions/npm/kafkajs_v2.x.x/test_kafkajs_v2.x.x.js
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definition itself looks good, I can't fault it's structure.

Regarding tests though, I see that you've approached this with use cases in mind which is good, shows that it works as you expect. But the core thing about tests is that you're trying to exercise the definition to prove 2 things, what you've written is actually valid and if someone were to change your definition it would break expectedly.

You'll need some // $FlowExpectedError samples in some places, the fact that you don't have any begs the question, how do we know that your types are actually strict and that they're not just any and you can actually call any function against it? Here is an example of what that might look like, though you don't need to go to the extreme levels of testing everything as I understand it can be tedious, but enough testing so we don't assume a type can be used in the wrong way.

If a nutshell, if you have a block that proves it does this you should also have another block similar that states it shouldn't be able to do this

@@ -0,0 +1,87 @@
// @flow
import { describe, it } from "flow-typed-test";
import { Kafka, Producer, Consumer, Partitioners, logLevel } from "kafkajs";

describe("kafkajs", () => {
it("creates Kafka instance", () => {
const kafka = new Kafka({
clientId: "my-app",
brokers: ["kafka1:9092", "kafka2:9092"],
});
});

it("creates Consumer instance", () => {
const kafka = new Kafka({
clientId: "my-app",
brokers: ["kafka1:9092", "kafka2:9092"],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestions for the constructor, you might want to exercise the options,

  • can it accept all the expected options
  • what if you pass in the invalid values for the option
  • what if you pass in an option that doesn't exist?

Then once constructed with kafka returned to you, does it has the functions and properties you expect but at the same time, does it do things you don't expect?

});

const consumer: Consumer = kafka.consumer({ groupId: "test-group" });
});

it("creates Producer instance", () => {
const kafka = new Kafka({
clientId: "my-app",
brokers: ["kafka1:9092", "kafka2:9092"],
});

const producer: Producer = kafka.producer();
});

it("uses Consumer methods", async () => {
const kafka = new Kafka({
clientId: "my-app",
brokers: ["kafka1:9092", "kafka2:9092"],
});
const consumer: Consumer = kafka.consumer({ groupId: "test-group" });

await consumer.connect();
await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(`Received message: ${message.value}`);
},
});

await consumer.disconnect();
});

it("uses Producer methods", async () => {
const kafka = new Kafka({
clientId: "my-app",
brokers: ["kafka1:9092", "kafka2:9092"],
});
const producer: Producer = kafka.producer();

const message: Message = {
key: "key",
value: "value",
timestamp: "timestamp",
};

await producer.connect();
await producer.send({
topic: "test-topic",
messages: [message],
});

await producer.disconnect();
});

it("uses partitioners", () => {
const defaultPartitioner: Partitioner = Partitioners.DefaultPartitioner;
const roundRobinPartitioner: Partitioner = Partitioners.RoundRobinPartitioner;
const randomPartitioner: Partitioner = Partitioners.RandomPartitioner;
});

it("uses log levels", () => {
const logLevels = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might better test the expectation, your current code block doesn't actually test anything at all

Suggested change
const logLevels = [
const logLevels: number[] = [

logLevel.NOTHING,
logLevel.ERROR,
logLevel.WARN,
logLevel.INFO,
logLevel.DEBUG,
];
});
});