Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServerCnx] Close connection after receiving unexpected SendCommand #12780

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1352,7 +1352,9 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());

if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId());
log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
return;
}

Expand Down
Expand Up @@ -648,6 +648,28 @@ public void testSendCommand() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testSendCommandBeforeCreatingProducer() throws Exception {
resetChannel();
setChannelConnected();

// test SEND before producer is created
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
.setSequenceId(0);
ByteBuf data = Unpooled.buffer(1024);

ByteBuf clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1,
ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testUseSameProducerName() throws Exception {
resetChannel();
Expand Down