From 8ceca742f1b038de4a6707d7c69fba4d504a2085 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 7 Feb 2023 11:27:02 -0600 Subject: [PATCH] [fix][broker] Expect msgs after server initiated CloseProducer (#19446) (cherry picked from commit 524288cfbf0b83690b69e89344a809a001393228) (cherry picked from commit 4cbe68e778323ab4f3325be0ffe2766ff43f54c0) --- .../pulsar/broker/service/ServerCnx.java | 22 ++++ .../pulsar/broker/service/ServerCnxTest.java | 124 ++++++++++++++++-- 2 files changed, 133 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4c8f074969ce8..7970ac8901247 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -40,6 +40,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; +import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Map; import java.util.NoSuchElementException; @@ -156,6 +157,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final BrokerService service; private final SchemaRegistryService schemaService; private final String listenerName; + private final HashMap recentlyClosedProducers; private final ConcurrentLongHashMap> producers; private final ConcurrentLongHashMap> consumers; private final BrokerInterceptor brokerInterceptor; @@ -251,6 +253,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) { .expectedItems(8) .concurrencyLevel(1) .build(); + this.recentlyClosedProducers = new HashMap<>(); this.replicatorPrefix = conf.getReplicatorPrefix(); this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection(); this.proxyRoles = conf.getProxyRoles(); @@ -1406,6 +1409,14 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { CompletableFuture producerFuture = producers.get(send.getProducerId()); if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) { + if (recentlyClosedProducers.containsKey(send.getProducerId())) { + if (log.isDebugEnabled()) { + log.debug("[{}] Received message, but the producer was recently closed : {}. Ignoring message.", + remoteAddress, send.getProducerId()); + } + // We expect these messages because we recently closed the producer. Do not close the connection. + return; + } log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.", remoteAddress, send.getProducerId()); close(); @@ -2427,6 +2438,17 @@ public void closeProducer(Producer producer) { safelyRemoveProducer(producer); if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L)); + // The client does not necessarily know that the producer is closed, but the connection is still + // active, and there could be messages in flight already. We want to ignore these messages for a time + // because they are expected. Once the interval has passed, the client should have received the + // CloseProducer command and should not send any additional messages until it sends a create Producer + // command. + final long epoch = producer.getEpoch(); + final long producerId = producer.getProducerId(); + recentlyClosedProducers.put(producerId, epoch); + ctx.executor().schedule(() -> { + recentlyClosedProducers.remove(producerId, epoch); + }, service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); } else { close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index c07ad17e23a9e..657c7b6e2fd55 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -94,6 +94,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAuthResponse; +import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; @@ -650,15 +651,7 @@ public void testSendCommand() throws Exception { assertTrue(getResponse() instanceof CommandProducerSuccess); // test SEND success - MessageMetadata messageMetadata = new MessageMetadata() - .setPublishTime(System.currentTimeMillis()) - .setProducerName("prod-name") - .setSequenceId(0); - ByteBuf data = Unpooled.buffer(1024); - - clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data)); - channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); - clientCommand.release(); + sendMessage(); assertTrue(getResponse() instanceof CommandSendReceipt); channel.finish(); @@ -670,6 +663,115 @@ public void testSendCommandBeforeCreatingProducer() throws Exception { setChannelConnected(); // test SEND before producer is created + sendMessage(); + + // Then expect channel to close + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testSendCommandAfterBrokerClosedProducer() throws Exception { + resetChannel(); + setChannelConnected(); + setConnectionVersion(ProtocolVersion.v5.getValue()); + serverCnx.cancelKeepAliveTask(); + + String producerName = "my-producer"; + + ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(clientCommand1); + assertTrue(getResponse() instanceof CommandProducerSuccess); + + // Call disconnect method on producer to trigger activity similar to unloading + Producer producer = serverCnx.getProducers().get(1).get(); + assertNotNull(producer); + producer.disconnect(); + channel.runPendingTasks(); + assertTrue(getResponse() instanceof CommandCloseProducer); + + // Send message and expect no response + sendMessage(); + + // Move clock forward to trigger scheduled clean up task + Thread.sleep(1000); + channel.runScheduledPendingTasks(); + assertTrue(channel.outboundMessages().isEmpty()); + assertTrue(channel.isActive()); + + // Send message and expect closed connection + sendMessage(); + + // Then expect channel to close + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws Exception { + resetChannel(); + setChannelConnected(); + setConnectionVersion(ProtocolVersion.v5.getValue()); + serverCnx.cancelKeepAliveTask(); + + String producerName = "my-producer"; + + ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(clientCommand1); + assertTrue(getResponse() instanceof CommandProducerSuccess); + + // Call disconnect method on producer to trigger activity similar to unloading + Producer producer = serverCnx.getProducers().get(1).get(); + assertNotNull(producer); + producer.disconnect(); + channel.runPendingTasks(); + assertTrue(getResponse() instanceof CommandCloseProducer); + + // Send message and expect no response + sendMessage(); + + assertTrue(channel.outboundMessages().isEmpty()); + + // Move clock forward to trigger scheduled clean up task + ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(createProducer2); + assertTrue(getResponse() instanceof CommandProducerSuccess); + + // Send message and expect success + sendMessage(); + + assertTrue(getResponse() instanceof CommandSendReceipt); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws Exception { + resetChannel(); + setChannelConnected(); + setConnectionVersion(ProtocolVersion.v5.getValue()); + serverCnx.cancelKeepAliveTask(); + + String producerName = "my-producer"; + + ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(clientCommand1); + assertTrue(getResponse() instanceof CommandProducerSuccess); + + ByteBuf closeProducer = Commands.newCloseProducer(1,2); + channel.writeInbound(closeProducer); + assertTrue(getResponse() instanceof CommandSuccess); + + // Send message and get disconnected + sendMessage(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); + channel.finish(); + } + + private void sendMessage() { MessageMetadata messageMetadata = new MessageMetadata() .setPublishTime(System.currentTimeMillis()) .setProducerName("prod-name") @@ -680,10 +782,6 @@ public void testSendCommandBeforeCreatingProducer() throws Exception { ChecksumType.None, messageMetadata, data)); channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); clientCommand.release(); - - // Then expect channel to close - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); - channel.finish(); } @Test(timeOut = 30000)