feat: add validation bucket (#98)
This PR refers to: #97

Add a new bucket for validation.
Pickup stores the fetched file in the validation bucket. The CAR is then
validated and, if valid, moved to the final bucket.
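
For orientation, a minimal sketch of the flow this commit wires up. The function name and the injected helpers (`uploadCar`, `validateCar`, `promoteCar`) are illustrative, not pickup's actual API:

```js
// Illustrative sketch of the three steps introduced by this PR.
// The helpers are hypothetical and injected so the example stays self-contained.
export async function processCar ({ key, car, validationBucket, destinationBucket },
  { uploadCar, validateCar, promoteCar }) {
  // 1. pickup writes the fetched CAR to the validation bucket when one is configured
  await uploadCar({ bucket: validationBucket ?? destinationBucket, key, body: car })
  if (validationBucket == null) return // validation disabled: done
  // 2. the validator service checks the CAR...
  const valid = await validateCar({ bucket: validationBucket, key })
  // 3. ...and only a valid CAR is moved to the final bucket
  if (valid) await promoteCar({ from: validationBucket, to: destinationBucket, key })
}
```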
codeflyer committed Feb 14, 2023
1 parent e1d1f57 commit 800ab90
Showing 17 changed files with 687 additions and 2,462 deletions.
2,562 changes: 254 additions & 2,308 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pickup/index.js
@@ -10,7 +10,8 @@ const {
BATCH_SIZE,
MAX_RETRY,
TIMEOUT_FETCH,
LOG_STATE_EVERY_SECONDS
LOG_STATE_EVERY_SECONDS,
VALIDATION_BUCKET
} = process.env

if (!IPFS_API_URL) throw new Error('IPFS_API_URL not found in ENV')
@@ -24,6 +25,7 @@ async function start () {
queueUrl: SQS_QUEUE_URL,
dynamoTable: DYNAMO_TABLE_NAME,
dynamoEndpoint: DYNAMO_DB_ENDPOINT || undefined,
validationBucket: VALIDATION_BUCKET || undefined,
batchSize: Number(BATCH_SIZE || 1),
maxRetry: Number(MAX_RETRY || 5),
timeoutFetchMs: Number(TIMEOUT_FETCH || 30) * 1000,
6 changes: 5 additions & 1 deletion pickup/lib/consumer.js
@@ -47,6 +47,7 @@ export async function deleteMessage ({ sqs, queueUrl }, message) {
* fetch action does not respond while it is downloading the blocks.
* @param {string} dynamoTable - The dynamo DB table
* @param {string} dynamoEndpoint - The dynamo DB endpoint
* @param {string} validationBucket - The S3 bucket used for validation; when set, it replaces the destination bucket for the initial upload
* @param {DownloadStatusManager} downloadStatusManager
* @param {Number} downloadStatusLoggerSeconds - The interval in seconds for the download state
* @returns {Promise<Consumer>}
@@ -65,6 +66,7 @@ export async function createConsumer ({
timeoutFetchMs = 30000,
dynamoTable,
dynamoEndpoint,
validationBucket,
downloadStatusManager,
downloadStatusLoggerSeconds = 300 // logs every 5 minutes
}) {
@@ -82,7 +84,8 @@
queueUrl,
handleMessageTimeout,
maxRetry,
timeoutFetchMs
timeoutFetchMs,
validationBucket
}, 'Create sqs consumer')

const app = Consumer.create({
@@ -110,6 +113,7 @@
queueManager: app,
dynamo,
dynamoTable,
validationBucket,
timeoutFetchMs,
maxRetry,
downloadStatusManager
4 changes: 3 additions & 1 deletion pickup/lib/pickupBatch.js
@@ -13,6 +13,7 @@ import { STATE_DONE, STATE_TIMEOUT, STATE_FAILED, STATE_QUEUED } from './downloa
* @param {Consumer} queueManager
* @param {import('@aws-sdk/lib-dynamodb').DynamoDBClient} dynamo
* @param {string} dynamoTable
* @param {string} validationBucket
* @param {number} timeoutFetchMs
* @param {number} maxRetry
* @param {DownloadStatusManager} downloadStatusManager
@@ -25,6 +26,7 @@ export async function pickupBatch (messages, {
queueManager,
dynamo,
dynamoTable,
validationBucket,
timeoutFetchMs,
maxRetry,
downloadStatusManager
@@ -39,7 +41,7 @@
for (const message of messages) {
const { cid, origins, bucket, key, requestid } = JSON.parse(message.Body)
logger.trace({ cid, requestid }, 'Push message in job list')
jobs.push({ message, requestid, cid, upload: createS3Uploader({ bucket, key, client: s3 }) })
jobs.push({ message, requestid, cid, upload: createS3Uploader({ bucket: validationBucket ?? bucket, key, client: s3 }) })
allOrigins.concat(origins)
requestIds.push(requestid)
}
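
The behavioural change above is the upload target. A small sketch of the selection rule, assuming `validationBucket` is either a bucket name or `undefined` (pickup/index.js normalises an empty `VALIDATION_BUCKET` env var to `undefined`):

```js
// `??` falls back to the final bucket only when no validation bucket is set.
function resolveTargetBucket ({ validationBucket, bucket }) {
  return validationBucket ?? bucket
}

resolveTargetBucket({ validationBucket: 'validation-car', bucket: 'final-car' }) // 'validation-car'
resolveTargetBucket({ validationBucket: undefined, bucket: 'final-car' })        // 'final-car'
```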
8 changes: 5 additions & 3 deletions pickup/test/pickup.test.js
@@ -35,6 +35,7 @@ test('Process 3 messages concurrently and the last has a timeout', async t => {

const queueUrl = await createQueue()
const bucket = await createBucket()
const validationBucket = await createBucket()

// Prepare the data for the test
const cars = [
@@ -77,7 +78,8 @@ test('Process 3 messages concurrently and the last has a timeout', async t => {
dynamoEndpoint,
dynamoTable,
timeoutFetchMs: 2000,
downloadStatusManager: new DownloadStatusManager()
downloadStatusManager: new DownloadStatusManager(),
validationBucket
}
)

@@ -100,7 +102,7 @@

consumer.on('message_processed', async msg => {
try {
await verifyMessage({ msg, cars, dynamoClient, dynamoTable, t, bucket, s3, expectedError: 'Download timeout' })
await verifyMessage({ msg, cars, dynamoClient, dynamoTable, t, bucket: validationBucket, s3, expectedError: 'Download timeout' })
resolved++

if (resolved === cars.length) {
@@ -221,7 +223,7 @@ test('Process 1 message that fails and returns in the list', async t => {
return done
})

test.only('Process 3 messages concurrently and the last has an error', async t => {
test('Process 3 messages concurrently and the last has an error', async t => {
t.timeout(1000 * 60)
const { createQueue, createBucket, ipfsApiUrl, sqs, s3, dynamoClient, dynamoEndpoint, dynamoTable } = t.context

8 changes: 4 additions & 4 deletions stacks/BasicApiStack.ts
@@ -6,7 +6,7 @@ import * as apig from '@aws-cdk/aws-apigatewayv2-alpha'
export function BasicApiStack ({
app,
stack
}: StackContext): { queue: Queue, bucket: Bucket, dynamoDbTable: Table, updatePinQueue: Queue } {
}: StackContext): { queue: Queue, bucket: Bucket, dynamoDbTable: Table } {
const dlq = new Queue(stack, 'PinDlq')

const queue = new Queue(stack, 'Pin', {
@@ -59,9 +59,10 @@ export function BasicApiStack ({
}
}
})

const s3Topic = new Topic(stack, 'S3Events', {
subscribers: {
updatePinQueue: updatePinQueue
updatePinQueue
}
})

@@ -159,8 +160,7 @@ export function BasicApiStack ({
return {
queue,
bucket,
dynamoDbTable,
updatePinQueue
dynamoDbTable
}
}

84 changes: 68 additions & 16 deletions stacks/PickupStack.ts
@@ -1,4 +1,4 @@
import { StackContext, use, Queue, Bucket, Table } from '@serverless-stack/resources'
import { StackContext, use, Queue, Bucket, Table, Topic } from '@serverless-stack/resources'
import { BasicApiStack } from './BasicApiStack'
import { Cluster, ContainerImage, LogDrivers, Secret, FirelensLogRouterType, LogDriver } from 'aws-cdk-lib/aws-ecs'
import { Platform } from 'aws-cdk-lib/aws-ecr-assets'
@@ -12,13 +12,48 @@ type MutableQueueProcessingFargateServiceProps = { // The same object without re
}

export function PickupStack ({ app, stack }: StackContext): void {
const basicApi = use(BasicApiStack) as unknown as { queue: Queue, bucket: Bucket, dynamoDbTable: Table, updatePinQueue: Queue }
const basicApi = use(BasicApiStack) as unknown as { queue: Queue, bucket: Bucket, dynamoDbTable: Table }
const cluster = new Cluster(stack, 'ipfs', {
containerInsights: true
})
// Network calls to S3 and dynamodb through internal network
createVPCGateways(cluster.vpc)

const useValidation = process.env.USE_VALIDATION === 'VALIDATE'

let validationBucket
let validationPinQueue
if (useValidation) {
const validationPinDlq = new Queue(stack, 'ValidationPinDlq')
validationPinQueue = new Queue(stack, 'ValidationPinQueue', {
cdk: {
queue: {
deadLetterQueue: {
queue: validationPinDlq.cdk.queue,
maxReceiveCount: 2
}
}
}
})

const s3Topic = new Topic(stack, 'S3ValidationEvents', {
subscribers: {
validationPinQueue
}
})

validationBucket = new Bucket(stack, 'ValidationCar', {
notifications: {
topic: {
type: 'topic',
topic: s3Topic,
events: ['object_created']
}
}
})
validationBucket.cdk.bucket.enableEventBridgeNotification()
}

const baseServiceProps: MutableQueueProcessingFargateServiceProps & {
ephemeralStorageGiB: number
} = {
@@ -38,7 +73,8 @@ export function PickupStack ({ app, stack }: StackContext): void {
DYNAMO_TABLE_NAME: basicApi.dynamoDbTable.tableName,
BATCH_SIZE: process.env.BATCH_SIZE ?? '5',
TIMEOUT_FETCH: process.env.TIMEOUT_FETCH ?? '60',
MAX_RETRY: process.env.MAX_RETRY ?? '10'
MAX_RETRY: process.env.MAX_RETRY ?? '10',
VALIDATION_BUCKET: (validationBucket != null) ? validationBucket.bucketName : ''
},
queue: basicApi.queue.cdk.queue,
enableExecuteCommand: true,
@@ -114,8 +150,13 @@ export function PickupStack ({ app, stack }: StackContext): void {
}
})
basicApi.bucket.cdk.bucket.grantReadWrite(service.taskDefinition.taskRole)

basicApi.dynamoDbTable.cdk.table.grantReadWriteData(service.taskDefinition.taskRole)
basicApi.queue.cdk.queue.grantConsumeMessages(service.taskDefinition.taskRole)

if (validationBucket !== undefined) {
validationBucket.cdk.bucket.grantReadWrite(service.taskDefinition.taskRole)
}
} else {
const service = new QueueProcessingFargateService(stack, 'Service', {
...baseServiceProps,
@@ -141,9 +182,13 @@ export function PickupStack ({ app, stack }: StackContext): void {
basicApi.bucket.cdk.bucket.grantReadWrite(service.taskDefinition.taskRole)
basicApi.dynamoDbTable.cdk.table.grantReadWriteData(service.taskDefinition.taskRole)
basicApi.queue.cdk.queue.grantConsumeMessages(service.taskDefinition.taskRole)

if (validationBucket !== undefined) {
validationBucket.cdk.bucket.grantReadWrite(service.taskDefinition.taskRole)
}
}

if (process.env.USE_VALIDATION === 'VALIDATE') {
if (useValidation && validationPinQueue !== undefined) {
const productionParams: {
logDriver?: LogDriver
cpu?: number
@@ -188,26 +233,33 @@ export function PickupStack ({ app, stack }: StackContext): void {
memoryLimitMiB: 16 * 1024,
ephemeralStorageGiB: 30, // max 200
environment: {
SQS_QUEUE_URL: basicApi.updatePinQueue.queueUrl,
DYNAMO_TABLE_NAME: basicApi.dynamoDbTable.tableName
SQS_QUEUE_URL: validationPinQueue.queueUrl,
DYNAMO_TABLE_NAME: basicApi.dynamoDbTable.tableName,
DESTINATION_BUCKET: basicApi.bucket.bucketName
},
queue: basicApi.updatePinQueue.cdk.queue,
queue: validationPinQueue.cdk.queue,
enableExecuteCommand: true,
cluster,
...productionParams
})
basicApi.bucket.cdk.bucket.grantReadWrite(validationService.taskDefinition.taskRole)
basicApi.dynamoDbTable.cdk.table.grantReadWriteData(validationService.taskDefinition.taskRole)
basicApi.updatePinQueue.cdk.queue.grantConsumeMessages(validationService.taskDefinition.taskRole)
validationPinQueue.cdk.queue.grantConsumeMessages(validationService.taskDefinition.taskRole)

validationService.taskDefinition.taskRole.addManagedPolicy(ManagedPolicy.fromAwsManagedPolicyName('AmazonSSMReadOnlyAccess'))
// configure the custom image to log router
validationService.taskDefinition.addFirelensLogRouter('log-router', {
firelensConfig: {
type: FirelensLogRouterType.FLUENTBIT
},
image: ContainerImage.fromRegistry('grafana/fluent-bit-plugin-loki:1.6.0-amd64')
})
if (validationBucket !== undefined) {
validationBucket.cdk.bucket.grantReadWrite(validationService.taskDefinition.taskRole)
}

if (app.stage === 'prod' || app.stage === 'staging') {
validationService.taskDefinition.taskRole.addManagedPolicy(ManagedPolicy.fromAwsManagedPolicyName('AmazonSSMReadOnlyAccess'))
// configure the custom image to log router
validationService.taskDefinition.addFirelensLogRouter('log-router', {
firelensConfig: {
type: FirelensLogRouterType.FLUENTBIT
},
image: ContainerImage.fromRegistry('grafana/fluent-bit-plugin-loki:1.6.0-amd64')
})
}
}
}

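
The stack change subscribes `ValidationPinQueue` to an SNS topic that receives the validation bucket's `object_created` notifications, so each SQS message the validator consumes carries an SNS envelope whose `Message` field is the S3 event. A sketch of unwrapping it, assuming the standard AWS payload shapes and no SNS raw message delivery (this helper is illustrative, not code from this repo):

```js
// SQS body -> SNS notification envelope -> S3 event record(s).
export function uploadedCarsFromSqsBody (sqsBody) {
  const envelope = JSON.parse(sqsBody)         // SNS envelope: { Type, Message, ... }
  const s3Event = JSON.parse(envelope.Message) // S3 event: { Records: [...] }
  return s3Event.Records.map(record => ({
    bucket: record.s3.bucket.name,
    // S3 URL-encodes object keys in events, with spaces encoded as '+'
    key: decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '))
  }))
}
```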
13 changes: 12 additions & 1 deletion validator/index.js
@@ -1,10 +1,20 @@
import { createConsumer } from './lib/consumer.js'
import { logger } from './lib/logger.js'

const { IPFS_API_URL, SQS_QUEUE_URL, DYNAMO_TABLE_NAME, DYNAMO_DB_ENDPOINT, BATCH_SIZE, MAX_RETRY, TIMEOUT_FETCH } = process.env
const {
IPFS_API_URL,
SQS_QUEUE_URL,
DYNAMO_TABLE_NAME,
DYNAMO_DB_ENDPOINT,
BATCH_SIZE,
MAX_RETRY,
TIMEOUT_FETCH,
DESTINATION_BUCKET
} = process.env

if (!SQS_QUEUE_URL) throw new Error('SQS_QUEUE_URL not found in ENV')
if (!DYNAMO_TABLE_NAME) throw new Error('DYNAMO_TABLE_NAME not found in ENV')
if (!DESTINATION_BUCKET) throw new Error('DESTINATION_BUCKET not found in ENV')

async function start () {
logger.info({}, 'Pickup starting...')
@@ -13,6 +23,7 @@ async function start () {
queueUrl: SQS_QUEUE_URL,
dynamoTable: DYNAMO_TABLE_NAME,
dynamoEndpoint: DYNAMO_DB_ENDPOINT || undefined,
destinationBucket: DESTINATION_BUCKET,
batchSize: Number(BATCH_SIZE || 1),
maxRetry: Number(MAX_RETRY || 5),
timeoutFetchMs: Number(TIMEOUT_FETCH || 30) * 1000
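
`DESTINATION_BUCKET` tells the validator where a CAR goes once it passes validation. A minimal sketch of that promote step, using the AWS SDK v3 client the repo already depends on (the helper name and the cleanup behaviour are assumptions; the validation logic itself is elided):

```js
import { S3Client, CopyObjectCommand, DeleteObjectCommand } from '@aws-sdk/client-s3'

const s3 = new S3Client({})

// Copy a validated CAR from the validation bucket to the destination
// bucket, then remove the validation copy. URL-encode the key in
// CopySource if keys can contain special characters.
export async function promoteCar ({ validationBucket, destinationBucket, key }) {
  await s3.send(new CopyObjectCommand({
    CopySource: `${validationBucket}/${key}`,
    Bucket: destinationBucket,
    Key: key
  }))
  await s3.send(new DeleteObjectCommand({ Bucket: validationBucket, Key: key }))
}
```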
