Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch RPC requests #1300

Closed
wants to merge 15 commits into from
91 changes: 91 additions & 0 deletions packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts
@@ -0,0 +1,91 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { createJsonRpcRequest } from "../jsonrpc";
import { defaultInstance } from "../testutil.spec";
import { HttpBatchClient } from "./httpbatchclient";
import { http } from "./httpclient";

function pendingWithoutTendermint(): void {
if (!process.env.TENDERMINT_ENABLED) {
pending("Set TENDERMINT_ENABLED to enable Tendermint RPC tests");
}
}

function pendingWithoutHttpServer(): void {
if (!process.env.HTTPSERVER_ENABLED) {
pending("Set HTTPSERVER_ENABLED to enable HTTP tests");
}
}

const tendermintUrl = defaultInstance.url;
const echoUrl = "http://localhost:5555/echo_headers";

describe("http", () => {
it("can send a health request", async () => {
pendingWithoutTendermint();
const response = await http("POST", `http://${tendermintUrl}`, undefined, createJsonRpcRequest("health"));
expect(response).toEqual(jasmine.objectContaining({ jsonrpc: "2.0" }));
});

it("errors for non-open port", async () => {
await expectAsync(
http("POST", `http://localhost:56745`, undefined, createJsonRpcRequest("health")),
).toBeRejectedWithError(/(ECONNREFUSED|Failed to fetch)/i);
});

it("can send custom headers", async () => {
pendingWithoutHttpServer();
// Without custom headers
const response1 = await http("POST", echoUrl, undefined, createJsonRpcRequest("health"));
expect(response1).toEqual({
request_headers: jasmine.objectContaining({
// Basic headers from http client
Accept: jasmine.any(String),
"Content-Length": jasmine.any(String),
"Content-Type": "application/json",
Host: jasmine.any(String),
"User-Agent": jasmine.any(String),
}),
});

// With custom headers
const response2 = await http(
"POST",
echoUrl,
{ foo: "bar123", Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz" },
createJsonRpcRequest("health"),
);
expect(response2).toEqual({
request_headers: jasmine.objectContaining({
// Basic headers from http client
"Content-Length": jasmine.any(String),
"Content-Type": "application/json",
Host: jasmine.any(String),
"User-Agent": jasmine.any(String),
// Custom headers
foo: "bar123",
Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz",
}),
});
});
});

describe("HttpBatchClient", () => {
it("can make a simple call", async () => {
pendingWithoutTendermint();
const client = new HttpBatchClient(tendermintUrl);

const healthResponse = await client.execute(createJsonRpcRequest("health"));
expect(healthResponse.result).toEqual({});

const statusResponse = await client.execute(createJsonRpcRequest("status"));
expect(statusResponse.result).toBeTruthy();
expect(statusResponse.result.node_info).toBeTruthy();

await client
.execute(createJsonRpcRequest("no-such-method"))
.then(() => fail("must not resolve"))
.catch((error) => expect(error).toBeTruthy());

client.disconnect();
});
});
87 changes: 87 additions & 0 deletions packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts
@@ -0,0 +1,87 @@
import {
isJsonRpcErrorResponse,
JsonRpcRequest,
JsonRpcSuccessResponse,
parseJsonRpcResponse,
} from "@cosmjs/json-rpc";

import { http, HttpEndpoint } from "./httpclient";
import { hasProtocol, RpcClient } from "./rpcclient";

export interface HttpBatchClientOptions {
dispatchInterval: number;
batchSizeLimit: number;
}

export const defaultHttpBatchClientOptions: HttpBatchClientOptions = {
dispatchInterval: 20,
batchSizeLimit: 20,
};

export class HttpBatchClient implements RpcClient {
protected readonly url: string;
protected readonly headers: Record<string, string> | undefined;
protected readonly options: HttpBatchClientOptions;
private timer?: NodeJS.Timer;

private readonly queue: Array<{
request: JsonRpcRequest;
resolve: (a: JsonRpcSuccessResponse) => void;
reject: (a: Error) => void;
}> = [];

public constructor(
endpoint: string | HttpEndpoint,
options: HttpBatchClientOptions = defaultHttpBatchClientOptions,
) {
this.options = options;
if (typeof endpoint === "string") {
// accept host.name:port and assume http protocol
this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint;
} else {
this.url = endpoint.url;
this.headers = endpoint.headers;
}
this.timer = setInterval(() => this.tick(), options.dispatchInterval);
this.validate();
}

public disconnect(): void {
this.timer && clearInterval(this.timer);
this.timer = undefined;
}
webmaster128 marked this conversation as resolved.
Show resolved Hide resolved

public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
return new Promise((resolve, reject) => {
this.queue.push({ request, resolve, reject });
});
}

private validate(): void {
if (this.options.batchSizeLimit < 1) throw new Error("batchSizeLimit < 1");
webmaster128 marked this conversation as resolved.
Show resolved Hide resolved
}

private async tick(): Promise<void> {
// Avoid race conditions
const queue = this.queue.splice(0, this.options.batchSizeLimit);
webmaster128 marked this conversation as resolved.
Show resolved Hide resolved

if (!queue.length) return;

const request = queue.map((s) => s.request);
const raw = await http("POST", this.url, this.headers, request);
// Requests with a single entry return as an object
const arr = Array.isArray(raw) ? raw : [raw];

arr.forEach((el) => {
const req = queue.find((s) => s.request.id === el.id);
if (!req) return;
const { reject, resolve } = req;
const response = parseJsonRpcResponse(el);
if (isJsonRpcErrorResponse(response)) {
reject(new Error(JSON.stringify(response.error)));
} else {
resolve(response);
}
});
}
}