Skip to content

Commit

Permalink
Merge pull request #1339 from tulios/java-compatible-default-partitioner
Browse files Browse the repository at this point in the history
Make JavaCompatiblePartitioner new default
  • Loading branch information
Nevon authored May 5, 2022
2 parents c834b54 + fdde0ab commit 3323145
Show file tree
Hide file tree
Showing 21 changed files with 185 additions and 122 deletions.
24 changes: 14 additions & 10 deletions docs/Producing.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,20 @@ kafka.producer({ createPartitioner: MyPartitioner })

### Default Partitioners

KafkaJS ships with 2 partitioners: `DefaultPartitioner` and `JavaCompatiblePartitioner`.

The `JavaCompatiblePartitioner` should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the [co-partitioning requirement](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements) when joining multiple topics.

Use the `JavaCompatiblePartitioner` by importing it and providing it to the Producer constructor:

```javascript
const { Partitioners } = require('kafkajs')
kafka.producer({ createPartitioner: Partitioners.JavaCompatiblePartitioner })
```
KafkaJS ships with 2 partitioners: `DefaultPartitioner` and `LegacyPartitioner`.

The `DefaultPartitioner` should be compatible with the default partitioner that ships with the Java Kafka client. This can be important to meet the [co-partitioning requirement](https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html#co-partitioning-requirements) when joining multiple topics.

> 🚨 **Important** 🚨
>
> **The `LegacyPartitioner` was the default until v2.0.0. If you are upgrading from an older
version and want to retain the previous partitioning behavior, use the `LegacyPartitioner`
by importing it and providing it to the Producer constructor:**
>
> ```javascript
> const { Partitioners } = require('kafkajs')
> kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })
> ```
## <a name="retry"></a> Retry
Expand Down
16 changes: 16 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const createConsumer = require('./consumer')
const createAdmin = require('./admin')
const ISOLATION_LEVEL = require('./protocol/isolationLevel')
const defaultSocketFactory = require('./network/socketFactory')
const once = require('./utils/once')
const websiteUrl = require('./utils/websiteUrl')

const PRIVATE = {
CREATE_CLUSTER: Symbol('private:Kafka:createCluster'),
Expand All @@ -20,6 +22,16 @@ const PRIVATE = {
}

const DEFAULT_METADATA_MAX_AGE = 300000
/**
 * Emits a warning through the given logger explaining that the default
 * partitioner changed in v2.0.0 and how to opt back into the legacy one.
 *
 * Wrapped in `once`, so only the first invocation reaches this body. The
 * warning is skipped when the KAFKAJS_NO_PARTITIONER_WARNING environment
 * variable is set to any value.
 *
 * @param {{ warn: Function }} logger Logger used to emit the warning
 */
const warnOfDefaultPartitioner = once(logger => {
  const warningSilenced = process.env.KAFKAJS_NO_PARTITIONER_WARNING != null
  if (warningSilenced) {
    return
  }

  logger.warn(
    `KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See ${websiteUrl(
      'docs/producing',
      'default-partitioners'
    )} for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1"`
  )
})

module.exports = class Client {
/**
Expand Down Expand Up @@ -104,6 +116,10 @@ module.exports = class Client {
instrumentationEmitter,
})

if (createPartitioner == null) {
warnOfDefaultPartitioner(this[PRIVATE.LOGGER])
}

return createProducer({
retry: { ...this[PRIVATE.CLUSTER_RETRY], ...retry },
logger: this[PRIVATE.LOGGER],
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/producer/partitioners/default/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const murmur2 = require('./murmur2')
const createDefaultPartitioner = require('./partitioner')
const createDefaultPartitioner = require('../legacy/partitioner')

module.exports = createDefaultPartitioner(murmur2)
38 changes: 20 additions & 18 deletions src/producer/partitioners/default/murmur2.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
/* eslint-disable */
const Long = require('../../../utils/long')

// Based on the kafka client 0.10.2 murmur2 implementation
// https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L364

const SEED = 0x9747b28c
const SEED = Long.fromValue(0x9747b28c)

// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
const M = 0x5bd1e995
const R = 24
const M = Long.fromValue(0x5bd1e995)
const R = Long.fromValue(24)

module.exports = key => {
const data = Buffer.isBuffer(key) ? key : Buffer.from(String(key))
const length = data.length

// Initialize the hash to a random value
let h = SEED ^ length
let length4 = length / 4
let h = Long.fromValue(SEED.xor(length))
let length4 = Math.floor(length / 4)

for (let i = 0; i < length4; i++) {
const i4 = i * 4
Expand All @@ -25,27 +26,28 @@ module.exports = key => {
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24)
k *= M
k ^= k >>> R
k *= M
h *= M
h ^= k
k = Long.fromValue(k)
k = k.multiply(M)
k = k.xor(k.toInt() >>> R)
k = Long.fromValue(k).multiply(M)
h = h.multiply(M)
h = h.xor(k)
}

// Handle the last few bytes of the input array
switch (length % 4) {
case 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h = h.xor((data[(length & ~3) + 2] & 0xff) << 16)
case 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h = h.xor((data[(length & ~3) + 1] & 0xff) << 8)
case 1:
h ^= data[length & ~3] & 0xff
h *= M
h = h.xor(data[length & ~3] & 0xff)
h = h.multiply(M)
}

h ^= h >>> 13
h *= M
h ^= h >>> 15
h = h.xor(h.toInt() >>> 13)
h = h.multiply(M)
h = h.xor(h.toInt() >>> 15)

return h
return h.toInt()
}
38 changes: 20 additions & 18 deletions src/producer/partitioners/default/murmur2.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,31 @@ describe('Producer > Partitioner > Default > murmur2', () => {
})

test('it handles numeric input', () => {
expect(murmur2(0)).toEqual(272173970)
expect(murmur2(0)).toEqual(971027396)
})

test('it handles buffer input', () => {
expect(murmur2(Buffer.from('1'))).toEqual(1311020360)
expect(murmur2(Buffer.from('1'))).toEqual(-1993445489)
})
})

// Generated with src/producer/partitioners/defaultJava/Test.java
const testData = {
'0': 272173970,
'1': 1311020360,
'128': 2053105854,
'2187': -2081355488,
'16384': 204404061,
'78125': -677491393,
'279936': -622460209,
'823543': 651276451,
'2097152': 944683677,
'4782969': -892695770,
'10000000': -1778616326,
'19487171': -518311627,
'35831808': 556972389,
'62748517': -233806557,
'105413504': -109398538,
'170859375': 102939717,
'0': 971027396,
'1': -1993445489,
'128': -326012175,
'2187': -1508407203,
'16384': -325739742,
'78125': -1654490814,
'279936': 1462227128,
'823543': -2014198330,
'2097152': 607668903,
'4782969': -1182699775,
'10000000': -1830336757,
'19487171': -1603849305,
'35831808': -857013643,
'62748517': -1167431028,
'105413504': -381294639,
'170859375': -1658323481,
'100:48069': 1009543857,
}
4 changes: 0 additions & 4 deletions src/producer/partitioners/defaultJava/index.js

This file was deleted.

38 changes: 0 additions & 38 deletions src/producer/partitioners/defaultJava/murmur2.spec.js

This file was deleted.

11 changes: 9 additions & 2 deletions src/producer/partitioners/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
const DefaultPartitioner = require('./default')
const JavaCompatiblePartitioner = require('./defaultJava')
const LegacyPartitioner = require('./legacy')

module.exports = {
DefaultPartitioner,
JavaCompatiblePartitioner,
LegacyPartitioner,
/**
* @deprecated Use DefaultPartitioner instead
*
* The JavaCompatiblePartitioner was renamed DefaultPartitioner
* and made to be the default in 2.0.0.
*/
JavaCompatiblePartitioner: DefaultPartitioner,
}
4 changes: 4 additions & 0 deletions src/producer/partitioners/legacy/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Legacy partitioner entry point: binds the partitioner factory from
// ./partitioner to the murmur2 hash implementation in this directory.
const murmur2 = require('./murmur2')
const createLegacyPartitioner = require('./partitioner')

module.exports = createLegacyPartitioner(murmur2)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const createPartitioner = require('./index')

describe('Producer > Partitioner > Default', () => {
describe('Producer > Partitioner > Legacy', () => {
let topic, partitioner, partitionMetadata

beforeEach(() => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
/* eslint-disable */
const Long = require('../../../utils/long')

// Based on the kafka client 0.10.2 murmur2 implementation
// https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L364

const SEED = Long.fromValue(0x9747b28c)
const SEED = 0x9747b28c

// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
const M = Long.fromValue(0x5bd1e995)
const R = Long.fromValue(24)
const M = 0x5bd1e995
const R = 24

module.exports = key => {
const data = Buffer.isBuffer(key) ? key : Buffer.from(String(key))
const length = data.length

// Initialize the hash to a random value
let h = Long.fromValue(SEED.xor(length))
let length4 = Math.floor(length / 4)
let h = SEED ^ length
let length4 = length / 4

for (let i = 0; i < length4; i++) {
const i4 = i * 4
Expand All @@ -26,28 +25,27 @@ module.exports = key => {
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24)
k = Long.fromValue(k)
k = k.multiply(M)
k = k.xor(k.toInt() >>> R)
k = Long.fromValue(k).multiply(M)
h = h.multiply(M)
h = h.xor(k)
k *= M
k ^= k >>> R
k *= M
h *= M
h ^= k
}

// Handle the last few bytes of the input array
switch (length % 4) {
case 3:
h = h.xor((data[(length & ~3) + 2] & 0xff) << 16)
h ^= (data[(length & ~3) + 2] & 0xff) << 16
case 2:
h = h.xor((data[(length & ~3) + 1] & 0xff) << 8)
h ^= (data[(length & ~3) + 1] & 0xff) << 8
case 1:
h = h.xor(data[length & ~3] & 0xff)
h = h.multiply(M)
h ^= data[length & ~3] & 0xff
h *= M
}

h = h.xor(h.toInt() >>> 13)
h = h.multiply(M)
h = h.xor(h.toInt() >>> 15)
h ^= h >>> 13
h *= M
h ^= h >>> 15

return h.toInt()
return h
}
36 changes: 36 additions & 0 deletions src/producer/partitioners/legacy/murmur2.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const murmur2 = require('./murmur2')

// Regression suite for the murmur2 implementation required from ./murmur2
// in the legacy partitioner directory. The suite title says "Legacy" so its
// results are not confused with the default partitioner's murmur2 suite.
describe('Producer > Partitioner > Legacy > murmur2', () => {
  test('it works', () => {
    // Every fixture key must hash to its recorded value.
    Object.keys(testData).forEach(key => {
      expect(murmur2(key)).toEqual(testData[key])
    })
  })

  test('it handles numeric input', () => {
    // A number must produce the same hash as the fixture entry for '0'.
    expect(murmur2(0)).toEqual(272173970)
  })

  test('it handles buffer input', () => {
    // A Buffer must produce the same hash as the fixture entry for '1'.
    expect(murmur2(Buffer.from('1'))).toEqual(1311020360)
  })
})

// Fixture table: maps each input key (a string) to the hash value that
// murmur2 is expected to return for it. Consumed by the 'it works' test.
const testData = {
'0': 272173970,
'1': 1311020360,
'128': 2053105854,
'2187': -2081355488,
'16384': 204404061,
'78125': -677491393,
'279936': -622460209,
'823543': 651276451,
'2097152': 944683677,
'4782969': -892695770,
'10000000': -1778616326,
'19487171': -518311627,
'35831808': 556972389,
'62748517': -233806557,
'105413504': -109398538,
'170859375': 102939717,
}
10 changes: 10 additions & 0 deletions src/utils/once.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module.exports = fn => {
let called = false

return (...args) => {
if (!called) {
called = true
return fn(...args)
}
}
}
Loading

0 comments on commit 3323145

Please sign in to comment.