Skip to content

Commit

Permalink
Close the session within resteasy boundaries (#15193)
Browse files Browse the repository at this point in the history
Closes #15192
  • Loading branch information
pedroigor committed Nov 1, 2022
1 parent 1711782 commit f698594
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 89 deletions.
Expand Up @@ -64,6 +64,7 @@
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.builditem.BootstrapConfigSetupCompleteBuildItem;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
import io.quarkus.deployment.builditem.HotDeploymentWatchedFileBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
Expand Down Expand Up @@ -515,9 +516,10 @@ void indexLegacyJpaStore(BuildProducer<IndexDependencyBuildItem> indexDependency
indexDependencyBuildItemBuildProducer.produce(new IndexDependencyBuildItem("org.keycloak", "keycloak-model-jpa"));
}

@Record(ExecutionTime.STATIC_INIT)
@Record(ExecutionTime.RUNTIME_INIT)
@BuildStep
void initializeFilter(BuildProducer<FilterBuildItem> filters, KeycloakRecorder recorder, HttpBuildTimeConfig httpBuildConfig) {
void initializeFilter(BuildProducer<FilterBuildItem> filters, KeycloakRecorder recorder, HttpBuildTimeConfig httpBuildConfig,
ExecutorBuildItem executor) {
String rootPath = httpBuildConfig.rootPath;
List<String> ignoredPaths = new ArrayList<>();

Expand All @@ -529,7 +531,7 @@ void initializeFilter(BuildProducer<FilterBuildItem> filters, KeycloakRecorder r
ignoredPaths.add(rootPath + "metrics");
}

filters.produce(new FilterBuildItem(recorder.createRequestFilter(ignoredPaths),FilterBuildItem.AUTHORIZATION - 10));
filters.produce(new FilterBuildItem(recorder.createRequestFilter(ignoredPaths, executor.getExecutorProxy()),FilterBuildItem.AUTHORIZATION - 10));
}

@BuildStep
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.lang.annotation.Annotation;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -143,8 +144,8 @@ public void contributeRuntimeProperties(BiConsumer<String, Object> propertyColle
};
}

public QuarkusRequestFilter createRequestFilter(List<String> ignoredPaths) {
return new QuarkusRequestFilter(createIgnoredHttpPathsPredicate(ignoredPaths));
public QuarkusRequestFilter createRequestFilter(List<String> ignoredPaths, ExecutorService executor) {
return new QuarkusRequestFilter(createIgnoredHttpPathsPredicate(ignoredPaths), executor);
}

private Predicate<RoutingContext> createIgnoredHttpPathsPredicate(List<String> ignoredPaths) {
Expand Down
@@ -0,0 +1,55 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.quarkus.runtime.integration.jaxrs;

import java.io.IOException;
import java.util.stream.Stream;
import javax.annotation.Priority;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.container.PreMatching;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.ext.Provider;
import org.keycloak.common.util.Resteasy;
import org.keycloak.models.KeycloakSession;
import org.keycloak.quarkus.runtime.transaction.TransactionalSessionHandler;

@Provider
@PreMatching
@Priority(1)
public class TransactionalResponseFilter implements ContainerResponseFilter, TransactionalSessionHandler {

@Override
public void filter(ContainerRequestContext requestContext, ContainerResponseContext responseContext)
throws IOException {
Object entity = responseContext.getEntity();

if (shouldDelaySessionClose(entity)) {
return;
}

close(Resteasy.getContextData(KeycloakSession.class));
}

private static boolean shouldDelaySessionClose(Object entity) {
// do not close the session if the response entity is a stream
// that is because we need the session open until the stream is transformed as it might require access to the database
return entity instanceof Stream || entity instanceof StreamingOutput;
}
}
@@ -0,0 +1,48 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.quarkus.runtime.integration.jaxrs;

import java.io.IOException;
import javax.annotation.Priority;
import javax.ws.rs.ConstrainedTo;
import javax.ws.rs.RuntimeType;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;
import org.keycloak.common.util.Resteasy;
import org.keycloak.models.KeycloakSession;
import org.keycloak.quarkus.runtime.transaction.TransactionalSessionHandler;

@Provider
@ConstrainedTo(RuntimeType.SERVER)
@Priority(10000)
public class TransactionalResponseInterceptor implements WriterInterceptor, TransactionalSessionHandler {
@Override
public void aroundWriteTo(WriterInterceptorContext context) throws IOException, WebApplicationException {
KeycloakSession session = Resteasy.getContextData(KeycloakSession.class);

try {
context.proceed();
} finally {
// make sure response is closed after writing to the response output stream
// this is needed in order to support streams from endpoints as they need access to underlying resources like database
close(session);
}
}
}
Expand Up @@ -17,47 +17,46 @@

package org.keycloak.quarkus.runtime.integration.web;

import static org.keycloak.services.resources.KeycloakApplication.getSessionFactory;

import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import org.keycloak.common.ClientConnection;
import org.keycloak.common.util.Resteasy;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakTransactionManager;
import org.keycloak.quarkus.runtime.transaction.TransactionalSessionHandler;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;

/**
* <p>This filter is responsible for managing the request lifecycle as well as setting up the necessary context to process incoming
* requests.
* requests. We need this filter running on the top of the chain in order to push contextual objects before executing Resteasy. It is not
* possible to use a {@link javax.ws.rs.container.ContainerRequestFilter} for this purpose because some mechanisms like error handling
* will not be able to access these contextual objects.
*
* <p>The filter itself runs in a event loop and should delegate to worker threads any blocking code (for now, all requests are handled
* <p>The filter itself runs in an event loop and should delegate to worker threads any blocking code (for now, all requests are handled
* as blocking).
*
* <p>Note that this filter is only responsible to close the {@link KeycloakSession} if not already closed when running Resteasy code. The reason is that closing it should be done at the
* Resteasy level so that we don't block event loop threads even if they execute in a worker thread. Vert.x handlers and their
* callbacks are not designed to run blocking code. If the session is eventually closed here is because Resteasy was not executed.
*
* @see org.keycloak.quarkus.runtime.integration.jaxrs.TransactionalResponseInterceptor
* @see org.keycloak.quarkus.runtime.integration.jaxrs.TransactionalResponseFilter
*/
public class QuarkusRequestFilter implements Handler<RoutingContext> {
public class QuarkusRequestFilter implements Handler<RoutingContext>, TransactionalSessionHandler {

private static final Handler<AsyncResult<Object>> EMPTY_RESULT = result -> {
// we don't really care about the result because any exception thrown should be handled by the parent class
};
private final ExecutorService executor;

private Predicate<RoutingContext> contextFilter;

public QuarkusRequestFilter() {
this(null);
this(null, null);
}

public QuarkusRequestFilter(Predicate<RoutingContext> contextFilter) {
public QuarkusRequestFilter(Predicate<RoutingContext> contextFilter, ExecutorService executor) {
this.contextFilter = contextFilter;
this.executor = executor;
}

@Override
Expand All @@ -68,89 +67,43 @@ public void handle(RoutingContext context) {
}
// our code should always be run as blocking until we don't provide a better support for running non-blocking code
// in the event loop
context.vertx().executeBlocking(createBlockingHandler(context), false, EMPTY_RESULT);
executor.execute(createBlockingHandler(context));
}

private boolean ignoreContext(RoutingContext context) {
return contextFilter != null && contextFilter.test(context);
}

private Handler<Promise<Object>> createBlockingHandler(RoutingContext context) {
return promise -> {
KeycloakSessionFactory sessionFactory = getSessionFactory();
KeycloakSession session = sessionFactory.create();

configureContextualData(context, createClientConnection(context.request()), session);
configureEndHandler(context, session);

KeycloakTransactionManager tx = session.getTransactionManager();
private Runnable createBlockingHandler(RoutingContext context) {
return () -> {
KeycloakSession session = configureContextualData(context);

try {
tx.begin();
context.next();
promise.tryComplete();
} catch (Throwable cause) {
promise.fail(cause);
// re-throw so that the any exception is handled from parent
throw new RuntimeException(cause);
} finally {
if (!context.response().headWritten()) {
// make sure the session is closed in case the handler is not called
// it might happen that, for whatever reason, downstream handlers do not end the response or
// no data was written to the response
close(session);
}
}
};
}

/**
* Creates a handler to close the {@link KeycloakSession} before the response is written to response but after Resteasy
* is done with processing its output.
*/
private void configureEndHandler(RoutingContext context, KeycloakSession session) {
context.addHeadersEndHandler(event -> {
try {
// force closing the session if not already closed
// under some circumstances resteasy might not be invoked like when no route is found for a particular path
// in this case context is set with status code 404, and we need to close the session
close(session);
} catch (Throwable cause) {
unexpectedErrorResponse(context.response());
}
});
};
}

private void unexpectedErrorResponse(HttpServerResponse response) {
response.headers().clear();
response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN);
response.putHeader(HttpHeaderNames.CONTENT_LENGTH, "0");
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
response.putHeader(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
// writes an empty buffer to replace any data previously written
response.write(Buffer.buffer(""));
}
private KeycloakSession configureContextualData(RoutingContext context) {
KeycloakSession session = create();

private void configureContextualData(RoutingContext context, ClientConnection connection, KeycloakSession session) {
Resteasy.pushContext(ClientConnection.class, connection);
Resteasy.pushContext(KeycloakSession.class, session);
// quarkus-resteasy changed and clears the context map before dispatching
// need to push keycloak contextual objects into the routing context for retrieving it later
context.put(KeycloakSession.class.getName(), session);
context.put(ClientConnection.class.getName(), connection);
}

protected void close(KeycloakSession session) {
KeycloakTransactionManager tx = session.getTransactionManager();
ClientConnection connection = createClientConnection(context.request());

try {
if (tx.isActive()) {
if (tx.getRollbackOnly()) {
tx.rollback();
} else {
tx.commit();
}
}
} finally {
session.close();
}
Resteasy.pushContext(ClientConnection.class, connection);
context.put(ClientConnection.class.getName(), connection);

return session;
}

private ClientConnection createClientConnection(HttpServerRequest request) {
Expand Down
@@ -0,0 +1,71 @@
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.quarkus.runtime.transaction;

import static org.keycloak.services.resources.KeycloakApplication.getSessionFactory;

import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakTransactionManager;
import org.keycloak.services.DefaultKeycloakSession;

/**
* <p>A {@link TransactionalSessionHandler} is responsible for managing transaction sessions and its lifecycle. Its subtypes
* are usually related to components available from the underlying stack that runs on top of the request processing chain
* as well as at the end in order to create transaction sessions and close them accordingly, respectively.
*/
public interface TransactionalSessionHandler {

/**
* Creates a transactional {@link KeycloakSession}.
*
* @return a transactional keycloak session
*/
default KeycloakSession create() {
KeycloakSessionFactory sessionFactory = getSessionFactory();
KeycloakSession session = sessionFactory.create();
KeycloakTransactionManager tx = session.getTransactionManager();
tx.begin();
return session;
}

/**
* Closes a transactional {@link KeycloakSession}.
*
* @param session a transactional session
*/
default void close(KeycloakSession session) {
if (DefaultKeycloakSession.class.cast(session).isClosed()) {
return;
}

KeycloakTransactionManager tx = session.getTransactionManager();

try {
if (tx.isActive()) {
if (tx.getRollbackOnly()) {
tx.rollback();
} else {
tx.commit();
}
}
} finally {
session.close();
}
}
}

0 comments on commit f698594

Please sign in to comment.