Skip to content

Commit

Permalink
Add condition, take, fork to forkApi
Browse files Browse the repository at this point in the history
  • Loading branch information
kkirby committed Apr 22, 2024
1 parent e46eb99 commit 6bfbf34
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 58 deletions.
123 changes: 76 additions & 47 deletions packages/toolkit/src/listenerMiddleware/index.ts
Expand Up @@ -82,52 +82,6 @@ const INTERNAL_NIL_TOKEN = {} as const

const alm = 'listenerMiddleware' as const

const createFork = (
parentAbortSignal: AbortSignalWithReason<unknown>,
parentBlockingPromises: Promise<any>[],
) => {
const linkControllers = (controller: AbortController) =>
addAbortSignalListener(parentAbortSignal, () =>
abortControllerWithReason(controller, parentAbortSignal.reason),
)

return <T>(
taskExecutor: ForkedTaskExecutor<T>,
opts?: ForkOptions,
): ForkedTask<T> => {
assertFunction(taskExecutor, 'taskExecutor')
const childAbortController = new AbortController()

linkControllers(childAbortController)

const result = runTask<T>(
async (): Promise<T> => {
validateActive(parentAbortSignal)
validateActive(childAbortController.signal)
const result = (await taskExecutor({
pause: createPause(childAbortController.signal),
delay: createDelay(childAbortController.signal),
signal: childAbortController.signal,
})) as T
validateActive(childAbortController.signal)
return result
},
() => abortControllerWithReason(childAbortController, taskCompleted),
)

if (opts?.autoJoin) {
parentBlockingPromises.push(result.catch(noop))
}

return {
result: createPause<TaskResult<T>>(parentAbortSignal)(result),
cancel() {
abortControllerWithReason(childAbortController, taskCancelled)
},
}
}
}

const createTakePattern = <S>(
startListening: AddListenerOverloads<UnsubscribeListener, S, Dispatch>,
signal: AbortSignal,
Expand Down Expand Up @@ -192,6 +146,77 @@ const createTakePattern = <S>(
catchRejection(take(predicate, timeout))) as TakePattern<S>
}

const createFork = (
parentAbortSignal: AbortSignalWithReason<unknown>,
parentBlockingPromises: Promise<any>[],
startListening: AddListenerOverloads<any>,
) => {
const linkControllers = (controller: AbortController) =>
addAbortSignalListener(parentAbortSignal, () =>
abortControllerWithReason(controller, parentAbortSignal.reason),
)

return <T>(
taskExecutor: ForkedTaskExecutor<T>,
opts?: ForkOptions,
): ForkedTask<T> => {
assertFunction(taskExecutor, 'taskExecutor')
const childAbortController = new AbortController()

linkControllers(childAbortController)

const take = createTakePattern(
startListening as AddListenerOverloads<any>,
childAbortController.signal,
)

const autoJoinPromises: Promise<any>[] = []

const result = runTask<T>(
async (): Promise<T> => {
try {
validateActive(parentAbortSignal)
validateActive(childAbortController.signal)
const result = (await taskExecutor({
condition: (
predicate: AnyListenerPredicate<any>,
timeout?: number,
) => take(predicate, timeout).then(Boolean),
take,
delay: createDelay(childAbortController.signal),
pause: createPause(childAbortController.signal),
signal: childAbortController.signal,
fork: createFork(
childAbortController.signal,
autoJoinPromises,
startListening,
),
throwIfCancelled: () => {
validateActive(childAbortController.signal)
},
})) as T
validateActive(childAbortController.signal)
return result
} finally {
await Promise.all(autoJoinPromises)
}
},
() => abortControllerWithReason(childAbortController, taskCompleted),
)

if (opts?.autoJoin) {
parentBlockingPromises.push(result.catch(noop))
}

return {
result: createPause<TaskResult<T>>(parentAbortSignal)(result),
cancel() {
abortControllerWithReason(childAbortController, taskCancelled)
},
}
}
}

