Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve single protocol handlers #19730

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -44,7 +44,7 @@ public class SingleProtocolDecoder
protected final InboundHandler[] inboundHandlers;
protected final ProtocolType supportedProtocol;

private final SingleProtocolEncoder encoder;
final SingleProtocolEncoder encoder;
private final boolean shouldSignalMemberProtocolEncoder;

public SingleProtocolDecoder(ProtocolType supportedProtocol, InboundHandler next, SingleProtocolEncoder encoder) {
Expand Down Expand Up @@ -97,7 +97,10 @@ public HandlerStatus onRead() {
return CLEAN;
}

verifyProtocol(loadProtocol());
if (!verifyProtocol(loadProtocol())) {
// Exception will be thrown in the SingleProtocolEncoder.
return CLEAN;
}
encoder.signalProtocolVerified();

// Initialize the connection
Expand Down Expand Up @@ -127,16 +130,24 @@ protected void setupNextDecoder() {

// Verify that received protocol is expected one.
// If not then signal SingleProtocolEncoder and throw exception.
protected void verifyProtocol(String incomingProtocol) {
protected boolean verifyProtocol(String incomingProtocol) {
if (!incomingProtocol.equals(supportedProtocol.getDescriptor())) {
encoder.signalWrongProtocol();
String message = "Unsupported protocol exchange detected, " + "expected protocol: "
+ supportedProtocol.name() + ", actual protocol or first three bytes are: " + incomingProtocol;
if (incomingProtocol.equals(UNEXPECTED_PROTOCOL)) {
message = "Instance to be connected replied with HZX. "
+ "This means a different protocol than expected sent to target instance";
}
throw new ProtocolException(message);
handleUnexpectedProtocol(incomingProtocol);
encoder.signalWrongProtocol("Unsupported protocol exchange detected, expected protocol: "
+ supportedProtocol.name() + ", actual protocol or first three bytes are: " + incomingProtocol);
return false;
}
return true;
}

protected void handleUnexpectedProtocol(String incomingProtocol) {
if (incomingProtocol.equals(UNEXPECTED_PROTOCOL)) {
// We can throw exception here, and we don't need to signal the
// encoder because when HZX is received there is no data to be
// sent.
throw new ProtocolException("Instance to be connected replied with"
+ " HZX. This means a different protocol than expected sent"
+ " to target instance");
}
}

Expand Down
Expand Up @@ -47,6 +47,8 @@ public class SingleProtocolEncoder extends OutboundHandler<Void, ByteBuffer> {
private boolean isDecoderReceivedProtocol;
private boolean clusterProtocolBuffered;

private String exceptionMessage;

public SingleProtocolEncoder(OutboundHandler next) {
this(new OutboundHandler[]{next});
}
Expand All @@ -72,6 +74,10 @@ public HandlerStatus onWrite() throws Exception {
if (!sendProtocol()) {
return DIRTY;
}
// UNEXPECTED_PROTOCOL is sent (or at least in the socket
// buffer). We can now throw exception in the pipeline to close
// the channel.
throw new ProtocolException(exceptionMessage);
}

if (channel.isClientMode()) {
Expand Down Expand Up @@ -119,7 +125,8 @@ public void signalProtocolVerified() {
}

// Used by SingleProtocolDecoder in order to send HZX eventually
public void signalWrongProtocol() {
public void signalWrongProtocol(String exceptionMessage) {
this.exceptionMessage = exceptionMessage;
isDecoderReceivedProtocol = true;
isDecoderVerifiedProtocol = false;
channel.outboundPipeline().wakeup();
Expand Down
Expand Up @@ -31,16 +31,22 @@ public TextHandshakeDecoder(ProtocolType supportedProtocol, InboundHandler next,
}

@Override
protected void verifyProtocol(String incomingProtocol) {
protected boolean verifyProtocol(String incomingProtocol) {
handleUnexpectedProtocol(incomingProtocol);
if (ProtocolType.REST.equals(supportedProtocol)) {
if (!RestApiTextDecoder.TEXT_PARSERS.isCommandPrefix(incomingProtocol)) {
throw new IllegalStateException("Unsupported protocol exchange detected, expected protocol: REST");
encoder.signalWrongProtocol(
"Unsupported protocol exchange detected, expected protocol: REST");
return false;
}
} else {
if (!MemcacheTextDecoder.TEXT_PARSERS.isCommandPrefix(incomingProtocol)) {
throw new IllegalStateException("Unsupported protocol exchange detected, " + "expected protocol: MEMCACHED");
encoder.signalWrongProtocol(
"Unsupported protocol exchange detected, expected protocol: MEMCACHED");
return false;
}
}
return true;
}

@Override
Expand Down
Expand Up @@ -69,6 +69,11 @@ public void testCompleteMultisocketConfig() {
assertWrongProtocolAlert(CLIENT_PORT, Protocols.CLUSTER, "AAA");
assertWrongProtocolAlert(WAN1_PORT, Protocols.CLIENT_BINARY, "AAA");
assertWrongProtocolAlert(WAN2_PORT, Protocols.CLIENT_BINARY, "AAA");

assertWrongProtocolAlert(REST_PORT, Protocols.CLIENT_BINARY, "AAA");
assertWrongProtocolAlert(REST_PORT, Protocols.CLUSTER, "AAA");
assertWrongProtocolAlert(MEMCACHE_PORT, Protocols.CLIENT_BINARY, "AAA");
assertWrongProtocolAlert(MEMCACHE_PORT, Protocols.CLUSTER, "AAA");
}

@Test
Expand Down

This file was deleted.