From 778a7da7266f5588ad4964f4b61fc65f6911352f Mon Sep 17 00:00:00 2001 From: papb Date: Sat, 5 Jun 2021 20:42:23 -0300 Subject: [PATCH] Allow stopping iteration via `pMap.stop` --- index.d.ts | 87 +++++++++++++++++++++++++++++++++++++++++++++---- index.js | 48 ++++++++++++++++++++++----- index.test-d.ts | 28 +++++++++++++++- readme.md | 67 ++++++++++++++++++++++++++++++++++++- test.js | 70 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 283 insertions(+), 17 deletions(-) diff --git a/index.d.ts b/index.d.ts index 7dfbb7f..2c84a6a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,3 +1,7 @@ +declare const stop: unique symbol; + +export type StopSymbol = typeof stop; + export interface Options { /** Number of concurrently pending promises returned by `mapper`. @@ -16,6 +20,46 @@ export interface Options { readonly stopOnError?: boolean; } +export interface OngoingMappingsStopOptions { + /** + Whether or not to remove all holes from the result array (caused by pending mappings). + + @default false + */ + readonly collapse?: boolean; + + /** + Value to use as immediate result for pending mappings, replacing holes from the result array. + + This option is ignored if `collapse` is set to `true`. + + @default undefined + */ + readonly fillWith?: unknown; +} + +export interface StopOptions { + /** + Value to provide as result for this iteration. + + @default undefined + */ + readonly value?: NewElement; + + /** + Options to configure what `pMap` must do with any concurrent ongoing mappings at the moment `stop` is called. + */ + readonly ongoingMappings?: OngoingMappingsStopOptions; +} + +type BaseStopValueWrapper = { + [stop]: Required>; +}; + +export type StopValueWrapper = NewElement extends any ? BaseStopValueWrapper : never; + +type MaybeWrappedInStop = NewElement | StopValueWrapper; + /** Function which is called for every item in `input`. Expected to return a `Promise` or value. @@ -25,7 +69,7 @@ Function which is called for every item in `input`. Expected to return a `Promis export type Mapper = ( element: Element, index: number -) => NewElement | Promise; +) => MaybeWrappedInStop | Promise>; /** @param input - Iterated over concurrently in the `mapper` function. @@ -54,8 +98,39 @@ console.log(result); //=> ['https://sindresorhus.com/', 'https://avajs.dev/', 'https://github.com/'] ``` */ -export default function pMap( - input: Iterable, - mapper: Mapper, - options?: Options -): Promise; +declare const pMap: { + ( + input: Iterable, + mapper: Mapper, + options?: Options + ): Promise; + + /** + Creates a special object that indicates to `pMap` that iteration must stop immediately. This object should just be returned from within the mapper (and not used directly for anything). + + @example + ``` + import pMap from 'p-map'; + import got from 'got'; + + const numbers = Array.from({ length: 2000 }).map((_, i) => i + 1); + //=> [1, 2, ..., 1999, 2000] + + const mapper = async number => { + if (number !== 404) { + const { transcript } = await got(`https://xkcd.com/${number}/info.0.json`).json(); + if (/unicorn/.test(transcript)) { + console.log('Found a XKCD comic with an unicorn:', number); + return pMap.stop(); + } + } + }; + + await pMap(numbers, mapper, { concurrency: 50 }); + //=> Found a XKCD comic with an unicorn: 948 + ``` + */ + stop: (options?: StopOptions) => StopValueWrapper; +}; + +export default pMap; diff --git a/index.js b/index.js index 15f3da2..5314058 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,7 @@ import AggregateError from 'aggregate-error'; +const stopSymbol = Symbol('pMap.stop'); + export default async function pMap( iterable, mapper, @@ -20,13 +22,13 @@ export default async function pMap( const result = []; const errors = []; const iterator = iterable[Symbol.iterator](); - let isRejected = false; + const pendingIndexes = new Set(); + let finished = false; let isIterableDone = false; - let resolvingCount = 0; let currentIndex = 0; const next = () => { - if (isRejected) { + if (finished) { return; } @@ -37,7 +39,7 @@ export default async function pMap( if (nextItem.done) { isIterableDone = true; - if (resolvingCount === 0) { + if (pendingIndexes.size === 0) { if (!stopOnError && errors.length > 0) { reject(new AggregateError(errors)); } else { @@ -48,21 +50,47 @@ export default async function pMap( return; } - resolvingCount++; + pendingIndexes.add(index); (async () => { try { const element = await nextItem.value; + + if (finished) { + return; + } + result[index] = await mapper(element, index); - resolvingCount--; - next(); + + if (finished) { + return; + } + + pendingIndexes.delete(index); + + if (result[index] && result[index][stopSymbol]) { + finished = true; + const stopConfig = result[index][stopSymbol]; + result[index] = stopConfig.value; + if (stopConfig.ongoingMappings.collapse) { + resolve(result.flat(0)); + } else { + for (const pendingIndex of pendingIndexes) { + result[pendingIndex] = stopConfig.ongoingMappings.fillWith; + } + + resolve(result); + } + } else { + next(); + } } catch (error) { if (stopOnError) { - isRejected = true; + finished = true; reject(error); } else { errors.push(error); - resolvingCount--; + pendingIndexes.delete(index); next(); } } @@ -78,3 +106,5 @@ export default async function pMap( } }); } + +pMap.stop = ({value, ongoingMappings = {}} = {}) => ({[stopSymbol]: {value, ongoingMappings}}); diff --git a/index.test-d.ts b/index.test-d.ts index f5595ce..88e5959 100644 --- a/index.test-d.ts +++ b/index.test-d.ts @@ -1,4 +1,4 @@ -import {expectType, expectAssignable} from 'tsd'; +import {expectType, expectAssignable, expectNotAssignable} from 'tsd'; import pMap, {Options, Mapper} from './index.js'; const sites = [ @@ -18,13 +18,26 @@ const asyncSyncMapper = async (site: string, index: number): Promise => index > 1 ? site : Promise.resolve(site); const multiResultTypeMapper = async (site: string, index: number): Promise => index > 1 ? site.length : site; +const mapperStoppingEarly1 = async (site: string, index: number) => + index > 1 ? pMap.stop({value: site.length}) : site; +const mapperStoppingEarly2 = async (site: string, index: number) => + index > 1 ? pMap.stop({value: index > 2 ? site.length : site}) : site; +const mapperStoppingEarly3 = async (site: string, index: number) => + index > 1 ? pMap.stop({value: index > 2 ? site.length : (index > 3 ? Date.now() : site)}) : true; expectAssignable(asyncMapper); expectAssignable>(asyncMapper); expectAssignable(asyncSyncMapper); +expectAssignable>(asyncSyncMapper); expectAssignable>>(asyncSyncMapper); expectAssignable(multiResultTypeMapper); expectAssignable>(multiResultTypeMapper); +expectAssignable(mapperStoppingEarly1); +expectAssignable>(mapperStoppingEarly1); +expectAssignable(mapperStoppingEarly2); +expectAssignable>(mapperStoppingEarly2); +expectAssignable(mapperStoppingEarly3); +expectAssignable>(mapperStoppingEarly3); expectAssignable({}); expectAssignable({concurrency: 0}); @@ -40,3 +53,16 @@ expectType>(pMap(sites, (site: string) => site)); expectType>(pMap(sites, (site: string) => site.length)); expectType>(pMap(numbers, (number: number) => number * 2)); + +expectType>(pMap(numbers, (number: number) => number * 2)); + +pMap.stop(); +pMap.stop({}); +pMap.stop({value: 123}); +pMap.stop({ongoingMappings: {}}); +pMap.stop({ongoingMappings: {collapse: true}}); +pMap.stop({ongoingMappings: {fillWith: 'hello'}}); +pMap.stop({value: Date.now(), ongoingMappings: {collapse: false, fillWith: 'hello'}}); + +const shouldBeUnusableDirectly = pMap.stop({value: 123}); +expectNotAssignable(shouldBeUnusableDirectly); diff --git a/readme.md b/readme.md index 8b666a4..557d518 100644 --- a/readme.md +++ b/readme.md @@ -4,7 +4,11 @@ Useful when you need to run promise-returning & async functions multiple times with different inputs concurrently. -This is different from `Promise.all()` in that you can control the concurrency and also decide whether or not to stop iterating when there's an error. +This is different from `Promise.all()` in that you can: + +* Control the concurrency +* Decide whether or not to stop iterating when there's an error +* Stop iterating at any point (like `break` in standard loops) ## Install @@ -35,6 +39,29 @@ console.log(result); //=> ['https://sindresorhus.com/', 'https://avajs.dev/', 'https://github.com/'] ``` +### Breaking from iteration + +```js +import pMap from 'p-map'; +import got from 'got'; + +const numbers = Array.from({ length: 2000 }).map((_, i) => i + 1); +//=> [1, 2, ..., 1999, 2000] + +const mapper = async number => { + if (number !== 404) { + const { transcript } = await got(`https://xkcd.com/${number}/info.0.json`).json(); + if (/unicorn/.test(transcript)) { + console.log('Found a XKCD comic with an unicorn:', number); + return pMap.stop(); + } + } +}; + +await pMap(numbers, mapper, { concurrency: 50 }); +//=> Found a XKCD comic with an unicorn: 948 +``` + ## API ### pMap(input, mapper, options?) @@ -72,6 +99,44 @@ Default: `true` When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises. +### pMap.stop(options?) + +Creates a special object that indicates to `pMap` that iteration must stop immediately. This object should just be returned from within the mapper (and not used directly for anything). + +#### options + +Type: `object` + +##### value + +Type: `any`\ +Default: `undefined` + +Value to provide as result for this iteration. + +##### ongoingMappings + +Type: `object` + +Options to configure what `pMap` must do with any concurrent ongoing mappings at the moment `stop` is called. + +###### collapse + +Type: `boolean`\ +Default: `false` + +Whether or not to remove all holes from the result array (caused by pending mappings). + +###### fillWith + +Type: `any`\ +Default: `undefined` + +Value to use as immediate result for pending mappings, replacing holes from the result array. + +This option is ignored if `collapse` is set to `true`. + + ## p-map for enterprise Available as part of the Tidelift Subscription. diff --git a/test.js b/test.js index 35d361a..4bfaa1d 100644 --- a/test.js +++ b/test.js @@ -107,3 +107,73 @@ test('aggregate errors when stopOnError is false', async t => { await t.throwsAsync(pMap(errorInput1, mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /foo(.|\n)*bar/}); await t.throwsAsync(pMap(errorInput2, mapper, {concurrency: 1, stopOnError: false}), {instanceOf: AggregateError, message: /bar(.|\n)*foo/}); }); + +test('stop early, finite concurrency, default ongoingMappings behavior', async t => { + const input = [ + [1, 300], + [2, 100], + [() => pMap.stop({value: 3}), 200], + [4, 300], + [5, 50], + [6, 100], + [7, 50] + ]; + + const result = await pMap(input, mapper, {concurrency: 4}); + t.deepEqual(result, [undefined, 2, 3, undefined, 5, undefined]); +}); + +test('stop early, finite concurrency, collapse', async t => { + const input = [ + [1, 300], + [2, 100], + [() => pMap.stop({value: 3, ongoingMappings: {collapse: true}}), 200], + [4, 300], + [5, 50], + [6, 100], + [7, 50] + ]; + + const result = await pMap(input, mapper, {concurrency: 4}); + t.deepEqual(result, [2, 3, 5]); +}); + +test('stop early, finite concurrency, fill with zeros', async t => { + const input = [ + [1, 300], + [2, 100], + [() => pMap.stop({value: 3, ongoingMappings: {fillWith: 0}}), 200], + [4, 300], + [5, 50], + [6, 100], + [7, 50] + ]; + + const result = await pMap(input, mapper, {concurrency: 4}); + t.deepEqual(result, [0, 2, 3, 0, 5, 0]); +}); + +test('stop early, collapse overrides fillWith', async t => { + const input = [ + [1, 100], + [2, 10], + [() => pMap.stop({value: 3, ongoingMappings: {collapse: true, fillWith: 0}}), 20], + [4, 50], + [5, 50] + ]; + + const result = await pMap(input, mapper); + t.deepEqual(result, [2, 3]); +}); + +test('stop early, stop value is optional', async t => { + const input = [ + [1, 10], + [2, 10], + [() => pMap.stop(), 10], + [4, 10] + ]; + + const result = await pMap(input, mapper, {concurrency: 1}); + t.deepEqual(result, [1, 2, undefined]); +});