From 93f8dc76ab1f986672b81d6466f9d2c6ffeef336 Mon Sep 17 00:00:00 2001 From: Phillip Webb Date: Wed, 30 Nov 2022 20:29:54 -0800 Subject: [PATCH] Fix Neo4jReactiveHealthIndicator NoSuchElementException Update `Neo4jReactiveHealthIndicator` to ensure that `result.records()` is called before `result.consume()`. Prior to this commit, the indicator used `zipWith` to merge records with a summary. This worked with the previous RxJava based driver, but fails with the Reactor based driver due to a `NoSuchElementException: Source was empty` error. Fixes gh-33428 --- .../neo4j/Neo4jReactiveHealthIndicator.java | 46 +++++++++++++++---- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java index 99b6c0801dbb..55cb87689212 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java @@ -19,9 +19,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.neo4j.driver.Driver; +import org.neo4j.driver.Record; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.reactivestreams.ReactiveResult; import org.neo4j.driver.reactivestreams.ReactiveSession; +import org.neo4j.driver.summary.ResultSummary; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -36,6 +38,7 @@ * * @author Michael J. Simons * @author Stephane Nicoll + * @author Phillip Webb * @since 2.4.0 */ public final class Neo4jReactiveHealthIndicator extends AbstractReactiveHealthIndicator { @@ -63,15 +66,40 @@ protected Mono doHealthCheck(Health.Builder builder) { } Mono runHealthCheckQuery() { - // We use WRITE here to make sure UP is returned for a server that supports - // all possible workloads - return Mono.using(() -> this.driver.session(ReactiveSession.class, Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG), - (session) -> { - Mono resultMono = Flux.from(session.run(Neo4jHealthIndicator.CYPHER)).single(); - return resultMono - .flatMapMany((result) -> Flux.from(result.records()).zipWith(Flux.from(result.consume()))) - .map((tuple) -> new Neo4jHealthDetails(tuple.getT1(), tuple.getT2())).single(); - }, ReactiveSession::close); + return Mono.using(this::session, this::healthDetails, ReactiveSession::close); + } + + private ReactiveSession session() { + return this.driver.session(ReactiveSession.class, Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG); + } + + private Mono healthDetails(ReactiveSession session) { + return Mono.from(session.run(Neo4jHealthIndicator.CYPHER)).flatMap(this::healthDetails); + } + + private Mono healthDetails(ReactiveResult result) { + Flux records = Flux.from(result.records()); + Mono summary = Mono.from(result.consume()); + Neo4jHealthDetailsBuilder builder = new Neo4jHealthDetailsBuilder(); + return records.single().doOnNext(builder::record).then(summary).map(builder::build); + } + + /** + * Builder used to create a {@link Neo4jHealthDetails} from a {@link Record} and a + * {@link ResultSummary}. + */ + private static class Neo4jHealthDetailsBuilder { + + private Record record; + + void record(Record record) { + this.record = record; + } + + private Neo4jHealthDetails build(ResultSummary summary) { + return new Neo4jHealthDetails(this.record, summary); + } + } }