Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not close ClusteredEventBus client and server before parent closing #4544

Merged
merged 1 commit into from Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,18 +19,31 @@
import io.vertx.core.eventbus.AddressHelper;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.impl.*;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.net.*;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.metrics.VertxMetrics;

import java.util.Iterator;
import java.util.Objects;
Expand All @@ -57,6 +70,8 @@ public class ClusteredEventBus extends EventBusImpl {
private final NetClient client;

private final ConcurrentMap<String, ConnectionHolder> connections = new ConcurrentHashMap<>();
private final CloseFuture closeFuture;
private final EventLoopContext ebContext;

private NodeInfo nodeInfo;
private String nodeId;
Expand All @@ -67,7 +82,19 @@ public ClusteredEventBus(VertxInternal vertx, VertxOptions options, ClusterManag
this.options = options.getEventBusOptions();
this.clusterManager = clusterManager;
this.nodeSelector = nodeSelector;
this.client = vertx.createNetClient(new NetClientOptions(this.options.toJson()));
closeFuture = new CloseFuture(log);
ebContext = vertx.createEventLoopContext(null, closeFuture, null, Thread.currentThread().getContextClassLoader());
this.client = createNetClient(vertx, new NetClientOptions(this.options.toJson()), closeFuture);
}

private NetClient createNetClient(VertxInternal vertx, NetClientOptions clientOptions, CloseFuture closeFuture) {
NetClientBuilder builder = new NetClientBuilder(vertx, clientOptions);
VertxMetrics metricsSPI = vertx.metricsSPI();
if (metricsSPI != null) {
builder.metrics(metricsSPI.createNetClientMetrics(clientOptions));
}
builder.closeFuture(closeFuture);
return builder.build();
}

NetClient client() {
Expand All @@ -85,42 +112,39 @@ public void start(Promise<Void> promise) {
server.connectHandler(getServerHandler());
int port = getClusterPort();
String host = getClusterHost();
server.listen(port, host).flatMap(v -> {
int publicPort = getClusterPublicPort(server.actualPort());
String publicHost = getClusterPublicHost(host);
nodeInfo = new NodeInfo(publicHost, publicPort, options.getClusterNodeMetadata());
nodeId = clusterManager.getNodeId();
Promise<Void> setPromise = Promise.promise();
clusterManager.setNodeInfo(nodeInfo, setPromise);
return setPromise.future();
}).andThen(ar -> {
if (ar.succeeded()) {
started = true;
nodeSelector.eventBusStarted();
}
}).onComplete(promise);
ebContext.runOnContext(v -> {
server.listen(port, host).flatMap(v2 -> {
int publicPort = getClusterPublicPort(server.actualPort());
String publicHost = getClusterPublicHost(host);
nodeInfo = new NodeInfo(publicHost, publicPort, options.getClusterNodeMetadata());
nodeId = clusterManager.getNodeId();
Promise<Void> setPromise = Promise.promise();
clusterManager.setNodeInfo(nodeInfo, setPromise);
return setPromise.future();
}).andThen(ar -> {
if (ar.succeeded()) {
started = true;
nodeSelector.eventBusStarted();
}
}).onComplete(promise);
});
}

@Override
public void close(Promise<Void> promise) {
Promise<Void> parentClose = Promise.promise();
super.close(parentClose);
parentClose.future().onComplete(ar -> {
if (server != null) {
server.close(serverClose -> {
if (serverClose.failed()) {
log.error("Failed to close server", serverClose.cause());
}
parentClose.future()
.transform(ar -> closeFuture.close())
.andThen(ar -> {
if (server != null) {
// Close all outbound connections explicitly - don't rely on context hooks
for (ConnectionHolder holder : connections.values()) {
holder.close();
}
promise.handle(serverClose);
});
} else {
promise.handle(ar);
}
});
}
})
.onComplete(promise);
}

@Override
Expand Down
Expand Up @@ -11,9 +11,18 @@

package io.vertx.core.eventbus;

import io.vertx.core.*;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.shareddata.AsyncMapTest;
import io.vertx.core.spi.cluster.*;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.core.spi.cluster.WrappedClusterManager;
import io.vertx.core.spi.cluster.WrappedNodeSelector;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
import org.junit.Test;
Expand All @@ -24,8 +33,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.*;


/**
Expand Down Expand Up @@ -270,4 +278,51 @@ public void registrationsUpdated(RegistrationUpdateEvent event) {
}));
await();
}

@Test
public void testMessagingInStopMethod() throws Exception {
startNodes(2);

AtomicInteger count = new AtomicInteger();

class MyVerticle extends AbstractVerticle {

final String pingServerAddress;
final String pingClientAddress;

MyVerticle(String pingServerAddress, String pingClientAddress) {
this.pingServerAddress = pingServerAddress;
this.pingClientAddress = pingClientAddress;
}

@Override
public void start(Promise<Void> startPromise) throws Exception {
vertx.eventBus().consumer(pingServerAddress, msg -> msg.reply("pong")).completionHandler(startPromise);
}

@Override
public void stop(Promise<Void> stopPromise) throws Exception {
vertx.eventBus().<String>request(pingClientAddress, "ping", onSuccess(msg -> {
assertEquals("pong", msg.body());
count.incrementAndGet();
vertx.setPeriodic(10, l -> {
if (count.get() == 2) {
stopPromise.complete();
}
});
}));
}
}

waitFor(2);

vertices[0].deployVerticle(new MyVerticle("foo", "bar"), onSuccess(id1 -> {
vertices[1].deployVerticle(new MyVerticle("bar", "foo"), onSuccess(id2 -> {
vertices[0].close(onSuccess(v -> complete()));
vertices[1].close(onSuccess(v -> complete()));
}));
}));

await();
}
}