Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for kinesis event startingPositionTimestamp #11483

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/providers/aws/events/streams.md
Expand Up @@ -90,6 +90,24 @@ functions:
enabled: false
```

### Setting the Kinesis StartingPosition

This configuration sets up a disabled Kinesis stream event for the `preprocess` function. The starting position is
`AT_TIMESTAMP` and the timestamp value is `1000000001`.

```yml
functions:
preprocess:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:kinesis:region:XXXXXX:stream/foo
startingPosition: AT_TIMESTAMP
startingPositionTimestamp: 1000000001
maximumRetryAttempts: 10
enabled: false
```

## Setting the BatchWindow

The configuration below sets up a Kinesis stream event for the `preprocess` function which has a batch window of `10`.
Expand Down
20 changes: 19 additions & 1 deletion lib/plugins/aws/package/compile/events/stream.js
Expand Up @@ -2,6 +2,7 @@

const _ = require('lodash');
const resolveLambdaTarget = require('../../../utils/resolve-lambda-target');
const ServerlessError = require('../../../../../serverless-error');

class AwsCompileStreamEvents {
constructor(serverless) {
Expand All @@ -23,7 +24,8 @@ class AwsCompileStreamEvents {
type: { enum: ['dynamodb', 'kinesis'] },
batchSize: { type: 'integer', minimum: 1, maximum: 10000 },
parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 },
startingPosition: { enum: ['LATEST', 'TRIM_HORIZON'] },
startingPosition: { enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'] },
startingPositionTimestamp: { type: 'number' },
enabled: { type: 'boolean' },
consumer: { anyOf: [{ const: true }, { $ref: '#/definitions/awsArn' }] },
batchWindow: { type: 'integer', minimum: 0, maximum: 300 },
Expand Down Expand Up @@ -157,6 +159,7 @@ class AwsCompileStreamEvents {
ParallelizationFactor = event.stream.parallelizationFactor;
}
StartingPosition = event.stream.startingPosition || StartingPosition;

if (typeof event.stream.enabled !== 'undefined') {
Enabled = event.stream.enabled;
}
Expand Down Expand Up @@ -317,6 +320,21 @@ class AwsCompileStreamEvents {
streamResource.Properties.EventSourceArn = consumerArn;
kinesisConsumerStatement.Resource.push(consumerArn);
}

if (
event.stream.startingPosition === 'AT_TIMESTAMP' &&
!event.stream.startingPositionTimestamp
) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`,
'FUNCTION_STREAM_STARTING_POSITION_TIMESTAMP_INVALID'
);
}

if (event.stream.startingPositionTimestamp) {
streamResource.Properties.StartingPositionTimestamp =
event.stream.startingPositionTimestamp;
}
}

_.merge(
Expand Down
30 changes: 29 additions & 1 deletion test/unit/lib/plugins/aws/package/compile/events/stream.test.js
Expand Up @@ -1030,6 +1030,8 @@ describe('AwsCompileStreamEvents', () => {
stream: {
arn: 'arn:aws:kinesis:region:account:stream/abc',
consumer: true,
startingPosition: 'AT_TIMESTAMP',
startingPositionTimestamp: 123,
},
},
{
Expand Down Expand Up @@ -1252,7 +1254,11 @@ describe('AwsCompileStreamEvents', () => {
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPosition
).to.equal('TRIM_HORIZON');
).to.equal('AT_TIMESTAMP');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPositionTimestamp
).to.equal(123);
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisAbc.Properties.Enabled
Expand Down Expand Up @@ -1421,6 +1427,28 @@ describe('AwsCompileStreamEvents', () => {
.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement
).to.deep.equal(iamRoleStatements);
});

it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', () => {
expect(() => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:kinesis:region:account:stream/abc',
consumer: true,
startingPosition: 'AT_TIMESTAMP',
},
},
],
},
};

awsCompileStreamEvents.compileStreamEvents();
}).to.throw(
'You must specify startingPositionTimestamp for function: first when startingPosition is AT_TIMESTAMP'
);
});
});

it('should remove all non-alphanumerics from stream names for the resource logical ids', () => {
Expand Down