Skip to content

Commit

Permalink
Rollback a reactive transaction on pipeline cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
rpuch committed May 18, 2020
1 parent 0ddc62e commit 95c2872
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Expand Up @@ -876,7 +876,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::commitTransactionAfterReturning)
this::rollbackTransactionDueToCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
Expand Down Expand Up @@ -906,7 +906,7 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
},
this::commitTransactionAfterReturning,
(txInfo, ex) -> Mono.empty(),
this::commitTransactionAfterReturning)
this::rollbackTransactionDueToCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
Expand Down Expand Up @@ -973,6 +973,16 @@ private Mono<Void> commitTransactionAfterReturning(@Nullable ReactiveTransaction
return Mono.empty();
}

private Mono<Void> rollbackTransactionDueToCancel(@Nullable ReactiveTransactionInfo txInfo) {
if (txInfo != null && txInfo.getReactiveTransaction() != null) {
if (logger.isDebugEnabled()) {
logger.debug("Rolling transaction back for [" + txInfo.getJoinpointIdentification() + "] due to cancel");
}
return txInfo.getTransactionManager().rollback(txInfo.getReactiveTransaction());
}
return Mono.empty();
}

private Mono<Void> completeTransactionAfterThrowing(@Nullable ReactiveTransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getReactiveTransaction() != null) {
if (logger.isTraceEnabled()) {
Expand Down
Expand Up @@ -79,7 +79,7 @@ public <T> Mono<T> transactional(Mono<T> mono) {
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback)
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.subscriberContext(TransactionContextManager.getOrCreateContext())
Expand Down

0 comments on commit 95c2872

Please sign in to comment.