Skip to content

Commit

Permalink
Introduce a way to unwrap RequestContexts (line#4308)
Browse files Browse the repository at this point in the history
Motivation:

Armeria has some core logic which assumes that the concrete default implementations of `[Client|Service]RequestContext` are `Default[Client|Service]RequestContext`.

e.g. If the `ClientRequestContext` isn't an instance of `DefaultClientRequestContext`, then the response timeout may not be respected.
https://github.com/line/armeria/blob/5edaef039abfd047f152a63cde38b46094c8353f/core/src/main/java/com/linecorp/armeria/client/HttpResponseDecoder.java#L401-L407

This usage pattern mostly stems from the fact that `[Client|Service]RequestContext` are public APIs that users are exposed to a lot (especially in decorators). However, Armeria developers are skeptical of adding public methods that are either 1) very specific and used only in core logic 2) have the potential to break behavior if used incorrectly.

Up to now, we have added such methods to the default implementation and checked with `instanceof` when invoking such methods.

With the introduction of line#4232, we are now planning on introducing and extensively utilizing wrapper classes for `[Client|Service]RequestContext`. However, using wrapper classes means previous assumptions of `instanceof Default[Client|Service]RequestContext` will be invalid.

In this pull request, I propose that:

1) `RequestContext` now extends `Unwrappable`, allowing users to peel `RequestContext`s to the intended type.
2) We expose another interface `[Client]RequestContextExtension`, which contains "extension" methods for `ClientRequestContext`, `RequestContext`. In this interface, we can add methods that Armeria uses internally without worrying about public API exposure.
3) `Default[Client|Service]RequestContext` doesn't need to be exposed as a public API. We may simply expose the interface to users and hide the detailed implementation like we do for the `Http[Response|Request]` series.
    - Note: This move was handled in this PR mainly due to `CancellationScheduler`. While extracting `ClientRequestContextExtension`, `CancellationScheduler` which is an internal class was exposed as the return value of `#responseCancellationScheduler`. I had the choice of exposing `CancellationScheduler`, or hiding `DefaultClientRequestContext`. I chose the latter and also moved `DefaultServiceRequestContext` for consistency.

Modifications:

- `RequestContext` now extends `Unwrappable`. `RequestContextWrapper` now extends `AbstractUnwrappable`
- Convert usages of `instanceof DefaultClientRequestContext` to unwrap instead
    - e.g. `ctx instanceof DefaultClientRequestContext` -> `ctx.as(ClientRequestContextExtension.class)`
- `RequestContextExtension`, `ClientRequestContextExtension` has been added to an internal package
- `DefaultClientRequestContext`, `DefaultServiceRequestContext`, `NonWrappingRequestContext` have been moved to an internal package
    - `pathWithQuery` is moved to `ClientUtil` and made public
    - `ClientThreadLocalState` is moved to an internal package and made public
- Class hierarchy has been modified for `RequestContext`
    - Now there is an extra layer between `ClientRequestContext` and `DefaultClientRequestContext`:
        - `ClientRequestContext` -> `ClientRequestContextExtension` -> `DefaultClientRequestContext`
    - Now there is an extra layer between `RequestContext` and `NonWrappingRequestContext`
        - `RequestContext` -> `RequestContextExtension` -> `NonWrappingRequestContext`

Before:
<img width="300" alt="Screen Shot 2022-06-24 at 12 00 49 PM" src="https://user-images.githubusercontent.com/8510579/175453672-15dbdf09-4c2e-4afa-a186-a2147e97afa2.png">
After:
<img width="300" alt="Screen Shot 2022-06-24 at 12 01 15 PM" src="https://user-images.githubusercontent.com/8510579/175453689-3c00edde-7209-4d1c-9480-e776c58537e0.png">

Result:

- Developers may add more utility functions to `RequestContext` without worrying about public API exposure
- Armeria tries to unwrap `RequestContext` instead of simply checking the type.
  • Loading branch information
jrhee17 authored and heowc committed Sep 24, 2022
1 parent c1feec3 commit 58617fc
Show file tree
Hide file tree
Showing 35 changed files with 423 additions and 150 deletions.
Expand Up @@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.DefaultClientRequestContext;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.SimpleDecoratingHttpClient;
import com.linecorp.armeria.common.HttpRequest;
Expand All @@ -34,6 +33,7 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext;
import com.linecorp.armeria.common.logging.ClientConnectionTimings;
import com.linecorp.armeria.internal.common.RequestContextExtension;
import com.linecorp.armeria.internal.common.brave.SpanTags;

import brave.Span;
Expand Down Expand Up @@ -121,10 +121,10 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
req = req.withHeaders(newHeaders);
ctx.updateRequest(req);

if (currentTraceContext != null && !span.isNoop() && ctx instanceof DefaultClientRequestContext) {
final DefaultClientRequestContext defaultCtx = (DefaultClientRequestContext) ctx;
final RequestContextExtension ctxExtension = ctx.as(RequestContextExtension.class);
if (currentTraceContext != null && !span.isNoop() && ctxExtension != null) {
// Make the span the current span and run scope decorators when the ctx is pushed.
defaultCtx.hook(() -> currentTraceContext.newScope(span.context()));
ctxExtension.hook(() -> currentTraceContext.newScope(span.context()));
}

maybeAddTagsToSpan(ctx, braveReq, span);
Expand Down
Expand Up @@ -23,8 +23,8 @@
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext;
import com.linecorp.armeria.internal.common.RequestContextExtension;
import com.linecorp.armeria.internal.common.brave.SpanTags;
import com.linecorp.armeria.server.DefaultServiceRequestContext;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.SimpleDecoratingHttpService;
Expand Down Expand Up @@ -100,12 +100,11 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc
final HttpServerRequest braveReq = ServiceRequestContextAdapter.asHttpServerRequest(ctx);
final Span span = handler.handleReceive(braveReq);

if (currentTraceContext.scopeDecoratorAdded() && !span.isNoop() &&
ctx instanceof DefaultServiceRequestContext) {
final DefaultServiceRequestContext defaultCtx = (DefaultServiceRequestContext) ctx;
final RequestContextExtension ctxExtension = ctx.as(RequestContextExtension.class);
if (currentTraceContext.scopeDecoratorAdded() && !span.isNoop() && ctxExtension != null) {
// Run the scope decorators when the ctx is pushed to the thread local.
defaultCtx.hook(() -> currentTraceContext.decorateScope(span.context(),
SERVICE_REQUEST_DECORATING_SCOPE));
ctxExtension.hook(() -> currentTraceContext.decorateScope(span.context(),
SERVICE_REQUEST_DECORATING_SCOPE));
}

maybeAddTagsToSpan(ctx, braveReq, span);
Expand Down
Expand Up @@ -534,4 +534,14 @@ default void timeoutNow() {
@Override
@UnstableApi
ExchangeType exchangeType();

@Override
default ClientRequestContext unwrap() {
return (ClientRequestContext) RequestContext.super.unwrap();
}

@Override
default ClientRequestContext unwrapAll() {
return (ClientRequestContext) RequestContext.super.unwrapAll();
}
}
Expand Up @@ -33,6 +33,7 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.logging.ClientConnectionTimings;
import com.linecorp.armeria.common.util.SystemInfo;
import com.linecorp.armeria.internal.client.DefaultClientRequestContext;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;

Expand Down
Expand Up @@ -171,4 +171,9 @@ public CompletableFuture<Void> whenResponseTimingOut() {
public CompletableFuture<Void> whenResponseTimedOut() {
return delegate().whenResponseTimedOut();
}

@Override
public ClientRequestContext unwrapAll() {
return (ClientRequestContext) super.unwrapAll();
}
}
Expand Up @@ -32,6 +32,7 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.common.util.Unwrappable;
import com.linecorp.armeria.internal.client.ClientThreadLocalState;

/**
* Creates a new client that connects to a specified {@link URI}.
Expand Down
Expand Up @@ -16,14 +16,13 @@

package com.linecorp.armeria.client;

import static com.linecorp.armeria.internal.client.ClientUtil.pathWithQuery;
import static com.linecorp.armeria.internal.common.ArmeriaHttpUtil.concatPaths;
import static com.linecorp.armeria.internal.common.ArmeriaHttpUtil.isAbsoluteUri;
import static java.util.Objects.requireNonNull;

import java.net.URI;

import com.google.common.base.Strings;

import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.ExchangeType;
import com.linecorp.armeria.common.HttpRequest;
Expand Down Expand Up @@ -150,14 +149,4 @@ public RestClient asRestClient() {
public HttpClient unwrap() {
return (HttpClient) super.unwrap();
}

static String pathWithQuery(URI uri, @Nullable String query) {
String path = uri.getRawPath();
if (Strings.isNullOrEmpty(path)) {
path = query == null ? "/" : "/?" + query;
} else if (query != null) {
path = path + '?' + query;
}
return path;
}
}
Expand Up @@ -35,6 +35,7 @@
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.stream.StreamWriter;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;
import com.linecorp.armeria.internal.common.InboundTrafficController;
Expand Down Expand Up @@ -354,9 +355,11 @@ private void cancelAction(@Nullable Throwable cause) {
private void cancelTimeoutOrLog(@Nullable Throwable cause, boolean cancel) {

CancellationScheduler responseCancellationScheduler = null;
if (ctx instanceof DefaultClientRequestContext) {
responseCancellationScheduler =
((DefaultClientRequestContext) ctx).responseCancellationScheduler();
if (ctx != null) {
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
if (ctxExtension != null) {
responseCancellationScheduler = ctxExtension.responseCancellationScheduler();
}
}

if (responseCancellationScheduler == null || !responseCancellationScheduler.isFinished()) {
Expand Down Expand Up @@ -398,9 +401,13 @@ private void cancelTimeoutOrLog(@Nullable Throwable cause, boolean cancel) {
}

void initTimeout() {
if (ctx instanceof DefaultClientRequestContext) {
if (ctx == null) {
return;
}
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
if (ctxExtension != null) {
final CancellationScheduler responseCancellationScheduler =
((DefaultClientRequestContext) ctx).responseCancellationScheduler();
ctxExtension.responseCancellationScheduler();
responseCancellationScheduler.init(
ctx.eventLoop(), newCancellationTask(),
TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis), /* server */ false);
Expand Down
Expand Up @@ -40,6 +40,7 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.AbstractUnwrappable;
import com.linecorp.armeria.common.util.SystemInfo;
import com.linecorp.armeria.internal.client.DefaultClientRequestContext;

import io.micrometer.core.instrument.MeterRegistry;

Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.DefaultClientRequestContext;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
Expand All @@ -49,6 +48,7 @@
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.internal.client.AggregatedHttpRequestDuplicator;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.TruncatingHttpResponse;

import io.netty.handler.codec.DateFormatter;
Expand Down Expand Up @@ -309,14 +309,15 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD

final HttpResponse response;
final EndpointGroup endpointGroup = derivedCtx.endpointGroup();
if (!initialAttempt && derivedCtx instanceof DefaultClientRequestContext &&
final ClientRequestContextExtension ctxExtension = derivedCtx.as(ClientRequestContextExtension.class);
if (!initialAttempt && ctxExtension != null &&
endpointGroup != null && derivedCtx.endpoint() == null) {
// clear the pending throwable to retry endpoint selection
ClientPendingThrowableUtil.removePendingThrowable(derivedCtx);
// if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop
final DefaultClientRequestContext casted = (DefaultClientRequestContext) derivedCtx;
response = initContextAndExecuteWithFallback(unwrap(), casted, endpointGroup, HttpResponse::from,
(context, cause) -> HttpResponse.ofFailure(cause));
response = initContextAndExecuteWithFallback(
unwrap(), ctxExtension, endpointGroup, HttpResponse::from,
(context, cause) -> HttpResponse.ofFailure(cause));
} else {
response = executeWithFallback(unwrap(), derivedCtx,
(context, cause) -> HttpResponse.ofFailure(cause));
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.util.function.Function;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.DefaultClientRequestContext;
import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.RpcClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
Expand All @@ -32,6 +31,7 @@
import com.linecorp.armeria.common.RpcRequest;
import com.linecorp.armeria.common.RpcResponse;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.common.util.StringUtil;

/**
Expand Down Expand Up @@ -172,14 +172,14 @@ private void doExecute0(ClientRequestContext ctx, RpcRequest req,

final RpcResponse res;

final ClientRequestContextExtension ctxExtension = derivedCtx.as(ClientRequestContextExtension.class);
final EndpointGroup endpointGroup = derivedCtx.endpointGroup();
if (!initialAttempt && derivedCtx instanceof DefaultClientRequestContext &&
if (!initialAttempt && ctxExtension != null &&
endpointGroup != null && derivedCtx.endpoint() == null) {
// clear the pending throwable to retry endpoint selection
ClientPendingThrowableUtil.removePendingThrowable(derivedCtx);
// if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop
final DefaultClientRequestContext casted = (DefaultClientRequestContext) derivedCtx;
res = initContextAndExecuteWithFallback(unwrap(), casted, endpointGroup, RpcResponse::from,
res = initContextAndExecuteWithFallback(unwrap(), ctxExtension, endpointGroup, RpcResponse::from,
(context, cause) -> RpcResponse.ofFailure(cause));
} else {
res = executeWithFallback(unwrap(), derivedCtx,
Expand Down
Expand Up @@ -47,6 +47,7 @@
import com.linecorp.armeria.common.logging.RequestLogAccess;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.common.util.Unwrappable;
import com.linecorp.armeria.internal.common.JavaVersionSpecific;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.server.ServiceRequestContext;
Expand All @@ -61,7 +62,7 @@
* A server-side {@link Request} has a {@link ServiceRequestContext} and
* a client-side {@link Request} has a {@link ClientRequestContext}.
*/
public interface RequestContext {
public interface RequestContext extends Unwrappable {

/**
* Returns the context of the {@link Request} that is being handled in the current thread.
Expand Down Expand Up @@ -464,6 +465,16 @@ default ByteBufAllocator alloc() {
@MustBeClosed
SafeCloseable push();

@Override
default RequestContext unwrap() {
return (RequestContext) Unwrappable.super.unwrap();
}

@Override
default RequestContext unwrapAll() {
return (RequestContext) Unwrappable.super.unwrapAll();
}

/**
* Immediately run a given {@link Runnable} with this context.
*/
Expand Down

0 comments on commit 58617fc

Please sign in to comment.