Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#683 Improve concurrency #1258

Merged
merged 82 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
3b74924
refactor: remove currency concurrency and locking. clear restart time…
priitkaard Nov 23, 2021
3141922
refactor: start runner via runnerPool. remove partitionsConsumedConcu…
priitkaard Nov 23, 2021
4a3a00d
feat: experiment with concurrencyManager distribution algorithm
priitkaard Nov 24, 2021
4c4055e
feat: separate worker for concurrent fetching per broker
priitkaard Dec 6, 2021
b019bc3
refactor: fetch worker for each broker
priitkaard Dec 7, 2021
7b3a219
refactor: reduce the amount of changes done to tests. Rename fetch to…
priitkaard Dec 7, 2021
61c0859
refactor: rename scheduleFetch to scheduleConsume
priitkaard Dec 7, 2021
544722c
Merge remote-tracking branch 'origin/master' into concurrency
priitkaard Dec 7, 2021
6332d8d
fix: remove interval hack
priitkaard Dec 7, 2021
efdbaf6
fix: mark offset out of range error retriable. merge fetch error hand…
priitkaard Dec 7, 2021
11030e1
refactor: improve concurrent crash handling
priitkaard Dec 7, 2021
64ddb41
refactor: combine fetch queue. rename pool to fetch manager. revert o…
priitkaard Dec 8, 2021
e39603a
refactor: restore more error handling
priitkaard Dec 8, 2021
8907437
test: uncomment some failing tests
priitkaard Dec 8, 2021
ecbd862
test: fix runner tests
priitkaard Dec 8, 2021
88d4b40
fix: safely leave consumer group on restart
priitkaard Dec 8, 2021
7a1ca90
fix: avoid restart after disconnected by afterEach hooks
priitkaard Dec 8, 2021
1e31962
refactor: cleanup onCrash
priitkaard Dec 8, 2021
ef5a135
test: fix rebalance without retries
priitkaard Dec 8, 2021
ec11fc8
feat: distribute partitions across concurrent runners
priitkaard Dec 8, 2021
731fbbc
refactor: change seekOffset data structure for easier separation bet…
priitkaard Dec 16, 2021
f0c9c96
refactor: fetch per node + concurrent runner.
priitkaard Dec 16, 2021
45f81bf
refactor: cleanup hardcoded sleeps in runner tests
priitkaard Dec 16, 2021
def6c95
refactor: cleanup active/node partition filtering
priitkaard Dec 16, 2021
b277ba6
refactor: cleanup
priitkaard Dec 16, 2021
07450f7
Merge remote-tracking branch 'origin/master' into concurrency
priitkaard Dec 16, 2021
6872058
refactor: minor changes before refactoring to single requests per bro…
priitkaard Dec 17, 2021
b783a62
feat: socket for polls
priitkaard Dec 17, 2021
1c87136
1.15.1
priitkaard Dec 17, 2021
5031975
#683 refactor: final renaming before PR
priitkaard Dec 20, 2021
bf4c827
#683 test: fetch manager
priitkaard Dec 20, 2021
96bf844
test: fix mock reset
priitkaard Dec 20, 2021
d2128cc
Merge pull request #1 from priitkaard/concurrency
priitkaard Dec 20, 2021
cc9559c
Merge branch 'tulios:master' into master
priitkaard Dec 20, 2021
2c37d39
fix: flatMap implementation
priitkaard Dec 20, 2021
eacb34a
fix: improve error handling during concurrent rebalance
priitkaard Dec 20, 2021
ef75150
1.16.1
priitkaard Dec 20, 2021
e622ed7
refactor: clear onCrash timeout on stop()
priitkaard Dec 22, 2021
69fef0e
refactor: connection pool
priitkaard Dec 28, 2021
0619844
test: spy a single connection
priitkaard Dec 28, 2021
9bc80d3
fix: move default connectionTimeout to upper scope
priitkaard Dec 31, 2021
3aeb591
Merge remote-tracking branch 'tulios/master'
priitkaard Jan 13, 2022
4dbd48a
Merge remote-tracking branch 'tulios/master'
priitkaard Feb 9, 2022
69077ec
Revert "Merge remote-tracking branch 'tulios/master'"
priitkaard Feb 9, 2022
eecee03
revert last two commits
priitkaard Feb 9, 2022
1436605
Merge branch 'master' of github.com:priitkaard/kafkajs
priitkaard Feb 9, 2022
3b44fd2
merge tulios/master
priitkaard Feb 10, 2022
e9c39f6
commit stashed
priitkaard Feb 10, 2022
702e086
Merge remote-tracking branch 'tulios/master'
priitkaard Feb 10, 2022
f431356
fix: tests
priitkaard Feb 10, 2022
3580f9d
revert: seek offsets data structure
priitkaard Feb 14, 2022
7342110
fix: don't export error helper functions
priitkaard Feb 14, 2022
4a195d7
fix: tests and some types
priitkaard Feb 14, 2022
49cc00c
fix: use getConnection to get connection from pool
priitkaard Feb 14, 2022
d6f32b8
fix: update fetch event payloads
priitkaard Feb 14, 2022
0c7fa43
refactor: split runner and runnerPool tests
priitkaard Feb 14, 2022
eef1431
refactor: reduce coupling from runner to consumerGroup
priitkaard Feb 15, 2022
ec8e4d8
refactor: assign fetch manager partitions on creating
priitkaard Feb 15, 2022
10c0e70
refactor: make consumerGroup.fetch private
priitkaard Feb 15, 2022
31176a5
rename: message to fetchResult
priitkaard Feb 15, 2022
02a8e51
feat: worker, workerQueue, fetcher, fetchManager
priitkaard Feb 15, 2022
f328d45
* refactor: reduce coupling between consumerGroup and concurrent work…
priitkaard Feb 16, 2022
b392db9
feat: handle new/inactive nodes from refreshing cluster metadata
priitkaard Feb 17, 2022
26410ed
refactor: ConnectionPool
priitkaard Feb 17, 2022
e3af849
fix: flat() to flatten()
priitkaard Feb 17, 2022
c13c147
minor cleanup
priitkaard Feb 18, 2022
a40e72b
fix: remove isEmptyDueToFiltering from public Batch interface
priitkaard Feb 21, 2022
af1cb7a
refactor: concurrency logic
priitkaard Feb 21, 2022
1c6f37a
fix: disconnecting status as connected
priitkaard Feb 21, 2022
c79858f
chore: skip workerQueue.push if !isRunning
priitkaard Feb 22, 2022
2542663
fix: test
priitkaard Feb 22, 2022
2b4e62b
feat: commit offsets before rejoining the consumer group on rebalance
priitkaard Feb 23, 2022
e3c1c0c
fix: handle unknown_member_id from offsetCommit
priitkaard Mar 1, 2022
65a82c8
Improve performance of assignment diff
Nevon Mar 7, 2022
1c4b02e
Ensure only one worker can operate on a topic-partition at any given …
Nevon Mar 7, 2022
8a4716f
Apply suggestions from code review
Nevon Mar 8, 2022
f64263f
Handle partition reassignment in fetcher
Nevon Mar 8, 2022
a10806f
Revert 1c4b02e1
Nevon Mar 8, 2022
d0e790d
Merge branch 'handle-partition-reassignment' of github.com:tulios/kaf…
Nevon Mar 8, 2022
6ea295c
Merge pull request #2 from tulios/handle-partition-reassignment
priitkaard Mar 8, 2022
5c0191c
Merge branch 'master' into master
Nevon Mar 9, 2022
c359518
Merge branch 'master' into master
Nevon Mar 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const AclResourceTypes = require('./src/protocol/aclResourceTypes')
const AclOperationTypes = require('./src/protocol/aclOperationTypes')
const AclPermissionTypes = require('./src/protocol/aclPermissionTypes')
const ResourcePatternTypes = require('./src/protocol/resourcePatternTypes')
const Errors = require('./src/errors')
const { isRebalancing, isKafkaJSError, ...errors } = require('./src/errors')
const { LEVELS } = require('./src/loggers')

