diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 8c35e660c76e76..12697f6cd11afd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -147,7 +147,10 @@ public int hashCode() { public boolean equals(Object obj) { if (obj instanceof Producer) { Producer other = (Producer) obj; - return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic); + return Objects.equals(producerName, other.producerName) + && Objects.equals(topic, other.topic) + && producerId == other.producerId + && Objects.equals(cnx, other.cnx); } return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c84836216a398c..4ef67459756868 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -41,6 +41,7 @@ import java.util.IdentityHashMap; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -1530,6 +1531,23 @@ protected void handleSeek(CommandSeek seek) { } } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerCnx other = (ServerCnx) o; + return Objects.equals(ctx().channel().id(), other.ctx().channel().id()); + } + + @Override + public int hashCode() { + return Objects.hash(ctx().channel().id()); + } + @Override protected void handleCloseProducer(CommandCloseProducer closeProducer) { checkArgument(state == State.Connected); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 881fe266a004d9..42512b0f38bf89 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1814,4 +1814,49 @@ public void testWithEventTime() throws Exception { assertEquals(msg.getValue(), "test"); assertEquals(msg.getEventTime(), 5); } + + @Test + public void testProducerBusy() throws Exception { + final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + + for (int i =0; i < 5; i++) { + try { + pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } + + // Try from different connection + @Cleanup + PulsarClient client2 = PulsarClient.builder() + .serviceUrl(getPulsar().getBrokerServiceUrl()) + .build(); + + try { + client2.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } }