diff --git a/docs/providers/aws/events/streams.md b/docs/providers/aws/events/streams.md index 1a53ba41660..6e11dc2c236 100644 --- a/docs/providers/aws/events/streams.md +++ b/docs/providers/aws/events/streams.md @@ -90,6 +90,24 @@ functions: enabled: false ``` +### Setting the Kinesis StartingPosition + +This configuration sets up a disabled Kinesis stream event for the `preprocess` function. The starting position is +`AT_TIMESTAMP` and the timestamp value is `1000000001`. + +```yml +functions: + preprocess: + handler: handler.preprocess + events: + - stream: + arn: arn:aws:kinesis:region:XXXXXX:stream/foo + startingPosition: AT_TIMESTAMP + startingPositionTimestamp: 1000000001 + maximumRetryAttempts: 10 + enabled: false +``` + ## Setting the BatchWindow The configuration below sets up a Kinesis stream event for the `preprocess` function which has a batch window of `10`. diff --git a/lib/plugins/aws/package/compile/events/stream.js b/lib/plugins/aws/package/compile/events/stream.js index 876b4b700b8..129997eedb6 100644 --- a/lib/plugins/aws/package/compile/events/stream.js +++ b/lib/plugins/aws/package/compile/events/stream.js @@ -2,6 +2,7 @@ const _ = require('lodash'); const resolveLambdaTarget = require('../../../utils/resolve-lambda-target'); +const ServerlessError = require('../../../../../serverless-error'); class AwsCompileStreamEvents { constructor(serverless) { @@ -23,7 +24,8 @@ class AwsCompileStreamEvents { type: { enum: ['dynamodb', 'kinesis'] }, batchSize: { type: 'integer', minimum: 1, maximum: 10000 }, parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 }, - startingPosition: { enum: ['LATEST', 'TRIM_HORIZON'] }, + startingPosition: { enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'] }, + startingPositionTimestamp: { type: 'number' }, enabled: { type: 'boolean' }, consumer: { anyOf: [{ const: true }, { $ref: '#/definitions/awsArn' }] }, batchWindow: { type: 'integer', minimum: 0, maximum: 300 }, @@ -157,6 +159,7 @@ class AwsCompileStreamEvents { ParallelizationFactor = event.stream.parallelizationFactor; } StartingPosition = event.stream.startingPosition || StartingPosition; + if (typeof event.stream.enabled !== 'undefined') { Enabled = event.stream.enabled; } @@ -317,6 +320,21 @@ class AwsCompileStreamEvents { streamResource.Properties.EventSourceArn = consumerArn; kinesisConsumerStatement.Resource.push(consumerArn); } + + if ( + event.stream.startingPosition === 'AT_TIMESTAMP' && + !event.stream.startingPositionTimestamp + ) { + throw new ServerlessError( + `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`, + 'FUNCTION_STREAM_STARTING_POSITION_TIMESTAMP_INVALID' + ); + } + + if (event.stream.startingPositionTimestamp) { + streamResource.Properties.StartingPositionTimestamp = + event.stream.startingPositionTimestamp; + } } _.merge( diff --git a/test/unit/lib/plugins/aws/package/compile/events/stream.test.js b/test/unit/lib/plugins/aws/package/compile/events/stream.test.js index 0cad1d43c34..a3cbb9c06e3 100644 --- a/test/unit/lib/plugins/aws/package/compile/events/stream.test.js +++ b/test/unit/lib/plugins/aws/package/compile/events/stream.test.js @@ -1030,6 +1030,8 @@ describe('AwsCompileStreamEvents', () => { stream: { arn: 'arn:aws:kinesis:region:account:stream/abc', consumer: true, + startingPosition: 'AT_TIMESTAMP', + startingPositionTimestamp: 123, }, }, { @@ -1252,7 +1254,11 @@ describe('AwsCompileStreamEvents', () => { expect( awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate .Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPosition - ).to.equal('TRIM_HORIZON'); + ).to.equal('AT_TIMESTAMP'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPositionTimestamp + ).to.equal(123); expect( awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate .Resources.FirstEventSourceMappingKinesisAbc.Properties.Enabled @@ -1421,6 +1427,28 @@ describe('AwsCompileStreamEvents', () => { .Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement ).to.deep.equal(iamRoleStatements); }); + + it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', () => { + expect(() => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:kinesis:region:account:stream/abc', + consumer: true, + startingPosition: 'AT_TIMESTAMP', + }, + }, + ], + }, + }; + + awsCompileStreamEvents.compileStreamEvents(); + }).to.throw( + 'You must specify startingPositionTimestamp for function: first when startingPosition is AT_TIMESTAMP' + ); + }); }); it('should remove all non-alphanumerics from stream names for the resource logical ids', () => {