Skip to content

Commit

Permalink
Merge pull request #5449 from eclipse/jetty-9.4.x-5409-invalid_respon…
Browse files Browse the repository at this point in the history
…se_state_transient

Fixes #5409 - HttpClient fails intermittently with "Invalid response …
  • Loading branch information
sbordet committed Oct 14, 2020
2 parents 4d0edf9 + c5df807 commit 7bfca25
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 135 deletions.
Expand Up @@ -290,11 +290,11 @@ public String toString()
{
synchronized (this)
{
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}",
HttpExchange.class.getSimpleName(),
hashCode(),
requestState, requestFailure, requestFailure,
responseState, responseFailure, responseFailure);
request, requestState, requestFailure,
response, responseState, responseFailure);
}
}

Expand Down
267 changes: 135 additions & 132 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
Expand Up @@ -187,7 +187,7 @@ protected boolean responseBegin(HttpExchange exchange)
{
handlerListener = protocolHandler.getResponseListener();
if (LOG.isDebugEnabled())
LOG.debug("Found protocol handler {}", protocolHandler);
LOG.debug("Response {} found protocol handler {}", response, protocolHandler);
}
exchange.getConversation().updateResponseListeners(handlerListener);

Expand Down Expand Up @@ -218,19 +218,8 @@ protected boolean responseBegin(HttpExchange exchange)
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;

HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
Expand Down Expand Up @@ -296,19 +285,8 @@ protected void storeCookie(URI uri, HttpField field)
*/
protected boolean responseHeaders(HttpExchange exchange)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -342,7 +320,7 @@ protected boolean responseHeaders(HttpExchange exchange)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response headers {}, hasDemand={}", response, hasDemand);
LOG.debug("Response headers hasDemand={} {}", hasDemand, response);
return hasDemand;
}

Expand All @@ -363,78 +341,51 @@ protected boolean responseHeaders(HttpExchange exchange)
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return false;
}
}

boolean proceed = true;
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", exchange.getResponse(), System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (demand() <= 0)
{
callback.failed(new IllegalStateException("No demand for response content"));
proceed = false;
return false;
}
if (decoder == null)
return plainResponseContent(exchange, buffer, callback);
else
return decodeResponseContent(buffer, callback);
}

HttpResponse response = exchange.getResponse();
if (proceed)
private boolean plainResponseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (contentListeners.isEmpty())
{
callback.succeeded();
}
else
{
if (decoder == null)
{
contentListeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}

HttpResponse response = exchange.getResponse();
if (contentListeners.isEmpty())
callback.succeeded();
else
contentListeners.notifyContent(response, buffer, callback);

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (proceed)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}
else
{
return false;
}
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}

dispose();
terminateResponse(exchange);
return false;
}

private boolean decodeResponseContent(ByteBuffer buffer, Callback callback)
{
return decoder.decode(buffer, callback);
}

/**
* Method to be invoked when the response is successful.
* <p>
Expand Down Expand Up @@ -614,15 +565,42 @@ public boolean abort(HttpExchange exchange, Throwable failure)
}
}

private boolean updateResponseState(ResponseState from1, ResponseState from2, ResponseState to)
{
while (true)
{
ResponseState current = responseState.get();
if (current == from1 || current == from2)
{
if (updateResponseState(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: [{},{}] -> {}: {}", from1, from2, to, current);
return false;
}
}
}

private boolean updateResponseState(ResponseState from, ResponseState to)
{
boolean updated = responseState.compareAndSet(from, to);
if (!updated)
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
ResponseState current = responseState.get();
if (current == from)
{
if (responseState.compareAndSet(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, current);
return false;
}
}
return updated;
}

@Override
Expand Down Expand Up @@ -778,14 +756,62 @@ private Decoder(HttpExchange exchange, ContentDecoder decoder)

private boolean decode(ByteBuffer encoded, Callback callback)
{
// Store the buffer to decode in case the
// decoding produces multiple decoded buffers.
this.encoded = encoded;
this.callback = callback;
return decode();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoding {} with {}{}{}", response, decoder, System.lineSeparator(), BufferUtil.toDetailString(encoded));

boolean needInput = decode();
if (!needInput)
return false;

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded, hasDemand={} {}", hasDemand, response);
return hasDemand;
}

private boolean decode()
{
while (true)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}

DecodeResult result = decodeChunk();

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (result == DecodeResult.NEED_INPUT)
return true;
if (result == DecodeResult.ABORT)
return false;

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded chunk, hasDemand={} {}", hasDemand, exchange.getResponse());
if (hasDemand)
continue;
else
return false;
}

dispose();
terminateResponse(exchange);
return false;
}
}

private DecodeResult decodeChunk()
{
try
{
ByteBuffer buffer;
while (true)
Expand All @@ -798,27 +824,30 @@ private boolean decode()
callback.succeeded();
encoded = null;
callback = null;
return true;
return DecodeResult.NEED_INPUT;
}
}

ByteBuffer decoded = buffer;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));
LOG.debug("Response content decoded chunk {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(decoded));

contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand)
return false;
return DecodeResult.DECODE;
}
catch (Throwable x)
{
callback.failed(x);
return DecodeResult.ABORT;
}
}

private void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", exchange);
LOG.debug("Response content resume decoding {} with {}", exchange.getResponse(), decoder);

// The content and callback may be null
// if there is no initial content demand.
Expand All @@ -828,40 +857,9 @@ private void resume()
return;
}

while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}

boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
receive();
return;
}

dispose();
terminateResponse(exchange);
boolean needInput = decode();
if (needInput)
receive();
}

@Override
Expand All @@ -871,4 +869,9 @@ public void destroy()
((Destroyable)decoder).destroy();
}
}

private enum DecodeResult
{
DECODE, NEED_INPUT, ABORT
}
}

0 comments on commit 7bfca25

Please sign in to comment.