Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume as flow #1343

Merged
merged 3 commits into from
Jul 18, 2019
Merged

Consume as flow #1343

merged 3 commits into from
Jul 18, 2019

Conversation

elizarov
Copy link
Contributor

@elizarov elizarov commented Jul 16, 2019

  • This is a consuming conversion -- the resulting flow can be collected
    just once and the channel is closed after the first collect.
  • The implementation is made efficient (without iterators) using
    a new internal ReceiveChannel.consumeEachTo function which also ensure
    that the reference to the last emitted value is not retained.
  • AbstractChannel implementation is optimized to avoid code duplication
    in different receive methods.

@elizarov elizarov requested a review from qwwdfsad July 16, 2019 09:08
@elizarov elizarov changed the base branch from master to develop July 16, 2019 09:08
Copy link
Contributor

@qwwdfsad qwwdfsad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise looks good, please squash commits if you want, fix comments and I will merge it as soon as it's ready

kotlinx-coroutines-core/common/src/channels/Channel.kt Outdated Show resolved Hide resolved
kotlinx-coroutines-core/common/src/channels/Channel.kt Outdated Show resolved Hide resolved
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, channel is cancelled.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra indent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 74

kotlinx-coroutines-core/common/src/flow/Channels.kt Outdated Show resolved Hide resolved
@elizarov elizarov force-pushed the consume-as-flow branch 4 times, most recently from d5e12d2 to e54272b Compare July 18, 2019 00:15
@elizarov
Copy link
Contributor Author

elizarov commented Jul 18, 2019

I've squashed and rearranged all the old commit into two logically grouped commits, fixing all the review issues:

  1. Channel.receiveOrNull becomes extension, internal receiveOrClosed added combines all the improvement to channels machinery and introduces internal receiveOrClosed. It combines all commits pulled in from Implement receiveOrClosed #762 and adds a few minor cleanups that were previously done as a part of flow work.
  2. Introduce ReceiveChannel.consumeAsFlow and FlowCollector.emitAll(chan) implements consumeAsFlow using those channel improvements.

I've added a 3rd (new) commit that makes consumeAsFlow fuseable with all the channel-using flow operators so it avoids creation of an extra channel when possible and unless explicitly requested. Please, take a look.

qwwdfsad and others added 3 commits July 17, 2019 17:31
* The corresponding ReceiveChannel methods are deprecated.
* Introduced corresponding extensions with the same semantic and generic
  Any bound.
* Introduce internal ReceiveChannel.[on]receiveOrClosed
  * Using internal inline class ValueOrClosed.
  * To be stabilized and made public in the future when inline classes
    ABI stabilizes.
  * It is related to #330 but does not resolve it yet.
* Includes todos for future public ValueOrClose design.
* Simplify AbstractChannel select implementations.
* AbstractChannel implementation is optimized to avoid code
  duplication in suspension of different receive methods:
  receive, receiveOrNull, receiveOrClosed.
* This is a consuming conversion -- the resulting flow can be collected
  just once and the channel is closed after the first collect.
* The implementation is made efficient via emitAll extension.
* Experimental FlowCollector.emitAll extension is introduced.
* It is based on the (internal) Channel.receiveOrClose
  and ensures that the reference to the last emitted value is
  not retained (does not leak).

Fixes #1340
Fixes #1333
Copy link
Contributor

@qwwdfsad qwwdfsad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants