Skip to content

Commit

Permalink
Merge pull request #1258 from priitkaard/master
Browse files Browse the repository at this point in the history
#683 Improve concurrency
  • Loading branch information
Nevon committed Mar 15, 2022
2 parents 0edaa45 + c359518 commit 4246f92
Show file tree
Hide file tree
Showing 84 changed files with 1,866 additions and 1,347 deletions.
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const AclResourceTypes = require('./src/protocol/aclResourceTypes')
const AclOperationTypes = require('./src/protocol/aclOperationTypes')
const AclPermissionTypes = require('./src/protocol/aclPermissionTypes')
const ResourcePatternTypes = require('./src/protocol/resourcePatternTypes')
const Errors = require('./src/errors')
const { isRebalancing, isKafkaJSError, ...errors } = require('./src/errors')
const { LEVELS } = require('./src/loggers')

module.exports = {
Expand All @@ -34,5 +34,5 @@ module.exports = {
AclPermissionTypes,
ResourcePatternTypes,
ConfigSource,
...Errors,
...errors,
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"test:types": "tsc -p types/"
},
"devDependencies": {
"@types/jest": "^27.4.0",
"@types/node": "^12.0.8",
"@typescript-eslint/typescript-estree": "^1.10.2",
"eslint": "^6.8.0",
Expand Down
5 changes: 3 additions & 2 deletions src/admin/__tests__/fetchTopicOffsetsByTimestamp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
} = require('testHelpers')

describe('Admin', () => {
let topicName, admin, producer, cluster
let topicName, admin, producer, cluster, consumer

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
Expand All @@ -30,6 +30,7 @@ describe('Admin', () => {
})
afterEach(async () => {
admin && (await admin.disconnect())
consumer && (await consumer.disconnect())
producer && (await producer.disconnect())
})

Expand Down Expand Up @@ -72,7 +73,7 @@ describe('Admin', () => {
)
expect(offsetsFutureTimestamp).toEqual([{ partition: 0, offset: '20' }])
const groupId = `consumer-group-id-${secureRandom()}`
const consumer = createConsumer({
consumer = createConsumer({
cluster,
groupId,
maxWaitTimeInMs: 1,
Expand Down
16 changes: 11 additions & 5 deletions src/admin/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ const { createCluster, newLogger, secureRandom } = require('testHelpers')
const createRetry = require('../retry')

describe('Admin', () => {
let admin

afterEach(async () => {
admin && (await admin.disconnect())
})

it('gives access to its logger', () => {
expect(
createAdmin({
Expand All @@ -14,7 +20,7 @@ describe('Admin', () => {
})

it('emits connection events', async () => {
const admin = createAdmin({
admin = createAdmin({
cluster: createCluster(),
logger: newLogger(),
})
Expand All @@ -33,7 +39,7 @@ describe('Admin', () => {

test('emits the request event', async () => {
const emitter = new InstrumentationEventEmitter()
const admin = createAdmin({
admin = createAdmin({
cluster: createCluster({ instrumentationEmitter: emitter }),
logger: newLogger(),
instrumentationEmitter: emitter,
Expand Down Expand Up @@ -72,7 +78,7 @@ describe('Admin', () => {
instrumentationEmitter: emitter,
})

const admin = createAdmin({
admin = createAdmin({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
Expand Down Expand Up @@ -121,7 +127,7 @@ describe('Admin', () => {
maxInFlightRequests: 1,
})

const admin = createAdmin({
admin = createAdmin({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
Expand Down Expand Up @@ -155,7 +161,7 @@ describe('Admin', () => {
})

test('on throws an error when provided with an invalid event name', () => {
const admin = createAdmin({
admin = createAdmin({
cluster: createCluster(),
logger: newLogger(),
})
Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/addOffsetsToTxn.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const Broker = require('../index')
const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes')
const { secureRandom, createConnection, newLogger, retryProtocol } = require('testHelpers')
const { secureRandom, createConnectionPool, newLogger, retryProtocol } = require('testHelpers')
const { KafkaJSProtocolError } = require('../../errors')

describe('Broker > AddOffsetsToTxn', () => {
Expand All @@ -11,7 +11,7 @@ describe('Broker > AddOffsetsToTxn', () => {
consumerGroupId = `group-id-${secureRandom()}`

seedBroker = new Broker({
connection: createConnection(),
connectionPool: createConnectionPool(),
logger: newLogger(),
})

Expand All @@ -29,7 +29,7 @@ describe('Broker > AddOffsetsToTxn', () => {
)

broker = new Broker({
connection: createConnection({ host, port }),
connectionPool: createConnectionPool({ host, port }),
logger: newLogger(),
})

Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/addPartitionsToTxn.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { KafkaJSProtocolError } = require('../../errors')
const {
secureRandom,
createTopic,
createConnection,
createConnectionPool,
newLogger,
retryProtocol,
} = require('testHelpers')
Expand All @@ -17,7 +17,7 @@ describe('Broker > AddPartitionsToTxn', () => {
topicName = `test-topic-${secureRandom()}`

seedBroker = new Broker({
connection: createConnection(),
connectionPool: createConnectionPool(),
logger: newLogger(),
})

Expand All @@ -36,7 +36,7 @@ describe('Broker > AddPartitionsToTxn', () => {
)

broker = new Broker({
connection: createConnection({ host, port }),
connectionPool: createConnectionPool({ host, port }),
logger: newLogger(),
})

Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/alterConfigs.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { createConnection, connectionOpts, secureRandom, newLogger } = require('testHelpers')
const { createConnectionPool, connectionOpts, secureRandom, newLogger } = require('testHelpers')
const RESOURCE_TYPES = require('../../protocol/resourceTypes')
const Broker = require('../index')

Expand All @@ -13,7 +13,7 @@ describe('Broker > alterConfigs', () => {

beforeEach(async () => {
seedBroker = new Broker({
connection: createConnection(connectionOpts()),
connectionPool: createConnectionPool(connectionOpts()),
logger: newLogger(),
})
await seedBroker.connect()
Expand All @@ -22,7 +22,7 @@ describe('Broker > alterConfigs', () => {
const newBrokerData = metadata.brokers.find(b => b.nodeId === metadata.controllerId)

broker = new Broker({
connection: createConnection(newBrokerData),
connectionPool: createConnectionPool(newBrokerData),
logger: newLogger(),
})
})
Expand Down
4 changes: 2 additions & 2 deletions src/broker/__tests__/apiVersions.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { createConnection, connectionOpts, newLogger } = require('testHelpers')
const { createConnectionPool, connectionOpts, newLogger } = require('testHelpers')
const { KafkaJSProtocolError, KafkaJSNonRetriableError } = require('../../errors')
const { createErrorFromCode } = require('../../protocol/error')
const { requests } = require('../../protocol/requests')
Expand All @@ -11,7 +11,7 @@ describe('Broker > ApiVersions', () => {

beforeEach(async () => {
broker = new Broker({
connection: createConnection(connectionOpts()),
connectionPool: createConnectionPool(connectionOpts()),
logger: newLogger(),
})
await broker.connect()
Expand Down
81 changes: 30 additions & 51 deletions src/broker/__tests__/connect.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const {
createConnection,
createConnectionPool,
connectionOpts,
saslSCRAM256ConnectionOpts,
newLogger,
Expand All @@ -12,13 +12,11 @@ const Long = require('../../utils/long')
const Broker = require('../index')

describe('Broker > connect', () => {
let broker
let broker, connectionPool

beforeEach(() => {
broker = new Broker({
connection: createConnection(connectionOpts()),
logger: newLogger(),
})
connectionPool = createConnectionPool(connectionOpts())
broker = new Broker({ connectionPool, logger: newLogger() })
})

afterEach(async () => {
Expand All @@ -27,7 +25,7 @@ describe('Broker > connect', () => {

test('establish the connection', async () => {
await broker.connect()
expect(broker.connection.connected).toEqual(true)
expect(broker.connectionPool.isConnected()).toEqual(true)
})

test('load api versions if not provided', async () => {
Expand All @@ -39,7 +37,7 @@ describe('Broker > connect', () => {
for (const e of saslEntries) {
test(`authenticate with SASL ${e.name} if configured`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
connectionPool: createConnectionPool(e.opts()),
logger: newLogger(),
})
expect(broker.isConnected()).toEqual(false)
Expand All @@ -51,7 +49,7 @@ describe('Broker > connect', () => {
describeIfOauthbearerDisabled('when SASL SCRAM is configured', () => {
test('parallel calls to connect using SCRAM', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM256ConnectionOpts()),
connectionPool: createConnectionPool(saslSCRAM256ConnectionOpts()),
logger: newLogger(),
})

Expand All @@ -69,32 +67,17 @@ describe('Broker > connect', () => {
})
})

test('sets the authenticatedAt timer', async () => {
const error = new Error('not connected')
const timer = process.hrtime()
broker.authenticatedAt = timer
broker.connection.connect = jest.fn(() => {
throw error
})

expect(broker.authenticatedAt).toEqual(timer)
await expect(broker.connect()).rejects.toEqual(error)
expect(broker.authenticatedAt).toBe(null)
})

describe('#isConnected', () => {
test('returns false when not connected', () => {
expect(broker.isConnected()).toEqual(false)
})

for (const e of saslEntries) {
test(`returns false when connected but not authenticated on connections with SASL ${e.name}`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })
expect(broker.isConnected()).toEqual(false)
await broker.connection.connect()
await connectionPool.getConnection()
expect(broker.isConnected()).toEqual(false)
})
}
Expand All @@ -107,65 +90,61 @@ describe('Broker > connect', () => {
describe('when SaslAuthenticate protocol is available', () => {
for (const e of saslEntries) {
test(`returns true when connected and authenticated on connections with SASL ${e.name}`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })
await broker.connect()
expect(broker.isConnected()).toEqual(true)
})

test('returns false when the session lifetime has expired', async () => {
const sessionLifetime = 15000
const reauthenticationThreshold = 10000
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
reauthenticationThreshold,
})
const connectionPool = createConnectionPool({ ...e.opts(), reauthenticationThreshold })
broker = new Broker({ connectionPool, logger: newLogger() })

await broker.connect()
expect(broker.isConnected()).toEqual(true)

broker.sessionLifetime = Long.fromValue(sessionLifetime)
const [seconds] = broker.authenticatedAt
broker.authenticatedAt = [seconds - sessionLifetime / 1000, 0]
const connection = await connectionPool.getConnection()

connection.sessionLifetime = Long.fromValue(sessionLifetime)
const [seconds] = connection.authenticatedAt
connection.authenticatedAt = [seconds - sessionLifetime / 1000, 0]

expect(broker.isConnected()).toEqual(false)
})

test('returns true when the session lifetime is 0', async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })

await broker.connect()
expect(broker.isConnected()).toEqual(true)

broker.sessionLifetime = Long.ZERO
broker.authenticatedAt = [0, 0]
const connection = await connectionPool.getConnection()
connection.sessionLifetime = Long.ZERO
connection.authenticatedAt = [0, 0]

expect(broker.isConnected()).toEqual(true)
})

testIfKafkaAtLeast_1_1_0(`authenticate with SASL ${e.name} if configured`, async () => {
broker = new Broker({
connection: createConnection(e.opts()),
logger: newLogger(),
})
const connectionPool = createConnectionPool(e.opts())
broker = new Broker({ connectionPool, logger: newLogger() })
expect(broker.isConnected()).toEqual(false)
await broker.connect()
expect(broker.isConnected()).toEqual(true)
expect(broker.supportAuthenticationProtocol).toEqual(true)

const connection = await connectionPool.getConnection()
expect(connection.getSupportAuthenticationProtocol()).toEqual(true)
})
}
})

describeIfOauthbearerDisabled('when SASL SCRAM is configured', () => {
testIfKafkaAtLeast_1_1_0('parallel calls to connect using SCRAM', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM256ConnectionOpts()),
connectionPool: createConnectionPool(saslSCRAM256ConnectionOpts()),
logger: newLogger(),
})

Expand Down
6 changes: 3 additions & 3 deletions src/broker/__tests__/createAcls.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { createConnection, connectionOpts, secureRandom, newLogger } = require('testHelpers')
const { createConnectionPool, connectionOpts, secureRandom, newLogger } = require('testHelpers')

const Broker = require('../index')
const ACL_RESOURCE_TYPES = require('../../protocol/aclResourceTypes')
Expand All @@ -11,7 +11,7 @@ describe('Broker > createAcls', () => {

beforeEach(async () => {
seedBroker = new Broker({
connection: createConnection(connectionOpts()),
connectionPool: createConnectionPool(connectionOpts()),
logger: newLogger(),
})
await seedBroker.connect()
Expand All @@ -20,7 +20,7 @@ describe('Broker > createAcls', () => {
const newBrokerData = metadata.brokers.find(b => b.nodeId === metadata.controllerId)

broker = new Broker({
connection: createConnection(newBrokerData),
connectionPool: createConnectionPool(newBrokerData),
logger: newLogger(),
})
})
Expand Down

0 comments on commit 4246f92

Please sign in to comment.