diff --git a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java index 58ec934c42e..f6a39c944c4 100644 --- a/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java +++ b/dubbo-plugin/dubbo-reactive/src/main/java/org/apache/dubbo/reactive/calls/ReactorServerCalls.java @@ -19,10 +19,11 @@ import org.apache.dubbo.common.stream.StreamObserver; import org.apache.dubbo.reactive.ServerTripleReactorPublisher; import org.apache.dubbo.reactive.ServerTripleReactorSubscriber; +import org.apache.dubbo.rpc.StatusRpcException; +import org.apache.dubbo.rpc.TriRpcStatus; import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver; import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import reactor.core.publisher.Flux; @@ -43,16 +44,18 @@ private ReactorServerCalls() {} * @param func service implementation */ public static void oneToOne(T request, StreamObserver responseObserver, Function, Mono> func) { - func.apply(Mono.just(request)).subscribe(res -> { - CompletableFuture.completedFuture(res).whenComplete((r, t) -> { - if (t != null) { - responseObserver.onError(t); - } else { - responseObserver.onNext(r); - responseObserver.onCompleted(); - } - }); - }); + try { + func.apply(Mono.just(request)) + .subscribe( + res -> { + responseObserver.onNext(res); + responseObserver.onCompleted(); + }, + throwable -> doOnResponseHasException(throwable, responseObserver), + () -> doOnResponseHasException(TriRpcStatus.NOT_FOUND.asException(), responseObserver)); + } catch (Throwable throwable) { + doOnResponseHasException(throwable, responseObserver); + } } /** @@ -131,4 +134,10 @@ public static StreamObserver manyToMany( return serverPublisher; } + + private static void doOnResponseHasException(Throwable throwable, StreamObserver responseObserver) { + StatusRpcException statusRpcException = + TriRpcStatus.getStatus(throwable).asException(); + responseObserver.onError(statusRpcException); + } }