diff --git a/package.json b/package.json index ffa3a8edc..9e419629a 100644 --- a/package.json +++ b/package.json @@ -67,7 +67,6 @@ "p-limit": "^3.0.1", "pumpify": "^2.0.0", "retry-request": "^5.0.0", - "stream-events": "^1.0.4", "teeny-request": "^8.0.0", "uuid": "^8.0.0" }, diff --git a/src/file.ts b/src/file.ts index 6c7fc5cf0..9132f972d 100644 --- a/src/file.ts +++ b/src/file.ts @@ -31,8 +31,7 @@ import * as mime from 'mime'; // eslint-disable-next-line @typescript-eslint/no-var-requires const pumpify = require('pumpify'); import * as resumableUpload from './resumable-upload'; -import {Duplex, Writable, Readable, PassThrough} from 'stream'; -import * as streamEvents from 'stream-events'; +import {Writable, Readable, PassThrough} from 'stream'; import * as zlib from 'zlib'; import * as http from 'http'; @@ -65,6 +64,7 @@ import { objectKeyToLowercase, unicodeJSONStringify, formatAsUTCISO, + PassThroughShim, } from './util'; import {CRC32CValidatorGenerator} from './crc32c'; import {HashStreamValidator} from './hash-stream-validator'; @@ -1363,7 +1363,7 @@ class File extends ServiceObject { let validateStream: HashStreamValidator | undefined = undefined; - const throughStream = streamEvents(new PassThrough()); + const throughStream = new PassThroughShim(); let isServedCompressed = true; let crc32c = true; @@ -1938,13 +1938,18 @@ class File extends ServiceObject { stream.emit('progress', evt); }); - const stream = streamEvents( - pumpify([ - gzip ? zlib.createGzip() : new PassThrough(), - validateStream, - fileWriteStream, - ]) - ) as Duplex; + const passThroughShim = new PassThroughShim(); + + passThroughShim.on('writing', () => { + stream.emit('writing'); + }); + + const stream = pumpify([ + passThroughShim, + gzip ? zlib.createGzip() : new PassThrough(), + validateStream, + fileWriteStream, + ]); // Wait until we've received data to determine what upload technique to use. stream.on('writing', () => { diff --git a/src/util.ts b/src/util.ts index 89ba6a2d5..a9710554c 100644 --- a/src/util.ts +++ b/src/util.ts @@ -13,6 +13,7 @@ // limitations under the License. import * as querystring from 'querystring'; +import {PassThrough} from 'stream'; export function normalize( optionsOrCallback?: T | U, @@ -166,3 +167,45 @@ export function formatAsUTCISO( return resultString; } + +export class PassThroughShim extends PassThrough { + private shouldEmitReading = true; + private shouldEmitWriting = true; + + _read(size: number): void { + if (this.shouldEmitReading) { + this.emit('reading'); + this.shouldEmitReading = false; + } + super._read(size); + } + + _write( + chunk: never, + encoding: BufferEncoding, + callback: (error?: Error | null | undefined) => void + ): void { + if (this.shouldEmitWriting) { + this.emit('writing'); + this.shouldEmitWriting = false; + } + // Per the nodejs documention, callback must be invoked on the next tick + process.nextTick(() => { + super._write(chunk, encoding, callback); + }); + } + + _final(callback: (error?: Error | null | undefined) => void): void { + // If the stream is empty (i.e. empty file) final will be invoked before _read / _write + // and we should still emit the proper events. + if (this.shouldEmitReading) { + this.emit('reading'); + this.shouldEmitReading = false; + } + if (this.shouldEmitWriting) { + this.emit('writing'); + this.shouldEmitWriting = false; + } + callback(null); + } +}