Skip to content

Commit

Permalink
[ServerCnx] Close connection after receiving unexpected SendCommand (a…
Browse files Browse the repository at this point in the history
…pache#12780)

(cherry picked from commit ba58095)
  • Loading branch information
michaeljmarshall authored and eolivelli committed Nov 15, 2021
1 parent f46f54d commit b4a0083
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Expand Up @@ -1324,7 +1324,9 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());

if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId());
log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
return;
}

Expand Down
Expand Up @@ -658,6 +658,28 @@ public void testSendCommand() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testSendCommandBeforeCreatingProducer() throws Exception {
resetChannel();
setChannelConnected();

// test SEND before producer is created
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
.setSequenceId(0);
ByteBuf data = Unpooled.buffer(1024);

ByteBuf clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1,
ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testUseSameProducerName() throws Exception {
resetChannel();
Expand Down

0 comments on commit b4a0083

Please sign in to comment.