Skip to content

Commit

Permalink
[fix][broker] Expect msgs after server initiated CloseProducer (#19446)
Browse files Browse the repository at this point in the history
(cherry picked from commit 524288c)
(cherry picked from commit 4cbe68e)
  • Loading branch information
michaeljmarshall committed Feb 7, 2023
1 parent 05cbbfd commit 283f773
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 13 deletions.
Expand Up @@ -42,6 +42,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -161,6 +162,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final String listenerName;
private final HashMap<Long, Long> recentlyClosedProducers;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final BrokerInterceptor brokerInterceptor;
Expand Down Expand Up @@ -256,6 +258,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.recentlyClosedProducers = new HashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
Expand Down Expand Up @@ -1482,6 +1485,14 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());

if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
if (recentlyClosedProducers.containsKey(send.getProducerId())) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received message, but the producer was recently closed : {}. Ignoring message.",
remoteAddress, send.getProducerId());
}
// We expect these messages because we recently closed the producer. Do not close the connection.
return;
}
log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
Expand Down Expand Up @@ -2536,6 +2547,17 @@ public void closeProducer(Producer producer) {
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
// The client does not necessarily know that the producer is closed, but the connection is still
// active, and there could be messages in flight already. We want to ignore these messages for a time
// because they are expected. Once the interval has passed, the client should have received the
// CloseProducer command and should not send any additional messages until it sends a create Producer
// command.
final long epoch = producer.getEpoch();
final long producerId = producer.getProducerId();
recentlyClosedProducers.put(producerId, epoch);
ctx.executor().schedule(() -> {
recentlyClosedProducers.remove(producerId, epoch);
}, service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
} else {
close();
}
Expand Down
Expand Up @@ -94,6 +94,7 @@
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
Expand Down Expand Up @@ -653,15 +654,7 @@ public void testSendCommand() throws Exception {
assertTrue(getResponse() instanceof CommandProducerSuccess);

// test SEND success
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
.setSequenceId(0);
ByteBuf data = Unpooled.buffer(1024);

clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();
sendMessage();

assertTrue(getResponse() instanceof CommandSendReceipt);
channel.finish();
Expand All @@ -673,6 +666,115 @@ public void testSendCommandBeforeCreatingProducer() throws Exception {
setChannelConnected();

// test SEND before producer is created
sendMessage();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testSendCommandAfterBrokerClosedProducer() throws Exception {
resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v5.getValue());
serverCnx.cancelKeepAliveTask();

String producerName = "my-producer";

ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand1);
assertTrue(getResponse() instanceof CommandProducerSuccess);

// Call disconnect method on producer to trigger activity similar to unloading
Producer producer = serverCnx.getProducers().get(1).get();
assertNotNull(producer);
producer.disconnect();
channel.runPendingTasks();
assertTrue(getResponse() instanceof CommandCloseProducer);

// Send message and expect no response
sendMessage();

// Move clock forward to trigger scheduled clean up task
channel.advanceTimeBy(svcConfig.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
channel.runScheduledPendingTasks();
assertTrue(channel.outboundMessages().isEmpty());
assertTrue(channel.isActive());

// Send message and expect closed connection
sendMessage();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws Exception {
resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v5.getValue());
serverCnx.cancelKeepAliveTask();

String producerName = "my-producer";

ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand1);
assertTrue(getResponse() instanceof CommandProducerSuccess);

// Call disconnect method on producer to trigger activity similar to unloading
Producer producer = serverCnx.getProducers().get(1).get();
assertNotNull(producer);
producer.disconnect();
channel.runPendingTasks();
assertTrue(getResponse() instanceof CommandCloseProducer);

// Send message and expect no response
sendMessage();

assertTrue(channel.outboundMessages().isEmpty());

// Move clock forward to trigger scheduled clean up task
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);
assertTrue(getResponse() instanceof CommandProducerSuccess);

// Send message and expect success
sendMessage();

assertTrue(getResponse() instanceof CommandSendReceipt);
channel.finish();
}

@Test(timeOut = 30000)
public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws Exception {
resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v5.getValue());
serverCnx.cancelKeepAliveTask();

String producerName = "my-producer";

ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand1);
assertTrue(getResponse() instanceof CommandProducerSuccess);

ByteBuf closeProducer = Commands.newCloseProducer(1,2);
channel.writeInbound(closeProducer);
assertTrue(getResponse() instanceof CommandSuccess);

// Send message and get disconnected
sendMessage();
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

private void sendMessage() {
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
Expand All @@ -683,10 +785,6 @@ public void testSendCommandBeforeCreatingProducer() throws Exception {
ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
Expand Down

0 comments on commit 283f773

Please sign in to comment.