Rework slightly to not make use of timers or special exception classes
brianphillips committed Jun 1, 2022
1 parent e092c15 commit 96ad17c
Showing 5 changed files with 74 additions and 102 deletions.
docs/Consuming.md (15 changes: 11 additions & 4 deletions)
@@ -42,7 +42,7 @@ The `eachMessage` handler provides a convenient and easy to use API, feeding you

```javascript
await consumer.run({
eachMessage: async ({ topic, partition, message, heartbeat }) => {
eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
@@ -53,6 +53,7 @@ await consumer.run({
```

Be aware that the `eachMessage` handler should not block for longer than the configured [session timeout](#options) or else the consumer will be removed from the group. If your workload involves very slow processing times for individual messages then you should either increase the session timeout or make periodic use of the `heartbeat` function exposed in the handler payload.
The `pause` callback function is a convenience for `consumer.pause([{ topic, partitions: [partition] }])` and returns a function that resumes that topic/partition. If you do not want the current message's offset to be resolved and committed (i.e. if you want to retry the message later), make sure to throw an exception after pausing.
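
For instance, a minimal sketch of the retry-later pattern (`processMessage` and the 5 second back-off are placeholders, not part of the KafkaJS API):

```javascript
await consumer.run({
  eachMessage: async ({ topic, partition, message, pause }) => {
    try {
      await processMessage(message) // placeholder for your own processing
    } catch (e) {
      // pause() stops fetching from this topic/partition and returns a
      // callback that resumes it; throwing afterwards prevents the current
      // offset from being resolved and committed, so the message is retried
      const resume = pause()
      setTimeout(resume, 5000)
      throw e
    }
  },
})
```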

## <a name="each-batch"></a> eachBatch

@@ -101,7 +102,7 @@ await consumer.run({
* `uncommittedOffsets()` returns all offsets by topic-partition which have not yet been committed.
* `isRunning()` returns true if consumer is in running state, else it returns false.
* `isStale()` returns whether the messages in the batch have been rendered stale through some other operation and should be discarded. For example, when calling [`consumer.seek`](#seek) the messages in the batch should be discarded, as they are not at the offset we seeked to.
* `pause(optionalTimeout)` can be used to pause the consumer for the current topic/partition for an optional period of time. All offsets resolved up to that point will be committed (subject to [autoCommit](#auto-commit) settings) and processing of this batch will be aborted. If no timeout is specified, the consumer will be paused indefinitely until `consumer.resume()` is called with the appropriate topic/partition. See [Pause & Resyume](#pause-resume) for more information about this feature.
* `pause()` can be used to pause the consumer for the current topic/partition. All offsets resolved up to that point will be committed (subject to `eachBatchAutoResolve` and [autoCommit](#auto-commit) settings). If you want to pause in the middle of the batch, either throw an exception so that the current batch's offset is not resolved, or make sure that `eachBatchAutoResolve` is turned off. You can use the returned callback function to resume processing of the relevant topic/partition later (e.g. via `setTimeout` or some other asynchronous trigger), as in the sketch below. See [Pause & Resume](#pause-resume) for more information about this feature.
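
A sketch of pausing mid-batch under these rules (`processMessage` and `dependencyIsDown` are placeholders, not part of the KafkaJS API):

```javascript
await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, pause }) => {
    for (const message of batch.messages) {
      if (dependencyIsDown()) { // placeholder back-pressure check
        // abort the batch and resume this topic/partition in 10 seconds;
        // unresolved messages will be refetched because auto-resolve is off
        const resume = pause()
        setTimeout(resume, 10 * 1000)
        return
      }
      await processMessage(message) // placeholder for your own processing
      resolveOffset(message.offset) // only resolved offsets are committed
      await heartbeat()
    }
  },
})
```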

### Example

@@ -306,7 +307,7 @@ consumer.run({
})
```

As a convenience, the `eachMessage` callback provides an easy means to pause the specific topic/partition for the message currently being processed. You can also specify a timeout for when that topic/partition will automatically be resumed:
As a convenience, the `eachMessage` callback provides a simple callback function to pause the specific topic/partition for the message currently being processed. It returns a function that can be used to resume that topic/partition later, for example after a timeout:

```javascript
await consumer.connect()
@@ -317,14 +318,20 @@ await consumer.run({ eachMessage: async ({ topic, message, pause }) => {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
pause(e.retryAfter * 1000) // returns control to KafkaJS until timeout has expired
const resumeThisPartition = pause()
// Other partitions that are paused will continue to be paused
setTimeout(resumeThisPartition, e.retryAfter * 1000)
}

throw e
}
}})
```

If you do not want the current message's offset to be resolved and committed (i.e. you want the message retried once the partition is resumed), be sure to throw an exception after `pause()` is called, as shown above.
Alternatively, if you want to pause processing of a topic's partition without retrying the current message, no exception needs to be thrown.
In either case, the `eachMessage` callback will not be called again for the current topic/partition until the consumer is resumed.
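
A sketch of the no-retry variant (`shouldBackOff` is a placeholder; `sendToDependency` is reused from the example above):

```javascript
await consumer.run({
  eachMessage: async ({ topic, partition, message, pause }) => {
    await sendToDependency(message)
    if (shouldBackOff()) { // placeholder back-pressure check
      // no throw: this message's offset is still resolved and committed,
      // and the partition stays paused until the returned callback runs
      const resume = pause()
      setTimeout(resume, 5000)
    }
  },
})
```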

It's possible to access the list of paused topic partitions using the `paused` method.

```javascript
src/consumer/__tests__/pause.spec.js (91 changes: 33 additions & 58 deletions)
@@ -71,9 +71,9 @@ describe('Consumer', () => {
it('pauses the appropriate topic/partition when pausing via the eachMessage callback', async () => {
await consumer.connect()
await producer.connect()
const messages = [0, 0, 1, 0].map(partition => {
const messages = [0, 0, 1, 0].map((partition, i) => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
return { key: `key-${i}-${key}`, value: `message-${i}-partition-${partition}`, partition }
})

for (const topic of topics) {
@@ -83,11 +83,13 @@

let shouldPause = true
const messagesConsumed = []
const resumeCallbacks = []
consumer.run({
eachMessage: async event => {
const { topic, message, pause, partition } = event
if (shouldPause && topic === topics[0] && String(message.key) === messages[1].key) {
pause()
resumeCallbacks.push(pause())
throw new Error('bailing out')
}
messagesConsumed.push({
topic,
@@ -98,14 +100,14 @@
},
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 3 })
await waitForMessages(messagesConsumed, { number: 3, delay: 10 })
const [pausedTopic, activeTopic] = topics
expect(consumer.paused()).toEqual([{ topic: pausedTopic, partitions: [0] }])

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(2) })
}
await waitForMessages(messagesConsumed, { number: 6 })
await waitForMessages(messagesConsumed, { number: 6, delay: 10 })

expect(messagesConsumed).toHaveLength(6)
expect(messagesConsumed).toContainEqual({ topic: pausedTopic, ...messages[0] }) // partition 0
@@ -117,7 +119,7 @@
expect(messagesConsumed).toContainEqual({ topic: activeTopic, ...messages[3] }) // partition 0

shouldPause = false
consumer.resume(consumer.paused())
resumeCallbacks.forEach(resume => resume())

await waitForMessages(messagesConsumed, { number: 8 })

@@ -127,56 +129,16 @@
expect(messagesConsumed).toContainEqual({ topic: pausedTopic, ...messages[3] }) // partition 0
})

it('pauses and resumes after timeout when pausing via the eachMessage callback', async () => {
it('pauses when pausing via the eachBatch callback', async () => {
await consumer.connect()
await producer.connect()
const messages = [0, 0, 0, 0].map(partition => {
const originalMessages = [0, 0, 0, 1].map((partition, i) => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
})

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages })
await consumer.subscribe({ topic, fromBeginning: true })
}

let shouldPause = true
const messagesConsumed = []
consumer.run({
eachMessage: async event => {
const { topic, message, pause, partition } = event
if (shouldPause && topic === topics[0] && String(message.key) === messages[1].key) {
pause(2000) // 2 seconds
} else if (
shouldPause &&
topic === topics[1] &&
String(message.key) === messages[3].key
) {
pause(3000) // 3 seconds
}
messagesConsumed.push({
topic,
key: String(message.key),
value: String(message.value),
partition,
})
},
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 4 })
expect(consumer.paused()).toContainEqual({ topic: topics[0], partitions: [0] })
expect(consumer.paused()).toContainEqual({ topic: topics[1], partitions: [0] })
shouldPause = false
await waitForMessages(messagesConsumed, { number: 8 })
expect(consumer.paused()).toEqual([])
})

it('pauses and resumes after timeout when pausing via the eachBatch callback', async () => {
await consumer.connect()
await producer.connect()
const originalMessages = [0, 0, 0, 1].map(partition => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
return {
key: `key-${i}-${key}-${partition}`,
value: `value-${i}-${key}-${partition}`,
partition,
}
})

for (const topic of topics) {
@@ -186,6 +148,7 @@

let shouldPause = true
const messagesConsumed = []
const resumeCallbacks = []
consumer.run({
eachBatch: async event => {
const {
@@ -194,41 +157,53 @@
resolveOffset,
commitOffsetsIfNecessary,
} = event
messages.forEach(message => {
messages.every(message => {
if (
shouldPause &&
topic === topics[0] &&
String(message.key) === originalMessages[1].key
) {
pause(2000) // 2 seconds
resumeCallbacks.push(pause())
return false
} else if (
shouldPause &&
topic === topics[1] &&
String(message.key) === originalMessages[3].key
) {
pause(3000) // 3 seconds
resumeCallbacks.push(pause())
return false
}
messagesConsumed.push({ topic, key: String(message.key), partition })
messagesConsumed.push({
topic,
key: String(message.key),
value: String(message.value),
partition,
})
resolveOffset(message.offset)
return true
})
await commitOffsetsIfNecessary()
},
eachBatchAutoResolve: false,
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 4 })
await waitForMessages(messagesConsumed, { number: 5 })
expect(consumer.paused()).toContainEqual({ topic: topics[0], partitions: [0] })
expect(consumer.paused()).toContainEqual({ topic: topics[1], partitions: [1] })
shouldPause = false
resumeCallbacks.forEach(resume => resume())
await waitForMessages(messagesConsumed, { number: 8 })
expect(consumer.paused()).toEqual([])
expect(messagesConsumed).toContainEqual({
topic: topics[0],
key: String(originalMessages[1].key),
value: String(originalMessages[1].value),
partition: 0,
})
expect(messagesConsumed).toContainEqual({
topic: topics[1],
key: String(originalMessages[3].key),
value: String(originalMessages[3].value),
partition: 1,
})
})
src/consumer/runner.js (58 changes: 28 additions & 30 deletions)
@@ -1,7 +1,7 @@
const { EventEmitter } = require('events')
const Long = require('../utils/long')
const createRetry = require('../retry')
const { isKafkaJSError, isRebalancing, KafkaJSPauseConsumerError } = require('../errors')
const { isKafkaJSError, isRebalancing } = require('../errors')

const {
events: { FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS, REBALANCING },
@@ -182,6 +182,12 @@ module.exports = class Runner extends EventEmitter {
async processEachMessage(batch) {
const { topic, partition } = batch

let paused = false
const pause = () => {
paused = true
this.consumerGroup.pause([{ topic, partitions: [partition] }])
return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
}
for (const message of batch.messages) {
if (!this.running || this.consumerGroup.hasSeekOffset({ topic, partition })) {
break
@@ -193,22 +199,9 @@
partition,
message,
heartbeat: () => this.heartbeat(),
pause: timeout => {
throw new KafkaJSPauseConsumerError(timeout)
},
pause,
})
} catch (e) {
if (e instanceof KafkaJSPauseConsumerError) {
this.consumerGroup.pause([{ topic, partitions: [partition] }])
if (e.timeout) {
setTimeout(
() => this.consumerGroup.resume([{ topic, partitions: [partition] }]),
e.timeout
)
}
await this.autoCommitOffsets()
break
}
if (!isKafkaJSError(e)) {
this.logger.error(`Error when calling eachMessage`, {
topic,
Expand All @@ -221,19 +214,33 @@ module.exports = class Runner extends EventEmitter {

// In case of errors, commit the previously consumed offsets unless autoCommit is disabled
await this.autoCommitOffsets()
if (paused) {
break
}
throw e
}

this.consumerGroup.resolveOffset({ topic, partition, offset: message.offset })
await this.heartbeat()
await this.autoCommitOffsetsIfNecessary()

if (paused) {
break
}
}
}

async processEachBatch(batch) {
const { topic, partition } = batch
const lastFilteredMessage = batch.messages[batch.messages.length - 1]

let paused = false
const pause = () => {
paused = true
this.consumerGroup.pause([{ topic, partitions: [partition] }])
return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
}

try {
await this.eachBatch({
batch,
@@ -260,11 +267,9 @@
},
heartbeat: () => this.heartbeat(),
/**
* Pause consumption, committing whatever offsets have been resolved so far if auto-commit is enabled
* Pause consumption for the current topic/partition being processed
*/
pause: timeout => {
throw new KafkaJSPauseConsumerError(timeout)
},
pause,
/**
* Commit offsets if provided. Otherwise commit most recent resolved offsets
* if the autoCommit conditions are met.
@@ -281,17 +286,6 @@
isStale: () => this.consumerGroup.hasSeekOffset({ topic, partition }),
})
} catch (e) {
if (e instanceof KafkaJSPauseConsumerError) {
this.consumerGroup.pause([{ topic, partitions: [partition] }])
if (e.timeout) {
setTimeout(
() => this.consumerGroup.resume([{ topic, partitions: [partition] }]),
e.timeout
)
}
await this.autoCommitOffsets()
return
}
if (!isKafkaJSError(e)) {
this.logger.error(`Error when calling eachBatch`, {
topic,
Expand All @@ -305,6 +299,10 @@ module.exports = class Runner extends EventEmitter {
// eachBatch has a special resolveOffset which can be used
// to keep track of the messages
await this.autoCommitOffsets()
if (paused) {
// we don't want to re-throw the exception because that will trigger a retry
return
}
throw e
}

src/errors.js (8 changes: 0 additions & 8 deletions)
@@ -248,13 +248,6 @@

class KafkaJSFetcherRebalanceError extends Error {}

class KafkaJSPauseConsumerError extends Error {
constructor(timeout) {
super('Consumer paused on current topic/partition' + (timeout ? ` for ${timeout}ms` : ''))
this.timeout = timeout
}
}

const isRebalancing = e =>
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP'

@@ -290,7 +283,6 @@ module.exports = {
KafkaJSCreateTopicError,
KafkaJSAggregateError,
KafkaJSFetcherRebalanceError,
KafkaJSPauseConsumerError,
isRebalancing,
isKafkaJSError,
}
