diff --git a/lib/groupBy.js b/lib/groupBy.js
new file mode 100644
index 000000000..31e2c0b75
--- /dev/null
+++ b/lib/groupBy.js
@@ -0,0 +1,40 @@
+import doLimit from './internal/doLimit';
+import groupByLimit from './groupByLimit';
+
+/**
+ * Returns a new object, where each value corresponds to an array of items, from
+ * `coll`, that returned the corresponding key. That is, the keys of the object
+ * correspond to the values passed to the `iteratee` callback.
+ *
+ * Note: Since this function applies the `iteratee` to each item in parallel,
+ * there is no guarantee that the `iteratee` functions will complete in order.
+ * However, the values for each key in the `result` will be in the same order as
+ * the original `coll`. For Objects, the values will roughly be in the order of
+ * the original Objects' keys (but this can vary across JavaScript engines).
+ *
+ * @name groupBy
+ * @static
+ * @memberOf module:Collections
+ * @method
+ * @category Collection
+ * @param {Array|Iterable|Object} coll - A collection to iterate over.
+ * @param {Function} iteratee - A function to apply to each item in `coll`.
+ * The iteratee is passed a `callback(err, key)` which must be called once it
+ * has completed with an error (which can be `null`) and the `key` to group the
+ * value under. Invoked with (value, callback).
+ * @param {Function} [callback] - A callback which is called when all `iteratee`
+ * functions have finished, or an error occurs. Result is an `Object` whose
+ * properties are arrays of values which returned the corresponding key.
+ * @example
+ *
+ * async.groupBy(['userId1', 'userId2', 'userId3'], function(userId, callback) {
+ *     db.findById(userId, function(err, user) {
+ *         if (err) return callback(err);
+ *         return callback(null, user.age);
+ *     });
+ * }, function(err, result) {
+ *     // result is object containing the userIds grouped by age
+ *     // e.g. { 30: ['userId1', 'userId3'], 42: ['userId2']};
+ * });
+ */
+export default doLimit(groupByLimit, Infinity);
diff --git a/lib/groupByLimit.js b/lib/groupByLimit.js
new file mode 100644
index 000000000..9fc71a0c3
--- /dev/null
+++ b/lib/groupByLimit.js
@@ -0,0 +1,62 @@
+import noop from 'lodash/noop';
+import mapLimit from './mapLimit';
+
+/**
+ * The same as [`groupBy`]{@link module:Collections.groupBy} but runs a maximum of `limit` async operations at a time.
+ *
+ * @name groupByLimit
+ * @static
+ * @memberOf module:Collections
+ * @method
+ * @see [async.groupBy]{@link module:Collections.groupBy}
+ * @category Collection
+ * @param {Array|Iterable|Object} coll - A collection to iterate over.
+ * @param {number} limit - The maximum number of async operations at a time.
+ * @param {Function} iteratee - A function to apply to each item in `coll`.
+ * The iteratee is passed a `callback(err, key)` which must be called once it
+ * has completed with an error (which can be `null`) and the `key` to group the
+ * value under. Invoked with (value, callback).
+ * @param {Function} [callback] - A callback which is called when all `iteratee`
+ * functions have finished, or an error occurs. Result is an `Object` whose
+ * properties are arrays of values which returned the corresponding key.
+ */
+export default function(coll, limit, iteratee, callback) {
+    callback = callback || noop;
+
+    // Tag each value with the key its iteratee reports; grouping happens once
+    // the mapped pairs are collected.
+    mapLimit(coll, limit, function(val, next) {
+        iteratee(val, function(err, key) {
+            if (err) return next(err);
+            return next(null, {key: key, val: val});
+        });
+    }, function(err, mapResults) {
+        var result = {};
+        // from MDN, handle object having an `hasOwnProperty` prop
+        var hasOwnProperty = Object.prototype.hasOwnProperty;
+
+        for (var i = 0; i < mapResults.length; i++) {
+            // skip slots that hold no {key, val} pair
+            if (mapResults[i]) {
+                var key = mapResults[i].key;
+                var val = mapResults[i].val;
+
+                if (hasOwnProperty.call(result, key)) {
+                    result[key].push(val);
+                } else {
+                    // define instead of assign: assigning to a `__proto__`
+                    // key would invoke the inherited setter and lose the group
+                    Object.defineProperty(result, key, {
+                        value: [val],
+                        writable: true,
+                        enumerable: true,
+                        configurable: true
+                    });
+                }
+            }
+        }
+
+        // `err` is forwarded together with whatever was grouped so far
+        return callback(err, result);
+    });
+}
diff --git a/lib/groupBySeries.js b/lib/groupBySeries.js
new file mode 100644
index 000000000..5df9a3cbe
--- /dev/null
+++ b/lib/groupBySeries.js
@@ -0,0 +1,22 @@
+import doLimit from './internal/doLimit';
+import groupByLimit from './groupByLimit';
+
+/**
+ * The same as [`groupBy`]{@link module:Collections.groupBy} but runs only a single async operation at a time.
+ *
+ * @name groupBySeries
+ * @static
+ * @memberOf module:Collections
+ * @method
+ * @see [async.groupBy]{@link module:Collections.groupBy}
+ * @category Collection
+ * @param {Array|Iterable|Object} coll - A collection to iterate over.
+ * @param {Function} iteratee - A function to apply to each item in `coll`.
+ * The iteratee is passed a `callback(err, key)` which must be called once it
+ * has completed with an error (which can be `null`) and the `key` to group the
+ * value under. Invoked with (value, callback).
+ * @param {Function} [callback] - A callback which is called when all `iteratee`
+ * functions have finished, or an error occurs. Result is an `Object` whose
+ * properties are arrays of values which returned the corresponding key.
+ */
+export default doLimit(groupByLimit, 1);
diff --git a/lib/index.js b/lib/index.js
index 8a2c7cad9..0bb7efe19 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -54,6 +54,9 @@ import filter from './filter';
 import filterLimit from './filterLimit';
 import filterSeries from './filterSeries';
 import forever from './forever';
