
Commit 11c17c6

Add some collector classes for objects that get registered in a CollectorSet (elastic#19098) (elastic#19230)

* Add some collector classes for objects that get registered in a CollectorSet

* comment cleanup

* don't pass an inline-defined logger to collectorSet

* add a helper logger function so collector has access to logger at construction
tsullivan committed May 18, 2018
1 parent f17b311 commit 11c17c6
Showing 13 changed files with 190 additions and 111 deletions.
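
The last commit-message bullet above refers to a getCollectorLogger helper that the new classes import from '../lib', but its implementation is not part of the visible hunks. Below is a minimal sketch of what such a helper could look like, inferred from the tag-scoped logger the old CollectorSet constructor built inline and from the tag arrays asserted in the tests; the constants path and exact shape are assumptions, not taken from this diff.

import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG } from '../../../common/constants'; // path assumed

const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];

export function getCollectorLogger(server) {
  // Wrap server.log so every collector logs with the monitoring tags,
  // matching the ['debug'|'info', 'monitoring-ui', 'kibana-monitoring'] arrays the tests expect.
  return {
    debug: message => server.log(['debug', ...LOGGING_TAGS], message),
    info: message => server.log(['info', ...LOGGING_TAGS], message),
    warn: message => server.log(['warning', ...LOGGING_TAGS], message)
  };
}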
@@ -7,6 +7,7 @@
import { identity, noop } from 'lodash';
import sinon from 'sinon';
import expect from 'expect.js';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';

const DEBUG_LOG = [ 'debug', 'monitoring-ui', 'kibana-monitoring' ];
@@ -17,52 +18,74 @@ const CHECK_DELAY = 100; // can be lower than COLLECTOR_INTERVAL because the col

describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
let log;
let server;
let init;
let cleanup;
let fetch;
beforeEach(() => {
log = sinon.spy();
server = {
log: sinon.spy()
};
init = noop;
cleanup = noop;
fetch = noop;
});

it('should throw an error if non-Collector type of object is registered', () => {
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
combineTypes: identity,
onPayload: identity
});

const registerPojo = () => {
collectors.register({
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
};

expect(registerPojo).to.throwException(({ message }) => {
expect(message).to.be('CollectorSet can only have Collector instances registered');
});
});

it('should skip bulk upload if payload is empty', (done) => {
const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});

collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));

collectors.start();

// allow interval to tick a few times
setTimeout(() => {
collectors.cleanup();

expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Skipping bulk uploading of an empty stats payload')).to.be(true); // proof of skip
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);

done(); // for async exit
}, CHECK_DELAY);
});

it('should run the bulk upload handler', (done) => {
const log = sinon.spy();
const combineTypes = sinon.spy(data => {
return [
data[0][0],
@@ -71,34 +94,33 @@ describe('CollectorSet', () => {
});
const onPayload = sinon.spy();

const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes,
onPayload
});

fetch = () => ({ testFetch: true });
collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));

collectors.start();

// allow interval to tick a few times
setTimeout(() => {
collectors.cleanup();

expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Initializing type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Fetching data from type_collector_test collector')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Uploading bulk stats payload to the local cluster')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(DEBUG_LOG, 'Running type_collector_test cleanup')).to.be(true);

// un-flattened
expect(combineTypes.getCall(0).args[0]).to.eql(
@@ -115,29 +137,28 @@
});

it('should log the info-level status of stopping and restarting', (done) => {
const collectors = new CollectorSet({
const collectors = new CollectorSet(server, {
interval: COLLECTOR_INTERVAL,
logger: log,
combineTypes: identity,
onPayload: identity
});

collectors.register({
collectors.register(new Collector(server, {
type: 'type_collector_test',
fetchAfterInit: true,
init,
fetch,
cleanup
});
}));

collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);

collectors.cleanup();
expect(log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Stopping all stats collectors')).to.be(true);

collectors.start();
expect(log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);
expect(server.log.calledWith(INFO_LOG, 'Starting all stats collectors')).to.be(true);

// exit
collectors.cleanup();
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { getCollectorLogger } from '../lib';

export class Collector {
/*
* @param {Object} server - server object
* @param {String} properties.type - property name as the key for the data
* @param {Function} properties.init (optional) - initialization function
* @param {Function} properties.fetch - function to query data
* @param {Function} properties.cleanup (optional) - cleanup function
* @param {Boolean} properties.fetchAfterInit (optional) - if collector should fetch immediately after init
*/
constructor(server, { type, init, fetch, cleanup, fetchAfterInit }) {
this.type = type;
this.init = init;
this.fetch = fetch;
this.cleanup = cleanup;
this.fetchAfterInit = fetchAfterInit;

this.log = getCollectorLogger(server);
}
}
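
To make the registration contract concrete, here is an illustrative use of the class; it is not part of the diff, the names and values are invented, and collectorSet stands for any CollectorSet instance.

// Illustrative only: constructing and registering a collector built from this class.
const collector = new Collector(server, {
  type: 'example_stats',            // key the fetched data is reported under
  init: () => {},                   // optional one-time setup
  fetch: () => ({ someCount: 1 }),  // called on each interval tick
  cleanup: () => {},                // optional teardown when collectors stop
  fetchAfterInit: true              // also fetch immediately after init()
});
collectorSet.register(collector);   // register() now throws unless given a Collector instance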
@@ -5,10 +5,10 @@
*/

import { flatten, isEmpty } from 'lodash';
import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG } from '../../../common/constants';
import Promise from 'bluebird';

const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
import { getCollectorLogger } from '../lib';
import { Collector } from './collector';
import { UsageCollector } from './usage_collector';

/*
* A collector object has types registered into it with the register(type)
@@ -18,47 +18,41 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG];
export class CollectorSet {

/*
* @param options.interval {Number} in milliseconds
* @param options.logger {Function}
* @param options.combineTypes {Function}
* @param options.onPayload {Function}
* @param {Object} server - server object
* @param {Number} options.interval - in milliseconds
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
*/
constructor({ interval, logger, combineTypes, onPayload }) {
constructor(server, { interval, combineTypes, onPayload }) {
this._collectors = [];
this._timer = null;

if (typeof interval !== 'number') {
throw new Error('interval number of milliseconds is required');
}
if (typeof logger !== 'function') {
throw new Error('Logger function is required');
}
if (typeof combineTypes !== 'function') {
throw new Error('combineTypes function is required');
}
if (typeof onPayload !== 'function') {
throw new Error('onPayload function is required');
}

this._log = {
debug: message => logger(['debug', ...LOGGING_TAGS], message),
info: message => logger(['info', ...LOGGING_TAGS], message),
warn: message => logger(['warning', ...LOGGING_TAGS], message)
};
this._log = getCollectorLogger(server);

this._interval = interval;
this._combineTypes = combineTypes;
this._onPayload = onPayload;
}

/*
* @param {String} type.type
* @param {Function} type.init (optional)
* @param {Function} type.fetch
* @param {Function} type.cleanup (optional)
* @param collector {Collector} collector object
*/
register(type) {
this._collectors.push(type);
register(collector) {
// check instanceof
if (!(collector instanceof Collector)) {
throw new Error('CollectorSet can only have Collector instances registered');
}
this._collectors.push(collector);
}

/*
@@ -75,10 +69,7 @@ export class CollectorSet {
collector.init();
}

if (collector.setLogger) {
this._log.debug(`Setting logger for ${collector.type} collector`);
collector.setLogger(this._log);
}
this._log.debug(`Setting logger for ${collector.type} collector`);

if (collector.fetchAfterInit) {
initialCollectors.push(collector);
@@ -139,6 +130,19 @@
});
}

async bulkFetchUsage() {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
const bulk = await this._bulkFetch(usageCollectors);

// summarize each type of stat
return bulk.reduce((accumulatedStats, currentStat) => {
return {
...accumulatedStats,
[currentStat.type]: currentStat.result,
};
}, {});
}

cleanup() {
this._log.info(`Stopping all stats collectors`);

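As a reading aid for the new bulkFetchUsage method above: it narrows the registered collectors to UsageCollector instances, fetches them in bulk, and folds the results into one object keyed by collector type. With invented values, the reshaping looks like this:

// Illustrative only (type name and values invented):
// _bulkFetch(usageCollectors) resolves to
//   [ { type: 'example_usage', result: { someCount: 1 } } ]
// so bulkFetchUsage() resolves to
//   { example_usage: { someCount: 1 } }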
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Collector } from './collector';

export class UsageCollector extends Collector {
constructor(server, properties) {
super(server, properties);
}
}
@@ -22,7 +22,8 @@ describe('getKibanaUsageCollector', () => {
getCluster: sinon.stub()
}
},
config: () => ({ get: sinon.stub() })
config: () => ({ get: sinon.stub() }),
log: sinon.stub(),
};
serverStub.plugins.elasticsearch.getCluster.withArgs('admin').returns(clusterStub);
callClusterStub = callClusterFactory(serverStub).getCallClusterInternal();
@@ -6,6 +6,7 @@

import { get, snakeCase } from 'lodash';
import { KIBANA_USAGE_TYPE } from '../../../common/constants';
import { UsageCollector } from '../classes/usage_collector';

const TYPES = [
'dashboard',
@@ -20,7 +21,7 @@ const TYPES = [
* Fetches saved object client counts by querying the saved object index
*/
export function getKibanaUsageCollector(server, callCluster) {
return {
return new UsageCollector(server, {
type: KIBANA_USAGE_TYPE,
async fetch() {
const index = server.config().get('kibana.index');
@@ -52,15 +53,13 @@ export function getKibanaUsageCollector(server, callCluster) {

return {
index,

// combine the bucketCounts and 0s for types that don't have documents
...TYPES.reduce((acc, type) => ({
...TYPES.reduce((acc, type) => ({ // combine the bucketCounts and 0s for types that don't have documents
...acc,
[snakeCase(type)]: {
total: bucketCounts[type] || 0
}
}), {})
};
}
};
});
}
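
For reference, the object this collector's fetch() resolves to pairs the saved-object index name with one { total } entry per entry in TYPES (keys run through snakeCase). With invented counts, and with only 'dashboard' visible in the truncated TYPES list above, it looks roughly like this:

// Illustrative only (counts invented):
// {
//   index: '.kibana',
//   dashboard: { total: 5 },
//   ...one { total: N } entry per remaining type in TYPES
// }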
