Skip to content

Commit

Permalink
Merge pull request #5144 from franz1981/4.x_less_sync
Browse files Browse the repository at this point in the history
Fix InboundBuffer::pause & write performance
  • Loading branch information
vietj committed May 16, 2024
2 parents 645b816 + a789e11 commit f8244af
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 190 deletions.
65 changes: 40 additions & 25 deletions src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,6 @@ void setRequest(HttpRequest request) {
}
}

private InboundBuffer<Object> pendingQueue() {
if (pending == null) {
pending = new InboundBuffer<>(context, 8);
pending.drainHandler(v -> conn.doResume());
pending.handler(buffer -> {
if (buffer == InboundBuffer.END_SENTINEL) {
onEnd();
} else {
onData((Buffer) buffer);
}
});
}
return pending;
}

void handleContent(Buffer buffer) {
InboundBuffer<Object> queue;
synchronized (conn) {
Expand Down Expand Up @@ -348,15 +333,37 @@ public HttpServerRequest exceptionHandler(Handler<Throwable> handler) {
@Override
public HttpServerRequest pause() {
synchronized (conn) {
pendingQueue().pause();
if (pending != null) {
pending.pause();
} else {
pending = InboundBuffer.createPaused(context, 8, pendingDrainHandler(), pendingHandler());
}
return this;
}
}

private Handler<Object> pendingHandler() {
return buffer -> {
if (buffer == InboundBuffer.END_SENTINEL) {
onEnd();
} else {
onData((Buffer) buffer);
}
};
}

private Handler<Void> pendingDrainHandler() {
return v -> conn.doResume();
}

@Override
public HttpServerRequest fetch(long amount) {
synchronized (conn) {
pendingQueue().fetch(amount);
if (pending != null) {
pending.fetch(amount);
} else {
pending = InboundBuffer.createAndFetch(context, 8, amount, pendingDrainHandler(), pendingHandler());
}
return this;
}
}
Expand Down Expand Up @@ -571,29 +578,37 @@ private void onData(Buffer data) {

void handleEnd() {
InboundBuffer<Object> queue;
HttpEventHandler handler = null;
synchronized (conn) {
ended = true;
queue = pending;
if (queue == null) {
handler = endRequest();
}
}
if (queue != null) {
queue.write(InboundBuffer.END_SENTINEL);
} else {
onEnd();
} else if (handler != null) {
handler.handleEnd();
}
}

private void onEnd() {
private HttpEventHandler endRequest() {
assert Thread.holdsLock(conn);
if (METRICS_ENABLED) {
reportRequestComplete();
}
if (decoder != null) {
endDecode();
}
return eventHandler;
}

private void onEnd() {
HttpEventHandler handler;
synchronized (conn) {
if (decoder != null) {
endDecode();
}
handler = eventHandler;
handler = endRequest();
}
// If there have been uploads then we let the last one call the end handler once any fileuploads are complete
if (handler != null) {
handler.handleEnd();
}
Expand Down
66 changes: 53 additions & 13 deletions src/main/java/io/vertx/core/streams/impl/InboundBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
*/
package io.vertx.core.streams.impl;

import io.netty.util.concurrent.FastThreadLocalThread;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Objects;

/**
* A buffer that transfers elements to an handler with back-pressure.
Expand Down Expand Up @@ -87,6 +86,28 @@ public InboundBuffer(Context context) {
}

public InboundBuffer(Context context, long highWaterMark) {
this(context, highWaterMark, Long.MAX_VALUE, null, null);
}

public static <E> InboundBuffer<E> createPaused(Context context, long highWaterMark, Handler<Void> drainHandler, Handler<E> handler) {
Objects.requireNonNull(drainHandler);
Objects.requireNonNull(handler);
return new InboundBuffer<>(context, highWaterMark, 0L, drainHandler, handler);
}

public static <E> InboundBuffer<E> createAndFetch(Context context, long highWaterMark, long demand, Handler<Void> drainHandler, Handler<E> handler) {
Objects.requireNonNull(drainHandler);
Objects.requireNonNull(handler);
checkPositiveAmount(demand);
InboundBuffer<E> inboundBuffer = new InboundBuffer<>(context, highWaterMark, Long.MAX_VALUE, drainHandler, handler);
if (inboundBuffer.emit(demand)) {
inboundBuffer.asyncDrain();
inboundBuffer.context.runOnContext(v -> inboundBuffer.drain());
}
return inboundBuffer;
}

private InboundBuffer(Context context, long highWaterMark, long demand, Handler<Void> drainHandler, Handler<E> handler) {
if (context == null) {
throw new NullPointerException("context must not be null");
}
Expand All @@ -95,9 +116,11 @@ public InboundBuffer(Context context, long highWaterMark) {
}
this.context = (ContextInternal) context;
this.highWaterMark = highWaterMark;
this.demand = Long.MAX_VALUE;
this.demand = demand;
// empty ArrayDeque's constructor ArrayDeque allocates 16 elements; let's delay the allocation to be of the proper size
this.pending = null;
this.drainHandler = drainHandler;
this.handler = handler;
}

private void checkThread() {
Expand Down Expand Up @@ -139,7 +162,8 @@ private boolean checkWritable() {
if (demand == Long.MAX_VALUE) {
return true;
} else {
long actual = size() - demand;
int size = pending == null ? 0 : pending.size();
long actual = size - demand;
boolean writable = actual < highWaterMark;
overflow |= !writable;
return writable;
Expand Down Expand Up @@ -279,23 +303,39 @@ private void handleException(Throwable err) {
* @return {@code true} when the buffer will be drained
*/
public boolean fetch(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException();
}
checkPositiveAmount(amount);
synchronized (this) {
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting || (isEmpty() && !overflow)) {
if (!emit(amount)) {
return false;
}
emitting = true;
}
asyncDrain();
return true;
}

private void asyncDrain() {
context.runOnContext(v -> drain());
}

private boolean emit(long amount) {
assert amount >= 0L;
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting || (isEmpty() && !overflow)) {
return false;
}
emitting = true;
return true;
}

private static void checkPositiveAmount(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException();
}
}

/**
* Read the most recent element synchronously.
* <p/>
Expand Down

0 comments on commit f8244af

Please sign in to comment.