Merge pull request #78 from cloudflare/harris/revert-streams-tee-refactor

Revert recent streams backpressure tee refactor
harrishancock committed Oct 6, 2022
2 parents dce3fb4 + 7a7a950 commit 4815871
Showing 12 changed files with 2,481 additions and 1,769 deletions.
34 changes: 10 additions & 24 deletions src/workerd/api/streams/README.md
@@ -240,25 +240,16 @@ completely independent of any of the underlying source algorithms.
The `ReadableStream` API has a method `tee()` that will split the flow of data from the
`ReadableStream` into two separate `ReadableStream` instances.

In the standard definition of the `ReadableStream` API, the `tee()` method creates two
separate `ReadableStream` instances (called "branches") that share a single `Reader` that
consumes the data from the original `ReadableStream` (let's call it the "trunk"). When one
of the two branches uses the shared `Reader` to pull data from the trunk, that data is
used to fulfill the read request from the pulling branch, and a copy of the data is pushed
into a queue in the other branch. That copied data accumulates in memory until something
starts reading from it.

This spec-defined behavior presents a problem for us: one branch can consume data at a far
greater pace than the other, causing the slower branch to accumulate data in memory without
any backpressure controls.
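
As a rough illustration of the problem (a standalone sketch in plain C++ with hypothetical names, not workerd code), consider a minimal model of the spec's queueing rule, where every chunk delivered to the reading branch is also copied into the other branch's queue:

```
#include <cstdio>
#include <deque>
#include <vector>

using Chunk = std::vector<unsigned char>;

// One tee branch: a queue of chunk copies waiting to be read.
struct TeeBranch {
  std::deque<Chunk> queue;
};

// Models the spec rule: the pulling branch's read is fulfilled directly,
// and the other branch receives a copy of the same chunk in its queue.
struct SpecTee {
  TeeBranch branch1, branch2;

  void deliver(const Chunk& chunk, TeeBranch& pullingBranch) {
    TeeBranch& other = (&pullingBranch == &branch1) ? branch2 : branch1;
    other.queue.push_back(chunk);  // the copy sits here until `other` is read
  }
};

int main() {
  SpecTee tee;
  Chunk chunk(65536);  // 64 KiB per chunk
  for (int i = 0; i < 1000; i++) {
    tee.deliver(chunk, tee.branch1);  // only branch1 is ever read
  }
  // branch2 now buffers ~64 MiB, and nothing upstream was told to slow down.
  std::printf("unread chunks in branch2: %zu\n", tee.branch2.queue.size());
  return 0;
}
```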

In our implementation, we have modified the `tee()` method to avoid this issue.

Each branch maintains its own data buffer. But instead of those buffers containing a
copy of the data, they contain a collection of refcounted references to the data. The
backpressure signaling to the trunk is based on the branch with the most unconsumed data
in its buffer.
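
A minimal sketch of that idea, using `std::shared_ptr` as a stand-in for the refcounted data references (hypothetical names, not the actual workerd types):

```
#include <algorithm>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

using Chunk = std::vector<unsigned char>;
using ChunkRef = std::shared_ptr<const Chunk>;  // refcounted reference, not a copy

// Each branch's buffer holds references to chunks shared with the other branch.
struct Branch {
  std::deque<ChunkRef> buffer;

  std::size_t unconsumedBytes() const {
    std::size_t total = 0;
    for (const auto& ref : buffer) total += ref->size();
    return total;
  }
};

struct Tee {
  std::vector<Branch> branches = std::vector<Branch>(2);
  std::size_t highWaterMark = 1 << 16;

  // Pushing a chunk only bumps a refcount per branch; the bytes are shared.
  void push(Chunk chunk) {
    auto ref = std::make_shared<const Chunk>(std::move(chunk));
    for (auto& branch : branches) branch.buffer.push_back(ref);
  }

  // Backpressure toward the trunk follows the branch with the *most*
  // unconsumed data: stop pulling while it sits above the high-water mark.
  bool wantsMore() const {
    std::size_t worst = 0;
    for (const auto& branch : branches) {
      worst = std::max(worst, branch.unconsumedBytes());
    }
    return worst < highWaterMark;
  }
};
```
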
What happens here is that ownership of the underlying ***controller*** of the original
`ReadableStream` is passed off to something called the ***Tee Adapter***. The adapter
maintains a collection of ***Tee Branches***. Each branch is a separate `ReadableStream`
maintaining its own queue of available data and pending reads. When the pull algorithm
pushes data into the underlying ***controller***, the adapter pushes that data to
the internal queues of each of the attached branches. From there, reading from the branch
streams is the same as reading from a regular `ReadableStream` -- that is, when `read()`
is called, if there is data in the internal queue, the read is fulfilled immediately;
otherwise, the branch tells the adapter that it needs data to be provided to fulfill
the pending read.

```
+----------------+
@@ -285,11 +276,6 @@ in its buffer.
```
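
The sketch below is a rough approximation (hypothetical class names, plain C++ rather than the actual kj/jsg types) of the adapter/branch relationship described above: the adapter fans every chunk it receives out to each branch's queue, and a branch whose queue is empty asks the adapter to pull more data.

```
#include <deque>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

using Chunk = std::vector<unsigned char>;

// One branch: its own queue of available data, plus a way to ask the adapter
// for more data when a read cannot be fulfilled immediately.
class TeeBranch {
public:
  explicit TeeBranch(std::function<void()> requestPull)
      : requestPull(std::move(requestPull)) {}

  // The adapter pushes data here whenever the controller produces a chunk.
  void enqueue(Chunk chunk) { queue.push_back(std::move(chunk)); }

  // read(): fulfilled immediately if the queue has data; otherwise signal the
  // adapter that this branch has a pending read waiting for data.
  std::optional<Chunk> read() {
    if (!queue.empty()) {
      Chunk chunk = std::move(queue.front());
      queue.pop_front();
      return chunk;
    }
    requestPull();
    return std::nullopt;
  }

private:
  std::deque<Chunk> queue;
  std::function<void()> requestPull;
};

// The adapter takes over the underlying controller and fans every produced
// chunk out to each attached branch.
class TeeAdapter {
public:
  TeeAdapter() {
    for (int i = 0; i < 2; i++) {
      branches.emplace_back([this] { pullFromSource(); });
    }
  }

  // Called when the wrapped controller's pull algorithm enqueues data.
  void onData(const Chunk& chunk) {
    for (auto& branch : branches) branch.enqueue(chunk);
  }

  std::vector<TeeBranch> branches;

private:
  void pullFromSource() {
    // In the real implementation this would drive the wrapped controller's
    // pull algorithm; omitted in this sketch.
  }
};
```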

Unfortunately, with this model, we cannot completely avoid the possibility of one branch
reading much slower than the other, but we do prevent the memory pileup that would otherwise
occur *so long as the underlying source of the `ReadableStream` is paying proper attention to
the backpressure signaling mechanisms*.

## Data-flow in an Internal ReadableStream

For ***Internal*** streams, the implementation is quite different and it is important to
10 changes: 4 additions & 6 deletions src/workerd/api/streams/internal.c++
@@ -551,20 +551,18 @@ ReadableStreamController::Tee ReadableStreamInternalController::tee(jsg::Lock& j
// Create two closed ReadableStreams.
return Tee {
.branch1 =
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(closed)),
jsg::alloc<ReadableStream>(ReadableStreamInternalController(closed)),
.branch2 =
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(closed)),
jsg::alloc<ReadableStream>(ReadableStreamInternalController(closed)),
};
}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
// Create two errored ReadableStreams.
return Tee {
.branch1 =
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(
errored.addRef(js))),
jsg::alloc<ReadableStream>(ReadableStreamInternalController(errored.addRef(js))),
.branch2 =
jsg::alloc<ReadableStream>(kj::heap<ReadableStreamInternalController>(
errored.addRef(js))),
jsg::alloc<ReadableStream>(ReadableStreamInternalController(errored.addRef(js))),
};
}
KJ_CASE_ONEOF(readable, Readable) {
5 changes: 2 additions & 3 deletions src/workerd/api/streams/internal.h
@@ -122,9 +122,8 @@ class ReadableStreamInternalController: public ReadableStreamController {
explicit ReadableStreamInternalController(Readable readable)
: state(kj::mv(readable)) {}

KJ_DISALLOW_COPY(ReadableStreamInternalController);
ReadableStreamInternalController(ReadableStreamInternalController&& other) = delete;
ReadableStreamInternalController& operator=(ReadableStreamInternalController&& other) = delete;
ReadableStreamInternalController(ReadableStreamInternalController&& other) = default;
ReadableStreamInternalController& operator=(ReadableStreamInternalController&& other) = default;

~ReadableStreamInternalController() noexcept(false) override;

8 changes: 4 additions & 4 deletions src/workerd/api/streams/queue-test.c++
@@ -802,7 +802,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads)") {
// there should only be two actual BYOB requests
// processed by the queue, which will fulfill all four
// reads.
MustCall<void(ByteQueue::ByobRequest&)> respond([&](jsg::Lock&, auto& pending) {
MustCall<void(ByteQueue::ByobReadRequest&)> respond([&](jsg::Lock&, auto& pending) {
static uint counter = 0;
auto& req = pending.getRequest();
auto ptr = req.pullInto.store.asArrayPtr().begin();
@@ -812,7 +812,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads)") {
KJ_ASSERT(pending.isInvalidated());
}, 2);

kj::Maybe<kj::Own<ByteQueue::ByobRequest>> pendingByob;
kj::Maybe<kj::Own<ByteQueue::ByobReadRequest>> pendingByob;
while ((pendingByob = queue.nextPendingByobReadRequest()) != nullptr) {
auto& pending = KJ_ASSERT_NONNULL(pendingByob);
if (pending->isInvalidated()) {
@@ -884,7 +884,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads, 2)") {
// there should only be two actual BYOB requests
// processed by the queue, which will fulfill all four
// reads.
MustCall<void(ByteQueue::ByobRequest&)> respond([&](jsg::Lock&, auto& pending) {
MustCall<void(ByteQueue::ByobReadRequest&)> respond([&](jsg::Lock&, auto& pending) {
static uint counter = 0;
auto& req = pending.getRequest();
auto ptr = req.pullInto.store.asArrayPtr().begin();
@@ -894,7 +894,7 @@ KJ_TEST("ByteQueue with multiple byob consumers (multi-reads, 2)") {
KJ_ASSERT(pending.isInvalidated());
}, 2);

kj::Maybe<kj::Own<ByteQueue::ByobRequest>> pendingByob;
kj::Maybe<kj::Own<ByteQueue::ByobReadRequest>> pendingByob;
while ((pendingByob = queue.nextPendingByobReadRequest()) != nullptr) {
auto& pending = KJ_ASSERT_NONNULL(pendingByob);
if (pending->isInvalidated()) {
