Use EmitterProcessor in the FluxMessageChannel #3104

artembilan
Member

The EmitterProcessor has good logic for blocking an upstream producer
when its downstream subscriber cannot keep up with overproduction.

  • Rework FluxMessageChannel logic to rely on the EmitterProcessor
    instead of Flux.create()
  • Cancel FluxMessageChannel internal subscriptions in the destroy()
  • Fix ReactiveStreamsTests.testFluxTransform() for the splitter's
    delimiter
  • Ensure in the FluxMessageChannelTests.testFluxMessageChannel
    that we can have several concurrent subscribers to the
    FluxMessageChannel
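
To illustrate the behavior described above (a producer being blocked while a slow subscriber catches up), here is a minimal JDK-only sketch. It does not use Reactor's `EmitterProcessor` or any Spring Integration code. It swaps in `java.util.concurrent.SubmissionPublisher`, whose `submit()` call blocks the producer thread while a subscriber's buffer is full. The class name `BlockingProducerSketch`, the method `publishToSlowSubscriber`, the buffer size, and the sleep interval are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of the pull request.
final class BlockingProducerSketch {

    /** Publishes {@code count} integers to a slow subscriber and returns what it received. */
    static List<Integer> publishToSlowSubscriber(int count) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // A tiny buffer, so the producer outruns the subscriber almost immediately.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 2)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // demand one item at a time
                }

                @Override
                public void onNext(Integer item) {
                    try {
                        Thread.sleep(5); // simulate a consumer that cannot keep up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    received.add(item);
                    subscription.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });

            for (int i = 0; i < count; i++) {
                // submit() blocks this producer thread while the subscriber's buffer
                // is full, so the producer is paced by the subscriber's demand.
                publisher.submit(i);
            }
        } // close() issues onComplete once the buffered items have been delivered

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishToSlowSubscriber(20).size());
    }
}
```

The design point mirrored here is that the producer is slowed down rather than having items dropped or buffered without bound.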

@artembilan artembilan added this to the 5.2.2 milestone Nov 6, 2019
@artembilan
Member Author

@bsideup,

Would you mind taking a look at this fix, please?

Thanks

Contributor

@bsideup bsideup left a comment

(there was a comment here, but I missed an important detail, so I am updating it :D)

@artembilan artembilan force-pushed the EmitterProcessor_for_FluxMessageChannel branch from fb52b63 to e066b8c Compare November 8, 2019 18:19
@artembilan
Member Author

I think I found a solution with delaySubscription(Publisher<U> subscriptionDelay).

Stay tuned! Will come back to you today 😄

@artembilan artembilan force-pushed the EmitterProcessor_for_FluxMessageChannel branch from 86bee7f to cefe941 Compare November 13, 2019 20:07
@artembilan
Member Author

So, here is a fix based on delaySubscription() and an additional internal EmitterProcessor to handle external subscriptions to this FluxMessageChannel.
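
The idea of delaying an upstream subscription until the channel has a downstream subscriber can be sketched in plain Java. This is only an analogue under stated assumptions, not the Reactor `delaySubscription()` operator and not the `FluxMessageChannel` implementation. A `CompletableFuture` stands in for the subscription signal, and the class name `DelayedSubscriptionSketch` and its methods (`subscribeTo`, `subscribe`, `emit`, `demo`) are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of the pull request.
final class DelayedSubscriptionSketch {

    // Completed once the first downstream subscriber has been registered.
    private final CompletableFuture<Void> subscribedSignal = new CompletableFuture<>();

    private final List<Consumer<String>> downstreams = new CopyOnWriteArrayList<>();

    /** Registers an upstream source; it is not touched until a subscriber exists. */
    void subscribeTo(Supplier<List<String>> upstream) {
        // thenRun() defers the callback until subscribedSignal completes,
        // or runs it right away if a subscriber is already present.
        subscribedSignal.thenRun(() -> upstream.get().forEach(this::emit));
    }

    /** Registers a downstream subscriber and releases any delayed upstream sources. */
    void subscribe(Consumer<String> subscriber) {
        downstreams.add(subscriber);
        subscribedSignal.complete(null);
    }

    private void emit(String item) {
        downstreams.forEach(subscriber -> subscriber.accept(item));
    }

    /** Demo: returns what a late subscriber receives from an upstream registered earlier. */
    static List<String> demo() {
        DelayedSubscriptionSketch channel = new DelayedSubscriptionSketch();
        List<String> received = new CopyOnWriteArrayList<>();
        channel.subscribeTo(() -> List.of("a", "b", "c")); // nothing is emitted yet
        channel.subscribe(received::add);                  // emission starts here
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the upstream is only consumed after the signal fires, items produced before any subscriber exists are not lost, which is the race the delayed subscription is meant to avoid.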

Mono.fromSupplier(this.subscribed::get)
.filter((subscribers) -> subscribers > 0)
.switchIfEmpty(this.subscriptionDelay.next()))
.subscribe();
Member Author

I think the proper way would be to have subscribeTo() return a Mono<Void> instead, letting the caller subscribe or compose its reactive flow further.
This way we would defer emission from this source publisher as late as possible.

But I'd like to revise that stuff in a separate PR.

Thanks

@artembilan artembilan force-pushed the EmitterProcessor_for_FluxMessageChannel branch 2 times, most recently from 1f1c315 to f267182 Compare November 19, 2019 17:16
@artembilan
Member Author

@bsideup , gentle ping. 😄

@artembilan artembilan force-pushed the EmitterProcessor_for_FluxMessageChannel branch from 13bd6a0 to 15d2628 Compare November 19, 2019 20:29
@artembilan
Member Author

OK! I have minimized the change for this PR as much as possible - 2 files. 😄

Looking forward to your feedback!

@artembilan artembilan force-pushed the EmitterProcessor_for_FluxMessageChannel branch from 15d2628 to 5d0fa19 Compare November 22, 2019 18:11
@artembilan
Member Author

OK. I pushed what we discussed, with a workaround for the subscription race condition.

Thank you for your feedback, @bsideup, looking forward to more!

@artembilan artembilan force-pushed the EmitterProcessor_for_FluxMessageChannel branch from 5d0fa19 to b60bc55 Compare December 2, 2019 19:04
@artembilan
Member Author

@bsideup,

So, I have rebased onto master and reduced the change as much as possible. Again 😄

It would be great to have some feedback before the release this Wednesday.

Thanks

Fixes spring-projects#3107

The `MessagingGatewaySupport` has an `errorOnTimeout` option to throw
a `MessageTimeoutException` when a downstream reply doesn't come back
within the configured reply timeout

* Expose an `errorOnTimeout` option as a `TcpInboundGateway` ctor
property
* Add new factory methods into a `Tcp` factory for Java DSL
* Ensure a property works as expected in the `IpIntegrationTests`
* Document a new option

The `EmitterProcessor` has good logic for blocking an upstream producer
when its downstream subscriber cannot keep up with overproduction.

* Rework `FluxMessageChannel` logic to rely on the `EmitterProcessor`
instead of `Flux.create()`
* Cancel `FluxMessageChannel` internal subscriptions in the `destroy()`
* Fix `ReactiveStreamsTests.testFluxTransform()` for the splitter's
delimiter
* Ensure in the `FluxMessageChannelTests.testFluxMessageChannel`
that we can have several concurrent subscribers to the
`FluxMessageChannel`
* Change `subscribers` list into just `AtomicInteger` count marker
* Fix `DefaultSplitterTests` according to the new logic in the `FluxMessageChannel`
…ssageChannel`

to wait until this one is subscribed.
* Use an `EmitterProcessor` to catch subscriptions and pass them as a
signal to delayed upstream publishers
* Fix `FluxMessageChannelTests.testFluxMessageChannelCleanUp` to
verify an actual property instead of the removed one.
* Fix `RSocketOutboundGatewayIntegrationTests` for the proper subscription
to a `FluxMessageChannel` before actual interaction with an RSocket
gateway.
This should also help us avoid some race conditions in the future
…bedSignal`.

This one is used in `delaySubscription` for the upstream publishers
* Use an `AtomicBoolean` for subscription state since `doOnSubscribe()`
is called before `EmitterProcessor` adds subscribers for its `downstreams`
* Use `publishOn(Schedulers.boundedElastic())` for upstream publishers
to avoid blocking over there when our `EmitterProcessor` doesn't have
enough demand
* Refactor reactive tests to subscribe to the `FluxMessageChannel`
before emission happens for it
instead of `doOnSubscribe`
* Check for `this.processor.hasDownstreams()` before emitting such an event
@artembilan artembilan force-pushed the EmitterProcessor_for_FluxMessageChannel branch from f75e027 to 9a35815 Compare December 3, 2019 16:20
@artembilan
Member Author

Pushed the fix as we discussed in a private chat.

Thanks


this.publishers.values().forEach(ConnectableFlux::connect);
if (this.processor.hasDownstreams()) {
this.subscribedSignal.onNext(true);
Contributor

probably this.subscribedSignal.onNext(this.processor.hasDownstreams()) would be better

Member Author

He-he! True. Will fix soon.

…ribedSignal`.

This way we are less vulnerable to race conditions when subscribers change
actively
@artembilan
Member Author

@garyrussell,

We're done here with @bsideup. So, let us know if you are OK with it and we will merge (or you can).

Thanks

@garyrussell garyrussell merged commit 94e0816 into spring-projects:master Dec 3, 2019
@artembilan artembilan deleted the EmitterProcessor_for_FluxMessageChannel branch May 15, 2023 20:50
3 participants