forked from ReactiveX/rxjs
/
mergeInternals.ts
149 lines (139 loc) · 5.91 KB
/
mergeInternals.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import { Observable } from '../Observable';
import { innerFrom } from '../observable/from';
import { Subscriber } from '../Subscriber';
import { ObservableInput, SchedulerLike } from '../types';
import { executeSchedule } from '../util/executeSchedule';
import { OperatorSubscriber } from './OperatorSubscriber';
/**
* A process embodying the general "merge" strategy. This is used in
* `mergeMap` and `mergeScan` because the logic is otherwise nearly identical.
* @param source The original source observable
* @param subscriber The consumer subscriber
* @param project The projection function to get our inner sources
* @param concurrent The number of concurrent inner subscriptions
* @param onBeforeNext Additional logic to apply before nexting to our consumer
* @param expand If `true` this will perform an "expand" strategy, which differs only
* in that it recurses, and the inner subscription must be schedule-able.
* @param innerSubScheduler A scheduler to use to schedule inner subscriptions,
* this is to support the expand strategy, mostly, and should be deprecated
*/
export function mergeInternals<T, R>(
source: Observable<T>,
subscriber: Subscriber<R>,
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
expand?: boolean,
innerSubScheduler?: SchedulerLike,
additionalTeardown?: () => void
) {
// Buffered values, in the event of going over our concurrency limit
const buffer: T[] = [];
// The number of active inner subscriptions.
let active = 0;
// An index to pass to our accumulator function
let index = 0;
// Whether or not the outer source has completed.
let isComplete = false;
/**
* Checks to see if we can complete our result or not.
*/
const checkComplete = () => {
// If the outer has completed, and nothing is left in the buffer,
// and we don't have any active inner subscriptions, then we can
// Emit the state and complete.
if (isComplete && !buffer.length && !active) {
subscriber.complete();
}
};
// If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait.
const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
const doInnerSub = (value: T) => {
// If we're expanding, we need to emit the outer values and the inner values
// as the inners will "become outers" in a way as they are recursively fed
// back to the projection mechanism.
expand && subscriber.next(value as any);
// Increment the number of active subscriptions so we can track it
// against our concurrency limit later.
active++;
// A flag used to show that the inner observable completed.
// This is checked during finalization to see if we should
// move to the next item in the buffer, if there is on.
let innerComplete = false;
// Start our inner subscription.
innerFrom(project(value, index++)).subscribe(
new OperatorSubscriber(
subscriber,
(innerValue) => {
// `mergeScan` has additional handling here. For example
// taking the inner value and updating state.
onBeforeNext?.(innerValue);
if (expand) {
// If we're expanding, then just recurse back to our outer
// handler. It will emit the value first thing.
outerNext(innerValue as any);
} else {
// Otherwise, emit the inner value.
subscriber.next(innerValue);
}
},
() => {
// Flag that we have completed, so we know to check the buffer
// during finalization.
innerComplete = true;
},
// Errors are passed to the destination.
undefined,
() => {
// During finalization, if the inner completed (it wasn't errored or
// cancelled), then we want to try the next item in the buffer if
// there is one.
if (innerComplete) {
// We have to wrap this in a try/catch because it happens during
// finalization, possibly asynchronously, and we want to pass
// any errors that happen (like in a projection function) to
// the outer Subscriber.
try {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
}
// Check to see if we can complete, and complete if so.
checkComplete();
} catch (err) {
subscriber.error(err);
}
}
}
)
);
};
// Subscribe to our source observable.
source.subscribe(
new OperatorSubscriber(subscriber, outerNext, () => {
// Outer completed, make a note of it, and check to see if we can complete everything.
isComplete = true;
checkComplete();
})
);
// Additional teardown (for when the destination is torn down).
// Other teardown is added implicitly via subscription above.
return () => {
additionalTeardown?.();
};
}