Skip to content

Commit

Permalink
Ignore invalid connect frame
Browse files Browse the repository at this point in the history
Closes gh-28443
  • Loading branch information
rstoyanchev committed May 11, 2022
1 parent e4ec376 commit dc2947c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -306,6 +306,12 @@ protected void handleMessageInternal(Message<?> message) {
else if (SimpMessageType.CONNECT.equals(messageType)) {
logMessage(message);
if (sessionId != null) {
if (this.sessions.get(sessionId) != null) {
if (logger.isWarnEnabled()) {
logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
}
return;
}
long[] heartbeatIn = SimpMessageHeaderAccessor.getHeartbeat(headers);
long[] heartbeatOut = getHeartbeatValue();
Principal user = SimpMessageHeaderAccessor.getUser(headers);
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -552,6 +552,12 @@ else if (accessor instanceof SimpMessageHeaderAccessor) {
}

if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
if (this.connectionHandlers.get(sessionId) != null) {
if (logger.isWarnEnabled()) {
logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -260,6 +260,30 @@ void systemSubscription() {
assertThat(captor.getValue()).isSameAs(message);
}

@Test
void alreadyConnected() {

this.brokerRelay.start();

Message<byte[]> connect = connectMessage("sess1", "joe");
this.brokerRelay.handleMessage(connect);

assertThat(this.tcpClient.getSentMessages().size()).isEqualTo(2);

StompHeaderAccessor headers1 = this.tcpClient.getSentHeaders(0);
assertThat(headers1.getCommand()).isEqualTo(StompCommand.CONNECT);
assertThat(headers1.getSessionId()).isEqualTo(StompBrokerRelayMessageHandler.SYSTEM_SESSION_ID);

StompHeaderAccessor headers2 = this.tcpClient.getSentHeaders(1);
assertThat(headers2.getCommand()).isEqualTo(StompCommand.CONNECT);
assertThat(headers2.getSessionId()).isEqualTo("sess1");

this.brokerRelay.handleMessage(connect);

assertThat(this.tcpClient.getSentMessages().size()).isEqualTo(2);
assertThat(this.outboundChannel.getMessages()).isEmpty();
}

private Message<byte[]> connectMessage(String sessionId, String user) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
Expand Down

0 comments on commit dc2947c

Please sign in to comment.