Skip to content

Commit

Permalink
Issue #4331 Close Complete
Browse files Browse the repository at this point in the history
Better javadoc
refactored onWriteComplete logic to be simpler
fixed bug with flush of last written byte

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Dec 17, 2019
1 parent d623c00 commit 88fff66
Showing 1 changed file with 54 additions and 43 deletions.
97 changes: 54 additions & 43 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Expand Up @@ -105,13 +105,13 @@ enum State
\ \ |
+--owcL------------------->--owcL--------------------------------+
swl = setWriteListener
w = write
owc = onWriteComplete(false,null)
owcL = onWriteComplete(true,null)
irf = isReady()==false
irt = osReady()==true
last = close() or complete(Callback) or write of known last content
swl : setWriteListener
w : write
owc : onWriteComplete last == false
owcL : onWriteComplete last == true
irf : isReady() == false
irt : isReady() == true
last : close() or complete(Callback) or write of known last content
</pre>
*/
enum ApiState
Expand Down Expand Up @@ -285,63 +285,41 @@ private void channelWrite(ByteBuffer content, boolean last, Callback callback)

private void onWriteComplete(boolean last, Throwable failure)
{
String state = null;
boolean wake = false;
Callback closedCallback = null;
ByteBuffer closeContent = null;
synchronized (_channelState)
{
if (LOG.isDebugEnabled())
state = stateString();

// Transition to CLOSED state if we were the last write or we have failed
if (last || failure != null)
{
_state = State.CLOSED;
closedCallback = _closedCallback;
_closedCallback = null;
releaseBuffer();
wake = updateApiState(failure);
}

// Did somebody require a close while we were writing?
if (_state == State.CLOSE)
else if (_state == State.CLOSE)
{
// Somebody called close or complete while we were writing.
// We can now send a (probably empty) last buffer and then when it completes
// onWriteCompletion will be called again to actually execute the _completeCallback
_state = State.CLOSING;
closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
}
else
{
switch (_apiState)
{
case BLOCKED:
_apiState = ApiState.BLOCKING;
break;

case PENDING:
_apiState = ApiState.ASYNC;
if (failure != null)
{
_onError = failure;
wake = _channelState.onWritePossible();
}
break;

case UNREADY:
_apiState = ApiState.READY;
if (failure != null)
_onError = failure;
wake = _channelState.onWritePossible();
break;

default:
if (_state == State.CLOSED)
break;
throw new IllegalStateException(stateString());
}
wake = updateApiState(null);
}
}

if (LOG.isDebugEnabled())
LOG.debug("onWriteComplete({},{}) {} c={} cb={} w={}",
last, failure, stateString(), BufferUtil.toDetailString(closeContent), closedCallback, wake);
LOG.debug("onWriteComplete({},{}) {}->{} c={} cb={} w={}",
last, failure, state, stateString(), BufferUtil.toDetailString(closeContent), closedCallback, wake);

if (failure != null)
_channel.abort(failure);
Expand Down Expand Up @@ -369,6 +347,39 @@ private void onWriteComplete(boolean last, Throwable failure)
}
}

private boolean updateApiState(Throwable failure)
{
boolean wake = false;
switch (_apiState)
{
case BLOCKED:
_apiState = ApiState.BLOCKING;
break;

case PENDING:
_apiState = ApiState.ASYNC;
if (failure != null)
{
_onError = failure;
wake = _channelState.onWritePossible();
}
break;

case UNREADY:
_apiState = ApiState.READY;
if (failure != null)
_onError = failure;
wake = _channelState.onWritePossible();
break;

default:
if (_state == State.CLOSED)
break;
throw new IllegalStateException(stateString());
}
return wake;
}

public void softClose()
{
synchronized (_channelState)
Expand Down Expand Up @@ -676,7 +687,7 @@ public void flush() throws IOException
}

if (content == null)
new AsyncFlush().iterate();
new AsyncFlush(false).iterate();
else
{
try
Expand Down Expand Up @@ -960,7 +971,7 @@ public void write(int b) throws IOException

if (async)
// Do the asynchronous writing from the callback
new AsyncFlush().iterate();
new AsyncFlush(last).iterate();
else
{
try
Expand Down Expand Up @@ -1562,9 +1573,9 @@ private class AsyncFlush extends ChannelWriteCB
{
volatile boolean _flushed;

AsyncFlush()
AsyncFlush(boolean last)
{
super(false);
super(last);
}

@Override
Expand Down

0 comments on commit 88fff66

Please sign in to comment.