Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streams): concatReadableStreams() #4747

Merged
merged 10 commits into from
May 20, 2024

Conversation

BlackAsLight
Copy link
Contributor

Implements #4500

This implementation broadens the scope of concatenating streams to concatenating anything that implements the Symbol.asyncIterator or Symbol.iterator. The reason for this is that the only difference needed to support the broader scope compared to just concatenating ReadableStreams exists in TypeScript alone.

This implementation allows it to be used in two ways, either;

  1. providing the "array" of "streams" as an argument new ConcatStreams(streams); or
  2. using it within a .pipeThrough method

When the static method ReadableStream.from is supported more widely in the browser, this code can be simplified by several lines.

Copy link

codecov bot commented May 16, 2024

Codecov Report

Attention: Patch coverage is 84.00000% with 4 lines in your changes are missing coverage. Please review.

Project coverage is 91.45%. Comparing base (aa35b35) to head (6948d9f).
Report is 27 commits behind head on main.

Files Patch % Lines
streams/concat_readable_streams.ts 84.00% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4747      +/-   ##
==========================================
- Coverage   91.89%   91.45%   -0.44%     
==========================================
  Files         484      486       +2     
  Lines       41296    41340      +44     
  Branches     5319     5288      -31     
==========================================
- Hits        37947    37807     -140     
- Misses       3292     3474     +182     
- Partials       57       59       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@kt3k
Copy link
Member

kt3k commented May 16, 2024

I'm not sure it's good idea to implement this as TransformStream. No Deno API or Web API returns stream of streams or stream of iterables.

The original comment of #4500 suggests a function which concatenate the streams in an array. That design makes more sense to me.

- Converted ConcatStream from a TransformStream into a ReadableStream, also now with proper cleaning up if the `.cancel()` method is called.
@BlackAsLight
Copy link
Contributor Author

I'm not sure it's good idea to implement this as TransformStream. No Deno API or Web API returns stream of streams or stream of iterables.

The original comment of #4500 suggests a function which concatenate the streams in an array. That design makes more sense to me.

I changed it up to a ReadableStream constructor instead. Taking in an iterable, like an array, of readable streams and concatenating them on a pulling method.

There is also extra code to preform a clean up if the .cancel() method is called, which will be iterating through the rest of the array and calling cancel on each readable stream.

@BlackAsLight
Copy link
Contributor Author

Example:

const streams = new Array(10).fill(0).map((_x, i) =>
  ReadableStream.from(function* () {
    for (let j = i * 10; j < i * 10 + 10; ++j) {
      yield j;
    }
  }())
);

console.log(await Array.fromAsync(new ConcatStreams(streams)));
[
   0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11,
  12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
  24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
  36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
  48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
  60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
  72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
  84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95,
  96, 97, 98, 99
]

@iuioiua
Copy link
Collaborator

iuioiua commented May 19, 2024

I believe Yoshiya meant that he thought this should be a function, like mergeReadableStream() and zipReadableStreams(). We only implement classes when a function won't suffice.

@crowlKats, able to take a look at this?

@BlackAsLight
Copy link
Contributor Author

BlackAsLight commented May 19, 2024

Oh okay, converting it to a function would be quite simple. Essentially just:

function concatStreams<T>(streams: ...) {
  const gen = ...
  let lock = false
  return new ReadableStream<T>({
    ... 
  })
}

I can do this if you'd like

@iuioiua
Copy link
Collaborator

iuioiua commented May 19, 2024

Sounds good 👍🏾

@BlackAsLight BlackAsLight changed the title feat(streams): new ConcatStreams() feat(streams): concatStreams(streams) May 19, 2024
Copy link
Collaborator

@iuioiua iuioiua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might using for await...of in the implementation make things a little simpler and clearer?

streams/concat_streams.ts Outdated Show resolved Hide resolved
streams/concat_streams.ts Outdated Show resolved Hide resolved
streams/concat_streams.ts Outdated Show resolved Hide resolved
streams/concat_streams.ts Outdated Show resolved Hide resolved
streams/concat_streams.ts Outdated Show resolved Hide resolved
streams/concat_streams.ts Outdated Show resolved Hide resolved
streams/concat_streams.ts Outdated Show resolved Hide resolved
streams/concat_streams_test.ts Outdated Show resolved Hide resolved
streams/concat_streams.ts Outdated Show resolved Hide resolved
@iuioiua iuioiua changed the title feat(streams): concatStreams(streams) feat(streams): concatStreams() May 19, 2024
streams/concat_streams.ts Outdated Show resolved Hide resolved
Copy link
Collaborator

@iuioiua iuioiua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, let's make this more consistent with other similar APIs - mergeReadableStreams() and zipReadableStreams(). Can you please rename this to concatReadableStreams() and make it accept ...streams: ReadableStream<T>[]?

While the initial issue requested it accepts AsyncIterable<ReadableStream<T>>, handling arrays may be sufficient for most, if not almost all, use cases. If there's sufficient demand for async iterators in the future, we can extend the function without breaking changes.

@crowlKats
Copy link
Member

crowlKats commented May 19, 2024

here is a completely alternative implementation to match the above comment:

export function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  let currentStream = 0;

  return new ReadableStream<T>({
    async pull(controller) {
      const stream = streams[currentStream];
      const reader = stream.getReader();
      try {
        const read = await reader.read();

        if (read.done) {
          currentStream++;
          if (streams.length == currentStream) {
            controller.close();
          } else {
            await this.pull(controller);
          }
        } else {
          controller.enqueue(read.value);
        }
      } catch (e) {
        controller.error(e);
      }

      reader.releaseLock();
    },
  });
}

@BlackAsLight
Copy link
Contributor Author

} else {
  await this.pull(controller);
}

I don't think this line is needed as you're essentially forcing the contents of all the streams inside the queue regardless if the queue is being emptied or not. Essentially turning a pulling method into a pushing one.

@crowlKats
Copy link
Member

crowlKats commented May 19, 2024

@BlackAsLight that line is necessary because if a pull is called but the read of the current internal stream is done, we would be enqueueing nothing, which isn't valid. That call just calls pull again to get the next value; it doesn't turn into a pushing one

@BlackAsLight
Copy link
Contributor Author

@BlackAsLight that line is necessary because if a pull is called but the read of the current internal stream is done, we would be enqueueing nothing, which isn't valid. That call just calls pull again to get the next value; it doesn't turn into a pushing one

Oh ya, you're right. I was thinking about it wrong

@iuioiua iuioiua changed the title feat(streams): concatStreams() feat(streams): concatReadableStreams() May 20, 2024
Copy link
Collaborator

@iuioiua iuioiua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thank you. Looking nice now 👍🏾

@kt3k, please review.

Copy link
Member

@kt3k kt3k left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Co-authored-by: crowlKats <crowlkats@toaxl.com>
@iuioiua iuioiua enabled auto-merge (squash) May 20, 2024 22:19
@iuioiua iuioiua merged commit 39c2a4c into denoland:main May 20, 2024
12 checks passed
@BlackAsLight BlackAsLight deleted the concat_streams branch May 24, 2024 05:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants