Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Route processing refactor #8463

Merged
merged 34 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3308f4b
factor out some RoutingInBoundHandler code, new HttpData implementation
yawkat Dec 1, 2022
ecee25a
remove HttpDataReference
yawkat Dec 2, 2022
bbccb79
docs for MicronautHttpData
yawkat Dec 2, 2022
19870a7
Merge branch '4.0.x' into route-refactor
yawkat Dec 2, 2022
e3a418f
remove one Mono
yawkat Dec 2, 2022
ac3ec90
fix demand from downstream publishers
yawkat Dec 2, 2022
ed495c7
refactor HttpContentProcessor to work independent of reactive
yawkat Dec 5, 2022
0ca937e
remove one reactive step in RouteRunner :)
yawkat Dec 5, 2022
dc610ff
move some more code
yawkat Dec 5, 2022
be23672
move exception handler also
yawkat Dec 5, 2022
da4ddda
make HttpContentProcessor work on an output parameter instead
yawkat Dec 5, 2022
bc825a2
fix context propagation from filters to controller
yawkat Dec 6, 2022
20ac0ef
"fix" test for reactive filters
yawkat Dec 6, 2022
87a9aec
document BaseRouteCompleter
yawkat Dec 6, 2022
7bbbfca
move most RouteExecutor code to a new RequestLifecycle class
yawkat Dec 7, 2022
c422f7c
move websocket handler to RequestLifecycle as well
yawkat Dec 7, 2022
35ebe55
checkstyle
yawkat Dec 7, 2022
2715e87
this test is flaky
yawkat Dec 7, 2022
db6afd5
make HttpContentProcessor an interface again
yawkat Dec 7, 2022
7b4e23f
sonar
yawkat Dec 7, 2022
68cd933
sonar
yawkat Dec 7, 2022
1cfdfe6
address review comments
yawkat Dec 9, 2022
bbfd57b
address review comments
yawkat Dec 9, 2022
d5bafc3
this is wrong after all
yawkat Dec 10, 2022
686de0f
Fix findRouteMatch for aws requests
yawkat Dec 10, 2022
4a040af
Move away from getUri.toString in more places
yawkat Dec 10, 2022
a5aedcd
inline handleNormal0
yawkat Dec 13, 2022
fa2d56e
improve subscriber error handling
yawkat Dec 13, 2022
23cf02b
inline RIB.find
yawkat Dec 13, 2022
c4651fa
simplify instanceof
yawkat Dec 13, 2022
beb53f3
replace HttpContentProcessorAsReactive* with reactor
yawkat Dec 13, 2022
c2f185c
in MultipartBodyArgumentBinder, always assume HttpData
yawkat Dec 13, 2022
7b0b240
sonar
yawkat Dec 13, 2022
31ea9da
checkstyle
yawkat Dec 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*/
package io.micronaut.http.client

