Skip to content

Commit

Permalink
Issue #4331 Async Close Complete
Browse files Browse the repository at this point in the history
Cleanups of write

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Dec 3, 2019
1 parent d686f58 commit 41daa3f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 65 deletions.
122 changes: 58 additions & 64 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Expand Up @@ -228,27 +228,25 @@ protected Blocker acquireWriteBlockingCallback() throws IOException
return _writeBlocker.acquire();
}

private void write(ByteBuffer content, boolean complete) throws IOException
private void channelWrite(ByteBuffer content, boolean complete) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
write(content, complete, blocker);
channelWrite(content, complete, blocker);
blocker.block();
if (complete)
closed();
}
catch (Exception failure)
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
if (failure instanceof IOException)
throw failure;
throw new IOException(failure);
throw failure;
}
}

protected void write(ByteBuffer content, boolean complete, Callback callback)
protected void channelWrite(ByteBuffer content, boolean complete, Callback callback)
{
if (_firstByteTimeStamp == -1)
{
Expand Down Expand Up @@ -373,14 +371,14 @@ public void close()
if (closeCallback == BLOCKING_CLOSE_CALLBACK)
{
// Do a blocking close
write(content, !_channel.getResponse().isIncluding());
channelWrite(content, !_channel.getResponse().isIncluding());
_closeCallback = null;
closeCallback.succeeded();
}
else
{
_closeCallback = null;
write(content, !_channel.getResponse().isIncluding(), closeCallback);
channelWrite(content, !_channel.getResponse().isIncluding(), closeCallback);
}
}
catch (IOException x)
Expand Down Expand Up @@ -489,7 +487,7 @@ public void flush() throws IOException
switch (state)
{
case OPEN:
write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false);
channelWrite(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false);
return;

case ASYNC:
Expand Down Expand Up @@ -521,39 +519,43 @@ public void flush() throws IOException
@Override
public void write(byte[] b, int off, int len) throws IOException
{
long written = _written + len;
boolean last = _channel.getResponse().isAllContentWritten(written);
int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate);
// Write will be aggregated if:
// + it is smaller than the commitSize
// + is not the last one, or is last but will fit in an already allocated aggregate buffer.
boolean aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space);
boolean flush = last || !aggregate || len >= space;

// Async or Blocking ?
while (true)
{
State state = _state.get();
switch (state)
{
case OPEN:
// process blocking below
// process blocking write below
_written = written;
break;

case ASYNC:
throw new IllegalStateException("isReady() not called");

case READY:
if (!_state.compareAndSet(state, State.PENDING))
if (!_state.compareAndSet(state, flush ? State.PENDING : State.ASYNC))
continue;
_written = written;

// Should we aggregate?
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
if (aggregate)
{
acquireBuffer();

// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);

// return if we are not complete, not full and filled all the content
if (filled == len && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
throw new IllegalStateException(_state.get().toString());
if (!flush)
return;
}

// adjust offset/length
off += filled;
Expand Down Expand Up @@ -582,20 +584,13 @@ public void write(byte[] b, int off, int len) throws IOException
}

// handle blocking write

// Should we aggregate?
// Yes - if the write is smaller than the commitSize (==aggregate buffer size)
// and the write is not the last one, or is last but will fit in an already allocated aggregate buffer.
boolean last = isLastContentToWrite(len);
if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate)))
if (aggregate)
{
acquireBuffer();

// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);

// return if we are not the last write and have aggregated all of the content
if (!last && filled == len && !BufferUtil.isFull(_aggregate))
// return if we are not complete, not full and filled all the content
if (!flush)
return;

// adjust offset/length
Expand All @@ -606,7 +601,7 @@ public void write(byte[] b, int off, int len) throws IOException
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
write(_aggregate, last && len == 0);
channelWrite(_aggregate, last && len == 0);

// should we fill aggregate again from the buffer?
if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate))
Expand All @@ -622,21 +617,20 @@ public void write(byte[] b, int off, int len) throws IOException
// write a buffer capacity at a time to avoid JVM pooling large direct buffers
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
ByteBuffer view = ByteBuffer.wrap(b, off, len);

while (len > getBufferSize())
{
int p = view.position();
int l = p + getBufferSize();
view.limit(p + getBufferSize());
write(view, false);
channelWrite(view, false);
view.limit(p + len);
len -= getBufferSize();
view.limit(l + Math.min(len, getBufferSize()));
view.position(l);
}
write(view, last);
channelWrite(view, last);
}
else if (last)
{
write(BufferUtil.EMPTY_BUFFER, true);
channelWrite(BufferUtil.EMPTY_BUFFER, true);
}
}

Expand Down Expand Up @@ -689,55 +683,55 @@ public void write(ByteBuffer buffer) throws IOException

// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
write(_aggregate, last && len == 0);
channelWrite(_aggregate, last && len == 0);

