Skip to content

Commit

Permalink
Fix unwrapping logic for ResponseEntity<Flux>
Browse files Browse the repository at this point in the history
This commit makes sure that the response returned by coroutine handler
methods that return ResponseEntity<Flux> is unwrapped correctly.

Closes gh-27809
  • Loading branch information
poutsma committed Mar 11, 2022
1 parent af977c0 commit a3e23cd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.KotlinDetector;
Expand Down Expand Up @@ -130,7 +131,8 @@ protected Mono<Void> writeBody(@Nullable Object body, MethodParameter bodyParame
if (adapter != null) {
publisher = adapter.toPublisher(body);
boolean isUnwrapped = KotlinDetector.isSuspendingFunction(bodyParameter.getMethod()) &&
!COROUTINES_FLOW_CLASS_NAME.equals(bodyType.toClass().getName());
!COROUTINES_FLOW_CLASS_NAME.equals(bodyType.toClass().getName()) &&
!Flux.class.equals(bodyType.toClass());
ResolvableType genericType = isUnwrapped ? bodyType : bodyType.getGeneric();
elementType = getElementType(adapter, genericType);
actualElementType = elementType;
Expand Down
Expand Up @@ -36,6 +36,8 @@ import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.HttpServerErrorException
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer
import reactor.core.publisher.Flux
import java.time.Duration

class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {

Expand Down Expand Up @@ -110,6 +112,25 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
}
}

@ParameterizedHttpServerTest
fun `Suspending handler method returning ResponseEntity of Flux `(httpServer: HttpServer) {
startServer(httpServer)

val entity = performGet<String>("/entity-flux", HttpHeaders.EMPTY, String::class.java)
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
assertThat(entity.body).isEqualTo("01234")
}

@ParameterizedHttpServerTest
fun `Suspending handler method returning ResponseEntity of Flow`(httpServer: HttpServer) {
startServer(httpServer)

val entity = performGet<String>("/entity-flow", HttpHeaders.EMPTY, String::class.java)
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
assertThat(entity.body).isEqualTo("foobar")
}


@Configuration
@EnableWebFlux
@ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*")
Expand Down Expand Up @@ -167,6 +188,25 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
throw IllegalStateException()
}

@GetMapping("/entity-flux")
suspend fun entityFlux() : ResponseEntity<Flux<String>> {
val strings = Flux.interval(Duration.ofMillis(100)).take(5)
.map { l -> l.toString() }
delay(1)
return ResponseEntity.ok().body(strings)
}

@GetMapping("/entity-flow")
suspend fun entityFlow() : ResponseEntity<Flow<String>> {
val strings = flow {
emit("foo")
delay(1)
emit("bar")
delay(1)
}
return ResponseEntity.ok().body(strings)
}

}


Expand Down

0 comments on commit a3e23cd

Please sign in to comment.