Skip to content

Commit

Permalink
Improve single protocol handlers (#19730) (#19741)
Browse files Browse the repository at this point in the history
* Throw exception in the decoder instead of the encoder

* Extend missing error handling to TextHandshakeDecoder

* Delete SingleEncoderDecoderTest as it's no longer needed.

(cherry picked from commit 4224341)
  • Loading branch information
ramizdundar committed Oct 12, 2021
1 parent 008f2fd commit e70f143
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 125 deletions.
Expand Up @@ -44,7 +44,7 @@ public class SingleProtocolDecoder
protected final InboundHandler[] inboundHandlers;
protected final ProtocolType supportedProtocol;

private final SingleProtocolEncoder encoder;
final SingleProtocolEncoder encoder;
private final boolean shouldSignalMemberProtocolEncoder;

public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next, SingleProtocolEncoder encoder) {
Expand Down Expand Up @@ -97,7 +97,10 @@ public HandlerStatus onRead() {
return CLEAN;
}

verifyProtocol(loadProtocol());
if (!verifyProtocol(loadProtocol())) {
// Exception will be thrown in the SingleProtocolEncoder.
return CLEAN;
}
encoder.signalProtocolVerified();

// Initialize the connection
Expand Down Expand Up @@ -127,16 +130,24 @@ protected void setupNextDecoder() {

// Verify that received protocol is expected one.
// If not then signal SingleProtocolEncoder and throw exception.
protected void verifyProtocol(String incomingProtocol) {
protected boolean verifyProtocol(String incomingProtocol) {
if (!incomingProtocol.equals(supportedProtocol.getDescriptor())) {
encoder.signalWrongProtocol();
String message = "Unsupported protocol exchange detected, " + "expected protocol: "
+ supportedProtocol.name() + ", actual protocol or first three bytes are: " + incomingProtocol;
if (incomingProtocol.equals(UNEXPECTED_PROTOCOL)) {
message = "Instance to be connected replied with HZX. "
+ "This means a different protocol than expected sent to target instance";
}
throw new ProtocolException(message);
handleUnexpectedProtocol(incomingProtocol);
encoder.signalWrongProtocol("Unsupported protocol exchange detected, expected protocol: "
+ supportedProtocol.name() + ", actual protocol or first three bytes are: " + incomingProtocol);
return false;
}
return true;
}

protected void handleUnexpectedProtocol(String incomingProtocol) {
if (incomingProtocol.equals(UNEXPECTED_PROTOCOL)) {
// We can throw exception here, and we don't need to signal the
// encoder because when HZX is received there is no data to be
// sent.
throw new ProtocolException("Instance to be connected replied with"
+ " HZX. This means a different protocol than expected sent"
+ " to target instance");
}
}

Expand Down
Expand Up @@ -47,6 +47,8 @@ public class SingleProtocolEncoder extends OutboundHandler<Void, ByteBuffer> {
private boolean isDecoderReceivedProtocol;
private boolean clusterProtocolBuffered;

private String exceptionMessage;

public SingleProtocolEncoder(OutboundHandler next) {
this(new OutboundHandler[]{next});
}
Expand All @@ -72,6 +74,10 @@ public HandlerStatus onWrite() throws Exception {
if (!sendProtocol()) {
return DIRTY;
}
// UNEXPECTED_PROTOCOL is sent (or at least in the socket
// buffer). We can now throw exception in the pipeline to close
// the channel.
throw new ProtocolException(exceptionMessage);
}

if (channel.isClientMode()) {
Expand Down Expand Up @@ -119,7 +125,8 @@ public void signalProtocolVerified() {
}

// Used by SingleProtocolDecoder in order to send HZX eventually
public void signalWrongProtocol() {
public void signalWrongProtocol(String exceptionMessage) {
this.exceptionMessage = exceptionMessage;
isDecoderReceivedProtocol = true;
isDecoderVerifiedProtocol = false;
channel.outboundPipeline().wakeup();
Expand Down
Expand Up @@ -31,16 +31,22 @@ public TextHandshakeDecoder(ProtocolType supportedProtocol, InboundHandler next,
}

@Override
protected void verifyProtocol(String incomingProtocol) {
protected boolean verifyProtocol(String incomingProtocol) {
handleUnexpectedProtocol(incomingProtocol);
if (ProtocolType.REST.equals(supportedProtocol)) {
if (!RestApiTextDecoder.TEXT_PARSERS.isCommandPrefix(incomingProtocol)) {
throw new IllegalStateException("Unsupported protocol exchange detected, expected protocol: REST");
encoder.signalWrongProtocol(
"Unsupported protocol exchange detected, expected protocol: REST");
return false;
}
} else {
if (!MemcacheTextDecoder.TEXT_PARSERS.isCommandPrefix(incomingProtocol)) {
throw new IllegalStateException("Unsupported protocol exchange detected, " + "expected protocol: MEMCACHED");
encoder.signalWrongProtocol(
"Unsupported protocol exchange detected, expected protocol: MEMCACHED");
return false;
}
}
return true;
}

@Override
Expand Down
Expand Up @@ -69,6 +69,11 @@ public void testCompleteMultisocketConfig() {
assertWrongProtocolAlert(CLIENT_PORT, Protocols.CLUSTER, "AAA");
assertWrongProtocolAlert(WAN1_PORT, Protocols.CLIENT_BINARY, "AAA");
assertWrongProtocolAlert(WAN2_PORT, Protocols.CLIENT_BINARY, "AAA");

assertWrongProtocolAlert(REST_PORT, Protocols.CLIENT_BINARY, "AAA");
assertWrongProtocolAlert(REST_PORT, Protocols.CLUSTER, "AAA");
assertWrongProtocolAlert(MEMCACHE_PORT, Protocols.CLIENT_BINARY, "AAA");
assertWrongProtocolAlert(MEMCACHE_PORT, Protocols.CLUSTER, "AAA");
}

@Test
Expand Down

This file was deleted.

0 comments on commit e70f143

Please sign in to comment.