// write any remaining content in the buffer directly
if (len > 0)
write(buffer, last);
channelWrite(buffer, last);
else if (last)
write(BufferUtil.EMPTY_BUFFER, true);
channelWrite(BufferUtil.EMPTY_BUFFER, true);
}

@Override
public void write(int b) throws IOException
{
_written += 1;
boolean complete = _channel.getResponse().isAllContentWritten(_written);
long written = _written + 1;
boolean last = _channel.getResponse().isAllContentWritten(written);
int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate);
boolean flush = last || space == 1;

// Async or Blocking ?
while (true)
{
switch (_state.get())
{
case OPEN:
_written = written;
acquireBuffer();
BufferUtil.append(_aggregate, (byte)b);

// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
write(_aggregate, complete);
if (flush)
channelWrite(_aggregate, last);
break;

case ASYNC:
throw new IllegalStateException("isReady() not called");

case READY:
if (!_state.compareAndSet(State.READY, State.PENDING))
if (!_state.compareAndSet(State.READY, flush ? State.PENDING : State.ASYNC))
continue;

_written = written;
acquireBuffer();
BufferUtil.append(_aggregate, (byte)b);

// Check if all written or full
if (!complete && !BufferUtil.isFull(_aggregate))
if (flush)
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
throw new IllegalStateException();
return;
// Do the asynchronous writing from the callback
new AsyncFlush().iterate();
}

// Do the asynchronous writing from the callback
new AsyncFlush().iterate();
return;

case PENDING:
Expand Down Expand Up @@ -884,7 +878,7 @@ public void sendContent(ByteBuffer content) throws IOException
LOG.debug("sendContent({})", BufferUtil.toDetailString(content));

_written += content.remaining();
write(content, true);
channelWrite(content, true);
}

/**
Expand Down Expand Up @@ -965,7 +959,7 @@ public void sendContent(ByteBuffer content, final Callback callback)
LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback);

_written += content.remaining();
write(content, true, new Callback.Nested(callback)
channelWrite(content, true, new Callback.Nested(callback)
{
@Override
public void succeeded()
Expand Down Expand Up @@ -1319,7 +1313,7 @@ protected void onCompleteSuccess()
if (_last)
closed();
if (_channel.getState().onWritePossible())
_channel.execute(_channel);
_channel.execute(_channel); // TODO can we call directly? Why execute?
break;

case CLOSED:
Expand Down Expand Up @@ -1356,14 +1350,14 @@ protected Action process()
if (BufferUtil.hasContent(_aggregate))
{
_flushed = true;
write(_aggregate, false, this);
channelWrite(_aggregate, false, this);
return Action.SCHEDULED;
}

if (!_flushed)
{
_flushed = true;
write(BufferUtil.EMPTY_BUFFER, false, this);
channelWrite(BufferUtil.EMPTY_BUFFER, false, this);
return Action.SCHEDULED;
}

Expand Down Expand Up @@ -1408,7 +1402,7 @@ protected Action process()
if (BufferUtil.hasContent(_aggregate))
{
_completed = _len == 0;
write(_aggregate, _last && _completed, this);
channelWrite(_aggregate, _last && _completed, this);
return Action.SCHEDULED;
}

Expand All @@ -1428,7 +1422,7 @@ protected Action process()
if (_slice == null)
{
_completed = true;
write(_buffer, _last, this);
channelWrite(_buffer, _last, this);
return Action.SCHEDULED;
}

Expand All @@ -1440,7 +1434,7 @@ protected Action process()
_buffer.position(pl);
_slice.position(p);
_completed = !_buffer.hasRemaining();
write(_slice, _last && _completed, this);
channelWrite(_slice, _last && _completed, this);
return Action.SCHEDULED;
}

Expand All @@ -1449,7 +1443,7 @@ protected Action process()
if (_last && !_completed)
{
_completed = true;
write(BufferUtil.EMPTY_BUFFER, true, this);
channelWrite(BufferUtil.EMPTY_BUFFER, true, this);
return Action.SCHEDULED;
}

Expand Down Expand Up @@ -1511,7 +1505,7 @@ protected Action process() throws Exception
_buffer.position(0);
_buffer.limit(len);
_written += len;
write(_buffer, _eof, this);
channelWrite(_buffer, _eof, this);
return Action.SCHEDULED;
}

Expand Down Expand Up @@ -1572,7 +1566,7 @@ protected Action process() throws Exception
// write what we have
BufferUtil.flipToFlush(_buffer, 0);
_written += _buffer.remaining();
write(_buffer, _eof, this);
channelWrite(_buffer, _eof, this);

return Action.SCHEDULED;
}
Expand Down
Expand Up @@ -148,7 +148,7 @@ public void flush()
out.flush();
}
}
catch (IOException ex)
catch (Throwable ex)
{
setError(ex);
}
Expand Down

0 comments on commit 41daa3f

Please sign in to comment.