Skip to content

Commit

Permalink
Batch RPC requests 2 (#1303)
Browse files Browse the repository at this point in the history
* WIP Batch RPC requests

* Lint warnings

* Match responses by ID

* Review Comments

* Lint

* Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts

Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com>

* Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts

Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com>

* Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts

Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com>

* review updates

* lint

* check falsy values

* CHANGELOG & export

* Update CHANGELOG.md

Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com>

* Update CHANGELOG.md

Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com>

* yarn format-text

* Fix linter issues

* Test batchSizeLimit for safe integer and improve error message

* Move changelog to Unreleased section

Co-authored-by: codehans <94654388+codehans@users.noreply.github.com>
  • Loading branch information
webmaster128 and codehans committed Oct 24, 2022
1 parent bba7780 commit 55ca044
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 0 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,11 @@ and this project adheres to

## [Unreleased]

### Added

- @cosmjs/tendermint-rpc: Add `HttpBatchClient`, which implements `RpcClient`,
supporting batch RPC requests ([#1300]).

## [0.29.2] - 2022-10-13

### Added
Expand All @@ -19,6 +24,8 @@ and this project adheres to
- @cosmjs/stargate: Add missing `{is,}MsgBeginRedelegateEncodeObject`,
`{is,MsgCreateValidatorEncodeObject}` and `{is,MsgEditValidatorEncodeObject}`.

[#1300]: https://github.com/cosmos/cosmjs/pull/1300

### Fixed

- @cosmjs/cosmwasm-stargate: Use type `JsonObject = any` for smart query
Expand Down
91 changes: 91 additions & 0 deletions packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts
@@ -0,0 +1,91 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { createJsonRpcRequest } from "../jsonrpc";
import { defaultInstance } from "../testutil.spec";
import { HttpBatchClient } from "./httpbatchclient";
import { http } from "./httpclient";

function pendingWithoutTendermint(): void {
if (!process.env.TENDERMINT_ENABLED) {
pending("Set TENDERMINT_ENABLED to enable Tendermint RPC tests");
}
}

function pendingWithoutHttpServer(): void {
if (!process.env.HTTPSERVER_ENABLED) {
pending("Set HTTPSERVER_ENABLED to enable HTTP tests");
}
}

const tendermintUrl = defaultInstance.url;
const echoUrl = "http://localhost:5555/echo_headers";

describe("http", () => {
it("can send a health request", async () => {
pendingWithoutTendermint();
const response = await http("POST", `http://${tendermintUrl}`, undefined, createJsonRpcRequest("health"));
expect(response).toEqual(jasmine.objectContaining({ jsonrpc: "2.0" }));
});

it("errors for non-open port", async () => {
await expectAsync(
http("POST", `http://localhost:56745`, undefined, createJsonRpcRequest("health")),
).toBeRejectedWithError(/(ECONNREFUSED|Failed to fetch)/i);
});

it("can send custom headers", async () => {
pendingWithoutHttpServer();
// Without custom headers
const response1 = await http("POST", echoUrl, undefined, createJsonRpcRequest("health"));
expect(response1).toEqual({
request_headers: jasmine.objectContaining({
// Basic headers from http client
Accept: jasmine.any(String),
"Content-Length": jasmine.any(String),
"Content-Type": "application/json",
Host: jasmine.any(String),
"User-Agent": jasmine.any(String),
}),
});

// With custom headers
const response2 = await http(
"POST",
echoUrl,
{ foo: "bar123", Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz" },
createJsonRpcRequest("health"),
);
expect(response2).toEqual({
request_headers: jasmine.objectContaining({
// Basic headers from http client
"Content-Length": jasmine.any(String),
"Content-Type": "application/json",
Host: jasmine.any(String),
"User-Agent": jasmine.any(String),
// Custom headers
foo: "bar123",
Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz",
}),
});
});
});

describe("HttpBatchClient", () => {
it("can make a simple call", async () => {
pendingWithoutTendermint();
const client = new HttpBatchClient(tendermintUrl);

const healthResponse = await client.execute(createJsonRpcRequest("health"));
expect(healthResponse.result).toEqual({});

const statusResponse = await client.execute(createJsonRpcRequest("status"));
expect(statusResponse.result).toBeTruthy();
expect(statusResponse.result.node_info).toBeTruthy();

await client
.execute(createJsonRpcRequest("no-such-method"))
.then(() => fail("must not resolve"))
.catch((error) => expect(error).toBeTruthy());

client.disconnect();
});
});
93 changes: 93 additions & 0 deletions packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts
@@ -0,0 +1,93 @@
import {
isJsonRpcErrorResponse,
JsonRpcRequest,
JsonRpcSuccessResponse,
parseJsonRpcResponse,
} from "@cosmjs/json-rpc";

import { http, HttpEndpoint } from "./httpclient";
import { hasProtocol, RpcClient } from "./rpcclient";

export interface HttpBatchClientOptions {
dispatchInterval: number;
batchSizeLimit: number;
}

export const defaultHttpBatchClientOptions: HttpBatchClientOptions = {
dispatchInterval: 20,
batchSizeLimit: 20,
};

export class HttpBatchClient implements RpcClient {
protected readonly url: string;
protected readonly headers: Record<string, string> | undefined;
protected readonly options: HttpBatchClientOptions;
private timer?: NodeJS.Timer;

private readonly queue: Array<{
request: JsonRpcRequest;
resolve: (a: JsonRpcSuccessResponse) => void;
reject: (a: Error) => void;
}> = [];

public constructor(
endpoint: string | HttpEndpoint,
options: HttpBatchClientOptions = defaultHttpBatchClientOptions,
) {
this.options = options;
if (typeof endpoint === "string") {
// accept host.name:port and assume http protocol
this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint;
} else {
this.url = endpoint.url;
this.headers = endpoint.headers;
}
this.timer = setInterval(() => this.tick(), options.dispatchInterval);
this.validate();
}

public disconnect(): void {
this.timer && clearInterval(this.timer);
this.timer = undefined;
}

public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
return new Promise((resolve, reject) => {
this.queue.push({ request, resolve, reject });
});
}

private validate(): void {
if (
!this.options.batchSizeLimit ||
!Number.isSafeInteger(this.options.batchSizeLimit) ||
this.options.batchSizeLimit < 1
) {
throw new Error("batchSizeLimit must be a safe integer >= 1");
}
}

private async tick(): Promise<void> {
// Avoid race conditions
const queue = this.queue.splice(0, this.options.batchSizeLimit);

if (!queue.length) return;

const request = queue.map((s) => s.request);
const raw = await http("POST", this.url, this.headers, request);
// Requests with a single entry return as an object
const arr = Array.isArray(raw) ? raw : [raw];

arr.forEach((el) => {
const req = queue.find((s) => s.request.id === el.id);
if (!req) return;
const { reject, resolve } = req;
const response = parseJsonRpcResponse(el);
if (isJsonRpcErrorResponse(response)) {
reject(new Error(JSON.stringify(response.error)));
} else {
resolve(response);
}
});
}
}
1 change: 1 addition & 0 deletions packages/tendermint-rpc/src/rpcclients/index.ts
@@ -1,5 +1,6 @@
// This folder contains Tendermint-specific RPC clients

export { defaultHttpBatchClientOptions, HttpBatchClient } from "./httpbatchclient";
export { HttpClient, HttpEndpoint } from "./httpclient";
export { instanceOfRpcStreamingClient, RpcClient, RpcStreamingClient, SubscriptionEvent } from "./rpcclient";
export { WebsocketClient } from "./websocketclient";

0 comments on commit 55ca044

Please sign in to comment.