fix: Determine Content-Length Before Attempting Multi-chunk Upload (#2074)

* test: Add system-tests for resumable multi-chunk uploads

* test: streamline multi-chunk upload system tests

* chore: verify `Content-Range`: `bytes ${this.offset}-*/*` behavior

* style: typo

* style: typo

* fix: determine content-length before attempting multi-chunk upload

* test: Use bigger file for multi-chunk test since chunks need to be large

* feat: dynamically determine Content-Length when unspecified

* feat: dynamically determine Content-Length when unspecified

* docs: clarifications, make it easier for the future team

* docs: Another clarification
danielbankhead committed Sep 23, 2022
1 parent 4fb3e00 commit 666402a
Showing 3 changed files with 134 additions and 16 deletions.
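
Before the diff, a quick orientation: the heart of this fix is the new `multiChunkMode` branch, which peeks one chunk from the upstream buffer, checks whether the upstream has ended (meaning this is the last request), and then unshifts the chunk back before the real upload iterator consumes it. The sketch below mirrors that header-building logic under simplified assumptions; `queue`, `upstreamEnded`, and `buildChunkHeaders` are illustrative stand-ins, not the library's actual fields or API, and the real implementation is in src/resumable-upload.ts.

// Sketch only: compute the Content-Length/Content-Range headers for one chunk
// of a resumable upload when the total object size may be unknown ('*').
interface ChunkHeaders {
  'Content-Length': number;
  'Content-Range': string;
}

function buildChunkHeaders(
  queue: Buffer[], // buffered data not yet sent (hypothetical stand-in)
  upstreamEnded: boolean, // no more data will arrive once the queue drains
  offset: number, // bytes the server has already persisted
  numBytesWritten: number, // bytes already pulled from upstream
  contentLength: number | '*' // declared total size, or '*' when unknown
): ChunkHeaders {
  // Peek at the next chunk without permanently consuming it.
  const chunk = queue[0] ?? Buffer.alloc(0);
  const bytesToUpload = chunk.byteLength;

  // This is the final request only if upstream ended and nothing else is queued.
  const isLastChunk = upstreamEnded && queue.length === 1;

  // If the total size was never declared, reveal it on the last chunk so the
  // server can finalize the object; otherwise it stays '*'.
  let total: number | '*' = contentLength;
  if (contentLength === '*' && isLastChunk) {
    total = numBytesWritten + bytesToUpload;
  }

  // `- 1` because the ending byte is inclusive in the request.
  const endingByte = numBytesWritten + bytesToUpload - 1;

  return {
    // Content-Length is the size of this chunk, not the overall object.
    'Content-Length': bytesToUpload,
    'Content-Range': `bytes ${offset}-${endingByte}/${total}`,
  };
}

For example, buildChunkHeaders([Buffer.alloc(512)], true, 0, 0, '*') yields Content-Length: 512 and Content-Range: bytes 0-511/512, while the same call with upstreamEnded set to false keeps the total as '*'.
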
48 changes: 37 additions & 11 deletions src/resumable-upload.ts
@@ -665,7 +665,7 @@ export class Upload extends Writable {

let expectedUploadSize: number | undefined = undefined;

// Set `expectedUploadSize` to `contentLength` if available
// Set `expectedUploadSize` to `contentLength - this.numBytesWritten`, if available
if (typeof this.contentLength === 'number') {
expectedUploadSize = this.contentLength - this.numBytesWritten;
}
@@ -718,13 +718,37 @@ export class Upload extends Writable {
};

// If using multiple chunk upload, set appropriate header
if (multiChunkMode && expectedUploadSize) {
// The '-1' is because the ending byte is inclusive in the request.
const endingByte = expectedUploadSize + this.numBytesWritten - 1;
headers['Content-Length'] = expectedUploadSize;
if (multiChunkMode) {
// We need to know how much data is available upstream to set the `Content-Range` header.
const oneChunkIterator = this.upstreamIterator(expectedUploadSize, true);
const {value} = await oneChunkIterator.next();

const bytesToUpload = value!.chunk.byteLength;

// Important: we want to know if the upstream has ended and the queue is empty before
// unshifting data back into the queue. This way we will know if this is the last request or not.
const isLastChunkOfUpload = !(await this.waitForNextChunk());

// Important: put the data back in the queue for the actual upload iterator
this.unshiftChunkBuffer(value!.chunk);

let totalObjectSize = this.contentLength;

if (typeof this.contentLength !== 'number' && isLastChunkOfUpload) {
// Let's let the server know this is the last chunk since
// we didn't know the content-length beforehand.
totalObjectSize = bytesToUpload + this.numBytesWritten;
}

// `- 1` as the ending byte is inclusive in the request.
const endingByte = bytesToUpload + this.numBytesWritten - 1;

// `Content-Length` for multiple chunk uploads is the size of the chunk,
// not the overall object
headers['Content-Length'] = bytesToUpload;
headers[
'Content-Range'
] = `bytes ${this.offset}-${endingByte}/${this.contentLength}`;
] = `bytes ${this.offset}-${endingByte}/${totalObjectSize}`;
} else {
headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`;
}
@@ -798,11 +822,13 @@
// continue uploading next chunk
this.continueUploading();
} else if (!this.isSuccessfulResponse(resp.status)) {
const err: ApiError = {
code: resp.status,
name: 'Upload failed',
message: 'Upload failed',
};
const err: ApiError = new Error('Upload failed');
err.code = resp.status;
err.name = 'Upload failed';
if (resp?.data) {
err.errors = [resp?.data];
}

this.destroy(err);
} else {
// remove the last chunk sent to free memory
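
One note on the error-handling hunk above: the failed-upload error is now a real Error instance (so it carries a stack trace), decorated with the HTTP status and any response body, instead of a plain object literal. A minimal sketch of that shape, using a structural `ApiErrorLike` stand-in rather than the library's actual `ApiError` type:

// Sketch: build an upload-failure error that is a genuine Error (stack trace
// included) while still exposing the API-style fields callers may check.
interface ApiErrorLike extends Error {
  code?: number;
  errors?: unknown[];
}

function uploadFailedError(status: number, responseData?: unknown): ApiErrorLike {
  const err: ApiErrorLike = new Error('Upload failed');
  err.name = 'Upload failed';
  err.code = status;
  if (responseData) {
    err.errors = [responseData];
  }
  return err;
}
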
82 changes: 82 additions & 0 deletions system-test/storage.ts
@@ -32,6 +32,7 @@ import {
Notification,
DeleteBucketCallback,
CRC32C,
UploadOptions,
} from '../src';
import * as nock from 'nock';
import {Transform} from 'stream';
@@ -2688,6 +2689,87 @@ describe('storage', () => {
});
});

describe('resumable upload', () => {
describe('multi-chunk upload', () => {
describe('upload configurations', () => {
const filePath: string = FILES.big.path;
const fileSize = fs.statSync(filePath).size;
let crc32c: string;

before(async () => {
// get a CRC32C value from the file
crc32c = await new Promise((resolve, reject) => {
const crc32c = new CRC32C();

fs.createReadStream(filePath)
.on('data', (d: Buffer) => crc32c.update(d))
.on('end', () => resolve(crc32c.toString()))
.on('error', reject);
});
});

async function uploadAndVerify(
file: File,
options: Omit<UploadOptions, 'destination'>
) {
await bucket.upload(filePath, {
destination: file,
...options,
});

const [metadata] = await file.getMetadata();

// assert we uploaded the expected data
assert.equal(metadata.crc32c, crc32c);
}

it('should support uploads where `contentLength < chunkSize`', async () => {
const file = bucket.file(generateName());

const metadata = {contentLength: fileSize};
// off by +1 to ensure `contentLength < chunkSize`
const chunkSize = fileSize + 1;

await uploadAndVerify(file, {chunkSize, metadata});
});

it('should support uploads where `contentLength % chunkSize != 0`', async () => {
const file = bucket.file(generateName());

const metadata = {contentLength: fileSize};
// off by -1 to ensure `contentLength % chunkSize != 0`
const chunkSize = fileSize - 1;

await uploadAndVerify(file, {chunkSize, metadata});
});

it('should support uploads where `fileSize % chunkSize != 0` && `!contentLength`', async () => {
const file = bucket.file(generateName());
// off by +1 to ensure `fileSize % chunkSize != 0`
const chunkSize = fileSize + 1;

await uploadAndVerify(file, {chunkSize});
});

it('should support uploads where `fileSize < chunkSize` && `!contentLength`', async () => {
const file = bucket.file(generateName());
// off by `* 2 +1` to ensure `fileSize < chunkSize`
const chunkSize = fileSize * 2 + 1;

await uploadAndVerify(file, {chunkSize});
});

it('should support uploads where `fileSize > chunkSize` && `!contentLength`', async () => {
const file = bucket.file(generateName());
// off by -1 to ensure `fileSize > chunkSize`
const chunkSize = fileSize - 1;

await uploadAndVerify(file, {chunkSize});
});
});
});
});

describe('bucket upload with progress', () => {
it('show bytes sent with resumable upload', async () => {
const fileSize = fs.statSync(FILES.big.path).size;
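
To make the `fileSize > chunkSize && !contentLength` case concrete, here is a hypothetical trace (the sizes are illustrative and ignore the real API's 256 KiB chunk-size granularity, which is why the system test uses FILES.big): uploading a 10-byte file with chunkSize = 9 should produce two requests.

  Request 1: Content-Length: 9, Content-Range: bytes 0-8/*   (upstream not yet ended, so the total stays '*')
  Request 2: Content-Length: 1, Content-Range: bytes 9-9/10  (last chunk, so the now-known total is sent)
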
20 changes: 15 additions & 5 deletions test/resumable-upload.ts
@@ -1045,6 +1045,9 @@ describe('resumable-upload', () => {
});

describe('request preparation', () => {
// Simulating the amount of data written from upstream (exhaustive)
const UPSTREAM_BUFFER_SIZE = 512;
const UPSTREAM_ENDED = true;
// a convenient handle for getting the request options
let reqOpts: GaxiosOptions;

@@ -1074,8 +1077,8 @@

reqOpts = requestOptions;
};
up.upstreamChunkBuffer = Buffer.alloc(512);
up.upstreamEnded = true;
up.upstreamChunkBuffer = Buffer.alloc(UPSTREAM_BUFFER_SIZE);
up.upstreamEnded = UPSTREAM_ENDED;
});

describe('single chunk', () => {
@@ -1153,18 +1156,25 @@

it('should prepare a valid request if `contentLength` is unknown', async () => {
const OFFSET = 100;
const EXPECTED_STREAM_AMOUNT = Math.min(
UPSTREAM_BUFFER_SIZE - OFFSET,
CHUNK_SIZE
);
const ENDING_BYTE = EXPECTED_STREAM_AMOUNT + OFFSET - 1;

up.offset = OFFSET;
up.contentLength = '*';

await up.startUploading();

const endByte = OFFSET + CHUNK_SIZE - 1;
assert(reqOpts.headers);
assert.equal(reqOpts.headers['Content-Length'], CHUNK_SIZE);
assert.equal(
reqOpts.headers['Content-Length'],
EXPECTED_STREAM_AMOUNT
);
assert.equal(
reqOpts.headers['Content-Range'],
`bytes ${OFFSET}-${endByte}/*`
`bytes ${OFFSET}-${ENDING_BYTE}/*`
);
assert.ok(
X_GOOG_API_HEADER_REGEX.test(reqOpts.headers['x-goog-api-client'])
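
Working the arithmetic in the updated expectations: with UPSTREAM_BUFFER_SIZE = 512, OFFSET = 100, and assuming CHUNK_SIZE is at least 412 (its value is defined outside this hunk), EXPECTED_STREAM_AMOUNT = min(512 - 100, CHUNK_SIZE) = 412 and ENDING_BYTE = 412 + 100 - 1 = 511, so the asserted headers are Content-Length: 412 and Content-Range: bytes 100-511/*, with the '*' total reflecting the unknown contentLength.
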
