From c5df807b6d5de75118aca797dc168327b7364ebd Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 13 Oct 2020 23:18:26 +0200 Subject: [PATCH] Fixes #5409 - HttpClient fails intermittently with "Invalid response state TRANSIENT". The problem was a race condition during content decoding. Since decoding needs to be done in a loop, the condition to loop is to check whether there is demand for the next chunk of decoded content. Checking for demand also sets the stalled flag, and this must be done only after the response state has been set back to CONTENT. Unfortunately this was not done in the decoding loop. The fix is to always update the response state in the decoding loop. Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/HttpExchange.java | 6 +- .../eclipse/jetty/client/HttpReceiver.java | 267 +++++++++--------- .../jetty/client/HttpClientGZIPTest.java | 45 +++ 3 files changed, 183 insertions(+), 135 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 94d204fe5594..f90feaa12155 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -290,11 +290,11 @@ public String toString() { synchronized (this) { - return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h", + return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}", HttpExchange.class.getSimpleName(), hashCode(), - requestState, requestFailure, requestFailure, - responseState, responseFailure, responseFailure); + request, requestState, requestFailure, + response, responseState, responseFailure); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 271273887685..35df5596fdcc 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -187,7 +187,7 @@ protected boolean responseBegin(HttpExchange exchange) { handlerListener = protocolHandler.getResponseListener(); if (LOG.isDebugEnabled()) - LOG.debug("Found protocol handler {}", protocolHandler); + LOG.debug("Response {} found protocol handler {}", response, protocolHandler); } exchange.getConversation().updateResponseListeners(handlerListener); @@ -218,19 +218,8 @@ protected boolean responseBegin(HttpExchange exchange) */ protected boolean responseHeader(HttpExchange exchange, HttpField field) { - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.BEGIN || current == ResponseState.HEADER) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - return false; - } - } + if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT)) + return false; HttpResponse response = exchange.getResponse(); ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); @@ -296,19 +285,8 @@ protected void storeCookie(URI uri, HttpField field) */ protected boolean responseHeaders(HttpExchange exchange) { - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.BEGIN || current == ResponseState.HEADER) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - return false; - } - } + if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT)) + return false; HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) @@ -342,7 +320,7 @@ protected boolean responseHeaders(HttpExchange exchange) { boolean hasDemand = hasDemandOrStall(); if (LOG.isDebugEnabled()) - LOG.debug("Response headers {}, hasDemand={}", response, hasDemand); + LOG.debug("Response headers hasDemand={} {}", hasDemand, response); return hasDemand; } @@ -363,71 +341,39 @@ protected boolean responseHeaders(HttpExchange exchange) */ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback) { - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.HEADERS || current == ResponseState.CONTENT) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - callback.failed(new IllegalStateException("Invalid response state " + current)); - return false; - } - } - - boolean proceed = true; + if (LOG.isDebugEnabled()) + LOG.debug("Response content {}{}{}", exchange.getResponse(), System.lineSeparator(), BufferUtil.toDetailString(buffer)); if (demand() <= 0) { callback.failed(new IllegalStateException("No demand for response content")); - proceed = false; + return false; } + if (decoder == null) + return plainResponseContent(exchange, buffer, callback); + else + return decodeResponseContent(buffer, callback); + } - HttpResponse response = exchange.getResponse(); - if (proceed) + private boolean plainResponseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback) + { + if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT)) { - if (LOG.isDebugEnabled()) - LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); - if (contentListeners.isEmpty()) - { - callback.succeeded(); - } - else - { - if (decoder == null) - { - contentListeners.notifyContent(response, buffer, callback); - } - else - { - try - { - proceed = decoder.decode(buffer, callback); - } - catch (Throwable x) - { - callback.failed(x); - proceed = false; - } - } - } + callback.failed(new IllegalStateException("Invalid response state " + responseState)); + return false; } + HttpResponse response = exchange.getResponse(); + if (contentListeners.isEmpty()) + callback.succeeded(); + else + contentListeners.notifyContent(response, buffer, callback); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) { - if (proceed) - { - boolean hasDemand = hasDemandOrStall(); - if (LOG.isDebugEnabled()) - LOG.debug("Response content {}, hasDemand={}", response, hasDemand); - return hasDemand; - } - else - { - return false; - } + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content {}, hasDemand={}", response, hasDemand); + return hasDemand; } dispose(); @@ -435,6 +381,11 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call return false; } + private boolean decodeResponseContent(ByteBuffer buffer, Callback callback) + { + return decoder.decode(buffer, callback); + } + /** * Method to be invoked when the response is successful. *

