From b325db3b4f6e054cb20adcba922537c928f6f39a Mon Sep 17 00:00:00 2001 From: lipenghui Date: Tue, 24 Aug 2021 02:16:00 +0800 Subject: [PATCH] Avoid duplicated disconnecting producer when after add entry failed. (#11741) ### Motivation Currently, if encounter the add entry failure, will call producer.disconnect() multiple times during the disconnecting the producer which will add many disconnect producer tasks to the EventLoop. ### Changes 1. Added isDisconnecting state for the producer, if the producer in isDisconnecting state, skip the disconnect operation 2. Create new future list only the topic have producers to reduce the heap allocation ### Verify Added test to cover disconnecting the producer multiple times, but the EventLoop only execute one time. --- .../pulsar/broker/service/Producer.java | 9 ++++++- .../pulsar/broker/service/ServerCnx.java | 2 +- .../service/persistent/PersistentTopic.java | 12 +++++++--- .../broker/service/PersistentTopicTest.java | 24 +++++++++++++++++-- 4 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 15076f9dc3550a..8c35e660c76e76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; @@ -90,6 +91,7 @@ public class Producer { private final SchemaVersion schemaVersion; private final String clientAddress; // IP address only, no port number included + private final AtomicBoolean isDisconnecting = new AtomicBoolean(false); public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, @@ -552,6 +554,7 @@ public void closeNow(boolean removeFromTopic) { log.debug("Removed producer: {}", this); } closeFuture.complete(null); + isDisconnecting.set(false); } /** @@ -561,7 +564,7 @@ public void closeNow(boolean removeFromTopic) { * @return Completable future indicating completion of producer close */ public CompletableFuture disconnect() { - if (!closeFuture.isDone()) { + if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) { log.info("Disconnecting producer: {}", this); cnx.execute(() -> { cnx.closeProducer(this); @@ -669,6 +672,10 @@ public String getClientAddress() { return clientAddress; } + public boolean isDisconnecting() { + return isDisconnecting.get(); + } + private static final Logger log = LoggerFactory.getLogger(Producer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 8b9ae77c560f00..7d02e6a2910bcb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2599,7 +2599,7 @@ public PulsarCommandSender getCommandSender() { @Override public void execute(Runnable runnable) { - ctx.channel().eventLoop().execute(runnable); + ctx().channel().eventLoop().execute(runnable); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 83d6f5ec00aa56..03e6cd6afc62c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -516,9 +516,15 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) // fence topic when failed to write a message to BK fence(); // close all producers - List> futures = Lists.newArrayList(); - producers.values().forEach(producer -> futures.add(producer.disconnect())); - FutureUtil.waitForAll(futures).handle((BiFunction) (aVoid, throwable) -> { + CompletableFuture disconnectProducersFuture; + if (producers.size() > 0) { + List> futures = Lists.newArrayList(); + producers.forEach((__, producer) -> futures.add(producer.disconnect())); + disconnectProducersFuture = FutureUtil.waitForAll(futures); + } else { + disconnectProducersFuture = CompletableFuture.completedFuture(null); + } + disconnectProducersFuture.handle((BiFunction) (aVoid, throwable) -> { decrementPendingWriteOpsAndCheck(); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 2605a58401e00e..ad9f7c78ee5b8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -56,7 +56,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -64,6 +63,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import lombok.Cleanup; @@ -94,7 +96,6 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.PulsarResources; -import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; @@ -228,6 +229,11 @@ public void setup() throws Exception { doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); doReturn(new PulsarCommandSenderImpl(null, serverCnx)) .when(serverCnx).getCommandSender(); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop(); + doReturn(channel).when(ctx).channel(); + doReturn(ctx).when(serverCnx).ctx(); NamespaceService nsSvc = mock(NamespaceService.class); NamespaceBundle bundle = mock(NamespaceBundle.class); @@ -2177,6 +2183,20 @@ public void testGetDurableSubscription() throws Exception { f2.get(); } + @Test + public void testDisconnectProducer() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + String role = "appid1"; + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", + role, false, null, SchemaVersion.Latest, 0, false, + ProducerAccessMode.Shared, Optional.empty()); + assertFalse(producer.isDisconnecting()); + // Disconnect the producer multiple times. + producer.disconnect(); + producer.disconnect(); + verify(serverCnx).execute(any()); + }; + private ByteBuf getMessageWithMetadata(byte[] data) { MessageMetadata messageData = new MessageMetadata() .setPublishTime(System.currentTimeMillis())