Skip to content

Commit

Permalink
Avoid duplicated disconnecting producer when after add entry failed. (#…
Browse files Browse the repository at this point in the history
…11741)

Currently, if encounter the add entry failure, will call producer.disconnect() multiple times during the disconnecting the producer
which will add many disconnect producer tasks to the EventLoop.

1. Added isDisconnecting state for the producer, if the producer in isDisconnecting state, skip the disconnect operation
2. Create new future list only the topic have producers to reduce the heap allocation

Added test to cover disconnecting the producer multiple times, but the EventLoop only execute one time.

(cherry picked from commit 49c0796)
  • Loading branch information
codelipenghui authored and hangc0276 committed Aug 25, 2021
1 parent 7147d8a commit 9c2888f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 7 deletions.
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class Producer {

private final SchemaVersion schemaVersion;
private final String clientAddress; // IP address only, no port number included
private final AtomicBoolean isDisconnecting = new AtomicBoolean(false);

public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
Expand Down Expand Up @@ -552,6 +554,7 @@ public void closeNow(boolean removeFromTopic) {
log.debug("Removed producer: {}", this);
}
closeFuture.complete(null);
isDisconnecting.set(false);
}

/**
Expand All @@ -561,7 +564,7 @@ public void closeNow(boolean removeFromTopic) {
* @return Completable future indicating completion of producer close
*/
public CompletableFuture<Void> disconnect() {
if (!closeFuture.isDone()) {
if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) {
log.info("Disconnecting producer: {}", this);
cnx.execute(() -> {
cnx.closeProducer(this);
Expand Down Expand Up @@ -669,6 +672,10 @@ public String getClientAddress() {
return clientAddress;
}

public boolean isDisconnecting() {
return isDisconnecting.get();
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Expand Up @@ -2572,7 +2572,7 @@ public PulsarCommandSender getCommandSender() {

@Override
public void execute(Runnable runnable) {
ctx.channel().eventLoop().execute(runnable);
ctx().channel().eventLoop().execute(runnable);
}

@Override
Expand Down
Expand Up @@ -503,9 +503,15 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
// fence topic when failed to write a message to BK
fence();
// close all producers
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.values().forEach(producer -> futures.add(producer.disconnect()));
FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
CompletableFuture<Void> disconnectProducersFuture;
if (producers.size() > 0) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.forEach((__, producer) -> futures.add(producer.disconnect()));
disconnectProducersFuture = FutureUtil.waitForAll(futures);
} else {
disconnectProducersFuture = CompletableFuture.completedFuture(null);
}
disconnectProducersFuture.handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
decrementPendingWriteOpsAndCheck();
return null;
});
Expand Down
Expand Up @@ -56,14 +56,16 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.Cleanup;
Expand Down Expand Up @@ -93,7 +95,7 @@
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
Expand Down Expand Up @@ -216,6 +218,11 @@ public void setup() throws Exception {
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
.when(serverCnx).getCommandSender();
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
doReturn(channel).when(ctx).channel();
doReturn(ctx).when(serverCnx).ctx();

NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
Expand Down Expand Up @@ -2166,6 +2173,20 @@ public void testGetDurableSubscription() throws Exception {
f2.get();
}

@Test
public void testDisconnectProducer() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
String role = "appid1";
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty());
assertFalse(producer.isDisconnecting());
// Disconnect the producer multiple times.
producer.disconnect();
producer.disconnect();
verify(serverCnx).execute(any());
};

private ByteBuf getMessageWithMetadata(byte[] data) {
MessageMetadata messageData = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
Expand Down

0 comments on commit 9c2888f

Please sign in to comment.