From 6aef83f3b77c343b9ea3edc1c07dbaf6bac9bd59 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 27 Aug 2021 02:18:41 -0700 Subject: [PATCH] Producer getting producer busy is removing existing producer from list (#11804) ### Motivation When a producer is getting error because of ProducerBusy (existing producer with the same name), it will trigger a producer close operation that will eventually lead to the existing producer getting removed from the topic map (even though that producer is still writing on the topic). The problem is the producer close is triggering a removal from the map: pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Line 683 in 43ded59 if (producers.remove(producer.getProducerName(), producer)) { Even though we check for producer equality, the Producer.equals() is only comparing the producer name, so the old instance gets removed from the map. Instead, the equality of producer needs to be based on the connection id (local & remote addresses and unique id), plus the producer id within that connection. * Producer getting producer busy is removing existing producer from list * Fixed test --- .../pulsar/broker/service/Producer.java | 5 ++- .../pulsar/broker/service/ServerCnx.java | 18 ++++++++ .../service/PersistentTopicE2ETest.java | 45 +++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 8c35e660c76e7..12697f6cd11af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -147,7 +147,10 @@ public int hashCode() { public boolean equals(Object obj) { if (obj instanceof Producer) { Producer other = (Producer) obj; - return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic); + return Objects.equals(producerName, other.producerName) + && Objects.equals(topic, other.topic) + && producerId == other.producerId + && Objects.equals(cnx, other.cnx); } return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c84836216a398..4ef6745975686 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -41,6 +41,7 @@ import java.util.IdentityHashMap; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -1530,6 +1531,23 @@ protected void handleSeek(CommandSeek seek) { } } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerCnx other = (ServerCnx) o; + return Objects.equals(ctx().channel().id(), other.ctx().channel().id()); + } + + @Override + public int hashCode() { + return Objects.hash(ctx().channel().id()); + } + @Override protected void handleCloseProducer(CommandCloseProducer closeProducer) { checkArgument(state == State.Connected); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 881fe266a004d..42512b0f38bf8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1814,4 +1814,49 @@ public void testWithEventTime() throws Exception { assertEquals(msg.getValue(), "test"); assertEquals(msg.getEventTime(), 5); } + + @Test + public void testProducerBusy() throws Exception { + final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + + for (int i =0; i < 5; i++) { + try { + pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } + + // Try from different connection + @Cleanup + PulsarClient client2 = PulsarClient.builder() + .serviceUrl(getPulsar().getBrokerServiceUrl()) + .build(); + + try { + client2.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } }