Skip to content

Commit

Permalink
fix: correct Kotlin coroutine interceptions (#7247)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov committed Apr 28, 2022
1 parent 9fa427c commit eec151b
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 14 deletions.
Expand Up @@ -100,13 +100,8 @@ public Argument<?> returnTypeValue() {

@Override
public CompletableFuture<Object> interceptResultAsCompletionStage() {
CompletableFutureContinuation completableFutureContinuation;
if (continuation instanceof CompletableFutureContinuation) {
completableFutureContinuation = (CompletableFutureContinuation) continuation;
} else {
completableFutureContinuation = new CompletableFutureContinuation(continuation);
replaceContinuation.accept(completableFutureContinuation);
}
CompletableFutureContinuation completableFutureContinuation = new CompletableFutureContinuation(continuation);
replaceContinuation.accept(completableFutureContinuation);
Object result = context.proceed();
replaceContinuation.accept(continuation);
if (result != KotlinUtils.COROUTINE_SUSPENDED) {
Expand All @@ -117,13 +112,8 @@ public CompletableFuture<Object> interceptResultAsCompletionStage() {

@Override
public CompletableFuture<Object> interceptResultAsCompletionStage(Interceptor<?, ?> from) {
CompletableFutureContinuation completableFutureContinuation;
if (continuation instanceof CompletableFutureContinuation) {
completableFutureContinuation = (CompletableFutureContinuation) continuation;
} else {
completableFutureContinuation = new CompletableFutureContinuation(continuation);
replaceContinuation.accept(completableFutureContinuation);
}
CompletableFutureContinuation completableFutureContinuation = new CompletableFutureContinuation(continuation);
replaceContinuation.accept(completableFutureContinuation);
Object result = context.proceed(from);
replaceContinuation.accept(continuation);
if (result != KotlinUtils.COROUTINE_SUSPENDED) {
Expand Down
@@ -0,0 +1,29 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

@MyRepository
interface CustomRepository {

suspend fun xyz(): String

suspend fun abc(): String

suspend fun count1(): String

suspend fun count2(): String

}
@@ -0,0 +1,48 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.ints.shouldBeExactly
import io.kotest.matchers.shouldBe
import io.micronaut.context.ApplicationContext
import kotlinx.coroutines.runBlocking

class InterceptorSpec : StringSpec() {

val context = autoClose(
ApplicationContext.run()
)

private var myService = context.getBean(MyService::class.java)

init {
"test correct interceptors calls" {
runBlocking {
myService.someCall()
MyService.events.size shouldBeExactly 8
MyService.events[0] shouldBe "intercept1-start"
MyService.events[1] shouldBe "intercept2-start"
MyService.events[2] shouldBe "repository-abc"
MyService.events[3] shouldBe "repository-xyz"
MyService.events[4] shouldBe "intercept2-end"
MyService.events[5] shouldBe "intercept1-end"
MyService.events[6] shouldBe "repository-count1"
MyService.events[7] shouldBe "repository-count2"
}
}
}
}
@@ -0,0 +1,25 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

import io.micronaut.aop.Introduction
import jakarta.inject.Singleton

@MustBeDocumented
@kotlin.annotation.Retention(AnnotationRetention.RUNTIME)
@Introduction
@Singleton
annotation class MyRepository()
@@ -0,0 +1,46 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

import io.micronaut.aop.InterceptedMethod
import io.micronaut.aop.InterceptorBean
import io.micronaut.aop.MethodInterceptor
import io.micronaut.aop.MethodInvocationContext
import jakarta.inject.Singleton
import java.io.IOException
import java.util.concurrent.CompletableFuture

@InterceptorBean(MyRepository::class)
@Singleton
class MyRepositoryInterceptorImpl : MethodInterceptor<Any, Any> {
override fun intercept(context: MethodInvocationContext<Any, Any>?): Any? {
val interceptedMethod = InterceptedMethod.of(context)
return try {
if (interceptedMethod.resultType() == InterceptedMethod.ResultType.COMPLETION_STAGE) {
MyService.events.add("repository-" + context!!.methodName)
val cf: CompletableFuture<String> = CompletableFuture.supplyAsync{
Thread.sleep(1000)
context!!.methodName
}
interceptedMethod.handleResult(cf)
} else {
throw IllegalStateException()
}
} catch (e: Exception) {
interceptedMethod.handleException<Exception>(e)
}
}
}
@@ -0,0 +1,35 @@
package io.micronaut.docs.server.suspend.multiple

import jakarta.inject.Singleton
import java.util.*
import kotlin.collections.ArrayList

@Singleton
open class MyService(
private val repository: CustomRepository
) {

companion object {
val events: MutableList<String> = Collections.synchronizedList(ArrayList())
}

open suspend fun someCall() {
// Simulate accessing two different data-source repositories using two transactions
tx1()
// Call another coroutine
repository.count1()
repository.count2()
}

@Transaction1
open suspend fun tx1() {
tx2()
}

@Transaction2
open suspend fun tx2() {
repository.abc()
repository.xyz()
}

}
@@ -0,0 +1,30 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

import io.micronaut.aop.Around
import kotlin.annotation.AnnotationRetention.RUNTIME
import kotlin.annotation.AnnotationTarget.CLASS
import kotlin.annotation.AnnotationTarget.FILE
import kotlin.annotation.AnnotationTarget.FUNCTION
import kotlin.annotation.AnnotationTarget.PROPERTY_GETTER
import kotlin.annotation.AnnotationTarget.PROPERTY_SETTER

@MustBeDocumented
@Retention(RUNTIME)
@Target(CLASS, FILE, FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Around
annotation class Transaction1
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

import io.micronaut.aop.InterceptedMethod
import io.micronaut.aop.InterceptorBean
import io.micronaut.aop.MethodInterceptor
import io.micronaut.aop.MethodInvocationContext
import jakarta.inject.Singleton
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer

@InterceptorBean(Transaction1::class)
@Singleton
class Transaction1Interceptor : MethodInterceptor<Any, Any> {
override fun intercept(context: MethodInvocationContext<Any, Any>): Any? {
val interceptedMethod = InterceptedMethod.of(context)
return try {
return if (interceptedMethod.resultType() == InterceptedMethod.ResultType.COMPLETION_STAGE) {
MyService.events.add("intercept1-start")
val completionStage = interceptedMethod.interceptResultAsCompletionStage()
val cf = CompletableFuture<Any>()
completionStage.whenComplete { value, throwable ->
MyService.events.add("intercept1-end")
if (throwable == null) {
cf.complete(value)
} else {
cf.completeExceptionally(throwable)
}
}
interceptedMethod.handleResult(cf)
} else {
throw IllegalStateException()
}
} catch (e: Exception) {
interceptedMethod.handleException<Exception>(e)
}
}
}
@@ -0,0 +1,30 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

import io.micronaut.aop.Around
import kotlin.annotation.AnnotationRetention.RUNTIME
import kotlin.annotation.AnnotationTarget.CLASS
import kotlin.annotation.AnnotationTarget.FILE
import kotlin.annotation.AnnotationTarget.FUNCTION
import kotlin.annotation.AnnotationTarget.PROPERTY_GETTER
import kotlin.annotation.AnnotationTarget.PROPERTY_SETTER

@MustBeDocumented
@Retention(RUNTIME)
@Target(CLASS, FILE, FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Around
annotation class Transaction2
@@ -0,0 +1,51 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.docs.server.suspend.multiple

import io.micronaut.aop.InterceptedMethod
import io.micronaut.aop.InterceptorBean
import io.micronaut.aop.MethodInterceptor
import io.micronaut.aop.MethodInvocationContext
import jakarta.inject.Singleton
import java.util.concurrent.CompletableFuture

@InterceptorBean(Transaction2::class)
@Singleton
class Transaction2Interceptor : MethodInterceptor<Any, Any> {
override fun intercept(context: MethodInvocationContext<Any, Any>): Any? {
val interceptedMethod = InterceptedMethod.of(context)
return try {
return if (interceptedMethod.resultType() == InterceptedMethod.ResultType.COMPLETION_STAGE) {
MyService.events.add("intercept2-start")
val completionStage = interceptedMethod.interceptResultAsCompletionStage()
val cf = CompletableFuture<Any>()
completionStage.whenComplete { value, throwable ->
MyService.events.add("intercept2-end")
if (throwable == null) {
cf.complete(value)
} else {
cf.completeExceptionally(throwable)
}
}
interceptedMethod.handleResult(cf)
} else {
throw IllegalStateException()
}
} catch (e: Exception) {
interceptedMethod.handleException<Exception>(e)
}
}
}

0 comments on commit eec151b

Please sign in to comment.