From 80aa379a40ebc7ea49b7814f5571af9f3f6212e0 Mon Sep 17 00:00:00 2001 From: leolinchen Date: Tue, 1 Mar 2022 15:18:05 +0800 Subject: [PATCH] =?UTF-8?q?--bug=3D97153739=20producer=E9=87=8D=E5=90=8D?= =?UTF-8?q?=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D=20(merge=20request=20!57)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Squash merge branch 'fix_producer' into '2.8.1' Fixes # ### Motivation --bug=97153739 producer重名问题修复 chery pick from https://github.com/apache/pulsar/pull/12846 TAPD: --bug=097153739 --- .../pulsar/broker/service/AbstractTopic.java | 8 +-- .../pulsar/broker/service/Producer.java | 27 +++----- .../broker/service/PersistentTopicTest.java | 14 +++- .../pulsar/broker/service/ServerCnxTest.java | 67 +++++++++++++++++++ 4 files changed, 92 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 39b0650c2cb5bb..008ff660457ad9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -641,13 +641,9 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) throws BrokerServiceException { - boolean canOverwrite = false; - if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer) - && !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) { + if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer) + && !isUserProvidedProducerName(newProducer)) { oldProducer.close(false); - canOverwrite = true; - } - if (canOverwrite) { if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) { // Met concurrent update, throw exception here so that client can try reconnect later. throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index a14de3a746ca5a..67fdaf63981ee8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -140,22 +140,17 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.clientAddress = cnx.clientSourceAddress(); } - @Override - public int hashCode() { - return Objects.hash(producerName); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof Producer) { - Producer other = (Producer) obj; - return Objects.equals(producerName, other.producerName) - && Objects.equals(topic, other.topic) - && producerId == other.producerId - && Objects.equals(cnx, other.cnx); - } - - return false; + /** + * Method to determine if this producer can replace another producer. + * @param other - producer to compare to this one + * @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false. + */ + public boolean isSuccessorTo(Producer other) { + return Objects.equals(producerName, other.producerName) + && Objects.equals(topic, other.topic) + && producerId == other.producerId + && Objects.equals(cnx, other.cnx) + && other.getEpoch() < epoch; } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index a71cc64d60ac12..f7998997d21cf5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -37,6 +37,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -444,11 +445,20 @@ public void testAddRemoveProducer() throws Exception { // OK } - // 4. simple remove producer + // 4. Try to remove with unequal producer + Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", + role, false, null, SchemaVersion.Latest, 0, false, + ProducerAccessMode.Shared, Optional.empty()); + topic.removeProducer(producerCopy); + // Expect producer to be in map + assertEquals(topic.getProducers().size(), 1); + assertSame(topic.getProducers().get(producer.getProducerName()), producer); + + // 5. simple remove producer topic.removeProducer(producer); assertEquals(topic.getProducers().size(), 0); - // 5. duplicate remove + // 6. duplicate remove topic.removeProducer(producer); /* noop */ } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 2e2f0f20b08946..b2c3dc32418e24 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -827,6 +827,73 @@ public void testCreateProducerTimeout() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception { + resetChannel(); + setChannelConnected(); + + // Delay the topic creation in a deterministic way + CompletableFuture openTopicFuture = new CompletableFuture<>(); + doAnswer(invocationOnMock -> { + openTopicFuture.complete(() -> { + ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); + }); + return null; + }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), + any(OpenLedgerCallback.class), any(Supplier.class), any()); + + // In a create producer timeout from client side we expect to see this sequence of commands : + // 1. create producer + // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker + // 3. create producer (triggered by reconnection logic) + // Then, when another producer is created with the same name, it should fail. Because we only have one + // channel here, we just use a different producer id + + // These operations need to be serialized, to allow the last create producer to finally succeed + // (There can be more create/close pairs in the sequence, depending on the client timeout + + String producerName = "my-producer"; + + ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, + producerName, Collections.emptyMap()); + channel.writeInbound(createProducer1); + + ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); + channel.writeInbound(closeProducer); + + ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, + producerName, Collections.emptyMap()); + channel.writeInbound(createProducer2); + + // Complete the topic opening: It will make 2nd producer creation successful + openTopicFuture.get().run(); + + // Close succeeds + Object response = getResponse(); + assertEquals(response.getClass(), CommandSuccess.class); + assertEquals(((CommandSuccess) response).getRequestId(), 2); + + // 2nd producer will be successfully created as topic is open by then + response = getResponse(); + assertEquals(response.getClass(), CommandProducerSuccess.class); + assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); + + // Send create command after getting the CommandProducerSuccess to ensure correct ordering + ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */, + producerName, Collections.emptyMap()); + channel.writeInbound(createProducer3); + + // 3nd producer will fail + response = getResponse(); + assertEquals(response.getClass(), CommandError.class); + assertEquals(((CommandError) response).getRequestId(), 4); + + assertTrue(channel.isActive()); + + channel.finish(); + } + + @Test(timeOut = 30000, enabled = false) public void testCreateProducerMultipleTimeouts() throws Exception { resetChannel();