Skip to content

Commit

Permalink
Revert "[Broker] Fix producer getting incorrectly removed from topic'…
Browse files Browse the repository at this point in the history
…s producers map (apache#12846)"

This reverts commit 1ede4f3.
  • Loading branch information
nicoloboschi committed Nov 22, 2021
1 parent 61176c9 commit c16ba3e
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 67 deletions.
Expand Up @@ -161,7 +161,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
return;
}

this.entryId = entryId;Producer.java
this.entryId = entryId;
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] write-complete: ledger-id={} entry-id={} size={} rc={}", this, ml.getName(),
lh.getId(), entryId, dataLength, rc);
Expand Down
Expand Up @@ -913,72 +913,6 @@ public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() thr
channel.finish();
}

@Test(timeOut = 30000)
public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception {
resetChannel();
setChannelConnected();

// Delay the topic creation in a deterministic way
CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
openTopicFuture.complete(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class), any());

// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create producer
// 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker
// 3. create producer (triggered by reconnection logic)
// Then, when another producer is created with the same name, it should fail. Because we only have one
// channel here, we just use a different producer id

// These operations need to be serialized, to allow the last create producer to finally succeed
// (There can be more create/close pairs in the sequence, depending on the client timeout

String producerName = "my-producer";

ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);

ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
channel.writeInbound(closeProducer);

ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);

// Complete the topic opening: It will make 2nd producer creation successful
openTopicFuture.get().run();

// Close succeeds
Object response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 2);

// 2nd producer will be successfully created as topic is open by then
response = getResponse();
assertEquals(response.getClass(), CommandProducerSuccess.class);
assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);

// Send create command after getting the CommandProducerSuccess to ensure correct ordering
ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer3);

// 3nd producer will fail
response = getResponse();
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 4);

assertTrue(channel.isActive());

channel.finish();
}

@Test(timeOut = 30000, enabled = false)
public void testCreateProducerMultipleTimeouts() throws Exception {
resetChannel();
Expand Down

0 comments on commit c16ba3e

Please sign in to comment.