diff --git a/internal-tooling/performApplicationPerformanceTest.ts b/internal-tooling/performApplicationPerformanceTest.ts new file mode 100644 index 000000000..3ece2633e --- /dev/null +++ b/internal-tooling/performApplicationPerformanceTest.ts @@ -0,0 +1,191 @@ +/*! + * Copyright 2022 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import yargs from 'yargs'; +import {promises as fsp, rmSync} from 'fs'; +import {Bucket, DownloadOptions, DownloadResponse, UploadOptions} from '../src'; +import {performance} from 'perf_hooks'; +// eslint-disable-next-line node/no-unsupported-features/node-builtins +import {parentPort} from 'worker_threads'; +import { + BLOCK_SIZE_IN_BYTES, + DEFAULT_PROJECT_ID, + DEFAULT_NUMBER_OF_OBJECTS, + DEFAULT_SMALL_FILE_SIZE_BYTES, + DEFAULT_LARGE_FILE_SIZE_BYTES, + NODE_DEFAULT_HIGHWATER_MARK_BYTES, + generateRandomDirectoryStructure, + getValidationType, + performanceTestSetup, + TestResult, +} from './performanceUtils'; +import {TRANSFER_MANAGER_TEST_TYPES} from './performanceTest'; + +const TEST_NAME_STRING = 'nodejs-perf-metrics-application'; +const DEFAULT_BUCKET_NAME = 'nodejs-perf-metrics-shaffeeullah'; + +let bucket: Bucket; + +const checkType = getValidationType(); + +const argv = yargs(process.argv.slice(2)) + .options({ + bucket: {type: 'string', default: DEFAULT_BUCKET_NAME}, + small: {type: 'number', default: DEFAULT_SMALL_FILE_SIZE_BYTES}, + large: {type: 'number', default: DEFAULT_LARGE_FILE_SIZE_BYTES}, + projectid: {type: 'string', default: DEFAULT_PROJECT_ID}, + numobjects: {type: 'number', default: DEFAULT_NUMBER_OF_OBJECTS}, + }) + .parseSync(); + +/** + * Main entry point. This function performs a test iteration and posts the message back + * to the parent thread. 
+ */ +async function main() { + let result: TestResult = { + op: '', + objectSize: 0, + appBufferSize: 0, + libBufferSize: 0, + crc32Enabled: false, + md5Enabled: false, + apiName: 'JSON', + elapsedTimeUs: 0, + cpuTimeUs: 0, + status: '[OK]', + }; + + ({bucket} = await performanceTestSetup(argv.projectid, argv.bucket)); + + switch (argv.testtype) { + case TRANSFER_MANAGER_TEST_TYPES.APPLICATION_UPLOAD_MULTIPLE_OBJECTS: + result = await performWriteTest(); + break; + case TRANSFER_MANAGER_TEST_TYPES.APPLICATION_DOWNLOAD_MULTIPLE_OBJECTS: + result = await performReadTest(); + break; + // case TRANSFER_MANAGER_TEST_TYPES.APPLICATION_LARGE_FILE_DOWNLOAD: + // result = await performLargeReadTest(); + // break; + default: + break; + } + parentPort?.postMessage(result); +} + +async function uploadInParallel( + bucket: Bucket, + paths: string[], + options: UploadOptions +) { + const promises = []; + for (const path of paths) { + const stat = await fsp.lstat(path); + if (stat.isDirectory()) { + continue; + } + options.destination = path; + promises.push(bucket.upload(path, options)); + } + await Promise.all(promises).catch(console.error); +} + +async function downloadInParallel(bucket: Bucket, options: DownloadOptions) { + const promises: Promise<DownloadResponse>[] = []; + const [files] = await bucket.getFiles(); + files.forEach(file => { + promises.push(file.download(options)); + }); + await Promise.all(promises).catch(console.error); +} + +/** + * Performs an iteration of the Write multiple objects test. + * + * @returns {Promise<TestResult>} Promise that resolves to a test result of the iteration. + */ +async function performWriteTest(): Promise<TestResult> { + await bucket.deleteFiles(); // start clean + + const creationInfo = generateRandomDirectoryStructure( + argv.numobjects, + TEST_NAME_STRING, + argv.small, + argv.large + ); + + const start = performance.now(); + await uploadInParallel(bucket, creationInfo.paths, {validation: checkType}); + const end = performance.now(); + + await bucket.deleteFiles(); // cleanup files + rmSync(TEST_NAME_STRING, {recursive: true, force: true}); + + const result: TestResult = { + op: 'WRITE', + objectSize: creationInfo.totalSizeInBytes, + appBufferSize: BLOCK_SIZE_IN_BYTES, + libBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, + crc32Enabled: checkType === 'crc32c', + md5Enabled: checkType === 'md5', + apiName: 'JSON', + elapsedTimeUs: Math.round((end - start) * 1000), + cpuTimeUs: -1, + status: '[OK]', + }; + return result; +} + +/** + * Performs an iteration of the read multiple objects test. + * + * @returns {Promise<TestResult>} Promise that resolves to a test result of the iteration.
+ */ +async function performReadTest(): Promise<TestResult> { + await bucket.deleteFiles(); // start clean + const creationInfo = generateRandomDirectoryStructure( + argv.numobjects, + TEST_NAME_STRING, + argv.small, + argv.large + ); + await uploadInParallel(bucket, creationInfo.paths, {validation: checkType}); + + const start = performance.now(); + await downloadInParallel(bucket, {validation: checkType}); + const end = performance.now(); + + const result: TestResult = { + op: 'READ', + objectSize: creationInfo.totalSizeInBytes, + appBufferSize: BLOCK_SIZE_IN_BYTES, + libBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, + crc32Enabled: checkType === 'crc32c', + md5Enabled: checkType === 'md5', + apiName: 'JSON', + elapsedTimeUs: Math.round((end - start) * 1000), + cpuTimeUs: -1, + status: '[OK]', + }; + + rmSync(TEST_NAME_STRING, {recursive: true, force: true}); + await bucket.deleteFiles(); // cleanup + return result; +} + +main(); diff --git a/internal-tooling/performPerformanceTest.ts b/internal-tooling/performPerformanceTest.ts index 946339daa..eb125b1e6 100644 --- a/internal-tooling/performPerformanceTest.ts +++ b/internal-tooling/performPerformanceTest.ts @@ -15,57 +15,39 @@ */ import yargs from 'yargs'; -import * as uuid from 'uuid'; -import {execSync} from 'child_process'; -import {unlinkSync} from 'fs'; -import {Storage} from '../src'; import {performance} from 'perf_hooks'; // eslint-disable-next-line node/no-unsupported-features/node-builtins import {parentPort} from 'worker_threads'; import path = require('path'); +import { + BLOCK_SIZE_IN_BYTES, + cleanupFile, + DEFAULT_LARGE_FILE_SIZE_BYTES, + DEFAULT_PROJECT_ID, + DEFAULT_SMALL_FILE_SIZE_BYTES, + generateRandomFile, + generateRandomFileName, + getValidationType, + NODE_DEFAULT_HIGHWATER_MARK_BYTES, + performanceTestSetup, + TestResult, +} from './performanceUtils'; +import {Bucket} from '../src'; const TEST_NAME_STRING = 'nodejs-perf-metrics'; const DEFAULT_NUMBER_OF_WRITES = 1; const DEFAULT_NUMBER_OF_READS = 3; const DEFAULT_BUCKET_NAME = 'nodejs-perf-metrics'; -const DEFAULT_SMALL_FILE_SIZE_BYTES = 5120; -const DEFAULT_LARGE_FILE_SIZE_BYTES = 2.147e9; -const BLOCK_SIZE_IN_BYTES = 1024; -const NODE_DEFAULT_HIGHWATER_MARK_BYTES = 16384; - -export interface TestResult { - op: string; - objectSize: number; - appBufferSize: number; - libBufferSize: number; - crc32Enabled: boolean; - md5Enabled: boolean; - apiName: 'JSON' | 'XML'; - elapsedTimeUs: number; - cpuTimeUs: number; - status: '[OK]'; -} -/** - * Create a uniformly distributed random integer beween the inclusive min and max provided. - * - * @param {number} minInclusive lower bound (inclusive) of the range of random integer to return. - * @param {number} maxInclusive upper bound (inclusive) of the range of random integer to return. - * @returns {number} returns a random integer between minInclusive and maxInclusive - */ -const randomInteger = (minInclusive: number, maxInclusive: number) => { - // Utilizing Math.random will generate uniformly distributed random numbers.
- return ( - Math.floor(Math.random() * (maxInclusive - minInclusive + 1)) + minInclusive - ); -}; +let bucket: Bucket; +const checkType = getValidationType(); const argv = yargs(process.argv.slice(2)) .options({ bucket: {type: 'string', default: DEFAULT_BUCKET_NAME}, small: {type: 'number', default: DEFAULT_SMALL_FILE_SIZE_BYTES}, large: {type: 'number', default: DEFAULT_LARGE_FILE_SIZE_BYTES}, - projectid: {type: 'string'}, + projectid: {type: 'string', default: DEFAULT_PROJECT_ID}, }) .parseSync(); @@ -81,22 +63,14 @@ async function main() { /** * Performs an iteration of the Write 1 / Read 3 performance measuring test. * - * @returns {Promise<TestResult[]>} Promise that resolves to an array of test results for the iteration. */ async function performWriteReadTest(): Promise<TestResult[]> { const results: TestResult[] = []; - const fileName = generateRandomFileName(); - const sizeInBytes = generateRandomFile(fileName); - const checkType = randomInteger(0, 2); - - const stg = new Storage({ - projectId: argv.projectid, - }); + const fileName = generateRandomFileName(TEST_NAME_STRING); + const sizeInBytes = generateRandomFile(fileName, argv.small, argv.large); - let bucket = stg.bucket(argv.bucket); - if (!(await bucket.exists())[0]) { - await bucket.create(); - } + ({bucket} = await performanceTestSetup(argv.projectid, argv.bucket)); for (let j = 0; j < DEFAULT_NUMBER_OF_WRITES; j++) { let start = 0; @@ -107,41 +81,22 @@ async function performWriteReadTest(): Promise<TestResult[]> { objectSize: sizeInBytes, appBufferSize: BLOCK_SIZE_IN_BYTES, libBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, - crc32Enabled: false, - md5Enabled: false, + crc32Enabled: checkType === 'crc32c', + md5Enabled: checkType === 'md5', apiName: 'JSON', elapsedTimeUs: 0, cpuTimeUs: -1, status: '[OK]', }; - bucket = stg.bucket(argv.bucket, { - preconditionOpts: { - ifGenerationMatch: 0, - }, - }); - - if (checkType === 0) { - start = performance.now(); - await bucket.upload(`${__dirname}/${fileName}`, {validation: false}); - end = performance.now(); - } else if (checkType === 1) { - iterationResult.crc32Enabled = true; - start = performance.now(); - await bucket.upload(`${__dirname}/${fileName}`, {validation: 'crc32c'}); - end = performance.now(); - } else { - iterationResult.md5Enabled = true; - start = performance.now(); - await bucket.upload(`${__dirname}/${fileName}`, {validation: 'md5'}); - end = performance.now(); - } + start = performance.now(); + await bucket.upload(`${__dirname}/${fileName}`, {validation: checkType}); + end = performance.now(); iterationResult.elapsedTimeUs = Math.round((end - start) * 1000); results.push(iterationResult); } - bucket = stg.bucket(argv.bucket); for (let j = 0; j < DEFAULT_NUMBER_OF_READS; j++) { let start = 0; let end = 0; @@ -151,31 +106,21 @@ async function performWriteReadTest(): Promise<TestResult[]> { objectSize: sizeInBytes, appBufferSize: BLOCK_SIZE_IN_BYTES, libBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, - crc32Enabled: false, - md5Enabled: false, + crc32Enabled: checkType === 'crc32c', + md5Enabled: checkType === 'md5', apiName: 'JSON', elapsedTimeUs: 0, cpuTimeUs: -1, status: '[OK]', }; - const destinationFileName = generateRandomFileName(); + const destinationFileName = generateRandomFileName(TEST_NAME_STRING); const destination = path.join(__dirname, destinationFileName); - if (checkType === 0) { - start = performance.now(); - await file.download({validation: false, destination}); - end = performance.now(); - } else if (checkType === 1) { - iterationResult.crc32Enabled = true; - start = performance.now(); - await 
file.download({validation: 'crc32c', destination}); - end = performance.now(); - } else { - iterationResult.md5Enabled = true; - start = performance.now(); - await file.download({validation: 'md5', destination}); - end = performance.now(); - } + + start = performance.now(); + await file.download({validation: checkType, destination}); + end = performance.now(); + cleanupFile(destinationFileName); iterationResult.elapsedTimeUs = Math.round((end - start) * 1000); results.push(iterationResult); @@ -186,38 +131,4 @@ async function performWriteReadTest(): Promise<TestResult[]> { return results; } -/** - * Creates a file with a size between the small (default 5120 bytes) and large (2.147e9 bytes) parameters. - * The file is filled with random data. - * - * @param {string} fileName name of the file to generate. - * @returns {number} the size of the file generated. - */ -function generateRandomFile(fileName: string) { - const fileSizeBytes = randomInteger(argv.small, argv.large); - const numberNeeded = Math.ceil(fileSizeBytes / BLOCK_SIZE_IN_BYTES); - const cmd = `dd if=/dev/urandom of=${__dirname}/${fileName} bs=${BLOCK_SIZE_IN_BYTES} count=${numberNeeded} status=none iflag=fullblock`; - execSync(cmd); - - return fileSizeBytes; -} - -/** - * Creates a random file name by appending a UUID to the TEST_NAME_STRING. - * - * @returns {string} random file name that was generated. - */ -function generateRandomFileName(): string { - return `${TEST_NAME_STRING}.${uuid.v4()}`; -} - -/** - * Deletes the file specified by the fileName parameter. - * - * @param {string} fileName name of the file to delete. - */ -function cleanupFile(fileName: string) { - unlinkSync(`${__dirname}/${fileName}`); -} - main(); diff --git a/internal-tooling/performTransferManagerTest.ts b/internal-tooling/performTransferManagerTest.ts new file mode 100644 index 000000000..64082eca0 --- /dev/null +++ b/internal-tooling/performTransferManagerTest.ts @@ -0,0 +1,249 @@ +/*! + * Copyright 2022 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +// eslint-disable-next-line node/no-unsupported-features/node-builtins +import {parentPort} from 'worker_threads'; +import yargs from 'yargs'; +import {Bucket, TransferManager} from '../src'; +import {TRANSFER_MANAGER_TEST_TYPES} from './performanceTest'; +import { + BLOCK_SIZE_IN_BYTES, + cleanupFile, + DEFAULT_LARGE_FILE_SIZE_BYTES, + DEFAULT_PROJECT_ID, + DEFAULT_SMALL_FILE_SIZE_BYTES, + generateRandomDirectoryStructure, + generateRandomFile, + generateRandomFileName, + getValidationType, + NODE_DEFAULT_HIGHWATER_MARK_BYTES, + DEFAULT_NUMBER_OF_OBJECTS, + performanceTestSetup, + TestResult, +} from './performanceUtils'; +import {performance} from 'perf_hooks'; +import {rmSync} from 'fs'; +import * as path from 'path'; + +const TEST_NAME_STRING = 'tm-perf-metrics'; +const DEFAULT_BUCKET_NAME = 'nodejs-transfer-manager-perf-metrics'; +const DEFAULT_NUMBER_OF_PROMISES = 2; +const DEFAULT_CHUNK_SIZE_BYTES = 16 * 1024 * 1024; +const DIRECTORY_PROBABILITY = 0.1; + +let bucket: Bucket; +let transferManager: TransferManager; +const checkType = getValidationType(); + +const argv = yargs(process.argv.slice(2)) + .options({ + bucket: {type: 'string', default: DEFAULT_BUCKET_NAME}, + small: {type: 'number', default: DEFAULT_SMALL_FILE_SIZE_BYTES}, + large: {type: 'number', default: DEFAULT_LARGE_FILE_SIZE_BYTES}, + numpromises: {type: 'number', default: DEFAULT_NUMBER_OF_PROMISES}, + numobjects: {type: 'number', default: DEFAULT_NUMBER_OF_OBJECTS}, + chunksize: {type: 'number', default: DEFAULT_CHUNK_SIZE_BYTES}, + projectid: {type: 'string', default: DEFAULT_PROJECT_ID}, + testtype: { + type: 'string', + choices: [ + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_UPLOAD_MULTIPLE_OBJECTS, + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_DOWNLOAD_MULTIPLE_OBJECTS, + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_LARGE_FILE_DOWNLOAD, + ], + }, + }) + .parseSync(); + +/** + * Main entry point. This function performs a test iteration and posts the message back + * to the parent thread. + */ +async function main() { + let result: TestResult = { + op: '', + objectSize: 0, + appBufferSize: 0, + libBufferSize: 0, + crc32Enabled: false, + md5Enabled: false, + apiName: 'JSON', + elapsedTimeUs: 0, + cpuTimeUs: 0, + status: '[OK]', + }; + + ({bucket, transferManager} = await performanceTestSetup( + argv.projectid, + argv.bucket + )); + + switch (argv.testtype) { + case TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_UPLOAD_MULTIPLE_OBJECTS: + result = await performUploadManyFilesTest(); + break; + case TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_DOWNLOAD_MULTIPLE_OBJECTS: + result = await performDownloadManyFilesTest(); + break; + case TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_LARGE_FILE_DOWNLOAD: + result = await performDownloadFileInChunksTest(); + break; + default: + break; + } + parentPort?.postMessage(result); + await performTestCleanup(); +} + +/** + * Cleans up after a test is complete by removing all files from the bucket. + */ +async function performTestCleanup() { + await bucket.deleteFiles(); } + +/** + * Performs a test where multiple objects are uploaded in parallel to a bucket. + * + * @returns {Promise<TestResult>} A promise that resolves containing information about the test results.
+ */ +async function performUploadManyFilesTest(): Promise<TestResult> { + const creationInfo = generateRandomDirectoryStructure( + argv.numobjects, + TEST_NAME_STRING, + argv.small, + argv.large, + DIRECTORY_PROBABILITY + ); + + const start = performance.now(); + await transferManager.uploadManyFiles(creationInfo.paths, { + concurrencyLimit: argv.numpromises, + passthroughOptions: { + validation: checkType, + }, + }); + const end = performance.now(); + + rmSync(TEST_NAME_STRING, {recursive: true, force: true}); + + const result: TestResult = { + op: 'WRITE', + objectSize: creationInfo.totalSizeInBytes, + appBufferSize: BLOCK_SIZE_IN_BYTES, + libBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, + crc32Enabled: checkType === 'crc32c', + md5Enabled: checkType === 'md5', + apiName: 'JSON', + elapsedTimeUs: Math.round((end - start) * 1000), + cpuTimeUs: -1, + status: '[OK]', + }; + + return result; +} + +/** + * Performs a test where multiple objects are downloaded in parallel from a bucket. + * + * @returns {Promise<TestResult>} A promise that resolves containing information about the test results. + */ +async function performDownloadManyFilesTest(): Promise<TestResult> { + const creationInfo = generateRandomDirectoryStructure( + argv.numobjects, + TEST_NAME_STRING, + argv.small, + argv.large, + DIRECTORY_PROBABILITY + ); + + await transferManager.uploadManyFiles(creationInfo.paths, { + concurrencyLimit: argv.numpromises, + passthroughOptions: { + validation: checkType, + }, + }); + const start = performance.now(); + await transferManager.downloadManyFiles(TEST_NAME_STRING, { + prefix: path.join(__dirname, '..', '..'), + concurrencyLimit: argv.numpromises, + passthroughOptions: { + validation: checkType, + }, + }); + const end = performance.now(); + + rmSync(TEST_NAME_STRING, {recursive: true, force: true}); + + const result: TestResult = { + op: 'READ', + objectSize: creationInfo.totalSizeInBytes, + appBufferSize: BLOCK_SIZE_IN_BYTES, + libBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, + crc32Enabled: checkType === 'crc32c', + md5Enabled: checkType === 'md5', + apiName: 'JSON', + elapsedTimeUs: Math.round((end - start) * 1000), + cpuTimeUs: -1, + status: '[OK]', + }; + return result; +} + +/** + * Performs a test where a large file is downloaded as chunks in parallel. + * + * @returns {Promise<TestResult>} A promise that resolves containing information about the test results.
+ */ +async function performDownloadFileInChunksTest(): Promise<TestResult> { + const fileName = generateRandomFileName(TEST_NAME_STRING); + const sizeInBytes = generateRandomFile( + fileName, + argv.small, + argv.large, + __dirname + ); + const file = bucket.file(fileName); + + await bucket.upload(`${__dirname}/${fileName}`); + cleanupFile(fileName); + const start = performance.now(); + await transferManager.downloadFileInChunks(file, { + concurrencyLimit: argv.numpromises, + chunkSizeBytes: argv.chunksize, + destination: path.join(__dirname, fileName), + }); + const end = performance.now(); + + cleanupFile(fileName); + + const result: TestResult = { + op: 'READ', + objectSize: sizeInBytes, + appBufferSize: BLOCK_SIZE_IN_BYTES, + libBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, + crc32Enabled: false, + md5Enabled: false, + apiName: 'JSON', + elapsedTimeUs: Math.round((end - start) * 1000), + cpuTimeUs: -1, + status: '[OK]', + }; + + return result; +} + +main(); diff --git a/internal-tooling/performanceTest.ts b/internal-tooling/performanceTest.ts index 55c2b6baf..408d2f7f3 100644 --- a/internal-tooling/performanceTest.ts +++ b/internal-tooling/performanceTest.ts @@ -18,7 +18,7 @@ import {appendFile} from 'fs/promises'; // eslint-disable-next-line node/no-unsupported-features/node-builtins import {Worker} from 'worker_threads'; import yargs = require('yargs'); -import {TestResult} from './performPerformanceTest'; +import {TestResult} from './performanceUtils'; import {existsSync} from 'fs'; import {writeFile} from 'fs/promises'; @@ -27,11 +27,33 @@ const DEFAULT_THREADS = 1; const CSV_HEADERS = 'Op,ObjectSize,AppBufferSize,LibBufferSize,Crc32cEnabled,MD5Enabled,ApiName,ElapsedTimeUs,CpuTimeUs,Status\n'; const START_TIME = Date.now(); +export const enum TRANSFER_MANAGER_TEST_TYPES { + WRITE_ONE_READ_THREE = 'w1r3', + TRANSFER_MANAGER_UPLOAD_MULTIPLE_OBJECTS = 'tm-upload', + TRANSFER_MANAGER_DOWNLOAD_MULTIPLE_OBJECTS = 'tm-download', + TRANSFER_MANAGER_LARGE_FILE_DOWNLOAD = 'tm-large', + APPLICATION_LARGE_FILE_DOWNLOAD = 'application-large', + APPLICATION_UPLOAD_MULTIPLE_OBJECTS = 'application-upload', + APPLICATION_DOWNLOAD_MULTIPLE_OBJECTS = 'application-download', +} const argv = yargs(process.argv.slice(2)) .options({ iterations: {type: 'number', default: DEFAULT_ITERATIONS}, numthreads: {type: 'number', default: DEFAULT_THREADS}, + testtype: { + type: 'string', + choices: [ + TRANSFER_MANAGER_TEST_TYPES.WRITE_ONE_READ_THREE, + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_UPLOAD_MULTIPLE_OBJECTS, + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_DOWNLOAD_MULTIPLE_OBJECTS, + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_LARGE_FILE_DOWNLOAD, + TRANSFER_MANAGER_TEST_TYPES.APPLICATION_DOWNLOAD_MULTIPLE_OBJECTS, + TRANSFER_MANAGER_TEST_TYPES.APPLICATION_LARGE_FILE_DOWNLOAD, + TRANSFER_MANAGER_TEST_TYPES.APPLICATION_UPLOAD_MULTIPLE_OBJECTS, + ], + default: TRANSFER_MANAGER_TEST_TYPES.WRITE_ONE_READ_THREE, + }, }) .parseSync(); @@ -51,6 +73,9 @@ function main() { ); numThreads = iterationsRemaining; } + if (argv.testtype !== TRANSFER_MANAGER_TEST_TYPES.WRITE_ONE_READ_THREE) { + numThreads = 1; + } for (let i = 0; i < numThreads; i++) { createWorker(); } @@ -65,9 +90,33 @@ function createWorker() { console.log( `Starting new iteration. 
Current iterations remaining: ${iterationsRemaining}` ); - const w = new Worker(__dirname + '/performPerformanceTest.js', { + let testPath = ''; + if (argv.testtype === TRANSFER_MANAGER_TEST_TYPES.WRITE_ONE_READ_THREE) { + testPath = `${__dirname}/performPerformanceTest.js`; + } else if ( + argv.testtype === + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_UPLOAD_MULTIPLE_OBJECTS || + argv.testtype === + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_LARGE_FILE_DOWNLOAD || + argv.testtype === + TRANSFER_MANAGER_TEST_TYPES.TRANSFER_MANAGER_DOWNLOAD_MULTIPLE_OBJECTS + ) { + testPath = `${__dirname}/performTransferManagerTest.js`; + } else if ( + argv.testtype === + TRANSFER_MANAGER_TEST_TYPES.APPLICATION_UPLOAD_MULTIPLE_OBJECTS || + argv.testtype === + TRANSFER_MANAGER_TEST_TYPES.APPLICATION_LARGE_FILE_DOWNLOAD || + argv.testtype === + TRANSFER_MANAGER_TEST_TYPES.APPLICATION_DOWNLOAD_MULTIPLE_OBJECTS + ) { + testPath = `${__dirname}/performApplicationPerformanceTest.js`; + } + + const w = new Worker(testPath, { argv: process.argv.slice(2), }); + w.on('message', data => { console.log('Successfully completed iteration.'); appendResultToCSV(data); @@ -75,8 +124,9 @@ function createWorker() { createWorker(); } }); - w.on('error', () => { + w.on('error', e => { console.log('An error occurred.'); + console.log(e); }); } @@ -85,13 +135,16 @@ function createWorker() { * * @param {TestResult[]} results */ -async function appendResultToCSV(results: TestResult[]) { +async function appendResultToCSV(results: TestResult[] | TestResult) { const fileName = `nodejs-perf-metrics-${START_TIME}-${argv.iterations}.csv`; + const resultsToAppend: TestResult[] = Array.isArray(results) + ? results + : [results]; if (!existsSync(fileName)) { await writeFile(fileName, CSV_HEADERS); } - const csv = results.map(result => Object.values(result)); + const csv = resultsToAppend.map(result => Object.values(result)); const csvString = csv.join('\n'); await appendFile(fileName, `${csvString}\n`); } diff --git a/internal-tooling/performanceUtils.ts b/internal-tooling/performanceUtils.ts new file mode 100644 index 000000000..df8f69b08 --- /dev/null +++ b/internal-tooling/performanceUtils.ts @@ -0,0 +1,226 @@ +/*! + * Copyright 2022 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import {execSync} from 'child_process'; +import {mkdirSync, mkdtempSync, unlinkSync} from 'fs'; +import path = require('path'); +import {Bucket, Storage, TransferManager} from '../src'; + +export const BLOCK_SIZE_IN_BYTES = 1024; +export const DEFAULT_SMALL_FILE_SIZE_BYTES = 5120; +export const DEFAULT_LARGE_FILE_SIZE_BYTES = 2.147e9; +export const NODE_DEFAULT_HIGHWATER_MARK_BYTES = 16384; +export const DEFAULT_DIRECTORY_PROBABILITY = 0.1; +export const DEFAULT_PROJECT_ID = 'GCS_NODE_PERFORMANCE_METRICS'; +export const DEFAULT_NUMBER_OF_OBJECTS = 1000; + +export interface TestResult { + op: string; + objectSize: number; + appBufferSize: number; + libBufferSize: number; + crc32Enabled: boolean; + md5Enabled: boolean; + apiName: 'JSON' | 'XML'; + elapsedTimeUs: number; + cpuTimeUs: number; + status: '[OK]'; +} + +export interface RandomDirectoryCreationInformation { + paths: string[]; + totalSizeInBytes: number; +} + +export interface PerformanceTestSetupResults { + storage: Storage; + bucket: Bucket; + transferManager: TransferManager; +} + +/** + * Create a uniformly distributed random integer between the inclusive min and max provided. + * + * @param {number} minInclusive lower bound (inclusive) of the range of random integer to return. + * @param {number} maxInclusive upper bound (inclusive) of the range of random integer to return. + * @returns {number} returns a random integer between minInclusive and maxInclusive + */ +export function randomInteger(minInclusive: number, maxInclusive: number) { + // Utilizing Math.random will generate uniformly distributed random numbers. + return ( + Math.floor(Math.random() * (maxInclusive - minInclusive + 1)) + minInclusive + ); +} + +/** + * Returns a boolean value with the provided probability. + * + * @param {number} trueProbability the probability the value will be true + * + * @returns {boolean} a boolean value with the probability provided. + */ +export function weightedRandomBoolean(trueProbability: number): boolean { + return Math.random() <= trueProbability; +} + +/** + * Returns a string of 6 random characters. + * + * @returns {string} a random string value with length of 6 + */ +export function randomString(): string { + return Math.random().toString(36).slice(-6); +} + +/** + * Creates a random file name by appending a random string to the baseName. + * + * @param {string} baseName the base file name. A random string will be appended to this value. + * + * @returns {string} random file name that was generated. + */ +export function generateRandomFileName(baseName: string): string { + return `${baseName}.${randomString()}`; +} + +/** + * Creates a file with a size between the small (default 5120 bytes) and large (2.147e9 bytes) parameters. + * The file is filled with random data. + * + * @param {string} fileName name of the file to generate. + * @param {number} fileSizeLowerBoundBytes minimum size of file to generate. + * @param {number} fileSizeUpperBoundBytes maximum size of file to generate. + * @param {string} currentDirectory the directory in which to generate the file. + * + * @returns {number} the size of the file generated.
+ */ +export function generateRandomFile( + fileName: string, + fileSizeLowerBoundBytes: number = DEFAULT_SMALL_FILE_SIZE_BYTES, + fileSizeUpperBoundBytes: number = DEFAULT_LARGE_FILE_SIZE_BYTES, + currentDirectory: string = mkdtempSync(randomString()) +): number { + const fileSizeBytes = randomInteger( + fileSizeLowerBoundBytes, + fileSizeUpperBoundBytes + ); + + execSync( + `head --bytes=${fileSizeBytes} /dev/urandom > ${currentDirectory}/${fileName}` + ); + + return fileSizeBytes; +} + +/** + * Creates a random directory structure consisting of subdirectories and random files. + * + * @param {number} maxObjects the total number of subdirectories and files to generate. + * @param {string} baseName the starting directory under which everything else is added. File names will have this value prepended. + * @param {number} fileSizeLowerBoundBytes minimum size of file to generate. + * @param {number} fileSizeUpperBoundBytes maximum size of file to generate. + * @param {number} directoryProbability the probability that a given generated object will be a subdirectory instead of a file. + * + * @returns {RandomDirectoryCreationInformation} the generated paths and their total size in bytes. + */ +export function generateRandomDirectoryStructure( + maxObjects: number, + baseName: string, + fileSizeLowerBoundBytes: number = DEFAULT_SMALL_FILE_SIZE_BYTES, + fileSizeUpperBoundBytes: number = DEFAULT_LARGE_FILE_SIZE_BYTES, + directoryProbability: number = DEFAULT_DIRECTORY_PROBABILITY +): RandomDirectoryCreationInformation { + let curPath = baseName; + const creationInfo: RandomDirectoryCreationInformation = { + paths: [], + totalSizeInBytes: 0, + }; + + mkdirSync(curPath); + for (let i = 0; i < maxObjects; i++) { + if (weightedRandomBoolean(directoryProbability)) { + curPath = path.join(curPath, randomString()); + mkdirSync(curPath, {recursive: true}); + creationInfo.paths.push(curPath); + } else { + const randomName = randomString(); + creationInfo.totalSizeInBytes += generateRandomFile( + randomName, + fileSizeLowerBoundBytes, + fileSizeUpperBoundBytes, + curPath + ); + creationInfo.paths.push(path.join(curPath, randomName)); + } + } + + return creationInfo; +} + +/** + * Deletes the file specified by the fileName parameter. + * + * @param {string} fileName name of the file to delete. + * @param {string} directoryName the directory containing the file. Defaults to __dirname. + */ +export function cleanupFile( + fileName: string, + directoryName: string = __dirname +): void { + unlinkSync(`${directoryName}/${fileName}`); +} + +/** + * Creates the necessary structures for performing a performance test. + * + * @param {string} projectId the project ID to use. + * @param {string} bucketName the name of the bucket to use. + * @returns {Promise<PerformanceTestSetupResults>} object containing the created storage, bucket, and transfer manager instances. + */ +export async function performanceTestSetup( + projectId: string, + bucketName: string +): Promise<PerformanceTestSetupResults> { + const storage = new Storage({projectId}); + const bucket = storage.bucket(bucketName, { + preconditionOpts: { + ifGenerationMatch: 0, + }, + }); + if (!(await bucket.exists())[0]) { + await bucket.create(); + } + const transferManager = new TransferManager(bucket); + return { + storage, + bucket, + transferManager, + }; +} + +/** + * Randomly returns the type of validation check to run on upload / download. + * + * @returns {string | boolean | undefined} the type of validation to run (crc32c, md5, or none).
+ */ +export function getValidationType(): 'md5' | 'crc32c' | boolean | undefined { + const checkType = randomInteger(0, 2); + if (checkType === 0) { + return false; + } else if (checkType === 1) { + return 'crc32c'; + } else { + return 'md5'; + } +} diff --git a/samples/downloadFileInChunksWithTransferManager.js b/samples/downloadFileInChunksWithTransferManager.js new file mode 100644 index 000000000..891f8cd7b --- /dev/null +++ b/samples/downloadFileInChunksWithTransferManager.js @@ -0,0 +1,77 @@ +/** + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * @experimental + */ + +const path = require('path'); +const cwd = path.join(__dirname, '..'); + +// sample-metadata: +// title: Download a File in Chunks Utilizing Transfer Manager +// description: Downloads a single file in chunks in parallel utilizing transfer manager. +// usage: node downloadFileInChunksWithTransferManager.js + +function main( + bucketName = 'my-bucket', + fileName = 'file1.txt', + destFileName = path.join(cwd, fileName), + chunkSize = 1024 +) { + // [START storage_download_file_in_chunks_transfer_manager] + /** + * TODO(developer): Uncomment the following lines before running the sample. + */ + // The ID of your GCS bucket + // const bucketName = 'your-unique-bucket-name'; + + // The ID of the GCS file to download + // const fileName = 'your-file-name'; + + // The path to which the file should be downloaded + // const destFileName = '/local/path/to/file.txt'; + + // The size of each chunk to be downloaded + // const chunkSize = 1024; + + // Imports the Google Cloud client library + const {Storage, TransferManager} = require('@google-cloud/storage'); + + // Creates a client + const storage = new Storage(); + + // Creates a transfer manager instance + const transferManager = new TransferManager(storage.bucket(bucketName)); + + async function downloadFileInChunksWithTransferManager() { + // Downloads the file + await transferManager.downloadFileInChunks(fileName, { + destination: destFileName, + chunkSizeBytes: chunkSize, + }); + + console.log( + `gs://${bucketName}/${fileName} downloaded to ${destFileName}.` + ); + } + + downloadFileInChunksWithTransferManager().catch(console.error); + // [END storage_download_file_in_chunks_transfer_manager] +} + +process.on('unhandledRejection', err => { + console.error(err.message); + process.exitCode = 1; +}); +main(...process.argv.slice(2)); diff --git a/samples/downloadManyFilesWithTransferManager.js b/samples/downloadManyFilesWithTransferManager.js new file mode 100644 index 000000000..3ddfb0980 --- /dev/null +++ b/samples/downloadManyFilesWithTransferManager.js @@ -0,0 +1,67 @@ +/** + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * @experimental + */ + +// sample-metadata: +// title: Download Many Files With Transfer Manager +// description: Downloads many files in parallel utilizing transfer manager. +// usage: node downloadManyFilesWithTransferManager.js + +function main( + bucketName = 'my-bucket', + firstFileName = 'file1.txt', + secondFileName = 'file2.txt' +) { + // [START storage_download_many_files_transfer_manager] + /** + * TODO(developer): Uncomment the following lines before running the sample. + */ + // The ID of your GCS bucket + // const bucketName = 'your-unique-bucket-name'; + + // The ID of the first GCS file to download + // const firstFileName = 'your-first-file-name'; + + // The ID of the second GCS file to download + // const secondFileName = 'your-second-file-name'; + + // Imports the Google Cloud client library + const {Storage, TransferManager} = require('@google-cloud/storage'); + + // Creates a client + const storage = new Storage(); + + // Creates a transfer manager instance + const transferManager = new TransferManager(storage.bucket(bucketName)); + + async function downloadManyFilesWithTransferManager() { + // Downloads the files + await transferManager.downloadManyFiles([firstFileName, secondFileName]); + + for (const fileName of [firstFileName, secondFileName]) { + console.log(`gs://${bucketName}/${fileName} downloaded to ${fileName}.`); + } + } + + downloadManyFilesWithTransferManager().catch(console.error); + // [END storage_download_many_files_transfer_manager] +} + +process.on('unhandledRejection', err => { + console.error(err.message); + process.exitCode = 1; +}); +main(...process.argv.slice(2)); diff --git a/samples/downloaded.txt b/samples/downloaded.txt new file mode 100644 index 000000000..c57eff55e --- /dev/null +++ b/samples/downloaded.txt @@ -0,0 +1 @@ +Hello World! \ No newline at end of file diff --git a/samples/resources/test2.txt b/samples/resources/test2.txt new file mode 100644 index 000000000..010302410 --- /dev/null +++ b/samples/resources/test2.txt @@ -0,0 +1 @@ +Hello World 2! \ No newline at end of file diff --git a/samples/system-test/transfer-manager.test.js b/samples/system-test/transfer-manager.test.js new file mode 100644 index 000000000..983d9cd32 --- /dev/null +++ b/samples/system-test/transfer-manager.test.js @@ -0,0 +1,85 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +'use strict'; + +const path = require('path'); +const {Storage} = require('@google-cloud/storage'); +const {before, after, it, describe} = require('mocha'); +const uuid = require('uuid'); +const cp = require('child_process'); +const {assert} = require('chai'); + +const execSync = cmd => cp.execSync(cmd, {encoding: 'utf-8'}); +const storage = new Storage(); +const cwd = path.join(__dirname, '..'); +const bucketName = generateName(); +const bucket = storage.bucket(bucketName); +const firstFileName = 'test.txt'; +const secondFileName = 'test2.txt'; +const firstFilePath = path.join(cwd, 'resources', firstFileName); +const secondFilePath = path.join(cwd, 'resources', secondFileName); +const downloadFilePath = path.join(cwd, 'downloaded.txt'); +const chunkSize = 1024; + +describe('transfer manager', () => { + before(async () => { + await bucket.create(); + }); + + after(async () => { + await bucket.deleteFiles({force: true}).catch(console.error); + await bucket.delete().catch(console.error); + }); + + it('should upload multiple files', async () => { + const output = execSync( + `node uploadManyFilesWithTransferManager.js ${bucketName} ${firstFilePath} ${secondFilePath}` + ); + assert.match( + output, + new RegExp( + `${firstFilePath} uploaded to ${bucketName}.\n${secondFilePath} uploaded to ${bucketName}` + ) + ); + }); + + it('should download multiple files', async () => { + const output = execSync( + `node downloadManyFilesWithTransferManager.js ${bucketName} ${firstFilePath} ${secondFilePath}` + ); + assert.match( + output, + new RegExp( + `gs://${bucketName}/${firstFilePath} downloaded to ${firstFilePath}.\ngs://${bucketName}/${secondFilePath} downloaded to ${secondFilePath}.` + ) + ); + }); + + it('should download a file utilizing chunked download', async () => { + const output = execSync( + `node downloadFileInChunksWithTransferManager.js ${bucketName} ${firstFilePath} ${downloadFilePath} ${chunkSize}` + ); + assert.match( + output, + new RegExp( + `gs://${bucketName}/${firstFilePath} downloaded to ${downloadFilePath}.` + ) + ); + }); +}); + +function generateName() { + return `nodejs-storage-samples-${uuid.v4()}`; +} diff --git a/samples/uploadManyFilesWithTransferManager.js b/samples/uploadManyFilesWithTransferManager.js new file mode 100644 index 000000000..98795029f --- /dev/null +++ b/samples/uploadManyFilesWithTransferManager.js @@ -0,0 +1,67 @@ +/** + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * @experimental + */ + +// sample-metadata: +// title: Upload Many Files With Transfer Manager +// description: Uploads many files in parallel utilizing transfer manager. +// usage: node uploadManyFilesWithTransferManager.js + +function main( + bucketName = 'my-bucket', + firstFilePath = './local/path/to/file1.txt', + secondFilePath = './local/path/to/file2.txt' +) { + // [START storage_upload_many_files_transfer_manager] + /** + * TODO(developer): Uncomment the following lines before running the sample.
+ */ + // The ID of your GCS bucket + // const bucketName = 'your-unique-bucket-name'; + + // The local path of the first file to upload + // const firstFilePath = './local/path/to/file1.txt'; + + // The local path of the second file to upload + // const secondFilePath = './local/path/to/file2.txt'; + + // Imports the Google Cloud client library + const {Storage, TransferManager} = require('@google-cloud/storage'); + + // Creates a client + const storage = new Storage(); + + // Creates a transfer manager instance + const transferManager = new TransferManager(storage.bucket(bucketName)); + + async function uploadManyFilesWithTransferManager() { + // Uploads the files + await transferManager.uploadManyFiles([firstFilePath, secondFilePath]); + + for (const filePath of [firstFilePath, secondFilePath]) { + console.log(`${filePath} uploaded to ${bucketName}.`); + } + } + + uploadManyFilesWithTransferManager().catch(console.error); + // [END storage_upload_many_files_transfer_manager] +} + +process.on('unhandledRejection', err => { + console.error(err.message); + process.exitCode = 1; +}); +main(...process.argv.slice(2)); diff --git a/src/index.ts b/src/index.ts index 0f8210aa9..c2e6b9df6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -256,3 +256,4 @@ export { StorageOptions, } from './storage'; export {GetSignedUrlCallback, GetSignedUrlResponse} from './signer'; +export * from './transfer-manager'; diff --git a/src/transfer-manager.ts b/src/transfer-manager.ts new file mode 100644 index 000000000..49beabb22 --- /dev/null +++ b/src/transfer-manager.ts @@ -0,0 +1,342 @@ +/*! + * Copyright 2022 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {Bucket, UploadOptions, UploadResponse} from './bucket'; +import {DownloadOptions, DownloadResponse, File} from './file'; +import * as pLimit from 'p-limit'; +import * as path from 'path'; +import * as extend from 'extend'; +import {promises as fsp} from 'fs'; + +/** + * Default number of concurrently executing promises to use when calling uploadManyFiles. + * @experimental + */ +const DEFAULT_PARALLEL_UPLOAD_LIMIT = 2; +/** + * Default number of concurrently executing promises to use when calling downloadManyFiles. + * @experimental + */ +const DEFAULT_PARALLEL_DOWNLOAD_LIMIT = 2; +/** + * Default number of concurrently executing promises to use when calling downloadFileInChunks. + * @experimental + */ +const DEFAULT_PARALLEL_CHUNKED_DOWNLOAD_LIMIT = 2; +/** + * The minimum size threshold in bytes at which to apply a chunked download strategy when calling downloadFileInChunks. + * @experimental + */ +const DOWNLOAD_IN_CHUNKS_FILE_SIZE_THRESHOLD = 256 * 1024 * 1024; +/** + * The chunk size in bytes to use when calling downloadFileInChunks.
+ * @experimental + */ +const DOWNLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024; +const EMPTY_REGEX = '(?:)'; +export interface UploadManyFilesOptions { + concurrencyLimit?: number; + skipIfExists?: boolean; + prefix?: string; + passthroughOptions?: Omit<UploadOptions, 'destination'>; +} + +export interface DownloadManyFilesOptions { + concurrencyLimit?: number; + prefix?: string; + stripPrefix?: string; + passthroughOptions?: DownloadOptions; +} + +export interface DownloadFileInChunksOptions { + concurrencyLimit?: number; + chunkSizeBytes?: number; + destination?: string; +} + +/** + * Create a TransferManager object to perform parallel transfer operations on a Cloud Storage bucket. + * + * @class + * @hideconstructor + * + * @param {Bucket} bucket A {@link Bucket} instance + * @experimental + */ +export class TransferManager { + bucket: Bucket; + constructor(bucket: Bucket) { + this.bucket = bucket; + } + + /** + * @typedef {object} UploadManyFilesOptions + * @property {number} [concurrencyLimit] The number of concurrently executing promises + * to use when uploading the files. + * @property {boolean} [skipIfExists] Do not upload the file if it already exists in + * the bucket. This will set the precondition ifGenerationMatch = 0. + * @property {string} [prefix] A prefix to prepend to all of the uploaded files. + * @property {object} [passthroughOptions] {@link UploadOptions} Options to be passed through + * to each individual upload operation. + * @experimental + */ + /** + * Upload multiple files in parallel to the bucket. This is a convenience method + * that utilizes {@link Bucket#upload} to perform the upload. + * + * @param {array} [filePaths] An array of fully qualified paths to the files + * to be uploaded to the bucket. + * @param {UploadManyFilesOptions} [options] Configuration options. + * @returns {Promise<UploadResponse[]>} + * + * @example + * ``` + * const {Storage} = require('@google-cloud/storage'); + * const storage = new Storage(); + * const bucket = storage.bucket('my-bucket'); + * const transferManager = new TransferManager(bucket); + * + * //- + * // Upload multiple files in parallel.
+ * //- + * const response = await transferManager.uploadManyFiles(['/local/path/file1.txt', '/local/path/file2.txt']); + * // Your bucket now contains: + * // - "local/path/file1.txt" (with the contents of '/local/path/file1.txt') + * // - "local/path/file2.txt" (with the contents of '/local/path/file2.txt') + * ``` + * @experimental + */ + async uploadManyFiles( + filePaths: string[], + options: UploadManyFilesOptions = {} + ): Promise<UploadResponse[]> { + if (options.skipIfExists && options.passthroughOptions?.preconditionOpts) { + options.passthroughOptions.preconditionOpts.ifGenerationMatch = 0; + } else if ( + options.skipIfExists && + options.passthroughOptions === undefined + ) { + options.passthroughOptions = { + preconditionOpts: { + ifGenerationMatch: 0, + }, + }; + } + + const limit = pLimit( + options.concurrencyLimit || DEFAULT_PARALLEL_UPLOAD_LIMIT + ); + const promises = []; + + for (const filePath of filePaths) { + const stat = await fsp.lstat(filePath); + if (stat.isDirectory()) { + continue; + } + const passThroughOptionsCopy: UploadOptions = extend( + true, + {}, + options.passthroughOptions + ); + passThroughOptionsCopy.destination = filePath; + if (options.prefix) { + passThroughOptionsCopy.destination = path.join( + options.prefix, + passThroughOptionsCopy.destination + ); + } + promises.push( + limit(() => this.bucket.upload(filePath, passThroughOptionsCopy)) + ); + } + + return Promise.all(promises); + } + + /** + * @typedef {object} DownloadManyFilesOptions + * @property {number} [concurrencyLimit] The number of concurrently executing promises + * to use when downloading the files. + * @property {string} [prefix] A prefix to prepend to all of the downloaded files. + * @property {string} [stripPrefix] A prefix to remove from all of the downloaded files. + * @property {object} [passthroughOptions] {@link DownloadOptions} Options to be passed through + * to each individual download operation. + * @experimental + */ + /** + * Download multiple files in parallel to the local filesystem. This is a convenience method + * that utilizes {@link File#download} to perform the download. + * + * @param {array | string} [filesOrFolder] An array of file name strings or file objects to be downloaded. If + * a string is provided this will be treated as a GCS prefix and all files with that prefix will be downloaded. + * @param {DownloadManyFilesOptions} [options] Configuration options. + * @returns {Promise<DownloadResponse[]>} + * + * @example + * ``` + * const {Storage} = require('@google-cloud/storage'); + * const storage = new Storage(); + * const bucket = storage.bucket('my-bucket'); + * const transferManager = new TransferManager(bucket); + * + * //- + * // Download multiple files in parallel. + * //- + * const response = await transferManager.downloadManyFiles(['file1.txt', 'file2.txt']); + * // The following files have been downloaded: + * // - "file1.txt" (with the contents from my-bucket.file1.txt) + * // - "file2.txt" (with the contents from my-bucket.file2.txt) + * const response = await transferManager.downloadManyFiles([bucket.file('file1.txt'), bucket.file('file2.txt')]); + * // The following files have been downloaded: + * // - "file1.txt" (with the contents from my-bucket.file1.txt) + * // - "file2.txt" (with the contents from my-bucket.file2.txt) + * const response = await transferManager.downloadManyFiles('test-folder'); + * // All files with GCS prefix of 'test-folder' have been downloaded.
+ * ``` + * @experimental + */ + async downloadManyFiles( + filesOrFolder: File[] | string[] | string, + options: DownloadManyFilesOptions = {} + ): Promise<DownloadResponse[]> { + const limit = pLimit( + options.concurrencyLimit || DEFAULT_PARALLEL_DOWNLOAD_LIMIT + ); + const promises = []; + let files: File[] = []; + + if (!Array.isArray(filesOrFolder)) { + const directoryFiles = await this.bucket.getFiles({ + prefix: filesOrFolder, + }); + files = directoryFiles[0]; + } else { + files = filesOrFolder.map(curFile => { + if (typeof curFile === 'string') { + return this.bucket.file(curFile); + } + return curFile; + }); + } + + const stripRegexString = options.stripPrefix + ? `^${options.stripPrefix}` + : EMPTY_REGEX; + const regex = new RegExp(stripRegexString, 'g'); + + for (const file of files) { + const passThroughOptionsCopy = extend( + true, + {}, + options.passthroughOptions + ); + if (options.prefix) { + passThroughOptionsCopy.destination = path.join( + options.prefix || '', + passThroughOptionsCopy.destination || '', + file.name + ); + } + if (options.stripPrefix) { + passThroughOptionsCopy.destination = file.name.replace(regex, ''); + } + promises.push(limit(() => file.download(passThroughOptionsCopy))); + } + + return Promise.all(promises); + } + + /** + * @typedef {object} DownloadFileInChunksOptions + * @property {number} [concurrencyLimit] The number of concurrently executing promises + * to use when downloading the file. + * @property {number} [chunkSizeBytes] The size in bytes of each chunk to be downloaded. + * @property {string} [destination] The path to which the file should be downloaded. + * @experimental + */ + /** + * Download a large file in chunks utilizing parallel download operations. This is a convenience method + * that utilizes {@link File#download} to perform the download. + * + * @param {File | string} fileOrName The {@link File} or name of the file to download. + * @param {DownloadFileInChunksOptions} [options] Configuration options. + * @returns {Promise<DownloadResponse>} + * + * @example + * ``` + * const {Storage} = require('@google-cloud/storage'); + * const storage = new Storage(); + * const bucket = storage.bucket('my-bucket'); + * const transferManager = new TransferManager(bucket); + * + * //- + * // Download a large file in chunks utilizing parallel operations. + * //- + * const response = await transferManager.downloadFileInChunks(bucket.file('large-file.txt')); + * // Your local directory now contains: + * // - "large-file.txt" (with the contents from my-bucket.large-file.txt) + * ``` + * @experimental + */ + async downloadFileInChunks( + fileOrName: File | string, + options: DownloadFileInChunksOptions = {} + ): Promise<DownloadResponse> { + let chunkSize = + options.chunkSizeBytes || DOWNLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE; + let limit = pLimit( + options.concurrencyLimit || DEFAULT_PARALLEL_CHUNKED_DOWNLOAD_LIMIT + ); + const promises = []; + const file: File = + typeof fileOrName === 'string' + ? this.bucket.file(fileOrName) + : fileOrName; + + const fileInfo = await file.get(); + const size = parseInt(fileInfo[0].metadata.size); + // If the file size does not meet the threshold download it as a single chunk. + if (size < DOWNLOAD_IN_CHUNKS_FILE_SIZE_THRESHOLD) { + limit = pLimit(1); + chunkSize = size; + } + + let start = 0; + const filePath = options.destination || path.basename(file.name); + const fileToWrite = await fsp.open(filePath, 'w+'); + while (start < size) { + const chunkStart = start; + let chunkEnd = start + chunkSize - 1; + chunkEnd = chunkEnd > size ? 
size : chunkEnd; + promises.push( + limit(() => + file.download({start: chunkStart, end: chunkEnd}).then(resp => { + return fileToWrite.write(resp[0], 0, resp[0].length, chunkStart); + }) + ) + ); + + start += chunkSize; + } + + return Promise.all(promises) + .then(results => { + return results.map(result => result.buffer) as DownloadResponse; + }) + .finally(async () => { + await fileToWrite.close(); + }); + } +} diff --git a/test/transfer-manager.ts b/test/transfer-manager.ts new file mode 100644 index 000000000..159c965ce --- /dev/null +++ b/test/transfer-manager.ts @@ -0,0 +1,294 @@ +/*! + * Copyright 2022 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* eslint-disable @typescript-eslint/no-explicit-any */ +import {ServiceObject, ServiceObjectConfig, util} from '../src/nodejs-common'; +import * as pLimit from 'p-limit'; +import * as proxyquire from 'proxyquire'; +import { + Bucket, + CRC32C, + CreateWriteStreamOptions, + DownloadOptions, + FileOptions, + IdempotencyStrategy, + UploadOptions, +} from '../src'; +import * as assert from 'assert'; +import * as path from 'path'; +import * as stream from 'stream'; +import * as extend from 'extend'; +import * as fs from 'fs'; + +const fakeUtil = Object.assign({}, util); +fakeUtil.noop = util.noop; + +class FakeServiceObject extends ServiceObject { + calledWith_: IArguments; + constructor(config: ServiceObjectConfig) { + super(config); + // eslint-disable-next-line prefer-rest-params + this.calledWith_ = arguments; + } +} + +class FakeAcl { + calledWith_: Array<{}>; + constructor(...args: Array<{}>) { + this.calledWith_ = args; + } +} + +class FakeFile { + calledWith_: IArguments; + bucket: Bucket; + name: string; + options: FileOptions; + metadata: {}; + createWriteStream: Function; + isSameFile = () => false; + constructor(bucket: Bucket, name: string, options?: FileOptions) { + // eslint-disable-next-line prefer-rest-params + this.calledWith_ = arguments; + this.bucket = bucket; + this.name = name; + this.options = options || {}; + this.metadata = {}; + + this.createWriteStream = (options: CreateWriteStreamOptions) => { + this.metadata = options.metadata; + const ws = new stream.Writable(); + ws.write = () => { + ws.emit('complete'); + ws.end(); + return true; + }; + return ws; + }; + } +} + +class HTTPError extends Error { + code: number; + constructor(message: string, code: number) { + super(message); + this.code = code; + } +} + +let pLimitOverride: Function | null; +const fakePLimit = (limit: number) => (pLimitOverride || pLimit)(limit); +const fakeFs = extend(true, {}, fs, { + get promises() { + return { + open: () => { + return { + close: () => {}, + write: (buffer: Buffer) => { + return Promise.resolve({buffer}); + }, + }; + }, + lstat: () => { + return { + isDirectory: () => { + return false; + }, + }; + }, + }; + }, +}); + +describe('Transfer Manager', () => { + let TransferManager: any; + let transferManager: any; + let Bucket: any; + let bucket: any; + let File: any; + + const 
STORAGE: any = { + createBucket: util.noop, + retryOptions: { + autoRetry: true, + maxRetries: 3, + retryDelayMultiplier: 2, + totalTimeout: 600, + maxRetryDelay: 60, + retryableErrorFn: (err: HTTPError) => { + return err.code === 500; + }, + idempotencyStrategy: IdempotencyStrategy.RetryConditional, + }, + crc32cGenerator: () => new CRC32C(), + }; + const BUCKET_NAME = 'test-bucket'; + + before(() => { + Bucket = proxyquire('../src/bucket.js', { + 'p-limit': fakePLimit, + './nodejs-common': { + ServiceObject: FakeServiceObject, + util: fakeUtil, + }, + './acl.js': {Acl: FakeAcl}, + './file.js': {File: FakeFile}, + }).Bucket; + + File = proxyquire('../src/file.js', { + './nodejs-common': { + ServiceObject: FakeServiceObject, + util: fakeUtil, + }, + }).File; + + TransferManager = proxyquire('../src/transfer-manager.js', { + 'p-limit': fakePLimit, + './nodejs-common': { + ServiceObject: FakeServiceObject, + util: fakeUtil, + }, + './acl.js': {Acl: FakeAcl}, + './file.js': {File: FakeFile}, + fs: fakeFs, + fsp: fakeFs, + }).TransferManager; + }); + + beforeEach(() => { + bucket = new Bucket(STORAGE, BUCKET_NAME); + transferManager = new TransferManager(bucket); + }); + + describe('instantiation', () => { + it('should correctly set the bucket', () => { + assert.strictEqual(transferManager.bucket, bucket); + }); + }); + + describe('uploadManyFiles', () => { + it('calls upload with the provided file paths', async () => { + const paths = ['/a/b/c', '/d/e/f', '/h/i/j']; + let count = 0; + + bucket.upload = (path: string) => { + count++; + assert(paths.includes(path)); + }; + + await transferManager.uploadManyFiles(paths); + assert.strictEqual(count, paths.length); + }); + + it('sets ifGenerationMatch to 0 if skipIfExists is set', async () => { + const paths = ['/a/b/c']; + + bucket.upload = (_path: string, options: UploadOptions) => { + assert.strictEqual(options.preconditionOpts?.ifGenerationMatch, 0); + }; + + await transferManager.uploadManyFiles(paths, {skipIfExists: true}); + }); + + it('sets destination to prefix + filename when prefix is supplied', async () => { + const paths = ['/a/b/foo/bar.txt']; + const expectedDestination = path.normalize('hello/world/a/b/foo/bar.txt'); + + bucket.upload = (_path: string, options: UploadOptions) => { + assert.strictEqual(options.destination, expectedDestination); + }; + + await transferManager.uploadManyFiles(paths, {prefix: 'hello/world'}); + }); + + it('returns a promise with the uploaded file if there is no callback', async () => { + const paths = [path.join(__dirname, '../../test/testdata/testfile.json')]; + const result = await transferManager.uploadManyFiles(paths); + assert.strictEqual(result[0][0].name, paths[0]); + }); + }); + + describe('downloadManyFiles', () => { + it('calls download for each provided file', async () => { + let count = 0; + const download = () => { + count++; + }; + const firstFile = new File(bucket, 'first.txt'); + firstFile.download = download; + const secondFile = new File(bucket, 'second.txt'); + secondFile.download = download; + + const files = [firstFile, secondFile]; + await transferManager.downloadManyFiles(files); + assert.strictEqual(count, 2); + }); + + it('sets the destination correctly when provided a prefix', async () => { + const prefix = 'test-prefix'; + const filename = 'first.txt'; + const expectedDestination = path.normalize(`${prefix}/${filename}`); + const download = (options: DownloadOptions) => { + assert.strictEqual(options.destination, expectedDestination); + }; + + const file = new File(bucket, 
filename); + file.download = download; + await transferManager.downloadManyFiles([file], {prefix}); + }); + + it('sets the destination correctly when provided a strip prefix', async () => { + const stripPrefix = 'should-be-removed/'; + const filename = 'should-be-removed/first.txt'; + const expectedDestination = 'first.txt'; + const download = (options: DownloadOptions) => { + assert.strictEqual(options.destination, expectedDestination); + }; + + const file = new File(bucket, filename); + file.download = download; + await transferManager.downloadManyFiles([file], {stripPrefix}); + }); + }); + + describe('downloadFileInChunks', () => { + let file: any; + + beforeEach(() => { + file = new File(bucket, 'some-large-file'); + file.get = () => { + return [ + { + metadata: { + size: 1024, + }, + }, + ]; + }; + }); + + it('should download a single chunk if file size is below threshold', async () => { + let downloadCallCount = 0; + file.download = () => { + downloadCallCount++; + return Promise.resolve([Buffer.alloc(100)]); + }; + + await transferManager.downloadFileInChunks(file); + assert.strictEqual(downloadCallCount, 1); + }); + }); +});