From 959cc1636b5d38fe8ea2dcedc21eb7f812057e76 Mon Sep 17 00:00:00 2001 From: overbit <2861984+overbit@users.noreply.github.com> Date: Thu, 27 Oct 2022 21:03:25 +0100 Subject: [PATCH 1/2] feat: add support for StartingPositionTimestamp for kinesis and s3 streams --- .../aws/package/compile/events/stream.js | 20 ++++++++++++- .../aws/package/compile/events/stream.test.js | 30 ++++++++++++++++++- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/lib/plugins/aws/package/compile/events/stream.js b/lib/plugins/aws/package/compile/events/stream.js index 876b4b700b8..129997eedb6 100644 --- a/lib/plugins/aws/package/compile/events/stream.js +++ b/lib/plugins/aws/package/compile/events/stream.js @@ -2,6 +2,7 @@ const _ = require('lodash'); const resolveLambdaTarget = require('../../../utils/resolve-lambda-target'); +const ServerlessError = require('../../../../../serverless-error'); class AwsCompileStreamEvents { constructor(serverless) { @@ -23,7 +24,8 @@ class AwsCompileStreamEvents { type: { enum: ['dynamodb', 'kinesis'] }, batchSize: { type: 'integer', minimum: 1, maximum: 10000 }, parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 }, - startingPosition: { enum: ['LATEST', 'TRIM_HORIZON'] }, + startingPosition: { enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'] }, + startingPositionTimestamp: { type: 'number' }, enabled: { type: 'boolean' }, consumer: { anyOf: [{ const: true }, { $ref: '#/definitions/awsArn' }] }, batchWindow: { type: 'integer', minimum: 0, maximum: 300 }, @@ -157,6 +159,7 @@ class AwsCompileStreamEvents { ParallelizationFactor = event.stream.parallelizationFactor; } StartingPosition = event.stream.startingPosition || StartingPosition; + if (typeof event.stream.enabled !== 'undefined') { Enabled = event.stream.enabled; } @@ -317,6 +320,21 @@ class AwsCompileStreamEvents { streamResource.Properties.EventSourceArn = consumerArn; kinesisConsumerStatement.Resource.push(consumerArn); } + + if ( + event.stream.startingPosition === 'AT_TIMESTAMP' && + !event.stream.startingPositionTimestamp + ) { + throw new ServerlessError( + `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`, + 'FUNCTION_STREAM_STARTING_POSITION_TIMESTAMP_INVALID' + ); + } + + if (event.stream.startingPositionTimestamp) { + streamResource.Properties.StartingPositionTimestamp = + event.stream.startingPositionTimestamp; + } } _.merge( diff --git a/test/unit/lib/plugins/aws/package/compile/events/stream.test.js b/test/unit/lib/plugins/aws/package/compile/events/stream.test.js index 0cad1d43c34..a3cbb9c06e3 100644 --- a/test/unit/lib/plugins/aws/package/compile/events/stream.test.js +++ b/test/unit/lib/plugins/aws/package/compile/events/stream.test.js @@ -1030,6 +1030,8 @@ describe('AwsCompileStreamEvents', () => { stream: { arn: 'arn:aws:kinesis:region:account:stream/abc', consumer: true, + startingPosition: 'AT_TIMESTAMP', + startingPositionTimestamp: 123, }, }, { @@ -1252,7 +1254,11 @@ describe('AwsCompileStreamEvents', () => { expect( awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate .Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPosition - ).to.equal('TRIM_HORIZON'); + ).to.equal('AT_TIMESTAMP'); + expect( + awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate + .Resources.FirstEventSourceMappingKinesisAbc.Properties.StartingPositionTimestamp + ).to.equal(123); expect( awsCompileStreamEvents.serverless.service.provider.compiledCloudFormationTemplate .Resources.FirstEventSourceMappingKinesisAbc.Properties.Enabled @@ -1421,6 +1427,28 @@ describe('AwsCompileStreamEvents', () => { .Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement ).to.deep.equal(iamRoleStatements); }); + + it('should fail to compile EventSourceMapping resource properties for startingPosition AT_TIMESTAMP with no startingPositionTimestamp', () => { + expect(() => { + awsCompileStreamEvents.serverless.service.functions = { + first: { + events: [ + { + stream: { + arn: 'arn:aws:kinesis:region:account:stream/abc', + consumer: true, + startingPosition: 'AT_TIMESTAMP', + }, + }, + ], + }, + }; + + awsCompileStreamEvents.compileStreamEvents(); + }).to.throw( + 'You must specify startingPositionTimestamp for function: first when startingPosition is AT_TIMESTAMP' + ); + }); }); it('should remove all non-alphanumerics from stream names for the resource logical ids', () => { From 5422225b50b233969d6584c22b52053dca4e79e8 Mon Sep 17 00:00:00 2001 From: overbit <2861984+overbit@users.noreply.github.com> Date: Thu, 27 Oct 2022 22:07:53 +0100 Subject: [PATCH 2/2] doc: add documentation --- docs/providers/aws/events/streams.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/providers/aws/events/streams.md b/docs/providers/aws/events/streams.md index 1a53ba41660..6e11dc2c236 100644 --- a/docs/providers/aws/events/streams.md +++ b/docs/providers/aws/events/streams.md @@ -90,6 +90,24 @@ functions: enabled: false ``` +### Setting the Kinesis StartingPosition + +This configuration sets up a disabled Kinesis stream event for the `preprocess` function. The starting position is +`AT_TIMESTAMP` and the timestamp value is `1000000001`. + +```yml +functions: + preprocess: + handler: handler.preprocess + events: + - stream: + arn: arn:aws:kinesis:region:XXXXXX:stream/foo + startingPosition: AT_TIMESTAMP + startingPositionTimestamp: 1000000001 + maximumRetryAttempts: 10 + enabled: false +``` + ## Setting the BatchWindow The configuration below sets up a Kinesis stream event for the `preprocess` function which has a batch window of `10`.