Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch RPC requests #1300

Closed
wants to merge 15 commits into from
50 changes: 44 additions & 6 deletions packages/tendermint-rpc/src/rpcclients/httpclient.ts
Expand Up @@ -65,29 +65,67 @@ export interface HttpEndpoint {
readonly headers: Record<string, string>;
}

export interface HttpClientOptions {
dispatchInterval: number;
}

export const defaultHttpClientOptions: HttpClientOptions = { dispatchInterval: 0 };
webmaster128 marked this conversation as resolved.
Show resolved Hide resolved

export class HttpClient implements RpcClient {
webmaster128 marked this conversation as resolved.
Show resolved Hide resolved
protected readonly url: string;
protected readonly headers: Record<string, string> | undefined;
protected readonly options: HttpClientOptions;

public constructor(endpoint: string | HttpEndpoint) {
private stack: Array<{
webmaster128 marked this conversation as resolved.
Show resolved Hide resolved
request: JsonRpcRequest;
resolve: (a: JsonRpcSuccessResponse) => void;
reject: (a: Error) => void;
}> = [];

public constructor(endpoint: string | HttpEndpoint, options: HttpClientOptions = defaultHttpClientOptions) {
this.options = options;
if (typeof endpoint === "string") {
// accept host.name:port and assume http protocol
this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint;
} else {
this.url = endpoint.url;
this.headers = endpoint.headers;
}
setInterval(() => this.tick(), options.dispatchInterval);
}

public disconnect(): void {
// nothing to be done
}

public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
const response = parseJsonRpcResponse(await http("POST", this.url, this.headers, request));
if (isJsonRpcErrorResponse(response)) {
throw new Error(JSON.stringify(response.error));
}
return response;
return new Promise((resolve, reject) => {
this.stack.push({ request, resolve, reject });
});
}

private async tick(): Promise<void> {
// Avoid race conditions
const stack = this.stack;
this.stack = [];
webmaster128 marked this conversation as resolved.
Show resolved Hide resolved

if (!stack.length) return;

const request = stack.map((s) => s.request);
const raw = await http("POST", this.url, this.headers, request);
// Requests with a single entry return as an object
const arr = Array.isArray(raw) ? raw : [raw];

arr.forEach((el) => {
const req = stack.find((s) => s.request.id === el.id);
if (!req) return;
const { reject, resolve } = req;
const response = parseJsonRpcResponse(el);
if (isJsonRpcErrorResponse(response)) {
reject(new Error(JSON.stringify(response.error)));
} else {
resolve(response);
}
});
}
}