feat: add support for kafka event startingPositionTimestamp #11479

Merged

Changes from 5 commits
3 changes: 2 additions & 1 deletion docs/providers/aws/events/kafka.md
@@ -152,7 +152,8 @@ The Serverless Framework will automatically configure the most minimal set of IA

You can set the `batchSize`, which affects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the maximum `batchSize` is 10000.
Likewise, `maximumBatchingWindow` can be set to determine the amount of time (in seconds) the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports three possible values, `TRIM_HORIZON`, `LATEST` and `AT_TIMESTAMP`, with `TRIM_HORIZON` being the default.
When `startingPosition` is configured as `AT_TIMESTAMP`, `startingPositionTimestamp` is also mandatory.
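
As an illustrative sketch (not part of this diff; the topic name, broker endpoint, and secret ARN are placeholders), a `kafka` event using the new `AT_TIMESTAMP` option might be configured as follows. The timestamp is assumed to be a Unix epoch value in seconds, matching the underlying `AWS::Lambda::EventSourceMapping` property:

```yml
functions:
  compute:
    handler: handler.compute
    events:
      - kafka:
          # Placeholder values for illustration only
          topic: MySelfManagedKafkaTopic
          bootstrapServers:
            - abc.xyz:9092
          accessConfigurations:
            saslScram256Auth: arn:aws:secretsmanager:us-east-1:111111111111:secret:MyBrokerSecretName
          startingPosition: AT_TIMESTAMP
          # Mandatory when startingPosition is AT_TIMESTAMP
          startingPositionTimestamp: 1640995200
```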

In the following example, we specify that the `compute` function should have a `kafka` event configured with `batchSize` of 1000, `maximumBatchingWindow` of 30 seconds and `startingPosition` equal to `LATEST`.

4 changes: 3 additions & 1 deletion docs/providers/aws/guide/serverless.yml.md
@@ -1023,8 +1023,10 @@ functions:
batchSize: 100
# Optional, must be in 0-300 range (seconds)
maximumBatchingWindow: 30
# Optional, can be set to LATEST or TRIM_HORIZON
# Optional, can be set to LATEST, AT_TIMESTAMP or TRIM_HORIZON
startingPosition: LATEST
# Mandatory when startingPosition is AT_TIMESTAMP
startingPositionTimestamp: 10000123
# (default: true)
enabled: false
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
17 changes: 16 additions & 1 deletion lib/plugins/aws/package/compile/events/kafka.js
@@ -84,7 +84,10 @@ class AwsCompileKafkaEvents {
},
startingPosition: {
type: 'string',
enum: ['LATEST', 'TRIM_HORIZON'],
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
},
startingPositionTimestamp: {
type: 'number',
},
topic: {
type: 'string',
@@ -149,6 +152,14 @@ class AwsCompileKafkaEvents {
hasKafkaEvent = true;
const { topic, batchSize, maximumBatchingWindow, enabled, consumerGroupId } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';
const startingPositionTimestamp = event.kafka.startingPositionTimestamp;

if (startingPosition === 'AT_TIMESTAMP' && !startingPositionTimestamp) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`,
'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID'
);
}

const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId(
functionName,
@@ -243,6 +254,10 @@ class AwsCompileKafkaEvents {
};
}

if (startingPositionTimestamp !== undefined) {
kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp;
}

cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
});
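
Not part of the diff, but for context: with a configuration like the sketch above, the `kafkaResource` assembled here would compile to an `AWS::Lambda::EventSourceMapping` roughly like the following. This is a sketch only; the logical ID and function reference are placeholders, and the resource shape is assumed from the property assignments in this file and the CloudFormation resource type:

```yml
ComputeEventSourceMappingKafkaMySelfManagedKafkaTopic: # placeholder logical ID
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    FunctionName: !GetAtt ComputeLambdaFunction.Arn # placeholder function reference
    Topics:
      - MySelfManagedKafkaTopic
    SelfManagedEventSource:
      Endpoints:
        KafkaBootstrapServers:
          - abc.xyz:9092
    SourceAccessConfigurations:
      - Type: SASL_SCRAM_256_AUTH
        URI: arn:aws:secretsmanager:us-east-1:111111111111:secret:MyBrokerSecretName
    StartingPosition: AT_TIMESTAMP
    # Emitted only when startingPositionTimestamp is set in serverless.yml
    StartingPositionTimestamp: 1640995200
```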

59 changes: 59 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/kafka.test.js
@@ -533,6 +533,65 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
});
});

describe('startingPositionTimestamp', () => {
it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', async () => {
await expect(
runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
startingPosition: 'AT_TIMESTAMP',
},
},
],
},
},
},
command: 'package',
})
).to.be.rejected.and.eventually.contain({
code: 'FUNCTION_KAFKA_STARTING_POSITION_TIMESTAMP_INVALID',
});
});

it('should correctly compile EventSourceMapping resource properties for startingPosition and startingPositionTimestamp', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
startingPosition: 'AT_TIMESTAMP',
startingPositionTimestamp: 123,
},
},
],
},
},
},
command: 'package',
});

const eventSourceMappingResource =
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')];
expect(eventSourceMappingResource.Properties.StartingPositionTimestamp).to.deep.equal(123);
});
});

it('should not add dependsOn for imported role', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',