Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(deps): remove stream-events dependency #2022

Merged
merged 4 commits into from Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Expand Up @@ -67,7 +67,6 @@
"p-limit": "^3.0.1",
"pumpify": "^2.0.0",
"retry-request": "^5.0.0",
"stream-events": "^1.0.4",
"teeny-request": "^8.0.0",
"uuid": "^8.0.0"
},
Expand Down
25 changes: 15 additions & 10 deletions src/file.ts
Expand Up @@ -31,8 +31,7 @@ import * as mime from 'mime';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const pumpify = require('pumpify');
import * as resumableUpload from './resumable-upload';
import {Duplex, Writable, Readable, PassThrough} from 'stream';
import * as streamEvents from 'stream-events';
import {Writable, Readable, PassThrough} from 'stream';
import * as zlib from 'zlib';
import * as http from 'http';

Expand Down Expand Up @@ -65,6 +64,7 @@ import {
objectKeyToLowercase,
unicodeJSONStringify,
formatAsUTCISO,
PassThroughShim,
} from './util';
import {CRC32CValidatorGenerator} from './crc32c';
import {HashStreamValidator} from './hash-stream-validator';
Expand Down Expand Up @@ -1363,7 +1363,7 @@ class File extends ServiceObject<File> {

let validateStream: HashStreamValidator | undefined = undefined;

const throughStream = streamEvents(new PassThrough());
const throughStream = new PassThroughShim();

let isServedCompressed = true;
let crc32c = true;
Expand Down Expand Up @@ -1938,13 +1938,18 @@ class File extends ServiceObject<File> {
stream.emit('progress', evt);
});

const stream = streamEvents(
pumpify([
gzip ? zlib.createGzip() : new PassThrough(),
validateStream,
fileWriteStream,
])
) as Duplex;
const passThroughShim = new PassThroughShim();

passThroughShim.on('writing', () => {
stream.emit('writing');
});

const stream = pumpify([
passThroughShim,
gzip ? zlib.createGzip() : new PassThrough(),
validateStream,
fileWriteStream,
]);

// Wait until we've received data to determine what upload technique to use.
stream.on('writing', () => {
Expand Down
43 changes: 43 additions & 0 deletions src/util.ts
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

import * as querystring from 'querystring';
import {PassThrough} from 'stream';

export function normalize<T = {}, U = Function>(
optionsOrCallback?: T | U,
Expand Down Expand Up @@ -166,3 +167,45 @@ export function formatAsUTCISO(

return resultString;
}

export class PassThroughShim extends PassThrough {
private shouldEmitReading = true;
private shouldEmitWriting = true;
Comment on lines +172 to +173
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this further, 'reading' and 'writing' are called on every _read and _write event - to be fully backwards-compatible this class should do the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch, I misread the stream-events / stubs code. Will fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I do believe it is only called once: https://github.com/stephenplusplus/stubs/blob/580b4d8b3a34c1193d5ec10e0db0d6ea4b4c0ec5/index.js#L30 it looks like after the number of calls is exhausted we revert to the cached function and stream-events would stop emitting reading / writing.


_read(size: number): void {
if (this.shouldEmitReading) {
this.emit('reading');
this.shouldEmitReading = false;
}
super._read(size);
}

_write(
danielbankhead marked this conversation as resolved.
Show resolved Hide resolved
chunk: never,
encoding: BufferEncoding,
callback: (error?: Error | null | undefined) => void
): void {
if (this.shouldEmitWriting) {
this.emit('writing');
this.shouldEmitWriting = false;
}
// Per the nodejs documention, callback must be invoked on the next tick
process.nextTick(() => {
super._write(chunk, encoding, callback);
});
}

_final(callback: (error?: Error | null | undefined) => void): void {
// If the stream is empty (i.e. empty file) final will be invoked before _read / _write
// and we should still emit the proper events.
if (this.shouldEmitReading) {
this.emit('reading');
this.shouldEmitReading = false;
}
if (this.shouldEmitWriting) {
this.emit('writing');
this.shouldEmitWriting = false;
}
callback(null);
}
}