Skip to content

Commit

Permalink
Support suspending functions annotated with @transactional
Browse files Browse the repository at this point in the history
This commit makes TransactionInterceptor and TransactionAspectSupport
Coroutines aware, adapting Reactive transaction support to Coroutines.

Suspending functions returning a Flow are handled like Flux, for other
return types, they are handled like Mono.

Closes spring-projectsgh-23575
  • Loading branch information
sdeleuze committed Oct 25, 2020
1 parent 73eefea commit 200af61
Show file tree
Hide file tree
Showing 4 changed files with 449 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import java.util.concurrent.ConcurrentMap;

import io.vavr.control.Try;
import kotlin.reflect.KFunction;
import kotlin.reflect.jvm.ReflectJvmMapping;
import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactive.AwaitKt;
import kotlinx.coroutines.reactive.ReactiveFlowKt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -33,6 +35,7 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
Expand All @@ -44,7 +47,6 @@
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.TransactionUsageException;
import org.springframework.transaction.reactive.TransactionContextManager;
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -96,6 +98,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
*/
private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object();

private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";

/**
* Vavr library present on the classpath?
*/
Expand Down Expand Up @@ -336,21 +340,20 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe
final TransactionManager tm = determineTransactionManager(txAttr);

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
boolean hasSuspendingFlowReturnType = isSuspendingFunction && COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
Class<?> reactiveType = (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
Publisher<?> publisher = (Publisher<?>) txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
return (isSuspendingFunction ? (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(publisher) :
KotlinDelegate.awaitSingleOrNull(publisher, ((CoroutinesInvocationCallback) invocation).getContinuation())) : publisher);
}

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
Expand Down Expand Up @@ -785,6 +788,11 @@ protected interface InvocationCallback {
Object proceedWithInvocation() throws Throwable;
}

protected interface CoroutinesInvocationCallback extends InvocationCallback {

Object getContinuation();
}


/**
* Internal holder class for a Throwable in a callback transaction model.
Expand Down Expand Up @@ -837,9 +845,13 @@ public static Object evaluateTryFailure(Object retVal, TransactionAttribute txAt
*/
private static class KotlinDelegate {

private static boolean isSuspend(Method method) {
KFunction<?> function = ReflectJvmMapping.getKotlinFunction(method);
return function != null && function.isSuspend();
private static Object asFlow(Publisher<?> publisher) {
return ReactiveFlowKt.asFlow(publisher);
}

@SuppressWarnings("unchecked")
private static Object awaitSingleOrNull(Publisher<?> publisher, Object continuation) {
return AwaitKt.awaitSingleOrNull(publisher, (Continuation<Object>) continuation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.lang.Nullable;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionManager;
Expand Down Expand Up @@ -115,6 +117,19 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
if (KotlinDetector.isSuspendingFunction(invocation.getMethod())) {
InvocationCallback callback = new CoroutinesInvocationCallback() {
@Override
public Object proceedWithInvocation() {
return CoroutinesUtils.invokeSuspendingFunction(invocation.getMethod(), invocation.getThis(), invocation.getArguments());
}
@Override
public Object getContinuation() {
return invocation.getArguments()[invocation.getArguments().length - 1];
}
};
return invokeWithinTransaction(invocation.getMethod(), targetClass, callback);
}
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

Expand Down

0 comments on commit 200af61

Please sign in to comment.