Skip to content

Commit

Permalink
feat: Add support for kafka event startingPositionTimestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniele Iasella committed Oct 27, 2022
1 parent d2b6926 commit 43e3096
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
18 changes: 17 additions & 1 deletion lib/plugins/aws/package/compile/events/kafka.js
Expand Up @@ -84,7 +84,11 @@ class AwsCompileKafkaEvents {
},
startingPosition: {
type: 'string',
enum: ['LATEST', 'TRIM_HORIZON'],
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
},
startingPositionTimestamp: {
type: 'number',
minimum: 0,
},
topic: {
type: 'string',
Expand Down Expand Up @@ -149,6 +153,14 @@ class AwsCompileKafkaEvents {
hasKafkaEvent = true;
const { topic, batchSize, maximumBatchingWindow, enabled, consumerGroupId } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';
const startingPositionTimestamp = event.kafka.startingPositionTimestamp;

if (startingPosition === 'AT_TIMESTAMP' && !startingPositionTimestamp) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`,
'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID'
);
}

const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId(
functionName,
Expand Down Expand Up @@ -243,6 +255,10 @@ class AwsCompileKafkaEvents {
};
}

if (startingPositionTimestamp) {
kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp;
}

cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
});

Expand Down
59 changes: 59 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/kafka.test.js
Expand Up @@ -533,6 +533,65 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
});
});

describe('startingPositionAtTimestamp', () => {
it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionAtTimestamp', async () => {
await expect(
await runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
startingPosition: 'AT_TIMESTAMP',
},
},
],
},
},
},
command: 'package',
})
).to.be.rejected.and.eventually.contain({
code: 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID',
});
});
});

it('should correctly compile EventSourceMapping resource properties for startingPosition', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
startingPosition: 'AT_TIMESTAMP',
startingPositionAtTimestamp: 123,
},
},
],
},
},
},
command: 'package',
});

const eventSourceMappingResource =
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')];
expect(eventSourceMappingResource.DependsOn).to.deep.equal(123);
});

it('should not add dependsOn for imported role', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
Expand Down

0 comments on commit 43e3096

Please sign in to comment.