Fixup streams bugs #85

Merged

merged 6 commits on Oct 11, 2022
Changes from all commits
34 changes: 24 additions & 10 deletions src/workerd/api/streams/README.md
@@ -240,16 +240,25 @@ completely independent of any of the underlying source algorithms.
The `ReadableStream` API has a method `tee()` that will split the flow of data from the
`ReadableStream` into two separate `ReadableStream` instances.

What happens here is that ownership of the underlying ***controller*** of the original
`ReadableStream` is passed off to something called the ***Tee Adapter***. The adapter
maintains a collection of ***Tee Branches***. Each branch is a separate `ReadableStream`
maintaining its own queue of available data and pending reads. When the pull algorithm
pushes data into the underlying ***controller***, the adapter pushes that data to
the internal queues of each of the attached branches. From there, reading from the branch
streams is the same as reading from a regular `ReadableStream` -- that is, when `read()`
is called, if there is data in the internal queue, the read is fulfilled immediately,
otherwise the branch will tell the adapter that it needs data to be provided to fulfill
the pending read.
In the standard definition of the `ReadableStream` API, the `tee()` method creates two
separate `ReadableStream` instances (called "branches") that share a single `Reader` that
consumes the data from the original `ReadableStream` (let's call it the "trunk"). When one
of the two branches uses the shared `Reader` to pull data from the trunk, that data is
used to fulfill the read request from the pulling branch, and a copy of the data is pushed
into a queue in the other branch. That copied data accumulates in memory until something
starts reading from it.

This spec-defined behavior presents a problem for us: one branch can consume data at a far
greater pace than the other, causing the slower branch to accumulate data in memory without
any backpressure controls.
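
To make that pileup concrete, here is a minimal, self-contained sketch of the spec-defined
copy-into-the-other-branch behavior. It is illustrative only -- the `Trunk` and `SpecTeeBranch`
types, the 4 KiB chunk size, and the read loop are assumptions for the sketch, not workerd or
spec code:

```cpp
// Sketch of spec-style tee(): both branches share one reader on the trunk; each
// chunk pulled satisfies the pulling branch and a *copy* is queued on the other.
#include <deque>
#include <iostream>
#include <optional>
#include <vector>

using Chunk = std::vector<char>;

struct Trunk {
  std::deque<Chunk> data;
  std::optional<Chunk> pull() {
    if (data.empty()) return std::nullopt;
    Chunk c = std::move(data.front());
    data.pop_front();
    return c;
  }
};

struct SpecTeeBranch {
  std::deque<Chunk> queue;   // copies accumulate here if this branch is slow
  Trunk* trunk;
  SpecTeeBranch* other;

  std::optional<Chunk> read() {
    if (!queue.empty()) {              // fulfill from our own queue first
      Chunk c = std::move(queue.front());
      queue.pop_front();
      return c;
    }
    if (auto c = trunk->pull()) {      // otherwise pull through the shared reader
      other->queue.push_back(*c);      // the other branch gets a full copy
      return c;
    }
    return std::nullopt;
  }
};

int main() {
  Trunk trunk;
  for (int i = 0; i < 1000; i++) trunk.data.push_back(Chunk(4096, 'x'));

  SpecTeeBranch fast{{}, &trunk, nullptr};
  SpecTeeBranch slow{{}, &trunk, &fast};
  fast.other = &slow;

  // Only the fast branch reads; the slow branch's queue grows without bound.
  while (fast.read()) {}
  std::cout << "chunks buffered in the slow branch: " << slow.queue.size() << "\n";
}
```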

Our implementation modifies the `tee()` method to avoid this issue.

Each branch maintains its own data buffer. But instead of those buffers containing a
copy of the data, they contain a collection of refcounted references to the data. The
backpressure signaled to the trunk is based on the branch with the most unconsumed data
in its buffer.

```
+----------------+
@@ -276,6 +285,11 @@ the pending read.

```
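
As a rough sketch of the model in the diagram above (the `Branch` and `TeeTrunk` names and
the 64 KiB high-water mark are assumptions for illustration, not the actual workerd classes),
the branches buffer refcounted references to the same chunk, and the trunk's backpressure
signal is derived from whichever branch is holding the most unconsumed data:

```cpp
// Sketch of the modified tee model: branches share refcounted chunk references,
// and backpressure is keyed to the branch with the most unconsumed data.
#include <algorithm>
#include <deque>
#include <iostream>
#include <memory>
#include <vector>

using Chunk = std::vector<char>;
using ChunkRef = std::shared_ptr<const Chunk>;  // one allocation shared by both branches

struct Branch {
  std::deque<ChunkRef> buffer;

  size_t unconsumedBytes() const {
    size_t total = 0;
    for (auto& ref : buffer) total += ref->size();
    return total;
  }

  ChunkRef read() {
    if (buffer.empty()) return nullptr;
    ChunkRef ref = buffer.front();
    buffer.pop_front();
    return ref;   // the chunk is freed once both branches have consumed it
  }
};

struct TeeTrunk {
  Branch branch1, branch2;
  size_t highWaterMark = 64 * 1024;   // assumed threshold for this sketch

  void push(Chunk chunk) {
    auto ref = std::make_shared<const Chunk>(std::move(chunk));
    branch1.buffer.push_back(ref);    // both branches reference the same bytes
    branch2.buffer.push_back(ref);
  }

  // Backpressure follows the slowest branch: the one buffering the most data.
  bool wantsMoreData() const {
    size_t worst = std::max(branch1.unconsumedBytes(), branch2.unconsumedBytes());
    return worst < highWaterMark;
  }
};

int main() {
  TeeTrunk tee;
  int pulls = 0;
  while (tee.wantsMoreData()) {        // a well-behaved source respects the signal
    tee.push(Chunk(4096, 'x'));
    tee.branch1.read();                // only branch1 keeps up
    pulls++;
  }
  std::cout << "source paused after " << pulls << " pulls; branch2 holds "
            << tee.branch2.unconsumedBytes() << " unconsumed bytes\n";
}
```

The refcounting avoids duplicating chunk data between the branches, and keying the
backpressure signal to the slowest branch caps how much a lagging branch can buffer,
provided the source honors the signal.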

Unfortunately, with this model we cannot completely avoid the possibility of one branch
reading much more slowly than the other, but we do prevent the memory pileup that would
otherwise occur *so long as the underlying source of the `ReadableStream` is paying proper
attention to the backpressure signaling mechanisms*.

## Data-flow in an Internal ReadableStream

For ***Internal*** streams the implementation is quite different and it is important to
15 changes: 10 additions & 5 deletions src/workerd/api/streams/internal.c++
@@ -403,7 +403,10 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(
store = v8::ArrayBuffer::NewBackingStore(js.v8Isolate, byteLength);
}

auto bytes = kj::arrayPtr(static_cast<kj::byte*>(store->Data()), byteOffset + byteLength);
KJ_ASSERT(store->ByteLength() == byteOffset + byteLength);

auto ptr = static_cast<kj::byte*>(store->Data());
auto bytes = kj::arrayPtr(ptr + byteOffset, byteLength);
disturbed = true;

KJ_SWITCH_ONEOF(state) {
@@ -551,18 +554,20 @@ ReadableStreamController::Tee ReadableStreamInternalController::tee(jsg::Lock& j
// Create two closed ReadableStreams.
return Tee {
.branch1 =
jsg::alloc<ReadableStream>(ReadableStreamInternalController(closed)),
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(closed)),
.branch2 =
jsg::alloc<ReadableStream>(ReadableStreamInternalController(closed)),
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(closed)),
};
}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
// Create two errored ReadableStreams.
return Tee {
.branch1 =
jsg::alloc<ReadableStream>(ReadableStreamInternalController(errored.addRef(js))),
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(
errored.addRef(js))),
.branch2 =
jsg::alloc<ReadableStream>(ReadableStreamInternalController(errored.addRef(js))),
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(
errored.addRef(js))),
};
}
KJ_CASE_ONEOF(readable, Readable) {
5 changes: 3 additions & 2 deletions src/workerd/api/streams/internal.h
@@ -122,8 +122,9 @@ class ReadableStreamInternalController: public ReadableStreamController {
explicit ReadableStreamInternalController(Readable readable)
: state(kj::mv(readable)) {}

ReadableStreamInternalController(ReadableStreamInternalController&& other) = default;
ReadableStreamInternalController& operator=(ReadableStreamInternalController&& other) = default;
KJ_DISALLOW_COPY(ReadableStreamInternalController);
ReadableStreamInternalController(ReadableStreamInternalController&& other) = delete;
ReadableStreamInternalController& operator=(ReadableStreamInternalController&& other) = delete;

~ReadableStreamInternalController() noexcept(false) override;

8 changes: 4 additions & 4 deletions src/workerd/api/streams/queue-test.c++
@@ -802,7 +802,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads)") {
// there should only be two actual BYOB requests
// processed by the queue, which will fulfill all four
// reads.
MustCall<void(ByteQueue::ByobReadRequest&)> respond([&](jsg::Lock&, auto& pending) {
MustCall<void(ByteQueue::ByobRequest&)> respond([&](jsg::Lock&, auto& pending) {
static uint counter = 0;
auto& req = pending.getRequest();
auto ptr = req.pullInto.store.asArrayPtr().begin();
@@ -812,7 +812,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads)") {
KJ_ASSERT(pending.isInvalidated());
}, 2);

kj::Maybe<kj::Own<ByteQueue::ByobReadRequest>> pendingByob;
kj::Maybe<kj::Own<ByteQueue::ByobRequest>> pendingByob;
while ((pendingByob = queue.nextPendingByobReadRequest()) != nullptr) {
auto& pending = KJ_ASSERT_NONNULL(pendingByob);
if (pending->isInvalidated()) {
@@ -884,7 +884,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads, 2)") {
// there should only be two actual BYOB requests
// processed by the queue, which will fulfill all four
// reads.
MustCall<void(ByteQueue::ByobReadRequest&)> respond([&](jsg::Lock&, auto& pending) {
MustCall<void(ByteQueue::ByobRequest&)> respond([&](jsg::Lock&, auto& pending) {
static uint counter = 0;
auto& req = pending.getRequest();
auto ptr = req.pullInto.store.asArrayPtr().begin();
@@ -894,7 +894,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads, 2)") {
KJ_ASSERT(pending.isInvalidated());
}, 2);

kj::Maybe<kj::Own<ByteQueue::ByobReadRequest>> pendingByob;
kj::Maybe<kj::Own<ByteQueue::ByobRequest>> pendingByob;
while ((pendingByob = queue.nextPendingByobReadRequest()) != nullptr) {
auto& pending = KJ_ASSERT_NONNULL(pendingByob);
if (pending->isInvalidated()) {