Skip to content

Commit

Permalink
Allow stopping iteration via pMap.stop
Browse files Browse the repository at this point in the history
  • Loading branch information
papb committed Jun 5, 2021
1 parent 4146ef4 commit 8c2411c
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 19 deletions.
89 changes: 82 additions & 7 deletions index.d.ts
@@ -1,3 +1,7 @@
declare const stop: unique symbol;

export type StopSymbol = typeof stop;

export interface Options {
/**
Number of concurrently pending promises returned by `mapper`.
Expand All @@ -16,6 +20,46 @@ export interface Options {
readonly stopOnError?: boolean;
}

export interface OngoingMappingsStopOptions {
/**
Whether or not to remove all holes from the result array (caused by pending mappings).
@default false
*/
readonly collapse?: boolean;

/**
Value to use as immediate result for pending mappings, replacing holes from the result array.
This option is ignored if `collapse` is set to `true`.
@default undefined
*/
readonly fillWith?: unknown;
}

export interface StopOptions<NewElement> {
/**
Value to provide as result for this iteration.
@default undefined
*/
readonly value?: NewElement;

/**
Options to configure what `pMap` must do with any concurrent ongoing mappings at the moment `stop` is called.
*/
readonly ongoingMappings?: OngoingMappingsStopOptions;
}

type BaseStopValueWrapper<NewElement> = {
[stop]: Required<StopOptions<NewElement>>;
};

export type StopValueWrapper<NewElement> = NewElement extends any ? BaseStopValueWrapper<NewElement> : never;

type MaybeWrappedInStop<NewElement> = NewElement | StopValueWrapper<NewElement>;

/**
Function which is called for every item in `input`. Expected to return a `Promise` or value.
Expand All @@ -25,7 +69,7 @@ Function which is called for every item in `input`. Expected to return a `Promis
export type Mapper<Element = any, NewElement = unknown> = (
element: Element,
index: number
) => NewElement | Promise<NewElement>;
) => MaybeWrappedInStop<NewElement> | Promise<MaybeWrappedInStop<NewElement>>;

/**
@param input - Iterated over concurrently in the `mapper` function.
Expand All @@ -38,7 +82,7 @@ import pMap from 'p-map';
import got from 'got';
const sites = [
getWebsiteFromUsername('https://sindresorhus'), //=> Promise
getWebsiteFromUsername('sindresorhus'), //=> Promise
'https://avajs.dev',
'https://github.com'
];
Expand All @@ -54,8 +98,39 @@ console.log(result);
//=> ['https://sindresorhus.com/', 'https://avajs.dev/', 'https://github.com/']
```
*/
export default function pMap<Element, NewElement>(
input: Iterable<Element>,
mapper: Mapper<Element, NewElement>,
options?: Options
): Promise<NewElement[]>;
declare const pMap: {
<Element, NewElement>(
input: Iterable<Element>,
mapper: Mapper<Element, NewElement>,
options?: Options
): Promise<NewElement[]>;

/**
Creates a special object that indicates to `pMap` that iteration must stop immediately. This object should just be returned from within the mapper (and not used directly for anything).
@example
```
import pMap from 'p-map';
import got from 'got';
const numbers = Array.from({ length: 2000 }).map((_, i) => i + 1);
//=> [1, 2, ..., 1999, 2000]
const mapper = async number => {
if (number !== 404) {
const { transcript } = await got(`https://xkcd.com/${number}/info.0.json`).json();
if (/unicorn/.test(transcript)) {
console.log('Found a XKCD comic with an unicorn:', number);
return pMap.stop();
}
}
};
await pMap(numbers, mapper, { concurrency: 50 });
//=> Found a XKCD comic with an unicorn: 948
```
*/
stop: <NewElement>(options?: StopOptions<NewElement>) => StopValueWrapper<NewElement>;
};

export default pMap;
48 changes: 39 additions & 9 deletions index.js
@@ -1,5 +1,7 @@
import AggregateError from 'aggregate-error';

const stopSymbol = Symbol('pMap.stop');

