Skip to content

Commit

Permalink
Fix Neo4jReactiveHealthIndicator NoSuchElementException
Browse files Browse the repository at this point in the history
Update `Neo4jReactiveHealthIndicator` to ensure that `result.records()`
is called before `result.consume()`. Prior to this commit, the indicator
used `zipWith` to merge records with a summary. This worked with the
previous RxJava based driver, but fails with the Reactor based driver
due to a `NoSuchElementException: Source was empty` error.

Fixes gh-33428
  • Loading branch information
philwebb committed Dec 1, 2022
1 parent 8849f72 commit 93f8dc7
Showing 1 changed file with 37 additions and 9 deletions.
Expand Up @@ -19,9 +19,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
import org.neo4j.driver.summary.ResultSummary;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
Expand All @@ -36,6 +38,7 @@
*
* @author Michael J. Simons
* @author Stephane Nicoll
* @author Phillip Webb
* @since 2.4.0
*/
public final class Neo4jReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
Expand Down Expand Up @@ -63,15 +66,40 @@ protected Mono<Health> doHealthCheck(Health.Builder builder) {
}

Mono<Neo4jHealthDetails> runHealthCheckQuery() {
// We use WRITE here to make sure UP is returned for a server that supports
// all possible workloads
return Mono.using(() -> this.driver.session(ReactiveSession.class, Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG),
(session) -> {
Mono<ReactiveResult> resultMono = Flux.from(session.run(Neo4jHealthIndicator.CYPHER)).single();
return resultMono
.flatMapMany((result) -> Flux.from(result.records()).zipWith(Flux.from(result.consume())))
.map((tuple) -> new Neo4jHealthDetails(tuple.getT1(), tuple.getT2())).single();
}, ReactiveSession::close);
return Mono.using(this::session, this::healthDetails, ReactiveSession::close);
}

private ReactiveSession session() {
return this.driver.session(ReactiveSession.class, Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG);
}

private Mono<Neo4jHealthDetails> healthDetails(ReactiveSession session) {
return Mono.from(session.run(Neo4jHealthIndicator.CYPHER)).flatMap(this::healthDetails);
}

private Mono<? extends Neo4jHealthDetails> healthDetails(ReactiveResult result) {
Flux<Record> records = Flux.from(result.records());
Mono<ResultSummary> summary = Mono.from(result.consume());
Neo4jHealthDetailsBuilder builder = new Neo4jHealthDetailsBuilder();
return records.single().doOnNext(builder::record).then(summary).map(builder::build);
}

/**
* Builder used to create a {@link Neo4jHealthDetails} from a {@link Record} and a
* {@link ResultSummary}.
*/
private static class Neo4jHealthDetailsBuilder {

private Record record;

void record(Record record) {
this.record = record;
}

private Neo4jHealthDetails build(ResultSummary summary) {
return new Neo4jHealthDetails(this.record, summary);
}

}

}

0 comments on commit 93f8dc7

Please sign in to comment.