Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent idempotent producer error handling fixes #1172

Merged
Merged 6 commits (base and source branch names not captured) on
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 42 additions & 18 deletions src/cluster/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const BrokerPool = require('./brokerPool')
const Lock = require('../utils/lock')
const sharedPromiseTo = require('../utils/sharedPromiseTo')
const createRetry = require('../retry')
const connectionBuilder = require('./connectionBuilder')
const flatten = require('../utils/flatten')
Expand All @@ -20,6 +21,13 @@ const mergeTopics = (obj, { topic, partitions }) => ({
[topic]: [...(obj[topic] || []), ...partitions],
})

const PRIVATE = {
CONNECT: Symbol('private:Cluster:connect'),
REFRESH_METADATA: Symbol('private:Cluster:refreshMetadata'),
REFRESH_METADATA_IF_NECESSARY: Symbol('private:Cluster:refreshMetadataIfNecessary'),
FIND_CONTROLLER_BROKER: Symbol('private:Cluster:findControllerBroker'),
}

module.exports = class Cluster {
/**
* @param {Object} options
Expand Down Expand Up @@ -95,6 +103,36 @@ module.exports = class Cluster {
metadataMaxAge,
})
this.committedOffsetsByGroup = offsets

this[PRIVATE.CONNECT] = sharedPromiseTo(async () => {
return await this.brokerPool.connect()
})

this[PRIVATE.REFRESH_METADATA] = sharedPromiseTo(async () => {
return await this.brokerPool.refreshMetadata(Array.from(this.targetTopics))
})

this[PRIVATE.REFRESH_METADATA_IF_NECESSARY] = sharedPromiseTo(async () => {
return await this.brokerPool.refreshMetadataIfNecessary(Array.from(this.targetTopics))
})

this[PRIVATE.FIND_CONTROLLER_BROKER] = sharedPromiseTo(async () => {
const { metadata } = this.brokerPool

if (!metadata || metadata.controllerId == null) {
throw new KafkaJSMetadataNotLoaded('Topic metadata not loaded')
}

const broker = await this.findBroker({ nodeId: metadata.controllerId })

if (!broker) {
throw new KafkaJSBrokerNotFound(
`Controller broker with id ${metadata.controllerId} not found in the cached metadata`
)
}

return broker
})
}

isConnected() {
Expand All @@ -106,7 +144,7 @@ module.exports = class Cluster {
* @returns {Promise<void>}
*/
async connect() {
await this.brokerPool.connect()
await this[PRIVATE.CONNECT]()
}

/**
Expand All @@ -132,15 +170,15 @@ module.exports = class Cluster {
* @returns {Promise<void>}
*/
async refreshMetadata() {
await this.brokerPool.refreshMetadata(Array.from(this.targetTopics))
await this[PRIVATE.REFRESH_METADATA]()
}

/**
* @public
* @returns {Promise<void>}
*/
async refreshMetadataIfNecessary() {
await this.brokerPool.refreshMetadataIfNecessary(Array.from(this.targetTopics))
await this[PRIVATE.REFRESH_METADATA_IF_NECESSARY]()
}

/**
Expand Down Expand Up @@ -232,21 +270,7 @@ module.exports = class Cluster {
* @returns {Promise<import("../../types").Broker>}
*/
async findControllerBroker() {
const { metadata } = this.brokerPool

if (!metadata || metadata.controllerId == null) {
throw new KafkaJSMetadataNotLoaded('Topic metadata not loaded')
}

const broker = await this.findBroker({ nodeId: metadata.controllerId })

if (!broker) {
throw new KafkaJSBrokerNotFound(
`Controller broker with id ${metadata.controllerId} not found in the cached metadata`
)
}

return broker
return await this[PRIVATE.FIND_CONTROLLER_BROKER]()
}

/**
Expand Down
7 changes: 3 additions & 4 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ const isRebalancing = e =>
const PRIVATE = {
JOIN: Symbol('private:ConsumerGroup:join'),
SYNC: Symbol('private:ConsumerGroup:sync'),
HEARTBEAT: Symbol('private:ConsumerGroup:heartbeat'),
SHAREDHEARTBEAT: Symbol('private:ConsumerGroup:sharedHeartbeat'),
SHARED_HEARTBEAT: Symbol('private:ConsumerGroup:sharedHeartbeat'),
}

module.exports = class ConsumerGroup {
Expand Down Expand Up @@ -111,7 +110,7 @@ module.exports = class ConsumerGroup {

this.lastRequest = Date.now()

this[PRIVATE.SHAREDHEARTBEAT] = sharedPromiseTo(async ({ interval }) => {
this[PRIVATE.SHARED_HEARTBEAT] = sharedPromiseTo(async ({ interval }) => {
const { groupId, generationId, memberId } = this
const now = Date.now()

Expand Down Expand Up @@ -404,7 +403,7 @@ module.exports = class ConsumerGroup {
}

async heartbeat({ interval }) {
return this[PRIVATE.SHAREDHEARTBEAT]({ interval })
return this[PRIVATE.SHARED_HEARTBEAT]({ interval })
}

async fetch() {
Expand Down
50 changes: 22 additions & 28 deletions src/producer/__tests__/idempotentProduceMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ const {
createCluster,
createTopic,
waitForMessages,
waitFor,
} = require('testHelpers')
const { KafkaJSError, KafkaJSProtocolError } = require('../../errors')
const { KafkaJSError } = require('../../errors')

const createProducer = require('../index')
const createConsumer = require('../../consumer/index')
Expand Down Expand Up @@ -136,61 +135,56 @@ describe('Producer > Idempotent producer', () => {
await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

await waitForMessages(messagesConsumed, { number: messages.length })

expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
messages.length
)
expect(messagesConsumed).toHaveLength(messages.length)
})

it('concurrent produce() calls > where produce() throws a retriable error on the first call, all subsequent calls throw UNKNOWN_PRODUCER_ID', async () => {
it('concurrent produce() calls > where produce() throws a retriable error on the first call, all messages are written to the partition once', async () => {
for (const nodeId of [0, 1, 2]) {
const broker = await cluster.findBroker({ nodeId })

const brokerProduce = jest.spyOn(broker, 'produce')
brokerProduce.mockImplementationOnce(async () => {
await waitFor(() => brokerProduce.mock.calls.length >= messages.length) // for all the other concurrent calls to have completed
throw new KafkaJSError('retriable error')
})
}

const settlements = await PromiseAllSettled(
await PromiseAllSettled(
messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
).catch(e => e)
)

settlements
.filter(({ status }) => status === 'rejected')
.forEach(({ reason }) => {
expect(reason).toBeInstanceOf(KafkaJSProtocolError)
expect(reason.type).toBe('UNKNOWN_PRODUCER_ID')
})
const messagesConsumed = []
await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

expect(settlements.filter(({ status }) => status === 'fulfilled')).toHaveLength(1)
await waitForMessages(messagesConsumed, { number: messages.length })

expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
messages.length
)
})

it('concurrent produce() calls > where produce() throws a retriable error on 2nd call, all subsequent calls throw OUT_OF_ORDER_SEQUENCE_NUMBER', async () => {
it('concurrent produce() calls > where produce() throws a retriable error on 2nd call, all messages are written to the partition once', async () => {
for (const nodeId of [0, 1, 2]) {
const broker = await cluster.findBroker({ nodeId })

const brokerProduce = jest.spyOn(broker, 'produce')
brokerProduce.mockImplementationOnce()
brokerProduce.mockImplementationOnce(async () => {
await waitFor(() => brokerProduce.mock.calls.length >= messages.length) // for all the other concurrent calls to have completed
throw new KafkaJSError('retriable error')
})
}

const settlements = await PromiseAllSettled(
await PromiseAllSettled(
messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
).catch(e => e)
)

settlements
.filter(({ status }) => status === 'rejected')
.forEach(({ reason }) => {
expect(reason).toBeInstanceOf(KafkaJSProtocolError)
expect(reason.type).toBe('OUT_OF_ORDER_SEQUENCE_NUMBER')
})
const messagesConsumed = []
await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

expect(settlements.filter(({ status }) => status === 'fulfilled')).toHaveLength(2)
await waitForMessages(messagesConsumed, { number: messages.length })

expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
messages.length
)
})

it('concurrent produce() calls > where produce() throws a retriable error after the message is written to the log, all messages are written to the partition once', async () => {
Expand Down
19 changes: 19 additions & 0 deletions src/producer/eosManager/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const createRetry = require('../../retry')
const Lock = require('../../utils/lock')
const { KafkaJSNonRetriableError } = require('../../errors')
const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes')
const createStateMachine = require('./transactionStateMachine')
Expand Down Expand Up @@ -64,6 +65,11 @@ module.exports = ({
*/
let producerSequence = {}

/**
* Idempotent production requires a mutex lock per broker to serialize requests with sequence number handling
*/
let brokerMutexLocks = {}

/**
* Topic partitions already participating in the transaction
*/
Expand Down Expand Up @@ -134,6 +140,7 @@ module.exports = ({
producerId = result.producerId
producerEpoch = result.producerEpoch
producerSequence = {}
brokerMutexLocks = {}

logger.debug('Initialized producer id & epoch', { producerId, producerEpoch })
} catch (e) {
Expand Down Expand Up @@ -304,6 +311,18 @@ module.exports = ({
return stateMachine.state() === STATES.TRANSACTING
},

async acquireBrokerLock(broker) {
if (this.isInitialized()) {
brokerMutexLocks[broker.nodeId] =
brokerMutexLocks[broker.nodeId] || new Lock({ timeout: 0xffff })
await brokerMutexLocks[broker.nodeId].acquire()
}
},

releaseBrokerLock(broker) {
if (this.isInitialized()) brokerMutexLocks[broker.nodeId].release()
},

/**
* Mark the provided offsets as participating in the transaction for the given consumer group.
*
Expand Down
3 changes: 3 additions & 0 deletions src/producer/sendMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => {

const topicData = createTopicData(topicDataForBroker)

await eosManager.acquireBrokerLock(broker)
try {
if (eosManager.isTransactional()) {
await eosManager.addPartitionsToTransaction(topicData)
Expand Down Expand Up @@ -118,6 +119,8 @@ module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => {
} catch (e) {
responsePerBroker.delete(broker)
throw e
} finally {
await eosManager.releaseBrokerLock(broker)
}
})
}
Expand Down
2 changes: 2 additions & 0 deletions src/producer/sendMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ describe('Producer > sendMessages', () => {
updateSequence: jest.fn(),
isTransactional: jest.fn().mockReturnValue(false),
addPartitionsToTransaction: jest.fn(),
acquireBrokerLock: jest.fn(),
releaseBrokerLock: jest.fn(),
}

retrier = retry({ retries: 5 })
Expand Down