Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated "topic" argument from admin.fetchOffsets #1335

Merged
merged 2 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Include the optional `resolveOffsets` flag to resolve the offsets without having

```javascript
await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: false })
// [
// { partition: 0, offset: '-1' },
// { partition: 1, offset: '-1' },
Expand All @@ -219,7 +219,7 @@ await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
// ]

await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: true })
await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: true })
// [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
Expand Down
88 changes: 55 additions & 33 deletions src/admin/__tests__/fetchOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,15 @@ describe('Admin', () => {
)
})

test('throws an error if both topic and topics are set', async () => {
await expect(
admin.fetchOffsets({ groupId: 'groupId', topic: topicName, topics: [topicName] })
).rejects.toThrow(KafkaJSNonRetriableError, 'Either topic or topics must be set, not both')
})

test('returns unresolved consumer group offsets', async () => {
const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '-1', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
])
})

test('returns the current consumer group offset', async () => {
Expand All @@ -78,10 +74,12 @@ describe('Admin', () => {

const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
])
})

test('returns consumer group offsets for all topics', async () => {
Expand Down Expand Up @@ -194,61 +192,79 @@ describe('Admin', () => {
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '5', metadata: null }] },
])
})

test('reset to latest: returns latest *topic* offsets after resolving', async () => {
await admin.resetOffsets({ groupId, topic: topicName })

const offsetsBeforeResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-1', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '10', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '10', metadata: null }] },
])
})

test('reset to earliest: returns earliest *topic* offsets after resolving', async () => {
await admin.resetOffsets({ groupId, topic: topicName, earliest: true })

const offsetsBeforeResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '0', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '0', metadata: null }] },
])
})

testIfKafkaAtLeast_0_11(
Expand All @@ -267,21 +283,27 @@ describe('Admin', () => {

const offsetsBeforeResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})
const offsetsUponResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
resolveOffsets: true,
})
const offsetsAfterResolving = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
expect(offsetsBeforeResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
expect(offsetsUponResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '7', metadata: null }] },
])
expect(offsetsAfterResolving).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '7', metadata: null }] },
])
}
)
})
Expand Down
12 changes: 8 additions & 4 deletions src/admin/__tests__/resetOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ describe('Admin', () => {

const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '-1', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
])
})

test('set the consumer group offsets to the earliest offsets', async () => {
Expand All @@ -84,10 +86,12 @@ describe('Admin', () => {

const offsets = await admin.fetchOffsets({
groupId,
topic: topicName,
topics: [topicName],
})

expect(offsets).toEqual([{ partition: 0, offset: '-2', metadata: null }])
expect(offsets).toEqual([
{ topic: topicName, partitions: [{ partition: 0, offset: '-2', metadata: null }] },
])
})

test('throws an error if the consumer group is running', async () => {
Expand Down
9 changes: 7 additions & 2 deletions src/admin/__tests__/setOffsets.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ describe('Admin', () => {
partitions: [{ partition: 0, offset: 13 }],
})

const offsets = await admin.fetchOffsets({ groupId, topic: topicName })
expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
const offsets = await admin.fetchOffsets({ groupId, topics: [topicName] })
expect(offsets).toEqual([
{
topic: topicName,
partitions: [{ partition: 0, offset: '13', metadata: null }],
},
])
})

test('throws an error if the consumer group is running', async () => {
Expand Down
25 changes: 5 additions & 20 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -408,30 +408,21 @@ module.exports = ({
* Note: set either topic or topics but not both.
*
* @param {string} groupId
* @param {string} topic - deprecated, use the `topics` parameter. Topic to fetch offsets for.
* @param {string[]} topics - list of topics to fetch offsets for, defaults to `[]` which fetches all topics for `groupId`.
* @param {boolean} [resolveOffsets=false]
* @return {Promise}
*/
const fetchOffsets = async ({ groupId, topic, topics, resolveOffsets = false }) => {
const fetchOffsets = async ({ groupId, topics, resolveOffsets = false }) => {
if (!groupId) {
throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`)
}

if (!topic && !topics) {
if (!topics) {
topics = []
}

if (!topic && !Array.isArray(topics)) {
throw new KafkaJSNonRetriableError(`Expected topic or topics array to be set`)
}

if (topic && topics) {
throw new KafkaJSNonRetriableError(`Either topic or topics must be set, not both`)
}

if (topic) {
topics = [topic]
if (!Array.isArray(topics)) {
throw new KafkaJSNonRetriableError('Expected topics array to be set')
}

const coordinator = await cluster.findGroupCoordinator({ groupId })
Expand Down Expand Up @@ -476,7 +467,7 @@ module.exports = ({
)
}

const result = consumerOffsets.map(({ topic, partitions }) => {
return consumerOffsets.map(({ topic, partitions }) => {
const completePartitions = partitions.map(({ partition, offset, metadata }) => ({
partition,
offset,
Expand All @@ -485,12 +476,6 @@ module.exports = ({

return { topic, partitions: completePartitions }
})

if (topic) {
return result.pop().partitions
} else {
return result
}
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/consumer/__tests__/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ describe('Consumer', () => {

// check if all offsets are present
expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`))
const [partition] = await admin.fetchOffsets({ groupId, topic: topicName })
const response = await admin.fetchOffsets({ groupId, topics: [topicName] })
const { partitions } = response.find(({ topic }) => topic === topicName)
const partition = partitions.find(({ partition }) => partition === 0)
expect(partition.offset).toEqual('100') // check if offsets were committed
})

Expand Down
15 changes: 10 additions & 5 deletions src/consumer/__tests__/seek.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,16 @@ describe('Consumer', () => {
}),
])

await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([
expect.objectContaining({
partition: 0,
offset: '-1',
}),
await expect(admin.fetchOffsets({ groupId, topics: [topicName] })).resolves.toEqual([
{
topic: topicName,
partitions: expect.arrayContaining([
expect.objectContaining({
partition: 0,
offset: '-1',
}),
]),
},
])

messagesConsumed = []
Expand Down
8 changes: 0 additions & 8 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,6 @@ export type Admin = {
topicPartitions: ITopicPartitionConfig[]
}): Promise<boolean>
fetchTopicMetadata(options?: { topics: string[] }): Promise<{ topics: Array<ITopicMetadata> }>
/**
* @deprecated "topic: string" replaced by "topics: string[]"
*/
fetchOffsets(options: {
groupId: string
topic: string
resolveOffsets?: boolean
}): Promise<FetchOffsetsPartition[]>
fetchOffsets(options: {
groupId: string
topics?: string[]
Expand Down
1 change: 0 additions & 1 deletion types/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ const runAdmin = async () => {
await admin.listTopics()

await admin.fetchOffsets({ groupId: 'test-group' })
await admin.fetchOffsets({ groupId: 'test-group', topic: 'topic1' })
await admin.fetchOffsets({ groupId: 'test-group', topics: ['topic1', 'topic2'] })

await admin.createTopics({
Expand Down