diff --git a/lib/concat.js b/lib/concat.js index 7edf75937..b4857c9d2 100644 --- a/lib/concat.js +++ b/lib/concat.js @@ -1,5 +1,5 @@ -import concat from './internal/concat'; -import doParallel from './internal/doParallel'; +import doLimit from './internal/doLimit'; +import concatLimit from './concatLimit'; /** * Applies `iteratee` to each item in `coll`, concatenating the results. Returns @@ -26,4 +26,4 @@ import doParallel from './internal/doParallel'; * // files is now a list of filenames that exist in the 3 directories * }); */ -export default doParallel(concat); +export default doLimit(concatLimit, Infinity); diff --git a/lib/concatLimit.js b/lib/concatLimit.js index a86fd74e3..4df03ea05 100644 --- a/lib/concatLimit.js +++ b/lib/concatLimit.js @@ -1,5 +1,9 @@ -import doParallelLimit from './internal/doParallelLimit'; -import concat from './internal/concat'; +import noop from 'lodash/noop'; +import wrapAsync from './internal/wrapAsync'; +import slice from './internal/slice'; +import mapLimit from './mapLimit'; + +var _concat = Array.prototype.concat; /** * The same as [`concat`]{@link module:Collections.concat} but runs a maximum of `limit` async operations at a time. @@ -14,9 +18,27 @@ import concat from './internal/concat'; * @param {number} limit - The maximum number of async operations at a time. * @param {AsyncFunction} iteratee - A function to apply to each item in `coll`, * which should use an array as its result. Invoked with (item, callback). - * @param {Function} [callback(err)] - A callback which is called after all the + * @param {Function} [callback] - A callback which is called after all the * `iteratee` functions have finished, or an error occurs. Results is an array * containing the concatenated results of the `iteratee` function. Invoked with * (err, results). */ -export default doParallelLimit(concat); +export default function(coll, limit, iteratee, callback) { + callback = callback || noop; + var _iteratee = wrapAsync(iteratee); + mapLimit(coll, limit, function(val, callback) { + _iteratee(val, function(err /*, ...args*/) { + if (err) return callback(err); + return callback(null, slice(arguments, 1)); + }); + }, function(err, mapResults) { + var result = []; + for (var i = 0; i < mapResults.length; i++) { + if (mapResults[i]) { + result = _concat.apply(result, mapResults[i]); + } + } + + return callback(err, result); + }); +} diff --git a/lib/concatSeries.js b/lib/concatSeries.js index f5b3fab9c..26f5268e5 100644 --- a/lib/concatSeries.js +++ b/lib/concatSeries.js @@ -1,5 +1,5 @@ -import concat from './internal/concat'; -import doSeries from './internal/doSeries'; +import doLimit from './internal/doLimit'; +import concatLimit from './concatLimit'; /** * The same as [`concat`]{@link module:Collections.concat} but runs only a single async operation at a time. @@ -19,4 +19,4 @@ import doSeries from './internal/doSeries'; * containing the concatenated results of the `iteratee` function. Invoked with * (err, results). */ -export default doSeries(concat); +export default doLimit(concatLimit, 1); diff --git a/lib/internal/concat.js b/lib/internal/concat.js deleted file mode 100644 index a55de2eec..000000000 --- a/lib/internal/concat.js +++ /dev/null @@ -1,11 +0,0 @@ -export default function concat(eachfn, arr, fn, callback) { - var result = []; - eachfn(arr, function (x, index, cb) { - fn(x, function (err, y) { - result = result.concat(y || []); - cb(err); - }); - }, function (err) { - callback(err, result); - }); -} diff --git a/lib/internal/doSeries.js b/lib/internal/doSeries.js deleted file mode 100644 index 9fea488d9..000000000 --- a/lib/internal/doSeries.js +++ /dev/null @@ -1,8 +0,0 @@ -import eachOfSeries from '../eachOfSeries'; -import wrapAsync from './wrapAsync'; - -export default function doSeries(fn) { - return function (obj, iteratee, callback) { - return fn(eachOfSeries, obj, wrapAsync(iteratee), callback); - }; -} diff --git a/mocha_test/concat.js b/mocha_test/concat.js index 97c0a96ab..f6b73b3b9 100644 --- a/mocha_test/concat.js +++ b/mocha_test/concat.js @@ -3,92 +3,415 @@ var expect = require('chai').expect; var assert = require('assert'); describe('concat', function() { - it('concat', function(done) { - var call_order = []; - var iteratee = function (x, cb) { - setTimeout(function(){ - call_order.push(x); - var r = []; - while (x > 0) { - r.push(x); - x--; + this.timeout(250); + + function concatIteratee(callOrder, val, next) { + setTimeout(function() { + callOrder.push(val); + next(null, [val, val+1]); + }, val * 25); + } + + context('concat', function() { + it('basics', function(done) { + var callOrder = []; + async.concat([1, 3, 2], concatIteratee.bind(this, callOrder), function(err, result) { + expect(err).to.eql(null); + expect(callOrder).to.eql([1, 2, 3]); + expect(result).to.eql([1, 2, 3, 4, 2, 3]); + done(); + }); + }); + + it('error', function(done) { + async.concat([1, 3, 2], function(val, next) { + if (val === 3) { + return next(new Error('fail')); } - cb(null, r); - }, x*25); - }; - async.concat([1,3,2], iteratee, function(err, results){ - expect(results).to.eql([1,2,1,3,2,1]); - expect(call_order).to.eql([1,2,3]); - assert(err === null, err + " passed instead of 'null'"); - done(); + next(null, [val, val+1]); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql([1, 2]); + done(); + }); }); - }); - it('concat error', function(done) { - var iteratee = function (x, cb) { - cb(new Error('test error')); - }; - async.concat([1,2,3], iteratee, function(err){ - assert(err); - done(); + it('original untouched', function(done) { + var arr = ['foo', 'bar', 'baz']; + async.concat(arr, function(val, next) { + next(null, [val, val]); + }, function(err, result) { + expect(arr).to.eql(['foo', 'bar', 'baz']); + expect(result).to.eql(['foo', 'foo', 'bar', 'bar', 'baz', 'baz']); + done(); + }); + }); + + it('empty results', function(done) { + var arr = ['foo', 'bar', 'baz']; + async.concat(arr, function(val, next) { + next(null); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); + }); + + it('empty arrays', function(done) { + var arr = ['foo', 'bar', 'baz']; + async.concat(arr, function(val, next) { + next(null, []); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); + }); + + it('handles empty object', function(done) { + async.concat({}, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); + }); + + it('variadic', function(done) { + var arr = ['foo', 'bar', 'baz']; + async.concat(arr, function(val, next) { + next(null, val, val); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql(['foo', 'foo', 'bar', 'bar', 'baz', 'baz']); + done(); + }); + }); + + it('flattens arrays', function(done) { + var arr = ['foo', 'bar']; + async.concat(arr, function(val, next) { + next(null, [val, [val]]); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql(['foo', ['foo'], 'bar', ['bar']]); + done(); + }); }); - }); - it('concatSeries', function(done) { - var call_order = []; - var iteratee = function (x, cb) { - setTimeout(function(){ - call_order.push(x); - var r = []; - while (x > 0) { - r.push(x); - x--; + it('handles fasly values', function(done) { + var falsy = [null, undefined, 0, '']; + async.concat(falsy, function(val, next) { + next(null, val); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql(falsy); + done(); + }); + }); + + it('handles objects', function(done) { + var obj = {a: 'foo', b: 'bar', c: 'baz'}; + async.concat(obj, function(val, next) { + next(null, val); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql(['foo', 'bar', 'baz']); + done(); + }); + }); + + it('main callback optional', function(done) { + var arr = [1, 2, 3]; + var runs = []; + async.concat(arr, function(val, next) { + runs.push(val); + var _done = (runs.length === arr.length); + async.setImmediate(function() { + next(null); + if (_done) { + expect(runs).to.eql(arr); + done(); + } + }); + }); + }); + + it('iteratee callback is only called once', function(done) { + async.concat([1, 2], function(val, next) { + try { + next(val); + } catch (exception) { + expect(function() { + next(exception); + }).to.throw(/already called/); + done(); } - cb(null, r); - }, x*25); - }; - async.concatSeries([1,3,2], iteratee, function(err, results){ - expect(results).to.eql([1,3,2,1,2,1]); - expect(call_order).to.eql([1,3,2]); - assert(err === null, err + " passed instead of 'null'"); - done(); + }, function() { + throw new Error(); + }); }); - }); - it('concatLimit basics', function(done) { - var running = 0; - var concurrency = { - 'foo': 2, - 'bar': 2, - 'baz': 1 - }; - - async.concatLimit(['foo', 'bar', 'baz'], 2, function(val, next) { - running++; - async.setImmediate(function() { - expect(running).to.equal(concurrency[val]); - running--; - next(null, [val, val]); + it('preserves order', function(done) { + var arr = [30, 15]; + async.concat(arr, function(x, cb) { + setTimeout(function() { + cb(null, x); + }, x); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql(arr); + done(); + }); + }); + + it('handles Map', function(done) { + if (typeof Map !== 'function') { + return done(); + } + + var map = new Map([ + ['a', 'b'], + ['b', 'c'], + ['c', 'd'] + ]); + + async.concat(map, function(val, next) { + next(null, val); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql(['a', 'b', 'b', 'c', 'c', 'd']); + done(); + }); + }); + + it('handles sparse results', function(done) { + var arr = [1, 2, 3, 4]; + async.concat(arr, function(val, next) { + if (val === 1 || val === 3) { + return next(null, val+1); + } else if (val === 2) { + async.setImmediate(function() { + return next(null, val+1); + }); + } else { + return next('error'); + } + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql([2, 4]); + async.setImmediate(done); }); - }, function(err, result) { - expect(running).to.equal(0); - expect(err).to.eql(null); - expect(result).to.eql(['foo', 'foo', 'bar', 'bar', 'baz', 'baz']); - done(); }); }); - it('concatLimit error', function(done) { + context('concatLimit', function() { var arr = ['foo', 'bar', 'baz']; - async.concatLimit(arr, 2, function(val, next) { - if (val === 'bar') { - return next(new Error('fail')); + it('basics', function(done) { + var running = 0; + var concurrency = {'foo': 2, 'bar': 2, 'baz': 1}; + async.concatLimit(arr, 2, function(val, next) { + running++; + async.setImmediate(function() { + expect(running).to.equal(concurrency[val]); + running--; + next(null, val, val); + }) + }, function(err, result) { + expect(running).to.equal(0); + expect(err).to.eql(null); + expect(result).to.eql(['foo', 'foo', 'bar', 'bar', 'baz', 'baz']); + done(); + }); + }); + + it('error', function(done) { + async.concatLimit(arr, 1, function(val, next) { + if (val === 'bar') { + return next(new Error('fail')); + } + next(null, val); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql(['foo']); + done(); + }); + }); + + it('handles objects', function(done) { + async.concatLimit({'foo': 1, 'bar': 2, 'baz': 3}, 2, function(val, next) { + next(null, val+1); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql([2, 3, 4]); + done(); + }); + }); + + it('handles empty object', function(done) { + async.concatLimit({}, 2, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); + }); + + it('handles undefined', function(done) { + async.concatLimit(undefined, 2, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); + }); + + it('limit exceeds size', function(done) { + var callOrder = []; + async.concatLimit([3, 2, 2, 1], 10, concatIteratee.bind(this, callOrder), function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql([3, 4, 2, 3, 2, 3, 1, 2]); + expect(callOrder).to.eql([1, 2, 2, 3]); + done(); + }); + }); + + it('limit equal size', function(done) { + var callOrder = []; + async.concatLimit([3, 2, 2, 1], 4, concatIteratee.bind(this, callOrder), function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql([3, 4, 2, 3, 2, 3, 1, 2]); + expect(callOrder).to.eql([1, 2, 2, 3]); + done(); + }); + }); + + it('zero limit', function(done) { + async.concatLimit([3, 2, 2, 1], 0, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); + }); + + it('does not continue replenishing after error', function(done) { + var started = 0; + var arr = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + var limit = 3; + var step = 0; + var maxSteps = arr.length; + + async.concatLimit(arr, limit, function(val, next) { + started++; + if (started === 3) { + return next(new Error('fail')); + } + + async.setImmediate(function() { + next(); + }); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.be.an('array').that.is.empty; + }); + + // wait `maxSteps` event loop cycles before calling done to ensure + // the iteratee is not called on more items in arr. + function waitCycle() { + step++; + if (step >= maxSteps) { + expect(started).to.equal(3); + done(); + return; + } else { + async.setImmediate(waitCycle); + } } - next(null, [val, val]); - }, function(err, result) { - expect(err).to.not.eql(null); - expect(result).to.eql(['foo', 'foo']); - done(); + + async.setImmediate(waitCycle); + }); + }); + + context('concatSeries', function() { + it('basics', function(done) { + var callOrder = []; + var running = 0; + var iteratee = function (x, cb) { + running++; + setTimeout(function() { + expect(running).to.equal(1); + running--; + callOrder.push(x); + var r = []; + while (x > 0) { + r.push(x); + x--; + } + cb(null, r); + }, x*25); + }; + async.concatSeries([1,3,2], iteratee, function(err, results) { + expect(results).to.eql([1,3,2,1,2,1]); + expect(running).to.equal(0); + expect(callOrder).to.eql([1,3,2]); + assert(err === null, err + " passed instead of 'null'"); + done(); + }); + }); + + it('error', function(done) { + async.concatSeries(['foo', 'bar', 'baz'], function(val, next) { + if (val === 'bar') { + return next(new Error('fail')); + } + next(null, [val, val]); + }, function(err, result) { + expect(err).to.not.eql(null); + expect(result).to.eql(['foo', 'foo']); + done(); + }); + }); + + it('handles objects', function(done) { + async.concatSeries({'foo': 1, 'bar': 2, 'baz': 3}, function(val, next) { + return next(null, [val, val+1]); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.eql([1, 2, 2, 3, 3, 4]); + done(); + }); + }); + + it('handles empty object', function(done) { + async.concatSeries({}, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); + }); + + it('handles undefined', function(done) { + async.concatSeries(undefined, function(val, next) { + assert(false, 'iteratee should not be called'); + next(); + }, function(err, result) { + expect(err).to.eql(null); + expect(result).to.be.an('array').that.is.empty; + done(); + }); }); }); }); diff --git a/mocha_test/es2017/asyncFunctions.js b/mocha_test/es2017/asyncFunctions.js index eb282bca7..1a93f4f75 100644 --- a/mocha_test/es2017/asyncFunctions.js +++ b/mocha_test/es2017/asyncFunctions.js @@ -212,6 +212,14 @@ module.exports = function () { }); }); + it('should handle async functions in concatLimit', (done) => { + async.concatLimit(input, 2, asyncIdentity, (err, result) => { + expect(err).to.eql(null); + expect(result).to.eql(input); + done(err); + }); + }); + it('should handle async functions in concatSeries', (done) => { async.concatSeries(input, asyncIdentity, (err, result) => { expect(result).to.eql(input); diff --git a/mocha_test/groupBy.js b/mocha_test/groupBy.js index afb612b0c..d20f385f4 100644 --- a/mocha_test/groupBy.js +++ b/mocha_test/groupBy.js @@ -330,7 +330,7 @@ describe('groupBy', function() { }); it('handles empty object', function(done) { - async.groupByLimit({}, 2, function(val, next) { + async.groupBySeries({}, function(val, next) { assert(false, 'iteratee should not be called'); next(); }, function(err, result) { diff --git a/mocha_test/slice.js b/mocha_test/slice.js new file mode 100644 index 000000000..70205264d --- /dev/null +++ b/mocha_test/slice.js @@ -0,0 +1,32 @@ +var slice = require('../lib/internal/slice').default; +var expect = require('chai').expect; + +describe('slice', function() { + it('should slice arrays', function() { + var arr = ['foo', 'bar', 'baz']; + var result = slice(arr, 2); + expect(arr).to.eql(['foo', 'bar', 'baz']); + expect(result).to.eql(['baz']); + }); + + it('should handle ArrayLike objects', function() { + var args = {0: 'foo', 1: 'bar', 2: 'baz', length: 3}; + var result = slice(args, 1); + expect(result).to.be.an('array'); + expect(result).to.eql(['bar', 'baz']); + }); + + it('should handle arguments', function() { + var foo = function() { + return slice(arguments, 1); + }; + var result = foo.apply(null, ['foo', 'bar', 'baz']); + expect(result).to.be.an('array'); + expect(result).to.eql(['bar', 'baz']); + }); + + it('should return an empty array on an invalid start', function() { + var result = slice(['foo', 'bar', 'baz'], 10); + expect(result).to.be.an('array').that.is.empty; + }); +}); diff --git a/perf/suites.js b/perf/suites.js index 677517ce9..77ab59beb 100644 --- a/perf/suites.js +++ b/perf/suites.js @@ -130,6 +130,24 @@ module.exports = [{ }); }, done); } +}, { + name: "concat", + // args lists are passed to the setup function + args: [ + [10], + [300], + [10000] + ], + setup: function setup(count) { + tasks = _.range(count); + }, + fn: function(async, done) { + async.concat(tasks, function(num, cb) { + async.setImmediate(function() { + cb(null, [num]); + }); + }, done); + } }, { name: "eachOf", // args lists are passed to the setup function