diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 15076f9dc3550a..8c35e660c76e76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; @@ -90,6 +91,7 @@ public class Producer { private final SchemaVersion schemaVersion; private final String clientAddress; // IP address only, no port number included + private final AtomicBoolean isDisconnecting = new AtomicBoolean(false); public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, @@ -552,6 +554,7 @@ public void closeNow(boolean removeFromTopic) { log.debug("Removed producer: {}", this); } closeFuture.complete(null); + isDisconnecting.set(false); } /** @@ -561,7 +564,7 @@ public void closeNow(boolean removeFromTopic) { * @return Completable future indicating completion of producer close */ public CompletableFuture disconnect() { - if (!closeFuture.isDone()) { + if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) { log.info("Disconnecting producer: {}", this); cnx.execute(() -> { cnx.closeProducer(this); @@ -669,6 +672,10 @@ public String getClientAddress() { return clientAddress; } + public boolean isDisconnecting() { + return isDisconnecting.get(); + } + private static final Logger log = LoggerFactory.getLogger(Producer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 8b9ae77c560f00..7d02e6a2910bcb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2599,7 +2599,7 @@ public PulsarCommandSender getCommandSender() { @Override public void execute(Runnable runnable) { - ctx.channel().eventLoop().execute(runnable); + ctx().channel().eventLoop().execute(runnable); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 83d6f5ec00aa56..03e6cd6afc62c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -516,9 +516,15 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) // fence topic when failed to write a message to BK fence(); // close all producers - List> futures = Lists.newArrayList(); - producers.values().forEach(producer -> futures.add(producer.disconnect())); - FutureUtil.waitForAll(futures).handle((BiFunction) (aVoid, throwable) -> { + CompletableFuture disconnectProducersFuture; + if (producers.size() > 0) { + List> futures = Lists.newArrayList(); + producers.forEach((__, producer) -> futures.add(producer.disconnect())); + disconnectProducersFuture = FutureUtil.waitForAll(futures); + } else { + disconnectProducersFuture = CompletableFuture.completedFuture(null); + } + disconnectProducersFuture.handle((BiFunction) (aVoid, throwable) -> { decrementPendingWriteOpsAndCheck(); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 2605a58401e00e..ad9f7c78ee5b8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -56,7 +56,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -64,6 +63,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import lombok.Cleanup; @@ -94,7 +96,6 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.PulsarResources; -import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; @@ -228,6 +229,11 @@ public void setup() throws Exception { doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); doReturn(new PulsarCommandSenderImpl(null, serverCnx)) .when(serverCnx).getCommandSender(); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop(); + doReturn(channel).when(ctx).channel(); + doReturn(ctx).when(serverCnx).ctx(); NamespaceService nsSvc = mock(NamespaceService.class); NamespaceBundle bundle = mock(NamespaceBundle.class); @@ -2177,6 +2183,20 @@ public void testGetDurableSubscription() throws Exception { f2.get(); } + @Test + public void testDisconnectProducer() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + String role = "appid1"; + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", + role, false, null, SchemaVersion.Latest, 0, false, + ProducerAccessMode.Shared, Optional.empty()); + assertFalse(producer.isDisconnecting()); + // Disconnect the producer multiple times. + producer.disconnect(); + producer.disconnect(); + verify(serverCnx).execute(any()); + }; + private ByteBuf getMessageWithMetadata(byte[] data) { MessageMetadata messageData = new MessageMetadata() .setPublishTime(System.currentTimeMillis())