Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to use custom authenticators #1372

Merged
merged 18 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/broker/__tests__/connect.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const {
createConnectionPool,
connectionOpts,
saslSCRAM256ConnectionOpts,
sslConnectionOpts,
newLogger,
testIfKafkaAtLeast_1_1_0,
describeIfOauthbearerDisabled,
Expand Down Expand Up @@ -34,6 +35,29 @@ describe('Broker > connect', () => {
expect(broker.versions).toBeTruthy()
})

test("throws if the mechanism isn't supported by the server", async () => {
broker = new Broker({
connectionPool: createConnectionPool(
Object.assign(sslConnectionOpts(), {
port: 9094,
sasl: {
mechanism: 'fake-mechanism',
authenticationProvider: () => ({
authenticate: async () => {
throw new Error('🥸')
},
}),
},
})
),
logger: newLogger(),
})

await expect(broker.connect()).rejects.toThrow(
'The broker does not support the requested SASL mechanism'
)
})

for (const e of saslEntries) {
test(`authenticate with SASL ${e.name} if configured`, async () => {
broker = new Broker({
Expand Down
23 changes: 15 additions & 8 deletions src/broker/saslAuthenticator/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ module.exports = class SASLAuthenticator {

async authenticate() {
const mechanism = this.connection.sasl.mechanism.toUpperCase()
if (!SUPPORTED_MECHANISMS.includes(mechanism)) {
throw new KafkaJSSASLAuthenticationError(
`SASL ${mechanism} mechanism is not supported by the client`
)
}

const handshake = await this.connection.send(this.saslHandshake({ mechanism }))
if (!handshake.enabledMechanisms.includes(mechanism)) {
throw new KafkaJSSASLAuthenticationError(
Expand Down Expand Up @@ -69,7 +63,20 @@ module.exports = class SASLAuthenticator {
return this.connection.sendAuthRequest({ request, response, authExpectResponse })
}

const Authenticator = AUTHENTICATORS[mechanism]
await new Authenticator(this.connection, this.logger, saslAuthenticate).authenticate()
if (SUPPORTED_MECHANISMS.includes(mechanism)) {
const Authenticator = AUTHENTICATORS[mechanism]
await new Authenticator(this.connection, this.logger, saslAuthenticate).authenticate()
} else {
await this.connection.sasl
.authenticationProvider(
{
host: this.connection.host,
port: this.connection.port,
},
this.logger.namespace(`SaslAuthenticator-${mechanism}`),
saslAuthenticate
)
.authenticate()
}
}
}
26 changes: 25 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,34 @@ export class Kafka {

export type BrokersFunction = () => string[] | Promise<string[]>

type SaslAuthenticationRequest = {
encode: () => Buffer | Promise<Buffer>
}
type SaslAuthenticationResponse<ParseResult> = {
decode: (rawResponse: Buffer) => Buffer | Promise<Buffer>
parse: (data: Buffer) => ParseResult
}

export type Authenticator = {
authenticate: () => Promise<void>
}

export type Mechanism = {
mechanism: string
authenticationProvider: (
connection: { host: string; port: number },
Nevon marked this conversation as resolved.
Show resolved Hide resolved
logger: Logger,
saslAuthenticate: <ParseResult>(
request: SaslAuthenticationRequest,
response?: SaslAuthenticationResponse<ParseResult>
) => Promise<ParseResult | void>
) => Authenticator
}

export interface KafkaConfig {
brokers: string[] | BrokersFunction
ssl?: tls.ConnectionOptions | boolean
sasl?: SASLOptions
sasl?: SASLOptions | Mechanism
clientId?: string
connectionTimeout?: number
authenticationTimeout?: number
Expand Down