From 37217cd1b47d0df519d578c47320f0996a5fa0c8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 26 Aug 2021 16:00:51 -0700 Subject: [PATCH 1/2] Producer getting producer busy is removing existing producer from list --- .../pulsar/broker/service/Producer.java | 6 ++- .../pulsar/broker/service/ServerCnx.java | 18 ++++++++ .../service/PersistentTopicE2ETest.java | 45 +++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 8c35e660c76e7..147aa4d49d4bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -147,7 +147,11 @@ public int hashCode() { public boolean equals(Object obj) { if (obj instanceof Producer) { Producer other = (Producer) obj; - return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic); + return Objects.equals(producerName, other.producerName) + && Objects.equals(topic, other.topic) + && producerId == other.producerId + && epoch == other.epoch + && Objects.equals(cnx, other.cnx); } return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 7d02e6a2910bc..e3041d7cb4c9e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -41,6 +41,7 @@ import java.util.IdentityHashMap; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -1544,6 +1545,23 @@ protected void handleSeek(CommandSeek seek) { } } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerCnx other = (ServerCnx) o; + return Objects.equals(ctx().channel().id(), other.ctx().channel().id()); + } + + @Override + public int hashCode() { + return Objects.hash(ctx().channel().id()); + } + @Override protected void handleCloseProducer(CommandCloseProducer closeProducer) { checkArgument(state == State.Connected); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 881fe266a004d..42512b0f38bf8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1814,4 +1814,49 @@ public void testWithEventTime() throws Exception { assertEquals(msg.getValue(), "test"); assertEquals(msg.getEventTime(), 5); } + + @Test + public void testProducerBusy() throws Exception { + final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + + for (int i =0; i < 5; i++) { + try { + pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } + + // Try from different connection + @Cleanup + PulsarClient client2 = PulsarClient.builder() + .serviceUrl(getPulsar().getBrokerServiceUrl()) + .build(); + + try { + client2.newProducer(Schema.STRING) + .topic(topicName) + .producerName("xxx") + .create(); + fail("Should have failed"); + } catch (ProducerBusyException e) { + // Expected + } + + assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1); + } } From e69d95d299d0d57cdaeb4c1900b944b1a5ed6e01 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 26 Aug 2021 23:36:38 -0700 Subject: [PATCH 2/2] Fixed test --- .../src/main/java/org/apache/pulsar/broker/service/Producer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 147aa4d49d4bc..12697f6cd11af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -150,7 +150,6 @@ public boolean equals(Object obj) { return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic) && producerId == other.producerId - && epoch == other.epoch && Objects.equals(cnx, other.cnx); }