Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to use custom authenticators #1372

Merged
merged 18 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,23 @@ A complete breakdown can be found in the IAM User Guide's
It is **highly recommended** that you use SSL for encryption when using `PLAIN` or `AWS`,
otherwise credentials will be transmitted in cleartext!

### Custom Authentication Mechanisms

If an authentication mechanism is not supported out of the box in KafkaJS, a custom authentication
mechanism can be introduced as a plugin:

```js
{
sasl: {
mechanism: <mechanism name>,
authenticationProvider: (host, port, logger, saslAuthenticate) => { authenticate: () => Promise<void> }
Nevon marked this conversation as resolved.
Show resolved Hide resolved
}
}
```

See [Custom Authentication Mechanisms](CustomAuthenticationMechanism.md) for more information on how to implement your own
authentication mechanism.

## Connection Timeout

Time in milliseconds to wait for a successful connection. The default value is: `1000`.
Expand Down
151 changes: 151 additions & 0 deletions docs/CustomAuthenticationMechanism.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
---
id: custom-authentication-mechanism
title: Custom Authentication Mechanisms
---

To use an authentication mechanism that is not supported out of the box by KafkaJS,
custom authentication mechanisms can be introduced:

```js
{
sasl: {
mechanism: <mechanism name>,
authenticationProvider: (host, port, logger, saslAuthenticate) => { authenticate: () => Promise<void> }
Nevon marked this conversation as resolved.
Show resolved Hide resolved
}
}
```

`<mechanism name>` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms`
property in `server.properties`. See the Kafka documentation for information on how to
configure your brokers.

## Writing a custom authentication mechanism

A custom authentication mechanism needs to fulfill the following interface:

```ts
type AuthenticationProvider<ParseResult> = (
Nevon marked this conversation as resolved.
Show resolved Hide resolved
host: string,
port: number,
logger: Logger,
saslAuthenticate: (request: SaslAuthenticationRequest, response?: SaslAuthenticationResponse<ParseResult>) => Promise<ParseResult | void>
) => Authenticator

type Authenticator = {
authenticate(): Promise<void>
}
type SaslAuthenticationRequest = {
encode: () => Buffer | Promise<Buffer>
}
type SaslAuthenticationResponse<ParseResult> = {
decode: (rawResponse: Buffer) => Buffer | Promise<Buffer>
parse: (data: Buffer) => ParseResult
}
```
* `host` - Hostname of the specific broker to connect to
* `port` - Port of the specific broker to connect to
* `logger` - A logger instance namespaced to the authentication mechanism
* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate)
requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally
decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected.
### Example
In this example we will create a custom authentication mechanism called `simon`. The general
flow will be:
1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate)
request with the value of `says` as `auth_bytes`.
2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes`
should equal `says`, if it does not start with "Simon says", it should be an empty string.
```js
Nevon marked this conversation as resolved.
Show resolved Hide resolved
const simonAuthenticator = says = (host, port, logger, saslAuthenticate) => {
const INT32_SIZE = 4

const request = {
/**
* Encodes the value for `auth_bytes` in SaslAuthenticate request
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
*
* In this example, we are just sending `says` as a string,
* with the length of the string in bytes prepended as an int32
**/
encode: () => {
const byteLength = Buffer.byteLength(says, 'utf8')
const buf = Buffer.alloc(INT32_SIZE + byteLength)
buf.writeUInt32BE(byteLength, 0)
buf.write(says, INT32_SIZE, byteLength, 'utf8')
return buf
},
}
const response = {
/**
* Decodes the `auth_bytes` in SaslAuthenticate response
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
*
* This is essentially the reverse of `request.encode`, where
* we read the length of the string as an int32 and then read
* that many bytes
*/
decode: rawData => {
const byteLength = rawData.readInt32BE(0)
return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength)
},
/**
* The return value from `response.decode` is passed into
* this function, which is responsible for interpreting
* the data. In this case, we just turn the buffer back
* into a string
*/
parse: data => {
return data.toString()
},
}
return {
/**
* This function is responsible for orchestrating the authentication flow.
* Essentially we will send a SaslAuthenticate request with the
* value of `sasl.says` to the broker, and expect to
* get the same value back.
*
* Other authentication methods may do any other operations they
* like, but communication with the brokers goes through
* the SaslAuthenticate request.
*/
authenticate: async () => {
if (says == null) {
throw new Error('SASL Simon: Invalid "says"')
}
const broker = `${host}:${port}`
try {
logger.info('Authenticate with SASL Simon', { broker })
const authenticateResponse = await saslAuthenticate({ request, response })

const saidSimon = says.startsWith("Simon says ")
const expectedResponse = saidSimon ? says : ""
if (authenticateResponse !== expectedResponse) {
throw new Error("Mismatching response from broker")
}
logger.info('SASL Simon authentication successful', { broker })
} catch (e) {
const error = new Error(
`SASL Simon authentication failed: ${e.message}`
)
logger.error(error.message, { broker })
throw error
}
},
}
}
```

The `response` argument to `saslAuthenticate` is optional, in case the authentication
method does not require the `auth_bytes` in the response.

In the example above, we expect the client to be configured as such:

