Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-455: Implement alterPartitionReassignments/listPartitionReassignments admin methods #1419

Merged
merged 21 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 60 additions & 0 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -680,3 +680,63 @@ Be aware that the security features might be disabled in your cluster. In that c
```sh
KafkaJSProtocolError: Security features are disabled
```

## <a name="alter-partition-reassignments"></a> Alter Partition Reassignments
This is used to reassign the replicas that partitions are on. This method will throw exceptions in the case of errors.

```typescript
await admin.alterPartitionReassignments({
topics: <PartitionReassignment[]>,
timeout: <Number> // optional - 5000 default
})
```

PartitionReassignment Structure:
```typescript
{
topic: <String>,
partitionAssignment: <Number[]> // Example: [{ partition: 0, replicas: [0,1,2] }]
}
```

## <a name="list-partition-reassignments"></a> List Partition Reassignments
This is used to list current partition reassignments in progress. This method will throw exceptions in the case of errors and resolve to ListPartitionReassignmentsResponse on success. If a requested partition does not exist it will not be included in the response.

```javascript
await admin.listPartitionReassignments({
topics: <TopicPartitions[]>, // optional, if null then all topics will be returned.
timeout: <Number> // optional - 5000 default
})
```

TopicPartitions Structure:
```typescript
{
topic: <String>,
partitions: <Array>
}
```

Resulting ListPartitionReassignmentsResponse Structure:
```typescript
{
topics: <OngoingTopicReassignment[]>
}
```
OngoingTopicReassignment Structure:
```typescript
{
topic: <String>,
partitions: <OngoingPartitionReassignment[]>
}
```
OngoingPartitionReassignment Structure:
```typescript
{
partitionIndex: <Number>,
replicas: <Number[]>, // The current replica set
addingReplicas: <Number[]> // The set of replicas being added
removingReplicas: <Number[]> // The set of replicas being removed
}
```
**Note:** If a partition is not going through a reassignment, its AddingReplicas and RemovingReplicas fields will simply be empty.
203 changes: 203 additions & 0 deletions src/admin/__tests__/alterPartitionReassignments.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
const createAdmin = require('../index')
const { KafkaJSProtocolError } = require('../../errors')
const { createErrorFromCode } = require('../../protocol/error')

const { secureRandom, createCluster, newLogger } = require('testHelpers')

const NOT_CONTROLLER = 41

describe('Admin', () => {
let topicName, admin

beforeEach(() => {
topicName = `test-topic-${secureRandom()}`
})

afterEach(async () => {
admin && (await admin.disconnect())
})

describe('alterPartitionReassignments', () => {
test('throws an error if the topics array is invalid', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(admin.alterPartitionReassignments({ topics: null })).rejects.toHaveProperty(
'message',
'Invalid topics array null'
)

await expect(
admin.alterPartitionReassignments({ topics: 'this-is-not-an-array' })
).rejects.toHaveProperty('message', 'Invalid topics array this-is-not-an-array')
})

test('throws an error if the topic name is not a valid string', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(
admin.alterPartitionReassignments({ topics: [{ topic: 123 }] })
).rejects.toHaveProperty(
'message',
'Invalid topics array, the topic names have to be a valid string'
)
})

test('throws an error if there are multiple entries for the same topic', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
const topics = [{ topic: 'topic-123' }, { topic: 'topic-123' }]
await expect(admin.alterPartitionReassignments({ topics })).rejects.toHaveProperty(
'message',
'Invalid topics array, it cannot have multiple entries for the same topic'
)
})

test('throws an error if the partitionAssignment array is not valid', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(
admin.alterPartitionReassignments({
topics: [{ topic: 'topic-123', partitionAssignment: null }],
})
).rejects.toHaveProperty('message', 'Invalid partitions array: null for topic: topic-123')

await expect(
admin.alterPartitionReassignments({
topics: [{ topic: 'topic-123', partitionAssignment: 'this-is-not-an-array' }],
})
).rejects.toHaveProperty(
'message',
'Invalid partitions array: this-is-not-an-array for topic: topic-123'
)
})

test('throws an error if the partition index is not valid', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(
admin.alterPartitionReassignments({
topics: [{ topic: 'topic-123', partitionAssignment: [{ partition: null }] }],
})
).rejects.toHaveProperty('message', 'Invalid partitions index: null for topic: topic-123')

await expect(
admin.alterPartitionReassignments({
topics: [{ topic: 'topic-123', partitionAssignment: [{ partition: -1 }] }],
})
).rejects.toHaveProperty('message', 'Invalid partitions index: -1 for topic: topic-123')
})

test('throws an error if the replicas array is not valid', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(
admin.alterPartitionReassignments({
topics: [{ topic: 'topic-123', partitionAssignment: [{ partition: 0, replicas: null }] }],
})
).rejects.toHaveProperty(
'message',
'Invalid replica assignment: null for topic: topic-123 on partition: 0'
)

await expect(
admin.alterPartitionReassignments({
topics: [
{ topic: 'topic-123', partitionAssignment: [{ partition: 0, replicas: [0, 'a'] }] },
],
})
).rejects.toHaveProperty(
'message',
'Invalid replica assignment: 0,a for topic: topic-123 on partition: 0. Replicas must be a non negative number'
)
})

test('throws an error if the if trying to reassign partitions for a topic that does not exist', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

await admin.connect()

await expect(
admin.alterPartitionReassignments({
topics: [
{ topic: topicName + 'x', partitionAssignment: [{ partition: 0, replicas: [2, 1] }] },
],
})
).rejects.toHaveProperty('message', 'This server does not host this topic-partition')
})

