Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #5409 - HttpClient fails intermittently with "Invalid response … #5449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -290,11 +290,11 @@ public String toString()
{
synchronized (this)
{
return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}",
HttpExchange.class.getSimpleName(),
hashCode(),
requestState, requestFailure, requestFailure,
responseState, responseFailure, responseFailure);
request, requestState, requestFailure,
response, responseState, responseFailure);
}
}

Expand Down
267 changes: 135 additions & 132 deletions jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
Expand Up @@ -187,7 +187,7 @@ protected boolean responseBegin(HttpExchange exchange)
{
handlerListener = protocolHandler.getResponseListener();
if (LOG.isDebugEnabled())
LOG.debug("Found protocol handler {}", protocolHandler);
LOG.debug("Response {} found protocol handler {}", response, protocolHandler);
}
exchange.getConversation().updateResponseListeners(handlerListener);

Expand Down Expand Up @@ -218,19 +218,8 @@ protected boolean responseBegin(HttpExchange exchange)
*/
protected boolean responseHeader(HttpExchange exchange, HttpField field)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;

HttpResponse response = exchange.getResponse();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
Expand Down Expand Up @@ -296,19 +285,8 @@ protected void storeCookie(URI uri, HttpField field)
*/
protected boolean responseHeaders(HttpExchange exchange)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.BEGIN || current == ResponseState.HEADER)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
return false;
}
}
if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT))
return false;

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -342,7 +320,7 @@ protected boolean responseHeaders(HttpExchange exchange)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response headers {}, hasDemand={}", response, hasDemand);
LOG.debug("Response headers hasDemand={} {}", hasDemand, response);
return hasDemand;
}

Expand All @@ -363,78 +341,51 @@ protected boolean responseHeaders(HttpExchange exchange)
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return false;
}
}

boolean proceed = true;
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", exchange.getResponse(), System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (demand() <= 0)
{
callback.failed(new IllegalStateException("No demand for response content"));
proceed = false;
return false;
}
if (decoder == null)
return plainResponseContent(exchange, buffer, callback);
else
return decodeResponseContent(buffer, callback);
}

HttpResponse response = exchange.getResponse();
if (proceed)
private boolean plainResponseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
if (contentListeners.isEmpty())
{
callback.succeeded();
}
else
{
if (decoder == null)
{
contentListeners.notifyContent(response, buffer, callback);
}
else
{
try
{
proceed = decoder.decode(buffer, callback);
}
catch (Throwable x)
{
callback.failed(x);
proceed = false;
}
}
}
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}

HttpResponse response = exchange.getResponse();
if (contentListeners.isEmpty())
callback.succeeded();
else
contentListeners.notifyContent(response, buffer, callback);

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (proceed)
{
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}
else
{
return false;
}
boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}, hasDemand={}", response, hasDemand);
return hasDemand;
}

dispose();
terminateResponse(exchange);
return false;
}

private boolean decodeResponseContent(ByteBuffer buffer, Callback callback)
{
return decoder.decode(buffer, callback);
}

/**
* Method to be invoked when the response is successful.
* <p>
Expand Down Expand Up @@ -614,15 +565,42 @@ public boolean abort(HttpExchange exchange, Throwable failure)
}
}

private boolean updateResponseState(ResponseState from1, ResponseState from2, ResponseState to)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fond of this method's signature, what do you think of that one instead?

private boolean updateResponseState(EnumSet<ResponseState> from, ResponseState to)

that would make the callers look like this:

if (updateResponseState(EnumSet.of(ResponseState.HEADERS, ResponseState.CONTENT), ResponseState.TRANSIENT))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It allocates 😃

{
while (true)
{
ResponseState current = responseState.get();
if (current == from1 || current == from2)
{
if (updateResponseState(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: [{},{}] -> {}: {}", from1, from2, to, current);
return false;
}
}
}

private boolean updateResponseState(ResponseState from, ResponseState to)
{
boolean updated = responseState.compareAndSet(from, to);
if (!updated)
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
ResponseState current = responseState.get();
if (current == from)
{
if (responseState.compareAndSet(current, to))
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("State update failed: {} -> {}: {}", from, to, current);
return false;
}
}
return updated;
}

@Override
Expand Down Expand Up @@ -778,14 +756,62 @@ private Decoder(HttpExchange exchange, ContentDecoder decoder)

private boolean decode(ByteBuffer encoded, Callback callback)
{
// Store the buffer to decode in case the
// decoding produces multiple decoded buffers.
this.encoded = encoded;
this.callback = callback;
return decode();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoding {} with {}{}{}", response, decoder, System.lineSeparator(), BufferUtil.toDetailString(encoded));

boolean needInput = decode();
if (!needInput)
return false;

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded, hasDemand={} {}", hasDemand, response);
return hasDemand;
}

private boolean decode()
{
while (true)
{
if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT))
{
callback.failed(new IllegalStateException("Invalid response state " + responseState));
return false;
}

DecodeResult result = decodeChunk();

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (result == DecodeResult.NEED_INPUT)
return true;
if (result == DecodeResult.ABORT)
return false;

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded chunk, hasDemand={} {}", hasDemand, exchange.getResponse());
if (hasDemand)
continue;
else
return false;
}

dispose();
terminateResponse(exchange);
return false;
}
}

private DecodeResult decodeChunk()
{
try
{
ByteBuffer buffer;
while (true)
Expand All @@ -798,27 +824,30 @@ private boolean decode()
callback.succeeded();
encoded = null;
callback = null;
return true;
return DecodeResult.NEED_INPUT;
}
}

ByteBuffer decoded = buffer;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded));
LOG.debug("Response content decoded chunk {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(decoded));

contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed));
contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed));

boolean hasDemand = hasDemandOrStall();
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand);
if (!hasDemand)
return false;
return DecodeResult.DECODE;
}
catch (Throwable x)
{
callback.failed(x);
return DecodeResult.ABORT;
}
}

private void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Response content resuming decoding {}", exchange);
LOG.debug("Response content resume decoding {} with {}", exchange.getResponse(), decoder);

// The content and callback may be null
// if there is no initial content demand.
Expand All @@ -828,40 +857,9 @@ private void resume()
return;
}

while (true)
{
ResponseState current = responseState.get();
if (current == ResponseState.HEADERS || current == ResponseState.CONTENT)
{
if (updateResponseState(current, ResponseState.TRANSIENT))
break;
}
else
{
callback.failed(new IllegalStateException("Invalid response state " + current));
return;
}
}

boolean decoded = false;
try
{
decoded = decode();
}
catch (Throwable x)
{
callback.failed(x);
}

if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
{
if (decoded)
receive();
return;
}

dispose();
terminateResponse(exchange);
boolean needInput = decode();
if (needInput)
receive();
}

@Override
Expand All @@ -871,4 +869,9 @@ public void destroy()
((Destroyable)decoder).destroy();
}
}

private enum DecodeResult
{
DECODE, NEED_INPUT, ABORT
}
}