diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 6b1af573a0878a..8ab6b5bb988322 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -46,7 +46,6 @@ import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -186,7 +185,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final BrokerService service; private final SchemaRegistryService schemaService; private final String listenerName; - private final HashMap recentlyClosedProducers; private final ConcurrentLongHashMap> producers; private final ConcurrentLongHashMap> consumers; private final boolean enableSubscriptionPatternEvaluation; @@ -291,7 +289,6 @@ public ServerCnx(PulsarService pulsar, String listenerName) { .expectedItems(8) .concurrencyLevel(1) .build(); - this.recentlyClosedProducers = new HashMap<>(); this.replicatorPrefix = conf.getReplicatorPrefix(); this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection(); this.schemaValidationEnforced = conf.isSchemaValidationEnforced(); @@ -1696,14 +1693,6 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { CompletableFuture producerFuture = producers.get(send.getProducerId()); if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) { - if (recentlyClosedProducers.containsKey(send.getProducerId())) { - if (log.isDebugEnabled()) { - log.debug("[{}] Received message, but the producer was recently closed : {}. Ignoring message.", - remoteAddress, send.getProducerId()); - } - // We expect these messages because we recently closed the producer. Do not close the connection. - return; - } log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.", remoteAddress, send.getProducerId()); close(); @@ -2972,17 +2961,6 @@ public void closeProducer(Producer producer) { safelyRemoveProducer(producer); if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L)); - // The client does not necessarily know that the producer is closed, but the connection is still - // active, and there could be messages in flight already. We want to ignore these messages for a time - // because they are expected. Once the interval has passed, the client should have received the - // CloseProducer command and should not send any additional messages until it sends a create Producer - // command. - final long epoch = producer.getEpoch(); - final long producerId = producer.getProducerId(); - recentlyClosedProducers.put(producerId, epoch); - ctx.executor().schedule(() -> { - recentlyClosedProducers.remove(producerId, epoch); - }, service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); } else { close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index c3bab634a42c1f..b75e5386922b2a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -103,7 +103,6 @@ import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.CommandAuthResponse; -import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; @@ -1405,7 +1404,15 @@ public void testSendCommand() throws Exception { assertTrue(getResponse() instanceof CommandProducerSuccess); // test SEND success - sendMessage(); + MessageMetadata messageMetadata = new MessageMetadata() + .setPublishTime(System.currentTimeMillis()) + .setProducerName("prod-name") + .setSequenceId(0); + ByteBuf data = Unpooled.buffer(1024); + + clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data)); + channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); + clientCommand.release(); assertTrue(getResponse() instanceof CommandSendReceipt); channel.finish(); @@ -1417,115 +1424,6 @@ public void testSendCommandBeforeCreatingProducer() throws Exception { setChannelConnected(); // test SEND before producer is created - sendMessage(); - - // Then expect channel to close - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); - channel.finish(); - } - - @Test(timeOut = 30000) - public void testSendCommandAfterBrokerClosedProducer() throws Exception { - resetChannel(); - setChannelConnected(); - setConnectionVersion(ProtocolVersion.v5.getValue()); - serverCnx.cancelKeepAliveTask(); - - String producerName = "my-producer"; - - ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, - producerName, Collections.emptyMap(), false); - channel.writeInbound(clientCommand1); - assertTrue(getResponse() instanceof CommandProducerSuccess); - - // Call disconnect method on producer to trigger activity similar to unloading - Producer producer = serverCnx.getProducers().get(1).get(); - assertNotNull(producer); - producer.disconnect(); - channel.runPendingTasks(); - assertTrue(getResponse() instanceof CommandCloseProducer); - - // Send message and expect no response - sendMessage(); - - // Move clock forward to trigger scheduled clean up task - channel.advanceTimeBy(svcConfig.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); - channel.runScheduledPendingTasks(); - assertTrue(channel.outboundMessages().isEmpty()); - assertTrue(channel.isActive()); - - // Send message and expect closed connection - sendMessage(); - - // Then expect channel to close - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); - channel.finish(); - } - - @Test(timeOut = 30000) - public void testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws Exception { - resetChannel(); - setChannelConnected(); - setConnectionVersion(ProtocolVersion.v5.getValue()); - serverCnx.cancelKeepAliveTask(); - - String producerName = "my-producer"; - - ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, - producerName, Collections.emptyMap(), false); - channel.writeInbound(clientCommand1); - assertTrue(getResponse() instanceof CommandProducerSuccess); - - // Call disconnect method on producer to trigger activity similar to unloading - Producer producer = serverCnx.getProducers().get(1).get(); - assertNotNull(producer); - producer.disconnect(); - channel.runPendingTasks(); - assertTrue(getResponse() instanceof CommandCloseProducer); - - // Send message and expect no response - sendMessage(); - - assertTrue(channel.outboundMessages().isEmpty()); - - // Move clock forward to trigger scheduled clean up task - ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, - producerName, Collections.emptyMap(), false); - channel.writeInbound(createProducer2); - assertTrue(getResponse() instanceof CommandProducerSuccess); - - // Send message and expect success - sendMessage(); - - assertTrue(getResponse() instanceof CommandSendReceipt); - channel.finish(); - } - - @Test(timeOut = 30000) - public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws Exception { - resetChannel(); - setChannelConnected(); - setConnectionVersion(ProtocolVersion.v5.getValue()); - serverCnx.cancelKeepAliveTask(); - - String producerName = "my-producer"; - - ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, - producerName, Collections.emptyMap(), false); - channel.writeInbound(clientCommand1); - assertTrue(getResponse() instanceof CommandProducerSuccess); - - ByteBuf closeProducer = Commands.newCloseProducer(1,2); - channel.writeInbound(closeProducer); - assertTrue(getResponse() instanceof CommandSuccess); - - // Send message and get disconnected - sendMessage(); - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); - channel.finish(); - } - - private void sendMessage() { MessageMetadata messageMetadata = new MessageMetadata() .setPublishTime(System.currentTimeMillis()) .setProducerName("prod-name") @@ -1536,6 +1434,10 @@ private void sendMessage() { ChecksumType.None, messageMetadata, data)); channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); clientCommand.release(); + + // Then expect channel to close + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); + channel.finish(); } @Test(timeOut = 30000)