diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index f4f7615688fc2..12bf0973b7b04 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -639,13 +639,9 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) throws BrokerServiceException { - boolean canOverwrite = false; - if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer) - && !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) { + if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer) + && !isUserProvidedProducerName(newProducer)) { oldProducer.close(false); - canOverwrite = true; - } - if (canOverwrite) { if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) { // Met concurrent update, throw exception here so that client can try reconnect later. throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 0514542d2f567..d72f904019a4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -153,22 +153,17 @@ private String parseRemoteClusterName(String producerName, boolean isRemote, Str return null; } - @Override - public int hashCode() { - return Objects.hash(producerName); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof Producer) { - Producer other = (Producer) obj; - return Objects.equals(producerName, other.producerName) - && Objects.equals(topic, other.topic) - && producerId == other.producerId - && Objects.equals(cnx, other.cnx); - } - - return false; + /** + * Method to determine if this producer can replace another producer. + * @param other - producer to compare to this one + * @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false. + */ + public boolean isSuccessorTo(Producer other) { + return Objects.equals(producerName, other.producerName) + && Objects.equals(topic, other.topic) + && producerId == other.producerId + && Objects.equals(cnx, other.cnx) + && other.getEpoch() < epoch; } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 5fe651be704ce..b8cbe0b2291a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -37,6 +37,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -442,11 +443,20 @@ public void testAddRemoveProducer() throws Exception { // OK } - // 4. simple remove producer + // 4. Try to remove with unequal producer + Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", + role, false, null, SchemaVersion.Latest, 0, false, + ProducerAccessMode.Shared, Optional.empty()); + topic.removeProducer(producerCopy); + // Expect producer to be in map + assertEquals(topic.getProducers().size(), 1); + assertSame(topic.getProducers().get(producer.getProducerName()), producer); + + // 5. simple remove producer topic.removeProducer(producer); assertEquals(topic.getProducers().size(), 0); - // 5. duplicate remove + // 6. duplicate remove topic.removeProducer(producer); /* noop */ } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 80be3ef89f927..4fe14d89f7c67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -837,6 +837,72 @@ public void testCreateProducerTimeout() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception { + resetChannel(); + setChannelConnected(); + + // Delay the topic creation in a deterministic way + CompletableFuture openTopicFuture = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openTopicFuture.complete(() -> { + ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); + }); + return null; + }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), + any(OpenLedgerCallback.class), any(Supplier.class), any()); + + // In a create producer timeout from client side we expect to see this sequence of commands : + // 1. create producer + // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker + // 3. create producer (triggered by reconnection logic) + // Then, when another producer is created with the same name, it should fail. Because we only have one + // channel here, we just use a different producer id + + // These operations need to be serialized, to allow the last create producer to finally succeed + // (There can be more create/close pairs in the sequence, depending on the client timeout + + String producerName = "my-producer"; + + ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(createProducer1); + + ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); + channel.writeInbound(closeProducer); + + ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(createProducer2); + + // Complete the topic opening: It will make 2nd producer creation successful + openTopicFuture.get().run(); + + // Close succeeds + Object response = getResponse(); + assertEquals(response.getClass(), CommandSuccess.class); + assertEquals(((CommandSuccess) response).getRequestId(), 2); + + // 2nd producer will be successfully created as topic is open by then + response = getResponse(); + assertEquals(response.getClass(), CommandProducerSuccess.class); + assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); + + // Send create command after getting the CommandProducerSuccess to ensure correct ordering + ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */, + producerName, Collections.emptyMap(), false); + channel.writeInbound(createProducer3); + + // 3nd producer will fail + response = getResponse(); + assertEquals(response.getClass(), CommandError.class); + assertEquals(((CommandError) response).getRequestId(), 4); + + assertTrue(channel.isActive()); + + channel.finish(); + } + @Test(timeOut = 30000, enabled = false) public void testCreateProducerMultipleTimeouts() throws Exception { resetChannel();