Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Omit cancellation of transactional Monos in TransactionOperator #23562

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -38,6 +38,10 @@
* application services utilizing this class, making calls to the low-level
* services via an inner-class callback object.
*
* <p>Transactional Publishers should avoid Subscription cancellation.
* Cancelling initiates asynchronous transaction cleanup that does not allow for
* synchronization on completion.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
Expand All @@ -64,9 +68,7 @@ default <T> Flux<T> transactional(Flux<T> flux) {
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
default <T> Mono<T> transactional(Mono<T> mono) {
return execute(it -> mono).next();
}
<T> Mono<T> transactional(Mono<T> mono);

/**
* Execute the action specified by the given callback object within a transaction.
Expand Down
Expand Up @@ -71,6 +71,21 @@ public ReactiveTransactionManager getTransactionManager() {
return this.transactionManager;
}

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't it be better to start from Mono.usingWhen?

return Mono.usingWhen(status, ignore -> mono, this.transactionManager::commit, ...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We require the status value outside of usingWhen(…) (see rollbackOnException)

this.transactionManager::commit, s -> Mono.empty())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that in case of cancellation, since no asyncCancel is defined here, it defaults to commit. Also, it feels weird that the error case is basically ignored

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit on cancel is what we want if just a subset of data is processed. In imperative code this compares to processing a subset of a result list.

The error case is ignored because errors are handled after the usingWhen operator. In case a pre-commit action fails, we want to issue a rollback. Committing a transaction consists of several activities of which the actual commit is just one of them. So if a commit fails, we still need to rollback,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, thanks for the explanations. nothing to add 👍

.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.subscriberContext(TransactionContextManager.getOrCreateContext())
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}

@Override
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {
Expand Down
Expand Up @@ -16,6 +16,8 @@

package org.springframework.transaction.reactive;

import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -46,6 +48,19 @@ public void commitWithMono() {
assertThat(tm.rollback).isFalse();
}

@Test
public void monoSubscriptionNotCancelled() {
AtomicBoolean cancelled = new AtomicBoolean();
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Mono.just(true).doOnCancel(() -> cancelled.set(true)).as(operator::transactional)
.as(StepVerifier::create)
.expectNext(true)
.verifyComplete();
assertThat(tm.commit).isTrue();
assertThat(tm.rollback).isFalse();
assertThat(cancelled).isFalse();
}

@Test
public void rollbackWithMono() {
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Expand Down