Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ConflatedBroadcastChannel allocation-free (#395) #402

Closed
wants to merge 1 commit into from

Conversation

fvasco
Copy link
Contributor

@fvasco fvasco commented Jun 16, 2018

openSubscription, offer and send are O(n) where n is subscriptions count.
All synchronized block are allocation free.
Closed subscriptions are not eligible to GC, a invocation to openSubscription, offer, send, cancel or close is required.

Fixes #395

@fvasco
Copy link
Contributor Author

fvasco commented Jun 16, 2018

Some issue with UNCONFINED dispatcher.

I will use a copy-on-write subscriptions list.

@fvasco fvasco force-pushed the conflated-broadcast-channel branch 3 times, most recently from f0f19f8 to 345aa68 Compare June 17, 2018 14:44
@qwwdfsad
Copy link
Member

I haven't carefully read your solution yet, so can't say whether your approach works in general, but here are a few pointers anyway:

  1. Binary compatibility is broken (PublicApiTest fails)
  2. Stress tests start to fail which indicates that your solution has some problems
  3. Any non-trivial performance improvement should be paired with the corresponding benchmark which shows pros and cons of the proposed solution. It's not necessary to have benchmarks in PR, but at least gist with reproducible results
    It should both indicate that allocation pressure is reduced, read-write throughput it the same and subscription rate has the same order of magnitude
  4. 0x3a28213a

@fvasco
Copy link
Contributor Author

fvasco commented Jun 18, 2018

Hi @qwwdfsad

  1. Should miss
public static final field CLOSED Lkotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel$Closed;
public static final field INITIAL_STATE Lkotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel$State;

can you confirm? Should them be added?

  1. I cannot reproduce, can you provide more details?

  2. Have you a preferred way for this measurations?

@fvasco fvasco force-pushed the conflated-broadcast-channel branch from 345aa68 to d8f2a44 Compare June 18, 2018 02:56
@qwwdfsad
Copy link
Member

  1. It looks like a problem in our compatibility validator, because these fields shouldn't be reported, I'll investigate it separately

  2. BroadcastChannelSubStressTest#testStress fails for conflated channel with "java.lang.IllegalStateException: Send stalled at 226 events".
    ConflatedBroadcastChannelNotifyStressTest#testStressNotify fails with

--- ConflatedBroadcastChannelNotifyStressTest
!!! Test timed out kotlinx.coroutines.experimental.channels.ClosedSendChannelException: Channel was closed
Tested with nSenders=2, nReceivers=3
Completed successfully 0 sender coroutines
Completed successfully 3 receiver coroutines
                  Sent 212 events
              Received 11 events

java.lang.AssertionError: 
Expected: <2> but: was <0>
	at kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannelNotifyStressTest$testStressNotify$1.doResume(ConflatedBroadcastChannelNotifyStressTest.kt:92)

On 8-cores i7.

  1. JMH benchmark. It has -prof gc for allocation rate.

@qwwdfsad
Copy link
Member

qwwdfsad commented Jun 18, 2018

Binary compatibility issue can be ignored, please delete these lines from kotlinx-coroutines-core.txt.

General issues:

  1. ConflatedChannel contract is broken, offer now can return false
  2. ConflatedChannel contract is broken, offer doesn't throw an exception if the channel is closed
  3. Subscription removal via onClosed is bugged: onClose is invoked after receiving channel is closed, thus forcing ConflatedBroadcastChannel#offer to fail between these two events. Subscription should be firstly removed and only then completed with close-token.
  4. Binary compatibility for old code is broken: Subscription no longer implements SubscriptionReceiveChannel

Observations:
It's not a good idea to make channel fully synchronized:
Only offerInternal can obtain the lock and use if (!tryLock) return pattern to avoid unnecessary contention.
If subscription list will be managed via CoW (note: we have SubscribersList in common module), then lock on all read/subscriptions paths can be avoided.
Please do not reorder fields/methods if possible, it makes the review a bit harder.

Also note, that it doesn't fixe #395 completely, it just eliminates allocation of State on sender path

@fvasco
Copy link
Contributor Author

fvasco commented Jun 18, 2018

Hi @qwwdfsad
thanks for your review.

I found the ReentrantLock and I am trying to apply your suggestions.

However I have some doubts, can I share these?

close method must obtain lock because we cannot loose close invocation.

offerInternal can use tryLock because we can merge concurrent offers.

If offerInternal uses tryLock then openSubscription cannot use the lock, we cannot cancel offer during a subscription.

So openSubscription have to (happy path):

  1. create the subscription
  2. add the subscription in the _subscriptions list
  3. get the current value
  4. send the current value

Unfortunately between step 3 and 4 an entire offer/close are possible, so we read the old value in the step 3, the offer/close put the new value in the subscription and then we send the old value in the step 4 (or get exception).

Have you some other suggestion?

@fvasco fvasco force-pushed the conflated-broadcast-channel branch from d8f2a44 to fbbf196 Compare June 19, 2018 20:48
@fvasco
Copy link
Contributor Author

fvasco commented Jun 19, 2018

Hi @qwwdfsad
I made some changes, please review the code, especially the while loop inside openSubscription.
Thank you

@qwwdfsad
Copy link
Member

Hi @fvasco, sorry for the delay.

While reviewing your solution I've found that current (from develop branch) implementation is not linearizable which is likely a bug. Trivial fix is to protect every operation by a lock, but spinlock is too CPU-intensive (especially for large volumes of subscribers), while ReentrantLock has large memory footprint compared with the single-element channel.

I need some time to think about the importance of linearizability here and possible solutions and depending on that modify your PR, but unfortunately it's not in my major priority list right now (spoiler alert: native coroutines are incoming), so I should postpone this PR for a while, sorry :(

@fvasco
Copy link
Contributor Author

fvasco commented Jun 23, 2018

Hi @qwwdfsad
I tried to implement a different solution, unfortunately I lost myself on deprecated/undocumented code.
My idea is to avoid offerInternal loop using a Subscriber.initialize(value: E) method, this method should set-up the initial value for ConflatedChannel or return immediately.

Using this new method we can define

    public override fun openSubscription(): ReceiveChannel<E> {
        val subscriber = Subscriber()
        _subscribers.add(subscriber)

        val state = _state
        @Suppress("UNCHECKED_CAST")
        when {
            state is Closed -> subscriber.close(state.closeCause)
            state !== UNDEFINED -> subscriber.initialize(state as E)
        }

        if (subscriber.isClosedForSend) _subscribers.remove(subscriber)
        return subscriber
    }

Using the AbstractSendChannel.queue's CAS it is possible to initialize the channel without replacing the current offered value.

@qwwdfsad
Copy link
Member

Hi,
We have done some changes in project layout and channels, could you please rebase your branch on top of develop?

Also, please add ConflatedBroadcastChannel to ChannelLinearizabilityTest.

@fvasco fvasco force-pushed the conflated-broadcast-channel branch 2 times, most recently from 0c6de5a to 6057b54 Compare July 25, 2018 14:02
@fvasco
Copy link
Contributor Author

fvasco commented Jul 25, 2018

add ConflatedBroadcastChannel to ChannelLinearizabilityTest

I don't understand how, ChannelLinearizabilityTest does not manage BroadcastChannel.

@qwwdfsad
Copy link
Member

qwwdfsad commented Aug 6, 2018

You can use TestChannelKind wrapper instead of integer capacity

@fvasco
Copy link
Contributor Author

fvasco commented Aug 6, 2018

Hi @qwwdfsad,
I cannot take care of this PR at this time, sorry.

@qwwdfsad
Copy link
Member

qwwdfsad commented Aug 6, 2018

Sure, take your time.
Feel free to ping me when you are ready

@fvasco
Copy link
Contributor Author

fvasco commented Sep 12, 2018

Hi @qwwdfsad,
I am reviewing your comments but I cannot understand well your suggestions.

All ChannelLinearizabilityTest's tests run the runTest(capacity: Int) function, so Channel(capacity) is invoked, there is no way to create a ConflatedBroadcastChannel using this primitives.

Can you offer me another hint, a code snippet or, if you wish, put a your commit on this PR?

@qwwdfsad qwwdfsad force-pushed the develop branch 5 times, most recently from 69dc390 to eaf9b7c Compare October 25, 2018 10:41
@qwwdfsad qwwdfsad force-pushed the develop branch 3 times, most recently from bc68d63 to 3179683 Compare November 12, 2018 11:49
@qwwdfsad
Copy link
Member

qwwdfsad commented Apr 9, 2019

I apologize for letting it hang; I will work on my PR reviewing policy.

I am not ready to spend your and my time on this PR now: we currently have a major rework of channels that will completely mess the implementation and in parallel, we'd want to implement #736 somehow.
It is better to abandon what is done then spend even more time to have the work thrown away eventually.

@qwwdfsad qwwdfsad closed this Apr 9, 2019
@fvasco
Copy link
Contributor Author

fvasco commented Apr 9, 2019

I will try to contribute on future work, thank you for your effort.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants