From 15de1ea7a489dfa2a288ba4a119ad64a9dd37f9d Mon Sep 17 00:00:00 2001 From: overbit <2861984+overbit@users.noreply.github.com> Date: Thu, 27 Oct 2022 20:47:35 +0100 Subject: [PATCH 1/6] feat: Add support for kafka event startingPositionTimestamp --- .../aws/package/compile/events/kafka.js | 18 +++++- .../aws/package/compile/events/kafka.test.js | 59 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/lib/plugins/aws/package/compile/events/kafka.js b/lib/plugins/aws/package/compile/events/kafka.js index 473c9e7c4a6..54a2f85b2c9 100644 --- a/lib/plugins/aws/package/compile/events/kafka.js +++ b/lib/plugins/aws/package/compile/events/kafka.js @@ -84,7 +84,11 @@ class AwsCompileKafkaEvents { }, startingPosition: { type: 'string', - enum: ['LATEST', 'TRIM_HORIZON'], + enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'], + }, + startingPositionTimestamp: { + type: 'number', + minimum: 0, }, topic: { type: 'string', @@ -149,6 +153,14 @@ class AwsCompileKafkaEvents { hasKafkaEvent = true; const { topic, batchSize, maximumBatchingWindow, enabled, consumerGroupId } = event.kafka; const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON'; + const startingPositionTimestamp = event.kafka.startingPositionTimestamp; + + if (startingPosition === 'AT_TIMESTAMP' && !startingPositionTimestamp) { + throw new ServerlessError( + `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`, + 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID' + ); + } const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId( functionName, @@ -243,6 +255,10 @@ class AwsCompileKafkaEvents { }; } + if (startingPositionTimestamp) { + kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp; + } + cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource; }); diff --git a/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js b/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js index 853601df691..077d666de9c 100644 --- a/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js +++ b/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js @@ -533,6 +533,65 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () => }); }); + describe('startingPositionTimestamp', () => { + it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', async () => { + await expect( + await runServerless({ + fixture: 'function', + configExt: { + functions: { + basic: { + role: { 'Fn::ImportValue': 'MyImportedRole' }, + events: [ + { + kafka: { + topic, + bootstrapServers: ['abc.xyz:9092'], + accessConfigurations: { saslScram256Auth: saslScram256AuthArn }, + startingPosition: 'AT_TIMESTAMP', + }, + }, + ], + }, + }, + }, + command: 'package', + }) + ).to.be.rejected.and.eventually.contain({ + code: 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID', + }); + }); + }); + + it('should correctly compile EventSourceMapping resource properties for startingPosition', async () => { + const { awsNaming, cfTemplate } = await runServerless({ + fixture: 'function', + configExt: { + functions: { + basic: { + role: { 'Fn::ImportValue': 'MyImportedRole' }, + events: [ + { + kafka: { + topic, + bootstrapServers: ['abc.xyz:9092'], + accessConfigurations: { saslScram256Auth: saslScram256AuthArn }, + startingPosition: 'AT_TIMESTAMP', + startingPositionTimestamp: 123, + }, + }, + ], + }, + }, + }, + command: 'package', + }); + + const eventSourceMappingResource = + cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')]; + expect(eventSourceMappingResource.Properties.StartingPositionTimestamp).to.deep.equal(123); + }); + it('should not add dependsOn for imported role', async () => { const { awsNaming, cfTemplate } = await runServerless({ fixture: 'function', From 7e829f75180bc2b6491570c7d76dca087ee0a5ae Mon Sep 17 00:00:00 2001 From: overbit <2861984+overbit@users.noreply.github.com> Date: Thu, 27 Oct 2022 20:47:59 +0100 Subject: [PATCH 2/6] refactor: remove type restrictions to startingPositionTimestamp to leave them to Cloudformation --- lib/plugins/aws/package/compile/events/kafka.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/plugins/aws/package/compile/events/kafka.js b/lib/plugins/aws/package/compile/events/kafka.js index 54a2f85b2c9..1d381796bda 100644 --- a/lib/plugins/aws/package/compile/events/kafka.js +++ b/lib/plugins/aws/package/compile/events/kafka.js @@ -88,7 +88,6 @@ class AwsCompileKafkaEvents { }, startingPositionTimestamp: { type: 'number', - minimum: 0, }, topic: { type: 'string', @@ -157,7 +156,7 @@ class AwsCompileKafkaEvents { if (startingPosition === 'AT_TIMESTAMP' && !startingPositionTimestamp) { throw new ServerlessError( - `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`, + `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`, 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID' ); } From cebb79664038abe52776202c9ea23ce9a3c854a8 Mon Sep 17 00:00:00 2001 From: overbit <2861984+overbit@users.noreply.github.com> Date: Thu, 27 Oct 2022 20:48:10 +0100 Subject: [PATCH 3/6] fix: unit test for StartingPositionTimestamp --- .../aws/package/compile/events/kafka.test.js | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js b/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js index 077d666de9c..e0ca076defc 100644 --- a/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js +++ b/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js @@ -536,7 +536,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () => describe('startingPositionTimestamp', () => { it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', async () => { await expect( - await runServerless({ + runServerless({ fixture: 'function', configExt: { functions: { @@ -561,35 +561,35 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () => code: 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID', }); }); - }); - it('should correctly compile EventSourceMapping resource properties for startingPosition', async () => { - const { awsNaming, cfTemplate } = await runServerless({ - fixture: 'function', - configExt: { - functions: { - basic: { - role: { 'Fn::ImportValue': 'MyImportedRole' }, - events: [ - { - kafka: { - topic, - bootstrapServers: ['abc.xyz:9092'], - accessConfigurations: { saslScram256Auth: saslScram256AuthArn }, - startingPosition: 'AT_TIMESTAMP', - startingPositionTimestamp: 123, + it('should correctly compile EventSourceMapping resource properties for startingPosition', async () => { + const { awsNaming, cfTemplate } = await runServerless({ + fixture: 'function', + configExt: { + functions: { + basic: { + role: { 'Fn::ImportValue': 'MyImportedRole' }, + events: [ + { + kafka: { + topic, + bootstrapServers: ['abc.xyz:9092'], + accessConfigurations: { saslScram256Auth: saslScram256AuthArn }, + startingPosition: 'AT_TIMESTAMP', + startingPositionTimestamp: 123, + }, }, - }, - ], + ], + }, }, }, - }, - command: 'package', - }); + command: 'package', + }); - const eventSourceMappingResource = - cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')]; - expect(eventSourceMappingResource.Properties.StartingPositionTimestamp).to.deep.equal(123); + const eventSourceMappingResource = + cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')]; + expect(eventSourceMappingResource.Properties.StartingPositionTimestamp).to.deep.equal(123); + }); }); it('should not add dependsOn for imported role', async () => { From 76dbad8e4637f5e77d0543747974b9b7580bc4c7 Mon Sep 17 00:00:00 2001 From: overbit <2861984+overbit@users.noreply.github.com> Date: Thu, 27 Oct 2022 21:48:39 +0100 Subject: [PATCH 4/6] doc: update documentation with new values --- docs/providers/aws/events/kafka.md | 3 ++- docs/providers/aws/guide/serverless.yml.md | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/providers/aws/events/kafka.md b/docs/providers/aws/events/kafka.md index d06a95f6b44..de285752852 100644 --- a/docs/providers/aws/events/kafka.md +++ b/docs/providers/aws/events/kafka.md @@ -152,7 +152,8 @@ The Serverless Framework will automatically configure the most minimal set of IA You can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000. Likewise `maximumBatchingWindow` can be set to determine the amount of time the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300. -In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default. +In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports three possible values, `TRIM_HORIZON`, `LATEST` and `AT_TIMESTAMP`, with `TRIM_HORIZON` being the default. +When `startingPosition` is configured as `AT_TIMESTAMP`, `startingPositionTimestamp` is also mandatory. In the following example, we specify that the `compute` function should have a `kafka` event configured with `batchSize` of 1000, `maximumBatchingWindow` of 30 seconds and `startingPosition` equal to `LATEST`. diff --git a/docs/providers/aws/guide/serverless.yml.md b/docs/providers/aws/guide/serverless.yml.md index 8f4ce70ebb0..e183f5a0ca8 100644 --- a/docs/providers/aws/guide/serverless.yml.md +++ b/docs/providers/aws/guide/serverless.yml.md @@ -1023,8 +1023,10 @@ functions: batchSize: 100 # Optional, must be in 0-300 range (seconds) maximumBatchingWindow: 30 - # Optional, can be set to LATEST or TRIM_HORIZON + # Optional, can be set to LATEST, AT_TIMESTAMP or TRIM_HORIZON startingPosition: LATEST + # Mandatory when startingPosition is AT_TIMESTAMP + startingPositionTimestamp: 10000123 # (default: true) enabled: false # Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated From fe3b9e62af178f39b1821f3dc634b13e0f437fe3 Mon Sep 17 00:00:00 2001 From: Daniele Iasella <2861984+overbit@users.noreply.github.com> Date: Wed, 2 Nov 2022 15:36:29 +0100 Subject: [PATCH 5/6] fix: respect schema definitions for starting position timestamp parameter --- lib/plugins/aws/package/compile/events/kafka.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/plugins/aws/package/compile/events/kafka.js b/lib/plugins/aws/package/compile/events/kafka.js index 1d381796bda..b5ad3519c45 100644 --- a/lib/plugins/aws/package/compile/events/kafka.js +++ b/lib/plugins/aws/package/compile/events/kafka.js @@ -254,7 +254,7 @@ class AwsCompileKafkaEvents { }; } - if (startingPositionTimestamp) { + if (startingPositionTimestamp !== undefined) { kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp; } From 47654a3e0e6059f172269bf4718de94771077194 Mon Sep 17 00:00:00 2001 From: Daniele Iasella Date: Thu, 3 Nov 2022 22:30:23 +0100 Subject: [PATCH 6/6] fix: checks on kafka startingpositiontimestamp param --- lib/plugins/aws/package/compile/events/kafka.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/plugins/aws/package/compile/events/kafka.js b/lib/plugins/aws/package/compile/events/kafka.js index b5ad3519c45..f8629f755c9 100644 --- a/lib/plugins/aws/package/compile/events/kafka.js +++ b/lib/plugins/aws/package/compile/events/kafka.js @@ -154,7 +154,7 @@ class AwsCompileKafkaEvents { const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON'; const startingPositionTimestamp = event.kafka.startingPositionTimestamp; - if (startingPosition === 'AT_TIMESTAMP' && !startingPositionTimestamp) { + if (startingPosition === 'AT_TIMESTAMP' && startingPositionTimestamp == null) { throw new ServerlessError( `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`, 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID' @@ -254,7 +254,7 @@ class AwsCompileKafkaEvents { }; } - if (startingPositionTimestamp !== undefined) { + if (startingPositionTimestamp != null) { kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp; }