forked from google/quiver-dart
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_buffer.dart
184 lines (161 loc) · 5.25 KB
/
stream_buffer.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Copyright 2013 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:collection';
/// Signals that the source feeding a buffer finished while readers were still
/// blocked waiting for data.
///
/// Each of those blocked readers completes with this error.
class UnderflowError extends Error {
  /// Creates an underflow error, optionally described by [message].
  UnderflowError([this.message]);

  /// Optional description of the underflow; `null` when none was supplied.
  final String? message;

  /// Returns a fixed prefix, followed by [message] when one was supplied.
  @override
  String toString() {
    final detail = message;
    return detail == null
        ? 'StreamBuffer Underflow'
        : 'StreamBuffer Underflow: $detail';
  }
}
/// Allows orderly reading of elements from a data stream, such as a `Socket`,
/// which might not deliver its `List<T>` chunks in regular sizes.
///
/// Example usage:
///
///     StreamBuffer<int> buffer = StreamBuffer();
///     Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
///     buffer.read(100).then((bytes) {
///       // do something with 100 bytes;
///     });
///
/// When constructed with `throwOnError: true`, readers that are still waiting
/// when the source stream finishes complete with an [UnderflowError] (see
/// [addStream] for the exact condition). Useful for unexpected `Socket`
/// disconnects.
class StreamBuffer<T> implements StreamConsumer<List<T>> {
  /// Creates a stream buffer with an optional, soft [limit] on the amount of
  /// data the buffer will hold before pausing the underlying stream.
  ///
  /// A [limit] of 0 means no buffer limit. If [throwOnError] is true, blocked
  /// readers are failed with an [UnderflowError] when the stream finishes.
  StreamBuffer({bool throwOnError = false, int limit = 0})
      : _throwOnError = throwOnError,
        _limit = limit;

  /// Read position inside the head entry of [_chunks].
  ///
  /// Only stays non-zero while that entry is itself a `List` that has been
  /// partially consumed; it is reset to 0 whenever the head entry is removed.
  int _offset = 0;

  /// Number of buffered elements that have not been consumed yet.
  int _counter = 0;

  /// Buffered elements in arrival order.
  ///
  /// A queue rather than a list so that dropping the head entry is O(1).
  final Queue<T> _chunks = ListQueue<T>();

  /// Readers waiting for enough data, served strictly in request order.
  final Queue<_ReaderInWaiting<List<T>>> _readers = ListQueue();

  /// Subscription to the stream currently feeding the buffer, if any.
  StreamSubscription<List<T>>? _sub;

  final bool _throwOnError;

  /// The stream most recently passed to [addStream].
  Stream<List<T>>? _currentStream;

  int _limit;

  /// Sets the soft buffer limit and pauses or resumes the current
  /// subscription so that it matches the new limit.
  set limit(int limit) {
    _limit = limit;
    final sub = _sub;
    if (sub == null) return;
    if (!limited || _counter < limit) {
      // Resuming a subscription that is not paused has no effect.
      sub.resume();
    } else if (!sub.isPaused) {
      // Pauses stack: pausing an already-paused subscription would require
      // two resumes, but `_consume` only ever issues one.
      sub.pause();
    }
  }

  /// The soft limit on buffered data; 0 means unlimited.
  int get limit => _limit;

  /// Whether a buffer limit is in effect.
  bool get limited => _limit > 0;

  /// The amount of unread data buffered.
  int get buffered => _counter;

  /// Removes [size] elements from the front of the buffer and returns them.
  ///
  /// Callers must make sure at least [size] elements are buffered. Resumes
  /// the subscription when it was paused for the limit and the buffer has
  /// dropped below that limit again.
  List<T> _consume(int size) {
    var follower = 0;
    final ret = List<T?>.filled(size, null);
    var leftToRead = size;
    while (leftToRead > 0) {
      final chunk = _chunks.first;
      // An entry that is itself a list is drained piecewise via `_offset`;
      // any other entry counts as exactly one element.
      final listCap = (chunk is List) ? chunk.length - _offset : 1;
      final subsize = leftToRead > listCap ? listCap : leftToRead;
      if (chunk is List) {
        ret.setRange(follower, follower + subsize,
            chunk.getRange(_offset, _offset + subsize).cast<T>());
      } else {
        ret[follower] = chunk;
      }
      follower += subsize;
      _offset += subsize;
      _counter -= subsize;
      leftToRead -= subsize;
      // Drop the head entry once it is exhausted.
      if (!(chunk is List && _offset < chunk.length)) {
        _offset = 0;
        _chunks.removeFirst();
      }
    }
    // `_sub` is null after `close()`; buffered data must remain readable then.
    final sub = _sub;
    if (limited && sub != null && sub.isPaused && _counter < limit) {
      sub.resume();
    }
    return ret.cast<T>();
  }

