Skip to content

Commit

Permalink
initial experiment with async fn support
Browse files Browse the repository at this point in the history
  • Loading branch information
aearly committed Mar 22, 2017
1 parent 295b307 commit 9fc2995
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .eslintrc
Expand Up @@ -6,7 +6,7 @@
"es6": true
},
"parserOptions": {
"ecmaVersion": 6,
"ecmaVersion": 8,
"sourceType": "module"
},
"rules": {
Expand Down
4 changes: 3 additions & 1 deletion .travis.yml
Expand Up @@ -4,6 +4,8 @@ node_js:
- "0.10"
- "0.12"
- "4"
- "6"
- "7"

matrix:
include:
Expand All @@ -27,4 +29,4 @@ script:
# ensure buildable
- "[ $MAKE_TEST == false ] || make"
# test in firefox
- "[ $BROWSER == false ] || npm run mocha-browser-test"
- "[ $BROWSER == false ] || npm run mocha-browser-test"
2 changes: 1 addition & 1 deletion lib/asyncify.js
Expand Up @@ -48,7 +48,7 @@ import initialParams from './internal/initialParams';
* }
* ], callback);
*
* // es6 example
* // es2017 example
* var q = async.queue(async.asyncify(async function(file) {

This comment has been minimized.

Copy link
@megawac

megawac Mar 23, 2017

Collaborator

So the asyncify part of this won't be necessary any more? Very cool!

* var intermediateStep = await processFile(file);
* return await somePromise(intermediateStep)
Expand Down
3 changes: 2 additions & 1 deletion lib/each.js
@@ -1,5 +1,6 @@
import eachOf from './eachOf';
import withoutIndex from './internal/withoutIndex';
import wrapAsync from './internal/wrapAsync'

/**
* Applies the function `iteratee` to each item in `coll`, in parallel.
Expand Down Expand Up @@ -61,5 +62,5 @@ import withoutIndex from './internal/withoutIndex';
* });
*/
export default function eachLimit(coll, iteratee, callback) {
eachOf(coll, withoutIndex(iteratee), callback);
eachOf(coll, withoutIndex(wrapAsync(iteratee)), callback);
}
3 changes: 2 additions & 1 deletion lib/eachLimit.js
@@ -1,5 +1,6 @@
import eachOfLimit from './internal/eachOfLimit';
import withoutIndex from './internal/withoutIndex';
import wrapAsync from './internal/wrapAsync';

/**
* The same as [`each`]{@link module:Collections.each} but runs a maximum of `limit` async operations at a time.
Expand All @@ -23,5 +24,5 @@ import withoutIndex from './internal/withoutIndex';
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
*/
export default function eachLimit(coll, limit, iteratee, callback) {
eachOfLimit(limit)(coll, withoutIndex(iteratee), callback);
eachOfLimit(limit)(coll, withoutIndex(wrapAsync(iteratee)), callback);
}
3 changes: 2 additions & 1 deletion lib/eachOf.js
Expand Up @@ -6,6 +6,7 @@ import doLimit from './internal/doLimit';
import noop from 'lodash/noop';
import once from './internal/once';
import onlyOnce from './internal/onlyOnce';
import wrapAsync from './internal/wrapAsync';

// eachOf implementation optimized for array-likes
function eachOfArrayLike(coll, iteratee, callback) {
Expand Down Expand Up @@ -76,5 +77,5 @@ var eachOfGeneric = doLimit(eachOfLimit, Infinity);
*/
export default function(coll, iteratee, callback) {
var eachOfImplementation = isArrayLike(coll) ? eachOfArrayLike : eachOfGeneric;
eachOfImplementation(coll, iteratee, callback);
eachOfImplementation(coll, wrapAsync(iteratee), callback);
}
3 changes: 2 additions & 1 deletion lib/eachOfLimit.js
@@ -1,4 +1,5 @@
import _eachOfLimit from './internal/eachOfLimit';
import wrapAsync from './internal/wrapAsync';

/**
* The same as [`eachOf`]{@link module:Collections.eachOf} but runs a maximum of `limit` async operations at a
Expand All @@ -23,5 +24,5 @@ import _eachOfLimit from './internal/eachOfLimit';
* `iteratee` functions have finished, or an error occurs. Invoked with (err).
*/
export default function eachOfLimit(coll, limit, iteratee, callback) {
_eachOfLimit(limit)(coll, iteratee, callback);
_eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
}
4 changes: 3 additions & 1 deletion lib/internal/filter.js
Expand Up @@ -3,6 +3,8 @@ import isArrayLike from 'lodash/isArrayLike';
import property from 'lodash/_baseProperty';
import noop from 'lodash/noop';

import wrapAsync from './wrapAsync';

function filterArray(eachfn, arr, iteratee, callback) {
var truthValues = new Array(arr.length);
eachfn(arr, function (x, index, callback) {
Expand Down Expand Up @@ -46,5 +48,5 @@ function filterGeneric(eachfn, coll, iteratee, callback) {

export default function _filter(eachfn, coll, iteratee, callback) {
var filter = isArrayLike(coll) ? filterArray : filterGeneric;
filter(eachfn, coll, iteratee, callback || noop);
filter(eachfn, coll, wrapAsync(iteratee), callback || noop);
}
4 changes: 3 additions & 1 deletion lib/internal/map.js
@@ -1,14 +1,16 @@
import noop from 'lodash/noop';
import wrapAsync from './wrapAsync';

export default function _asyncMap(eachfn, arr, iteratee, callback) {
callback = callback || noop;
arr = arr || [];
var results = [];
var counter = 0;
var _iteratee = wrapAsync(iteratee);

eachfn(arr, function (value, _, callback) {
var index = counter++;
iteratee(value, function (err, v) {
_iteratee(value, function (err, v) {
results[index] = v;
callback(err);
});
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/wrapAsync.js
@@ -0,0 +1,11 @@
import asyncify from '../asyncify';

var supportsSymbol = typeof Symbol !== 'undefined';

This comment has been minimized.

Copy link
@megawac

megawac Mar 23, 2017

Collaborator

nit: typeof Symbol === 'function'


export default function wrapAsync(asyncFn) {
if (!supportsSymbol) return asyncFn;

var isAsync = asyncFn[Symbol.toStringTag] === 'AsyncFunction';

return isAsync ? asyncify(asyncFn) : asyncFn;
}
20 changes: 20 additions & 0 deletions mocha_test/asyncFunctions.js
@@ -0,0 +1,20 @@
describe('async function support', function () {
this.timeout(100);

var supportsAsync;
var supportsSymbol = typeof Symbol !== 'undefined';
try {
/* eslint no-eval:0 */
var fn = eval('(async function() {})')
supportsAsync = supportsSymbol &&
fn[Symbol.toStringTag] === 'AsyncFunction';
} catch (e) {
supportsAsync = false;
}

if (supportsAsync) {
require('./es2017/asyncFunctions.js')();
} else {
it('should not test async functions in this environment');
}
});
86 changes: 86 additions & 0 deletions mocha_test/es2017/asyncFunctions.js
@@ -0,0 +1,86 @@
var async = require('../../lib');
const expect = require('chai').expect;
const assert = require('assert');


module.exports = function () {
async function asyncIdentity(val) {
var res = await Promise.resolve(val);
return res;
}

const input = [1, 2, 3];

it('should asyncify async functions', (done) => {
async.asyncify(asyncIdentity)(42, (err, val) => {
assert(val === 42);
done(err);
})
});

it('should handle async functions in each', (done) => {
async.each(input, asyncIdentity, done);
});

it('should handle async functions in eachLimit', (done) => {
async.eachLimit(input, 2, asyncIdentity, done);
});

it('should handle async functions in eachSeries', (done) => {
async.eachSeries(input, asyncIdentity, done);
});

it('should handle async functions in eachOf', (done) => {
async.eachOf(input, asyncIdentity, done);
});

it('should handle async functions in eachOfLimit', (done) => {
async.eachOfLimit(input, 2, asyncIdentity, done);
});

it('should handle async functions in eachOfSeries', (done) => {
async.eachOfSeries(input, asyncIdentity, done);
});

it('should handle async functions in map', (done) => {
async.map(input, asyncIdentity, (err, result) => {
expect(result).to.eql(input);
done(err);
});
});

it('should handle async functions in mapLimit', (done) => {
async.mapLimit(input, 2, asyncIdentity, (err, result) => {
expect(result).to.eql(input);
done(err);
});
});

it('should handle async functions in mapSeries', (done) => {
async.mapSeries(input, asyncIdentity, (err, result) => {
expect(result).to.eql(input);
done(err);
});
});

it('should handle async functions in filter', (done) => {
async.filter(input, asyncIdentity, (err, result) => {
expect(result).to.eql(input);
done(err);
});
});

it('should handle async functions in filterLimit', (done) => {
async.filterLimit(input, 2, asyncIdentity, (err, result) => {
expect(result).to.eql(input);
done(err);
});
});

it('should handle async functions in filterSeries', (done) => {
async.filterSeries(input, asyncIdentity, (err, result) => {
expect(result).to.eql(input);
done(err);
});
});
}
5 changes: 3 additions & 2 deletions package.json
Expand Up @@ -22,12 +22,13 @@
"lodash-es": "^4.14.0"
},
"devDependencies": {
"babel-cli": "^6.16.0",
"babel-core": "^6.3.26",
"babel-cli": "^6.24.0",
"babel-core": "^6.24.0",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-plugin-istanbul": "^2.0.1",
"babel-plugin-transform-es2015-modules-commonjs": "^6.3.16",
"babel-preset-es2015": "^6.3.13",
"babel-preset-es2017": "^6.22.0",
"babelify": "^7.2.0",
"benchmark": "^2.1.1",
"bluebird": "^3.4.6",
Expand Down

5 comments on commit 9fc2995

@aearly
Copy link
Collaborator Author

@aearly aearly commented on 9fc2995 Mar 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@megawac @hargasinski I'd love to get your 👀 on this before I continue further.

@hargasinski
Copy link
Collaborator

@hargasinski hargasinski commented on 9fc2995 Mar 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wrapAsync implementation looks good to me, especially with the refactor in ac0bdd0.

One question though, is there a reason for only wrapping certain iteratees in wrapAsync? Wrapping the iteratee in internal/eachOfLimit would give most methods support for async functions. From #1386, I know async/await does not make sense in flow controls functions, but I don't see a reason not to support it unless the additional performance of an extra wrapper is a concern?

By the way, is there anything I could help with?

@aearly
Copy link
Collaborator Author

@aearly aearly commented on 9fc2995 Mar 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just the first pass with a handful of functions -- I wanted to validate the strategy before applying it to every method.

Wrapping the iteratee in internal/eachOfLimit does catch a lot of things, but we have to wrap async functions before they are wrapped with something like withoutIndex, so it means it will have to be handled in a lot of places.

I think it's worth adding support to control flow functions -- will come in handy if you need to mix and match async functions and callback functions in the same parallel call, for example.

@megawac
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really cool! Are there going to be some cases we have to worry about where the async callback has a different form than (err, result) arguments? For example, would waterfall/series be supported?

Anyway, looks like great progress 🥇

@aearly
Copy link
Collaborator Author

@aearly aearly commented on 9fc2995 Mar 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since promises only support resolving to a single value, things like waterfall would get weird/tricky. You'd have to use arrays like:

async.waterfall([
  async function () {
    let a = await foo();
    return [a, b, c];
  }
  async function([a, b, c]) {
    let d = await bar(a, b, c);
    return [a, d];
  },
  async function ([a, d]) {
    // etc...
  }
], done)

Please sign in to comment.