Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make JavaCompatiblePartitioner new default #1339

Merged
merged 5 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 14 additions & 10 deletions docs/Producing.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,20 @@ kafka.producer({ createPartitioner: MyPartitioner })

### Default Partitioners

KafkaJS ships with 2 partitioners: `DefaultPartitioner` and `JavaCompatiblePartitioner`.

The `JavaCompatiblePartitioner` should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the [co-partitioning requirement](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements) when joining multiple topics.

Use the `JavaCompatiblePartitioner` by importing it and providing it to the Producer constructor:

```javascript
const { Partitioners } = require('kafkajs')
kafka.producer({ createPartitioner: Partitioners.JavaCompatiblePartitioner })
```
KafkaJS ships with 2 partitioners: `DefaultPartitioner` and `LegacyPartitioner`.

The `DefaultPartitioner` should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the [co-partitioning requirement](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements) when joining multiple topics.

> 🚨 **Important** 🚨
>
> **The `LegacyPartitioner` was the default until v2.0.0. If you are upgrading from a version
older and want to retain the previous partitioning behavior, use the `LegacyPartitioner`
by importing it and providing it to the Producer constructor:**
>
> ```javascript
> const { Partitioners } = require('kafkajs')
> kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })
> ```

## <a name="retry"></a> Retry

Expand Down
16 changes: 16 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const createConsumer = require('./consumer')
const createAdmin = require('./admin')
const ISOLATION_LEVEL = require('./protocol/isolationLevel')
const defaultSocketFactory = require('./network/socketFactory')
const once = require('./utils/once')
const websiteUrl = require('./utils/websiteUrl')

const PRIVATE = {
CREATE_CLUSTER: Symbol('private:Kafka:createCluster'),
Expand All @@ -20,6 +22,16 @@ const PRIVATE = {
}

const DEFAULT_METADATA_MAX_AGE = 300000
const warnOfDefaultPartitioner = once(logger => {
if (process.env.KAFKAJS_NO_PARTITIONER_WARNING == null) {
logger.warn(
`KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See ${websiteUrl(
'docs/producing',
'default-partitioners'
)} for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1"`
)
}
})

module.exports = class Client {
/**
Expand Down Expand Up @@ -104,6 +116,10 @@ module.exports = class Client {
instrumentationEmitter,
})

if (createPartitioner == null) {
warnOfDefaultPartitioner(this[PRIVATE.LOGGER])
}

return createProducer({
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
logger: this[PRIVATE.LOGGER],
Expand Down
2 changes: 1 addition & 1 deletion src/producer/partitioners/default/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const murmur2 = require('./murmur2')
const createDefaultPartitioner = require('./partitioner')
const createDefaultPartitioner = require('../legacy/partitioner')

module.exports = createDefaultPartitioner(murmur2)
38 changes: 20 additions & 18 deletions src/producer/partitioners/default/murmur2.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
/* eslint-disable */
const Long = require('../../../utils/long')

// Based on the kafka client 0.10.2 murmur2 implementation
// https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L364

const SEED = 0x9747b28c
const SEED = Long.fromValue(0x9747b28c)

// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
const M = 0x5bd1e995
const R = 24
const M = Long.fromValue(0x5bd1e995)
const R = Long.fromValue(24)

module.exports = key => {
const data = Buffer.isBuffer(key) ? key : Buffer.from(String(key))
const length = data.length

// Initialize the hash to a random value
let h = SEED ^ length
let length4 = length / 4
let h = Long.fromValue(SEED.xor(length))
let length4 = Math.floor(length / 4)

for (let i = 0; i < length4; i++) {
const i4 = i * 4
Expand All @@ -25,27 +26,28 @@ module.exports = key => {
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24)
k *= M
k ^= k >>> R
k *= M
h *= M
h ^= k
k = Long.fromValue(k)
k = k.multiply(M)
k = k.xor(k.toInt() >>> R)
k = Long.fromValue(k).multiply(M)
h = h.multiply(M)
h = h.xor(k)
}

// Handle the last few bytes of the input array
switch (length % 4) {
case 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h = h.xor((data[(length & ~3) + 2] & 0xff) << 16)
case 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h = h.xor((data[(length & ~3) + 1] & 0xff) << 8)
case 1:
h ^= data[length & ~3] & 0xff
h *= M
h = h.xor(data[length & ~3] & 0xff)
h = h.multiply(M)
}

h ^= h >>> 13
h *= M
h ^= h >>> 15
h = h.xor(h.toInt() >>> 13)
h = h.multiply(M)
h = h.xor(h.toInt() >>> 15)

return h
return h.toInt()
}
38 changes: 20 additions & 18 deletions src/producer/partitioners/default/murmur2.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,31 @@ describe('Producer > Partitioner > Default > murmur2', () => {
})

test('it handles numeric input', () => {
expect(murmur2(0)).toEqual(272173970)
expect(murmur2(0)).toEqual(971027396)
})

test('it handles buffer input', () => {
expect(murmur2(Buffer.from('1'))).toEqual(1311020360)
expect(murmur2(Buffer.from('1'))).toEqual(-1993445489)
})
})

