Skip to content

Commit

Permalink
feat(AWS Kafka): Support startingPositionTimestamp (#11479)
Browse files Browse the repository at this point in the history
  • Loading branch information
overbit committed Nov 7, 2022
1 parent a501918 commit 858758e
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 3 deletions.
3 changes: 2 additions & 1 deletion docs/providers/aws/events/kafka.md
Expand Up @@ -152,7 +152,8 @@ The Serverless Framework will automatically configure the most minimal set of IA

You can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000.
Likewise `maximumBatchingWindow` can be set to determine the amount of time the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports three possible values, `TRIM_HORIZON`, `LATEST` and `AT_TIMESTAMP`, with `TRIM_HORIZON` being the default.
When `startingPosition` is configured as `AT_TIMESTAMP`, `startingPositionTimestamp` is also mandatory.

In the following example, we specify that the `compute` function should have a `kafka` event configured with `batchSize` of 1000, `maximumBatchingWindow` of 30 seconds and `startingPosition` equal to `LATEST`.

Expand Down
4 changes: 3 additions & 1 deletion docs/providers/aws/guide/serverless.yml.md
Expand Up @@ -1024,8 +1024,10 @@ functions:
batchSize: 100
# Optional, must be in 0-300 range (seconds)
maximumBatchingWindow: 30
# Optional, can be set to LATEST or TRIM_HORIZON
# Optional, can be set to LATEST, AT_TIMESTAMP or TRIM_HORIZON
startingPosition: LATEST
# Mandatory when startingPosition is AT_TIMESTAMP
startingPositionTimestamp: 10000123
# (default: true)
enabled: false
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
Expand Down
17 changes: 16 additions & 1 deletion lib/plugins/aws/package/compile/events/kafka.js
Expand Up @@ -84,7 +84,10 @@ class AwsCompileKafkaEvents {
},
startingPosition: {
type: 'string',
enum: ['LATEST', 'TRIM_HORIZON'],
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
},
startingPositionTimestamp: {
type: 'number',
},
topic: {
type: 'string',
Expand Down Expand Up @@ -149,6 +152,14 @@ class AwsCompileKafkaEvents {
hasKafkaEvent = true;
const { topic, batchSize, maximumBatchingWindow, enabled, consumerGroupId } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';
const startingPositionTimestamp = event.kafka.startingPositionTimestamp;

if (startingPosition === 'AT_TIMESTAMP' && startingPositionTimestamp == null) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`,
'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID'
);
}

const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId(
functionName,
Expand Down Expand Up @@ -243,6 +254,10 @@ class AwsCompileKafkaEvents {
};
}

if (startingPositionTimestamp != null) {
kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp;
}

cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
});

Expand Down
59 changes: 59 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/kafka.test.js
Expand Up @@ -533,6 +533,65 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
});
});

describe('startingPositionTimestamp', () => {
it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', async () => {
await expect(
runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
startingPosition: 'AT_TIMESTAMP',
},
},
],
},
},
},
command: 'package',
})
).to.be.rejected.and.eventually.contain({
code: 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID',
});
});

it('should correctly compile EventSourceMapping resource properties for startingPosition', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
startingPosition: 'AT_TIMESTAMP',
startingPositionTimestamp: 123,
},
},
],
},
},
},
command: 'package',
});

const eventSourceMappingResource =
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')];
expect(eventSourceMappingResource.Properties.StartingPositionTimestamp).to.deep.equal(123);
});
});

it('should not add dependsOn for imported role', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
Expand Down

0 comments on commit 858758e

Please sign in to comment.