From 95c2872c0c3a8bebec06b413001148b28bc78f2a Mon Sep 17 00:00:00 2001 From: Roman Puchkovskiy Date: Mon, 18 May 2020 11:17:48 +0400 Subject: [PATCH] Rollback a reactive transaction on pipeline cancel ... instead of committing. This fixes atomicity violation problems. See https://github.com/spring-projects/spring-framework/issues/25091 and https://stackoverflow.com/questions/61822249/spring-reactive-transaction-gets-committed-on-cancel-producing-partial-commits?noredirect=1#comment109381171_61822249 --- .../interceptor/TransactionAspectSupport.java | 14 ++++++++++++-- .../reactive/TransactionalOperatorImpl.java | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index c2d9bf5599ab..23de9f3c12c9 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -876,7 +876,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl }, this::commitTransactionAfterReturning, (txInfo, err) -> Mono.empty(), - this::commitTransactionAfterReturning) + this::rollbackTransactionDueToCancel) .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); } @@ -906,7 +906,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class targetCl }, this::commitTransactionAfterReturning, (txInfo, ex) -> Mono.empty(), - this::commitTransactionAfterReturning) + this::rollbackTransactionDueToCancel) .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); } @@ -973,6 +973,16 @@ private Mono commitTransactionAfterReturning(@Nullable ReactiveTransaction return Mono.empty(); } + private Mono rollbackTransactionDueToCancel(@Nullable ReactiveTransactionInfo txInfo) { + if (txInfo != null && txInfo.getReactiveTransaction() != null) { + if (logger.isDebugEnabled()) { + logger.debug("Rolling transaction back for [" + txInfo.getJoinpointIdentification() + "] due to cancel"); + } + return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction()); + } + return Mono.empty(); + } + private Mono completeTransactionAfterThrowing(@Nullable ReactiveTransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getReactiveTransaction() != null) { if (logger.isTraceEnabled()) { diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index 26e343d285b6..0f9851daa780 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -79,7 +79,7 @@ public Mono transactional(Mono mono) { // Need re-wrapping of ReactiveTransaction until we get hold of the exception // through usingWhen. return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono, - this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit) + this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback) .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); }) .subscriberContext(TransactionContextManager.getOrCreateContext())