Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch RPC requests 2 #1303

Merged
merged 18 commits into from Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,11 @@ and this project adheres to

## [Unreleased]

### Added

- @cosmjs/tendermint-rpc: Add `HttpBatchClient`, which implements `RpcClient`,
supporting batch RPC requests ([#1300]).

## [0.29.2] - 2022-10-13

### Added
Expand All @@ -19,6 +24,8 @@ and this project adheres to
- @cosmjs/stargate: Add missing `{is,}MsgBeginRedelegateEncodeObject`,
`{is,MsgCreateValidatorEncodeObject}` and `{is,MsgEditValidatorEncodeObject}`.

[#1300]: https://github.com/cosmos/cosmjs/pull/1300

### Fixed

- @cosmjs/cosmwasm-stargate: Use type `JsonObject = any` for smart query
Expand Down
91 changes: 91 additions & 0 deletions packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts
@@ -0,0 +1,91 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { createJsonRpcRequest } from "../jsonrpc";
import { defaultInstance } from "../testutil.spec";
import { HttpBatchClient } from "./httpbatchclient";
import { http } from "./httpclient";

function pendingWithoutTendermint(): void {
if (!process.env.TENDERMINT_ENABLED) {
pending("Set TENDERMINT_ENABLED to enable Tendermint RPC tests");
}
}

function pendingWithoutHttpServer(): void {
if (!process.env.HTTPSERVER_ENABLED) {
pending("Set HTTPSERVER_ENABLED to enable HTTP tests");
}
}

const tendermintUrl = defaultInstance.url;
const echoUrl = "http://localhost:5555/echo_headers";

describe("http", () => {
it("can send a health request", async () => {
pendingWithoutTendermint();
const response = await http("POST", `http://${tendermintUrl}`, undefined, createJsonRpcRequest("health"));
expect(response).toEqual(jasmine.objectContaining({ jsonrpc: "2.0" }));
});

it("errors for non-open port", async () => {
await expectAsync(
http("POST", `http://localhost:56745`, undefined, createJsonRpcRequest("health")),
).toBeRejectedWithError(/(ECONNREFUSED|Failed to fetch)/i);
});

it("can send custom headers", async () => {
pendingWithoutHttpServer();
// Without custom headers
const response1 = await http("POST", echoUrl, undefined, createJsonRpcRequest("health"));
expect(response1).toEqual({
request_headers: jasmine.objectContaining({
// Basic headers from http client
Accept: jasmine.any(String),
"Content-Length": jasmine.any(String),
"Content-Type": "application/json",
Host: jasmine.any(String),
"User-Agent": jasmine.any(String),
}),
});

// With custom headers
const response2 = await http(
"POST",
echoUrl,
{ foo: "bar123", Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz" },
createJsonRpcRequest("health"),
);
expect(response2).toEqual({
request_headers: jasmine.objectContaining({
// Basic headers from http client
"Content-Length": jasmine.any(String),
"Content-Type": "application/json",
Host: jasmine.any(String),
"User-Agent": jasmine.any(String),
// Custom headers
foo: "bar123",
Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz",
}),
});
});
});

describe("HttpBatchClient", () => {
it("can make a simple call", async () => {
pendingWithoutTendermint();
const client = new HttpBatchClient(tendermintUrl);

const healthResponse = await client.execute(createJsonRpcRequest("health"));
expect(healthResponse.result).toEqual({});

const statusResponse = await client.execute(createJsonRpcRequest("status"));
expect(statusResponse.result).toBeTruthy();
expect(statusResponse.result.node_info).toBeTruthy();

await client
.execute(createJsonRpcRequest("no-such-method"))
.then(() => fail("must not resolve"))
.catch((error) => expect(error).toBeTruthy());

client.disconnect();
});
});
93 changes: 93 additions & 0 deletions packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts
@@ -0,0 +1,93 @@
import {
isJsonRpcErrorResponse,
JsonRpcRequest,
JsonRpcSuccessResponse,
parseJsonRpcResponse,
} from "@cosmjs/json-rpc";

import { http, HttpEndpoint } from "./httpclient";
import { hasProtocol, RpcClient } from "./rpcclient";

export interface HttpBatchClientOptions {
dispatchInterval: number;
batchSizeLimit: number;
}

export const defaultHttpBatchClientOptions: HttpBatchClientOptions = {
dispatchInterval: 20,
batchSizeLimit: 20,
};

export class HttpBatchClient implements RpcClient {
protected readonly url: string;
protected readonly headers: Record<string, string> | undefined;
protected readonly options: HttpBatchClientOptions;
private timer?: NodeJS.Timer;

private readonly queue: Array<{
request: JsonRpcRequest;
resolve: (a: JsonRpcSuccessResponse) => void;
reject: (a: Error) => void;
}> = [];

public constructor(
endpoint: string | HttpEndpoint,
options: HttpBatchClientOptions = defaultHttpBatchClientOptions,
) {
this.options = options;
if (typeof endpoint === "string") {
// accept host.name:port and assume http protocol
this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint;
} else {
this.url = endpoint.url;
this.headers = endpoint.headers;
}
this.timer = setInterval(() => this.tick(), options.dispatchInterval);
this.validate();
}

public disconnect(): void {
this.timer && clearInterval(this.timer);
this.timer = undefined;
}

public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
return new Promise((resolve, reject) => {
this.queue.push({ request, resolve, reject });
});
}

private validate(): void {
if (
!this.options.batchSizeLimit ||
!Number.isSafeInteger(this.options.batchSizeLimit) ||
this.options.batchSizeLimit < 1
) {
throw new Error("batchSizeLimit must be a safe integer >= 1");
}
}

private async tick(): Promise<void> {
// Avoid race conditions
const queue = this.queue.splice(0, this.options.batchSizeLimit);

if (!queue.length) return;

const request = queue.map((s) => s.request);
const raw = await http("POST", this.url, this.headers, request);
// Requests with a single entry return as an object
const arr = Array.isArray(raw) ? raw : [raw];

arr.forEach((el) => {
const req = queue.find((s) => s.request.id === el.id);
if (!req) return;
const { reject, resolve } = req;
const response = parseJsonRpcResponse(el);
if (isJsonRpcErrorResponse(response)) {
reject(new Error(JSON.stringify(response.error)));
} else {
resolve(response);
}
});
}
}
1 change: 1 addition & 0 deletions packages/tendermint-rpc/src/rpcclients/index.ts
@@ -1,5 +1,6 @@
// This folder contains Tendermint-specific RPC clients

export { defaultHttpBatchClientOptions, HttpBatchClient } from "./httpbatchclient";
export { HttpClient, HttpEndpoint } from "./httpclient";
export { instanceOfRpcStreamingClient, RpcClient, RpcStreamingClient, SubscriptionEvent } from "./rpcclient";
export { WebsocketClient } from "./websocketclient";