Skip to content

Commit

Permalink
Producer getting producer busy is removing existing producer from list (
Browse files Browse the repository at this point in the history
#11804)

### Motivation
When a producer is getting error because of ProducerBusy (existing producer with the same name), it will trigger a producer close operation that will eventually lead to the existing producer getting removed from the topic map (even though that producer is still writing on the topic).

The problem is the producer close is triggering a removal from the map:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Line 683 in 43ded59

 if (producers.remove(producer.getProducerName(), producer)) { 
Even though we check for producer equality, the Producer.equals() is only comparing the producer name, so the old instance gets removed from the map.

Instead, the equality of producer needs to be based on the connection id (local & remote addresses and unique id), plus the producer id within that connection.

* Producer getting producer busy is removing existing producer from list

* Fixed test
  • Loading branch information
merlimat committed Aug 27, 2021
1 parent f32527f commit 6aef83f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
Expand Up @@ -147,7 +147,10 @@ public int hashCode() {
public boolean equals(Object obj) {
if (obj instanceof Producer) {
Producer other = (Producer) obj;
return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic);
return Objects.equals(producerName, other.producerName)
&& Objects.equals(topic, other.topic)
&& producerId == other.producerId
&& Objects.equals(cnx, other.cnx);
}

return false;
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -1530,6 +1531,23 @@ protected void handleSeek(CommandSeek seek) {
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ServerCnx other = (ServerCnx) o;
return Objects.equals(ctx().channel().id(), other.ctx().channel().id());
}

@Override
public int hashCode() {
return Objects.hash(ctx().channel().id());
}

@Override
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
checkArgument(state == State.Connected);
Expand Down
Expand Up @@ -1814,4 +1814,49 @@ public void testWithEventTime() throws Exception {
assertEquals(msg.getValue(), "test");
assertEquals(msg.getEventTime(), 5);
}

@Test
public void testProducerBusy() throws Exception {
final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.producerName("xxx")
.create();

assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);

for (int i =0; i < 5; i++) {
try {
pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.producerName("xxx")
.create();
fail("Should have failed");
} catch (ProducerBusyException e) {
// Expected
}

assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
}

// Try from different connection
@Cleanup
PulsarClient client2 = PulsarClient.builder()
.serviceUrl(getPulsar().getBrokerServiceUrl())
.build();

try {
client2.newProducer(Schema.STRING)
.topic(topicName)
.producerName("xxx")
.create();
fail("Should have failed");
} catch (ProducerBusyException e) {
// Expected
}

assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
}
}

0 comments on commit 6aef83f

Please sign in to comment.