module.exports = {
Expand All @@ -34,5 +34,5 @@ module.exports = {
AclPermissionTypes,
ResourcePatternTypes,
ConfigSource,
...Errors,
...errors,
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"test:types": "tsc -p types/"
},
"devDependencies": {
"@types/jest": "^27.4.0",
"@types/node": "^12.0.8",
"@typescript-eslint/typescript-estree": "^1.10.2",
"eslint": "^6.8.0",
Expand Down
5 changes: 3 additions & 2 deletions src/admin/__tests__/fetchTopicOffsetsByTimestamp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
} = require('testHelpers')

describe('Admin', () => {
let topicName, admin, producer, cluster
let topicName, admin, producer, cluster, consumer

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
Expand All @@ -30,6 +30,7 @@ describe('Admin', () => {
})
afterEach(async () => {
admin && (await admin.disconnect())
consumer && (await consumer.disconnect())
producer && (await producer.disconnect())
})

Expand Down Expand Up @@ -72,7 +73,7 @@ describe('Admin', () => {
)
expect(offsetsFutureTimestamp).toEqual([{ partition: 0, offset: '20' }])
const groupId = `consumer-group-id-${secureRandom()}`
const consumer = createConsumer({
consumer = createConsumer({
cluster,
groupId,
maxWaitTimeInMs: 1,
Expand Down
16 changes: 11 additions & 5 deletions src/admin/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ const { createCluster, newLogger, secureRandom } = require('testHelpers')
const createRetry = require('../retry')

describe('Admin', () => {
let admin

afterEach(async () => {
admin && (await admin.disconnect())
})

it('gives access to its logger', () => {
expect(
createAdmin({
Expand All @@ -14,7 +20,7 @@ describe('Admin', () => {
})

it('emits connection events', async () => {
const admin = createAdmin({
admin = createAdmin({
cluster: createCluster(),
logger: newLogger(),
})
Expand All @@ -33,7 +39,7 @@ describe('Admin', () => {

test('emits the request event', async () => {
const emitter = new InstrumentationEventEmitter()
const admin = createAdmin({
admin = createAdmin({
cluster: createCluster({ instrumentationEmitter: emitter }),
logger: newLogger(),
instrumentationEmitter: emitter,
Expand Down Expand Up @@ -72,7 +78,7 @@ describe('Admin', () => {
instrumentationEmitter: emitter,
})

const admin = createAdmin({
admin = createAdmin({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
Expand Down Expand Up @@ -121,7 +127,7 @@ describe('Admin', () => {
maxInFlightRequests: 1,
})

const admin = createAdmin({
admin = createAdmin({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
Expand Down Expand Up @@ -155,7 +161,7 @@ describe('Admin', () => {
})

test('on throws an error when provided with an invalid event name', () => {
const admin = createAdmin({
admin = createAdmin({
cluster: createCluster(),
logger: newLogger(),
})
Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/addOffsetsToTxn.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const Broker = require('../index')
const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes')
const { secureRandom, createConnection, newLogger, retryProtocol } = require('testHelpers')
const { secureRandom, createConnectionPool, newLogger, retryProtocol } = require('testHelpers')
const { KafkaJSProtocolError } = require('../../errors')

describe('Broker > AddOffsetsToTxn', () => {
Expand All @@ -11,7 +11,7 @@ describe('Broker > AddOffsetsToTxn', () => {
consumerGroupId = `group-id-${secureRandom()}`

seedBroker = new Broker({
connection: createConnection(),
connectionPool: createConnectionPool(),
logger: newLogger(),
})

Expand All @@ -29,7 +29,7 @@ describe('Broker > AddOffsetsToTxn', () => {
)

broker = new Broker({
connection: createConnection({ host, port }),
connectionPool: createConnectionPool({ host, port }),
logger: newLogger(),
})

Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/addPartitionsToTxn.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { KafkaJSProtocolError } = require('../../errors')
const {
secureRandom,
createTopic,
createConnection,
createConnectionPool,
newLogger,
retryProtocol,
} = require('testHelpers')
Expand All @@ -17,7 +17,7 @@ describe('Broker > AddPartitionsToTxn', () => {
topicName = `test-topic-${secureRandom()}`

seedBroker = new Broker({
connection: createConnection(),
connectionPool: createConnectionPool(),
logger: newLogger(),
})

Expand All @@ -36,7 +36,7 @@ describe('Broker > AddPartitionsToTxn', () => {
)

broker = new Broker({
connection: createConnection({ host, port }),
connectionPool: createConnectionPool({ host, port }),
logger: newLogger(),
})

Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/alterConfigs.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { createConnection, connectionOpts, secureRandom, newLogger } = require('testHelpers')
const { createConnectionPool, connectionOpts, secureRandom, newLogger } = require('testHelpers')
const RESOURCE_TYPES = require('../../protocol/resourceTypes')
const Broker = require('../index')

Expand All @@ -13,7 +13,7 @@ describe('Broker > alterConfigs', () => {

beforeEach(async () => {
seedBroker = new Broker({
connection: createConnection(connectionOpts()),
connectionPool: createConnectionPool(connectionOpts()),
logger: newLogger(),
})
await seedBroker.connect()
Expand All @@ -22,7 +22,7 @@ describe('Broker > alterConfigs', () => {
const newBrokerData = metadata.brokers.find(b => b.nodeId === metadata.controllerId)

broker = new Broker({
connection: createConnection(newBrokerData),
connectionPool: createConnectionPool(newBrokerData),
logger: newLogger(),
})
})
Expand Down
4 changes: 2 additions & 2 deletions src/broker/__tests__/apiVersions.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { createConnection, connectionOpts, newLogger } = require('testHelpers')
const { createConnectionPool, connectionOpts, newLogger } = require('testHelpers')
const { KafkaJSProtocolError, KafkaJSNonRetriableError } = require('../../errors')
const { createErrorFromCode } = require('../../protocol/error')
const { requests } = require('../../protocol/requests')
Expand All @@ -11,7 +11,7 @@ describe('Broker > ApiVersions', () => {

beforeEach(async () => {
broker = new Broker({
connection: createConnection(connectionOpts()),
connectionPool: createConnectionPool(connectionOpts()),
logger: newLogger(),
})
await broker.connect()
Expand Down
81 changes: 30 additions & 51 deletions src/broker/__tests__/connect.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const {
createConnection,
createConnectionPool,
connectionOpts,
saslSCRAM256ConnectionOpts,
newLogger,
Expand All @@ -12,13 +12,11 @@ const Long = require('../../utils/long')
const Broker = require('../index')

describe('Broker > connect', () => {
let broker
let broker, connectionPool

beforeEach(() => {
broker = new Broker({
connection: createConnection(connectionOpts()),
logger: newLogger(),
})
connectionPool = createConnectionPool(connectionOpts())
broker = new Broker({ connectionPool, logger: newLogger() })
})

afterEach(async () => {
Expand All @@ -27,7 +25,7 @@ describe('Broker > connect', () => {

test('establish the connection', async () => {
await broker.connect()
expect(broker.connection.connected).toEqual(true)
expect(broker.connectionPool.isConnected()).toEqual(true)
})

test('load api versions if not provided', async () => {
Expand All @@ -39,7 +37,7 @@ describe('Broker > connect', () => {
for (const e of saslEntries) {
test(`authenticate with SASL ${e.name} if configured`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
connectionPool: createConnectionPool(e.opts()),
logger: newLogger(),
})
expect(broker.isConnected()).toEqual(false)
Expand All @@ -51,7 +49,7 @@ describe('Broker > connect', () => {
describeIfOauthbearerDisabled('when SASL SCRAM is configured', () => {
test('parallel calls to connect using SCRAM', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM256ConnectionOpts()),
connectionPool: createConnectionPool(saslSCRAM256ConnectionOpts()),
logger: newLogger(),
})

Expand All @@ -69,32 +67,17 @@ describe('Broker > connect', () => {
})
})

test('sets the authenticatedAt timer', async () => {
const error = new Error('not connected')
const timer = process.hrtime()
broker.authenticatedAt = timer
broker.connection.connect = jest.fn(() => {
throw error
})

expect(broker.authenticatedAt).toEqual(timer)
await expect(broker.connect()).rejects.toEqual(error)
expect(broker.authenticatedAt).toBe(null)
})

describe('#isConnected', () => {
test('returns false when not connected', () => {
expect(broker.isConnected()).toEqual(false)
})

for (const e of saslEntries) {
test(`returns false when connected but not authenticated on connections with SASL ${e.name}`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })
expect(broker.isConnected()).toEqual(false)
await broker.connection.connect()
await connectionPool.getConnection()
expect(broker.isConnected()).toEqual(false)
})
}
Expand All @@ -107,65 +90,61 @@ describe('Broker > connect', () => {
describe('when SaslAuthenticate protocol is available', () => {
for (const e of saslEntries) {
test(`returns true when connected and authenticated on connections with SASL ${e.name}`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })
await broker.connect()
expect(broker.isConnected()).toEqual(true)
})

test('returns false when the session lifetime has expired', async () => {
const sessionLifetime = 15000
const reauthenticationThreshold = 10000
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
reauthenticationThreshold,
})
const connectionPool = createConnectionPool({ ...e.opts(), reauthenticationThreshold })
broker = new Broker({ connectionPool, logger: newLogger() })

await broker.connect()
expect(broker.isConnected()).toEqual(true)

broker.sessionLifetime = Long.fromValue(sessionLifetime)
const [seconds] = broker.authenticatedAt
broker.authenticatedAt = [seconds - sessionLifetime / 1000, 0]
const connection = await connectionPool.getConnection()

connection.sessionLifetime = Long.fromValue(sessionLifetime)
const [seconds] = connection.authenticatedAt
connection.authenticatedAt = [seconds - sessionLifetime / 1000, 0]

expect(broker.isConnected()).toEqual(false)
})

test('returns true when the session lifetime is 0', async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })

await broker.connect()
expect(broker.isConnected()).toEqual(true)

broker.sessionLifetime = Long.ZERO
broker.authenticatedAt = [0, 0]
const connection = await connectionPool.getConnection()
connection.sessionLifetime = Long.ZERO
connection.authenticatedAt = [0, 0]

expect(broker.isConnected()).toEqual(true)
})

testIfKafkaAtLeast_1_1_0(`authenticate with SASL ${e.name} if configured`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })
expect(broker.isConnected()).toEqual(false)
await broker.connect()
expect(broker.isConnected()).toEqual(true)
expect(broker.supportAuthenticationProtocol).toEqual(true)

const connection = await connectionPool.getConnection()
expect(connection.getSupportAuthenticationProtocol()).toEqual(true)
})
}
})

describeIfOauthbearerDisabled('when SASL SCRAM is configured', () => {
testIfKafkaAtLeast_1_1_0('parallel calls to connect using SCRAM', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM256ConnectionOpts()),
connectionPool: createConnectionPool(saslSCRAM256ConnectionOpts()),
logger: newLogger(),
})

Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/createAcls.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { createConnection, connectionOpts, secureRandom, newLogger } = require('testHelpers')
const { createConnectionPool, connectionOpts, secureRandom, newLogger } = require('testHelpers')

const Broker = require('../index')
const ACL_RESOURCE_TYPES = require('../../protocol/aclResourceTypes')
Expand All @@ -11,7 +11,7 @@ describe('Broker > createAcls', () => {

beforeEach(async () => {
seedBroker = new Broker({
connection: createConnection(connectionOpts()),
connectionPool: createConnectionPool(connectionOpts()),
logger: newLogger(),
})
await seedBroker.connect()
Expand All @@ -20,7 +20,7 @@ describe('Broker > createAcls', () => {
const newBrokerData = metadata.brokers.find(b => b.nodeId === metadata.controllerId)

broker = new Broker({
connection: createConnection(newBrokerData),
connectionPool: createConnectionPool(newBrokerData),
logger: newLogger(),
})
})
Expand Down