From 512ccb69807952f77c934310f9da5be7317b3dba Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 13 Dec 2021 15:37:53 +0000 Subject: [PATCH 1/8] fix(hash-stream-node): replay file stream --- .../hash-stream-node/src/readableStreamHasher.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/hash-stream-node/src/readableStreamHasher.ts b/packages/hash-stream-node/src/readableStreamHasher.ts index 272b0ee438af..10ed6b8bd848 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.ts @@ -1,15 +1,23 @@ import { HashConstructor, StreamHasher } from "@aws-sdk/types"; +import { createReadStream, ReadStream } from "fs"; import { Readable } from "stream"; import { HashCalculator } from "./HashCalculator"; export const readableStreamHasher: StreamHasher = (hashCtor: HashConstructor, readableStream: Readable) => { + const streamToPipe = isFileStream(readableStream) + ? createReadStream(readableStream.path, { + start: (readableStream as any).start, + end: (readableStream as any).end, + }) + : readableStream; + const hash = new hashCtor(); const hashCalculator = new HashCalculator(hash); - readableStream.pipe(hashCalculator); + streamToPipe.pipe(hashCalculator); return new Promise((resolve, reject) => { - readableStream.on("error", (err: Error) => { + streamToPipe.on("error", (err: Error) => { // if the source errors, the destination stream needs to manually end hashCalculator.end(); reject(err); @@ -20,3 +28,5 @@ export const readableStreamHasher: StreamHasher = (hashCtor: HashConst }); }); }; + +const isFileStream = (stream: Readable): stream is ReadStream => typeof (stream as ReadStream).path === "string"; From a96bbe803dfd680ad9c53b82433f3e7f530b43ec Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Sun, 20 Feb 2022 16:11:18 +0000 Subject: [PATCH 2/8] chore: add isFileStream check --- .../hash-stream-node/src/isFileStream.spec.ts | 16 ++++++++++++++++ packages/hash-stream-node/src/isFileStream.ts | 4 ++++ 2 files changed, 20 insertions(+) create mode 100644 packages/hash-stream-node/src/isFileStream.spec.ts create mode 100644 packages/hash-stream-node/src/isFileStream.ts diff --git a/packages/hash-stream-node/src/isFileStream.spec.ts b/packages/hash-stream-node/src/isFileStream.spec.ts new file mode 100644 index 000000000000..fe4b726b77b0 --- /dev/null +++ b/packages/hash-stream-node/src/isFileStream.spec.ts @@ -0,0 +1,16 @@ +import { createReadStream } from "fs"; +import { Readable } from "stream"; + +import { isFileStream } from "./isFileStream"; + +describe(isFileStream.name, () => { + it("returns true if readablestream is fs.ReadStream", () => { + const readStream = createReadStream(__filename); + expect(isFileStream(readStream)).toStrictEqual(true); + }); + + it("returns false if readablestream is not an fs.ReadStream", () => { + const readableStream = new Readable({ read: (size) => {} }); + expect(isFileStream(readableStream)).toStrictEqual(false); + }); +}); diff --git a/packages/hash-stream-node/src/isFileStream.ts b/packages/hash-stream-node/src/isFileStream.ts new file mode 100644 index 000000000000..fbbf7b7c687d --- /dev/null +++ b/packages/hash-stream-node/src/isFileStream.ts @@ -0,0 +1,4 @@ +import { ReadStream } from "fs"; +import { Readable } from "stream"; + +export const isFileStream = (stream: Readable): stream is ReadStream => typeof (stream as ReadStream).path === "string"; From 507eef8b0e4a3e1426bc6bbace539eee9986a710 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Sun, 20 Feb 2022 16:18:09 +0000 Subject: [PATCH 3/8] fix: check for buffer path in ReadStream.path --- packages/hash-stream-node/src/isFileStream.spec.ts | 13 ++++++++++--- packages/hash-stream-node/src/isFileStream.ts | 3 ++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/hash-stream-node/src/isFileStream.spec.ts b/packages/hash-stream-node/src/isFileStream.spec.ts index fe4b726b77b0..4c895d90c500 100644 --- a/packages/hash-stream-node/src/isFileStream.spec.ts +++ b/packages/hash-stream-node/src/isFileStream.spec.ts @@ -4,9 +4,16 @@ import { Readable } from "stream"; import { isFileStream } from "./isFileStream"; describe(isFileStream.name, () => { - it("returns true if readablestream is fs.ReadStream", () => { - const readStream = createReadStream(__filename); - expect(isFileStream(readStream)).toStrictEqual(true); + describe("returns true if readablestream is fs.ReadStream", () => { + it("with string path", () => { + const readStream = createReadStream(__filename); + expect(isFileStream(readStream)).toStrictEqual(true); + }); + + it("with buffer path", () => { + const readStream = createReadStream(Buffer.from(__filename, "utf-8")); + expect(isFileStream(readStream)).toStrictEqual(true); + }); }); it("returns false if readablestream is not an fs.ReadStream", () => { diff --git a/packages/hash-stream-node/src/isFileStream.ts b/packages/hash-stream-node/src/isFileStream.ts index fbbf7b7c687d..4c0555f914a0 100644 --- a/packages/hash-stream-node/src/isFileStream.ts +++ b/packages/hash-stream-node/src/isFileStream.ts @@ -1,4 +1,5 @@ import { ReadStream } from "fs"; import { Readable } from "stream"; -export const isFileStream = (stream: Readable): stream is ReadStream => typeof (stream as ReadStream).path === "string"; +export const isFileStream = (stream: Readable): stream is ReadStream => + typeof (stream as ReadStream).path === "string" || Buffer.isBuffer((stream as ReadStream).path); From 44160a677d204d335128acd8987a23cdda6db259 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Sun, 20 Feb 2022 16:33:17 +0000 Subject: [PATCH 4/8] chore: create copy to replay in case of filestream --- .../src/readableStreamHasher.spec.ts | 25 ++++++++++++++++++- .../src/readableStreamHasher.ts | 5 ++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/packages/hash-stream-node/src/readableStreamHasher.spec.ts b/packages/hash-stream-node/src/readableStreamHasher.spec.ts index d002a6ccc511..86c576b9b4c6 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.spec.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.spec.ts @@ -1,10 +1,14 @@ import { Hash } from "@aws-sdk/types"; -import { Readable, Writable, WritableOptions } from "stream"; +import { createReadStream } from "fs"; +import { Readable, Writable } from "stream"; import { HashCalculator } from "./HashCalculator"; +import { isFileStream } from "./isFileStream"; import { readableStreamHasher } from "./readableStreamHasher"; jest.mock("./HashCalculator"); +jest.mock("./isFileStream"); +jest.mock("fs"); describe(readableStreamHasher.name, () => { const mockDigest = jest.fn(); @@ -38,6 +42,7 @@ describe(readableStreamHasher.name, () => { (HashCalculator as unknown as jest.Mock).mockImplementation( (hash) => new MockHashCalculator(hash, mockHashCalculatorWrite, mockHashCalculatorEnd) ); + (isFileStream as unknown as jest.Mock).mockReturnValue(false); mockDigest.mockResolvedValue(mockHash); }); @@ -45,6 +50,24 @@ describe(readableStreamHasher.name, () => { jest.clearAllMocks(); }); + it("creates a copy in case of fileStream", () => { + (createReadStream as jest.Mock).mockReturnValue( + new Readable({ + read: (size) => {}, + }) + ); + (isFileStream as unknown as jest.Mock).mockReturnValue(true); + + const fsReadStream = createReadStream(__filename); + readableStreamHasher(mockHashCtor, fsReadStream); + + expect(isFileStream).toHaveBeenCalledWith(fsReadStream); + expect(createReadStream).toHaveBeenCalledWith(fsReadStream.path, { + start: (fsReadStream as any).start, + end: (fsReadStream as any).end, + }); + }); + it("computes hash for a readable stream", async () => { const readableStream = new Readable({ read: (size) => {}, diff --git a/packages/hash-stream-node/src/readableStreamHasher.ts b/packages/hash-stream-node/src/readableStreamHasher.ts index 10ed6b8bd848..6962bbf78c9f 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.ts @@ -1,8 +1,9 @@ import { HashConstructor, StreamHasher } from "@aws-sdk/types"; -import { createReadStream, ReadStream } from "fs"; +import { createReadStream } from "fs"; import { Readable } from "stream"; import { HashCalculator } from "./HashCalculator"; +import { isFileStream } from "./isFileStream"; export const readableStreamHasher: StreamHasher = (hashCtor: HashConstructor, readableStream: Readable) => { const streamToPipe = isFileStream(readableStream) @@ -28,5 +29,3 @@ export const readableStreamHasher: StreamHasher = (hashCtor: HashConst }); }); }; - -const isFileStream = (stream: Readable): stream is ReadStream => typeof (stream as ReadStream).path === "string"; From 568eb4b338d43a2bb06a32cc81ea7fe1b1fc4573 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 21 Feb 2022 15:19:05 +0000 Subject: [PATCH 5/8] fix: detect fd.createReadStream --- packages/hash-stream-node/src/isFileStream.spec.ts | 11 +++++++++++ packages/hash-stream-node/src/isFileStream.ts | 5 ++++- packages/hash-stream-node/src/readableStreamHasher.ts | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/hash-stream-node/src/isFileStream.spec.ts b/packages/hash-stream-node/src/isFileStream.spec.ts index 4c895d90c500..d8985dd8f21e 100644 --- a/packages/hash-stream-node/src/isFileStream.spec.ts +++ b/packages/hash-stream-node/src/isFileStream.spec.ts @@ -14,6 +14,17 @@ describe(isFileStream.name, () => { const readStream = createReadStream(Buffer.from(__filename, "utf-8")); expect(isFileStream(readStream)).toStrictEqual(true); }); + + it("with filehandle", async () => { + const { promises } = await import("fs"); + const fd = await promises.open(__filename, "r"); + // @ts-expect-error createReadStream is added in v16.11.0 + if (fd.createReadStream) { + // @ts-expect-error createReadStream is added in v16.11.0 + const readStream = fd.createReadStream(); + expect(isFileStream(readStream)).toStrictEqual(true); + } + }); }); it("returns false if readablestream is not an fs.ReadStream", () => { diff --git a/packages/hash-stream-node/src/isFileStream.ts b/packages/hash-stream-node/src/isFileStream.ts index 4c0555f914a0..e0016879c551 100644 --- a/packages/hash-stream-node/src/isFileStream.ts +++ b/packages/hash-stream-node/src/isFileStream.ts @@ -2,4 +2,7 @@ import { ReadStream } from "fs"; import { Readable } from "stream"; export const isFileStream = (stream: Readable): stream is ReadStream => - typeof (stream as ReadStream).path === "string" || Buffer.isBuffer((stream as ReadStream).path); + typeof (stream as ReadStream).path === "string" || + Buffer.isBuffer((stream as ReadStream).path) || + // @ts-expect-error fd is not defined in @types/node + typeof (stream as ReadStream).fd === "number"; diff --git a/packages/hash-stream-node/src/readableStreamHasher.ts b/packages/hash-stream-node/src/readableStreamHasher.ts index 6962bbf78c9f..420bb7787bad 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.ts @@ -6,6 +6,7 @@ import { HashCalculator } from "./HashCalculator"; import { isFileStream } from "./isFileStream"; export const readableStreamHasher: StreamHasher = (hashCtor: HashConstructor, readableStream: Readable) => { + // ToDo: create accurate copy if filestream is created from file descriptor. const streamToPipe = isFileStream(readableStream) ? createReadStream(readableStream.path, { start: (readableStream as any).start, From e1661179d092568bde7bf85395acf7d70bfc07f0 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 21 Feb 2022 15:39:19 +0000 Subject: [PATCH 6/8] chore: add comment about throwing if uncopieable readableStream is already flowing --- packages/hash-stream-node/src/readableStreamHasher.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/hash-stream-node/src/readableStreamHasher.ts b/packages/hash-stream-node/src/readableStreamHasher.ts index 420bb7787bad..29b41a07d401 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.ts @@ -6,6 +6,7 @@ import { HashCalculator } from "./HashCalculator"; import { isFileStream } from "./isFileStream"; export const readableStreamHasher: StreamHasher = (hashCtor: HashConstructor, readableStream: Readable) => { + // ToDo: throw if readableStream is already flowing and it's copy can't be created. // ToDo: create accurate copy if filestream is created from file descriptor. const streamToPipe = isFileStream(readableStream) ? createReadStream(readableStream.path, { From 84cae7be2d4c7234a706be9c30b2fa069ce1f402 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 21 Feb 2022 17:05:52 +0000 Subject: [PATCH 7/8] feat: create read stream copy in case of file descriptor --- .../src/fsCreateReadStream.spec.ts | 46 +++++++++++++++++++ .../src/fsCreateReadStream.ts | 8 ++++ packages/hash-stream-node/src/isFileStream.ts | 3 +- .../src/readableStreamHasher.spec.ts | 9 ++-- .../src/readableStreamHasher.ts | 10 +--- 5 files changed, 61 insertions(+), 15 deletions(-) create mode 100644 packages/hash-stream-node/src/fsCreateReadStream.spec.ts create mode 100644 packages/hash-stream-node/src/fsCreateReadStream.ts diff --git a/packages/hash-stream-node/src/fsCreateReadStream.spec.ts b/packages/hash-stream-node/src/fsCreateReadStream.spec.ts new file mode 100644 index 000000000000..5a7e21a6b4b9 --- /dev/null +++ b/packages/hash-stream-node/src/fsCreateReadStream.spec.ts @@ -0,0 +1,46 @@ +import * as fs from "fs"; + +import { fsCreateReadStream } from "./fsCreateReadStream"; + +jest.setTimeout(30000); + +describe(fsCreateReadStream.name, () => { + const mockFileContents = fs.readFileSync(__filename, "utf8"); + + it("uses file descriptor if defined", (done) => { + fs.promises.open(__filename, "r").then((fd) => { + if ((fd as any).createReadStream) { + const readStream = (fd as any).createReadStream(); + const readStreamCopy = fsCreateReadStream(readStream); + + const chunks: Array = []; + readStreamCopy.on("data", (chunk) => { + chunks.push(chunk); + }); + readStreamCopy.on("end", () => { + const outputFileContents = Buffer.concat(chunks).toString(); + expect(outputFileContents).toEqual(mockFileContents); + done(); + }); + } else { + console.log(`Skipping createReadStream test as it's not available.`); + done(); + } + }); + }); + + it("uses start and end if file descriptor is not defined", (done) => { + const readStream = fs.createReadStream(__filename); + const readStreamCopy = fsCreateReadStream(readStream); + + const chunks: Array = []; + readStreamCopy.on("data", (chunk) => { + chunks.push(chunk); + }); + readStreamCopy.on("end", () => { + const outputFileContents = Buffer.concat(chunks).toString(); + expect(outputFileContents).toEqual(mockFileContents); + done(); + }); + }); +}); diff --git a/packages/hash-stream-node/src/fsCreateReadStream.ts b/packages/hash-stream-node/src/fsCreateReadStream.ts new file mode 100644 index 000000000000..837f51b5cf1d --- /dev/null +++ b/packages/hash-stream-node/src/fsCreateReadStream.ts @@ -0,0 +1,8 @@ +import { createReadStream, ReadStream } from "fs"; + +export const fsCreateReadStream = (readStream: ReadStream) => + createReadStream(readStream.path, { + fd: (readStream as any).fd, + start: (readStream as any).start, + end: (readStream as any).end, + }); diff --git a/packages/hash-stream-node/src/isFileStream.ts b/packages/hash-stream-node/src/isFileStream.ts index e0016879c551..3117afd0d914 100644 --- a/packages/hash-stream-node/src/isFileStream.ts +++ b/packages/hash-stream-node/src/isFileStream.ts @@ -4,5 +4,4 @@ import { Readable } from "stream"; export const isFileStream = (stream: Readable): stream is ReadStream => typeof (stream as ReadStream).path === "string" || Buffer.isBuffer((stream as ReadStream).path) || - // @ts-expect-error fd is not defined in @types/node - typeof (stream as ReadStream).fd === "number"; + typeof (stream as any).fd === "number"; diff --git a/packages/hash-stream-node/src/readableStreamHasher.spec.ts b/packages/hash-stream-node/src/readableStreamHasher.spec.ts index 86c576b9b4c6..130e18051cf3 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.spec.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.spec.ts @@ -2,10 +2,12 @@ import { Hash } from "@aws-sdk/types"; import { createReadStream } from "fs"; import { Readable, Writable } from "stream"; +import { fsCreateReadStream } from "./fsCreateReadStream"; import { HashCalculator } from "./HashCalculator"; import { isFileStream } from "./isFileStream"; import { readableStreamHasher } from "./readableStreamHasher"; +jest.mock("./fsCreateReadStream"); jest.mock("./HashCalculator"); jest.mock("./isFileStream"); jest.mock("fs"); @@ -51,7 +53,7 @@ describe(readableStreamHasher.name, () => { }); it("creates a copy in case of fileStream", () => { - (createReadStream as jest.Mock).mockReturnValue( + (fsCreateReadStream as jest.Mock).mockReturnValue( new Readable({ read: (size) => {}, }) @@ -62,10 +64,7 @@ describe(readableStreamHasher.name, () => { readableStreamHasher(mockHashCtor, fsReadStream); expect(isFileStream).toHaveBeenCalledWith(fsReadStream); - expect(createReadStream).toHaveBeenCalledWith(fsReadStream.path, { - start: (fsReadStream as any).start, - end: (fsReadStream as any).end, - }); + expect(fsCreateReadStream).toHaveBeenCalledWith(fsReadStream); }); it("computes hash for a readable stream", async () => { diff --git a/packages/hash-stream-node/src/readableStreamHasher.ts b/packages/hash-stream-node/src/readableStreamHasher.ts index 29b41a07d401..6ee1296091dd 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.ts @@ -1,19 +1,13 @@ import { HashConstructor, StreamHasher } from "@aws-sdk/types"; -import { createReadStream } from "fs"; import { Readable } from "stream"; +import { fsCreateReadStream } from "./fsCreateReadStream"; import { HashCalculator } from "./HashCalculator"; import { isFileStream } from "./isFileStream"; export const readableStreamHasher: StreamHasher = (hashCtor: HashConstructor, readableStream: Readable) => { // ToDo: throw if readableStream is already flowing and it's copy can't be created. - // ToDo: create accurate copy if filestream is created from file descriptor. - const streamToPipe = isFileStream(readableStream) - ? createReadStream(readableStream.path, { - start: (readableStream as any).start, - end: (readableStream as any).end, - }) - : readableStream; + const streamToPipe = isFileStream(readableStream) ? fsCreateReadStream(readableStream) : readableStream; const hash = new hashCtor(); const hashCalculator = new HashCalculator(hash); From 52a640d8194c3d22988591a8751d24fec0c15e4a Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 21 Feb 2022 17:06:46 +0000 Subject: [PATCH 8/8] chore: add a ToDo to deprecate fileStreamHasher --- packages/hash-stream-node/src/fileStreamHasher.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/hash-stream-node/src/fileStreamHasher.ts b/packages/hash-stream-node/src/fileStreamHasher.ts index b15a92af4a03..5accbf903cb2 100644 --- a/packages/hash-stream-node/src/fileStreamHasher.ts +++ b/packages/hash-stream-node/src/fileStreamHasher.ts @@ -4,6 +4,7 @@ import { Readable } from "stream"; import { HashCalculator } from "./HashCalculator"; +// ToDo: deprecate in favor of readableStreamHasher export const fileStreamHasher: StreamHasher = (hashCtor: HashConstructor, fileStream: Readable) => new Promise((resolve, reject) => { if (!isReadStream(fileStream)) {