Skip to content

Commit

Permalink
feat: cas scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed May 19, 2024
1 parent 0a2d4f3 commit 37f6a4e
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 16 deletions.
2 changes: 2 additions & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
"type": "sqs",
"awsRegion": "us-east-1",
"sqsQueueUrl": "",
"s3BucketName": "myS3Bucket",
"s3Endpoint": "",
"maxTimeToHoldMessageSec": 21600,
"waitTimeForMessageSec": 0
}
Expand Down
2 changes: 2 additions & 0 deletions config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
"type": "sqs",
"awsRegion": "@@AWS_REGION",
"sqsQueueUrl": "@@SQS_QUEUE_URL",
"s3BucketName": "@@S3_BUCKET_NAME",
"s3Endpoint": "@@S3_ENDPOINT",
"maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC",
"waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC"
}
Expand Down
2 changes: 2 additions & 0 deletions config/env/prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
"type": "sqs",
"awsRegion": "@@AWS_REGION",
"sqsQueueUrl": "@@SQS_QUEUE_URL",
"s3BucketName": "@@S3_BUCKET_NAME",
"s3Endpoint": "@@S3_ENDPOINT",
"maxTimeToHoldMessageSec": "@@MAX_TIME_TO_HOLD_MESSAGE_SEC",
"waitTimeForMessageSec": "@@WAIT_TIME_FOR_MESSAGE_SEC"
}
Expand Down
1 change: 1 addition & 0 deletions config/env/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"type": "sqs",
"awsRegion": "us-east-1",
"sqsQueueUrl": "",
"s3BucketName": "ceramic-tnet-cas",
"maxTimeToHoldMessageSec": 10800,
"waitTimeForMessageSec": 10
}
Expand Down
16 changes: 11 additions & 5 deletions src/repositories/anchor-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { parseCountResult } from './parse-count-result.util.js'
import { decode } from 'codeco'

const TABLE_NAME = 'anchor'
const SQL_ARGS_CHUNK_SIZE = 10000

