Skip to content

Commit

Permalink
Merge pull request #1335 from tulios/remove-deprecated-argument-from-…
Browse files Browse the repository at this point in the history
…fetchOffsets

Remove deprecated "topic" argument from admin.fetchOffsets
  • Loading branch information
Nevon committed May 2, 2022
2 parents 0451ea8 + 5283c32 commit 2c15fd7
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 76 deletions.
4 changes: 2 additions & 2 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Include the optional `resolveOffsets` flag to resolve the offsets without having

```javascript
await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: false })
// [
// { partition: 0, offset: '-1' },
// { partition: 1, offset: '-1' },
Expand All @@ -219,7 +219,7 @@ await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
// ]

await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: true })
await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: true })
// [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
Expand Down
88 changes: 55 additions & 33 deletions src/admin/__tests__/fetchOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,15 @@ describe('Admin', () => {
)
})

test('throws an error if both topic and topics are set', async () => {
await expect(
admin.fetchOffsets({ groupId: 'groupId', topic: topicName, topics: [topicName] })
).rejects.toThrow(KafkaJSNonRetriableError, 'Either topic or topics must be set, not both')
})

test('returns unresolved consumer group offsets', async () => {
const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '-1', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
])
})

test('returns the current consumer group offset', async () => {
Expand All @@ -78,10 +74,12 @@ describe('Admin', () => {

const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
})

test('returns consumer group offsets for all topics', async () => {
Expand Down Expand Up @@ -194,61 +192,79 @@ describe('Admin', () => {
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
})

test('reset to latest: returns latest *topic* offsets after resolving', async () => {
await admin.resetOffsets({ groupId, topic: topicName })

const offsetsBeforeResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-1', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '10', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '10', metadata: null }] },
])
})

test('reset to earliest: returns earliest *topic* offsets after resolving', async () => {
await admin.resetOffsets({ groupId, topic: topicName, earliest: true })

const offsetsBeforeResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '0', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '0', metadata: null }] },
])
})

testIfKafkaAtLeast_0_11(
Expand All @@ -267,21 +283,27 @@ describe('Admin', () => {

const offsetsBeforeResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '7', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '7', metadata: null }] },
])
}
)
})
Expand Down
12 changes: 8 additions & 4 deletions src/admin/__tests__/resetOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ describe('Admin', () => {

const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '-1', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
])
})

test('set the consumer group offsets to the earliest offsets', async () => {
Expand All @@ -84,10 +86,12 @@ describe('Admin', () => {

const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
})

test('throws an error if the consumer group is running', async () => {
Expand Down
9 changes: 7 additions & 2 deletions src/admin/__tests__/setOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ describe('Admin', () => {
partitions: [{ partition: 0, offset: 13 }],
})

const offsets = await admin.fetchOffsets({ groupId, topic: topicName })
expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
const offsets = await admin.fetchOffsets({ groupId, topics: [topicName] })
expect(offsets).toEqual([
{
topic: topicName,
partitions: [{ partition: 0, offset: '13', metadata: null }],
},
])
})

test('throws an error if the consumer group is running', async () => {
Expand Down
25 changes: 5 additions & 20 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -408,30 +408,21 @@ module.exports = ({
* Note: set either topic or topics but not both.
*
* @param {string} groupId
* @param {string} topic - deprecated, use the `topics` parameter. Topic to fetch offsets for.
* @param {string[]} topics - list of topics to fetch offsets for, defaults to `[]` which fetches all topics for `groupId`.
* @param {boolean} [resolveOffsets=false]
* @return {Promise}
*/
const fetchOffsets = async ({ groupId, topic, topics, resolveOffsets = false }) => {
const fetchOffsets = async ({ groupId, topics, resolveOffsets = false }) => {
if (!groupId) {
throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`)
}

if (!topic && !topics) {
if (!topics) {
topics = []
}

if (!topic && !Array.isArray(topics)) {
throw new KafkaJSNonRetriableError(`Expected topic or topics array to be set`)
}

if (topic && topics) {
throw new KafkaJSNonRetriableError(`Either topic or topics must be set, not both`)
}

if (topic) {
topics = [topic]
if (!Array.isArray(topics)) {
throw new KafkaJSNonRetriableError('Expected topics array to be set')
}

const coordinator = await cluster.findGroupCoordinator({ groupId })
Expand Down Expand Up @@ -476,7 +467,7 @@ module.exports = ({
)
}

const result = consumerOffsets.map(({ topic, partitions }) => {
return consumerOffsets.map(({ topic, partitions }) => {
const completePartitions = partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
Expand All @@ -485,12 +476,6 @@ module.exports = ({

return { topic, partitions: completePartitions }
})

if (topic) {
return result.pop().partitions
} else {
return result
}
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/consumer/__tests__/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ describe('Consumer', () => {

// check if all offsets are present
expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`))
const [partition] = await admin.fetchOffsets({ groupId, topic: topicName })
const response = await admin.fetchOffsets({ groupId, topics: [topicName] })
const { partitions } = response.find(({ topic }) => topic === topicName)
const partition = partitions.find(({ partition }) => partition === 0)
expect(partition.offset).toEqual('100') // check if offsets were committed
})

Expand Down
15 changes: 10 additions & 5 deletions src/consumer/__tests__/seek.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,16 @@ describe('Consumer', () => {
}),
])

await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([
expect.objectContaining({
partition: 0,
offset: '-1',
}),
await expect(admin.fetchOffsets({ groupId, topics: [topicName] })).resolves.toEqual([
{
topic: topicName,
partitions: expect.arrayContaining([
expect.objectContaining({
partition: 0,
offset: '-1',
}),
]),
},
])

messagesConsumed = []
Expand Down
8 changes: 0 additions & 8 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,6 @@ export type Admin = {
topicPartitions: ITopicPartitionConfig[]
}): Promise<boolean>
fetchTopicMetadata(options?: { topics: string[] }): Promise<{ topics: Array<ITopicMetadata> }>
/**
* @deprecated "topic: string" replaced by "topics: string[]"
*/
fetchOffsets(options: {
groupId: string
topic: string
resolveOffsets?: boolean
}): Promise<FetchOffsetsPartition[]>
fetchOffsets(options: {
groupId: string
topics?: string[]
Expand Down
1 change: 0 additions & 1 deletion types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ const runAdmin = async () => {
await admin.listTopics()

await admin.fetchOffsets({ groupId: 'test-group' })
await admin.fetchOffsets({ groupId: 'test-group', topic: 'topic1' })
await admin.fetchOffsets({ groupId: 'test-group', topics: ['topic1', 'topic2'] })

await admin.createTopics({
Expand Down

0 comments on commit 2c15fd7

Please sign in to comment.