@@ -614,15 +565,42 @@ public boolean abort(HttpExchange exchange, Throwable failure) } } + private boolean updateResponseState(ResponseState from1, ResponseState from2, ResponseState to) + { + while (true) + { + ResponseState current = responseState.get(); + if (current == from1 || current == from2) + { + if (updateResponseState(current, to)) + return true; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("State update failed: [{},{}] -> {}: {}", from1, from2, to, current); + return false; + } + } + } + private boolean updateResponseState(ResponseState from, ResponseState to) { - boolean updated = responseState.compareAndSet(from, to); - if (!updated) + while (true) { - if (LOG.isDebugEnabled()) - LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get()); + ResponseState current = responseState.get(); + if (current == from) + { + if (responseState.compareAndSet(current, to)) + return true; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("State update failed: {} -> {}: {}", from, to, current); + return false; + } } - return updated; } @Override @@ -778,14 +756,62 @@ private Decoder(HttpExchange exchange, ContentDecoder decoder) private boolean decode(ByteBuffer encoded, Callback callback) { + // Store the buffer to decode in case the + // decoding produces multiple decoded buffers. this.encoded = encoded; this.callback = callback; - return decode(); + + HttpResponse response = exchange.getResponse(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoding {} with {}{}{}", response, decoder, System.lineSeparator(), BufferUtil.toDetailString(encoded)); + + boolean needInput = decode(); + if (!needInput) + return false; + + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoded, hasDemand={} {}", hasDemand, response); + return hasDemand; } private boolean decode() { while (true) + { + if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT)) + { + callback.failed(new IllegalStateException("Invalid response state " + responseState)); + return false; + } + + DecodeResult result = decodeChunk(); + + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + { + if (result == DecodeResult.NEED_INPUT) + return true; + if (result == DecodeResult.ABORT) + return false; + + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoded chunk, hasDemand={} {}", hasDemand, exchange.getResponse()); + if (hasDemand) + continue; + else + return false; + } + + dispose(); + terminateResponse(exchange); + return false; + } + } + + private DecodeResult decodeChunk() + { + try { ByteBuffer buffer; while (true) @@ -798,27 +824,30 @@ private boolean decode() callback.succeeded(); encoded = null; callback = null; - return true; + return DecodeResult.NEED_INPUT; } } + ByteBuffer decoded = buffer; + HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded)); + LOG.debug("Response content decoded chunk {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); - contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); + contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); - boolean hasDemand = hasDemandOrStall(); - if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand); - if (!hasDemand) - return false; + return DecodeResult.DECODE; + } + catch (Throwable x) + { + callback.failed(x); + return DecodeResult.ABORT; } } private void resume() { if (LOG.isDebugEnabled()) - LOG.debug("Response content resuming decoding {}", exchange); + LOG.debug("Response content resume decoding {} with {}", exchange.getResponse(), decoder); // The content and callback may be null // if there is no initial content demand. @@ -828,40 +857,9 @@ private void resume() return; } - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.HEADERS || current == ResponseState.CONTENT) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - callback.failed(new IllegalStateException("Invalid response state " + current)); - return; - } - } - - boolean decoded = false; - try - { - decoded = decode(); - } - catch (Throwable x) - { - callback.failed(x); - } - - if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) - { - if (decoded) - receive(); - return; - } - - dispose(); - terminateResponse(exchange); + boolean needInput = decode(); + if (needInput) + receive(); } @Override @@ -871,4 +869,9 @@ public void destroy() ((Destroyable)decoder).destroy(); } } + + private enum DecodeResult + { + DECODE, NEED_INPUT, ABORT + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java index e29f0a3b573a..3120a97619d9 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -32,11 +33,14 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.IO; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -266,6 +270,47 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertThat(heapMemory, lessThan((long)content.length)); } + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testLargeGZIPContentAsync(Scenario scenario) throws Exception + { + String digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + Random random = new Random(); + byte[] content = new byte[32 * 1024 * 1024]; + for (int i = 0; i < content.length; ++i) + { + content[i] = (byte)digits.charAt(random.nextInt(digits.length())); + } + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setContentType("text/plain;charset=" + StandardCharsets.US_ASCII.name()); + response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); + GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()); + gzip.write(content); + gzip.finish(); + } + }); + + InputStreamResponseListener listener = new InputStreamResponseListener(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .timeout(5, TimeUnit.SECONDS) + .send(listener); + + Response response = listener.get(5, TimeUnit.SECONDS); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + try (InputStream input = listener.getInputStream()) + { + IO.copy(input, output); + } + assertArrayEquals(content, output.toByteArray()); + } + private static void sleep(long ms) throws IOException { try