From 9eb1e88d43716403fc5a06f8896eb8b4beaba92c Mon Sep 17 00:00:00 2001 From: Denis DelGrosso Date: Mon, 8 Aug 2022 20:31:17 +0000 Subject: [PATCH 1/4] refactor(deps): remove stream-events dependency --- package.json | 1 - src/file.ts | 25 +++++++++++++++---------- src/util.ts | 30 ++++++++++++++++++++++++++++++ test/file.ts | 6 ++++-- 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index ffa3a8edc..9e419629a 100644 --- a/package.json +++ b/package.json @@ -67,7 +67,6 @@ "p-limit": "^3.0.1", "pumpify": "^2.0.0", "retry-request": "^5.0.0", - "stream-events": "^1.0.4", "teeny-request": "^8.0.0", "uuid": "^8.0.0" }, diff --git a/src/file.ts b/src/file.ts index 6c7fc5cf0..9132f972d 100644 --- a/src/file.ts +++ b/src/file.ts @@ -31,8 +31,7 @@ import * as mime from 'mime'; // eslint-disable-next-line @typescript-eslint/no-var-requires const pumpify = require('pumpify'); import * as resumableUpload from './resumable-upload'; -import {Duplex, Writable, Readable, PassThrough} from 'stream'; -import * as streamEvents from 'stream-events'; +import {Writable, Readable, PassThrough} from 'stream'; import * as zlib from 'zlib'; import * as http from 'http'; @@ -65,6 +64,7 @@ import { objectKeyToLowercase, unicodeJSONStringify, formatAsUTCISO, + PassThroughShim, } from './util'; import {CRC32CValidatorGenerator} from './crc32c'; import {HashStreamValidator} from './hash-stream-validator'; @@ -1363,7 +1363,7 @@ class File extends ServiceObject { let validateStream: HashStreamValidator | undefined = undefined; - const throughStream = streamEvents(new PassThrough()); + const throughStream = new PassThroughShim(); let isServedCompressed = true; let crc32c = true; @@ -1938,13 +1938,18 @@ class File extends ServiceObject { stream.emit('progress', evt); }); - const stream = streamEvents( - pumpify([ - gzip ? zlib.createGzip() : new PassThrough(), - validateStream, - fileWriteStream, - ]) - ) as Duplex; + const passThroughShim = new PassThroughShim(); + + passThroughShim.on('writing', () => { + stream.emit('writing'); + }); + + const stream = pumpify([ + passThroughShim, + gzip ? zlib.createGzip() : new PassThrough(), + validateStream, + fileWriteStream, + ]); // Wait until we've received data to determine what upload technique to use. stream.on('writing', () => { diff --git a/src/util.ts b/src/util.ts index 89ba6a2d5..9322ad56d 100644 --- a/src/util.ts +++ b/src/util.ts @@ -13,6 +13,7 @@ // limitations under the License. import * as querystring from 'querystring'; +import {PassThrough} from 'stream'; export function normalize( optionsOrCallback?: T | U, @@ -166,3 +167,32 @@ export function formatAsUTCISO( return resultString; } + +export class PassThroughShim extends PassThrough { + private shouldEmitReading = true; + private shouldEmitWriting = true; + + constructor() { + super(); + } + + _read(size: number): void { + if (this.shouldEmitReading) { + this.emit('reading'); + this.shouldEmitReading = false; + } + super._read(size); + } + + _write( + chunk: never, + encoding: BufferEncoding, + callback: (error?: Error | null | undefined) => void + ): void { + if (this.shouldEmitWriting) { + this.emit('writing'); + this.shouldEmitWriting = false; + } + super._write(chunk, encoding, callback); + } +} diff --git a/test/file.ts b/test/file.ts index 9c9a2ffe8..fb980dcf3 100644 --- a/test/file.ts +++ b/test/file.ts @@ -1916,8 +1916,10 @@ describe('File', () => { const uploadStream = new PassThrough(); file.startResumableUpload_ = (dup: duplexify.Duplexify) => { - dup.setWritable(uploadStream); - uploadStream.emit('error', error); + process.nextTick(() => { + dup.setWritable(uploadStream); + uploadStream.emit('error', error); + }); }; const writable = file.createWriteStream(); From 46494ce11250ad0568f36106c539767528045a59 Mon Sep 17 00:00:00 2001 From: Denis DelGrosso Date: Tue, 9 Aug 2022 16:35:21 +0000 Subject: [PATCH 2/4] implement final, fix _write to invoke callback on nextTick --- src/util.ts | 25 +++++++++++++++++++------ test/file.ts | 6 ++---- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/util.ts b/src/util.ts index 9322ad56d..90ebafcca 100644 --- a/src/util.ts +++ b/src/util.ts @@ -13,7 +13,7 @@ // limitations under the License. import * as querystring from 'querystring'; -import {PassThrough} from 'stream'; +import {PassThrough, TransformCallback} from 'stream'; export function normalize( optionsOrCallback?: T | U, @@ -172,10 +172,6 @@ export class PassThroughShim extends PassThrough { private shouldEmitReading = true; private shouldEmitWriting = true; - constructor() { - super(); - } - _read(size: number): void { if (this.shouldEmitReading) { this.emit('reading'); @@ -193,6 +189,23 @@ export class PassThroughShim extends PassThrough { this.emit('writing'); this.shouldEmitWriting = false; } - super._write(chunk, encoding, callback); + // Per the nodejs documention, callback must be invoked on the next tick + process.nextTick(() => { + super._write(chunk, encoding, callback); + }); + } + + _final(callback: (error?: Error | null | undefined) => void): void { + // If the stream is empty (i.e. empty file) final will be invoked before _read / _write + // and we should still emit the proper events. + if (this.shouldEmitReading) { + this.emit('reading'); + this.shouldEmitReading = false; + } + if (this.shouldEmitWriting) { + this.emit('writing'); + this.shouldEmitWriting = false; + } + super._final(callback); } } diff --git a/test/file.ts b/test/file.ts index fb980dcf3..9c9a2ffe8 100644 --- a/test/file.ts +++ b/test/file.ts @@ -1916,10 +1916,8 @@ describe('File', () => { const uploadStream = new PassThrough(); file.startResumableUpload_ = (dup: duplexify.Duplexify) => { - process.nextTick(() => { - dup.setWritable(uploadStream); - uploadStream.emit('error', error); - }); + dup.setWritable(uploadStream); + uploadStream.emit('error', error); }; const writable = file.createWriteStream(); From ae3e9ea5a18c7fea04245903481ee4ce057d3e63 Mon Sep 17 00:00:00 2001 From: Denis DelGrosso Date: Tue, 9 Aug 2022 16:52:23 +0000 Subject: [PATCH 3/4] remove unused variable --- src/util.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util.ts b/src/util.ts index 90ebafcca..74c247e91 100644 --- a/src/util.ts +++ b/src/util.ts @@ -13,7 +13,7 @@ // limitations under the License. import * as querystring from 'querystring'; -import {PassThrough, TransformCallback} from 'stream'; +import {PassThrough} from 'stream'; export function normalize( optionsOrCallback?: T | U, From 82e338eff9caaa0d58b4d42aa5660e8b22cf4235 Mon Sep 17 00:00:00 2001 From: Denis DelGrosso Date: Tue, 9 Aug 2022 17:25:00 +0000 Subject: [PATCH 4/4] invoke callback directly in _final --- src/util.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util.ts b/src/util.ts index 74c247e91..a9710554c 100644 --- a/src/util.ts +++ b/src/util.ts @@ -206,6 +206,6 @@ export class PassThroughShim extends PassThrough { this.emit('writing'); this.shouldEmitWriting = false; } - super._final(callback); + callback(null); } }