diff --git a/packages/hash-stream-node/src/fileStreamHasher.ts b/packages/hash-stream-node/src/fileStreamHasher.ts index b15a92af4a03..5accbf903cb2 100644 --- a/packages/hash-stream-node/src/fileStreamHasher.ts +++ b/packages/hash-stream-node/src/fileStreamHasher.ts @@ -4,6 +4,7 @@ import { Readable } from "stream"; import { HashCalculator } from "./HashCalculator"; +// ToDo: deprecate in favor of readableStreamHasher export const fileStreamHasher: StreamHasher = (hashCtor: HashConstructor, fileStream: Readable) => new Promise((resolve, reject) => { if (!isReadStream(fileStream)) { diff --git a/packages/hash-stream-node/src/fsCreateReadStream.spec.ts b/packages/hash-stream-node/src/fsCreateReadStream.spec.ts new file mode 100644 index 000000000000..5a7e21a6b4b9 --- /dev/null +++ b/packages/hash-stream-node/src/fsCreateReadStream.spec.ts @@ -0,0 +1,46 @@ +import * as fs from "fs"; + +import { fsCreateReadStream } from "./fsCreateReadStream"; + +jest.setTimeout(30000); + +describe(fsCreateReadStream.name, () => { + const mockFileContents = fs.readFileSync(__filename, "utf8"); + + it("uses file descriptor if defined", (done) => { + fs.promises.open(__filename, "r").then((fd) => { + if ((fd as any).createReadStream) { + const readStream = (fd as any).createReadStream(); + const readStreamCopy = fsCreateReadStream(readStream); + + const chunks: Array = []; + readStreamCopy.on("data", (chunk) => { + chunks.push(chunk); + }); + readStreamCopy.on("end", () => { + const outputFileContents = Buffer.concat(chunks).toString(); + expect(outputFileContents).toEqual(mockFileContents); + done(); + }); + } else { + console.log(`Skipping createReadStream test as it's not available.`); + done(); + } + }); + }); + + it("uses start and end if file descriptor is not defined", (done) => { + const readStream = fs.createReadStream(__filename); + const readStreamCopy = fsCreateReadStream(readStream); + + const chunks: Array = []; + readStreamCopy.on("data", (chunk) => { + 
chunks.push(chunk); + }); + readStreamCopy.on("end", () => { + const outputFileContents = Buffer.concat(chunks).toString(); + expect(outputFileContents).toEqual(mockFileContents); + done(); + }); + }); +}); diff --git a/packages/hash-stream-node/src/fsCreateReadStream.ts b/packages/hash-stream-node/src/fsCreateReadStream.ts new file mode 100644 index 000000000000..837f51b5cf1d --- /dev/null +++ b/packages/hash-stream-node/src/fsCreateReadStream.ts @@ -0,0 +1,8 @@ +import { createReadStream, ReadStream } from "fs"; + +export const fsCreateReadStream = (readStream: ReadStream) => + createReadStream(readStream.path, { + fd: (readStream as any).fd, + start: (readStream as any).start, + end: (readStream as any).end, + }); diff --git a/packages/hash-stream-node/src/isFileStream.spec.ts b/packages/hash-stream-node/src/isFileStream.spec.ts new file mode 100644 index 000000000000..d8985dd8f21e --- /dev/null +++ b/packages/hash-stream-node/src/isFileStream.spec.ts @@ -0,0 +1,34 @@ +import { createReadStream } from "fs"; +import { Readable } from "stream"; + +import { isFileStream } from "./isFileStream"; + +describe(isFileStream.name, () => { + describe("returns true if readablestream is fs.ReadStream", () => { + it("with string path", () => { + const readStream = createReadStream(__filename); + expect(isFileStream(readStream)).toStrictEqual(true); + }); + + it("with buffer path", () => { + const readStream = createReadStream(Buffer.from(__filename, "utf-8")); + expect(isFileStream(readStream)).toStrictEqual(true); + }); + + it("with filehandle", async () => { + const { promises } = await import("fs"); + const fd = await promises.open(__filename, "r"); + // @ts-expect-error createReadStream is added in v16.11.0 + if (fd.createReadStream) { + // @ts-expect-error createReadStream is added in v16.11.0 + const readStream = fd.createReadStream(); + expect(isFileStream(readStream)).toStrictEqual(true); + } + }); + }); + + it("returns false if readablestream is not an 
fs.ReadStream", () => { + const readableStream = new Readable({ read: (size) => {} }); + expect(isFileStream(readableStream)).toStrictEqual(false); + }); +}); diff --git a/packages/hash-stream-node/src/isFileStream.ts b/packages/hash-stream-node/src/isFileStream.ts new file mode 100644 index 000000000000..3117afd0d914 --- /dev/null +++ b/packages/hash-stream-node/src/isFileStream.ts @@ -0,0 +1,7 @@ +import { ReadStream } from "fs"; +import { Readable } from "stream"; + +export const isFileStream = (stream: Readable): stream is ReadStream => + typeof (stream as ReadStream).path === "string" || + Buffer.isBuffer((stream as ReadStream).path) || + typeof (stream as any).fd === "number"; diff --git a/packages/hash-stream-node/src/readableStreamHasher.spec.ts b/packages/hash-stream-node/src/readableStreamHasher.spec.ts index d002a6ccc511..130e18051cf3 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.spec.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.spec.ts @@ -1,10 +1,16 @@ import { Hash } from "@aws-sdk/types"; -import { Readable, Writable, WritableOptions } from "stream"; +import { createReadStream } from "fs"; +import { Readable, Writable } from "stream"; +import { fsCreateReadStream } from "./fsCreateReadStream"; import { HashCalculator } from "./HashCalculator"; +import { isFileStream } from "./isFileStream"; import { readableStreamHasher } from "./readableStreamHasher"; +jest.mock("./fsCreateReadStream"); jest.mock("./HashCalculator"); +jest.mock("./isFileStream"); +jest.mock("fs"); describe(readableStreamHasher.name, () => { const mockDigest = jest.fn(); @@ -38,6 +44,7 @@ describe(readableStreamHasher.name, () => { (HashCalculator as unknown as jest.Mock).mockImplementation( (hash) => new MockHashCalculator(hash, mockHashCalculatorWrite, mockHashCalculatorEnd) ); + (isFileStream as unknown as jest.Mock).mockReturnValue(false); mockDigest.mockResolvedValue(mockHash); }); @@ -45,6 +52,21 @@ describe(readableStreamHasher.name, () => { 
jest.clearAllMocks(); }); + it("creates a copy in case of fileStream", () => { + (fsCreateReadStream as jest.Mock).mockReturnValue( + new Readable({ + read: (size) => {}, + }) + ); + (isFileStream as unknown as jest.Mock).mockReturnValue(true); + + const fsReadStream = createReadStream(__filename); + readableStreamHasher(mockHashCtor, fsReadStream); + + expect(isFileStream).toHaveBeenCalledWith(fsReadStream); + expect(fsCreateReadStream).toHaveBeenCalledWith(fsReadStream); + }); + it("computes hash for a readable stream", async () => { const readableStream = new Readable({ read: (size) => {}, diff --git a/packages/hash-stream-node/src/readableStreamHasher.ts b/packages/hash-stream-node/src/readableStreamHasher.ts index 272b0ee438af..6ee1296091dd 100644 --- a/packages/hash-stream-node/src/readableStreamHasher.ts +++ b/packages/hash-stream-node/src/readableStreamHasher.ts @@ -1,15 +1,20 @@ import { HashConstructor, StreamHasher } from "@aws-sdk/types"; import { Readable } from "stream"; +import { fsCreateReadStream } from "./fsCreateReadStream"; import { HashCalculator } from "./HashCalculator"; +import { isFileStream } from "./isFileStream"; export const readableStreamHasher: StreamHasher = (hashCtor: HashConstructor, readableStream: Readable) => { + // ToDo: throw if readableStream is already flowing and its copy can't be created. + const streamToPipe = isFileStream(readableStream) ? fsCreateReadStream(readableStream) : readableStream; + const hash = new hashCtor(); const hashCalculator = new HashCalculator(hash); - readableStream.pipe(hashCalculator); + streamToPipe.pipe(hashCalculator); return new Promise((resolve, reject) => { - readableStream.on("error", (err: Error) => { + streamToPipe.on("error", (err: Error) => { // if the source errors, the destination stream needs to manually end hashCalculator.end(); reject(err);