Feature Request: Add Observable.usingWhen() for resources generated by a Publisher #7548

Open
KarboniteKream opened this issue Mar 25, 2023 · 3 comments

Comments

@KarboniteKream
Contributor

KarboniteKream commented Mar 25, 2023

Version: 3.1.6

In Reactor, there is Mono.usingWhen(), which works similarly to Mono.using(), but supports resources that are generated and cleaned up with a Publisher.

For example, here's a use case in r2dbc-pool:

Mono<Object> result = Mono.usingWhen(
    connectionFactory.create(), // Publisher<Connection>
    connection -> executeQuery(connection),
    Connection::close // Publisher<Void>
);

In RxJava, we have Single.using(), which works the same way as Mono.using(), so there's no support for reactive resource generation/cleanup. I was thinking of doing something like this:

Single<Object> result = Single
    .fromPublisher(connectionFactory.create())
    .flatMap(connection -> executeQuery(connection)
        // Lambda parameter renamed so it does not clash with the local variable `result`.
        .flatMap(value -> Completable
            .fromPublisher(connection.close())
            .toSingleDefault(value)));

However, this does not handle cases like connection cleanup on error, dispose or terminate. The .doOnXyz() methods would not suffice, so I'm guessing this would require a custom Observable implementation, unless I'm missing something obvious. I've also checked RxJavaExtensions for anything similar to what I'm trying to achieve, but I was not able to find anything, nor could I find anything relevant on Stack Overflow.
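
To make the requested semantics concrete, here is a minimal sketch that uses only the JDK's CompletableFuture rather than RxJava or Reactor. The usingWhen helper and the UsingWhenSketch class are hypothetical names invented for illustration, not an RxJava API. The sketch covers only the success and error paths; it does not model dispose/cancellation or a failing cleanup, both of which a real operator would also need to handle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;

final class UsingWhenSketch {

    // Hypothetical helper: acquire a resource asynchronously, use it, then run
    // an asynchronous cleanup whether the use stage succeeded or failed.
    static <R, T> CompletableFuture<T> usingWhen(
            Supplier<CompletableFuture<R>> acquire,
            Function<R, CompletableFuture<T>> use,
            Function<R, CompletableFuture<Void>> release) {
        return acquire.get().thenCompose(resource -> {
            CompletableFuture<T> used;
            try {
                used = use.apply(resource);
            } catch (Throwable t) {
                // Treat a synchronous throw like a failed stage.
                used = CompletableFuture.failedFuture(t);
            }
            // Capture the outcome so both paths continue into the cleanup.
            CompletableFuture<CompletableFuture<T>> nested = used.handle((value, error) -> {
                CompletableFuture<T> outcome = (error == null)
                        ? CompletableFuture.completedFuture(value)
                        : CompletableFuture.failedFuture(error);
                // Replay the original outcome only after the cleanup has finished.
                CompletableFuture<T> afterRelease =
                        release.apply(resource).thenCompose(ignored -> outcome);
                return afterRelease;
            });
            CompletableFuture<T> flattened = nested.thenCompose(stage -> stage);
            return flattened;
        });
    }

    // Runs the helper once and returns the order in which events were observed.
    static List<String> run(boolean failQuery) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Supplier<CompletableFuture<String>> acquire = () ->
                CompletableFuture.supplyAsync(() -> { events.add("acquire"); return "conn"; });
        Function<String, CompletableFuture<String>> use = conn -> {
            events.add("use");
            return failQuery
                    ? CompletableFuture.failedFuture(new IllegalStateException("query failed"))
                    : CompletableFuture.completedFuture("row");
        };
        Function<String, CompletableFuture<Void>> release = conn ->
                CompletableFuture.runAsync(() -> events.add("release"));
        try {
            events.add("result:" + usingWhen(acquire, use, release).join());
        } catch (CompletionException e) {
            events.add("error:" + e.getCause().getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Under these assumptions, the "release" event is recorded on both paths: after a successful query and after a failed one. The nested flatMap chain in the snippet above does not provide this, because its cleanup only runs when the query emits a value.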

I currently have the option of just using Mono.usingWhen() and then converting it to a Single, but it would be nice to have this natively available in RxJava. Would you be willing to add support for usingWhen()?

@Mahammadnajaf

One way to handle resource cleanup in RxJava is to create the resource inside Single.defer and attach the cleanup logic with doFinally. The deferred Single generates the resource and executes your query on each subscription, and doFinally takes care of the cleanup.

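Here's an example:

Single<Object> result = Single.defer(() -> {
    // Blocks the subscribing thread until the connection is available.
    Connection connection = Single.fromPublisher(connectionFactory.create()).blockingGet();
    return executeQuery(connection)
        // close() returns a Publisher<Void>, so it must be subscribed to run.
        .doFinally(() -> Completable.fromPublisher(connection.close()).subscribe());
});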

In this example, defer delays creating the connection until the Single is subscribed to, and blockingGet blocks the subscribing thread until the connection is available. executeQuery is then called with the connection, and doFinally cleans up the connection when the Single succeeds, errors, or is disposed.

You could also create a custom operator that wraps the using operator and converts the resulting Mono to a Single. However, this would require more code than the previous example and may not be necessary for your use case.

I hope this helps! :)

@akarnokd
Member

@KarboniteKream Thanks for the feedback. I'm still thinking about the implementation details and support considerations. Stay tuned.

@Mahammadnajaf Suggesting blocking operations is 99.99% wrong.

@Mahammadnajaf

well
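Single<Object> result = Single.defer(() -> {
    // Blocks the subscribing thread until the connection is available.
    Connection connection = Single.fromPublisher(connectionFactory.create()).blockingGet();
    return executeQuery(connection)
        // close() returns a Publisher<Void>, so it must be subscribed to run.
        .doFinally(() -> Completable.fromPublisher(connection.close()).subscribe());
});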

In this code, the blockingGet method is only used once to retrieve the Connection object, which should be fine as long as it runs on a thread where blocking is acceptable (such as during application startup or initialization).
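
The concern with blocking calls such as blockingGet() can be illustrated without RxJava. In the sketch below, a single-thread executor stands in for an event loop or single-threaded scheduler; that stand-in is an assumption made for illustration, and BlockingHazardSketch is a hypothetical name. A task that blocks on work queued to the same thread can never finish, because the only thread able to run that work is the one waiting for it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingHazardSketch {

    // Returns true only if the outer task finished within the timeout.
    static boolean outerTaskCompletes() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = eventLoop.submit(() -> {
                // The inner task is queued behind the task that is running now...
                Future<String> inner = eventLoop.submit(() -> "connection");
                // ...so blocking here waits for work that cannot start.
                return inner.get();
            });
            try {
                outer.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException | ExecutionException e) {
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } finally {
            // Interrupts the stuck worker so the JVM can exit.
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("outer task completed: " + outerTaskCompletes());
    }
}
```

Whether a given blockingGet() call hits this situation depends on which thread subscribes and which thread produces the connection, which is why blocking is only safe when the calling thread is known not to be needed by the source.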

However, if you need to avoid blocking operations entirely, you could modify the code to use reactive streams instead of blocking operations. Here's an example:

Single<Object> result = Single.defer(() ->
    Single.fromPublisher(connectionFactory.create())
        .flatMap(connection -> executeQuery(connection)
            // close() returns a Publisher<Void>, so it must be subscribed to run.
            .doFinally(() -> Completable.fromPublisher(connection.close()).subscribe())));

In this example, Single.fromPublisher adapts the Publisher returned by create() into a Single that emits the Connection object when it is available. The flatMap operator passes the Connection to executeQuery, and the doFinally operator subscribes to the Publisher returned by close() when the Single succeeds, errors, or is disposed. Note that nothing downstream waits for close() to finish.

This code should not block, as all operations are done in a reactive, non-blocking manner.
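
One detail the snippets in this thread depend on: if close() returns a lazy (cold) Publisher<Void>, as the annotation in the original request suggests, then calling close() inside a callback without subscribing to the result does nothing. The sketch below shows this with the JDK's java.util.concurrent.Flow interfaces instead of RxJava; FakeConnection, NoopSubscriber and ColdCloseSketch are hypothetical stand-ins, and the assumption that close() is lazy should be checked against the actual driver.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

final class ColdCloseSketch {

    // Hypothetical stand-in for a connection whose close() is lazy.
    static final class FakeConnection {
        final AtomicBoolean closed = new AtomicBoolean();

        Flow.Publisher<Void> close() {
            // Nothing happens here; the work runs only when someone subscribes.
            return subscriber -> {
                subscriber.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { }
                    @Override public void cancel() { }
                });
                closed.set(true);
                subscriber.onComplete();
            };
        }
    }

    // Subscriber that ignores every signal; it exists only to trigger the work.
    static final class NoopSubscriber implements Flow.Subscriber<Void> {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(Void item) { }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    // Mirrors a cleanup callback that calls close() and discards the Publisher.
    static boolean closedWithoutSubscribe() {
        FakeConnection connection = new FakeConnection();
        Runnable cleanup = () -> connection.close();
        cleanup.run();
        return connection.closed.get();
    }

    // Mirrors a cleanup callback that subscribes to the Publisher from close().
    static boolean closedWithSubscribe() {
        FakeConnection connection = new FakeConnection();
        Runnable cleanup = () -> connection.close().subscribe(new NoopSubscriber());
        cleanup.run();
        return connection.closed.get();
    }

    public static void main(String[] args) {
        System.out.println("without subscribe: " + closedWithoutSubscribe());
        System.out.println("with subscribe: " + closedWithSubscribe());
    }
}
```

Even with the subscription in place, a fire-and-forget cleanup does not delay the downstream result until the connection is closed, which is part of what a dedicated usingWhen() operator would add.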
