Skip to content

Commit

Permalink
fix(hash-stream-node): support file stream in readableStreamHasher (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
trivikr committed Feb 21, 2022
1 parent 15e0512 commit 1e3faa1
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 3 deletions.
1 change: 1 addition & 0 deletions packages/hash-stream-node/src/fileStreamHasher.ts
Expand Up @@ -4,6 +4,7 @@ import { Readable } from "stream";

import { HashCalculator } from "./HashCalculator";

// ToDo: deprecate in favor of readableStreamHasher
export const fileStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, fileStream: Readable) =>
new Promise((resolve, reject) => {
if (!isReadStream(fileStream)) {
Expand Down
46 changes: 46 additions & 0 deletions packages/hash-stream-node/src/fsCreateReadStream.spec.ts
@@ -0,0 +1,46 @@
import * as fs from "fs";

import { fsCreateReadStream } from "./fsCreateReadStream";

jest.setTimeout(30000);

describe(fsCreateReadStream.name, () => {
const mockFileContents = fs.readFileSync(__filename, "utf8");

it("uses file descriptor if defined", (done) => {
fs.promises.open(__filename, "r").then((fd) => {
if ((fd as any).createReadStream) {
const readStream = (fd as any).createReadStream();
const readStreamCopy = fsCreateReadStream(readStream);

const chunks: Array<Buffer> = [];
readStreamCopy.on("data", (chunk) => {
chunks.push(chunk);
});
readStreamCopy.on("end", () => {
const outputFileContents = Buffer.concat(chunks).toString();
expect(outputFileContents).toEqual(mockFileContents);
done();
});
} else {
console.log(`Skipping createReadStream test as it's not available.`);
done();
}
});
});

it("uses start and end if file descriptor is not defined", (done) => {
const readStream = fs.createReadStream(__filename);
const readStreamCopy = fsCreateReadStream(readStream);

const chunks: Array<Buffer> = [];
readStreamCopy.on("data", (chunk) => {
chunks.push(chunk);
});
readStreamCopy.on("end", () => {
const outputFileContents = Buffer.concat(chunks).toString();
expect(outputFileContents).toEqual(mockFileContents);
done();
});
});
});
8 changes: 8 additions & 0 deletions packages/hash-stream-node/src/fsCreateReadStream.ts
@@ -0,0 +1,8 @@
import { createReadStream, ReadStream } from "fs";

export const fsCreateReadStream = (readStream: ReadStream) =>
createReadStream(readStream.path, {
fd: (readStream as any).fd,
start: (readStream as any).start,
end: (readStream as any).end,
});
34 changes: 34 additions & 0 deletions packages/hash-stream-node/src/isFileStream.spec.ts
@@ -0,0 +1,34 @@
import { createReadStream } from "fs";
import { Readable } from "stream";

import { isFileStream } from "./isFileStream";

describe(isFileStream.name, () => {
describe("returns true if readablestream is fs.ReadStream", () => {
it("with string path", () => {
const readStream = createReadStream(__filename);
expect(isFileStream(readStream)).toStrictEqual(true);
});

it("with buffer path", () => {
const readStream = createReadStream(Buffer.from(__filename, "utf-8"));
expect(isFileStream(readStream)).toStrictEqual(true);
});

it("with filehandle", async () => {
const { promises } = await import("fs");
const fd = await promises.open(__filename, "r");
// @ts-expect-error createReadStream is added in v16.11.0
if (fd.createReadStream) {
// @ts-expect-error createReadStream is added in v16.11.0
const readStream = fd.createReadStream();
expect(isFileStream(readStream)).toStrictEqual(true);
}
});
});

it("returns false if readablestream is not an fs.ReadStream", () => {
const readableStream = new Readable({ read: (size) => {} });
expect(isFileStream(readableStream)).toStrictEqual(false);
});
});
7 changes: 7 additions & 0 deletions packages/hash-stream-node/src/isFileStream.ts
@@ -0,0 +1,7 @@
import { ReadStream } from "fs";
import { Readable } from "stream";

export const isFileStream = (stream: Readable): stream is ReadStream =>
typeof (stream as ReadStream).path === "string" ||
Buffer.isBuffer((stream as ReadStream).path) ||
typeof (stream as any).fd === "number";
24 changes: 23 additions & 1 deletion packages/hash-stream-node/src/readableStreamHasher.spec.ts
@@ -1,10 +1,16 @@
import { Hash } from "@aws-sdk/types";
import { Readable, Writable, WritableOptions } from "stream";
import { createReadStream } from "fs";
import { Readable, Writable } from "stream";

import { fsCreateReadStream } from "./fsCreateReadStream";
import { HashCalculator } from "./HashCalculator";
import { isFileStream } from "./isFileStream";
import { readableStreamHasher } from "./readableStreamHasher";

jest.mock("./fsCreateReadStream");
jest.mock("./HashCalculator");
jest.mock("./isFileStream");
jest.mock("fs");

describe(readableStreamHasher.name, () => {
const mockDigest = jest.fn();
Expand Down Expand Up @@ -38,13 +44,29 @@ describe(readableStreamHasher.name, () => {
(HashCalculator as unknown as jest.Mock).mockImplementation(
(hash) => new MockHashCalculator(hash, mockHashCalculatorWrite, mockHashCalculatorEnd)
);
(isFileStream as unknown as jest.Mock).mockReturnValue(false);
mockDigest.mockResolvedValue(mockHash);
});

afterEach(() => {
jest.clearAllMocks();
});

it("creates a copy in case of fileStream", () => {
(fsCreateReadStream as jest.Mock).mockReturnValue(
new Readable({
read: (size) => {},
})
);
(isFileStream as unknown as jest.Mock).mockReturnValue(true);

const fsReadStream = createReadStream(__filename);
readableStreamHasher(mockHashCtor, fsReadStream);

expect(isFileStream).toHaveBeenCalledWith(fsReadStream);
expect(fsCreateReadStream).toHaveBeenCalledWith(fsReadStream);
});

it("computes hash for a readable stream", async () => {
const readableStream = new Readable({
read: (size) => {},
Expand Down
9 changes: 7 additions & 2 deletions packages/hash-stream-node/src/readableStreamHasher.ts
@@ -1,15 +1,20 @@
import { HashConstructor, StreamHasher } from "@aws-sdk/types";
import { Readable } from "stream";

import { fsCreateReadStream } from "./fsCreateReadStream";
import { HashCalculator } from "./HashCalculator";
import { isFileStream } from "./isFileStream";

export const readableStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, readableStream: Readable) => {
// ToDo: throw if readableStream is already flowing and it's copy can't be created.
const streamToPipe = isFileStream(readableStream) ? fsCreateReadStream(readableStream) : readableStream;

const hash = new hashCtor();
const hashCalculator = new HashCalculator(hash);
readableStream.pipe(hashCalculator);
streamToPipe.pipe(hashCalculator);

return new Promise((resolve, reject) => {
readableStream.on("error", (err: Error) => {
streamToPipe.on("error", (err: Error) => {
// if the source errors, the destination stream needs to manually end
hashCalculator.end();
reject(err);
Expand Down

0 comments on commit 1e3faa1

Please sign in to comment.