Skip to content

Commit

Permalink
Issue #6642 - never shutdown output after generating a request.
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Aug 26, 2021
1 parent fd6c72c commit 83f2265
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 32 deletions.
Expand Up @@ -21,7 +21,11 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;

public class BufferCallbackAccumulator
/**
* This class can be used to accumulate pairs of {@link ByteBuffer} and {@link Callback}, and eventually copy
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
*/
public class ByteBufferCallbackAccumulator
{
private final List<Entry> _entries = new ArrayList<>();
private int _length;
Expand All @@ -45,8 +49,6 @@ public void addEntry(ByteBuffer buffer, Callback callback)
}

/**
* Get the amount of bytes which have been accumulated.
* This will add up the remaining of each buffer in the accumulator.
* @return the total length of the content in the accumulator.
*/
public int getLength()
Expand Down
16 changes: 8 additions & 8 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
Expand Up @@ -108,7 +108,7 @@ public InvocationType getInvocationType()
}

/**
* Create a callback from the passed success and failure
* Creates a callback from the passed success and failure
*
* @param success Called when the callback succeeds
* @param failure Called when the callback fails
Expand All @@ -133,7 +133,7 @@ public void failed(Throwable x)
}

/**
* Create a callback that runs completed when it succeeds or fails
* Creates a callback that runs completed when it succeeds or fails
*
* @param completed The completion to run on success or failure
* @return a new callback
Expand All @@ -150,7 +150,7 @@ public void completed()
}

/**
* Create a nested callback that runs completed after
* Creates a nested callback that runs completed after
* completing the nested callback.
*
* @param callback The nested callback
Expand All @@ -169,7 +169,7 @@ public void completed()
}

/**
* Create a nested callback that runs completed before
* Creates a nested callback that runs completed before
* completing the nested callback.
*
* @param callback The nested callback
Expand Down Expand Up @@ -212,7 +212,7 @@ public void failed(Throwable x)
}

/**
* Create a nested callback which always fails the nested callback on completion.
* Creates a nested callback which always fails the nested callback on completion.
*
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
Expand All @@ -239,7 +239,7 @@ public void failed(Throwable x)
}

/**
* Create a callback which combines two other callbacks and will succeed or fail them both.
* Creates a callback which combines two other callbacks and will succeed or fail them both.
* @param callback1 The first callback
* @param callback2 The second callback
* @return a new callback.
Expand Down Expand Up @@ -397,14 +397,14 @@ public InvocationType getInvocationType()
class Completable extends CompletableFuture<Void> implements Callback
{
/**
* Create a completable future given a callback.
* Creates a completable future given a callback.
*
* @param callback The nested callback.
* @return a new Completable which will succeed this callback when completed.
*/
public static Completable from(Callback callback)
{
return new Completable()
return new Completable(callback.getInvocationType())
{
@Override
public void succeeded()
Expand Down
Expand Up @@ -17,7 +17,7 @@
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;

import org.eclipse.jetty.io.BufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CoreSession;
Expand All @@ -28,7 +28,7 @@
public class ByteArrayMessageSink extends AbstractMessageSink
{
private static final byte[] EMPTY_BUFFER = new byte[0];
private BufferCallbackAccumulator out;
private ByteBufferCallbackAccumulator out;

public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle)
{
Expand Down Expand Up @@ -77,7 +77,7 @@ public void accept(Frame frame, Callback callback)
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new BufferCallbackAccumulator();
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}

Expand Down
Expand Up @@ -18,7 +18,7 @@
import java.nio.ByteBuffer;
import java.util.Objects;

import org.eclipse.jetty.io.BufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferCallbackAccumulator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand All @@ -29,7 +29,7 @@

public class ByteBufferMessageSink extends AbstractMessageSink
{
private BufferCallbackAccumulator out;
private ByteBufferCallbackAccumulator out;

public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle)
{
Expand Down Expand Up @@ -75,7 +75,7 @@ public void accept(Frame frame, Callback callback)
{
ByteBuffer payload = frame.getPayload();
if (out == null)
out = new BufferCallbackAccumulator();
out = new ByteBufferCallbackAccumulator();
out.addEntry(payload, callback);
}

Expand Down
Expand Up @@ -151,11 +151,15 @@ public void accept(Frame frame, final Callback callback)
}
else
{
frameCallback = Callback.from(() ->
frameCallback = new Callback.Nested(callback)
{
callback.succeeded();
session.demand(1);
}, callback::failed);
@Override
public void succeeded()
{
session.demand(1);
super.succeeded();
}
};
}

typeSink.accept(frame, frameCallback);
Expand Down
Expand Up @@ -225,25 +225,16 @@ public void onFrame(Frame frame, Callback callback)
}
}

// Demand after succeeding any received frame
Callback demandingCallback = Callback.from(() ->
{
demand();
callback.succeeded();
},
callback::failed
);

switch (frame.getOpCode())
{
case OpCode.CLOSE:
onCloseFrame(frame, callback);
break;
case OpCode.PING:
onPingFrame(frame, demandingCallback);
onPingFrame(frame, callback);
break;
case OpCode.PONG:
onPongFrame(frame, demandingCallback);
onPongFrame(frame, callback);
break;
case OpCode.TEXT:
onTextFrame(frame, callback);
Expand Down Expand Up @@ -381,7 +372,9 @@ private void onPingFrame(Frame frame, Callback callback)
ByteBuffer payload = BufferUtil.copy(frame.getPayload());
getSession().getRemote().sendPong(payload, WriteCallback.NOOP);
}

callback.succeeded();
demand();
}

private void onPongFrame(Frame frame, Callback callback)
Expand All @@ -401,7 +394,9 @@ private void onPongFrame(Frame frame, Callback callback)
throw new WebSocketException(endpointInstance.getClass().getSimpleName() + " PONG method error: " + cause.getMessage(), cause);
}
}

callback.succeeded();
demand();
}

private void onTextFrame(Frame frame, Callback callback)
Expand Down

0 comments on commit 83f2265

Please sign in to comment.