Add Promise style semantics to reactive streams #268

Closed
daviddawson opened this issue May 18, 2015 · 111 comments

Comments

@daviddawson

We are currently building a set of network APIs that use reactive streams. We're migrating more and more over to reactive streams to gain the benefits of swappable implementations over the standardised interfaces, back pressure support and good FRP.

When dealing with streaming data, the semantics are well understood by the users of the API. When dealing with something that is naturally request/response (think similar to an HTTP request), then the semantics begin to break down.

We used a Future for this reason. Developers approaching the API expect to get a single response back, and we feel that the API should indicate to them what they should expect to happen. It's about reasoning and learning rather than mechanics.

Naturally, we would want to return a Promise from this type of API, indicating to the developer what they can reasonably expect to occur. Currently, our only option from reactive streams is a full publisher.

Mechanically, this works fine, but that isn't the issue. From the point of view of building an API for consumption by randoms on the internet, I don't want to have to indicate in the method name nor the documentation that a single item will be returned.

The case of up to 1 item being returned is special, and has spawned Future and Promise in the JDK and everywhere else respectively. For this reason, and for reactive streams to become even more useful for the developing of highly interoperable and usable APIs, I would like to suggest that a Promise interface be added to the core set.

Mechanically, it would be an extension of Publisher, and nothing more. Semantically, it would be constrained in the specification to return up to a single item.

@viktorklang
Contributor

@daviddawson Given that we're talking about an end-user API, would it make sense to use CompletionStage instead of the RS interfaces for these kinds of methods? (It would be a rather trivial exercise to write a bridge between CompletionStage and Publisher if one needs to pass one as the other).
Let me know what you think!
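
For readers wondering what such a bridge might look like, here is a minimal sketch of the CompletionStage-to-Publisher direction. It is an illustration only: `StagePublisher` is a hypothetical name, and it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the `org.reactivestreams` ones, purely so that it compiles without dependencies. A `null` result is mapped to "complete without an item", since the spec forbids null elements.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Reactive Streams or the JDK.
final class StagePublisher<T> implements Flow.Publisher<T> {
    private final CompletionStage<T> stage;

    StagePublisher(CompletionStage<T> stage) {
        this.stage = stage;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Flipped exactly once: by a terminal signal or by cancellation.
        AtomicBoolean done = new AtomicBoolean(false);
        subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean requested = new AtomicBoolean(false);

            @Override
            public void request(long n) {
                if (n <= 0) {
                    if (done.compareAndSet(false, true)) {
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                    }
                    return;
                }
                // Only the first positive request matters: there is at most one item.
                if (!requested.compareAndSet(false, true)) {
                    return;
                }
                stage.whenComplete((value, error) -> {
                    if (!done.compareAndSet(false, true)) {
                        return; // cancelled or already terminated
                    }
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        if (value != null) {
                            subscriber.onNext(value);
                        }
                        subscriber.onComplete();
                    }
                });
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}
```

Nothing is emitted until the subscriber signals demand, so the adapter stays within the usual request/onNext protocol even though the underlying stage is not demand-driven.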

@jbrisbin

@viktorklang Good idea, but needs to support Java 7 as a minimum IMO.

@jbrisbin

I think this is somewhat related as well: in the @reactive-ipc/reactive-ipc-jvm project we're [ab]using the RS API to indicate completion or error state by using a Publisher<Void>. This means you'll never get a real value, only a notification of complete or error. I'm thinking that with a "real" Promise style interface, we could be using that specific indicator rather than the somewhat arbitrary use of the Publisher. The former indicates a more promissory intent IMO than Publisher<Void> which is technically fine and correct but just doesn't feel right to me. It feels like we're trying to get the benefits of the interoperability of the RS API so we can compose actions based on this but since a Publisher is meant to transmit values, it seems odd to me that we would specify Publisher<Void> which really means "values of type NULL"--which is the only possible value of the Void type. Since that's illegal, what we're saying is "use this technically illegal signature to express the intent that an operation will be completed later and will result in a complete or error state but will not have any associated value".

This type of situation has come up for us quite a few times and I really think that providing for it at the RS level is better because it allows us to stick to that API rather than extending it to our own and requiring some other kind of component that isn't sharable like the RS API is.

@daviddawson
Author

Yes, I'd like Java 7 support.

Also, tbh CompletionStage is a bit opaque on how it should be handled by a user :-)

Reactor in particular exposes the reactive streams interfaces all over the place, so having a potential Promise there too would be fairly natural. If a Promise in reactive streams were just defined as

interface Promise<T> extends Publisher<T> {}

Then it would be totally non invasive and integrate with existing reactive streams implementations while extending the semantics of reactive streams into an area ripe for it.
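
To make the "non invasive" claim concrete, the sketch below shows how such a marker interface could sit on top of an unchanged Publisher contract. It is only an illustration under assumptions: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+, mirroring `org.reactivestreams`) so that it compiles standalone, and `ValuePromise` is a hypothetical implementation that is not thread-safe.

```java
import java.util.concurrent.Flow;

// Hypothetical marker interface: a Publisher that, by convention only,
// emits at most one item before terminating.
interface Promise<T> extends Flow.Publisher<T> {}

// Hypothetical, single-threaded implementation for an already-known value.
final class ValuePromise<T> implements Promise<T> {
    private final T value; // null means "complete without an item"

    ValuePromise(T value) {
        this.value = value;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean finished;

            @Override
            public void request(long n) {
                if (finished) {
                    return;
                }
                finished = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                if (value != null) {
                    subscriber.onNext(value);
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }
}
```

Because `Promise<T>` is still a Publisher, any code that accepts a Publisher accepts it unchanged; the only thing the type adds is the documented expectation of zero or one `onNext`.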

Thinking about it, we'll do this anyway (define a Promise interface), but it would be good to have it standardised.

@jbrisbin

I would like to take it one step further and specifically call out the special case of Promise<Void> to make sure it's noted that since the spec prohibits null values already, that a Promise<Void> indicates 0 values and a Promise<T> indicates 0-1 values.

@viktorklang
Contributor

First, let me state that I am not trying to be a contrarian, or otherwise be difficult; I'm merely trying to see whether there is a problem worth solving here and, if so, whether there are alternative solutions that would work in place of the proposed one.

@viktorklang Good idea, but needs to support Java 7 as a minimum IMO.

@jbrisbin Normally I would agree, but given that J7 is EOL and that it is possible to run j.u.c.CompletionStage via -Xbootclasspath if one must target an obsolete platform, does it really make sense to add yet another thing that maps directly to a JDK type?

I think this is somewhat related as well: in the @reactive-ipc/reactive-ipc-jvm project we're [ab]using the RS API to indicate completion or error state by using a Publisher<Void>. This means you'll never get a real value, only a notification of complete or error. I'm thinking that with a "real" Promise style interface, we could be using that specific indicator rather than the somewhat arbitrary use of the Publisher. The former indicates a more promissory intent IMO than Publisher<Void> which is technically fine and correct but just doesn't feel right to me.

But Promise<Void> wouldn't be better than Publisher<Void>? It's still no value expected, right?

, it seems odd to me that we would specify Publisher<Void> which really means "values of type NULL"--which is the only possible value of the Void type. Since that's illegal, what we're saying is "use this technically illegal signature to express the intent that an operation will be completed later and will result in a complete or error state but will not have any associated value".

Technically "Void" could be correct here, but the problem I see is that its Javadoc is terrible: https://docs.oracle.com/javase/6/docs/api/java/lang/Void.html

In a language with a more…sophisticated…type system one would use something more like Publisher[Nothing].

So what you could do instead is to define:

/**
 * A type that indicates that no value of this type can exist.
 */
public enum Nothing {;}

and then use:

Publisher<Nothing>
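
As a rough illustration of how this might be used, the sketch below pairs the empty enum with a publisher that can only signal completion. The assumptions: `CompletionOnly` is a hypothetical class, and the JDK's `java.util.concurrent.Flow` interfaces (Java 9+, mirroring `org.reactivestreams`) stand in for the RS ones so that the snippet compiles on its own.

```java
import java.util.concurrent.Flow;

/**
 * A type that indicates that no value of this type can exist.
 */
enum Nothing {;}

// Hypothetical publisher that never emits an item: there is no Nothing
// instance to emit, so only a terminal signal is possible.
final class CompletionOnly implements Flow.Publisher<Nothing> {
    @Override
    public void subscribe(Flow.Subscriber<? super Nothing> subscriber) {
        boolean[] cancelled = {false};
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // No items exist, so demand is irrelevant.
            }

            @Override
            public void cancel() {
                cancelled[0] = true;
            }
        });
        // Completion does not require demand; skip it if already cancelled.
        if (!cancelled[0]) {
            subscriber.onComplete();
        }
    }
}
```

Since the enum declares no constants, `Nothing.values()` is an empty array, and a subscriber's `onNext(Nothing)` can never legally be called with a non-null argument.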

This type of situation has come up for us quite a few times and I really think that providing for it at the RS level is better because it allows us to stick to that API rather than extending it to our own and requiring some other kind of component that isn't sharable like the RS API is.

Agreed. But do you agree that the existence of CompletionStage and the EOL of Java7 diminishes the value of creating this new type, adding speccing for it, updating the TCK for it, updating the docs for it and having to ship a new version of RS. For a single marker interface? (Me remembering how hard I had to argue to get Processor approved by @benjchristensen back in the days.)

@jbrisbin

@viktorklang I would expect you and @benjchristensen to push back on most things as it ends up being better for having people who don't just agree right away but pick at the solution and ask tough questions. I don't see anything wrong with being a contrarian! ;)

@jbrisbin Normally I would agree, but given that J7 is EOL and that it is possible to run j.u.c.CompletionStage via -Xbootclasspath if one must target an obsolete platform, does it really make sense to add yet another thing that maps directly to a JDK type?

Have to disagree pretty completely here. I realize that Java 7 is technically EOL but in the real world that makes no difference (and the bootclasspath option is a non-starter...I can't imagine the discussion with ops that would ensue to describe the need to add a custom JVM option to be distributed with the application...that is an unreasonable burden IMO). The fact of life is that it's unrealistic for someone writing applications designed to be adopted by folks still using legacy applications to upgrade to Java 8 just to get a simple feature that can be provided by an earlier JDK. I don't agree that the JDK 8 type adds anything meaningful (par for the course...there's me being the contrarian!).

But Promise<Void> wouldn't be better than Publisher<Void>? It's still no value expected, right?

True, but I think we need to address the use specifically because what we're saying is not "null values", which is really what Publisher<Void> is saying. As you pointed out, the JDK has no way of saying "nothing" so we're stuck with Void which is close but IMO still not correct.

But do you agree that the existence of CompletionStage and the EOL of Java7 diminishes the value of creating this new type, adding speccing for it, updating the TCK for it, updating the docs for it and having to ship a new version of RS. For a single marker interface? (Me remembering how hard I had to argue to get Processor approved by @benjchristensen back in the days.)

I don't think the effort of including it weighs on its usefulness. Those may be logistical hurdles to overcome but they shouldn't be factors in deciding whether or not the Promise has merit.

I believe it is important to include in the core RS API (one which gets reused and is guided and documented by a concomitant specification) a component for expressing the intent that a Publisher isn't just a "normal" Publisher but that you can guarantee either zero or 1 onNext plus an onComplete. The important thing as @daviddawson pointed out is that you can do this already but it's imprecise in code. It's technically correct to return a Publisher<T> but it's very vague about what expectations there are around that Publisher. In most cases that's good because it allows easy interoperability. But I know of two implementations of Reactive Streams (Reactor and Ratpack) that have their own, somewhat competing implementations of Promise. There's also Netty itself, which has a promise capability that we're trying to "convert" to the RS API. This divergence can't be good and could be avoided by providing in the RS spec a facility for expressing a sort-of-future-like component that expresses the idea of a single (or no) result + a complete.

@jbrisbin

Technically "Void" could be correct here, but the problem I see is that its Javadoc is terrible: https://docs.oracle.com/javase/6/docs/api/java/lang/Void.html

In a language with a more…sophisticated…type system one would use something more like Publisher[Nothing].

So what you could do instead is to define:

/**
 * A type that indicates that no value of this type can exist.
 */
public enum Nothing {;}

That's definitely more explicit (and likely preferable) but I don't know that it does anything meaningful more than Void.

The one subtle thing that might argue in its favor is that enum Nothing {;} really does mean "nothing" because there's no way to communicate a value. With Void you're really getting a wrapper class that has a public static member of type Class<Void>. That's generally understood to mean "null" and not "nothing" which IMO is the core of the problem.

Publisher<Optional<T>> could be interesting as well, but you're back to the problem of being Java 8-only.

@jbrisbin

Maybe @daviddawson and @smaldini could comment (and feel free to disagree if you want!) but I think we need two classes and two specification provisions:

public interface Promise<T> extends Publisher<T> {}

public enum Nothing {;}

The Promise<T> is spec'd to provide for a special kind of Publisher that a user can know for certain will never have more than 1 onNext signal plus the onComplete.

The Promise<Nothing> is spec'd to provide for a special situation where a Publisher will never receive an onNext signal, only an onComplete or onError. It doesn't necessarily have to be part of the TCK I don't think but could be a provision on one of the Promise rules that specifies in a predictable way what to expect if that is used in a library.
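
One way to picture what "spec'd" could mean in practice is a guard that checks the at-most-one-onNext expectation at runtime. The sketch below is hypothetical and is not part of the Reactive Streams TCK; `AtMostOneGuard` is an invented name, it assumes serial signalling as the spec already requires, and it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) in place of `org.reactivestreams` so that it compiles standalone.

```java
import java.util.concurrent.Flow;

// Hypothetical guard: wraps a Subscriber and fails the stream if the
// upstream ever delivers a second onNext.
final class AtMostOneGuard<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    private Flow.Subscription upstream;
    private boolean seenItem;
    private boolean terminated;

    AtMostOneGuard(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        downstream.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
        if (terminated) {
            return;
        }
        if (seenItem) {
            // Contract violation: stop the upstream and report it downstream.
            terminated = true;
            upstream.cancel();
            downstream.onError(new IllegalStateException("more than one onNext"));
            return;
        }
        seenItem = true;
        downstream.onNext(item);
    }

    @Override
    public void onError(Throwable error) {
        if (!terminated) {
            terminated = true;
            downstream.onError(error);
        }
    }

    @Override
    public void onComplete() {
        if (!terminated) {
            terminated = true;
            downstream.onComplete();
        }
    }
}
```

A `Promise<Nothing>` variant would be the same idea with the first `onNext` already treated as a violation.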

The reason for including these here is that this is the lowest common denominator when writing libraries that expose their Reactive Streams support. It seems less useful to have LibraryA create their own version of Publisher<T> that is effectually a Promise and LibraryB create their own version which does things a little differently. Both exhibit similar characteristics but neither clearly expresses the intent of how they're using a Publisher.

The alternative is to defer dealing with this to implementations and get what we have today which is very vague and not interoperable (I can't communicate the idea of a Promise from one library to the next, only a Publisher) and will lead to gaps with using the RS API where we have to supplement it with our own components.

@daviddawson
Author

I strongly agree. I would very much like to have a standardised Promise whose primary purpose is interop between FRP style libraries and application stacks.

As an experiment, I just implemented a Promise across a test codebase where previously it was Publisher and it becomes much clearer what does what. Looking through the Reactor codebase a little, I can see that a standardised interface could be retrofitted in there too and help clear things up (although since there's an internal Promise already, possibly not so much)

The other interface (Nothing) I'm not sure I've seen the need yet, but I understand your description and could see it being useful for us in the future.

I agree with @jbrisbin on standardisation. There are half a dozen Promise implementations now extant in the codebases we use regularly, and they are starting to grow in functionality to mean something more like 'stream processing' than 'I may give you something later' (looking at the Ratpack one in particular). If an attempt isn't made to give some interop under the purview of the reactive streams specifications, then I don't see any interop being possible around this concept, which I think would be a shame.

Also, this is too easy to pass up, my apologies.

public interface Promise<T> extends Publisher<T> {}

public enum YouAreDeadToMe {;}

Promise<YouAreDeadToMe>();

@jbrisbin

public interface Promise<T> extends Publisher<T> {}

public enum YouAreDeadToMe {;}

Promise<YouAreDeadToMe>();

😆

@ldaley
Contributor

ldaley commented May 18, 2015

One of the benefits of using a promise (a.k.a. a one, or zero-or-one depending on perspective, element stream) is that you don't have to worry about demand, hot vs. cold, async cancellation (i.e. in that it's simpler). Promise<T> extends Publisher<T> defeats this. You're still carrying all the complexity of Publisher.

You can easily adapt a promise to/from Publisher. That seems like a better strategy for end user API.
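
A minimal sketch of the Publisher-to-promise direction follows, assuming the JDK's `CompletionStage` plays the role of the promise and using `java.util.concurrent.Flow` (Java 9+, mirroring `org.reactivestreams`) so that it compiles without dependencies. `PublisherToStage.firstOrNull` is a hypothetical helper name, and mapping an empty stream to `null` is a choice made for this sketch only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

// Hypothetical adapter, not part of Reactive Streams or the JDK.
final class PublisherToStage {
    private PublisherToStage() {}

    // Completes with the first item, with null if the stream completes
    // empty, or exceptionally if the stream signals onError.
    static <T> CompletionStage<T> firstOrNull(Flow.Publisher<T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // a promise needs at most one item
            }

            @Override
            public void onNext(T item) {
                future.complete(item);
                subscription.cancel(); // anything further is of no interest
            }

            @Override
            public void onError(Throwable error) {
                future.completeExceptionally(error);
            }

            @Override
            public void onComplete() {
                future.complete(null); // no effect if an item already arrived
            }
        });
        return future;
    }
}
```

The caller of `firstOrNull` sees none of the demand or cancellation handling, which is the simplification being argued for here.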

@viktorklang
Contributor

@viktorklang I would expect you and @benjchristensen to push back on most things as it ends up being better for having people who don't just agree right away but pick at the solution and ask tough questions. I don't see anything wrong with being a contrarian! ;)

Thanks for being extremely understanding of the reason! :)
There's going to be a lot in this reply that can be perceived as me being negative (and I detest feeling like I'm only being negative) so please, please take everything I say below as the most constructive criticism I can bring forth after only getting 4.5h of sleep (thanks kids!).

Have to disagree pretty completely here. I realize that Java 7 is technically EOL but in the real world that makes no difference (and the bootclasspath option is a non-starter...I can't imagine the discussion with ops that would ensue to describe the need to add a custom JVM option to be distributed with the application...that is an unreasonable burden IMO). The fact of life is that it's unrealistic for someone writing applications designed to be adopted by folks still using legacy applications to upgrade to Java 8 just to get a simple feature that can be provided by an earlier JDK. I don't agree that the JDK 8 type adds anything meaningful (par for the course...there's me being the contrarian!).

Why don't you think the JDK8 type adds anything meaningful? The entire purpose of adding it was to accommodate for exactly this type of thing, at least that's what I recall from talking to @DougLea.

True, but I think we need to address the use specifically because what we're saying is not "null values", which is really what Publisher<Void> is saying. As you pointed out, the JDK has no way of saying "nothing" so we're stuck with Void which is close but IMO still not correct.

But if Promise does not solve the problem, why add it?

I don't think the effort of including it weighs on its usefulness. Those may be logistical hurdles to overcome but they shouldn't be factors in deciding whether or not the Promise has merit.

If logistical hurdles are not to be taken into account, then requiring people to go through the logistical hurdle of upgrading to JDK8 shouldn't be an issue either, right? :-)

I believe it is important to include in the core RS API (one which gets reused and is guided and documented by a concomitant specification) a component for expressing the intent that a Publisher isn't just a "normal" Publisher but that you can guarantee either zero or 1 onNext plus an onComplete.

So it guarantees that onError will never be signalled?

The important thing as @daviddawson pointed out is that you can do this already but it's imprecise in code. It's technically correct to return a Publisher<T> but it's very vague about what expectations there are around that Publisher.

Understood. But where does it stop?

Do we need a Publisher type for something that only ever signals onComplete?
Do we need a Publisher type for something that only ever signals onError?
Do we need a Publisher type for something that signals 0…1 of any element and then onComplete?
Do we need a Publisher type for something that signals 0…1 of any element and then onComplete OR onError?
Do we need a Publisher type for something that signals 0…1 and ALWAYS the same element then onComplete?
Do we need a Publisher type for something that signals 0…1 and ALWAYS the same element then onComplete OR onError?
Do we need a Processor type for each of the permutations above?
How do all of these types relate to Subscriber? Because every Subscriber would still have to be written with the assumption of Publisher, or do we need permutations of Subscriber for all of the above?

Reading the above, it is also clear that the definition of Promise is unclear, in my world a promise only ever produces the same result (deterministic) for all Subscribers, but a Publisher of 0…1 elements could send different elements to each. (hot/cold distinction)

In most cases that's good because it allows easy interoperability. But I know of two implementations of Reactive Streams (Reactor and Ratpack) that have their own, somewhat competing implementations of Promise. There's also Netty itself, which has a promise capability that we're trying to "convert" to the RS API. This divergence can't be good and could be avoided by providing in the RS spec a facility for expressing a sort-of-future-like component that expresses the idea of a single (or no) result + a complete.

The attempt of unifying these is CompletionStage for the JDK.
We used to have the same situation for Scala back in the days before I got involved in SIP-14 to standardize on scala.concurrent.Future, which required everyone to update to the Scala version which included it for interop (equivalent to requiring upgrade to JDK8 AFAICT).

The Promise<T> is spec'd to provide for a special kind of Publisher that a user can know for certain will never have more than 1 onNext signal plus the onComplete.

But that's not the definition of a promise. A promise needs to publish the -same- element to all consumers (deterministic).
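
The distinction can be sketched with two single-element publishers, one that recomputes its value for every subscriber and one that computes it once and replays it. This is an illustration under assumptions, not a proposed API: `SingleValue`, `cold` and `memoized` are hypothetical names, validation of non-positive requests is omitted for brevity, and the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) stand in for `org.reactivestreams`.

```java
import java.util.concurrent.Flow;
import java.util.function.Supplier;

// Hypothetical helpers contrasting the two meanings of "0..1 elements".
final class SingleValue {
    private SingleValue() {}

    // Cold: the supplier runs again for every subscriber, so two
    // subscribers may observe different elements.
    static <T> Flow.Publisher<T> cold(Supplier<T> supplier) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                subscriber.onNext(supplier.get());
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Promise-like: the supplier runs at most once and every subscriber
    // observes the same element.
    static <T> Flow.Publisher<T> memoized(Supplier<T> supplier) {
        Supplier<T> once = new Supplier<T>() {
            private T cached;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    cached = supplier.get();
                    computed = true;
                }
                return cached;
            }
        };
        return cold(once);
    }
}
```

Both publishers satisfy "at most one onNext, then onComplete", yet only the second is deterministic across subscribers, and the `Publisher` type alone cannot express that difference.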

The Promise<Nothing> is spec'd to provide for a special situation where a Publisher will never receive an onNext signal, only an onComplete or onError. It doesn't necessarily have to be part of the TCK I don't think but could be a provision on one of the Promise rules that specifies in a predictable way what to expect if that is used in a library.

Having thought about it for a while, I think there's enough prior art to use Void: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

(In Scala one'd use Nothing (the bottom type) since it has other implications from a type system perspective).

The reason for including these here is that this is the lowest common denominator when writing libraries that expose their Reactive Streams support. It seems less useful to have LibraryA create their own version of Publisher<T> that is effectually a Promise and LibraryB create their own version which does things a little differently. Both exhibit similar characteristics but neither clearly expresses the intent of how they're using a Publisher.

Publisher generically is not effectually a Promise (from the definition I use) since Publisher does not distinguish hot/cold, and this for a reason, because it creates a terrible matrix of permutations, tracking hotness/coldness through combinators is a real pain, and that doesn't even take into the account things like never-completes, never-fails situations.

The alternative is to defer dealing with this to implementations and get what we have today which is very vague and not interoperable (I can't communicate the idea of a Promise from one library to the next, only a Publisher) and will lead to gaps with using the RS API where we have to supplement it with our own components.

The RS API is for interop (not end user, since it doesn't have any combinators or otherwise) and given my argumentation around the permutations I don't see the adding of a Promise interface as something that carries its weight from a utility PoV (especially considering j.u.c.CompletionStage)

@daviddawson

I strongly agree. I would very much like to have a standardised Promise whose primary purpose is interop between FRP style libraries and application stacks.

There is a standardised Promise! :)
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html

As an experiment, I just implemented a Promise across a test codebase where previously it was Publisher and it becomes much clearer what does what. Looking through the Reactor codebase a little, I can see that a standardised interface could be retrofitted in there too and help clear things up (although since there's an internal Promise already, possibly not so much)

Turning a Promise into a Publisher is very simple, as @alkemist pointed out, if you want to go in that direction and it's even simpler if you control your own Promise interface (since Java still doesn't have things like extension methods/typeclasses) as you can add a "toPublisher" method on it, don't you agree?

The other interface (Nothing) I'm not sure I've seen the need yet, but I understand your description and could see it being useful for us in the future.

It improves over Void in the sense that it can be properly documented as to its purpose where j.l.Void is poorly documented in that regard.

I agree with @jbrisbin on standardisation. There are half a dozen Promise implementations now extant in the codebases we use regularly, and they are starting to grow in functionality to mean something more like 'stream processing' than 'I may give you something later' (looking at the Ratpack one in particular). If an attempt isn't made to give some interop under the purview of the reactive streams specifications, then I don't see any interop being possible around this concept, which I think would be a shame.

While unification of Future/Promise implementations is a worthy goal (which I contend is solved in jdk8 with CompletionStage), Promises aren't what Reactive Streams sets out to solve (even if I am very fond of Promises):

*Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.*

One of the benefits of using a promise (a.k.a. a one, or zero-or-one depending on perspective, element stream) is that you don't have to worry about demand, hot vs. cold, async cancellation (i.e. in that it's simpler). Promise<T> extends Publisher<T> defeats this. You're still carrying all the complexity of Publisher.

👍 Also, Futures/Promises are typically not demand driven.

You can easily adapt a promise to/from Publisher. That seems like a better strategy for end user API.

I'll have to side with @alkemist on this one!

@smaldini
Contributor

I'm aligned with @viktorklang in that I would not encourage a generic extension for this. There are, however, things to clarify in the specification about the contract around this, I agree.

@jbrisbin

@viktorklang Thanks for taking the time to respond thoroughly. I disagree with you and I suspect we'll have to just leave it at that. But your input is appreciated!

I don't want to address individual points because I don't want to drag this out since I don't see any solution hovering over the horizon, just more ocean. But I do want to clarify a few things for the sake of people reading later:

  1. CompletionStage is a non-starter. We (Reactor/Spring) are simply not going to throw away all the users that can't upgrade to JDK 8. Period. The CompletableFuture API is also...I'll stop there (mom always said "if you can't say anything nice, don't say anything at all"...following that advice).

  2. My point about logistical hurdles applies to things we control and set out to do. It's a different thing entirely to put expectations on users that involve a pretty significant change to how they do things. I don't think it's a good idea to say "well that would be hard to do" as an argument on the merit of something. Either it has real value or it doesn't. The amount of work involved in implementing it may be a reason to say "yes, it has value but we can't afford to pursue it". I simply wanted to argue the case on the merits as it pertains to application and library developers without burdening it with other concerns. Those can come later.

  3. A Promise has no other characteristics different from a Publisher other than the expectation of what will be published via onNext. It is still completely demand-driven (a "hot" stream might very well also have a value ready just like a Promise so in that case is indistinguishable). It still terminates with onComplete or onError. It is simply a Publisher with additional expectations that have been spec'd and documented so that library authors can rely on the behavior.

  4. Various solutions already exist to implement this behavior because it's a natural progression from "pure" RS to a "real" application. There are and will continue to be "competing" implementations even though there doesn't have to be. In order to have interop, we will have to use Publisher as the only means to do that since there's no other way provided.

Reactor will be happy to continue doing what we're doing by providing a Promise implementation that provides a good API and fills the gap between a pure RS Publisher and the application code that expects there to be a single response to a request they've submitted. But it's such a common use case that it seems a shame to not be able to provide for it in the core API. When building libraries that expose the RS API for interop, devs will simply have to say: "we'd like to just use RS by itself but since we need a Promise we can't do that so we have to depend on this other library X which does provide it...hope that doesn't interfere with library Y, which you're also using and which also provides a different kind of Promise to the one we're using...wish it were different but it ain't so there ya go!"

@daviddawson
Author

Maybe there is potential for a sister specification for this instead then? Taking the points that @alkemist made and moving out of reactive-streams per se to its own, highly related yet separate thing?

If there could be agreement between the major implementations of Promise for interop then that might avoid the need to overload reactive-streams and instead build upon it for reactive-promises (or whatever).

@He-Pin

He-Pin commented May 19, 2015

There are so many Promises out there, and a Promise is just like some kind of Publisher even though it's not demand-driven, but you could make it so by either triggering the computation eagerly or deferring it until there is demand. And after the Promise is computed, it will always publish the same value, and only once, for a dedicated Subscriber, right?

As for using only CompletionStage here, that may not be right: many companies are still using Java 7 or Java 6, and if you want to provide something nicer, you have to write your own Promise/Future. Maybe we should provide a standard Promise implementation targeting Java 7 and Java 6, and then provide interop with CompletionStage for Java 8 and later. That may not be nice, but that's the real world. I like Scala's Promise/Future and hate that there are so many Promises in Java.

@jbrisbin

@daviddawson That's an interesting proposal. We're already "extending" Reactive Streams in a standardized way with @reactive-ipc. Maybe a collaborative reactive-promise would make sense?

It would depend on and extend Reactive Streams proper but spec out the promissory behavior somewhat like Promises/A+ did. It doesn't have to deal with state and all the implementation details of things but it should provide a solid and predictable foundation that library authors can use to expose APIs to other libraries with consistent and predictable results.

@rkuhn
Member

rkuhn commented May 19, 2015

The use cases that are discussed here are

  • passing a single asynchronous event to interested parties (this applies both to just signaling completion and transferring a single value—these are completely equivalent operations when prescribing a suitable “completion” value)
  • passing a stream of asynchronous events to interested parties (with back-pressure etc.)

As far as I can see solving the latter can be used for solving the former (as @jbrisbin proposes), but that would unnecessarily complicate the solution—passing a single event to asynchronous listeners can be done without all the ceremony that is needed for setting up and operating a stream subscription. There are a plethora of existing solutions out there, the immense duplication in this field should to my mind be attributed to Java’s unwillingness of hosting a sensible abstraction in this space prior to the introduction of lambdas.

Now, since the central, blessed space has been vacant for too long (and I sympathize with the argument that in practice it has not been filled for a sizable fraction of the user base yet), it would be great if there were another standard that we could use as our solution for expressing single event transmission. That standard would then receive Reactive Streams bridges in the form of an official Reactive Streams add-on library, since this problem does not need to be solved more than once. Is this what we are talking about here?

Another thought: if we consider this only from the viewpoint of Reactive Streams, would it not suffice to provide bridges for all relevant existing Promise implementations? Why do we need to add yet another one? This is the same argument that was used against SIP-14 (Scala’s Futures), and the answer was that this kind of process only makes sense if enough of the pre-existing implementations vouch to converge on the to-be-agreed standard—is there a chance that this will happen here? Achieving this feat will be tricky, much more difficult than Reactive Streams, since Futures/Promises are already deeply ingrained in languages and libraries and performing such a change without breaking compatibility guarantees that were given to the respective communities (source as well as binary) will be a challenge. I’m not saying that it is impossible—and I believe that we are in a unique position to use our collaborative experience to this end—but we should be clear about what we are setting out to do here.

@benjchristensen
Contributor

This is a topic we have discussed for a while in RxJava. You can see the discussion at ReactiveX/RxJava#1594 and code here ReactiveX/RxJava#2641. In particular this comment where I summarize why we're considering it: ReactiveX/RxJava#1594 (comment)

I don't want to spam this discussion with that entire comment as a copy/paste, so I'll just let you read it over there.

In short, I think the Promise semantics make sense for libraries and applications. We are considering adding one to RxJava (though struggling with naming conventions) because the single-response use case is so common.

However, I am hesitant to immediately make it part of Reactive Streams. The biggest challenge for me on it currently is that Future, Task, Promise, etc already exist in abundance and have varying semantics on laziness/eagerness, etc and I don't want to further pollute the space. Many people can happily use the existing solutions alongside Reactive Streams and I don't think an RS implementation should have to also implement a Promise (though it is ridiculously simple once a Reactive Stream exists).

I could be swayed, but I lean towards not pursuing this as part of Reactive Streams.

@rkuhn
Member

rkuhn commented May 19, 2015

Thanks, @benjchristensen, for the added reasoning, and I lean in the same direction (in case that was not entirely clear from my comment). One thing I should have mentioned is that we use Scala’s Future to encode this in Akka Streams, with natural bridges mediating between these and our Source/Sink types, and I think that is a nice and clean solution that is not in need of being fixed—unless of course I’m missing something (assuming that all RS implementations will supply suitable bridges to/from streams for their respective Promises of choice).

@jbrisbin

The problem as I see it is that:

  1. Java users are SOL if the fix for this gap is "well, there's always Scala Futures" :)
  2. I can't reasonably expect a user to figure out a way to use 3 different reactive libraries, each with their own ideas and implementations of Promise, with no way to talk to one another other than by calling a static helper method that naively converts from one thing to another. It's impossible to imagine the nightmare stacktrace that would ensue if every Promise implementation had to wrap everything in an RS helper and hand that Publisher to another library, which then adapts it to its own internal types, then wraps it in another Publisher to hand off to the user, etc... ad nauseam...

The power of Reactive Streams is that it has been accepted as a standard by the strength of the spec and the TCK. What's being suggested, whether it exists in the reactive-streams-jvm repo or not, is a type of Promise abstraction that builds on Reactive Streams proper but has expectations clearly defined by a spec and tested by a TCK. The author of Library A can expose a Reactive Promise to their users, and the consumers of that Promise can treat it like a Publisher or a Promise, depending on their needs. Then Library A can hand a Promise to Library B and not have to worry about the impossibility of figuring out how to support promises from other random libraries.

@benjchristensen @rkuhn I actually see this as taking the pollution of the space which exists today and filtering it to produce a completely non-polluted promissory API that allows full interop, just like Publisher (because it is a Publisher) and lets library authors simply pull in the reactive-promise dep and speak to other components in that abstraction rather than the imprecise (but admittedly, technically correct) raw Publisher.

@jbrisbin

I could be swayed, but I lean towards not pursuing this as part of Reactive Streams.

@benjchristensen What about the suggestion that a Promise could exist in a dedicated module that extends "pure" Reactive Streams and can be treated as a raw Publisher but is also spec'd to make its behavior predictable?

FTR I can totally understand the reticence to include this in RS proper. I don't completely disagree. But we know this area is ripe for doing exactly what we did for streams.

@He-Pin

He-Pin commented May 19, 2015

I know that Netty added Promise/Future in version 4, and the idea came from Scala's. We don't want to reinvent the wheel here and there, or resort to hacks and pray that one kind of wheel works like another. Scala's Future and Promise are great, but we can't use them in Java, and in fact we all need something like this. Reactive Streams is about composability; if such a small thing won't compose, how can we expect things to compose well when users actually use it?
In Scala we could use implicit conversions, but what about Java? Would we then need many PromiseA-to-PromiseB conversions?

All of this, I think, is a waste of life for both the authors and the users. Reactive Streams is a good start, and I think collaboration on a Promise will show its worth too, just like Scala's.

Whether the pain is long or short, it is a problem. But after I learned Scala's Promise and Future, I felt I had found my god.

@rkuhn
Member

rkuhn commented May 20, 2015

@jbrisbin I see what you mean now, and there are several points that need to be clarified.

The first is that we are not actually talking about Promises/Futures, hence we should not use that term, it is not adequate. A Future is a read-only handle to a value that may be provided asynchronously and a Promise is a writable handle that backs a Future. The focus here is on the value that is to be filled in or taken out, which is why the callbacks that are installed on a Future in order to compose transformations are anonymous and not usually considered to be active participants at all.

What we are talking about here is an asynchronous provider of a single value plus an asynchronous consumer for exactly one value. This view is prescribed by the nature of Reactive Streams as we have defined them. The provider will need to have a registration method for consumers and the consumer will need to have methods like onSuccess and onFailure.
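To make the shape of that idea concrete, here is a minimal sketch of such a provider/consumer pair. The names `SingleProvider`, `SingleConsumer`, `register`, and `ImmediateProvider` are assumptions for illustration only; nothing like them exists in Reactive Streams or the JDK, and the toy provider delivers synchronously purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces: neither exists in Reactive Streams or the JDK.
interface SingleConsumer<T> {
    void onSuccess(T value);
    void onFailure(Throwable error);
}

interface SingleProvider<T> {
    // The "registration method for consumers" described above.
    void register(SingleConsumer<? super T> consumer);
}

// Toy provider that already holds its value and hands it over on registration.
final class ImmediateProvider<T> implements SingleProvider<T> {
    private final T value;

    ImmediateProvider(T value) { this.value = value; }

    @Override
    public void register(SingleConsumer<? super T> consumer) {
        consumer.onSuccess(value); // exactly one signal, no request(n) handshake
    }
}

final class SingleDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        new ImmediateProvider<>("pong").register(new SingleConsumer<String>() {
            @Override public void onSuccess(String v) { log.add("success:" + v); }
            @Override public void onFailure(Throwable e) { log.add("failure:" + e); }
        });
        return log;
    }
}
```

Compared with a full Publisher/Subscriber exchange, the consumer here sees a single terminal callback and there is no Subscription object to manage.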

The second clarification follows from this, and this is the main reason that keeps me from agreeing with you: your assertion that this new abstraction must in any case be a Publisher is wrong. As discussed in the previous paragraph requiring the full Publisher–Subscriber protocol would be overkill. It would both be very inefficient (requiring too many method calls to be exchanged for a single transfer) and also too expressive, it does not adequately limit the scope of the abstraction. I can only follow you in this quest if we first remove the assertion that passing a single element across an asynchronous boundary is in any way related to stream processing. This does not preclude reaching a similar solution in the end, but having this initial requirement would severely obstruct the effort.


On a side note: yes, I can see where you are going, but if we allow ourselves to jump to conclusions then the end result will be not nearly as good as it could be—we will need the same amount of patience that we had last time.

@daviddawson
Author

@rkuhn it was my original suggestion that a possible Promise extend Publisher, and so I may have started the discussion off incorrectly!

That came out of my desire to have something standardised and the observation that, since Publisher is currently used in this role, a Promise could work this way, and would integrate with existing implementations. Of course, this does not mean that it should do this and the question of should seems to be the perfect direction to take the discussion.

If we just assume for the moment that a Promise is worth pursuing, what should it look like, in an idealised implementation?

@viktorklang
Contributor

Everyone, if I have got things correctly from this discussion (and please do let me know if that's not true), what we are really talking about is whether it makes sense to lead a new standardization effort for Futures/Promises for Java (or the JVM in general). This standardization effort is distinct enough to be standalone from Reactive Streams (which arguably solves a different, but important, problem).

I was a part of standardizing Futures/Promises in the Scala ecosystem a couple of years ago, and my experience with that is what @rkuhn mentioned earlier in this thread: in order to create a successful standard for such a thing when there already exist many implementations, more than likely with different semantics and different execution strategies, one needs to rally the lion's share of the current implementations to actively take part in shaping the end result.

Keep in mind something one wants to avoid w.r.t creating standards.

Such a Future/Promise standard could most definitely have a Reactive Streams bridge to-from for interop, this could exist as an optional add-on.
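As a rough illustration of what such a to/from bridge might look like, the sketch below uses the JDK's `java.util.concurrent.Flow` interfaces (which mirror the Reactive Streams ones) together with `CompletionStage` and `CompletableFuture`. `FutureBridge` and its method names are hypothetical, and the sketch deliberately skips parts of the spec, such as signalling `onError` for non-positive requests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper; not part of Reactive Streams or the JDK.
final class FutureBridge {

    // CompletionStage -> Publisher that emits at most one element.
    static <T> Flow.Publisher<T> toPublisher(CompletionStage<T> stage) {
        return subscriber -> {
            AtomicBoolean done = new AtomicBoolean();
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) {
                    if (n <= 0) return; // sketch: the spec asks for onError here
                    stage.whenComplete((value, error) -> {
                        if (!done.compareAndSet(false, true)) return; // finished or cancelled
                        if (error != null) {
                            subscriber.onError(error);
                        } else {
                            if (value != null) subscriber.onNext(value);
                            subscriber.onComplete();
                        }
                    });
                }
                @Override public void cancel() { done.set(true); }
            });
        };
    }

    // Publisher -> CompletableFuture holding the first element (or null if empty).
    static <T> CompletableFuture<T> toFuture(Flow.Publisher<T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(T item) { future.complete(item); subscription.cancel(); }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onComplete() { future.complete(null); } // no-op if already completed
        });
        return future;
    }
}
```

A round trip such as `FutureBridge.toFuture(FutureBridge.toPublisher(someFuture))` should yield a future with the same value; the questions in the list that follows (cancellation, executors, laziness) are exactly the ones a sketch like this glosses over.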

Just so that I set expectations accordingly, expect it to take a lot of time and a lot of effort into getting everyone involved pulling in the same direction, the following questions will arise for sure:

  • Are they cancellable?
  • Is execution of transformations/callbacks always asynchronous?
  • Does one offer a polling-API?
  • Does one offer a blocking-API?
  • Does one offer a callback-API?
  • Is there a separation between reading and writing the value?
  • Do they support values AND exceptions or are exceptions modeled in the value?
  • Is the execution/executor tied to the future or to the transformation(s)?
  • Is it possible to define transformations and defer execution?
  • What API should be exposed (minimal vs. "complete")?
    (and more)

I'm sympathetic to the idea of standardizing this for the JVM, but it would be a long road to get there, and in the light of CompletionStage and CompletableFuture for Java8, I'm not sure it would be worth the effort, no matter how much one likes the CompletionStage API.

If you got this far, thank you for listening, I appreciate it.

@rkuhn
Member

rkuhn commented May 20, 2015

@viktorklang I fully agree with your points about Promises, but given the original motivation for this discussion I don’t think that it is a foregone conclusion that we are actually talking about standardizing Promises. We could instead go for solving the problem of passing a single value across an asynchronous boundary, and this would not be fraught with the difficulties you mention. But it requires the discipline to explicitly acknowledge that this is not a Promise.

@rkuhn
Member

rkuhn commented May 20, 2015

Perhaps even more clear: using Provider and Consumer as the terms for the parties that exchange the single value, the correspondences would be that

  • Scala’s Promise extends Consumer
  • Scala’s Future extends Provider

In particular, the Promise is not the Provider.

@viktorklang
Contributor

@rkuhn The challenges I see with baking any single-value-thing into RS were outlined in this comment #268 (comment)

In essence my stance is currently—passing (optionally) a single value, while extremely useful, doesn't really match the (more general) goals of Reactive Streams: "Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure."

So while you can most definitely optionally transmit a single value over Reactive Streams, anything more specific would not really fall under the goals of this project, and as such I'd say that it is a different project, albeit a good one!

@agentgt

agentgt commented May 27, 2020

@simonbasle

No, I'd say the base case is "this is a cold Publisher", ie. each new subscriber triggers the creation of a new Connection object that is passed to said subscriber whenever it is read

It is not the base case because Publishers can decide to accept only one Subscriber per the spec and call onComplete immediately for new subscribers. ie one publisher per one subscriber.

Instead of base case I meant the most failsafe. Creating a new Publisher per Subscriber seems to be the one approach that is guaranteed to work. This is especially true if the Publisher has some sort of resource you want cleaned up permanently only once (perhaps a queue being disposed).

@agentgt

agentgt commented May 27, 2020

@bsideup

No, because MonoPublisher is a valid Publisher, and, by the spec, must support multiple subscriptions.

Per spec it's MAY... See section 1.11.

Furthermore, I have implemented many publishers that expect only one subscriber, particularly because Publishers do not have a dispose method but Subscriptions do (albeit for Connections it makes sense to have multiple Connections; that is not always the case for all resources).

@bsideup

bsideup commented May 27, 2020

It is not the base case because Publishers can decide to accept only one Subscriber per the spec and call onComplete immediately for new subscribers. ie one publisher per one subscriber.

What you described is called "hot Publisher" and usually discouraged, because you can't repeat/retry it, nor do you know about such behaviour.

https://github.com/reactive-streams/reactive-streams-jvm#1.11 that (I guess) you're referring to covers the opposite case -

Per spec it's MAY... See section 1.11.

See section 1.10:

Publisher.subscribe MAY be called as many times as wanted but MUST be with a different Subscriber each time [see 2.12].

"MAY" here means that it should be ready to be subscribed multiple times, because that's a normal use case.

Furthermore I have implemented many publishers that expect only one subscriber

Why?

particularly because Publishers do not have a dispose method

Publisher is basically a factory of Subscription passed to Subscriber.
Why would you expect it to have dispose method?

@simonbasle

It is not the base case because Publishers can decide to accept only one Subscriber per the spec and call onComplete immediately for new subscribers. ie one publisher per one subscriber.

But the standard relationship is by no means 1 Publisher == 1 Subscriber... It's 1 Subscription == 1 Subscriber.

@agentgt

agentgt commented May 27, 2020

@bsideup and @simonbasle

Yes I realize it could be implemented that way but I wouldn't assume it so.

What you described is called "hot Publisher" and usually discouraged, because you can't repeat/retry it, nor do you know about such behaviour.

I do not define that as Hot. The Publisher may or may not be doing anything once it's created. If it is doing something and I may lose events, it is Hot.

Cold doesn't guarantee a repeat for me if other subscriptions are active. I'm talking 1 publisher to 1 subscriber.

And @bsideup, since you are familiar with RabbitMQ: it's an example where you might want a one-to-one Publisher to Subscriber relationship, particularly if you are handling ACKs downstream.

Publisher is basically a factory of Subscription passed to Subscriber.
Why would you expect it to have dispose method?

So what if the Publisher is expensive to keep around? Again, let's say it's a RabbitMQ exclusive queue. How do I close it when I'm done? The Subscriber can just signal it's done and the Publisher can now clean up resources.
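One way to read that is that cancelling the Subscription doubles as the "dispose" signal. A minimal sketch, assuming a hypothetical `OwningPublisher` that is handed a `Runnable` to close its underlying resource, and using the JDK's `Flow` interfaces in place of the Reactive Streams ones:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical publisher that owns a resource (think: an exclusive queue).
final class OwningPublisher implements Flow.Publisher<String> {
    private final Runnable release; // closes the underlying resource
    private final AtomicBoolean released = new AtomicBoolean();

    OwningPublisher(Runnable release) { this.release = release; }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                // Sketch: emits a single placeholder message per request call.
                if (n > 0 && !released.get()) subscriber.onNext("message");
            }
            @Override public void cancel() {
                // The subscriber signalling "done" triggers cleanup, exactly once.
                if (released.compareAndSet(false, true)) release.run();
            }
        });
    }
}

final class OwningDemo {
    static int releasesAfterTwoCancels() {
        AtomicInteger releases = new AtomicInteger();
        new OwningPublisher(releases::incrementAndGet).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); s.cancel(); s.cancel(); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return releases.get();
    }
}
```

This only works cleanly when the publisher has a single subscription; with several subscribers the publisher would need to decide which cancellation actually releases the resource.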

Given how poorly documented that r2dbc method is, I can't really make any assumptions, and thus I probably would call ConnectionFactory.create() for a new Publisher each time I wanted a connection. If I wanted the behavior you are talking about (i.e. retrying), I could just decorate it or add a processor that calls ConnectionFactory.create() as its own Publisher.

@agentgt

agentgt commented May 27, 2020

But the standard relationship is by no means 1 Publisher == 1 Subscriber... It's 1 Subscription == 1 Subscriber.

Yes I know that but I'm talking about the more than likely case to work coming from a library consumer.

See https://javadoc.io/doc/io.r2dbc/r2dbc-spi/latest/io/r2dbc/spi/ConnectionFactory.html

/**
 * Creates a new Connection.
 */

Publisher<? extends Connection> create()  

The only thing according to that doc is that I will get a Publisher that will deliver a Connection to a Subscriber (not all subscribers). The doc says Creates new Connection... not a publisher that will create a new Connection per Subscription.

Hell the implementation might implement caching. I mean, why not, given @bsideup's retry point? Why reclaim a connection if you already have it and it's fine?

Anyway my point is I just think using Reactive Streams for everything instead of the specific reactive interfaces is ripe for confusion.

@bsideup

bsideup commented May 27, 2020

@agentgt

I do not define that as Hot. The Publisher may or may not be doing anything once it's created. If it is doing something and I may lose events, it is Hot.

"doing anything once its created" remembering that it was subscribing before receiving a next subscriber sounds like "doing something" to me ;)

Cold doesn't guarantee a repeat for me if other subscriptions are active.

Well, it definitely does, because it is "stateless", and does not depend on a number of subscriptions started.

And @bsideup, since you are familiar with RabbitMQ: it's an example where you might want a one-to-one Publisher to Subscriber relationship, particularly if you are handling ACKs downstream.

You're mixing application concerns with the SPI. No, I would never want 1 Publisher = 1 Subscriber relationship.

Given how poorly documented that r2dbc method is

Is it? https://r2dbc.io/spec/0.8.1.RELEASE/spec/html/#connections.factory

I probably would call ConnectionFactory.create() for a new Publisher each time I wanted a connection

Since it is Publisher, why would you call it each time?

The only thing according to that doc is that I will get a Publisher that will deliver a Connection to a Subscriber (not all subscribers).

Yes, that's how Reactive Streams work. You can check the spec that clearly states that this is the expected behaviour.

The doc says Creates new Connection... not a publisher that will create a new Connection per Subscription.

Again, it is the expectation from the spec. Even if the doc says "creates new Connection", it implicitly means "by subscription", because it is cold. You don't need to clarify such things in Reactive APIs.
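A small sketch of that "cold" reading, where each subscription triggers its own creation step. `ColdSingle` is a hypothetical name, the "connection" is just a string, and the JDK's `Flow` interfaces stand in for the Reactive Streams ones; this is not how any real r2dbc driver is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical cold, at-most-one-element publisher:
// the supplier runs once per subscription, on the first request.
final class ColdSingle<T> implements Flow.Publisher<T> {
    private final Supplier<T> factory;

    ColdSingle(Supplier<T> factory) { this.factory = factory; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;
            @Override public void request(long n) {
                if (done || n <= 0) return; // sketch: ignores the n <= 0 error rule
                done = true;
                T value;
                try {
                    value = factory.get();
                } catch (RuntimeException e) {
                    subscriber.onError(e);
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }
            @Override public void cancel() { done = true; }
        });
    }
}

final class ColdDemo {
    static List<String> run() {
        AtomicInteger ids = new AtomicInteger();
        Flow.Publisher<String> connections = new ColdSingle<>(() -> "connection-" + ids.incrementAndGet());
        List<String> received = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            connections.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { received.add("error"); }
                @Override public void onComplete() { }
            });
        }
        return received;
    }
}
```

Subscribing twice to the same publisher instance yields two distinct values here, which is the behaviour being described as implicit for cold publishers.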

Hell the implementation might implement caching.

That would be implementation's choice, I don't see a point here?

Anyway my point is I just think using Reactive Streams for everything instead of the specific reactive interfaces is ripe for confusion.

specific reactive interfaces

I only know one, Publisher, that does create some confusion where cardinality is 0..1. I hope we can fix that.
As soon as we have MonoPublisher, a lot of questions go away, while answers to the remaining are there, in the spec.

@agentgt

agentgt commented May 27, 2020

"doing anything once its created" remembering that it was subscribing before receiving a next subscriber sounds like "doing something" to me ;)

The publisher in this case would not be doing anything until it received a request call from a Subscription. The first Subscription to call request would be the winner and all others would get a call to onComplete or onError, depending on the implementation, I guess. I'll have to look at how we implemented it, albeit that was for streams. If it were a mono I would probably recreate the connection like you said, but again I can see how that might confuse people, considering that alternatives like Completable will essentially just do a replay.

The semantics of Hot and Cold are confusing and are debatable. Some cases people claim it has to replay all of the exact same events. Some say stateless but publishers are inherently very stateful. I really can't find a formal definition. For most I think it just means it's lazy and will not start doing work in the background till a request is received. In contrast a Future is hot since you don't initiate the start.

Anyway you can surely have special databases that only allow one connection right? Or a resource that only allows one subscription at a time. You act like this is a scenario that never happens but I can assure you it does.

Besides you can implement the cold retry logic with a simple decorator to the Publisher which is exactly what we did for some scenarios. It is just easier to initially write a Publisher that only allows one Subscription particularly if you want to guarantee unicast single step delivery.
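A sketch of what such a decorator could look like: a hypothetical `Deferred` publisher that asks a factory for a fresh one-shot publisher on every subscription. The `oneShot` source and all names here are assumptions for illustration, written against the JDK's `Flow` interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical decorator: every subscription gets a brand-new inner publisher.
final class Deferred<T> implements Flow.Publisher<T> {
    private final Supplier<Flow.Publisher<T>> factory;

    Deferred(Supplier<Flow.Publisher<T>> factory) { this.factory = factory; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        factory.get().subscribe(subscriber);
    }
}

final class DeferDemo {
    // A one-shot source: the second subscriber is rejected with onError.
    static Flow.Publisher<String> oneShot(String value) {
        AtomicBoolean taken = new AtomicBoolean();
        return subscriber -> {
            boolean first = taken.compareAndSet(false, true);
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;
                @Override public void request(long n) {
                    if (!first || done || n <= 0) return;
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
                @Override public void cancel() { done = true; }
            });
            if (!first) subscriber.onError(new IllegalStateException("already subscribed"));
        };
    }

    // Subscribes the given number of times and records what each subscriber saw.
    static List<String> collect(Flow.Publisher<String> publisher, int subscriptions) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < subscriptions; i++) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
                @Override public void onNext(String item) { out.add(item); }
                @Override public void onError(Throwable t) { out.add("error"); }
                @Override public void onComplete() { }
            });
        }
        return out;
    }

    static List<String> runPlain() { return collect(oneShot("a"), 2); }

    static List<String> runDeferred() { return collect(new Deferred<>(() -> oneShot("a")), 2); }
}
```

The undecorated one-shot source serves only its first subscriber, while the decorated one can be subscribed to repeatedly, at the cost of the caller having to know that decoration is needed.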

@simonbasle

Anyway you can surely have special databases that only allow one connection right? Or a resource that only allows one subscription at a time. You act like this is a scenario that never happens but I can assure you it does.

Oh I don't think anybody here believes it never happens, just that for Publishers it is the exception rather than the norm...

@bsideup

bsideup commented May 27, 2020

The semantics of Hot and Cold are confusing and are debatable. Some cases people claim it has to replay all of the exact same events

I've never seen this expectation in the wild, TBH. People do seem to have problems understanding Hot vs Cold at first, but because of other reasons and definitely not replaying semantics.

Some say stateless but publishers are inherently very stateful

Usually, Publishers aren't stateful. They may represent a stateful resource, but, as factories of Subscriptions, a stateful Publisher is a smell to me unless it is designed to be stateful (caching scenarios)

I really can't find a formal definition

Start here:
https://projectreactor.io/docs/core/release/reference/#reactor.hotCold

Anyway you can surely have special databases that only allow one connection right?
You act like this is a scenario that never happens but I can assure you it does.

I've seen many times resources that do not allow concurrent access. That said...

Or a resource that only allows one subscription at a time.

When you say "subscription" here do you mean "subscription" or Subscription? You can't limit the number of Subscription spec objects, but you can limit the number of logical subscriptions to a resource (let's maybe call it "session" to be less confusing). That's your business logic, and it is up to you how to handle unique sessions. The spec talks about a different thing.

I guess all of it is described very well here:
#482 (comment)
"Reactive Streams is not a resource management framework"

Besides you can implement the cold retry logic with a simple decorator to the Publisher which is exactly what we did for some scenarios

You never know whether you need to decorate Publisher or not. But if all your Publishers are cold you don't need to know that.

@agentgt

agentgt commented May 27, 2020

When you say "subscription" here do you mean "subscription" or Subscription?

I meant lower case subscription.

You never know whether you need to decorate Publisher or not. But if all your Publishers are cold you don't need to know that.

That is my point. I agree with you the cold case is ideal and preferred but the spec doesn't force one way or the other. If you want to enforce specific logic for the case of r2dbc I think it's better to return something like R2DBCMono than the raw MonoPublisher interface. Then have all your sub interfaces say they are cold and or single etc etc.

The reason I'm being somewhat contentious on this is that, if you look at the original title of this issue, it's "Promise style semantics", not "add Single Publisher semantics". I see the inherent benefit of Single Publishers (aka Mono, aka Singles), but it's not really a Promise or a reactive-like Future (e.g. Guava's ListenableFuture or CompletableFuture). Those types (e.g. Completable), despite having far more complicated interfaces, are more constrained and thus, I think, easier to understand.

After implementing a lot of reactive stuff in the real world for our company it is becoming difficult to explain to other developers particularly if it is just RPC like call composition that is needed.

So yeah I'm very much for the new MonoPublisher but I have doubts how much easier it will make reactive programming.

Anyway thanks for the discussion and patience! I'm sold!

@hutchig

hutchig commented May 28, 2020

@bsideup I just thought I would comment...

Above in response to

"Publishers can decide to accept only one Subscriber per the spec"
from @agentgt

you referenced the spec and said

"MAY" here means that it should be ready to be subscribed multiple times"
and later
"No, because MonoPublisher is a valid Publisher, and, by the spec, must support multiple subscriptions."

I would just like to point out that it is perfectly valid for an implementation of the spec to decide
to support only 1 or 0 Subscribers per Publisher if it chooses to.

This is spelled out explicitly and clearly in the spec in:
A Publisher MAY support multiple Subscribers and decides whether each Subscription is unicast or multicast.

The spec is well written and it is commonly held that the use of the capitalised "MAY" has a very specific meaning in this context.

See RFC 2119:

"The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL
NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and
"OPTIONAL" in this document are to be interpreted as described in...
RFC 2119." https://www.ietf.org/rfc/rfc2119.txt

5. MAY This word, or the adjective "OPTIONAL", mean that an item is
truly optional. One vendor may choose to include the item because a
particular marketplace requires it or because the vendor feels that
it enhances the product while another vendor may omit the same item.
An implementation which does not include a particular option MUST be
prepared to interoperate with another implementation which does
include the option, though perhaps with reduced functionality. In the
same vein an implementation which does include a particular option
MUST be prepared to interoperate with another implementation which
does not include the option (except, of course, for the feature the
option provides.)

Also, it is possible to pass the whole TCK with only one Subscriber per Publisher.

Apologies for being contrary but I have learned that it is worth attending
to such small things where such important specs are concerned.

@bsideup

bsideup commented May 28, 2020

@hutchig
the thing is - it is perfectly valid to call onError if some Publisher got subscribed a second time (we have it in Reactor, too).

But you must follow the spec, and you MUST NOT throw from subscribe, which means that... you accept the Subscriber, call onSubscribe on it and onError immediately after, which means that... you accept multiple Subscribers (my point), just you signal that some state/resource represented by this Publisher can only be accessed once.
But you MUST accept every incoming Subscriber, where "accept" means following the RS spec and calling onSubscribe and later onNext/onError/onComplete on it.

So yes, it is "MAY be subscribed multiple times" but "MUST be ready to receive multiple subscribe calls", because there is no escape hatch from not accepting Subscriber other than accepting and doing the "normal" dance.
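A minimal sketch of that "normal dance" for a publisher that only serves its first subscriber, written against the JDK's `Flow` interfaces, which mirror the Reactive Streams ones. `OneShotPublisher` and `EventRecorder` are hypothetical names, and the sketch skips details such as signalling an error for non-positive requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical publisher that serves only its first subscriber.
final class OneShotPublisher<T> implements Flow.Publisher<T> {
    private final T value;
    private final AtomicBoolean taken = new AtomicBoolean();

    OneShotPublisher(T value) { this.value = value; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (!taken.compareAndSet(false, true)) {
            // Rejection still follows the protocol: onSubscribe first, then onError.
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onError(new IllegalStateException("only one subscriber allowed"));
            return;
        }
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;
            @Override public void request(long n) {
                if (done || n <= 0) return; // sketch: ignores the n <= 0 error rule
                done = true;
                subscriber.onNext(value);
                subscriber.onComplete();
            }
            @Override public void cancel() { done = true; }
        });
    }
}

// Records the signals it receives, in order.
final class EventRecorder<T> implements Flow.Subscriber<T> {
    final List<String> events = new ArrayList<>();
    @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); s.request(1); }
    @Override public void onNext(T item) { events.add("onNext:" + item); }
    @Override public void onError(Throwable t) { events.add("onError:" + t.getMessage()); }
    @Override public void onComplete() { events.add("onComplete"); }
}
```

Subscribing two `EventRecorder` instances to one `OneShotPublisher` leaves the first with `onSubscribe`, `onNext`, `onComplete` and the second with `onSubscribe` followed by `onError`; `subscribe` itself never throws.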

@agentgt

agentgt commented May 28, 2020

@bsideup and @hutchig

My original point I was hoping to make is the easiest Publisher is a Publisher that only allows one subscription (besides the null case of an error or empty publisher). It's not just the easiest it's also the least expensive. Implementing multiple subscriptions requires a collection and generally not just any collection but a concurrent one. Even @bsideup brought up ceremony and the semantics given #428 of having to call onSubscribe then wait for request (or not for error) before onError and onComplete. I obviously meant follow the ceremony of calling onSubscribe prior which would allow multiple subscriptions ... but does it really?

Instead of agreeing that the only guaranteed publisher a library consumer can expect is 1-1 of publisher vs subscription, we went down a rat hole of semantics and of what @bsideup thinks a default reactive library or model should be. That's what makes me dubious and contentious about this change. It appears to serve the needs/agendas of a few libraries, which are trying to model a concurrency data structure with a stream one because the JDK option isn't an interface. I get that it's low-hanging fruit, but I would much rather have a more tightly constrained type than what MonoPublisher is. By going down this seamless path, I'm worried that libraries will abuse the type by putting it all over the place where a more constrained type would be better.

That being said despite my doubts I still see the value and overall think its worth it.

@viktorklang
Contributor

My main concern is that adding a new type doesn't make anyone implement it (all existing implementations would need new releases using the new type), and ultimately it doesn't provide any tangible benefit (i.e. the interface is semantically equivalent to connecting a Processor to any Publisher which only produces the first element, a.k.a. take(1))
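For reference, the take(1) behaviour can be sketched roughly as below, using the JDK's `Flow` interfaces in place of the Reactive Streams ones. `TakeOne` and the synchronous `range` source are hypothetical helpers written for this example, not any library's operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical take(1): forwards the first element, cancels upstream, completes.
final class TakeOne<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> source;

    TakeOne(Flow.Publisher<T> source) { this.source = source; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> downstream) {
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private boolean done;
            @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(s); }
            @Override public void onNext(T item) {
                if (done) return;
                done = true;
                upstream.cancel();
                downstream.onNext(item);
                downstream.onComplete();
            }
            @Override public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }
            @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
        });
    }
}

final class TakeOneDemo {
    // Simple synchronous source emitting from..to, used only for the demo.
    static Flow.Publisher<Integer> range(int from, int to) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = from;
            private boolean cancelled;
            @Override public void request(long n) {
                while (n-- > 0 && !cancelled && next <= to) subscriber.onNext(next++);
                if (!cancelled && next > to) { cancelled = true; subscriber.onComplete(); }
            }
            @Override public void cancel() { cancelled = true; }
        });
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        new TakeOne<>(range(1, 5)).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }
}
```

The runtime behaviour is indeed "at most one element", but the static type is still a plain Publisher, so callers cannot tell from the signature alone.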

@bsideup

bsideup commented May 28, 2020

@viktorklang

My main concern is that adding a new type doesn't make anyone implement it (all existing implementations would need new releases using the new type)

I represent Project Reactor here. We will definitely implement the type as soon as it is released :D

and ultimately it doesn't provide any tangible benefit

There are many projects in the ecosystem that would benefit from this new type immediately, and there is even a migration path where a project is using Reactive Framework X and X does not support MonoPublisher yet, but they can do:

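// MonoPublisher is the proposed type; MonoLikeType stands for framework X's own single-value type.
public MonoPublisher<String> getString() {
    MonoLikeType<String> p = ...;

    // Thin adapter: every subscription is delegated to the framework's type.
    return new MonoPublisher<String>() {
        @Override
        public void subscribe(Subscriber<? super String> subscriber) {
            p.subscribe(subscriber);
        }
    };
}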

(or a custom type that extends framework's)

i.e. the interface is semantically equivalent to connecting a Processor to any Publisher which only produces the first element, a.k.a. take(1)

This is the behaviour, but let's not ignore the type safety part of the proposal. Also, such Processor cannot be detected (for optimizations) by frameworks, while a dedicated type is trivial to detect.
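To illustrate the detection point, a minimal sketch in which a stand-in `MonoPublisher` marker interface extends the JDK's `Flow.Publisher`. The interface as written here, the `Prefetch` helper, and the buffer sizes 1 and 256 are all assumptions for the example, not the proposed API or any framework's real defaults.

```java
import java.util.concurrent.Flow;

// Hypothetical stand-in for the proposed type: a Publisher of at most one element.
interface MonoPublisher<T> extends Flow.Publisher<T> { }

final class Prefetch {
    // A framework could avoid allocating a prefetch queue when the type says "at most one".
    // The sizes 1 and 256 are arbitrary illustrative values.
    static int bufferSizeFor(Flow.Publisher<?> source) {
        return (source instanceof MonoPublisher) ? 1 : 256;
    }
}

final class DetectDemo {
    static int[] run() {
        MonoPublisher<String> mono = subscriber -> { /* signals omitted in this sketch */ };
        Flow.Publisher<String> wrapped = mono::subscribe; // a plain wrapper hides the marker type
        return new int[] { Prefetch.bufferSizeFor(mono), Prefetch.bufferSizeFor(wrapped) };
    }
}
```

The second case shows the flip side: once the publisher is wrapped in a plain Publisher, the `instanceof` check no longer sees the marker.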

@agentgt

agentgt commented May 28, 2020

@bsideup I'm curious is the optimization buffering and/or smaller/simpler publishers in reactor? I have no doubt optimizations can be made knowing its a mono but I'm curious what Reactor does.

Ideally for general performance improvements Subscription would have an additional method that returns meta data analogous to what Spliterator does: https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html#characteristics--

i.e. Subscription.characteristics(), but that is a far more massive undertaking than the simple type annotation. However, Subscription.characteristics() could be propagated up the chain, whereas a type annotation could be accidentally wrapped (I think ... maybe not).
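A rough sketch of that idea, shown next to the real `Spliterator` API it borrows from. `CharacterizedSubscription` and the `AT_MOST_ONE` flag are invented for illustration; `Flow.Subscription` (like the Reactive Streams `Subscription`) has no such method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Flow;

// Hypothetical extension; no such method exists on Subscription today.
interface CharacterizedSubscription extends Flow.Subscription {
    int AT_MOST_ONE = 0x1; // invented flag, modelled on Spliterator's bit flags

    int characteristics();

    default boolean hasCharacteristics(int flags) {
        return (characteristics() & flags) == flags;
    }
}

final class CharacteristicsDemo {
    // The real JDK precedent: a Spliterator reports bit-flag characteristics.
    static boolean realSpliteratorIsSized() {
        return new ArrayList<>(List.of(1, 2, 3)).spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    // The hypothetical equivalent on a Subscription.
    static boolean hypotheticalIsAtMostOne() {
        CharacterizedSubscription s = new CharacterizedSubscription() {
            @Override public int characteristics() { return AT_MOST_ONE; }
            @Override public void request(long n) { }
            @Override public void cancel() { }
        };
        return s.hasCharacteristics(CharacterizedSubscription.AT_MOST_ONE);
    }
}
```

Because the flags travel with the Subscription handed to each Subscriber, an intermediate operator could forward or adjust them, which is the propagation advantage mentioned above.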

@bsideup
bsideup commented May 28, 2020

@agentgt

I'm curious is the optimization buffering and/or smaller/simpler publishers in reactor

Rather the opposite - we can avoid allocating queues for prefetching, have less code to handle such publishers, etc etc

Ideally for general performance improvements Subscription would have an additional method that returns meta data analogous to what Spliterator does:

In Reactor, we have the Scannable protocol, which allows you to query certain characteristics of Publishers. But that's off-topic, and I suggest we talk about it separately to keep this discussion focused.

@OlegDokuka
Member

My main concern is that adding a new type doesn't make anyone implement it

As RSocket project representative, I can say that we will use it ASAP as well

@agentgt
agentgt commented May 29, 2020

@OlegDokuka that is because RSocket is using Reactor right? So yeah it would be trivial for it to be added to your library.

For my own implementation, which doesn't rely on Reactor or RxJava but does provide some Processors and decorators, I have to do an instanceof check on every single call to see if it is a Mono and then wrap it with the correct type. Every Processor, decorator, etc. will need to come in two types (I mean, sure, you can make this logic DRY, but it's still two flows).

It's already an incredible burden for implementors of libraries that do want to provide streams aka Flow implementations but not complete composition.

They either need to:

  1. Pick a composing stream library to make writing publishers easier, but conform to that library's ideas of how to handle threads and cleanup, as well as take on a dependency
  2. Or implement their own Publisher, Processor, Subscribers etc.

So to go back to @bsideup saying how r2dbc connection Publisher is obviously cold and will obviously give new connections and manage subscriptions etc etc... I say that logic is non trivial to implement for a database driver implementor. The easiest logic is for a database driver implementor to offer a Publisher that only does 1-1 to subscription and fail or replay for future subscriptions.... even easier would be to provide Supplier<? extends CompletableFuture<T>> or just make r2dbc have its own specific interface.

@bsideup Just to show how stream libraries have their own biases... RxJava's Single.cache is cold, whereas your Reactor Mono.cache is hot (hence, I guess, your confusion when I was saying you can have a replay without it being hot): http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html#cache--

(RxJava's is presumably a decorator, and I assume Mono's is a Processor.)

Anyway, to @bsideup's point about this being off topic: I think an entirely different spec should be made for reactive Promise-like types, one that decides whether they are hot or cold, cached/replayed, etc.

I recommend filing a new issue of "Add Mono/Single (0..1) to reactive streams" or something like that.

@bsideup
bsideup commented May 29, 2020

Fair enough. I may have misunderstood the initial purpose of this issue (although, after reading the conversation, I got a feeling that it is precisely about MonoPublisher, just the naming is different).

One way or another, since there is an open PR, I suggest we continue the conversation (about the MonoPublisher type) in there:
#490

@bsideup
bsideup commented May 29, 2020

oh, and btw:

The easiest logic is for a database driver implementor to offer a Publisher that only does 1-1 to subscription and fail or replay for future subscriptions

I disagree here and think that it is actually easier to implement a Publisher that returns a new (single) item per subscriber. Not to mention that if it fails (or replays), it goes against the common expectation when you work with reactive frameworks.
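As a rough illustration of the "new item per subscriber" shape, here is a minimal sketch on top of `java.util.concurrent.Flow`. `ColdSingle` and `ColdSingleDemo` are hypothetical names for this example, the code has not been run against the Reactive Streams TCK, and it ignores details such as null items and asynchronous emission.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch: every subscriber triggers a fresh call to the supplier (a "cold" source).
final class ColdSingle<T> implements Flow.Publisher<T> {
    private final Supplier<? extends T> supplier;

    ColdSingle(Supplier<? extends T> supplier) {
        this.supplier = supplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            // Guards against emitting twice and against signalling after cancel.
            private final AtomicBoolean done = new AtomicBoolean();

            @Override
            public void request(long n) {
                if (!done.compareAndSet(false, true)) {
                    return;
                }
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                T item;
                try {
                    item = supplier.get(); // produced lazily, once per subscriber
                } catch (Throwable t) {
                    subscriber.onError(t);
                    return;
                }
                subscriber.onNext(item);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}

final class ColdSingleDemo {
    // Subscribes, requests one item and returns it (emission above is synchronous).
    static <T> T first(Flow.Publisher<T> publisher) {
        AtomicReference<T> result = new AtomicReference<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(T item) { result.set(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return result.get();
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        Flow.Publisher<String> connect =
                new ColdSingle<>(() -> "connection-" + counter.incrementAndGet());
        System.out.println(first(connect)); // connection-1
        System.out.println(first(connect)); // connection-2
    }
}
```

There is no shared state between subscribers, so nothing has to decide whether a second subscription fails or replays.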

even easier would be to provide Supplier<? extends CompletableFuture<T>> or just make r2dbc have its own specific interface.

As I previously mentioned, Supplier<? extends CompletionStage<T>> is not enough because CompletionStage does not support cancellation, and CompletableFuture is not an option either, since it is a class.
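A small sketch of the cancellation gap, using only the JDK. The `CompletionStage` interface declares no `cancel` method at all, and even with `CompletableFuture`, cancelling a dependent stage does not reach back to the stage it was derived from. `CancellationDemo` is just a name for this example.

```java
import java.util.concurrent.CompletableFuture;

final class CancellationDemo {
    // Returns whether cancelling a dependent stage also cancelled its source.
    static boolean cancelPropagatesUpstream() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);

        // Only the dependent future is completed with a CancellationException.
        dependent.cancel(true);

        return source.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(cancelPropagatesUpstream()); // false
    }
}
```

With a Subscription, by contrast, `cancel()` is a signal sent to the producer itself.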

@bsideup Just to show how stream libraries have their own biases... RxJava's Single.cache is cold, whereas your Reactor Mono.cache is hot

I guess the docs need to be adjusted. If you look at the implementation, Mono.cache() subscribes once on the first subscribe and then replays, the same way RxJava works. So there is no bias, actually.

@OlegDokuka
Member

@agentgt

@OlegDokuka that is because RSocket is using Reactor right?

Not really. We are midway through adding RSocketClient, which is an abstraction over the RSocket interface.

That abstraction accepts lazily provided values:

requestXxx(Payload) becomes -> requestXxx(Mono<Payload>)

It is clear that a Mono payload is vendor-specific. So, if someone is using something else (a non-Project Reactor reactive-streams lib), their single-value type must be converted to Mono.

With MonoPublisher we will be able to abstract over the source of the value while keeping a strong requirement that it is a single-value source.

requestXxx(Payload) becomes -> requestXxx(MonoPublisher<Payload>)

So now it is going to be compatible with all vendors' single-value sources.

@agentgt
agentgt commented May 29, 2020

I disagree here and think that it is actually easier to implement a Publisher that returns a new (single) item per subscriber. Not to mention that if it fails (or replays), it goes against the common expectation when you work with reactive frameworks.

Yes, I guess for this case, because Connection has close and it is a Mono, the Publisher is essentially a Supplier of Subscriptions. Where I have seen it get complicated is resource management and exception handling (i.e. the issue of not being able to fail fast, since subscribe has to be called), but that is orthogonal to this. I'm trying to remember the details, but I remember it being easier for exactly the use case where retrying is not desired or is ill-defined.

@bsideup
bsideup commented May 29, 2020

because Connection has close and it is a Mono the Publisher is essentially a Supplier of Subscriptions

I think you clearly misunderstand the abstraction here :)
It is Mono<Connection>, not Connection implements Mono<OfWhatBTW>.

@agentgt
agentgt commented May 29, 2020

I think you clearly misunderstand the abstraction here :)
It is Mono<Connection>, not Connection implements Mono<OfWhatBTW>.

I didn't. What I don't know is if the subscription provided by Mono will close the connection on cancel.

Does it? I assume it does? And if so does it interrupt (although I suppose that doesn't matter in a full reactive platform)? Do I need to do a doFinally on the Mono using reactor parlance?

We agree that there is a 1-1 subscription to connection right?

Resource management is confusing with reactive frameworks. Actually almost all of it is.

After all this discussion Project loom is starting to look better and better for doing Request/Reply. I will still use Flow aka Publisher for true streams but I think a 0..1 stream is the wrong abstraction for modeling the apparently 90% use case of microservice REST frameworks which are not 0..1 stream but 1 or error. It is really sort of telling that most libraries are just single response and not stream.

Anyway I'm not going to implement Mono for our use cases but stick with our custom interfaces and provide a single bridge. I think most libraries that conform to reactive can do that instead of requiring a full bump of reactive streams and also JDK Flow.

@OlegDokuka
Member

Resource management is confusing with reactive frameworks. Actually almost all of it is.

As @viktorklang once said: Reactive Streams is not a resource management framework.

@bsideup
bsideup commented May 29, 2020

What I don't know is if the subscription provided by Mono will close the connection on cancel.

No, why should it? O_O
I mean... I've been using reactive frameworks for years and NEVER seen such behaviour.

Resource management is confusing with reactive frameworks

No, it is not. Because there is no "Resource management" at all. "Reactive Streams is not a resource management framework" (c) #482 (comment)

After all this discussion Project loom is starting to look better and better for doing Request/Reply

Okay, this is an "atomic bomb" dropped into this discussion. I simply will not answer that.

Anyway I'm not going to implement Mono for our use cases but stick with our custom interfaces and provide a single bridge.

Out of curiosity, do you have it publicly available somewhere? Some OSS libs maybe?
I have a feeling that you come from a completely different world and want to learn something new :)

@agentgt
agentgt commented May 29, 2020

Out of curiosity, do you have it publicly available somewhere? Some OSS libs maybe?
I have a feeling that you come from a completely different world and want to learn something new :)

It's not OSS. A lot of our problems are stream based. I'll see if I can figure out some medium where we can discuss more.

As for Loom... yeah, I am sorry for bringing that up, but I have spent countless hours trying to teach reactive programming to developers, and for non-streaming RESTful microservices I have come to the conclusion that it is not worth the slight performance benefits.

So what I see these days is people writing RESTful microservices using reactive frameworks adding significant cognitive load either through callbacks or monoids when the reality is request/reply is just not very reactive.

No, it is not. Because there is no "Resource management" at all. "Reactive Streams is not a resource management framework" (c) #482 (comment)

And while I agree on that, the reality is that if you are bridging to other APIs, for example CompletableFuture, or actually physically needing to drop the stream to messages, there is no guarantee. I mean, you even said you want cancel, right (remember, request(1) has to be called first anyway)? Part of the problem on our end is that we have existing APIs from before reactive streams came out. The subscription would need more info, or each event would need a close like you have for connection. So I guess our cancel does resource cleanup? Are you implying cancel should never cause the Publisher to do resource cleanup?

The reason it seems so obvious is because this is a Mono but for semi infinite streams that have more stringent guarantees like locking or what not this is not obvious. I mean cleanup has to happen somewhere for a stream and having each item have some sort of signal is not very ergonomic.

@agentgt
agentgt commented May 29, 2020

I guess, in fewer words, I mean resource management is complicated with reactive streams precisely because it doesn't have semantics around it.

Blocking APIs don't have this problem because they can fail fast and there is try catch, etc.

Anyway it's been a couple of years since I have actually worked with reactive frameworks (I do more boring biz stuff these days) so I'm not the best to chime in but I wanted to give input from a non-library writer. As in we try (and tried) to use things like this in the real world and it's a lot more complicated than I think many of the reactive library writers might think.
