Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft] [fix] [broker] Revert #19446 to fix the send message future never complete #21134

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -186,7 +185,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final String listenerName;
private final HashMap<Long, Long> recentlyClosedProducers;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final boolean enableSubscriptionPatternEvaluation;
Expand Down Expand Up @@ -291,7 +289,6 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.recentlyClosedProducers = new HashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
Expand Down Expand Up @@ -1696,14 +1693,6 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());

if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
if (recentlyClosedProducers.containsKey(send.getProducerId())) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received message, but the producer was recently closed : {}. Ignoring message.",
remoteAddress, send.getProducerId());
}
// We expect these messages because we recently closed the producer. Do not close the connection.
return;
}
log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
Expand Down Expand Up @@ -2972,17 +2961,6 @@ public void closeProducer(Producer producer) {
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
// The client does not necessarily know that the producer is closed, but the connection is still
// active, and there could be messages in flight already. We want to ignore these messages for a time
// because they are expected. Once the interval has passed, the client should have received the
// CloseProducer command and should not send any additional messages until it sends a create Producer
// command.
final long epoch = producer.getEpoch();
final long producerId = producer.getProducerId();
recentlyClosedProducers.put(producerId, epoch);
ctx.executor().schedule(() -> {
recentlyClosedProducers.remove(producerId, epoch);
}, service.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
} else {
close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
Expand Down Expand Up @@ -1405,7 +1404,15 @@ public void testSendCommand() throws Exception {
assertTrue(getResponse() instanceof CommandProducerSuccess);

// test SEND success
sendMessage();
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
.setSequenceId(0);
ByteBuf data = Unpooled.buffer(1024);

clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

assertTrue(getResponse() instanceof CommandSendReceipt);
channel.finish();
Expand All @@ -1417,115 +1424,6 @@ public void testSendCommandBeforeCreatingProducer() throws Exception {
setChannelConnected();

// test SEND before producer is created
sendMessage();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testSendCommandAfterBrokerClosedProducer() throws Exception {
resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v5.getValue());
serverCnx.cancelKeepAliveTask();

String producerName = "my-producer";

ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand1);
assertTrue(getResponse() instanceof CommandProducerSuccess);

// Call disconnect method on producer to trigger activity similar to unloading
Producer producer = serverCnx.getProducers().get(1).get();
assertNotNull(producer);
producer.disconnect();
channel.runPendingTasks();
assertTrue(getResponse() instanceof CommandCloseProducer);

// Send message and expect no response
sendMessage();

// Move clock forward to trigger scheduled clean up task
channel.advanceTimeBy(svcConfig.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
channel.runScheduledPendingTasks();
assertTrue(channel.outboundMessages().isEmpty());
assertTrue(channel.isActive());

// Send message and expect closed connection
sendMessage();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws Exception {
resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v5.getValue());
serverCnx.cancelKeepAliveTask();

String producerName = "my-producer";

ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand1);
assertTrue(getResponse() instanceof CommandProducerSuccess);

// Call disconnect method on producer to trigger activity similar to unloading
Producer producer = serverCnx.getProducers().get(1).get();
assertNotNull(producer);
producer.disconnect();
channel.runPendingTasks();
assertTrue(getResponse() instanceof CommandCloseProducer);

// Send message and expect no response
sendMessage();

assertTrue(channel.outboundMessages().isEmpty());

// Move clock forward to trigger scheduled clean up task
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);
assertTrue(getResponse() instanceof CommandProducerSuccess);

// Send message and expect success
sendMessage();

assertTrue(getResponse() instanceof CommandSendReceipt);
channel.finish();
}

@Test(timeOut = 30000)
public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws Exception {
resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v5.getValue());
serverCnx.cancelKeepAliveTask();

String producerName = "my-producer";

ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand1);
assertTrue(getResponse() instanceof CommandProducerSuccess);

ByteBuf closeProducer = Commands.newCloseProducer(1,2);
channel.writeInbound(closeProducer);
assertTrue(getResponse() instanceof CommandSuccess);

// Send message and get disconnected
sendMessage();
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

private void sendMessage() {
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
Expand All @@ -1536,6 +1434,10 @@ private void sendMessage() {
ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
Expand Down