This repository has been archived by the owner on Mar 31, 2024. It is now read-only.
forked from elastic/kibana
-
Notifications
You must be signed in to change notification settings - Fork 3
/
collector_set.js
159 lines (138 loc) · 4.56 KB
/
collector_set.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { flatten, isEmpty } from 'lodash';
import Promise from 'bluebird';
import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
import { UsageCollector } from './usage_collector';
/*
* A collector object has types registered into it with the register(type)
* function. Each type that gets registered defines how to fetch its own data
* and combine it into a unified payload for bulk upload.
*/
export class CollectorSet {
/*
* @param {Object} server - server object
* @param {Number} options.interval - in milliseconds
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
*/
constructor(server, { interval, combineTypes, onPayload }) {
this._collectors = [];
this._timer = null;
if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required');
}
if (typeof combineTypes !== 'function') {
throw new Error('combineTypes function is required');
}
if (typeof onPayload !== 'function') {
throw new Error('onPayload function is required');
}
this._log = getCollectorLogger(server);
this._interval = interval;
this._combineTypes = combineTypes;
this._onPayload = onPayload;
}
/*
* @param collector {Collector} collector object
*/
register(collector) {
// check instanceof
if (!(collector instanceof Collector)) {
throw new Error('CollectorSet can only have Collector instances registered');
}
this._collectors.push(collector);
}
/*
* Call all the init methods
* if fetchAfterInit is true, fetch and collect immediately
*/
start() {
const initialCollectors = [];
this._log.info(`Starting all stats collectors`);
this._collectors.forEach(collector => {
if (collector.init) {
this._log.debug(`Initializing ${collector.type} collector`);
collector.init();
}
this._log.debug(`Setting logger for ${collector.type} collector`);
if (collector.fetchAfterInit) {
initialCollectors.push(collector);
}
});
// do some fetches and bulk collect
if (initialCollectors.length > 0) {
this._fetchAndUpload(initialCollectors);
}
this._timer = setInterval(() => {
this._fetchAndUpload(this._collectors);
}, this._interval);
}
async _fetchAndUpload(collectors) {
const data = await this._bulkFetch(collectors);
const usableData = data.filter(d => Boolean(d) && !isEmpty(d.result));
const payload = usableData.map(({ result, type }) => {
if (!isEmpty(result)) {
return [
{ index: { _type: type } },
result
];
}
});
if (payload.length > 0) {
try {
const combinedData = this._combineTypes(payload); // use the collector types combiner
this._log.debug(`Uploading bulk stats payload to the local cluster`);
this._onPayload(flatten(combinedData));
} catch(err) {
this._log.warn(err);
this._log.warn(`Unable to bulk upload the stats payload to the local cluster`);
}
} else {
this._log.debug(`Skipping bulk uploading of an empty stats payload`);
}
}
/*
* Call a bunch of fetch methods and then do them in bulk
*/
_bulkFetch(collectors) {
return Promise.map(collectors, collector => {
const collectorType = collector.type;
this._log.debug(`Fetching data from ${collectorType} collector`);
return Promise.props({
type: collectorType,
result: collector.fetch()
})
.catch(err => {
this._log.warn(err);
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
});
});
}
async bulkFetchUsage() {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
const bulk = await this._bulkFetch(usageCollectors);
// summarize each type of stat
return bulk.reduce((accumulatedStats, currentStat) => {
return {
...accumulatedStats,
[currentStat.type]: currentStat.result,
};
}, {});
}
cleanup() {
this._log.info(`Stopping all stats collectors`);
// stop fetching
clearInterval(this._timer);
this._collectors.forEach(collector => {
if (collector.cleanup) {
this._log.debug(`Running ${collector.type} cleanup`);
collector.cleanup();
}
});
}
}