forked from google/quiver-dart
-
Notifications
You must be signed in to change notification settings - Fork 0
/
concat.dart
84 lines (75 loc) · 2.85 KB
/
concat.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Copyright 2013 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
/// Returns the concatenation of the input streams.
///
/// When the returned stream is listened to, the [streams] are iterated through
/// asynchronously, forwarding all events (both data and error) for the current
/// stream to the returned stream before advancing the iterator and listening
/// to the next stream. If advancing the iterator throws an error, the
/// returned stream ends immediately with that error.
///
/// Pausing and resuming the returned stream's subscriptions will pause and
/// resume the subscription of the current stream being listened to.
///
/// Note: Events from pre-existing broadcast streams which occur before the
/// stream is reached by the iteration will be dropped.
///
/// Example:
///
/// concat(files.map((file) =>
/// file.openRead().transform(const LineSplitter())))
Stream<T> concat<T>(Iterable<Stream<T>> streams) => _ConcatStream(streams);
class _ConcatStream<T> extends Stream<T> {
_ConcatStream(Iterable<Stream<T>> streams) : _streams = streams;
final Iterable<Stream<T>> _streams;
@override
StreamSubscription<T> listen(void Function(T data)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
cancelOnError = true == cancelOnError;
StreamSubscription<T>? currentSubscription;
StreamController<T>? controller;
final iterator = _streams.iterator;
void nextStream(StreamController<T> controller) {
late final bool hasNext;
try {
hasNext = iterator.moveNext();
} catch (e, s) {
controller.addError(e, s);
controller.close();
return;
}
if (hasNext) {
currentSubscription = iterator.current.listen(controller.add,
onError: controller.addError,
onDone: () => nextStream(controller),
cancelOnError: cancelOnError);
} else {
controller.close();
}
}
controller = StreamController<T>(
onPause: () {
currentSubscription?.pause();
},
onResume: () {
currentSubscription?.resume();
},
onCancel: () => currentSubscription?.cancel(),
);
nextStream(controller);
return controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}