Skip to content

Commit

Permalink
feat: cas scaling ph 2 (s3 batching)
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed May 20, 2024
1 parent 0a2d4f3 commit fdf0c16
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 2 deletions.
2 changes: 2 additions & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
"type": "sqs",
"awsRegion": "us-east-1",
"sqsQueueUrl": "",
"s3BucketName": "myS3Bucket",
"s3Endpoint": "",
"maxTimeToHoldMessageSec": 21600,
"waitTimeForMessageSec": 0
}
Expand Down
2 changes: 2 additions & 0 deletions config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
"type": "sqs",
"awsRegion": "@@AWS_REGION",
"sqsQueueUrl": "@@SQS_QUEUE_URL",
"s3BucketName": "@@S3_BUCKET_NAME",
"s3Endpoint": "@@S3_ENDPOINT",
"maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC",
"waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC"
}
Expand Down
2 changes: 2 additions & 0 deletions config/env/prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
"type": "sqs",
"awsRegion": "@@AWS_REGION",
"sqsQueueUrl": "@@SQS_QUEUE_URL",
"s3BucketName": "@@S3_BUCKET_NAME",
"s3Endpoint": "@@S3_ENDPOINT",
"maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC",
"waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC"
}
Expand Down
1 change: 1 addition & 0 deletions config/env/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"type": "sqs",
"awsRegion": "us-east-1",
"sqsQueueUrl": "",
"s3BucketName": "ceramic-tnet-cas",
"maxTimeToHoldMessageSec": 10800,
"waitTimeForMessageSec": 10
}
Expand Down
2 changes: 1 addition & 1 deletion src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ export class AnchorService {
}

try {
logger.imp('Anchoring requests from the batch')
logger.imp(`Anchoring ${batchMessage.data.rids.length} requests from batch ${batchMessage.data.bid}`)
const requests = await this.requestRepository.findByIds(batchMessage.data.rids)

const requestsNotReplaced = requests.filter(
Expand Down
72 changes: 71 additions & 1 deletion src/services/queue/sqs-queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import {
ChangeMessageVisibilityCommand,
SendMessageCommand,
} from '@aws-sdk/client-sqs'
import AWSSDK from 'aws-sdk'
import LevelUp from 'levelup'
import S3LevelDOWN from 's3leveldown'
import { IpfsPubSubPublishQMessage, QueueMessageData } from '../../models/queue-message.js'
import {
IQueueConsumerService,
Expand All @@ -19,6 +22,8 @@ import { AbortOptions } from '@ceramicnetwork/common'

const DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S = 21600
const DEFAULT_WAIT_TIME_FOR_MESSAGE_S = 10
const BATCH_STORE_PATH = '/cas/anchor/batch'

/**
* Sqs Queue Message received by consumers.
* Once the message is done processing you can either "ack" the message (remove the message from the queue) or "nack" the message (put the message back on the queue)
Expand Down Expand Up @@ -60,6 +65,28 @@ export class SqsQueueMessage<TValue extends QueueMessageData> implements IQueueM
}
}

// This wrapper around SqsQueueMessage is used to handle the case where the list of batch request IDs is empty and must
// be fetched from S3. The underlying SqsQueueMessage remains the same (and is what is used for n/acking the message),
// but the data is updated to include the batch request IDs.
export class BatchQueueMessage implements IQueueMessage<AnchorBatchQMessage> {
readonly data: AnchorBatchQMessage

constructor(
private readonly anchorBatchMessage: IQueueMessage<AnchorBatchQMessage>,
batchJson: any
) {
this.data = decode(AnchorBatchQMessage, batchJson)
}

async ack(): Promise<void> {
await this.anchorBatchMessage.ack()
}

async nack(): Promise<void> {
await this.anchorBatchMessage.nack()
}
}

/**
* Consumer and Producer for Sqs Queues
*/
Expand Down Expand Up @@ -149,10 +176,53 @@ export class ValidationSqsQueueService extends SqsQueueService<RequestQMessage>
* AnchorBatchSqsQueueService is used to consume and publish anchor batch messages. These batches are anchored by anchor workers
*/
export class AnchorBatchSqsQueueService extends SqsQueueService<AnchorBatchQMessage> {
constructor(config: Config) {
constructor(
config: Config,
private s3StorePath = config.queue.s3BucketName + BATCH_STORE_PATH,
private s3Endpoint = config.queue.s3Endpoint ? config.queue.s3Endpoint : undefined,
private _s3store?: LevelUp.LevelUp
) {
const queueUrl = config.queue.sqsQueueUrl + 'batch'
super(config, queueUrl, AnchorBatchQMessage)
}

/**
* `new LevelUp` attempts to open a database, which leads to a request to AWS.
* Let's make initialization lazy.
*/
get s3store(): LevelUp.LevelUp {
if (!this._s3store) {
const levelDown = this.s3Endpoint
? new S3LevelDOWN(
this.s3StorePath,
new AWSSDK.S3({
endpoint: this.s3Endpoint,
s3ForcePathStyle: true,
})
)
: new S3LevelDOWN(this.s3StorePath)

this._s3store = new LevelUp(levelDown)
}
return this._s3store
}

override async receiveMessage(
abortOptions?: AbortOptions
): Promise<IQueueMessage<AnchorBatchQMessage> | undefined> {
const anchorBatchMessage: IQueueMessage<AnchorBatchQMessage> | undefined =
await super.receiveMessage(abortOptions)
// If the list of batch request IDs is empty, we need to fetch the full batch from S3.
if (anchorBatchMessage && anchorBatchMessage.data.rids.length === 0) {
try {
const batchJson = await this.s3store.get(anchorBatchMessage.data.bid)
return new BatchQueueMessage(anchorBatchMessage, JSON.parse(batchJson))
} catch (err: any) {
throw Error(`Error retrieving batch ${anchorBatchMessage.data.bid} from S3: ${err.message}`)
}
}
return anchorBatchMessage
}
}

/**
Expand Down

0 comments on commit fdf0c16

Please sign in to comment.