// httpbatchclient.ts (forked from cosmos/cosmjs)
import {
isJsonRpcErrorResponse,
JsonRpcRequest,
JsonRpcSuccessResponse,
parseJsonRpcResponse,
} from "@cosmjs/json-rpc";
import { http, HttpEndpoint } from "./httpclient";
import { hasProtocol, RpcClient } from "./rpcclient";
/** Tuning options accepted by the `HttpBatchClient` constructor. */
export interface HttpBatchClientOptions {
/** Delay handed to `setInterval` between two dispatch ticks (milliseconds, per `setInterval` semantics). */
dispatchInterval: number;
/** Maximum number of queued requests taken off the queue and sent per tick; the client throws if this is falsy or < 1. */
batchSizeLimit: number;
}
/**
 * Options used when the `HttpBatchClient` constructor is called without an
 * explicit `options` argument: a dispatch interval of 20 and at most 20
 * requests per batch.
 */
export const defaultHttpBatchClientOptions: HttpBatchClientOptions = {
dispatchInterval: 20,
batchSizeLimit: 20,
};
/**
 * JSON-RPC client that queues requests and periodically sends them to the
 * endpoint as a single HTTP POST batch.
 *
 * Every call to `execute` enqueues the request and returns a promise. A timer
 * started in the constructor fires every `options.dispatchInterval` and sends
 * up to `options.batchSizeLimit` queued requests at once. Each promise is
 * settled from the response element whose `id` matches the request `id`.
 *
 * Call `disconnect` to stop the timer; requests still in the queue at that
 * point are not dispatched.
 */
export class HttpBatchClient implements RpcClient {
  protected readonly url: string;
  protected readonly headers: Record<string, string> | undefined;
  protected readonly options: HttpBatchClientOptions;
  // Typed via the return type of setInterval so it works with both Node and DOM typings.
  private timer?: ReturnType<typeof setInterval>;
  /** Requests waiting for the next dispatch tick, with the callbacks of their promise. */
  private readonly queue: Array<{
    request: JsonRpcRequest;
    resolve: (a: JsonRpcSuccessResponse) => void;
    reject: (a: Error) => void;
  }> = [];

  /**
   * @param endpoint Either a URL string (a missing protocol defaults to
   *   `http://`) or an endpoint object carrying `url` and `headers`.
   * @param options Batching options; defaults to `defaultHttpBatchClientOptions`.
   * @throws Error if `options.batchSizeLimit` is falsy or less than 1.
   */
  public constructor(
    endpoint: string | HttpEndpoint,
    options: HttpBatchClientOptions = defaultHttpBatchClientOptions,
  ) {
    this.options = options;
    if (typeof endpoint === "string") {
      // accept host.name:port and assume http protocol
      this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint;
    } else {
      this.url = endpoint.url;
      this.headers = endpoint.headers;
    }
    // Validate before starting the timer: if validation throws, no instance is
    // returned to the caller and a running interval could never be stopped.
    this.validate();
    this.timer = setInterval(() => {
      // tick() settles all failures through the queued promises and never rejects.
      void this.tick();
    }, options.dispatchInterval);
  }

  /** Stops the dispatch timer. Safe to call more than once. */
  public disconnect(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  /**
   * Queues a request for the next batch.
   *
   * @param request The JSON-RPC request to send.
   * @returns A promise that resolves with the success response for this
   *   request, or rejects if the server answered with an error, the batch
   *   could not be sent, the response was malformed, or no response with a
   *   matching id was returned.
   */
  public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
    return new Promise((resolve, reject) => {
      this.queue.push({ request, resolve, reject });
    });
  }

  /** Checks the configured options; throws if `batchSizeLimit` is falsy or < 1. */
  private validate(): void {
    if (!this.options.batchSizeLimit || this.options.batchSizeLimit < 1) {
      throw new Error("batchSizeLimit < 1");
    }
  }

  /**
   * Sends one batch of queued requests and settles their promises.
   *
   * Never rejects: every failure is reported through the `reject` callback of
   * the affected queue entries so that no caller is left waiting forever.
   */
  private async tick(): Promise<void> {
    // Take the batch off the shared queue synchronously (before any await) so
    // overlapping ticks never pick up the same entries.
    const batch = this.queue.splice(0, this.options.batchSizeLimit);
    if (!batch.length) return;

    // Entries whose promise has not been settled yet.
    const pending = new Set(batch);
    const toError = (cause: unknown): Error =>
      cause instanceof Error ? cause : new Error(String(cause));

    try {
      const raw = await http(
        "POST",
        this.url,
        this.headers,
        batch.map((entry) => entry.request),
      );
      // Requests with a single entry return as an object
      const elements = Array.isArray(raw) ? raw : [raw];

      for (const el of elements) {
        const entry = batch.find((s) => pending.has(s) && s.request.id === el.id);
        if (!entry) continue;
        pending.delete(entry);
        try {
          const response = parseJsonRpcResponse(el);
          if (isJsonRpcErrorResponse(response)) {
            entry.reject(new Error(JSON.stringify(response.error)));
          } else {
            entry.resolve(response);
          }
        } catch (parseError) {
          // A malformed element only fails its own request, not the whole batch.
          entry.reject(toError(parseError));
        }
      }

      // The server did not answer these requests; fail them instead of hanging.
      for (const entry of pending) {
        entry.reject(
          new Error(`Response for request ID ${entry.request.id} missing in batch response`),
        );
      }
    } catch (error) {
      // Transport failure or unusable response body: fail everything still open.
      const failure = toError(error);
      for (const entry of pending) {
        entry.reject(failure);
      }
    }
  }
}