Skip to content

Commit

Permalink
[Broker] Fix producer getting incorrectly removed from topic's produc…
Browse files Browse the repository at this point in the history
…ers map (apache#12846)
  • Loading branch information
michaeljmarshall authored and fangxiaobing committed Dec 19, 2021
1 parent 88c26e3 commit 0bf6876
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 24 deletions.
Expand Up @@ -639,13 +639,9 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
boolean canOverwrite = false;
if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer)
&& !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) {
if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
&& !isUserProvidedProducerName(newProducer)) {
oldProducer.close(false);
canOverwrite = true;
}
if (canOverwrite) {
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
Expand Down
Expand Up @@ -153,22 +153,17 @@ private String parseRemoteClusterName(String producerName, boolean isRemote, Str
return null;
}

@Override
public int hashCode() {
return Objects.hash(producerName);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof Producer) {
Producer other = (Producer) obj;
return Objects.equals(producerName, other.producerName)
&& Objects.equals(topic, other.topic)
&& producerId == other.producerId
&& Objects.equals(cnx, other.cnx);
}

return false;
/**
* Method to determine if this producer can replace another producer.
* @param other - producer to compare to this one
* @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false.
*/
public boolean isSuccessorTo(Producer other) {
return Objects.equals(producerName, other.producerName)
&& Objects.equals(topic, other.topic)
&& producerId == other.producerId
&& Objects.equals(cnx, other.cnx)
&& other.getEpoch() < epoch;
}

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
Expand Down
Expand Up @@ -37,6 +37,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -442,11 +443,20 @@ public void testAddRemoveProducer() throws Exception {
// OK
}

// 4. simple remove producer
// 4. Try to remove with unequal producer
Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty());
topic.removeProducer(producerCopy);
// Expect producer to be in map
assertEquals(topic.getProducers().size(), 1);
assertSame(topic.getProducers().get(producer.getProducerName()), producer);

// 5. simple remove producer
topic.removeProducer(producer);
assertEquals(topic.getProducers().size(), 0);

// 5. duplicate remove
// 6. duplicate remove
topic.removeProducer(producer); /* noop */
}

Expand Down
Expand Up @@ -837,6 +837,72 @@ public void testCreateProducerTimeout() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception {
resetChannel();
setChannelConnected();

// Delay the topic creation in a deterministic way
CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
openTopicFuture.complete(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
});
return null;
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class), any());

// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create producer
// 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker
// 3. create producer (triggered by reconnection logic)
// Then, when another producer is created with the same name, it should fail. Because we only have one
// channel here, we just use a different producer id

// These operations need to be serialized, to allow the last create producer to finally succeed
// (There can be more create/close pairs in the sequence, depending on the client timeout

String producerName = "my-producer";

ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);

ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ );
channel.writeInbound(closeProducer);

ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);

// Complete the topic opening: It will make 2nd producer creation successful
openTopicFuture.get().run();

// Close succeeds
Object response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 2);

// 2nd producer will be successfully created as topic is open by then
response = getResponse();
assertEquals(response.getClass(), CommandProducerSuccess.class);
assertEquals(((CommandProducerSuccess) response).getRequestId(), 3);

// Send create command after getting the CommandProducerSuccess to ensure correct ordering
ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer3);

// 3nd producer will fail
response = getResponse();
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 4);

assertTrue(channel.isActive());

channel.finish();
}

@Test(timeOut = 30000, enabled = false)
public void testCreateProducerMultipleTimeouts() throws Exception {
resetChannel();
Expand Down

0 comments on commit 0bf6876

Please sign in to comment.