From a9fc1de8bff2d3da7db0b76d578beadc502e6038 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Thu, 17 Nov 2022 14:41:01 +0100 Subject: [PATCH] Do not close ClusteredEventBus client and server before parent closing Fixes #4326 Signed-off-by: Thomas Segismont --- .../impl/clustered/ClusteredEventBus.java | 82 ++++++++++++------- .../eventbus/ClusteredEventBusTestBase.java | 63 +++++++++++++- 2 files changed, 112 insertions(+), 33 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 308e4e91c73..7497f5564aa 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -19,18 +19,31 @@ import io.vertx.core.eventbus.AddressHelper; import io.vertx.core.eventbus.EventBusOptions; import io.vertx.core.eventbus.MessageCodec; -import io.vertx.core.eventbus.impl.*; +import io.vertx.core.eventbus.impl.CodecManager; +import io.vertx.core.eventbus.impl.EventBusImpl; +import io.vertx.core.eventbus.impl.HandlerHolder; +import io.vertx.core.eventbus.impl.HandlerRegistration; +import io.vertx.core.eventbus.impl.MessageImpl; +import io.vertx.core.eventbus.impl.OutboundDeliveryContext; +import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.EventLoopContext; import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.impl.utils.ConcurrentCyclicSequence; -import io.vertx.core.net.*; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.NetSocket; +import io.vertx.core.net.impl.NetClientBuilder; import io.vertx.core.parsetools.RecordParser; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeInfo; import io.vertx.core.spi.cluster.NodeSelector; import io.vertx.core.spi.cluster.RegistrationInfo; +import io.vertx.core.spi.metrics.VertxMetrics; import java.util.Iterator; import java.util.Objects; @@ -57,6 +70,8 @@ public class ClusteredEventBus extends EventBusImpl { private final NetClient client; private final ConcurrentMap connections = new ConcurrentHashMap<>(); + private final CloseFuture closeFuture; + private final EventLoopContext ebContext; private NodeInfo nodeInfo; private String nodeId; @@ -67,7 +82,19 @@ public ClusteredEventBus(VertxInternal vertx, VertxOptions options, ClusterManag this.options = options.getEventBusOptions(); this.clusterManager = clusterManager; this.nodeSelector = nodeSelector; - this.client = vertx.createNetClient(new NetClientOptions(this.options.toJson())); + closeFuture = new CloseFuture(log); + ebContext = vertx.createEventLoopContext(null, closeFuture, null, Thread.currentThread().getContextClassLoader()); + this.client = createNetClient(vertx, new NetClientOptions(this.options.toJson()), closeFuture); + } + + private NetClient createNetClient(VertxInternal vertx, NetClientOptions clientOptions, CloseFuture closeFuture) { + NetClientBuilder builder = new NetClientBuilder(vertx, clientOptions); + VertxMetrics metricsSPI = vertx.metricsSPI(); + if (metricsSPI != null) { + builder.metrics(metricsSPI.createNetClientMetrics(clientOptions)); + } + builder.closeFuture(closeFuture); + return builder.build(); } NetClient client() { @@ -85,42 +112,39 @@ public void start(Promise promise) { server.connectHandler(getServerHandler()); int port = getClusterPort(); String host = getClusterHost(); - server.listen(port, host).flatMap(v -> { - int publicPort = getClusterPublicPort(server.actualPort()); - String publicHost = getClusterPublicHost(host); - nodeInfo = new NodeInfo(publicHost, publicPort, options.getClusterNodeMetadata()); - nodeId = clusterManager.getNodeId(); - Promise setPromise = Promise.promise(); - clusterManager.setNodeInfo(nodeInfo, setPromise); - return setPromise.future(); - }).andThen(ar -> { - if (ar.succeeded()) { - started = true; - nodeSelector.eventBusStarted(); - } - }).onComplete(promise); + ebContext.runOnContext(v -> { + server.listen(port, host).flatMap(v2 -> { + int publicPort = getClusterPublicPort(server.actualPort()); + String publicHost = getClusterPublicHost(host); + nodeInfo = new NodeInfo(publicHost, publicPort, options.getClusterNodeMetadata()); + nodeId = clusterManager.getNodeId(); + Promise setPromise = Promise.promise(); + clusterManager.setNodeInfo(nodeInfo, setPromise); + return setPromise.future(); + }).andThen(ar -> { + if (ar.succeeded()) { + started = true; + nodeSelector.eventBusStarted(); + } + }).onComplete(promise); + }); } @Override public void close(Promise promise) { Promise parentClose = Promise.promise(); super.close(parentClose); - parentClose.future().onComplete(ar -> { - if (server != null) { - server.close(serverClose -> { - if (serverClose.failed()) { - log.error("Failed to close server", serverClose.cause()); - } + parentClose.future() + .transform(ar -> closeFuture.close()) + .andThen(ar -> { + if (server != null) { // Close all outbound connections explicitly - don't rely on context hooks for (ConnectionHolder holder : connections.values()) { holder.close(); } - promise.handle(serverClose); - }); - } else { - promise.handle(ar); - } - }); + } + }) + .onComplete(promise); } @Override diff --git a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java index 810177c0b84..6a19543e4fc 100644 --- a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java +++ b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTestBase.java @@ -11,9 +11,18 @@ package io.vertx.core.eventbus; -import io.vertx.core.*; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; import io.vertx.core.shareddata.AsyncMapTest; -import io.vertx.core.spi.cluster.*; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.spi.cluster.NodeSelector; +import io.vertx.core.spi.cluster.RegistrationUpdateEvent; +import io.vertx.core.spi.cluster.WrappedClusterManager; +import io.vertx.core.spi.cluster.WrappedNodeSelector; import io.vertx.test.core.TestUtils; import io.vertx.test.fakecluster.FakeClusterManager; import org.junit.Test; @@ -24,8 +33,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.*; /** @@ -270,4 +278,51 @@ public void registrationsUpdated(RegistrationUpdateEvent event) { })); await(); } + + @Test + public void testMessagingInStopMethod() throws Exception { + startNodes(2); + + AtomicInteger count = new AtomicInteger(); + + class MyVerticle extends AbstractVerticle { + + final String pingServerAddress; + final String pingClientAddress; + + MyVerticle(String pingServerAddress, String pingClientAddress) { + this.pingServerAddress = pingServerAddress; + this.pingClientAddress = pingClientAddress; + } + + @Override + public void start(Promise startPromise) throws Exception { + vertx.eventBus().consumer(pingServerAddress, msg -> msg.reply("pong")).completionHandler(startPromise); + } + + @Override + public void stop(Promise stopPromise) throws Exception { + vertx.eventBus().request(pingClientAddress, "ping", onSuccess(msg -> { + assertEquals("pong", msg.body()); + count.incrementAndGet(); + vertx.setPeriodic(10, l -> { + if (count.get() == 2) { + stopPromise.complete(); + } + }); + })); + } + } + + waitFor(2); + + vertices[0].deployVerticle(new MyVerticle("foo", "bar"), onSuccess(id1 -> { + vertices[1].deployVerticle(new MyVerticle("bar", "foo"), onSuccess(id2 -> { + vertices[0].close(onSuccess(v -> complete())); + vertices[1].close(onSuccess(v -> complete())); + })); + })); + + await(); + } }