// Generated with src/producer/partitioners/defaultJava/Test.java
const testData = {
'0': 272173970,
'1': 1311020360,
'128': 2053105854,
'2187': -2081355488,
'16384': 204404061,
'78125': -677491393,
'279936': -622460209,
'823543': 651276451,
'2097152': 944683677,
'4782969': -892695770,
'10000000': -1778616326,
'19487171': -518311627,
'35831808': 556972389,
'62748517': -233806557,
'105413504': -109398538,
'170859375': 102939717,
'0': 971027396,
'1': -1993445489,
'128': -326012175,
'2187': -1508407203,
'16384': -325739742,
'78125': -1654490814,
'279936': 1462227128,
'823543': -2014198330,
'2097152': 607668903,
'4782969': -1182699775,
'10000000': -1830336757,
'19487171': -1603849305,
'35831808': -857013643,
'62748517': -1167431028,
'105413504': -381294639,
'170859375': -1658323481,
'100:48069': 1009543857,
}
4 changes: 0 additions & 4 deletions src/producer/partitioners/defaultJava/index.js

This file was deleted.

38 changes: 0 additions & 38 deletions src/producer/partitioners/defaultJava/murmur2.spec.js

This file was deleted.

11 changes: 9 additions & 2 deletions src/producer/partitioners/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
const DefaultPartitioner = require('./default')
const JavaCompatiblePartitioner = require('./defaultJava')
const LegacyPartitioner = require('./legacy')

module.exports = {
DefaultPartitioner,
JavaCompatiblePartitioner,
LegacyPartitioner,
/**
* @deprecated Use DefaultPartitioner instead
*
* The JavaCompatiblePartitioner was renamed DefaultPartitioner
* and made to be the default in 2.0.0.
*/
JavaCompatiblePartitioner: DefaultPartitioner,
}
4 changes: 4 additions & 0 deletions src/producer/partitioners/legacy/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
const murmur2 = require('./murmur2')
const createLegacyPartitioner = require('./partitioner')

module.exports = createLegacyPartitioner(murmur2)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const createPartitioner = require('./index')

describe('Producer > Partitioner > Default', () => {
describe('Producer > Partitioner > Legacy', () => {
let topic, partitioner, partitionMetadata

beforeEach(() => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
/* eslint-disable */
const Long = require('../../../utils/long')

// Based on the kafka client 0.10.2 murmur2 implementation
// https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L364

const SEED = Long.fromValue(0x9747b28c)
const SEED = 0x9747b28c

// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
const M = Long.fromValue(0x5bd1e995)
const R = Long.fromValue(24)
const M = 0x5bd1e995
const R = 24

module.exports = key => {
const data = Buffer.isBuffer(key) ? key : Buffer.from(String(key))
const length = data.length

// Initialize the hash to a random value
let h = Long.fromValue(SEED.xor(length))
let length4 = Math.floor(length / 4)
let h = SEED ^ length
let length4 = length / 4

for (let i = 0; i < length4; i++) {
const i4 = i * 4
Expand All @@ -26,28 +25,27 @@ module.exports = key => {
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24)
k = Long.fromValue(k)
k = k.multiply(M)
k = k.xor(k.toInt() >>> R)
k = Long.fromValue(k).multiply(M)
h = h.multiply(M)
h = h.xor(k)
k *= M
k ^= k >>> R
k *= M
h *= M
h ^= k
}

// Handle the last few bytes of the input array
switch (length % 4) {
case 3:
h = h.xor((data[(length & ~3) + 2] & 0xff) << 16)
h ^= (data[(length & ~3) + 2] & 0xff) << 16
case 2:
h = h.xor((data[(length & ~3) + 1] & 0xff) << 8)
h ^= (data[(length & ~3) + 1] & 0xff) << 8
case 1:
h = h.xor(data[length & ~3] & 0xff)
h = h.multiply(M)
h ^= data[length & ~3] & 0xff
h *= M
}

h = h.xor(h.toInt() >>> 13)
h = h.multiply(M)
h = h.xor(h.toInt() >>> 15)
h ^= h >>> 13
h *= M
h ^= h >>> 15

return h.toInt()
return h
}
36 changes: 36 additions & 0 deletions src/producer/partitioners/legacy/murmur2.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const murmur2 = require('./murmur2')

describe('Producer > Partitioner > Default > murmur2', () => {
test('it works', () => {
Object.keys(testData).forEach(key => {
expect(murmur2(key)).toEqual(testData[key])
})
})

test('it handles numeric input', () => {
expect(murmur2(0)).toEqual(272173970)
})

test('it handles buffer input', () => {
expect(murmur2(Buffer.from('1'))).toEqual(1311020360)
})
})

const testData = {
'0': 272173970,
'1': 1311020360,
'128': 2053105854,
'2187': -2081355488,
'16384': 204404061,
'78125': -677491393,
'279936': -622460209,
'823543': 651276451,
'2097152': 944683677,
'4782969': -892695770,
'10000000': -1778616326,
'19487171': -518311627,
'35831808': 556972389,
'62748517': -233806557,
'105413504': -109398538,
'170859375': 102939717,
}
10 changes: 10 additions & 0 deletions src/utils/once.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module.exports = fn => {
let called = false

return (...args) => {
if (!called) {
called = true
return fn(...args)
}
}
}