Skip to content

Commit

Permalink
Jetty 9.4.x 4855 h2spec failures (#4946)
Browse files Browse the repository at this point in the history
* Fixes #4855 - Occasional h2spec failures on CI

In case of bad usage of the HTTP/2 API, we don't want to close()
the stream but just fail the callback, because the stream
may be performing actions triggered by a legit API usage.

In case of a call to `AsyncListener.onError()`, applications may decide to call
AsyncContext.complete() and that would be a correct usage of the Servlet API.
This case was not well handled and was wrongly producing a WARN log with an
`IllegalStateException`.

Completely rewritten `HttpTransportOverHTTP2.TransportCallback`.
The rewrite handles correctly asynchronous failures that now are executed
sequentially (and not concurrently) with writes.
If a write is in progress, the failure will just change the state and at the
end of the write a check on the state will determine what actions to take.

A session failure is now handled in HTTP2Session by first failing all the
streams - which notifies the Stream.Listeners - and then failing the session
- which notifies the Session.Listener.
The stream failures are executed concurrently by dispatching each one to a
different thread; this means that the stream failure callbacks are executed
concurrently (likely sending RST_STREAM frames).
The session failure callback is completed only when all the stream failure
callbacks have completed, to ensure that a GOAWAY frame is processed after
all the RST_STREAM frames.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jun 9, 2020
1 parent 7c1d290 commit 56bda1b
Show file tree
Hide file tree
Showing 16 changed files with 527 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
return customize(connection, context);
}

private class HTTP2ClientConnection extends HTTP2Connection implements Callback
private static class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final HTTP2Client client;
private final Promise<Session> promise;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void failed(Throwable x)
}
}

private class ConnectionListener implements Connection.Listener
private static class ConnectionListener implements Connection.Listener
{
@Override
public void onOpened(Connection connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.servlet.http.HttpServlet;

import org.eclipse.jetty.http.HostPortHttpField;
Expand All @@ -33,7 +32,7 @@
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
Expand All @@ -54,7 +53,7 @@ public class AbstractTest

protected void start(HttpServlet servlet) throws Exception
{
HTTP2ServerConnectionFactory connectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
HTTP2CServerConnectionFactory connectionFactory = new HTTP2CServerConnectionFactory(new HttpConfiguration());
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
prepareServer(connectionFactory);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.eclipse.jetty.http2.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
Expand All @@ -41,6 +45,7 @@
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
Expand All @@ -63,6 +68,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class PrefaceTest extends AbstractTest
Expand Down Expand Up @@ -332,4 +338,71 @@ public void onHeaders(HeadersFrame frame)
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
}
}

@Test
public void testInvalidServerPreface() throws Exception
{
try (ServerSocket server = new ServerSocket(0))
{
prepareClient();
client.start();

CountDownLatch failureLatch = new CountDownLatch(1);
Promise.Completable<Session> promise = new Promise.Completable<>();
InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
client.connect(address, new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
failureLatch.countDown();
}
}, promise);

try (Socket socket = server.accept())
{
OutputStream output = socket.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));

Session session = promise.get(5, TimeUnit.SECONDS);
assertNotNull(session);

assertTrue(failureLatch.await(5, TimeUnit.SECONDS));

// Verify that the client closed the socket.
InputStream input = socket.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
}
}

@Test
public void testInvalidClientPreface() throws Exception
{
start(new ServerSessionListener.Adapter());

try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
output.flush();

byte[] bytes = new byte[1024];
InputStream input = client.getInputStream();
int read = input.read(bytes);
if (read < 0)
{
// Closing the connection without GOAWAY frame is fine.
return;
}

int type = bytes[3];
assertEquals(FrameType.GO_AWAY.getType(), type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,21 @@ public void failed(Throwable x)
super.failed(x);
}

/**
* @return whether the entry is stale and must not be processed
*/
private boolean isStale()
{
return !isProtocol() && stream != null && stream.isReset();
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
}

private boolean isProtocol()
private boolean isProtocolFrame(Frame frame)
{
switch (frame.getType())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void onGoAway(final GoAwayFrame frame)
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
closeFrame = frame;
notifyClose(this, frame, new DisconnectCallback());
onClose(frame, new DisconnectCallback());
return;
}
break;
Expand Down Expand Up @@ -498,9 +498,15 @@ public void onWindowUpdate(WindowUpdateFrame frame)
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
Throwable failure = toFailure("Stream failure", error, reason);
onStreamFailure(streamId, error, reason, failure, callback);
}

private void onStreamFailure(int streamId, int error, String reason, Throwable failure, Callback callback)
{
IStream stream = getStream(streamId);
if (stream != null)
stream.process(new FailureFrame(error, reason), callback);
stream.process(new FailureFrame(error, reason, failure), callback);
else
callback.succeeded();
}
Expand All @@ -513,7 +519,45 @@ public void onConnectionFailure(int error, String reason)

protected void onConnectionFailure(int error, String reason, Callback callback)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback));
Throwable failure = toFailure("Session failure", error, reason);
onFailure(error, reason, failure, new CloseCallback(error, reason, callback));
}

protected void abort(Throwable failure)
{
onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure));
}

private void onFailure(int error, String reason, Throwable failure, Callback callback)
{
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
}
notifyFailure(this, failure, countCallback);
}

private void onClose(GoAwayFrame frame, Callback callback)
{
int error = frame.getError();
String reason = frame.tryConvertPayload();
Throwable failure = toFailure("Session close", error, reason);
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
}
notifyClose(this, frame, countCallback);
}

private Throwable toFailure(String message, int error, String reason)
{
return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason));
}

@Override
Expand Down Expand Up @@ -998,11 +1042,6 @@ private void terminate(Throwable cause)
}
}

protected void abort(Throwable failure)
{
notifyFailure(this, failure, new TerminateCallback(failure));
}

public boolean isDisconnected()
{
return !endPoint.isOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
close();
callback.failed(new WritePendingException());
return false;
}
Expand Down Expand Up @@ -177,7 +176,7 @@ public boolean isClosed()
public boolean isRemotelyClosed()
{
CloseState state = closeState.get();
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING;
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}

public boolean isLocallyClosed()
Expand Down Expand Up @@ -358,6 +357,8 @@ private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)

private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
notifyFailure(this, frame, callback);
}

Expand Down Expand Up @@ -608,7 +609,7 @@ private void notifyFailure(Stream stream, FailureFrame frame, Callback callback)
{
try
{
listener.onFailure(stream, frame.getError(), frame.getReason(), callback);
listener.onFailure(stream, frame.getError(), frame.getReason(), frame.getFailure(), callback);
}
catch (Throwable x)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,24 @@ default boolean onIdleTimeout(Stream stream, Throwable x)
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param failure the failure
* @param callback the callback to complete when the failure has been handled
*/
default void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
onFailure(stream, error, reason, callback);
}

/**
* <p>Callback method invoked when the stream failed.</p>
*
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param callback the callback to complete when the failure has been handled
* @deprecated use {@link #onFailure(Stream, int, String, Throwable, Callback)} instead
*/
@Deprecated
default void onFailure(Stream stream, int error, String reason, Callback callback)
{
callback.succeeded();
Expand Down

0 comments on commit 56bda1b

Please sign in to comment.