diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 40d75fe0ab7fa..21d210ca78a20 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -161,7 +161,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) return; } - this.entryId = entryId;Producer.java + this.entryId = entryId; if (log.isDebugEnabled()) { log.debug("[{}] [{}] write-complete: ledger-id={} entry-id={} size={} rc={}", this, ml.getName(), lh.getId(), entryId, dataLength, rc); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 646f9c7006070..351274098fe3e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -913,72 +913,6 @@ public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() thr channel.finish(); } - @Test(timeOut = 30000) - public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception { - resetChannel(); - setChannelConnected(); - - // Delay the topic creation in a deterministic way - CompletableFuture openTopicFuture = new CompletableFuture<>(); - doAnswer(invocationOnMock -> { - openTopicFuture.complete(() -> { - ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); - }); - return null; - }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any(Supplier.class), any()); - - // In a create producer timeout from client side we expect to see this sequence of commands : - // 1. create producer - // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker - // 3. create producer (triggered by reconnection logic) - // Then, when another producer is created with the same name, it should fail. Because we only have one - // channel here, we just use a different producer id - - // These operations need to be serialized, to allow the last create producer to finally succeed - // (There can be more create/close pairs in the sequence, depending on the client timeout - - String producerName = "my-producer"; - - ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, - producerName, Collections.emptyMap(), false); - channel.writeInbound(createProducer1); - - ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); - channel.writeInbound(closeProducer); - - ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, - producerName, Collections.emptyMap(), false); - channel.writeInbound(createProducer2); - - // Complete the topic opening: It will make 2nd producer creation successful - openTopicFuture.get().run(); - - // Close succeeds - Object response = getResponse(); - assertEquals(response.getClass(), CommandSuccess.class); - assertEquals(((CommandSuccess) response).getRequestId(), 2); - - // 2nd producer will be successfully created as topic is open by then - response = getResponse(); - assertEquals(response.getClass(), CommandProducerSuccess.class); - assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); - - // Send create command after getting the CommandProducerSuccess to ensure correct ordering - ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */, - producerName, Collections.emptyMap(), false); - channel.writeInbound(createProducer3); - - // 3nd producer will fail - response = getResponse(); - assertEquals(response.getClass(), CommandError.class); - assertEquals(((CommandError) response).getRequestId(), 4); - - assertTrue(channel.isActive()); - - channel.finish(); - } - @Test(timeOut = 30000, enabled = false) public void testCreateProducerMultipleTimeouts() throws Exception { resetChannel();