export class AnchorRepository implements IAnchorRepository {
static inject = ['dbConnection'] as const
Expand All @@ -32,11 +33,16 @@ export class AnchorRepository implements IAnchorRepository {
* @returns A promise that resolve to the number of anchors created
*/
/**
 * Persists the given anchors into the `anchor` table using chunked inserts.
 *
 * NOTE(review): unlike the previous single-statement insert, `batchInsert` does
 * not apply `onConflict('requestId').ignore()`, so a duplicate request ID will
 * fail its whole chunk instead of being skipped — confirm this is intended.
 *
 * @param anchors - Fresh anchors to persist
 * @returns A promise that resolves to the number of anchors created
 *   (0 if the insert failed)
 */
async createAnchors(anchors: Array<FreshAnchor>): Promise<number> {
  // Encode up front so a codec error is not swallowed by the insert's catch.
  const rows = anchors.map((anchor) => FreshAnchor.encode(anchor))
  // `batchInsert` splits the rows into chunks of SQL_ARGS_CHUNK_SIZE so we stay
  // under the database's SQL argument limit. Failures are logged and swallowed
  // (best-effort), in which case we fall back to an empty result.
  const result =
    (await this.connection
      .batchInsert(TABLE_NAME, rows, SQL_ARGS_CHUNK_SIZE)
      .catch(function (error) {
        console.error(error)
      })) ?? []
  // NOTE(review): `result` is knex's per-chunk result array, so `result.length`
  // counts chunks, not inserted rows — verify against knex `batchInsert` return
  // semantics before relying on this count.
  return parseCountResult(result.length)
}

/**
Expand Down
25 changes: 16 additions & 9 deletions src/repositories/request-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export const FAILURE_RETRY_WINDOW = 1000 * 60 * 60 * 48 // 48H
export const FAILURE_RETRY_INTERVAL = 1000 * 60 * 60 * 6 // 6H

const TABLE_NAME = 'request'
const SQL_ARGS_CHUNK_SIZE = 10000

/**
* Records statistics about the set of requests
Expand Down Expand Up @@ -174,15 +175,21 @@ export class RequestRepository {
*/
async updateRequests(fields: RequestUpdateFields, requests: Request[]): Promise<number> {
const updatedAt = new Date()
const ids = requests.map((r) => r.id)
const result = await this.table
.update({
message: fields.message,
status: fields.status,
pinned: fields.pinned,
updatedAt: date.encode(updatedAt),
})
.whereIn('id', ids)
let result = 0

for (let i = 0; i < requests.length; i += SQL_ARGS_CHUNK_SIZE) {
const chunk = requests.slice(i, i + SQL_ARGS_CHUNK_SIZE)
const ids = chunk.map((r) => r.id)

result += await this.table
.update({
message: fields.message,
status: fields.status,
pinned: fields.pinned,
updatedAt: date.encode(updatedAt),
})
.whereIn('id', ids)
}

requests.map((request) => {
logEvent.db({
Expand Down
2 changes: 1 addition & 1 deletion src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ export class AnchorService {
logger.debug('Creating IPFS anchor proof')
const ipfsProofCid = this._createIPFSProof(merkleTree.car, tx, merkleTree.root.data.cid)

// Create anchor records on IPFS
// Create anchor records
logger.debug('Creating anchor commits')
const anchors = await this._createAnchorCommits(ipfsProofCid, merkleTree)

Expand Down
72 changes: 71 additions & 1 deletion src/services/queue/sqs-queue-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import {
ChangeMessageVisibilityCommand,
SendMessageCommand,
} from '@aws-sdk/client-sqs'
import AWSSDK from 'aws-sdk'
import LevelUp from 'levelup'
import S3LevelDOWN from 's3leveldown'
import { IpfsPubSubPublishQMessage, QueueMessageData } from '../../models/queue-message.js'
import {
IQueueConsumerService,
Expand All @@ -19,6 +22,8 @@ import { AbortOptions } from '@ceramicnetwork/common'

const DEFAULT_MAX_TIME_TO_HOLD_MESSAGES_S = 21600
const DEFAULT_WAIT_TIME_FOR_MESSAGE_S = 10
const BATCH_STORE_PATH = '/cas/anchor/batch'

/**
* Sqs Queue Message received by consumers.
* Once the message is done processing you can either "ack" the message (remove the message from the queue) or "nack" the message (put the message back on the queue)
Expand Down Expand Up @@ -60,6 +65,28 @@ export class SqsQueueMessage<TValue extends QueueMessageData> implements IQueueM
}
}

// Wraps an SQS batch message whose inline request ID list was empty, replacing
// its `data` with the full batch decoded from S3. Acks and nacks are delegated
// to the wrapped message, so queue bookkeeping still targets the original SQS
// message.
export class BatchQueueMessage implements IQueueMessage<AnchorBatchQMessage> {
  readonly data: AnchorBatchQMessage

  constructor(
    private readonly anchorBatchMessage: IQueueMessage<AnchorBatchQMessage>,
    batchJson: any
  ) {
    this.data = decode(AnchorBatchQMessage, batchJson)
  }

  /** Acknowledge (remove from the queue) the underlying SQS message. */
  ack(): Promise<void> {
    return this.anchorBatchMessage.ack()
  }

  /** Negative-acknowledge (return to the queue) the underlying SQS message. */
  nack(): Promise<void> {
    return this.anchorBatchMessage.nack()
  }
}

/**
* Consumer and Producer for Sqs Queues
*/
Expand Down Expand Up @@ -149,10 +176,53 @@ export class ValidationSqsQueueService extends SqsQueueService<RequestQMessage>
* AnchorBatchSqsQueueService is used to consume and publish anchor batch messages. These batches are anchored by anchor workers
*/
export class AnchorBatchSqsQueueService extends SqsQueueService<AnchorBatchQMessage> {
constructor(config: Config) {
constructor(
config: Config,
private s3StorePath = config.queue.s3BucketName + BATCH_STORE_PATH,
private s3Endpoint = config.queue.s3Endpoint ? config.queue.s3Endpoint : undefined,
private _s3store?: LevelUp.LevelUp
) {
const queueUrl = config.queue.sqsQueueUrl + 'batch'
super(config, queueUrl, AnchorBatchQMessage)
}

/**
* `new LevelUp` attempts to open a database, which leads to a request to AWS.
* Let's make initialization lazy.
*/
get s3store(): LevelUp.LevelUp {
if (!this._s3store) {
const levelDown = this.s3Endpoint
? new S3LevelDOWN(
this.s3StorePath,
new AWSSDK.S3({
endpoint: this.s3Endpoint,
s3ForcePathStyle: true,
})
)
: new S3LevelDOWN(this.s3StorePath)

this._s3store = new LevelUp(levelDown)
}
return this._s3store
}

override async receiveMessage(
abortOptions?: AbortOptions
): Promise<IQueueMessage<AnchorBatchQMessage> | undefined> {
const anchorBatchMessage: IQueueMessage<AnchorBatchQMessage> | undefined =
await super.receiveMessage(abortOptions)
// If the list of batch request IDs is empty, we need to fetch the full batch from S3.
if (anchorBatchMessage && anchorBatchMessage.data.rids.length === 0) {
try {
const batchJson = await this.s3store.get(anchorBatchMessage.data.bid)
return new BatchQueueMessage(anchorBatchMessage, JSON.parse(batchJson))
} catch (err: any) {
throw Error(`Error retrieving batch ${anchorBatchMessage.data.bid} from S3: ${err.message}`)
}
}
return anchorBatchMessage
}
}

/**
Expand Down

0 comments on commit 37f6a4e

Please sign in to comment.