+import groupBy from './groupBy';
+import groupByLimit from './groupByLimit';
+import groupBySeries from './groupBySeries';
 import log from './log';
 import map from './map';
 import mapLimit from './mapLimit';
@@ -128,6 +131,9 @@ export default {
     filterLimit: filterLimit,
     filterSeries: filterSeries,
     forever: forever,
+    groupBy: groupBy,
+    groupByLimit: groupByLimit,
+    groupBySeries: groupBySeries,
     log: log,
     map: map,
     mapLimit: mapLimit,
@@ -220,6 +226,9 @@ export {
     filterLimit as filterLimit,
     filterSeries as filterSeries,
     forever as forever,
+    groupBy as groupBy,
+    groupByLimit as groupByLimit,
+    groupBySeries as groupBySeries,
     log as log,
     map as map,
     mapLimit as mapLimit,
diff --git a/lib/map.js b/lib/map.js
index 4c0b22685..76d60582b 100644
--- a/lib/map.js
+++ b/lib/map.js
@@ -16,7 +16,7 @@ import map from './internal/map';
  *
  * If `map` is passed an Object, the results will be an Array. The results
  * will roughly be in the order of the original Objects' keys (but this can
- * vary across JavaScript engines)
+ * vary across JavaScript engines).
  *
  * @name map
  * @static