test('throws an error trying to assign invalid broker', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

await admin.connect()

await expect(
admin.createTopics({
waitForLeaders: false,
topics: [
{
topic: topicName,
replicaAssignment: [
{ partition: 0, replicas: [1, 0] },
{ partition: 1, replicas: [2, 1] },
],
},
],
})
).resolves.toEqual(true)

await expect(
admin.alterPartitionReassignments({
topics: [
{
topic: topicName,
partitionAssignment: [{ partition: 0, replicas: [5, 1] }],
},
],
})
).rejects.toHaveProperty('message', 'Replica assignment is invalid')
})

test('reassign partitions', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

await admin.connect()
await expect(
admin.createTopics({
topics: [
{
topic: topicName,
replicaAssignment: [
{ partition: 0, replicas: [1, 0] },
{ partition: 1, replicas: [2, 1] },
],
},
],
})
).resolves.toEqual(true)

await expect(
admin.alterPartitionReassignments({
topics: [{ topic: topicName, partitionAssignment: [{ partition: 0, replicas: [2, 1] }] }],
})
).resolves.not.toThrow()
})

test('retries if the controller has moved', async () => {
const cluster = createCluster()
const broker = { alterPartitionReassignments: jest.fn(() => true) }

cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest
.fn()
.mockImplementationOnce(() => {
throw new KafkaJSProtocolError(createErrorFromCode(NOT_CONTROLLER))
})
.mockImplementationOnce(() => broker)

admin = createAdmin({ cluster, logger: newLogger() })
await expect(
admin.alterPartitionReassignments({
topics: [{ topic: topicName, partitionAssignment: [{ partition: 0, replicas: [2, 1] }] }],
})
).resolves.not.toThrow()

expect(cluster.refreshMetadata).toHaveBeenCalledTimes(2)
expect(cluster.findControllerBroker).toHaveBeenCalledTimes(2)
expect(broker.alterPartitionReassignments).toHaveBeenCalledTimes(1)
})
})
})
127 changes: 127 additions & 0 deletions src/admin/__tests__/listPartitionReassignments.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
const createAdmin = require('../index')
const { KafkaJSProtocolError } = require('../../errors')
const { createErrorFromCode } = require('../../protocol/error')

const { secureRandom, createCluster, newLogger } = require('testHelpers')

const NOT_CONTROLLER = 41

describe('Admin', () => {
let topicName, admin

beforeEach(() => {
topicName = `test-topic-${secureRandom()}`
})

afterEach(async () => {
admin && (await admin.disconnect())
})

describe('listPartitionReassignments', () => {
test('throws an error if the topics array is invalid', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

await expect(
admin.alterPartitionReassignments({ topics: 'this-is-not-an-array' })
).rejects.toHaveProperty('message', 'Invalid topics array this-is-not-an-array')
})

test('throws an error if the topic name is not a valid string', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(
admin.listPartitionReassignments({ topics: [{ topic: 123 }] })
).rejects.toHaveProperty(
'message',
'Invalid topics array, the topic names have to be a valid string'
)
})

test('throws an error if there are multiple entries for the same topic', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
const topics = [{ topic: 'topic-123' }, { topic: 'topic-123' }]
await expect(admin.listPartitionReassignments({ topics })).rejects.toHaveProperty(
'message',
'Invalid topics array, it cannot have multiple entries for the same topic'
)
})

test('throws an error if the partition array is not a valid array', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(
admin.listPartitionReassignments({ topics: [{ topic: 'topic-123', partitions: null }] })
).rejects.toHaveProperty('message', 'Invalid partition array: null for topic: topic-123')

await expect(
admin.listPartitionReassignments({
topics: [{ topic: 'topic-123', partitions: 'this-is-not-an-array' }],
})
).rejects.toHaveProperty(
'message',
'Invalid partition array: this-is-not-an-array for topic: topic-123'
)
})

test('throws an error if the partitions are not a valid number', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
await expect(
admin.listPartitionReassignments({ topics: [{ topic: 'topic-123', partitions: [0, 'a'] }] })
).rejects.toHaveProperty(
'message',
'Invalid partition array: 0,a for topic: topic-123. The partition indices have to be a valid number greater than 0.'
)
})

test('list reassignments', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

await admin.connect()
await expect(
admin.createTopics({
topics: [
{
topic: topicName,
replicaAssignment: [
{ partition: 0, replicas: [1, 0] },
{ partition: 1, replicas: [2, 1] },
],
},
],
})
).resolves.toEqual(true)

await admin.alterPartitionReassignments({
topics: [{ topic: topicName, partitionAssignment: [{ partition: 0, replicas: [2, 1] }] }],
})

await expect(
admin.listPartitionReassignments({
topics: [{ topic: topicName, partitions: [0, 1, 2] }],
})
).resolves.not.toThrow()
})

test('retries if the controller has moved', async () => {
const cluster = createCluster()
const broker = { listPartitionReassignments: jest.fn(() => true) }

cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest
.fn()
.mockImplementationOnce(() => {
throw new KafkaJSProtocolError(createErrorFromCode(NOT_CONTROLLER))
})
.mockImplementationOnce(() => broker)

admin = createAdmin({ cluster, logger: newLogger() })
await expect(
admin.listPartitionReassignments({
topics: [{ topic: topicName, partitions: [0, 1, 2] }],
})
).resolves.not.toThrow()

expect(cluster.refreshMetadata).toHaveBeenCalledTimes(2)
expect(cluster.findControllerBroker).toHaveBeenCalledTimes(2)
expect(broker.listPartitionReassignments).toHaveBeenCalledTimes(1)
})
})
})