Skip to content

Commit

Permalink
Fixes #5409 - HttpClient fails intermittently with "Invalid response …
Browse files Browse the repository at this point in the history
…state TRANSIENT".

The problem was a race condition during content decoding.
Since decoding needs to be done in a loop, the condition to loop is to
check whether there is demand for the next chunk of decoded content.

Checking for demand also sets the stalled flag, and this must be done
only after the response state has been set back to CONTENT.
Unfortunately this was not done in the decoding loop.

The fix is to always update the response state in the decoding loop.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Oct 13, 2020
1 parent c37c2c5 commit c5df807
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 135 deletions.
Expand Up @@ -290,11 +290,11 @@ public String toString()
{
synchronized (this)
{
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}",
HttpExchange.class.getSimpleName(),
hashCode(),
requestState, requestFailure, requestFailure,
responseState, responseFailure, responseFailure);
request, requestState, requestFailure,
response, responseState, responseFailure);
}
}

Expand Down
267 changes: 135 additions & 132 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
Expand Up @@ -187,7 +187,7 @@ protected boolean responseBegin(HttpExchange exchange)
{
handlerListener = protocolHandler.getResponseListener();
if (LOG.isDebugEnabled())
LOG.debug("Found protocol handler {}", protocolHandler);
LOG.debug("Response {} found protocol handler {}", response, protocolHandler);
}
exchange.getConversation().updateResponseListeners(handlerListener);

Expand Down Expand Up @@ -218,19 +218,8 @@ protected boolean responseBegin(HttpExchange exchange)
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;

HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
Expand Down Expand Up @@ -296,19 +285,8 @@ protected void storeCookie(URI uri, HttpField field)
*/
protected boolean responseHeaders(HttpExchange exchange)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -342,7 +320,7 @@ protected boolean responseHeaders(HttpExchange exchange)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response headers {}, hasDemand={}", response, hasDemand);
LOG.debug("Response headers hasDemand={} {}", hasDemand, response);
return hasDemand;
}

Expand All @@ -363,78 +341,51 @@ protected boolean responseHeaders(HttpExchange exchange)
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return false;
}
}

boolean proceed = true;
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", exchange.getResponse(), System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (demand() <= 0)
{
callback.failed(new IllegalStateException("No demand for response content"));
proceed = false;
return false;
}
if (decoder == null)
return plainResponseContent(exchange, buffer, callback);
else
return decodeResponseContent(buffer, callback);
}

HttpResponse response = exchange.getResponse();
if (proceed)
private boolean plainResponseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (contentListeners.isEmpty())
{
callback.succeeded();
}
else
{
if (decoder == null)
{
contentListeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}

HttpResponse response = exchange.getResponse();
if (contentListeners.isEmpty())
callback.succeeded();
else
contentListeners.notifyContent(response, buffer, callback);

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (proceed)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}
else
{
return false;
}
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}

dispose();
terminateResponse(exchange);
return false;
}

private boolean decodeResponseContent(ByteBuffer buffer, Callback callback)
{
return decoder.decode(buffer, callback);
}

/**
* Method to be invoked when the response is successful.
* <p>
Expand Down Expand Up @@ -614,15 +565,42 @@ public boolean abort(HttpExchange exchange, Throwable failure)
}
}

private boolean updateResponseState(ResponseState from1, ResponseState from2, ResponseState to)
{
while (true)
{
ResponseState current = responseState.get();
if (current == from1 || current == from2)
{
if (updateResponseState(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: [{},{}] -> {}: {}", from1, from2, to, current);
return false;
}
}
}

private boolean updateResponseState(ResponseState from, ResponseState to)
{
boolean updated = responseState.compareAndSet(from, to);
if (!updated)
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
ResponseState current = responseState.get();
if (current == from)
{
if (responseState.compareAndSet(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, current);
return false;
}
}
return updated;
}

@Override
Expand Down Expand Up @@ -778,14 +756,62 @@ private Decoder(HttpExchange exchange, ContentDecoder decoder)

private boolean decode(ByteBuffer encoded, Callback callback)
{
// Store the buffer to decode in case the
// decoding produces multiple decoded buffers.
this.encoded = encoded;
this.callback = callback;
return decode();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoding {} with {}{}{}", response, decoder, System.lineSeparator(), BufferUtil.toDetailString(encoded));

boolean needInput = decode();
if (!needInput)
return false;

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded, hasDemand={} {}", hasDemand, response);
return hasDemand;
}

private boolean decode()
{
while (true)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}

DecodeResult result = decodeChunk();

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (result == DecodeResult.NEED_INPUT)
return true;
if (result == DecodeResult.ABORT)
return false;

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded chunk, hasDemand={} {}", hasDemand, exchange.getResponse());
if (hasDemand)
continue;
else
return false;
}

dispose();
terminateResponse(exchange);
return false;
}
}

private DecodeResult decodeChunk()
{
try
{
ByteBuffer buffer;
while (true)
Expand All @@ -798,27 +824,30 @@ private boolean decode()
callback.succeeded();
encoded = null;
callback = null;
return true;
return DecodeResult.NEED_INPUT;
}
}

ByteBuffer decoded = buffer;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));
LOG.debug("Response content decoded chunk {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(decoded));

contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand)
return false;
return DecodeResult.DECODE;
}
catch (Throwable x)
{
callback.failed(x);
return DecodeResult.ABORT;
}
}

private void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", exchange);
LOG.debug("Response content resume decoding {} with {}", exchange.getResponse(), decoder);

// The content and callback may be null
// if there is no initial content demand.
Expand All @@ -828,40 +857,9 @@ private void resume()
return;
}

while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}

boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
receive();
return;
}

dispose();
terminateResponse(exchange);
boolean needInput = decode();
if (needInput)
receive();
}

@Override
Expand All @@ -871,4 +869,9 @@ public void destroy()
((Destroyable)decoder).destroy();
}
}

private enum DecodeResult
{
DECODE, NEED_INPUT, ABORT
}
}

0 comments on commit c5df807

Please sign in to comment.