
Provide a pause() helper to eachMessage/eachBatch #1364

Merged · 8 commits · Jun 28, 2022
34 changes: 31 additions & 3 deletions docs/Consuming.md
@@ -42,7 +42,7 @@ The `eachMessage` handler provides a convenient and easy to use API, feeding you

```javascript
await consumer.run({
eachMessage: async ({ topic, partition, message, heartbeat }) => {
eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
@@ -53,10 +53,11 @@ await consumer.run({
```

Be aware that the `eachMessage` handler should not block for longer than the configured [session timeout](#options) or else the consumer will be removed from the group. If your workload involves very slow processing times for individual messages then you should either increase the session timeout or make periodic use of the `heartbeat` function exposed in the handler payload.
The `pause` callback function is a convenience for `consumer.pause([{ topic, partitions: [partition] }])`. Depending on your use case, take care to throw an exception after pausing if you do not want the current message's offset to be resolved and committed (i.e. if you want to retry the message once the topic/partition is resumed), as in the sketch below.
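For example, a handler that hits a rate limit in a downstream service might pause its topic/partition, schedule a resume, and re-throw so the message is retried once consumption resumes. This is only a minimal sketch: `processMessage` and `RateLimitError` stand in for your own logic.

```javascript
await consumer.run({
  eachMessage: async ({ topic, partition, message, pause }) => {
    try {
      await processMessage(message) // placeholder for your own processing
    } catch (e) {
      if (e instanceof RateLimitError) {
        // Stop fetching this topic/partition and resume it in 60 seconds
        const resumeThisPartition = pause()
        setTimeout(resumeThisPartition, 60 * 1000)
      }
      // Re-throwing prevents the offset from being resolved,
      // so the message is redelivered once the partition resumes
      throw e
    }
  },
})
```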

## <a name="each-batch"></a> eachBatch

Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, and `isStale`. All resolved offsets will be automatically committed after the function is executed.
Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, `isStale`, and `pause`. All resolved offsets will be automatically committed after the function is executed.

> Note: Be aware that using `eachBatch` directly is considered a more advanced use case as compared to using `eachMessage`, since you will have to understand how session timeouts and heartbeats are connected.

@@ -71,6 +72,7 @@ await consumer.run({
uncommittedOffsets,
isRunning,
isStale,
pause,
}) => {
for (let message of batch.messages) {
console.log({
@@ -100,6 +102,7 @@ await consumer.run({
* `uncommittedOffsets()` returns all offsets by topic-partition which have not yet been committed.
* `isRunning()` returns true if consumer is in running state, else it returns false.
* `isStale()` returns whether the messages in the batch have been rendered stale through some other operation and should be discarded. For example, when calling [`consumer.seek`](#seek) the messages in the batch should be discarded, as they are not at the offset we seeked to.
* `pause()` can be used to pause the consumer for the current topic/partition. All offsets resolved up to that point will be committed (subject to `eachBatchAutoResolve` and [autoCommit](#auto-commit) settings). If you want to pause in the middle of the batch, you should throw an exception after pausing if you do not wish to resolve the current batch's offsets or, alternatively, make sure that `eachBatchAutoResolve` is turned off. You can use the returned callback function to resume processing of the relevant topic/partition later (e.g. via `setTimeout` or some other asynchronous trigger), as in the sketch below. See [Pause & Resume](#pause-resume) for more information about this feature.
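As a rough sketch of how this might look (assuming `eachBatchAutoResolve` is disabled and `isBackpressure` / `processMessage` are placeholders for your own logic), an `eachBatch` handler can pause mid-batch and resume the topic/partition later:

```javascript
await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, pause, isRunning, isStale }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break

      if (await isBackpressure()) {
        // Pause this topic/partition; offsets resolved so far will still be committed
        const resume = pause()
        setTimeout(resume, 60 * 1000)
        // Stop processing: unresolved messages are redelivered after the partition resumes
        return
      }

      await processMessage(message)
      resolveOffset(message.offset)
      await heartbeat()
    }
  },
})
```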

### Example

@@ -250,7 +253,7 @@ kafka.consumer({

## <a name="pause-resume"></a> Pause & Resume

In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. It also provides the `paused` method to get the list of all paused topics. Note that pausing a topic means that it won't be fetched in the next cycle. You may still receive messages for the topic within the current batch.
In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. It also provides the `paused` method to get the list of all paused topics. Note that pausing a topic means that it won't be fetched in the next cycle and subsequent messages within the current batch won't be passed to an `eachMessage` handler.

Calling `pause` with a topic that the consumer is not subscribed to is a no-op; calling `resume` with a topic that is not paused is also a no-op.
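For illustration, pausing a single partition directly via the consumer and later resuming it might look like the following sketch (the topic name and partition number are placeholders):

```javascript
// Pause partition 0 of the 'jobs' topic
consumer.pause([{ topic: 'jobs', partitions: [0] }])

// Inspect what is currently paused, e.g. [{ topic: 'jobs', partitions: [0] }]
console.log(consumer.paused())

// Resume everything that is currently paused
consumer.resume(consumer.paused())
```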

@@ -304,6 +307,31 @@ consumer.run({
})
```

As a convenience, the `eachMessage` handler payload includes a `pause` callback that pauses the specific topic/partition of the message currently being processed. It returns a function that resumes that topic/partition, which you can schedule to run later (for example with `setTimeout`) so that consumption resumes automatically:

```javascript
await consumer.connect()
await consumer.subscribe({ topics: ['jobs'] })

await consumer.run({ eachMessage: async ({ topic, message, pause }) => {
try {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
const resumeThisPartition = pause()
// Other partitions that are paused will continue to be paused
setTimeout(resumeThisPartition, e.retryAfter * 1000)
}

throw e
}
}})
```

Depending on your use case, be sure to throw an exception after `pause()` is called (as shown above) if you do not want the current message's offset to be resolved and committed.
Alternatively, if you want to pause processing of a topic's partition without retrying the current message, no exception needs to be thrown.
In either case, the `eachMessage` handler will not be called again for the paused topic/partition until it is resumed.
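A sketch of the second case, assuming a hypothetical `shouldBackOff()` check: because no exception is thrown, the current message's offset is resolved and committed, and the topic/partition simply stops being consumed until it is resumed:

```javascript
await consumer.run({
  eachMessage: async ({ topic, partition, message, pause }) => {
    await handleMessage(message) // placeholder for your own processing

    if (await shouldBackOff()) {
      // Not throwing: this message counts as processed and will not be redelivered
      const resumeThisPartition = pause()
      setTimeout(resumeThisPartition, 30 * 1000)
    }
  },
})
```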

It's possible to access the list of paused topic partitions using the `paused` method.

```javascript
193 changes: 192 additions & 1 deletion src/consumer/__tests__/pause.spec.js
@@ -12,7 +12,11 @@ const {
} = require('testHelpers')

describe('Consumer', () => {
let groupId, producer, consumer, topics
/**
* @type {import('../../../types').Consumer}
*/
let consumer
let groupId, producer, topics

beforeEach(async () => {
topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]
@@ -64,6 +68,193 @@ describe('Consumer', () => {
)
})

it('pauses the appropriate topic/partition when pausing via the eachMessage callback', async () => {
await consumer.connect()
await producer.connect()
const messages = [0, 0, 1, 0].map(partition => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
})

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(0, 2) })
await consumer.subscribe({ topic, fromBeginning: true })
}

let shouldPause = true
const messagesConsumed = []
const resumeCallbacks = []
consumer.run({
eachMessage: async event => {
const { topic, message, pause } = event

const whichTopic = topics.indexOf(topic)
const whichMessage = messages.findIndex(m => String(m.key) === String(message.key))

if (shouldPause && whichTopic === 0 && whichMessage === 1) {
resumeCallbacks.push(pause())
throw new Error('bailing out')
}
messagesConsumed.push({
topic: whichTopic,
message: whichMessage,
})
},
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 3 })
const [pausedTopic] = topics
expect(consumer.paused()).toEqual([{ topic: pausedTopic, partitions: [0] }])

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(2) })
}
await waitForMessages(messagesConsumed, { number: 6, delay: 10 })

expect(messagesConsumed).toHaveLength(6)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 2 }) // partition 1

expect(messagesConsumed).toContainEqual({ topic: 1, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 2 }) // partition 1
expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 }) // partition 0

shouldPause = false
resumeCallbacks.forEach(resume => resume())

await waitForMessages(messagesConsumed, { number: 8 })

// these messages have to wait until the consumer has resumed
expect(messagesConsumed).toHaveLength(8)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 3 }) // partition 0
})

it('avoids calling eachMessage again for paused topics/partitions when paused via consumer.pause', async () => {
await consumer.connect()
await producer.connect()
const messages = [0, 0, 1, 0].map(partition => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
})

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(0, 2) })
await consumer.subscribe({ topic, fromBeginning: true })
}

let shouldPause = true
const messagesConsumed = []
consumer.run({
eachMessage: async event => {
const { topic, message, partition } = event

const whichTopic = topics.indexOf(topic)
const whichMessage = messages.findIndex(m => String(m.key) === String(message.key))

messagesConsumed.push({
topic: whichTopic,
message: whichMessage,
})

// here, we pause after the first message (0) on the first topic (0)
if (shouldPause && whichTopic === 0 && whichMessage === 0) {
consumer.pause([{ topic, partitions: [partition] }])
// we don't throw an exception here to ensure the loop calling us breaks on its own and doesn't call us again
}
},
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 3 })
const [pausedTopic] = topics
expect(consumer.paused()).toEqual([{ topic: pausedTopic, partitions: [0] }])

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(2) })
}
await waitForMessages(messagesConsumed, { number: 6, delay: 10 })

expect(messagesConsumed).toHaveLength(6)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 2 }) // partition 1

expect(messagesConsumed).toContainEqual({ topic: 1, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 2 }) // partition 1
expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 }) // partition 0

shouldPause = false
consumer.resume(consumer.paused())

await waitForMessages(messagesConsumed, { number: 8 })

// these messages have to wait until the consumer has resumed
expect(messagesConsumed).toHaveLength(8)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 3 }) // partition 0
})

it('pauses when pausing via the eachBatch callback', async () => {
await consumer.connect()
await producer.connect()
const originalMessages = [0, 0, 0, 1].map(partition => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
})

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: originalMessages })
await consumer.subscribe({ topic, fromBeginning: true })
}

let shouldPause = true
const messagesConsumed = []
const resumeCallbacks = []
consumer.run({
eachBatch: async event => {
const {
batch: { topic, messages },
pause,
resolveOffset,
commitOffsetsIfNecessary,
} = event
messages.every(message => {
const whichTopic = topics.indexOf(topic)
const whichMessage = originalMessages.findIndex(
m => String(m.key) === String(message.key)
)

if (shouldPause && whichTopic === 0 && whichMessage === 1) {
resumeCallbacks.push(pause())
return false
} else if (shouldPause && whichTopic === 1 && whichMessage === 3) {
resumeCallbacks.push(pause())
return false
}
messagesConsumed.push({
topic: whichTopic,
message: whichMessage,
})
resolveOffset(message.offset)
return true
})
await commitOffsetsIfNecessary()
},
eachBatchAutoResolve: false,
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 5 })
expect(consumer.paused()).toContainEqual({ topic: topics[0], partitions: [0] })
expect(consumer.paused()).toContainEqual({ topic: topics[1], partitions: [1] })
shouldPause = false
resumeCallbacks.forEach(resume => resume())
await waitForMessages(messagesConsumed, { number: 8 })
expect(consumer.paused()).toEqual([])
expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 })
expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 })
})

it('does not fetch messages for the paused topic', async () => {
await consumer.connect()
await producer.connect()
1 change: 1 addition & 0 deletions src/consumer/__tests__/runner.spec.js
@@ -62,6 +62,7 @@ describe('Consumer > Runner', () => {
heartbeat: jest.fn(),
assigned: jest.fn(() => []),
isLeader: jest.fn(() => true),
isPaused: jest.fn().mockReturnValue(false),
}
instrumentationEmitter = new InstrumentationEventEmitter()

9 changes: 9 additions & 0 deletions src/consumer/consumerGroup.js
@@ -418,6 +418,15 @@ module.exports = class ConsumerGroup {
return this.subscriptionState.paused()
}

/**
* @param {string} topic
* @param {number} partition
* @returns {boolean} whether the specified topic/partition is paused or not
*/
isPaused(topic, partition) {
return this.subscriptionState.isPaused(topic, partition)
}

async commitOffsetsIfNecessary() {
await this.offsetManager.commitOffsetsIfNecessary()
}
28 changes: 28 additions & 0 deletions src/consumer/runner.js
@@ -182,6 +182,10 @@ module.exports = class Runner extends EventEmitter {
async processEachMessage(batch) {
const { topic, partition } = batch

const pause = () => {
this.consumerGroup.pause([{ topic, partitions: [partition] }])
return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
}
for (const message of batch.messages) {
if (!this.running || this.consumerGroup.hasSeekOffset({ topic, partition })) {
break
@@ -193,6 +197,7 @@
partition,
message,
heartbeat: () => this.heartbeat(),
pause,
})
} catch (e) {
if (!isKafkaJSError(e)) {
@@ -213,13 +218,22 @@
this.consumerGroup.resolveOffset({ topic, partition, offset: message.offset })
await this.heartbeat()
await this.autoCommitOffsetsIfNecessary()

if (this.consumerGroup.isPaused(topic, partition)) {
break
}
}
}

async processEachBatch(batch) {
const { topic, partition } = batch
const lastFilteredMessage = batch.messages[batch.messages.length - 1]

const pause = () => {
this.consumerGroup.pause([{ topic, partitions: [partition] }])
return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
}

try {
await this.eachBatch({
batch,
@@ -245,6 +259,10 @@
this.consumerGroup.resolveOffset({ topic, partition, offset: offsetToResolve })
},
heartbeat: () => this.heartbeat(),
/**
* Pause consumption for the current topic/partition being processed
*/
pause,
/**
* Commit offsets if provided. Otherwise commit most recent resolved offsets
* if the autoCommit conditions are met.
@@ -416,6 +434,16 @@
})
return
}
if (this.consumerGroup.isPaused(batch.topic, batch.partition)) {
this.logger.debug('topic/partition paused, will not retry', {
error: e.message,
groupId: this.consumerGroup.groupId,
memberId: this.consumerGroup.memberId,
topic: batch.topic,
partition: batch.partition,
})
return
}

if (
isRebalancing(e) ||
2 changes: 2 additions & 0 deletions types/index.d.ts
@@ -899,12 +899,14 @@ export interface EachMessagePayload {
partition: number
message: KafkaMessage
heartbeat(): Promise<void>
pause(): () => void
}

export interface EachBatchPayload {
batch: Batch
resolveOffset(offset: string): void
heartbeat(): Promise<void>
pause(): () => void
commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
uncommittedOffsets(): OffsetsByTopicPartition
isRunning(): boolean