```js
const config = {
sasl: {
mechanism: 'simon'
authenticationProvider: simonAuthenticator('Simon says authenticate me')
}
}
```
24 changes: 24 additions & 0 deletions src/broker/__tests__/connect.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const {
createConnectionPool,
connectionOpts,
saslSCRAM256ConnectionOpts,
sslConnectionOpts,
newLogger,
testIfKafkaAtLeast_1_1_0,
describeIfOauthbearerDisabled,
Expand Down Expand Up @@ -34,6 +35,29 @@ describe('Broker > connect', () => {
expect(broker.versions).toBeTruthy()
})

test("throws if the mechanism isn't supported by the server", async () => {
broker = new Broker({
connectionPool: createConnectionPool(
Object.assign(sslConnectionOpts(), {
port: 9094,
sasl: {
mechanism: 'fake-mechanism',
authenticationProvider: () => ({
authenticate: async () => {
throw new Error('🥸')
},
}),
},
})
),
logger: newLogger(),
})

await expect(broker.connect()).rejects.toThrow(
'The broker does not support the requested SASL mechanism'
)
})

for (const e of saslEntries) {
test(`authenticate with SASL ${e.name} if configured`, async () => {
broker = new Broker({
Expand Down
68 changes: 31 additions & 37 deletions src/broker/saslAuthenticator/awsIam.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,37 @@
const awsIam = require('../../protocol/sasl/awsIam')
const { request, response } = require('../../protocol/sasl/awsIam')
const { KafkaJSSASLAuthenticationError } = require('../../errors')

module.exports = class AWSIAMAuthenticator {
constructor(connection, logger, saslAuthenticate) {
this.connection = connection
this.logger = logger.namespace('SASLAWSIAMAuthenticator')
this.saslAuthenticate = saslAuthenticate
}

async authenticate() {
const { sasl } = this.connection
if (!sasl.authorizationIdentity) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity')
}
if (!sasl.accessKeyId) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId')
}
if (!sasl.secretAccessKey) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey')
}
if (!sasl.sessionToken) {
sasl.sessionToken = ''
}
const awsIAMAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => {
return {
authenticate: async () => {
if (!sasl.authorizationIdentity) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity')
}
if (!sasl.accessKeyId) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId')
}
if (!sasl.secretAccessKey) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey')
}
if (!sasl.sessionToken) {
sasl.sessionToken = ''
}

const request = awsIam.request(sasl)
const response = awsIam.response
const { host, port } = this.connection
const broker = `${host}:${port}`
const broker = `${host}:${port}`

try {
this.logger.debug('Authenticate with SASL AWS-IAM', { broker })
await this.saslAuthenticate({ request, response })
this.logger.debug('SASL AWS-IAM authentication successful', { broker })
} catch (e) {
const error = new KafkaJSSASLAuthenticationError(
`SASL AWS-IAM authentication failed: ${e.message}`
)
this.logger.error(error.message, { broker })
throw error
}
try {
logger.debug('Authenticate with SASL AWS-IAM', { broker })
await saslAuthenticate({ request: request(sasl), response })
logger.debug('SASL AWS-IAM authentication successful', { broker })
} catch (e) {
const error = new KafkaJSSASLAuthenticationError(
`SASL AWS-IAM authentication failed: ${e.message}`
)
logger.error(error.message, { broker })
throw error
}
},
}
}

module.exports = awsIAMAuthenticatorProvider
25 changes: 10 additions & 15 deletions src/broker/saslAuthenticator/awsIam.spec.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
const { newLogger } = require('testHelpers')
const AWSIAM = require('./awsIam')
const awsIAMAuthenticatorProvider = require('./awsIam')

describe('Broker > SASL Authenticator > AWS-IAM', () => {
it('throws KafkaJSSASLAuthenticationError for missing authorizationIdentity', async () => {
const awsIam = new AWSIAM({ sasl: {} }, newLogger())
const awsIam = awsIAMAuthenticatorProvider({})('', 0, newLogger())
await expect(awsIam.authenticate()).rejects.toThrow(
'SASL AWS-IAM: Missing authorizationIdentity'
)
})

it('throws KafkaJSSASLAuthenticationError for invalid accessKeyId', async () => {
const awsIam = new AWSIAM(
{
sasl: {
authorizationIdentity: '<authorizationIdentity>',
secretAccessKey: '<secretAccessKey>',
},
},
newLogger()
)
const awsIam = awsIAMAuthenticatorProvider({
authorizationIdentity: '<authorizationIdentity>',
secretAccessKey: '<secretAccessKey>',
})('', 0, newLogger())
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing accessKeyId')
})

it('throws KafkaJSSASLAuthenticationError for invalid secretAccessKey', async () => {
const awsIam = new AWSIAM(
{ sasl: { authorizationIdentity: '<authorizationIdentity>', accessKeyId: '<accessKeyId>' } },
newLogger()
)
const awsIam = awsIAMAuthenticatorProvider({
authorizationIdentity: '<authorizationIdentity>',
accessKeyId: '<accessKeyId>',
})('', 0, newLogger())
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing secretAccessKey')
})
})