Skip to content

Commit

Permalink
Refine Coroutines support in HttpServiceProxyFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
sdeleuze committed Feb 7, 2023
1 parent 9f061f1 commit cb63164
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Objects;

import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.JvmClassMappingKt;
import kotlin.reflect.KClass;
Expand Down Expand Up @@ -68,10 +67,6 @@ public static <T> Deferred<T> monoToDeferred(Mono<T> source) {
(scope, continuation) -> MonoKt.awaitSingleOrNull(source, continuation));
}

public static <T> Object awaitSingleOrNull(Mono<T> source, Continuation<T> continuation) {
return MonoKt.awaitSingleOrNull(source, continuation);
}

/**
* Invoke a suspending function and converts it to {@link Mono} or
* {@link Flux}. Uses an {@linkplain Dispatchers#getUnconfined() unconfined}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ private static MethodParameter[] initMethodParameters(Method method) {
if (count == 0) {
return new MethodParameter[0];
}

if (KotlinDetector.isSuspendingFunction(method)) {
count -= 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.util.stream.Collectors;

import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactor.MonoKt;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import reactor.core.publisher.Mono;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.framework.ReflectiveMethodInvocation;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.ReactiveAdapterRegistry;
Expand Down Expand Up @@ -268,18 +268,13 @@ private HttpServiceMethodInterceptor(List<HttpServiceMethod> methods) {
}

@Override
@SuppressWarnings({"unchecked"})
public Object invoke(MethodInvocation invocation) throws Throwable {
Method method = invocation.getMethod();
HttpServiceMethod httpServiceMethod = this.httpServiceMethods.get(method);
if (httpServiceMethod != null) {
if (KotlinDetector.isSuspendingFunction(method)) {
Object[] arguments = getSuspendedFunctionArgs(invocation.getArguments());
Continuation<Object> continuation = resolveContinuationArgument(invocation.getArguments());
Mono<Object> wrapped = (Mono<Object>) httpServiceMethod.invoke(arguments);
return CoroutinesUtils.awaitSingleOrNull(wrapped, continuation);
return KotlinDelegate.invokeSuspendingFunction(invocation, httpServiceMethod);
}

return httpServiceMethod.invoke(invocation.getArguments());
}
if (method.isDefault()) {
Expand All @@ -290,13 +285,23 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
}
throw new IllegalStateException("Unexpected method invocation: " + method);
}
}

@SuppressWarnings({"unchecked"})
private static <T> Continuation<T> resolveContinuationArgument(Object[] args) {
return (Continuation<T>) args[args.length - 1];
/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
@SuppressWarnings("unchecked")
private static class KotlinDelegate {

public static Object invokeSuspendingFunction(MethodInvocation invocation, HttpServiceMethod httpServiceMethod) {
Object[] rawArguments = invocation.getArguments();
Object[] arguments = resolveArguments(rawArguments);
Continuation<Object> continuation = (Continuation<Object>) rawArguments[rawArguments.length - 1];
Mono<Object> wrapped = (Mono<Object>) httpServiceMethod.invoke(arguments);
return MonoKt.awaitSingleOrNull(wrapped, continuation);
}

private static Object[] getSuspendedFunctionArgs(Object[] args) {
private static Object[] resolveArguments(Object[] args) {
Object[] functionArgs = new Object[args.length - 1];
System.arraycopy(args, 0, functionArgs, 0, args.length - 1);
return functionArgs;
Expand Down

0 comments on commit cb63164

Please sign in to comment.