Skip to content

Commit

Permalink
Avoid duplicated disconnecting producer when after add entry failed. (#…
Browse files Browse the repository at this point in the history
…11741)

### Motivation

Currently, if encounter the add entry failure, will call producer.disconnect() multiple times during the disconnecting the producer
which will add many disconnect producer tasks to the EventLoop.

### Changes

1. Added isDisconnecting state for the producer, if the producer in isDisconnecting state, skip the disconnect operation
2. Create new future list only the topic have producers to reduce the heap allocation

### Verify

Added test to cover disconnecting the producer multiple times, but the EventLoop only execute one time.
  • Loading branch information
codelipenghui committed Aug 23, 2021
1 parent 7906bb5 commit 49c0796
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 7 deletions.
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class Producer {

private final SchemaVersion schemaVersion;
private final String clientAddress; // IP address only, no port number included
private final AtomicBoolean isDisconnecting = new AtomicBoolean(false);

public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
Expand Down Expand Up @@ -552,6 +554,7 @@ public void closeNow(boolean removeFromTopic) {
log.debug("Removed producer: {}", this);
}
closeFuture.complete(null);
isDisconnecting.set(false);
}

/**
Expand All @@ -561,7 +564,7 @@ public void closeNow(boolean removeFromTopic) {
* @return Completable future indicating completion of producer close
*/
public CompletableFuture<Void> disconnect() {
if (!closeFuture.isDone()) {
if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) {
log.info("Disconnecting producer: {}", this);
cnx.execute(() -> {
cnx.closeProducer(this);
Expand Down Expand Up @@ -669,6 +672,10 @@ public String getClientAddress() {
return clientAddress;
}

public boolean isDisconnecting() {
return isDisconnecting.get();
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Expand Up @@ -2599,7 +2599,7 @@ public PulsarCommandSender getCommandSender() {

@Override
public void execute(Runnable runnable) {
ctx.channel().eventLoop().execute(runnable);
ctx().channel().eventLoop().execute(runnable);
}

@Override
Expand Down
Expand Up @@ -516,9 +516,15 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
// fence topic when failed to write a message to BK
fence();
// close all producers
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.values().forEach(producer -> futures.add(producer.disconnect()));
FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
CompletableFuture<Void> disconnectProducersFuture;
if (producers.size() > 0) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.forEach((__, producer) -> futures.add(producer.disconnect()));
disconnectProducersFuture = FutureUtil.waitForAll(futures);
} else {
disconnectProducersFuture = CompletableFuture.completedFuture(null);
}
disconnectProducersFuture.handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
decrementPendingWriteOpsAndCheck();
return null;
});
Expand Down
Expand Up @@ -56,14 +56,16 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.Cleanup;
Expand Down Expand Up @@ -94,7 +96,6 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
Expand Down Expand Up @@ -228,6 +229,11 @@ public void setup() throws Exception {
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
.when(serverCnx).getCommandSender();
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
doReturn(channel).when(ctx).channel();
doReturn(ctx).when(serverCnx).ctx();

NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
Expand Down Expand Up @@ -2177,6 +2183,20 @@ public void testGetDurableSubscription() throws Exception {
f2.get();
}

@Test
public void testDisconnectProducer() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
String role = "appid1";
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty());
assertFalse(producer.isDisconnecting());
// Disconnect the producer multiple times.
producer.disconnect();
producer.disconnect();
verify(serverCnx).execute(any());
};

private ByteBuf getMessageWithMetadata(byte[] data) {
MessageMetadata messageData = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
Expand Down

0 comments on commit 49c0796

Please sign in to comment.