Skip to content

Commit

Permalink
Add support for Coroutines transactions
Browse files Browse the repository at this point in the history
This commits adds Coroutines extensions for
TransactionalOperator.transactional that accept suspending lambda or
Kotlin Flow parameters.

@transactional on suspending functions is not supported yet, spring-projectsgh-23575
has been created for that purpose.

This commit also renames invokeHandlerMethod to invokeSuspendingFunction
in CoroutinesUtils.

Closes spring-projectsgh-22915
  • Loading branch information
sdeleuze committed Sep 3, 2019
1 parent aeb857c commit f233a47
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ internal fun <T: Any> monoToDeferred(source: Mono<T>) =
GlobalScope.async(Dispatchers.Unconfined) { source.awaitFirstOrNull() }

/**
* Invoke an handler method converting suspending method to [Mono] or
* [reactor.core.publisher.Flux] if necessary.
* Invoke a suspending function converting it to [Mono] or [reactor.core.publisher.Flux]
* if necessary.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@Suppress("UNCHECKED_CAST")
@ExperimentalCoroutinesApi
internal fun invokeHandlerMethod(method: Method, bean: Any, vararg args: Any?): Any? {
internal fun invokeSuspendingFunction(method: Method, bean: Any, vararg args: Any?): Any? {
val function = method.kotlinFunction!!
return if (function.isSuspend) {
val mono = mono(Dispatchers.Unconfined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public Mono<Object> invoke(Message<?> message, Object... providedArgs) {
Method method = getBridgedMethod();
ReflectionUtils.makeAccessible(method);
if (KotlinDetector.isKotlinReflectPresent() && KotlinDetector.isKotlinType(method.getDeclaringClass())) {
value = CoroutinesUtils.invokeHandlerMethod(method, getBean(), args);
value = CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
}
else {
value = method.invoke(getBean(), args);
Expand Down
7 changes: 7 additions & 0 deletions spring-tx/spring-tx.gradle
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
description = "Spring Transaction"

apply plugin: "kotlin"

dependencies {
compile(project(":spring-beans"))
compile(project(":spring-core"))
optional(project(":spring-aop"))
optional(project(":spring-context")) // for JCA, @EnableTransactionManagement
optional(project(":kotlin-coroutines"))
optional("javax.ejb:javax.ejb-api")
optional("javax.interceptor:javax.interceptor-api")
optional("javax.resource:javax.resource-api")
optional("javax.transaction:javax.transaction-api")
optional("com.ibm.websphere:uow")
optional("io.projectreactor:reactor-core")
optional("io.vavr:vavr")
optional("org.jetbrains.kotlin:kotlin-reflect")
optional("org.jetbrains.kotlin:kotlin-stdlib")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
testCompile("org.aspectj:aspectjweaver")
testCompile("org.codehaus.groovy:groovy")
testCompile("org.eclipse.persistence:javax.persistence")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.concurrent.ConcurrentMap;

import io.vavr.control.Try;
import kotlin.reflect.KFunction;
import kotlin.reflect.jvm.ReflectJvmMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
Expand All @@ -30,6 +32,7 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
Expand All @@ -41,6 +44,7 @@
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.TransactionUsageException;
import org.springframework.transaction.reactive.TransactionContextManager;
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -322,6 +326,10 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe
final InvocationCallback invocation) throws Throwable {

if (this.reactiveAdapterRegistry != null) {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException("Annotated transactions on suspending functions are not supported," +
" use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter != null) {
return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation);
Expand Down Expand Up @@ -809,6 +817,17 @@ public static Object evaluateTryFailure(Object retVal, TransactionAttribute txAt
}
}

/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
private static class KotlinDelegate {

static private boolean isSuspend(Method method) {
KFunction<?> function = ReflectJvmMapping.getKotlinFunction(method);
return function != null && function.isSuspend();
}
}


/**
* Delegate for Reactor-based management of transactional methods with a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.springframework.transaction.reactive

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono

/**
* Coroutines variant of [TransactionalOperator.transactional] with a [Flow] parameter.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@ExperimentalCoroutinesApi
fun <T : Any> TransactionalOperator.transactional(flow: Flow<T>): Flow<T> =
transactional(flow.asFlux()).asFlow()

/**
* Coroutines variant of [TransactionalOperator.transactional] with a suspending lambda
* parameter.
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun <T : Any> TransactionalOperator.transactional(f: suspend () -> T?): T? =
transactional(mono(Dispatchers.Unconfined) { f() }).awaitFirstOrNull()
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.transaction.reactive

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import org.springframework.transaction.support.DefaultTransactionDefinition

class TransactionalOperatorExtensionsTests {

private val tm = ReactiveTestTransactionManager(false, true)

@Test
fun commitWithSuspendingFunction() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
operator.transactional {
delay(1)
true
}
}
assertThat(tm.commit).isTrue()
assertThat(tm.rollback).isFalse()
}

@Test
fun rollbackWithSuspendingFunction() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
try {
operator.transactional {
delay(1)
throw IllegalStateException()
}
} catch (ex: IllegalStateException) {
assertThat(tm.commit).isFalse()
assertThat(tm.rollback).isTrue()
return@runBlocking
}
fail("")
}
}

@Test
@ExperimentalCoroutinesApi
fun commitWithFlow() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
val flow = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
runBlocking {
val list = operator.transactional(flow).toList()
assertThat(list).hasSize(4)
}
assertThat(tm.commit).isTrue()
assertThat(tm.rollback).isFalse()
}

@Test
@ExperimentalCoroutinesApi
fun rollbackWithFlow() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
val flow = flow<Int> {
delay(1)
throw IllegalStateException()
}
runBlocking {
try {
operator.transactional(flow).toList()
} catch (ex: IllegalStateException) {
assertThat(tm.commit).isFalse()
assertThat(tm.rollback).isTrue()
return@runBlocking
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public Mono<HandlerResult> invoke(
ReflectionUtils.makeAccessible(getBridgedMethod());
Method method = getBridgedMethod();
if (KotlinDetector.isKotlinReflectPresent() && KotlinDetector.isKotlinType(method.getDeclaringClass())) {
value = CoroutinesUtils.invokeHandlerMethod(method, getBean(), args);
value = CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
}
else {
value = method.invoke(getBean(), args);
Expand Down
6 changes: 6 additions & 0 deletions src/docs/asciidoc/languages/kotlin.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,12 @@ class UserHandler(builder: WebClient.Builder) {
}
----

=== Transactions

Transactions on Coroutines are supported via the programmatic variant of the Reactive
transaction management provided as of Spring Framework 5.2. `TransactionalOperator.transactional`
extensions with suspending lambda and Kotlin `Flow` parameter are provided for that purpose.

[[kotlin-spring-projects-in-kotlin]]
== Spring Projects in Kotlin

Expand Down

0 comments on commit f233a47

Please sign in to comment.