Skip to content

Commit

Permalink
Route processing refactor (#8463)
Browse files Browse the repository at this point in the history
This is a refactor of route execution, excluding filters (those will follow in #8422). The goals are:

* Moving away from reactive APIs where possible.
* Streamlining route execution so that it will be possible to have pre-route filters, and filters that read the request body.

This initial patch factors most of the routing code out of RoutingInboundHandler (the response writing code remains), and moves away from reactive for HttpContentProcessor and the route parameter fulfillment code (now called BaseRouteCompleter and FormRouteCompleter).

The goal of this PR is to get to a point where I am confident that it will be easy to delay filter execution until the body is available. Ideally the full request lifecycle, from before route resolution, through filter execution, to route execution, will be contained in the RouteRunner class.
  • Loading branch information
yawkat committed Dec 14, 2022
1 parent 629db38 commit 5379d93
Show file tree
Hide file tree
Showing 31 changed files with 2,372 additions and 1,602 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*/
package io.micronaut.http.client

import io.micronaut.core.async.annotation.SingleResult
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.async.annotation.SingleResult
import io.micronaut.core.async.publisher.Publishers
import io.micronaut.core.convert.format.Format
import io.micronaut.core.type.Argument
import io.micronaut.http.*
import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpResponse
import io.micronaut.http.HttpStatus
import io.micronaut.http.MediaType
import io.micronaut.http.MutableHttpRequest
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Header
Expand Down Expand Up @@ -52,6 +56,7 @@ import java.util.function.Consumer
*/
@MicronautTest
@Property(name = 'spec.name', value = 'HttpGetSpec')
@Property(name = 'micronaut.http.client.read-timeout', value = '30s')
class HttpGetSpec extends Specification {

@Inject
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,22 @@
package io.micronaut.http.server.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.processor.SingleSubscriberProcessor;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import io.micronaut.http.netty.stream.StreamedHttpMessage;
import io.micronaut.http.server.HttpServerConfiguration;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

/**
* Abstract implementation of the {@link HttpContentProcessor} interface that deals with limiting file upload sizes.
*
* @param <T> The type
* @author Graeme Rocher
* @since 1.0
*/
@Internal
public abstract class AbstractHttpContentProcessor<T> extends SingleSubscriberProcessor<ByteBufHolder, T> implements HttpContentProcessor<T> {
public abstract class AbstractHttpContentProcessor implements HttpContentProcessor {

protected final NettyHttpRequest<?> nettyHttpRequest;
protected final long advertisedLength;
Expand All @@ -57,17 +54,12 @@ public AbstractHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpSe
* Called after verifying the data of the message.
*
* @param message The message
* @param out The collection to add any produced messages to
*/
protected abstract void onData(ByteBufHolder message);
protected abstract void onData(ByteBufHolder message, Collection<Object> out) throws Throwable;

@Override
protected final void doSubscribe(Subscriber<? super T> subscriber) {
StreamedHttpMessage message = (StreamedHttpMessage) nettyHttpRequest.getNativeRequest();
message.subscribe(this);
}

@Override
protected final void doOnNext(ByteBufHolder message) {
public void add(ByteBufHolder message, Collection<Object> out) throws Throwable {
long receivedLength = this.receivedLength.addAndGet(message.content().readableBytes());

ReferenceCountUtil.touch(message);
Expand All @@ -76,7 +68,7 @@ protected final void doOnNext(ByteBufHolder message) {
} else if (receivedLength > requestMaxSize) {
fireExceedsLength(receivedLength, requestMaxSize, message);
} else {
onData(message);
onData(message, out);
}
}

Expand All @@ -86,11 +78,7 @@ protected final void doOnNext(ByteBufHolder message) {
* @param message The message to release
*/
protected void fireExceedsLength(long receivedLength, long expected, ByteBufHolder message) {
try {
onError(new ContentLengthExceededException(expected, receivedLength));
} finally {
ReferenceCountUtil.safeRelease(message);
parentSubscription.cancel();
}
ReferenceCountUtil.safeRelease(message);
throw new ContentLengthExceededException(expected, receivedLength);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.server.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.io.buffer.ReferenceCounted;
import io.micronaut.http.server.netty.multipart.NettyCompletedFileUpload;
import io.micronaut.web.router.RouteMatch;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCountUtil;

/**
* This class consumes objects produced by a {@link HttpContentProcessor}. Normally it just adds
* the data to the {@link NettyHttpRequest}. For multipart data, there is additional logic in
* {@link FormRouteCompleter} that also dynamically binds parameters, though usually this is done
* by the {@link io.micronaut.http.server.binding.RequestArgumentSatisfier}.
*
* @since 4.0.0
* @author Jonas Konrad
*/
@Internal
class BaseRouteCompleter {
final NettyHttpRequest<?> request;
volatile boolean needsInput = true;
/**
* Optional runnable that may be called from other threads (i.e. downstream subscribers) to
* notify that {@link #needsInput} may have changed.
*/
@Nullable
volatile Runnable checkDemand;
RouteMatch<?> routeMatch;
boolean execute = false;

public BaseRouteCompleter(NettyHttpRequest<?> request, RouteMatch<?> routeMatch) {
this.request = request;
this.routeMatch = routeMatch;
}

final void add(Object message) throws Throwable {
try {
if (request.destroyed) {
// we don't want this message anymore
ReferenceCountUtil.release(message);
return;
}

if (message instanceof ByteBufHolder bbh) {
addHolder(bbh);
} else {
((NettyHttpRequest) request).setBody(message);
needsInput = true;
}

// now, a pseudo try-finally with addSuppressed.
} catch (Throwable t) {
try {
ReferenceCountUtil.release(message);
} catch (Throwable u) {
t.addSuppressed(u);
}
throw t;
}

// the upstream processor gives us ownership of the message, so we need to release it.
ReferenceCountUtil.release(message);
}

protected void addHolder(ByteBufHolder holder) {
request.addContent(holder);
needsInput = true;
}

void completeSuccess() {
execute = true;
}

void completeFailure(Throwable failure) {
if (!execute) {
// discard parameters that have already been bound
for (Object toDiscard : routeMatch.getVariableValues().values()) {
if (toDiscard instanceof ReferenceCounted rc) {
rc.release();
}
if (toDiscard instanceof io.netty.util.ReferenceCounted rc) {
rc.release();
}
if (toDiscard instanceof NettyCompletedFileUpload fu) {
fu.discard();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package io.micronaut.http.server.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.processor.SingleThreadedBufferingProcessor;
import io.micronaut.core.async.subscriber.SingleThreadedBufferingSubscriber;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import io.micronaut.http.netty.stream.StreamedHttpMessage;
import io.micronaut.http.server.HttpServerConfiguration;
Expand All @@ -26,8 +24,8 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.multipart.HttpData;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -37,9 +35,9 @@
* @since 1.0
*/
@Internal
public class DefaultHttpContentProcessor extends SingleThreadedBufferingProcessor<ByteBufHolder, ByteBufHolder> implements HttpContentProcessor<ByteBufHolder> {
public class DefaultHttpContentProcessor implements HttpContentProcessor {

protected final NettyHttpRequest nettyHttpRequest;
protected final NettyHttpRequest<?> nettyHttpRequest;
protected final ChannelHandlerContext ctx;
protected final HttpServerConfiguration configuration;
protected final long advertisedLength;
Expand All @@ -65,23 +63,15 @@ public DefaultHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpSer
}

@Override
public final void subscribe(Subscriber<? super ByteBufHolder> downstreamSubscriber) {
super.subscribe(downstreamSubscriber);
//ensures the subscriber is present before subscribing to the message
StreamedHttpMessage message = (StreamedHttpMessage) nettyHttpRequest.getNativeRequest();
message.subscribe(this);
}

@Override
protected void onUpstreamMessage(ByteBufHolder message) {
public void add(ByteBufHolder message, Collection<Object> out) {
long receivedLength = this.receivedLength.addAndGet(resolveLength(message));

if (advertisedLength > requestMaxSize) {
fireExceedsLength(advertisedLength, requestMaxSize, message);
} else if (receivedLength > requestMaxSize) {
fireExceedsLength(receivedLength, requestMaxSize, message);
} else {
publishVerifiedContent(message);
out.add(message);
}
}

Expand All @@ -94,16 +84,7 @@ private long resolveLength(ByteBufHolder message) {
}

private void fireExceedsLength(long receivedLength, long expected, ByteBufHolder message) {
upstreamState = SingleThreadedBufferingSubscriber.BackPressureState.DONE;
upstreamSubscription.cancel();
upstreamBuffer.clear();
currentDownstreamSubscriber().ifPresent(subscriber ->
subscriber.onError(new ContentLengthExceededException(expected, receivedLength))
);
ReferenceCountUtil.safeRelease(message);
}

private void publishVerifiedContent(ByteBufHolder message) {
currentDownstreamSubscriber().ifPresent(subscriber -> subscriber.onNext(message));
throw new ContentLengthExceededException(expected, receivedLength);
}
}

0 comments on commit 5379d93

Please sign in to comment.