const getListenerEntryPropsFrom = (options: FallbackAddListenerOptions) => {
let { type, actionCreator, matcher, predicate, effect } = options

Expand Down Expand Up @@ -408,7 +433,11 @@ export const createListenerMiddleware = <
pause: createPause<any>(internalTaskController.signal),
extra,
signal: internalTaskController.signal,
fork: createFork(internalTaskController.signal, autoJoinPromises),
fork: createFork(
internalTaskController.signal,
autoJoinPromises,
startListening,
),
unsubscribe: entry.unsubscribe,
subscribe: () => {
listenerMap.set(entry.id, entry)
Expand Down
84 changes: 73 additions & 11 deletions packages/toolkit/src/listenerMiddleware/types.ts
Expand Up @@ -51,38 +51,97 @@ export interface ConditionFunction<State> {
export type MatchFunction<T> = (v: any) => v is T

/** @public */
export interface ForkedTaskAPI {
export interface ForkedTaskAPI<State, Dispatch extends ReduxDispatch> {
/**
* Returns a promise that resolves when the input predicate returns `true` or
* rejects if the listener has been cancelled or is completed.
*
* The return value is `true` if the predicate succeeds or `false` if a timeout is provided and expires first.
*
* ### Example
*
* ```ts
* const updateBy = createAction<number>('counter/updateBy');
*
* middleware.startListening({
* actionCreator: updateBy,
* async effect(_, { condition }) {
* // wait at most 3s for `updateBy` actions.
* if(await condition(updateBy.match, 3_000)) {
* // `updateBy` has been dispatched twice in less than 3s.
* }
* }
* })
* ```
*/
condition: ConditionFunction<State>
/**
* Returns a promise that resolves when the input predicate returns `true` or
* rejects if the listener has been cancelled or is completed.
*
* The return value is the `[action, currentState, previousState]` combination that the predicate saw as arguments.
*
* The promise resolves to null if a timeout is provided and expires first,
*
* ### Example
*
* ```ts
* const updateBy = createAction<number>('counter/updateBy');
*
* middleware.startListening({
* actionCreator: updateBy,
* async effect(_, { take }) {
* const [{ payload }] = await take(updateBy.match);
* console.log(payload); // logs 5;
* }
* })
*
* store.dispatch(updateBy(5));
* ```
*/
take: TakePattern<State>
/**
* Returns a promise that resolves when `waitFor` resolves or
* rejects if the task or the parent listener has been cancelled or is completed.
*/
pause<W>(waitFor: Promise<W>): Promise<W>
/**
* An abort signal whose `aborted` property is set to `true`
* if the task execution is either aborted or completed.
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/
signal: AbortSignal
/**
* Returns a promise that resolves after `timeoutMs` or
* rejects if the task or the parent listener has been cancelled or is completed.
* @param timeoutMs
*/
delay(timeoutMs: number): Promise<void>
/**
* An abort signal whose `aborted` property is set to `true`
* if the task execution is either aborted or completed.
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
* Queues in the next microtask the execution of a task.
* @param executor
* @param options
*/
signal: AbortSignal
fork<T>(
executor: ForkedTaskExecutor<T, State, Dispatch>,
options?: ForkOptions,
): ForkedTask<T>
}

/** @public */
export interface AsyncTaskExecutor<T> {
(forkApi: ForkedTaskAPI): Promise<T>
export interface AsyncTaskExecutor<T, State, Dispatch extends ReduxDispatch> {
(forkApi: ForkedTaskAPI<State, Dispatch>): Promise<T>
}

/** @public */
export interface SyncTaskExecutor<T> {
(forkApi: ForkedTaskAPI): T
export interface SyncTaskExecutor<T, State, Dispatch extends ReduxDispatch> {
(forkApi: ForkedTaskAPI<State, Dispatch>): T
}

/** @public */
export type ForkedTaskExecutor<T> = AsyncTaskExecutor<T> | SyncTaskExecutor<T>
export type ForkedTaskExecutor<T, State, Dispatch extends ReduxDispatch> =
| AsyncTaskExecutor<T, State, Dispatch>
| SyncTaskExecutor<T, State, Dispatch>

/** @public */
export type TaskResolved<T> = {
Expand Down Expand Up @@ -257,7 +316,10 @@ export interface ListenerEffectAPI<
* @param executor
* @param options
*/
fork<T>(executor: ForkedTaskExecutor<T>, options?: ForkOptions): ForkedTask<T>
fork<T>(
executor: ForkedTaskExecutor<T, State, Dispatch>,
options?: ForkOptions,
): ForkedTask<T>
/**
* Returns a promise that resolves when `waitFor` resolves or
* rejects if the listener has been cancelled or is completed.
Expand Down

0 comments on commit 6bfbf34

Please sign in to comment.