From 125c3a32a720928c3bbcd9f63a415ae898f1d304 Mon Sep 17 00:00:00 2001 From: penghui Date: Sun, 22 Aug 2021 11:46:27 +0800 Subject: [PATCH] Avoid disconnecting a producer multiple times after an add entry failure. ### Motivation Currently, when an add entry failure occurs, producer.disconnect() is called multiple times while the producer is already being disconnected, which adds many disconnect-producer tasks to the EventLoop. ### Changes 1. Added an isDisconnecting state to the producer; if the producer is already in the isDisconnecting state, the disconnect operation is skipped. 2. Create the new future list only when the topic has producers, to reduce heap allocation. ### Verify Added a test that disconnects the producer multiple times and verifies that the EventLoop executes the disconnect task only once. --- .../pulsar/broker/service/Producer.java | 9 ++++++- .../pulsar/broker/service/ServerCnx.java | 2 +- .../service/persistent/PersistentTopic.java | 12 +++++++--- .../broker/service/PersistentTopicTest.java | 24 +++++++++++++++++-- 4 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 15076f9dc3550..8c35e660c76e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; @@ -90,6 +91,7 @@ public class Producer { private final SchemaVersion schemaVersion; private final String clientAddress; // IP address only, no port number included + private 
final AtomicBoolean isDisconnecting = new AtomicBoolean(false); public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, @@ -552,6 +554,7 @@ public void closeNow(boolean removeFromTopic) { log.debug("Removed producer: {}", this); } closeFuture.complete(null); + isDisconnecting.set(false); } /** @@ -561,7 +564,7 @@ public void closeNow(boolean removeFromTopic) { * @return Completable future indicating completion of producer close */ public CompletableFuture disconnect() { - if (!closeFuture.isDone()) { + if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) { log.info("Disconnecting producer: {}", this); cnx.execute(() -> { cnx.closeProducer(this); @@ -669,6 +672,10 @@ public String getClientAddress() { return clientAddress; } + public boolean isDisconnecting() { + return isDisconnecting.get(); + } + private static final Logger log = LoggerFactory.getLogger(Producer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 9a1232ff08b21..0a7d8417f4d20 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2598,7 +2598,7 @@ public PulsarCommandSender getCommandSender() { @Override public void execute(Runnable runnable) { - ctx.channel().eventLoop().execute(runnable); + ctx().channel().eventLoop().execute(runnable); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7128f08e4f3a6..0da23d59f0fc8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -514,9 +514,15 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) // fence topic when failed to write a message to BK fence(); // close all producers - List> futures = Lists.newArrayList(); - producers.values().forEach(producer -> futures.add(producer.disconnect())); - FutureUtil.waitForAll(futures).handle((BiFunction) (aVoid, throwable) -> { + CompletableFuture disconnectProducersFuture; + if (producers.size() > 0) { + List> futures = Lists.newArrayList(); + producers.forEach((__, producer) -> futures.add(producer.disconnect())); + disconnectProducersFuture = FutureUtil.waitForAll(futures); + } else { + disconnectProducersFuture = CompletableFuture.completedFuture(null); + } + disconnectProducersFuture.handle((BiFunction) (aVoid, throwable) -> { decrementPendingWriteOpsAndCheck(); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 2605a58401e00..ad9f7c78ee5b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -56,7 +56,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -64,6 +63,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import 
lombok.Cleanup; @@ -94,7 +96,6 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.PulsarResources; -import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; @@ -228,6 +229,11 @@ public void setup() throws Exception { doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); doReturn(new PulsarCommandSenderImpl(null, serverCnx)) .when(serverCnx).getCommandSender(); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop(); + doReturn(channel).when(ctx).channel(); + doReturn(ctx).when(serverCnx).ctx(); NamespaceService nsSvc = mock(NamespaceService.class); NamespaceBundle bundle = mock(NamespaceBundle.class); @@ -2177,6 +2183,20 @@ public void testGetDurableSubscription() throws Exception { f2.get(); } + @Test + public void testDisconnectProducer() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + String role = "appid1"; + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", + role, false, null, SchemaVersion.Latest, 0, false, + ProducerAccessMode.Shared, Optional.empty()); + assertFalse(producer.isDisconnecting()); + // Disconnect the producer multiple times. + producer.disconnect(); + producer.disconnect(); + verify(serverCnx).execute(any()); + }; + private ByteBuf getMessageWithMetadata(byte[] data) { MessageMetadata messageData = new MessageMetadata() .setPublishTime(System.currentTimeMillis())