import io.micronaut.core.async.annotation.SingleResult
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.async.annotation.SingleResult
import io.micronaut.core.async.publisher.Publishers
import io.micronaut.core.convert.format.Format
import io.micronaut.core.type.Argument
import io.micronaut.http.*
import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpResponse
import io.micronaut.http.HttpStatus
import io.micronaut.http.MediaType
import io.micronaut.http.MutableHttpRequest
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Header
Expand Down Expand Up @@ -52,6 +56,7 @@ import java.util.function.Consumer
*/
@MicronautTest
@Property(name = 'spec.name', value = 'HttpGetSpec')
@Property(name = 'micronaut.http.client.read-timeout', value = '30s')
class HttpGetSpec extends Specification {

@Inject
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,22 @@
package io.micronaut.http.server.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.processor.SingleSubscriberProcessor;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import io.micronaut.http.netty.stream.StreamedHttpMessage;
import io.micronaut.http.server.HttpServerConfiguration;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

/**
* Abstract implementation of the {@link HttpContentProcessor} interface that deals with limiting file upload sizes.
*
* @param <T> The type
* @author Graeme Rocher
* @since 1.0
*/
@Internal
public abstract class AbstractHttpContentProcessor<T> extends SingleSubscriberProcessor<ByteBufHolder, T> implements HttpContentProcessor<T> {
public abstract class AbstractHttpContentProcessor implements HttpContentProcessor {

protected final NettyHttpRequest<?> nettyHttpRequest;
protected final long advertisedLength;
Expand All @@ -57,17 +54,12 @@ public AbstractHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpSe
* Called after verifying the data of the message.
*
* @param message The message
* @param out The collection to add any produced messages to
*/
protected abstract void onData(ByteBufHolder message);
protected abstract void onData(ByteBufHolder message, Collection<Object> out) throws Throwable;

@Override
protected final void doSubscribe(Subscriber<? super T> subscriber) {
StreamedHttpMessage message = (StreamedHttpMessage) nettyHttpRequest.getNativeRequest();
message.subscribe(this);
}

@Override
protected final void doOnNext(ByteBufHolder message) {
public void add(ByteBufHolder message, Collection<Object> out) throws Throwable {
long receivedLength = this.receivedLength.addAndGet(message.content().readableBytes());

ReferenceCountUtil.touch(message);
Expand All @@ -76,7 +68,7 @@ protected final void doOnNext(ByteBufHolder message) {
} else if (receivedLength > requestMaxSize) {
fireExceedsLength(receivedLength, requestMaxSize, message);
} else {
onData(message);
onData(message, out);
}
}

Expand All @@ -86,11 +78,7 @@ protected final void doOnNext(ByteBufHolder message) {
* @param message The message to release
*/
protected void fireExceedsLength(long receivedLength, long expected, ByteBufHolder message) {
try {
onError(new ContentLengthExceededException(expected, receivedLength));
} finally {
ReferenceCountUtil.safeRelease(message);
parentSubscription.cancel();
}
ReferenceCountUtil.safeRelease(message);
throw new ContentLengthExceededException(expected, receivedLength);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.server.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.io.buffer.ReferenceCounted;
import io.micronaut.http.server.netty.multipart.NettyCompletedFileUpload;
import io.micronaut.web.router.RouteMatch;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCountUtil;

/**
* This class consumes objects produced by a {@link HttpContentProcessor}. Normally it just adds
* the data to the {@link NettyHttpRequest}. For multipart data, there is additional logic in
* {@link FormRouteCompleter} that also dynamically binds parameters, though usually this is done
* by the {@link io.micronaut.http.server.binding.RequestArgumentSatisfier}.
*
* @since 4.0.0
* @author Jonas Konrad
*/
@Internal
class BaseRouteCompleter {
final NettyHttpRequest<?> request;
volatile boolean needsInput = true;
/**
* Optional runnable that may be called from other threads (i.e. downstream subscribers) to
* notify that {@link #needsInput} may have changed.
*/
@Nullable
volatile Runnable checkDemand;
RouteMatch<?> routeMatch;
boolean execute = false;

public BaseRouteCompleter(NettyHttpRequest<?> request, RouteMatch<?> routeMatch) {
this.request = request;
this.routeMatch = routeMatch;
}

final void add(Object message) throws Throwable {
try {
if (request.destroyed) {
// we don't want this message anymore
ReferenceCountUtil.release(message);
return;
yawkat marked this conversation as resolved.
Show resolved Hide resolved
}

if (message instanceof ByteBufHolder bbh) {
addHolder(bbh);
} else {
((NettyHttpRequest) request).setBody(message);
needsInput = true;
}

// now, a pseudo try-finally with addSuppressed.
} catch (Throwable t) {
try {
ReferenceCountUtil.release(message);
} catch (Throwable u) {
t.addSuppressed(u);
}
throw t;
}

// the upstream processor gives us ownership of the message, so we need to release it.
ReferenceCountUtil.release(message);
}

protected void addHolder(ByteBufHolder holder) {
request.addContent(holder);
needsInput = true;
}

void completeSuccess() {
execute = true;
}

void completeFailure(Throwable failure) {
if (!execute) {
// discard parameters that have already been bound
for (Object toDiscard : routeMatch.getVariableValues().values()) {
if (toDiscard instanceof ReferenceCounted rc) {
rc.release();
}
if (toDiscard instanceof io.netty.util.ReferenceCounted rc) {
rc.release();
}
if (toDiscard instanceof NettyCompletedFileUpload fu) {
fu.discard();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package io.micronaut.http.server.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.processor.SingleThreadedBufferingProcessor;
import io.micronaut.core.async.subscriber.SingleThreadedBufferingSubscriber;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import io.micronaut.http.netty.stream.StreamedHttpMessage;
import io.micronaut.http.server.HttpServerConfiguration;
Expand All @@ -26,8 +24,8 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.multipart.HttpData;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -37,9 +35,9 @@
* @since 1.0
*/
@Internal
public class DefaultHttpContentProcessor extends SingleThreadedBufferingProcessor<ByteBufHolder, ByteBufHolder> implements HttpContentProcessor<ByteBufHolder> {
public class DefaultHttpContentProcessor implements HttpContentProcessor {

protected final NettyHttpRequest nettyHttpRequest;
protected final NettyHttpRequest<?> nettyHttpRequest;
protected final ChannelHandlerContext ctx;
protected final HttpServerConfiguration configuration;
protected final long advertisedLength;
Expand All @@ -65,23 +63,15 @@ public DefaultHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpSer
}

@Override
public final void subscribe(Subscriber<? super ByteBufHolder> downstreamSubscriber) {
super.subscribe(downstreamSubscriber);
//ensures the subscriber is present before subscribing to the message
StreamedHttpMessage message = (StreamedHttpMessage) nettyHttpRequest.getNativeRequest();
message.subscribe(this);
}

@Override
protected void onUpstreamMessage(ByteBufHolder message) {
public void add(ByteBufHolder message, Collection<Object> out) {
long receivedLength = this.receivedLength.addAndGet(resolveLength(message));

if (advertisedLength > requestMaxSize) {
fireExceedsLength(advertisedLength, requestMaxSize, message);
} else if (receivedLength > requestMaxSize) {
fireExceedsLength(receivedLength, requestMaxSize, message);
} else {
publishVerifiedContent(message);
out.add(message);
}
}

Expand All @@ -94,16 +84,7 @@ private long resolveLength(ByteBufHolder message) {
}

private void fireExceedsLength(long receivedLength, long expected, ByteBufHolder message) {
upstreamState = SingleThreadedBufferingSubscriber.BackPressureState.DONE;
upstreamSubscription.cancel();
upstreamBuffer.clear();
currentDownstreamSubscriber().ifPresent(subscriber ->
subscriber.onError(new ContentLengthExceededException(expected, receivedLength))
);
ReferenceCountUtil.safeRelease(message);
}

private void publishVerifiedContent(ByteBufHolder message) {
currentDownstreamSubscriber().ifPresent(subscriber -> subscriber.onNext(message));
throw new ContentLengthExceededException(expected, receivedLength);
}
}