diff --git a/mocha_test/groupBy.js b/mocha_test/groupBy.js
new file mode 100644
index 000000000..afb612b0c
--- /dev/null
+++ b/mocha_test/groupBy.js
@@ -0,0 +1,356 @@
+var async = require('../lib');
+var expect = require('chai').expect;
+var assert = require('assert');
+
+describe('groupBy', function() {
+    this.timeout(250);
+
+    // Completes after `val * 25` ms, so smaller values finish first whatever
+    // their position in the collection; each value is grouped under `val+1`.
+    function groupByIteratee(callOrder, val, next) {
+        setTimeout(function() {
+            callOrder.push(val);
+            next(null, val+1);
+        }, val * 25);
+    }
+
+    context('groupBy', function() {
+        it('basics', function(done) {
+            var callOrder = [];
+            async.groupBy([1, 3, 2], groupByIteratee.bind(this, callOrder), function(err, result) {
+                expect(err).to.eql(null);
+                expect(callOrder).to.eql([1, 2, 3]);
+                expect(result).to.eql({2: [1], 3: [2], 4: [3]});
+                done();
+            });
+        });
+
+        it('error', function(done) {
+            async.groupBy([1, 3, 2], function(val, next) {
+                if (val === 3) {
+                    return next(new Error('fail'));
+                }
+                next(null, val+1);
+            }, function(err, result) {
+                expect(err).to.not.eql(null);
+                expect(result).to.eql({2: [1]});
+                done();
+            });
+        });
+
+        it('original untouched', function(done) {
+            var obj = {a: 'b', b: 'c', c: 'd'};
+            async.groupBy(obj, function(val, next) {
+                next(null, val);
+            }, function(err, result) {
+                expect(obj).to.eql({a: 'b', b: 'c', c: 'd'});
+                expect(result).to.eql({b: ['b'], c: ['c'], d: ['d']});
+                done();
+            });
+        });
+
+        it('handles multiple matches', function(done) {
+            var callOrder = [];
+            async.groupBy([1, 3, 2, 2], groupByIteratee.bind(this, callOrder), function(err, result) {
+                expect(err).to.eql(null);
+                expect(callOrder).to.eql([1, 2, 2, 3]);
+                expect(result).to.eql({2: [1], 3: [2, 2], 4: [3]});
+                done();
+            });
+        });
+
+        it('handles objects', function(done) {
+            var obj = {a: 'b', b: 'c', c: 'd'};
+            var concurrency = {b: 3, c: 2, d: 1};
+            var running = 0;
+            async.groupBy(obj, function(val, next) {
+                running++;
+                async.setImmediate(function() {
+                    expect(running).to.equal(concurrency[val]);
+                    running--;
+                    next(null, val);
+                });
+            }, function(err, result) {
+                expect(running).to.equal(0);
+                expect(err).to.eql(null);
+                expect(result).to.eql({b: ['b'], c: ['c'], d: ['d']});
+                done();
+            });
+        });
+
+        it('handles undefined', function(done) {
+            async.groupBy(undefined, function(val, next) {
+                assert(false, 'iteratee should not be called');
+                next();
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({});
+                done();
+            });
+        });
+
+        it('handles empty object', function(done) {
+            async.groupBy({}, function(val, next) {
+                assert(false, 'iteratee should not be called');
+                next();
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({});
+                done();
+            });
+        });
+
+        it('main callback optional', function(done) {
+            var arr = [1, 2, 3];
+            var runs = [];
+            async.groupBy(arr, function(val, next) {
+                runs.push(val);
+                var _done = (runs.length === arr.length);
+                async.setImmediate(function() {
+                    next(null);
+                    if (_done) {
+                        expect(runs).to.eql(arr);
+                        done();
+                    }
+                });
+            });
+        });
+
+        it('iteratee callback is only called once', function(done) {
+            async.groupBy([1, 2], function(item, callback) {
+                try {
+                    callback(item);
+                } catch (exception) {
+                    expect(function() {
+                        callback(exception);
+                    }).to.throw(/already called/);
+                    done();
+                }
+            }, function() {
+                throw new Error();
+            });
+        });
+
+        it('handles Map', function(done) {
+            if (typeof Map !== 'function') {
+                return done();
+            }
+
+            var map = new Map([
+                ['a', 'a'],
+                ['b', 'b'],
+                ['c', 'a']
+            ]);
+
+            async.groupBy(map, function(val, next) {
+                next(null, val[1]+1);
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({
+                    a1: [ ['a', 'a'], ['c', 'a']],
+                    b1: [ ['b', 'b'] ]
+                });
+                done();
+            });
+        });
+
+        it('handles sparse results', function(done) {
+            var arr = [1, 2, 3];
+            async.groupBy(arr, function(val, next) {
+                if (val === 1) {
+                    return next(null, val+1);
+                } else if (val === 2) {
+                    async.setImmediate(function() {
+                        return next(null, val+1);
+                    });
+                } else {
+                    return next('error');
+                }
+            }, function(err, result) {
+                expect(err).to.not.eql(null);
+                expect(result).to.eql({2: [1]});
+                async.setImmediate(done);
+            });
+        });
+    });
+
+    context('groupByLimit', function() {
+        var obj = {a: 'b', b: 'c', c: 'd'};
+        it('basics', function(done) {
+            var running = 0;
+            var concurrency = {'b': 2, 'c': 2, 'd': 1};
+            async.groupByLimit(obj, 2, function(val, next) {
+                running++;
+                async.setImmediate(function() {
+                    expect(running).to.equal(concurrency[val]);
+                    running--;
+                    next(null, val);
+                });
+            }, function(err, result) {
+                expect(running).to.equal(0);
+                expect(err).to.eql(null);
+                expect(result).to.eql({'b': ['b'], 'c': ['c'], 'd': ['d']});
+                done();
+            });
+        });
+
+        it('error', function(done) {
+            async.groupByLimit(obj, 1, function(val, next) {
+                if (val === 'c') {
+                    return next(new Error('fail'));
+                }
+                next(null, val);
+            }, function(err, result) {
+                expect(err).to.not.eql(null);
+                expect(result).to.eql({b: ['b']});
+                done();
+            });
+        });
+
+        it('handles empty object', function(done) {
+            async.groupByLimit({}, 2, function(val, next) {
+                assert(false, 'iteratee should not be called');
+                next();
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({});
+                done();
+            });
+        });
+
+        it('handles undefined', function(done) {
+            async.groupByLimit(undefined, 2, function(val, next) {
+                assert(false, 'iteratee should not be called');
+                next();
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({});
+                done();
+            });
+        });
+
+        it('limit exceeds size', function(done) {
+            var callOrder = [];
+            async.groupByLimit([3, 2, 2, 1], 10, groupByIteratee.bind(this, callOrder), function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({2: [1], 3: [2, 2], 4: [3]});
+                expect(callOrder).to.eql([1, 2, 2, 3]);
+                done();
+            });
+        });
+
+        it('limit equal size', function(done) {
+            var callOrder = [];
+            async.groupByLimit([3, 2, 2, 1], 4, groupByIteratee.bind(this, callOrder), function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({2: [1], 3: [2, 2], 4: [3]});
+                expect(callOrder).to.eql([1, 2, 2, 3]);
+                done();
+            });
+        });
+
+        it('zero limit', function(done) {
+            async.groupByLimit([3, 2, 2, 1], 0, function(val, next) {
+                assert(false, 'iteratee should not be called');
+                next();
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({});
+                done();
+            });
+        });
+
+        it('does not continue replenishing after error', function(done) {
+            var started = 0;
+            var arr = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
+            var delay = 10;
+            var limit = 3;
+            var maxTime = 10 * arr.length;
+
+            async.groupByLimit(arr, limit, function(val, next) {
+                started++;
+                if (started === 3) {
+                    return next(new Error('fail'));
+                }
+
+                setTimeout(function() {
+                    next();
+                }, delay);
+            }, function(err, result) {
+                expect(err).to.not.eql(null);
+                expect(result).to.eql({});
+            });
+
+            setTimeout(function() {
+                expect(started).to.equal(3);
+                done();
+            }, maxTime);
+        });
+    });
+
+    context('groupBySeries', function() {
+        var obj = {a: 'b', b: 'c', c: 'd'};
+        it('basics', function(done) {
+            var running = 0;
+            var concurrency = {'b': 1, 'c': 1, 'd': 1};
+            async.groupBySeries(obj, function(val, next) {
+                running++;
+                async.setImmediate(function() {
+                    expect(running).to.equal(concurrency[val]);
+                    running--;
+                    next(null, val);
+                });
+            }, function(err, result) {
+                expect(running).to.equal(0);
+                expect(err).to.eql(null);
+                expect(result).to.eql({'b': ['b'], 'c': ['c'], 'd': ['d']});
+                done();
+            });
+        });
+
+        it('error', function(done) {
+            async.groupBySeries(obj, function(val, next) {
+                if (val === 'c') {
+                    return next(new Error('fail'));
+                }
+                next(null, val);
+            }, function(err, result) {
+                expect(err).to.not.eql(null);
+                expect(result).to.eql({b: ['b']});
+                done();
+            });
+        });
+
+        it('handles arrays', function(done) {
+            async.groupBySeries(['a', 'a', 'b'], function(val, next) {
+                next(null, val);
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({'a': ['a', 'a'], 'b': ['b']});
+                done();
+            });
+        });
+
+        it('handles empty object', function(done) {
+            async.groupBySeries({}, function(val, next) {
+                assert(false, 'iteratee should not be called');
+                next();
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({});
+                done();
+            });
+        });
+
+        it('handles undefined', function(done) {
+            async.groupBySeries(undefined, function(val, next) {
+                assert(false, 'iteratee should not be called');
+                next();
+            }, function(err, result) {
+                expect(err).to.eql(null);
+                expect(result).to.eql({});
+                done();
+            });
+        });
+    });
+});