Skip to content

Commit

Permalink
Merge pull request #1309 from tulios/topic-create-types
Browse files Browse the repository at this point in the history
Validate configEntries when creating topics
  • Loading branch information
Nevon committed Mar 9, 2022
2 parents 907479d + 4ec5030 commit a93151d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 2 deletions.
27 changes: 27 additions & 0 deletions src/admin/__tests__/createTopics.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,33 @@ describe('Admin', () => {
)
})

test.each([
[
'are not an array',
'this-is-not-an-array',
'Invalid configEntries for topic "topic-123", must be an array',
],
[
'contain a non-object',
['this-is-not-an-object'],
'Invalid configEntries for topic "topic-123". Entry 0 must be an object',
],
[
'contain an entry with missing value property',
[{ name: 'missing-value' }],
'Invalid configEntries for topic "topic-123". Entry 0 must have a valid "value" property',
],
[
'contain an entry with missing name property',
[{ value: 'missing-name' }],
'Invalid configEntries for topic "topic-123". Entry 0 must have a valid "name" property',
],
])('throws an error if the config entries %s', async (_, configEntries, errorMessage) => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })
const topics = [{ topic: 'topic-123', configEntries }]
await expect(admin.createTopics({ topics })).rejects.toHaveProperty('message', errorMessage)
})

test('create the new topics and return true', async () => {
admin = createAdmin({ cluster: createCluster(), logger: newLogger() })

Expand Down
31 changes: 31 additions & 0 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,37 @@ module.exports = ({
)
}

for (const { topic, configEntries } of topics) {
if (configEntries == null) {
continue
}

if (!Array.isArray(configEntries)) {
throw new KafkaJSNonRetriableError(
`Invalid configEntries for topic "${topic}", must be an array`
)
}

configEntries.forEach((entry, index) => {
if (typeof entry !== 'object' || entry == null) {
throw new KafkaJSNonRetriableError(
`Invalid configEntries for topic "${topic}". Entry ${index} must be an object`
)
}

for (const requiredProperty of ['name', 'value']) {
if (
!Object.prototype.hasOwnProperty.call(entry, requiredProperty) ||
typeof entry[requiredProperty] !== 'string'
) {
throw new KafkaJSNonRetriableError(
`Invalid configEntries for topic "${topic}". Entry ${index} must have a valid "${requiredProperty}" property`
)
}
}
})
}

const retrier = createRetry(retry)

return retrier(async (bail, retryCount, retryTime) => {
Expand Down
9 changes: 7 additions & 2 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export interface ITopicConfig {
numPartitions?: number
replicationFactor?: number
replicaAssignment?: object[]
configEntries?: object[]
configEntries?: IResourceConfigEntry[]
}

export interface ITopicPartitionConfig {
Expand Down Expand Up @@ -308,10 +308,15 @@ export interface DescribeConfigResponse {
throttleTime: number
}

export interface IResourceConfigEntry {
name: string
value: string
}

export interface IResourceConfig {
type: ResourceTypes | ConfigResourceTypes
name: string
configEntries: { name: string; value: string }[]
configEntries: IResourceConfigEntry[]
}

type ValueOf<T> = T[keyof T]
Expand Down

0 comments on commit a93151d

Please sign in to comment.