From bf88ab8ebf3964864cc4f2f45f5b3dbb7b0043b0 Mon Sep 17 00:00:00 2001 From: Hubert Argasinski Date: Mon, 6 Feb 2017 01:45:41 -0500 Subject: [PATCH 1/2] initial groupBy implementation --- lib/groupBy.js | 37 +++++ lib/groupByLimit.js | 44 +++++ lib/groupBySeries.js | 23 +++ lib/index.js | 9 + mocha_test/groupBy.js | 375 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 488 insertions(+) create mode 100644 lib/groupBy.js create mode 100644 lib/groupByLimit.js create mode 100644 lib/groupBySeries.js create mode 100644 mocha_test/groupBy.js diff --git a/lib/groupBy.js b/lib/groupBy.js new file mode 100644 index 000000000..e1a036781 --- /dev/null +++ b/lib/groupBy.js @@ -0,0 +1,37 @@ +import doLimit from './internal/doLimit'; +import groupByLimit from './groupByLimit'; + +/** + * Returns a new object, where each value corresponds to an array of items, from + * `coll`, that returned the corresponding key. That is, the keys of the object + * correspond to the values passed to the `iteratee` callback. Note: Since this + * function applies the `iteratee` to each item in parallel, there is no + * guarantee that the `iteratee` functions will complete in order and there is + * no guarantee that grouped items will appear in the same order as in `coll`. + * + * @name groupBy + * @static + * @memberOf module:Collections + * @method + * @category Collection + * @param {Array|Iterable|Object} coll - A collection to iterate over. + * @param {Function} iteratee - A function to apply to each item in `coll`. + * The iteratee is passed a `callback(err, key)` which must be called once it + * has completed with an error (which can be `null`) and the `key` to group the + * value under. Invoked with (value, callback). + * @param {Function} [callback] - A callback which is called when all `iteratee` + * functions have finished, or an error occurs. Result is an `Object` whoses + * properties are arrays of values which returned the corresponding key. + * @example + * + * async.groupBy(['userId1', 'userId2', 'userId3'], function(userId, callback) { + * db.findById(userId, function(err, user) { + * if (err) return callback(err); + * return callback(null, user.age); + * }); + * }, function(err, result) { + * // result is object containing the userIds grouped by age + * // e.g. { 30: ['userId1', 'userId3'], 42: ['userId2']}; + * }); + */ +export default doLimit(groupByLimit, Infinity); diff --git a/lib/groupByLimit.js b/lib/groupByLimit.js new file mode 100644 index 000000000..c9130fb9a --- /dev/null +++ b/lib/groupByLimit.js @@ -0,0 +1,44 @@ +import noop from 'lodash/noop'; +import doParallelLimit from './internal/doParallelLimit'; + +/** + * The same as [`groupBy`]{@link module:Collections.groupBy} but runs a maximum of `limit` async operations at a time. + * + * @name groupByLimit + * @static + * @memberOf module:Collections + * @method + * @see [async.groupBy]{@link module:Collections.groupBy} + * @category Collection + * @param {Array|Iterable|Object} coll - A collection to iterate over. + * @param {number} limit - The maximum number of async operations at a time. + * @param {Function} iteratee - A function to apply to each item in `coll`. + * The iteratee is passed a `callback(err, key)` which must be called once it + * has completed with an error (which can be `null`) and the `key` to group the + * value under. Invoked with (value, callback). + * @param {Function} [callback] - A callback which is called when all `iteratee` + * functions have finished, or an error occurs. Result is an `Object` whoses + * properties are arrays of values which returned the corresponding key. + */ +export default doParallelLimit(function(eachFn, coll, iteratee, callback) { + callback = callback || noop; + coll = coll || []; + var result = {}; + // from MDN, handle object having an `hasOwnProperty` prop + var hasOwnProperty = Object.prototype.hasOwnProperty; + + eachFn(coll, function(val, _, callback) { + iteratee(val, function(err, key) { + if (err) return callback(err); + + if (hasOwnProperty.call(result, key)) { + result[key].push(val); + } else { + result[key] = [val]; + } + callback(null); + }); + }, function(err) { + callback(err, result); + }); +}); diff --git a/lib/groupBySeries.js b/lib/groupBySeries.js new file mode 100644 index 000000000..5df9a3cbe --- /dev/null +++ b/lib/groupBySeries.js @@ -0,0 +1,23 @@ +import doLimit from './internal/doLimit'; +import groupByLimit from './groupByLimit'; + +/** + * The same as [`groupBy`]{@link module:Collections.groupBy} but runs only a single async operation at a time. + * + * @name groupBySeries + * @static + * @memberOf module:Collections + * @method + * @see [async.groupBy]{@link module:Collections.groupBy} + * @category Collection + * @param {Array|Iterable|Object} coll - A collection to iterate over. + * @param {number} limit - The maximum number of async operations at a time. + * @param {Function} iteratee - A function to apply to each item in `coll`. + * The iteratee is passed a `callback(err, key)` which must be called once it + * has completed with an error (which can be `null`) and the `key` to group the + * value under. Invoked with (value, callback). + * @param {Function} [callback] - A callback which is called when all `iteratee` + * functions have finished, or an error occurs. Result is an `Object` whoses + * properties are arrays of values which returned the corresponding key. + */ +export default doLimit(groupByLimit, 1); diff --git a/lib/index.js b/lib/index.js index 8a2c7cad9..0bb7efe19 100644 --- a/lib/index.js +++ b/lib/index.js @@ -54,6 +54,9 @@ import filter from './filter'; import filterLimit from './filterLimit'; import filterSeries from './filterSeries'; import forever from './forever'; +import groupBy from './groupBy'; +import groupByLimit from './groupByLimit'; +import groupBySeries from './groupBySeries'; import log from './log'; import map from './map'; import mapLimit from './mapLimit'; @@ -128,6 +131,9 @@ export default { filterLimit: filterLimit, filterSeries: filterSeries, forever: forever, + groupBy: groupBy, + groupByLimit: groupByLimit, + groupBySeries: groupBySeries, log: log, map: map, mapLimit: mapLimit, @@ -220,6 +226,9 @@ export { filterLimit as filterLimit, filterSeries as filterSeries, forever as forever, + groupBy as groupBy, + groupByLimit as groupByLimit, + groupBySeries as groupBySeries, log as log, map as map, mapLimit as mapLimit, diff --git a/mocha_test/groupBy.js b/mocha_test/groupBy.js new file mode 100644 index 000000000..a894626d8 --- /dev/null +++ b/mocha_test/groupBy.js @@ -0,0 +1,375 @@ +var async = require('../lib'); +var expect = require('chai').expect; +var assert = require('assert'); + +describe('groupBy', function() { + this.timeout(250); + + function groupByIteratee(callOrder, val, next) { + setTimeout(function() { + callOrder.push(val); + next(null, val+1); + }, val * 25); + } + + context('groupBy', function() { + it('basics', function(done) { + var callOrder = []; + async.groupBy([1, 3, 2], groupByIteratee.bind(this, callOrder), function(err, result) { + expect(err).to.eql(null); + expect(callOrder).to.eql([1, 2, 3]); + expect(result).to.eql({2: [1], 3: [2], 4: [3]}); + done(); + }); + }); + + it('error', function(done) { + async.groupBy([1, 3, 2], function(val, next) { + if (val === 3) { + return next(new Error('fail')); + } + next(null, val+1); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql({2: [1]}); + done(); + }); + }); + + it('original untouched', function(done) { + var obj = {a: 'b', b: 'c', c: 'd'}; + async.groupBy(obj, function(val, next) { + next(null, val); + }, function(err, result) { + expect(obj).to.eql({a: 'b', b: 'c', c: 'd'}); + expect(result).to.eql({b: ['b'], c: ['c'], d: ['d']}); + done(); + }); + }); + + it('handles multiple matches', function(done) { + var callOrder = []; + async.groupBy([1, 3, 2, 2], groupByIteratee.bind(this, callOrder), function(err, result) { + expect(err).to.eql(null); + expect(callOrder).to.eql([1, 2, 2, 3]); + expect(result).to.eql({2: [1], 3: [2, 2], 4: [3]}); + done(); + }); + }); + + it('handles objects', function(done) { + var obj = {a: 'b', b: 'c', c: 'd'}; + var concurrency = {b: 3, c: 2, d: 1}; + var running = 0; + async.groupBy(obj, function(val, next) { + running++; + async.setImmediate(function() { + expect(running).to.equal(concurrency[val]); + running--; + next(null, val); + }); + }, function(err, result) { + expect(running).to.equal(0); + expect(err).to.eql(null); + expect(result).to.eql({b: ['b'], c: ['c'], d: ['d']}); + done(); + }); + }); + + it('handles undefined', function(done) { + async.groupBy(undefined, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({}); + done(); + }); + }); + + it('handles empty object', function(done) { + async.groupBy({}, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({}); + done(); + }); + }); + + it('with reflect', function(done) { + var obj = {a: 1, b: 3, c: 2}; + var callOrder = []; + async.groupBy(obj, async.reflect(function(val, next) { + setTimeout(function() { + callOrder.push(val); + next(null, val+1); + }, val*25); + }), function(err, result) { + expect(err).to.eql(null); + expect(callOrder).to.eql([1, 2, 3]); + expect(result).to.eql({ + '[object Object]': [1, 2, 3] + }); + done(); + }); + }); + + it('error with reflect' , function(done) { + var obj = {a: 1, b: 3, c: 2}; + var callOrder = []; + async.groupBy(obj, async.reflect(function(val, next) { + setTimeout(function() { + callOrder.push(val); + if (val === 2) { + return next('fail'); + } + next(null, val+1); + }, val*25); + }), function(err, result) { + expect(err).to.eql(null); + expect(callOrder).to.eql([1, 2, 3]); + expect(result).to.eql({ + '[object Object]': [1, 2, 3] + }); + done(); + }); + }); + + + it('main callback optional' , function(done) { + var arr = [1, 2, 3]; + var runs = []; + async.groupBy(arr, function(val, next) { + runs.push(val); + var _done = (runs.length === arr.length); + async.setImmediate(function() { + next(null); + if (_done) { + expect(runs).to.eql(arr); + done(); + } + }); + }); + }); + + it('iteratee callback is only called once', function(done) { + async.groupBy([1, 2], function(item, callback) { + try { + callback(item); + } catch (exception) { + expect(function() { + callback(exception); + }).to.throw(/already called/); + done(); + } + }, function() { + throw new Error(); + }); + }); + + it('handles Map', function(done) { + if (typeof Map !== 'function') { + return done(); + } + + var map = new Map([ + ['a', 'a'], + ['b', 'b'], + ['c', 'a'] + ]); + + async.groupBy(map, function(val, next) { + next(null, val[1]+1); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({ + a1: [ ['a', 'a'], ['c', 'a']], + b1: [ ['b', 'b'] ] + }); + done(); + }); + }); + }); + + context('groupByLimit', function() { + var obj = {a: 'b', b: 'c', c: 'd'}; + it('basics', function(done) { + var running = 0; + var concurrency = {'b': 2, 'c': 2, 'd': 1}; + async.groupByLimit(obj, 2, function(val, next) { + running++; + async.setImmediate(function() { + expect(running).to.equal(concurrency[val]); + running--; + next(null, val); + }); + }, function(err, result) { + expect(running).to.equal(0); + expect(err).to.eql(null); + expect(result).to.eql({'b': ['b'], 'c': ['c'], 'd': ['d']}) + done(); + }); + }); + + it('error', function(done) { + async.groupByLimit(obj, 1, function(val, next) { + if (val === 'c') { + return next(new Error('fail')); + } + next(null, val); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql({b: ['b']}); + done(); + }); + }); + + it('handles empty object', function(done) { + async.groupByLimit({}, 2, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({}); + done(); + }); + }); + + it('handles undefined', function(done) { + async.groupByLimit(undefined, 2, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({}); + done(); + }); + }); + + it('limit exceeds size', function(done) { + var callOrder = []; + async.groupByLimit([3, 2, 2, 1], 10, groupByIteratee.bind(this, callOrder), function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({2: [1], 3: [2, 2], 4: [3]}); + expect(callOrder).to.eql([1, 2, 2, 3]); + done(); + }); + }); + + it('limit equal size', function(done) { + var callOrder = []; + async.groupByLimit([3, 2, 2, 1], 4, groupByIteratee.bind(this, callOrder), function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({2: [1], 3: [2, 2], 4: [3]}); + expect(callOrder).to.eql([1, 2, 2, 3]); + done(); + }); + }); + + it('zero limit', function(done) { + async.groupByLimit([3, 2, 2, 1], 0, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({}); + done(); + }); + }); + + it('does not continue replenishing after error', function(done) { + var started = 0; + var arr = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + var delay = 10; + var limit = 3; + var maxTime = 10 * arr.length; + + async.groupByLimit(arr, limit, function(val, next) { + started++; + if (started === 3) { + return next(new Error('fail')); + } + + setTimeout(function() { + next(); + }, delay); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql({}); + }); + + setTimeout(function() { + expect(started).to.equal(3); + done(); + }, maxTime); + }); + }); + + context('groupBySeries', function() { + var obj = {a: 'b', b: 'c', c: 'd'}; + it('basics', function(done) { + var running = 0; + var concurrency = {'b': 1, 'c': 1, 'd': 1}; + async.groupBySeries(obj, function(val, next) { + running++; + async.setImmediate(function() { + expect(running).to.equal(concurrency[val]); + running--; + next(null, val); + }); + }, function(err, result) { + expect(running).to.equal(0); + expect(err).to.eql(null); + expect(result).to.eql({'b': ['b'], 'c': ['c'], 'd': ['d']}); + done(); + }); + }); + + it('error', function(done) { + async.groupBySeries(obj, function(val, next) { + if (val === 'c') { + return next(new Error('fail')); + } + next(null, val); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql({b: ['b']}); + done(); + }); + }); + + it('handles arrays', function(done) { + async.groupBySeries(['a', 'a', 'b'], function(val, next) { + next(null, val); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({'a': ['a', 'a'], 'b': ['b']}); + done(); + }); + }); + + it('handles empty object', function(done) { + async.groupByLimit({}, 2, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({}); + done(); + }); + }); + + it('handles undefined', function(done) { + async.groupBySeries(undefined, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql({}); + done(); + }); + }); + }); +}); From 434d047847ae40dab1f7887ee26ce457556964d1 Mon Sep 17 00:00:00 2001 From: Hubert Argasinski Date: Sun, 19 Feb 2017 20:42:37 -0500 Subject: [PATCH 2/2] groupBy PR [#1364] fixes --- lib/groupBy.js | 11 +++++--- lib/groupByLimit.js | 39 ++++++++++++++++------------ lib/map.js | 2 +- mocha_test/groupBy.js | 59 ++++++++++++++----------------------------- 4 files changed, 50 insertions(+), 61 deletions(-) diff --git a/lib/groupBy.js b/lib/groupBy.js index e1a036781..31e2c0b75 100644 --- a/lib/groupBy.js +++ b/lib/groupBy.js @@ -4,10 +4,13 @@ import groupByLimit from './groupByLimit'; /** * Returns a new object, where each value corresponds to an array of items, from * `coll`, that returned the corresponding key. That is, the keys of the object - * correspond to the values passed to the `iteratee` callback. Note: Since this - * function applies the `iteratee` to each item in parallel, there is no - * guarantee that the `iteratee` functions will complete in order and there is - * no guarantee that grouped items will appear in the same order as in `coll`. + * correspond to the values passed to the `iteratee` callback. + * + * Note: Since this function applies the `iteratee` to each item in parallel, + * there is no guarantee that the `iteratee` functions will complete in order. + * However, the values for each key in the `result` will be in the same order as + * the original `coll`. For Objects, the values will roughly be in the order of + * the original Objects' keys (but this can vary across JavaScript engines). * * @name groupBy * @static diff --git a/lib/groupByLimit.js b/lib/groupByLimit.js index c9130fb9a..9fc71a0c3 100644 --- a/lib/groupByLimit.js +++ b/lib/groupByLimit.js @@ -1,5 +1,5 @@ import noop from 'lodash/noop'; -import doParallelLimit from './internal/doParallelLimit'; +import mapLimit from './mapLimit'; /** * The same as [`groupBy`]{@link module:Collections.groupBy} but runs a maximum of `limit` async operations at a time. @@ -20,25 +20,32 @@ import doParallelLimit from './internal/doParallelLimit'; * functions have finished, or an error occurs. Result is an `Object` whoses * properties are arrays of values which returned the corresponding key. */ -export default doParallelLimit(function(eachFn, coll, iteratee, callback) { +export default function(coll, limit, iteratee, callback) { callback = callback || noop; - coll = coll || []; - var result = {}; - // from MDN, handle object having an `hasOwnProperty` prop - var hasOwnProperty = Object.prototype.hasOwnProperty; - eachFn(coll, function(val, _, callback) { + mapLimit(coll, limit, function(val, callback) { iteratee(val, function(err, key) { if (err) return callback(err); + return callback(null, {key: key, val: val}); + }); + }, function(err, mapResults) { + var result = {}; + // from MDN, handle object having an `hasOwnProperty` prop + var hasOwnProperty = Object.prototype.hasOwnProperty; + + for (var i = 0; i < mapResults.length; i++) { + if (mapResults[i]) { + var key = mapResults[i].key; + var val = mapResults[i].val; - if (hasOwnProperty.call(result, key)) { - result[key].push(val); - } else { - result[key] = [val]; + if (hasOwnProperty.call(result, key)) { + result[key].push(val); + } else { + result[key] = [val]; + } } - callback(null); - }); - }, function(err) { - callback(err, result); + } + + return callback(err, result); }); -}); +}; diff --git a/lib/map.js b/lib/map.js index 4c0b22685..76d60582b 100644 --- a/lib/map.js +++ b/lib/map.js @@ -16,7 +16,7 @@ import map from './internal/map'; * * If `map` is passed an Object, the results will be an Array. The results * will roughly be in the order of the original Objects' keys (but this can - * vary across JavaScript engines) + * vary across JavaScript engines). * * @name map * @static diff --git a/mocha_test/groupBy.js b/mocha_test/groupBy.js index a894626d8..afb612b0c 100644 --- a/mocha_test/groupBy.js +++ b/mocha_test/groupBy.js @@ -98,46 +98,6 @@ describe('groupBy', function() { }); }); - it('with reflect', function(done) { - var obj = {a: 1, b: 3, c: 2}; - var callOrder = []; - async.groupBy(obj, async.reflect(function(val, next) { - setTimeout(function() { - callOrder.push(val); - next(null, val+1); - }, val*25); - }), function(err, result) { - expect(err).to.eql(null); - expect(callOrder).to.eql([1, 2, 3]); - expect(result).to.eql({ - '[object Object]': [1, 2, 3] - }); - done(); - }); - }); - - it('error with reflect' , function(done) { - var obj = {a: 1, b: 3, c: 2}; - var callOrder = []; - async.groupBy(obj, async.reflect(function(val, next) { - setTimeout(function() { - callOrder.push(val); - if (val === 2) { - return next('fail'); - } - next(null, val+1); - }, val*25); - }), function(err, result) { - expect(err).to.eql(null); - expect(callOrder).to.eql([1, 2, 3]); - expect(result).to.eql({ - '[object Object]': [1, 2, 3] - }); - done(); - }); - }); - - it('main callback optional' , function(done) { var arr = [1, 2, 3]; var runs = []; @@ -191,6 +151,25 @@ describe('groupBy', function() { done(); }); }); + + it('handles sparse results', function(done) { + var arr = [1, 2, 3]; + async.groupBy(arr, function(val, next) { + if (val === 1) { + return next(null, val+1); + } else if (val === 2) { + async.setImmediate(function() { + return next(null, val+1); + }); + } else { + return next('error'); + } + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql({2: [1]}); + async.setImmediate(done); + }); + }); }); context('groupByLimit', function() {