  /// Reads exactly [size] elements from the stream and returns them in the
  /// future.
  ///
  /// Completes immediately when enough data is buffered and no other reader
  /// is waiting; otherwise the request is queued behind earlier readers.
  ///
  /// Throws [RangeError] if [size] is negative.
  /// Throws [ArgumentError] if [size] is larger than the optional buffer
  /// [limit].
  Future<List<T>> read(int size) {
    // Reject up front: a queued negative request would otherwise only fail
    // later, inside the stream listener, leaving its reader hanging.
    RangeError.checkNotNegative(size, 'size');
    if (limited && size > limit) {
      throw ArgumentError('Cannot read $size with limit $limit');
    }
    // If we have enough data to consume and there are no other readers, then
    // we can return immediately.
    if (size <= buffered && _readers.isEmpty) {
      return Future<List<T>>.value(_consume(size));
    }
    final completer = Completer<List<T>>();
    _readers.add(_ReaderInWaiting<List<T>>(size, completer));
    return completer.future;
  }

  /// Starts buffering [stream], cancelling the subscription to any stream
  /// added earlier.
  ///
  /// The returned future completes when [stream] is done. An error event from
  /// [stream] fails every waiting reader with that error.
  ///
  /// NOTE(review): `lastStream` is the stream that was current *before* this
  /// call (or [stream] itself on the first call). As written, only a stream
  /// for which that holds fails waiting readers with [UnderflowError] when it
  /// finishes; a stream added after a different one never does. Confirm this
  /// is intended.
  @override
  Future addStream(Stream<List<T>> stream) {
    final lastStream = _currentStream ?? stream;
    _sub?.cancel();
    _currentStream = stream;
    final streamDone = Completer<void>();
    _sub = stream.listen((items) {
      _chunks.addAll(items);
      _counter += items.length;
      final sub = _sub;
      // Pause at most once so a single resume in `_consume` undoes it.
      if (limited && _counter >= limit && sub != null && !sub.isPaused) {
        sub.pause();
      }
      // Serve waiting readers in order for as long as data suffices.
      while (_readers.isNotEmpty && _readers.first.size <= _counter) {
        final waiting = _readers.removeFirst();
        waiting.completer.complete(_consume(waiting.size));
      }
    }, onDone: () {
      // User is piping in a new stream
      if (stream == lastStream && _throwOnError) {
        _closed(UnderflowError());
      }
      streamDone.complete();
    }, onError: (Object e, StackTrace stack) {
      _closed(e, stack);
    });
    return streamDone.future;
  }

  /// Fails every waiting reader with [e] (and [stack]), then forgets them.
  void _closed(Object e, [StackTrace? stack]) {
    for (final reader in _readers) {
      if (!reader.completer.isCompleted) {
        reader.completer.completeError(e, stack);
      }
    }
    _readers.clear();
  }

  /// Cancels the current subscription, if any.
  ///
  /// Data that is already buffered stays available to [read].
  @override
  Future close() {
    final Future? ret = _sub?.cancel();
    _sub = null;
    return ret ?? Future.value(null);
  }
}
/// A pending read request: how many elements were asked for, paired with the
/// completer through which the outcome is delivered.
class _ReaderInWaiting<T> {
  /// Number of elements the request asks for.
  int size;

  /// Completer belonging to this request.
  Completer<T> completer;

  /// Pairs a requested [size] with its [completer].
  _ReaderInWaiting(this.size, this.completer);
}