Skip to content

Commit

Permalink
implement final, fix _write to invoke callback on nextTick
Browse files Browse the repository at this point in the history
  • Loading branch information
ddelgrosso1 committed Aug 9, 2022
1 parent 9eb1e88 commit 46494ce
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
25 changes: 19 additions & 6 deletions src/util.ts
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

import * as querystring from 'querystring';
import {PassThrough} from 'stream';
import {PassThrough, TransformCallback} from 'stream';

export function normalize<T = {}, U = Function>(
optionsOrCallback?: T | U,
Expand Down Expand Up @@ -172,10 +172,6 @@ export class PassThroughShim extends PassThrough {
private shouldEmitReading = true;
private shouldEmitWriting = true;

constructor() {
super();
}

_read(size: number): void {
if (this.shouldEmitReading) {
this.emit('reading');
Expand All @@ -193,6 +189,23 @@ export class PassThroughShim extends PassThrough {
this.emit('writing');
this.shouldEmitWriting = false;
}
super._write(chunk, encoding, callback);
// Per the nodejs documention, callback must be invoked on the next tick
process.nextTick(() => {
super._write(chunk, encoding, callback);
});
}

_final(callback: (error?: Error | null | undefined) => void): void {
// If the stream is empty (i.e. empty file) final will be invoked before _read / _write
// and we should still emit the proper events.
if (this.shouldEmitReading) {
this.emit('reading');
this.shouldEmitReading = false;
}
if (this.shouldEmitWriting) {
this.emit('writing');
this.shouldEmitWriting = false;
}
super._final(callback);
}
}
6 changes: 2 additions & 4 deletions test/file.ts
Expand Up @@ -1916,10 +1916,8 @@ describe('File', () => {
const uploadStream = new PassThrough();

file.startResumableUpload_ = (dup: duplexify.Duplexify) => {
process.nextTick(() => {
dup.setWritable(uploadStream);
uploadStream.emit('error', error);
});
dup.setWritable(uploadStream);
uploadStream.emit('error', error);
};

const writable = file.createWriteStream();
Expand Down

0 comments on commit 46494ce

Please sign in to comment.