Skip to content

Commit

Permalink
feat(AWS Kinesis): Support AT_TIMESTAMP starting position (#11483)
Browse files Browse the repository at this point in the history
  • Loading branch information
overbit committed Oct 31, 2022
1 parent 0c186a6 commit 5d41995
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
18 changes: 18 additions & 0 deletions docs/providers/aws/events/streams.md
Expand Up @@ -90,6 +90,24 @@ functions:
enabled: false
```

### Setting the Kinesis StartingPosition

This configuration sets up a disabled Kinesis stream event for the `preprocess` function. The starting position is
`AT_TIMESTAMP` and the timestamp value is `1000000001`.

```yml
functions:
preprocess:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:kinesis:region:XXXXXX:stream/foo
startingPosition: AT_TIMESTAMP
startingPositionTimestamp: 1000000001
maximumRetryAttempts: 10
enabled: false
```

## Setting the BatchWindow

The configuration below sets up a Kinesis stream event for the `preprocess` function which has a batch window of `10`.
Expand Down
20 changes: 19 additions & 1 deletion lib/plugins/aws/package/compile/events/stream.js
Expand Up @@ -2,6 +2,7 @@

const _ = require('lodash');
const resolveLambdaTarget = require('../../../utils/resolve-lambda-target');
const ServerlessError = require('../../../../../serverless-error');

class AwsCompileStreamEvents {
constructor(serverless) {
Expand All @@ -23,7 +24,8 @@ class AwsCompileStreamEvents {
type: { enum: ['dynamodb', 'kinesis'] },
batchSize: { type: 'integer', minimum: 1, maximum: 10000 },
parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 },
startingPosition: { enum: ['LATEST', 'TRIM_HORIZON'] },
startingPosition: { enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'] },
startingPositionTimestamp: { type: 'number' },
enabled: { type: 'boolean' },
consumer: { anyOf: [{ const: true }, { $ref: '#/definitions/awsArn' }] },
batchWindow: { type: 'integer', minimum: 0, maximum: 300 },
Expand Down Expand Up @@ -157,6 +159,7 @@ class AwsCompileStreamEvents {
ParallelizationFactor = event.stream.parallelizationFactor;
}
StartingPosition = event.stream.startingPosition || StartingPosition;

if (typeof event.stream.enabled !== 'undefined') {
Enabled = event.stream.enabled;
}
Expand Down Expand Up @@ -317,6 +320,21 @@ class AwsCompileStreamEvents {
streamResource.Properties.EventSourceArn = consumerArn;
kinesisConsumerStatement.Resource.push(consumerArn);
}

if (
event.stream.startingPosition === 'AT_TIMESTAMP' &&
!event.stream.startingPositionTimestamp
) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`,
'FUNCTION_STREAM_STARTING_POSITION_TIMESTAMP_INVALID'
);
}

if (event.stream.startingPositionTimestamp) {
streamResource.Properties.StartingPositionTimestamp =
event.stream.startingPositionTimestamp;
}
}

_.merge(
Expand Down
30 changes: 29 additions & 1 deletion test/unit/lib/plugins/aws/package/compile/events/stream.test.js
Expand Up @@ -1030,6 +1030,8 @@ describe('AwsCompileStreamEvents', () => {
stream: {
arn: 'arn:aws:kinesis:region:account:stream/abc',
consumer: true,
startingPosition: 'AT_TIMESTAMP',
startingPositionTimestamp: 123,
},
},
{
Expand Down Expand Up @@ -1252,7 +1254,11 @@ describe('AwsCompileStreamEvents', () => {
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPosition
).to.equal('TRIM_HORIZON');
).to.equal('AT_TIMESTAMP');
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPositionTimestamp
).to.equal(123);
expect(
awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate
.Resources.FirstEventSourceMappingKinesisAbc.Properties.Enabled
Expand Down Expand Up @@ -1421,6 +1427,28 @@ describe('AwsCompileStreamEvents', () => {
.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement
).to.deep.equal(iamRoleStatements);
});

it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', () => {
expect(() => {
awsCompileStreamEvents.serverless.service.functions = {
first: {
events: [
{
stream: {
arn: 'arn:aws:kinesis:region:account:stream/abc',
consumer: true,
startingPosition: 'AT_TIMESTAMP',
},
},
],
},
};

awsCompileStreamEvents.compileStreamEvents();
}).to.throw(
'You must specify startingPositionTimestamp for function: first when startingPosition is AT_TIMESTAMP'
);
});
});

it('should remove all non-alphanumerics from stream names for the resource logical ids', () => {
Expand Down

0 comments on commit 5d41995

Please sign in to comment.