From efeabf579aef2b68d9e1ca65a6fc65950e39b5b1 Mon Sep 17 00:00:00 2001 From: Daniele Iasella Date: Thu, 27 Oct 2022 12:09:05 +0100 Subject: [PATCH] feat: Add support for kafka event startingPositionTimestamp --- .../aws/package/compile/events/kafka.js | 18 +++++- .../aws/package/compile/events/kafka.test.js | 59 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/lib/plugins/aws/package/compile/events/kafka.js b/lib/plugins/aws/package/compile/events/kafka.js index 473c9e7c4a62..54a2f85b2c98 100644 --- a/lib/plugins/aws/package/compile/events/kafka.js +++ b/lib/plugins/aws/package/compile/events/kafka.js @@ -84,7 +84,11 @@ class AwsCompileKafkaEvents { }, startingPosition: { type: 'string', - enum: ['LATEST', 'TRIM_HORIZON'], + enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'], + }, + startingPositionTimestamp: { + type: 'number', + minimum: 0, }, topic: { type: 'string', @@ -149,6 +153,14 @@ class AwsCompileKafkaEvents { hasKafkaEvent = true; const { topic, batchSize, maximumBatchingWindow, enabled, consumerGroupId } = event.kafka; const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON'; + const startingPositionTimestamp = event.kafka.startingPositionTimestamp; + + if (startingPosition === 'AT_TIMESTAMP' && !startingPositionTimestamp) { + throw new ServerlessError( + `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`, + 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID' + ); + } const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId( functionName, @@ -243,6 +255,10 @@ class AwsCompileKafkaEvents { }; } + if (startingPositionTimestamp) { + kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp; + } + cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource; }); diff --git a/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js b/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js index 853601df691a..077d666de9ce 100644 --- a/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js +++ b/test/unit/lib/plugins/aws/package/compile/events/kafka.test.js @@ -533,6 +533,65 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () => }); }); + describe('startingPositionTimestamp', () => { + it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', async () => { + await expect( + await runServerless({ + fixture: 'function', + configExt: { + functions: { + basic: { + role: { 'Fn::ImportValue': 'MyImportedRole' }, + events: [ + { + kafka: { + topic, + bootstrapServers: ['abc.xyz:9092'], + accessConfigurations: { saslScram256Auth: saslScram256AuthArn }, + startingPosition: 'AT_TIMESTAMP', + }, + }, + ], + }, + }, + }, + command: 'package', + }) + ).to.be.rejected.and.eventually.contain({ + code: 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID', + }); + }); + }); + + it('should correctly compile EventSourceMapping resource properties for startingPosition', async () => { + const { awsNaming, cfTemplate } = await runServerless({ + fixture: 'function', + configExt: { + functions: { + basic: { + role: { 'Fn::ImportValue': 'MyImportedRole' }, + events: [ + { + kafka: { + topic, + bootstrapServers: ['abc.xyz:9092'], + accessConfigurations: { saslScram256Auth: saslScram256AuthArn }, + startingPosition: 'AT_TIMESTAMP', + startingPositionTimestamp: 123, + }, + }, + ], + }, + }, + }, + command: 'package', + }); + + const eventSourceMappingResource = + cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')]; + expect(eventSourceMappingResource.Properties.StartingPositionTimestamp).to.deep.equal(123); + }); + it('should not add dependsOn for imported role', async () => { const { awsNaming, cfTemplate } = await runServerless({ fixture: 'function',