Skip to content

Commit

Permalink
feat(hash-stream-node): add readableStreamHasher (#3088)
Browse files Browse the repository at this point in the history
  • Loading branch information
trivikr committed Dec 10, 2021
1 parent 5c63cef commit 943dab0
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 3 deletions.
4 changes: 1 addition & 3 deletions packages/hash-stream-node/README.md
Expand Up @@ -3,9 +3,7 @@
[![NPM version](https://img.shields.io/npm/v/@aws-sdk/hash-stream-node/latest.svg)](https://www.npmjs.com/package/@aws-sdk/hash-stream-node)
[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/hash-stream-node.svg)](https://www.npmjs.com/package/@aws-sdk/hash-stream-node)

A utility for calculating the hash of Node.JS readable streams. This package is
currently only compatible with file streams, as no other stream type can be
replayed.
A utility for calculating the hash of Node.JS readable streams.

> An internal package
Expand Down
1 change: 1 addition & 0 deletions packages/hash-stream-node/src/index.ts
@@ -1 +1,2 @@
export * from "./fileStreamHasher";
export * from "./readableStreamHasher";
140 changes: 140 additions & 0 deletions packages/hash-stream-node/src/readableStreamHasher.spec.ts
@@ -0,0 +1,140 @@
import { Hash } from "@aws-sdk/types";
import { Readable, Writable, WritableOptions } from "stream";

import { HashCalculator } from "./HashCalculator";
import { readableStreamHasher } from "./readableStreamHasher";

jest.mock("./HashCalculator");

describe(readableStreamHasher.name, () => {
const mockDigest = jest.fn();
const mockHashCtor = jest.fn().mockImplementation(() => ({
update: jest.fn(),
digest: mockDigest,
}));

const mockHashCalculatorWrite = jest.fn();
const mockHashCalculatorEnd = jest.fn();

const mockHash = new Uint8Array(Buffer.from("mockHash"));

class MockHashCalculator extends Writable {
constructor(public readonly hash: Hash, public readonly mockWrite, public readonly mockEnd) {
super();
}

_write(chunk: Buffer, encoding: string, callback: (err?: Error) => void) {
this.mockWrite(chunk);
callback();
}

end() {
this.mockEnd();
super.end();
}
}

beforeEach(() => {
(HashCalculator as unknown as jest.Mock).mockImplementation(
(hash) => new MockHashCalculator(hash, mockHashCalculatorWrite, mockHashCalculatorEnd)
);
mockDigest.mockResolvedValue(mockHash);
});

afterEach(() => {
jest.clearAllMocks();
});

it("computes hash for a readable stream", async () => {
const readableStream = new Readable({
read: (size) => {},
});
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);

// @ts-ignore Property '_readableState' does not exist on type 'Readable'.
const { pipesCount } = readableStream._readableState;
expect(pipesCount).toEqual(1);

const mockDataChunks = ["Hello", "World"];
setTimeout(() => {
mockDataChunks.forEach((chunk) => readableStream.emit("data", chunk));
readableStream.emit("end");
}, 100);

expect(await hashPromise).toEqual(mockHash);
expect(mockHashCalculatorWrite).toHaveBeenCalledTimes(mockDataChunks.length);
mockDataChunks.forEach((chunk, index) =>
expect(mockHashCalculatorWrite).toHaveBeenNthCalledWith(index + 1, Buffer.from(chunk))
);
expect(mockDigest).toHaveBeenCalledTimes(1);
expect(mockHashCalculatorEnd).toHaveBeenCalledTimes(1);
});

it("throws error if readable stream throws error", async () => {
const readableStream = new Readable({
read: (size) => {},
});
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);

const mockError = new Error("error");
setTimeout(() => {
readableStream.emit("error", mockError);
}, 100);

try {
await hashPromise;
fail(`should throw error ${mockError}`);
} catch (error) {
expect(error).toEqual(mockError);
expect(mockHashCalculatorEnd).toHaveBeenCalledTimes(1);
}
});

it("throws error if HashCalculator throws error", async () => {
const mockHashCalculator = new MockHashCalculator(
mockHashCtor as any,
mockHashCalculatorWrite,
mockHashCalculatorEnd
);
(HashCalculator as unknown as jest.Mock).mockImplementation((hash) => mockHashCalculator);

const readableStream = new Readable({
read: (size) => {},
});
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);

const mockError = new Error("error");
setTimeout(() => {
mockHashCalculator.emit("error", mockError);
}, 100);

try {
await hashPromise;
fail(`should throw error ${mockError}`);
} catch (error) {
expect(error).toEqual(mockError);
}
});

it("throws error if hash.digest() throws error", async () => {
const readableStream = new Readable({
read: (size) => {},
});
const hashPromise = readableStreamHasher(mockHashCtor, readableStream);

setTimeout(() => {
readableStream.emit("end");
}, 100);

const mockError = new Error("error");
mockDigest.mockRejectedValue(mockError);

try {
await hashPromise;
fail(`should throw error ${mockError}`);
} catch (error) {
expect(error).toEqual(mockError);
expect(mockHashCalculatorEnd).toHaveBeenCalledTimes(1);
}
});
});
22 changes: 22 additions & 0 deletions packages/hash-stream-node/src/readableStreamHasher.ts
@@ -0,0 +1,22 @@
import { HashConstructor, StreamHasher } from "@aws-sdk/types";
import { Readable } from "stream";

import { HashCalculator } from "./HashCalculator";

export const readableStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, readableStream: Readable) => {
const hash = new hashCtor();
const hashCalculator = new HashCalculator(hash);
readableStream.pipe(hashCalculator);

return new Promise((resolve, reject) => {
readableStream.on("error", (err: Error) => {
// if the source errors, the destination stream needs to manually end
hashCalculator.end();
reject(err);
});
hashCalculator.on("error", reject);
hashCalculator.on("finish", () => {
hash.digest().then(resolve).catch(reject);
});
});
};

0 comments on commit 943dab0

Please sign in to comment.