From 55ca044b60309216ab4f01f2576a76060c1ae3db Mon Sep 17 00:00:00 2001 From: Simon Warta <2603011+webmaster128@users.noreply.github.com> Date: Mon, 24 Oct 2022 14:44:28 +0200 Subject: [PATCH] Batch RPC requests 2 (#1303) * WIP Batch RPC requests * Lint warnings * Match responses by ID * Review Comments * Lint * Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * review updates * lint * check falsy values * CHANGELOG & export * Update CHANGELOG.md Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * Update CHANGELOG.md Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * yarn format-text * Fix linter issues * Test batchSizeLimit for safe integer and improve error message * Move changelog to Unreleased section Co-authored-by: codehans <94654388+codehans@users.noreply.github.com> --- CHANGELOG.md | 7 ++ .../src/rpcclients/httpbatchclient.spec.ts | 91 ++++++++++++++++++ .../src/rpcclients/httpbatchclient.ts | 93 +++++++++++++++++++ .../tendermint-rpc/src/rpcclients/index.ts | 1 + 4 files changed, 192 insertions(+) create mode 100644 packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts create mode 100644 packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fa478c2a2..3ba34fd9d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to ## [Unreleased] +### Added + +- @cosmjs/tendermint-rpc: Add `HttpBatchClient`, which implements `RpcClient`, + supporting batch RPC requests ([#1300]). + ## [0.29.2] - 2022-10-13 ### Added @@ -19,6 +24,8 @@ and this project adheres to - @cosmjs/stargate: Add missing `{is,}MsgBeginRedelegateEncodeObject`, `{is,MsgCreateValidatorEncodeObject}` and `{is,MsgEditValidatorEncodeObject}`. +[#1300]: https://github.com/cosmos/cosmjs/pull/1300 + ### Fixed - @cosmjs/cosmwasm-stargate: Use type `JsonObject = any` for smart query diff --git a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts new file mode 100644 index 0000000000..c03628340c --- /dev/null +++ b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts @@ -0,0 +1,91 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { createJsonRpcRequest } from "../jsonrpc"; +import { defaultInstance } from "../testutil.spec"; +import { HttpBatchClient } from "./httpbatchclient"; +import { http } from "./httpclient"; + +function pendingWithoutTendermint(): void { + if (!process.env.TENDERMINT_ENABLED) { + pending("Set TENDERMINT_ENABLED to enable Tendermint RPC tests"); + } +} + +function pendingWithoutHttpServer(): void { + if (!process.env.HTTPSERVER_ENABLED) { + pending("Set HTTPSERVER_ENABLED to enable HTTP tests"); + } +} + +const tendermintUrl = defaultInstance.url; +const echoUrl = "http://localhost:5555/echo_headers"; + +describe("http", () => { + it("can send a health request", async () => { + pendingWithoutTendermint(); + const response = await http("POST", `http://${tendermintUrl}`, undefined, createJsonRpcRequest("health")); + expect(response).toEqual(jasmine.objectContaining({ jsonrpc: "2.0" })); + }); + + it("errors for non-open port", async () => { + await expectAsync( + http("POST", `http://localhost:56745`, undefined, createJsonRpcRequest("health")), + ).toBeRejectedWithError(/(ECONNREFUSED|Failed to fetch)/i); + }); + + it("can send custom headers", async () => { + pendingWithoutHttpServer(); + // Without custom headers + const response1 = await http("POST", echoUrl, undefined, createJsonRpcRequest("health")); + expect(response1).toEqual({ + request_headers: jasmine.objectContaining({ + // Basic headers from http client + Accept: jasmine.any(String), + "Content-Length": jasmine.any(String), + "Content-Type": "application/json", + Host: jasmine.any(String), + "User-Agent": jasmine.any(String), + }), + }); + + // With custom headers + const response2 = await http( + "POST", + echoUrl, + { foo: "bar123", Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz" }, + createJsonRpcRequest("health"), + ); + expect(response2).toEqual({ + request_headers: jasmine.objectContaining({ + // Basic headers from http client + "Content-Length": jasmine.any(String), + "Content-Type": "application/json", + Host: jasmine.any(String), + "User-Agent": jasmine.any(String), + // Custom headers + foo: "bar123", + Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz", + }), + }); + }); +}); + +describe("HttpBatchClient", () => { + it("can make a simple call", async () => { + pendingWithoutTendermint(); + const client = new HttpBatchClient(tendermintUrl); + + const healthResponse = await client.execute(createJsonRpcRequest("health")); + expect(healthResponse.result).toEqual({}); + + const statusResponse = await client.execute(createJsonRpcRequest("status")); + expect(statusResponse.result).toBeTruthy(); + expect(statusResponse.result.node_info).toBeTruthy(); + + await client + .execute(createJsonRpcRequest("no-such-method")) + .then(() => fail("must not resolve")) + .catch((error) => expect(error).toBeTruthy()); + + client.disconnect(); + }); +}); diff --git a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts new file mode 100644 index 0000000000..1b616c3bb7 --- /dev/null +++ b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts @@ -0,0 +1,93 @@ +import { + isJsonRpcErrorResponse, + JsonRpcRequest, + JsonRpcSuccessResponse, + parseJsonRpcResponse, +} from "@cosmjs/json-rpc"; + +import { http, HttpEndpoint } from "./httpclient"; +import { hasProtocol, RpcClient } from "./rpcclient"; + +export interface HttpBatchClientOptions { + dispatchInterval: number; + batchSizeLimit: number; +} + +export const defaultHttpBatchClientOptions: HttpBatchClientOptions = { + dispatchInterval: 20, + batchSizeLimit: 20, +}; + +export class HttpBatchClient implements RpcClient { + protected readonly url: string; + protected readonly headers: Record | undefined; + protected readonly options: HttpBatchClientOptions; + private timer?: NodeJS.Timer; + + private readonly queue: Array<{ + request: JsonRpcRequest; + resolve: (a: JsonRpcSuccessResponse) => void; + reject: (a: Error) => void; + }> = []; + + public constructor( + endpoint: string | HttpEndpoint, + options: HttpBatchClientOptions = defaultHttpBatchClientOptions, + ) { + this.options = options; + if (typeof endpoint === "string") { + // accept host.name:port and assume http protocol + this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint; + } else { + this.url = endpoint.url; + this.headers = endpoint.headers; + } + this.timer = setInterval(() => this.tick(), options.dispatchInterval); + this.validate(); + } + + public disconnect(): void { + this.timer && clearInterval(this.timer); + this.timer = undefined; + } + + public async execute(request: JsonRpcRequest): Promise { + return new Promise((resolve, reject) => { + this.queue.push({ request, resolve, reject }); + }); + } + + private validate(): void { + if ( + !this.options.batchSizeLimit || + !Number.isSafeInteger(this.options.batchSizeLimit) || + this.options.batchSizeLimit < 1 + ) { + throw new Error("batchSizeLimit must be a safe integer >= 1"); + } + } + + private async tick(): Promise { + // Avoid race conditions + const queue = this.queue.splice(0, this.options.batchSizeLimit); + + if (!queue.length) return; + + const request = queue.map((s) => s.request); + const raw = await http("POST", this.url, this.headers, request); + // Requests with a single entry return as an object + const arr = Array.isArray(raw) ? raw : [raw]; + + arr.forEach((el) => { + const req = queue.find((s) => s.request.id === el.id); + if (!req) return; + const { reject, resolve } = req; + const response = parseJsonRpcResponse(el); + if (isJsonRpcErrorResponse(response)) { + reject(new Error(JSON.stringify(response.error))); + } else { + resolve(response); + } + }); + } +} diff --git a/packages/tendermint-rpc/src/rpcclients/index.ts b/packages/tendermint-rpc/src/rpcclients/index.ts index bff2953116..cf7c1a70ee 100644 --- a/packages/tendermint-rpc/src/rpcclients/index.ts +++ b/packages/tendermint-rpc/src/rpcclients/index.ts @@ -1,5 +1,6 @@ // This folder contains Tendermint-specific RPC clients +export { defaultHttpBatchClientOptions, HttpBatchClient } from "./httpbatchclient"; export { HttpClient, HttpEndpoint } from "./httpclient"; export { instanceOfRpcStreamingClient, RpcClient, RpcStreamingClient, SubscriptionEvent } from "./rpcclient"; export { WebsocketClient } from "./websocketclient";