export default async function pMap(
iterable,
mapper,
Expand All @@ -20,13 +22,13 @@ export default async function pMap(
const result = [];
const errors = [];
const iterator = iterable[Symbol.iterator]();
let isRejected = false;
const pendingIndexes = new Set();
let finished = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;

const next = () => {
if (isRejected) {
if (finished) {
return;
}

Expand All @@ -37,7 +39,7 @@ export default async function pMap(
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (pendingIndexes.size === 0) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
Expand All @@ -48,21 +50,47 @@ export default async function pMap(
return;
}

resolvingCount++;
pendingIndexes.add(index);

(async () => {
try {
const element = await nextItem.value;

if (finished) {
return;
}

result[index] = await mapper(element, index);
resolvingCount--;
next();

if (finished) {
return;
}

pendingIndexes.delete(index);

if (result[index] && result[index][stopSymbol]) {
finished = true;
const stopConfig = result[index][stopSymbol];
result[index] = stopConfig.value;
if (stopConfig.ongoingMappings.collapse) {
resolve(result.flat(0));
} else {
for (const pendingIndex of pendingIndexes) {
result[pendingIndex] = stopConfig.ongoingMappings.fillWith;
}

resolve(result);
}
} else {
next();
}
} catch (error) {
if (stopOnError) {
isRejected = true;
finished = true;
reject(error);
} else {
errors.push(error);
resolvingCount--;
pendingIndexes.delete(index);
next();
}
}
Expand All @@ -78,3 +106,5 @@ export default async function pMap(
}
});
}

pMap.stop = ({value, ongoingMappings = {}} = {}) => ({[stopSymbol]: {value, ongoingMappings}});
28 changes: 27 additions & 1 deletion index.test-d.ts
@@ -1,4 +1,4 @@
import {expectType, expectAssignable} from 'tsd';
import {expectType, expectAssignable, expectNotAssignable} from 'tsd';
import pMap, {Options, Mapper} from './index.js';

const sites = [
Expand All @@ -18,13 +18,26 @@ const asyncSyncMapper = async (site: string, index: number): Promise<string> =>
index > 1 ? site : Promise.resolve(site);
const multiResultTypeMapper = async (site: string, index: number): Promise<string | number> =>
index > 1 ? site.length : site;
const mapperStoppingEarly1 = async (site: string, index: number) =>
index > 1 ? pMap.stop({value: site.length}) : site;
const mapperStoppingEarly2 = async (site: string, index: number) =>
index > 1 ? pMap.stop({value: index > 2 ? site.length : site}) : site;
const mapperStoppingEarly3 = async (site: string, index: number) =>
index > 1 ? pMap.stop({value: index > 2 ? site.length : (index > 3 ? Date.now() : site)}) : true;

expectAssignable<Mapper>(asyncMapper);
expectAssignable<Mapper<string, string>>(asyncMapper);
expectAssignable<Mapper>(asyncSyncMapper);
expectAssignable<Mapper<string, string>>(asyncSyncMapper);
expectAssignable<Mapper<string, string | Promise<string>>>(asyncSyncMapper);
expectAssignable<Mapper>(multiResultTypeMapper);
expectAssignable<Mapper<string, string | number>>(multiResultTypeMapper);
expectAssignable<Mapper>(mapperStoppingEarly1);
expectAssignable<Mapper<string, string | number>>(mapperStoppingEarly1);
expectAssignable<Mapper>(mapperStoppingEarly2);
expectAssignable<Mapper<string, string | number>>(mapperStoppingEarly2);
expectAssignable<Mapper>(mapperStoppingEarly3);
expectAssignable<Mapper<string, string | number | boolean | Date>>(mapperStoppingEarly3);

expectAssignable<Options>({});
expectAssignable<Options>({concurrency: 0});
Expand All @@ -40,3 +53,16 @@ expectType<Promise<string[]>>(pMap(sites, (site: string) => site));
expectType<Promise<number[]>>(pMap(sites, (site: string) => site.length));

expectType<Promise<number[]>>(pMap(numbers, (number: number) => number * 2));

expectType<Promise<number[]>>(pMap(numbers, (number: number) => number * 2));

pMap.stop();
pMap.stop({});
pMap.stop({value: 123});
pMap.stop({ongoingMappings: {}});
pMap.stop({ongoingMappings: {collapse: true}});
pMap.stop({ongoingMappings: {fillWith: 'hello'}});
pMap.stop({value: Date.now(), ongoingMappings: {collapse: false, fillWith: 'hello'}});

const shouldBeUnusableDirectly = pMap.stop({value: 123});
expectNotAssignable<number>(shouldBeUnusableDirectly);
69 changes: 67 additions & 2 deletions readme.md
Expand Up @@ -4,7 +4,11 @@
Useful when you need to run promise-returning & async functions multiple times with different inputs concurrently.

This is different from `Promise.all()` in that you can control the concurrency and also decide whether or not to stop iterating when there's an error.
This is different from `Promise.all()` in that you can:

* Control the concurrency
* Decide whether or not to stop iterating when there's an error
* Stop iterating at any point (like `break` in standard loops)

## Install

Expand All @@ -19,7 +23,7 @@ import pMap from 'p-map';
import got from 'got';

const sites = [
getWebsiteFromUsername('https://sindresorhus'), //=> Promise
getWebsiteFromUsername('sindresorhus'), //=> Promise
'https://avajs.dev',
'https://github.com'
];
Expand All @@ -35,6 +39,29 @@ console.log(result);
//=> ['https://sindresorhus.com/', 'https://avajs.dev/', 'https://github.com/']
```

### Breaking from iteration

```js
import pMap from 'p-map';
import got from 'got';

const numbers = Array.from({ length: 2000 }).map((_, i) => i + 1);
//=> [1, 2, ..., 1999, 2000]

const mapper = async number => {
if (number !== 404) {
const { transcript } = await got(`https://xkcd.com/${number}/info.0.json`).json();
if (/unicorn/.test(transcript)) {
console.log('Found a XKCD comic with an unicorn:', number);
return pMap.stop();
}
}
};

await pMap(numbers, mapper, { concurrency: 50 });
//=> Found a XKCD comic with an unicorn: 948
```

## API

### pMap(input, mapper, options?)
Expand Down Expand Up @@ -72,6 +99,44 @@ Default: `true`

When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises.

### pMap.stop(options?)

Creates a special object that indicates to `pMap` that iteration must stop immediately. This object should just be returned from within the mapper (and not used directly for anything).

#### options

Type: `object`

##### value

Type: `any`\
Default: `undefined`

Value to provide as result for this iteration.

##### ongoingMappings

Type: `object`

Options to configure what `pMap` must do with any concurrent ongoing mappings at the moment `stop` is called.

###### collapse

Type: `boolean`\
Default: `false`

Whether or not to remove all holes from the result array (caused by pending mappings).

###### fillWith

Type: `any`\
Default: `undefined`

Value to use as immediate result for pending mappings, replacing holes from the result array.

This option is ignored if `collapse` is set to `true`.


## p-map for enterprise

Available as part of the Tidelift Subscription.
Expand Down

0 comments on commit 8c2411c

Please sign in to comment.