Skip to content

Commit

Permalink
External Infinispan as cache - Part 2
Browse files Browse the repository at this point in the history
Includes a new implementation for the providers:

* StickySessionEncoderProviderFactory
* LoadBalancerCheckProviderFactory
* SingleUseObjectProviderFactory

Closes #28648

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
  • Loading branch information
pruivo committed Apr 23, 2024
1 parent 0f2cfe7 commit c42712c
Show file tree
Hide file tree
Showing 37 changed files with 845 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.common.Profile;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.InfinispanUtil;
import org.keycloak.connections.infinispan.TopologyInfo;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.connections.infinispan.InfinispanUtil;

import java.io.Serializable;
import java.util.Collection;
Expand All @@ -58,8 +57,6 @@
*/
public class InfinispanClusterProviderFactory implements ClusterProviderFactory {

public static final String PROVIDER_ID = "infinispan";

protected static final Logger logger = Logger.getLogger(InfinispanClusterProviderFactory.class);

// Infinispan cache
Expand All @@ -73,7 +70,7 @@ public class InfinispanClusterProviderFactory implements ClusterProviderFactory
// Just to extract notifications related stuff to separate class
private InfinispanNotificationsManager notificationsManager;

private ExecutorService localExecutor = Executors.newCachedThreadPool(r -> {
private final ExecutorService localExecutor = Executors.newCachedThreadPool(r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName(this.getClass().getName() + "-" + thread.getName());
return thread;
Expand Down Expand Up @@ -189,30 +186,29 @@ public void close() {

@Override
public String getId() {
return PROVIDER_ID;
return InfinispanUtils.EMBEDDED_PROVIDER_ID;
}

@Override
public boolean isSupported(Config.Scope config) {
return !Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) || !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
return InfinispanUtils.isEmbeddedInfinispan();
}

@Listener
public class ViewChangeListener {

@ViewChanged
public void viewChanged(ViewChangedEvent event) {
final Set<String> removedNodesAddresses = convertAddresses(event.getOldMembers());
final Set<String> newAddresses = convertAddresses(event.getNewMembers());
Set<String> removedNodesAddresses = convertAddresses(event.getOldMembers());
Set<String> newAddresses = convertAddresses(event.getNewMembers());

// Use separate thread to avoid potential deadlock
localExecutor.execute(() -> {
try {
EmbeddedCacheManager cacheManager = workCache.getCacheManager();
Transport transport = cacheManager.getTransport();

// Coordinator makes sure that entries for outdated nodes are cleaned up
if (transport != null && transport.isCoordinator()) {
if (cacheManager.isCoordinator()) {

removedNodesAddresses.removeAll(newAddresses);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.cluster.infinispan.InfinispanClusterProvider;
import org.keycloak.cluster.infinispan.LockEntry;
import org.keycloak.common.Profile;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.TopologyInfo;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;

Expand All @@ -26,13 +26,12 @@

public class RemoteInfinispanClusterProviderFactory implements ClusterProviderFactory {

public static final String PROVIDER_ID = "remote-infinispan";
private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());

private RemoteCache<String, LockEntry> workCache;
private int clusterStartupTime;
private RemoteInfinispanNotificationManager notificationManager;
private Executor executor;
private volatile RemoteCache<String, LockEntry> workCache;
private volatile int clusterStartupTime;
private volatile RemoteInfinispanNotificationManager notificationManager;
private volatile Executor executor;

@Override
public ClusterProvider create(KeycloakSession session) {
Expand Down Expand Up @@ -75,13 +74,12 @@ public synchronized void close() {

@Override
public String getId() {
return PROVIDER_ID;
return InfinispanUtils.REMOTE_PROVIDER_ID;
}

@SuppressWarnings("deprecation")
@Override
public boolean isSupported() {
return Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isRemoteInfinispan();
}

private static TopologyInfo getTopologyInfo(KeycloakSessionFactory factory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.util.concurrent.BlockingManager;

import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
Expand Down Expand Up @@ -59,6 +61,11 @@ public Executor getExecutor(String name) {
return cacheManager.getGlobalComponentRegistry().getComponent(BlockingManager.class).asExecutor(name);
}

@Override
public ScheduledExecutorService getScheduledExecutor() {
return cacheManager.getGlobalComponentRegistry().getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR);
}

@Override
public void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ManagedCacheManagerProvider;
import org.keycloak.cluster.infinispan.KeycloakHotRodMarshallerFactory;
import org.keycloak.common.Profile;
import org.keycloak.connections.infinispan.remote.RemoteInfinispanConnectionProvider;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.cache.infinispan.ClearCacheEvent;
Expand All @@ -50,6 +50,7 @@
import org.keycloak.provider.InvalidationHandler.ObjectType;
import org.keycloak.provider.ProviderEvent;

import java.util.Arrays;
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
Expand All @@ -64,6 +65,7 @@
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHORIZATION_REVISIONS_CACHE_DEFAULT_MAX;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHORIZATION_REVISIONS_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLUSTERED_CACHE_NAMES;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
Expand All @@ -75,7 +77,6 @@
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_REVISIONS_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.WORK_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.DISTRIBUTED_REPLICATED_CACHE_NAMES;
import static org.keycloak.connections.infinispan.InfinispanUtil.configureTransport;
import static org.keycloak.connections.infinispan.InfinispanUtil.createCacheConfigurationBuilder;
import static org.keycloak.connections.infinispan.InfinispanUtil.getActionTokenCacheConfig;
Expand Down Expand Up @@ -107,11 +108,10 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
public InfinispanConnectionProvider create(KeycloakSession session) {
lazyInit();

if (Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
return new RemoteInfinispanConnectionProvider(cacheManager, remoteCacheManager, topologyInfo);
}
return InfinispanUtils.isRemoteInfinispan() ?
new RemoteInfinispanConnectionProvider(cacheManager, remoteCacheManager, topologyInfo) :
new DefaultInfinispanConnectionProvider(cacheManager, remoteCacheProvider, topologyInfo);

return new DefaultInfinispanConnectionProvider(cacheManager, remoteCacheProvider, topologyInfo);
}

/*
Expand Down Expand Up @@ -202,7 +202,7 @@ protected void lazyInit() {
}

managedCacheManager = provider.getEmbeddedCacheManager(config);
if (Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
if (InfinispanUtils.isRemoteInfinispan()) {
rcm = provider.getRemoteCacheManager(config);
}
}
Expand All @@ -214,7 +214,7 @@ protected void lazyInit() {
throw new RuntimeException("No " + ManagedCacheManagerProvider.class.getName() + " found. If running in embedded mode set the [embedded] property to this provider.");
}
localCacheManager = initEmbedded();
if (Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
if (InfinispanUtils.isRemoteInfinispan()) {
rcm = initRemote();
}
} else {
Expand Down Expand Up @@ -246,7 +246,7 @@ private RemoteCacheManager initRemote() {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(builder.build());

// establish connection to all caches
DISTRIBUTED_REPLICATED_CACHE_NAMES.forEach(remoteCacheManager::getCache);
Arrays.stream(CLUSTERED_CACHE_NAMES).forEach(remoteCacheManager::getCache);
return remoteCacheManager;

}
Expand Down Expand Up @@ -348,17 +348,16 @@ protected EmbeddedCacheManager initEmbedded() {
defineClusteredCache(cacheManager, OFFLINE_CLIENT_SESSION_CACHE_NAME, clusteredConfiguration);
defineClusteredCache(cacheManager, LOGIN_FAILURE_CACHE_NAME, clusteredConfiguration);

var actionTokenBuilder = getActionTokenCacheConfig();
if (clustered) {
actionTokenBuilder.simpleCache(false);
actionTokenBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
}
defineClusteredCache(cacheManager, ACTION_TOKEN_CACHE, actionTokenBuilder.build());


if (!Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) || !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
if (InfinispanUtils.isEmbeddedInfinispan()) {
defineClusteredCache(cacheManager, AUTHENTICATION_SESSIONS_CACHE_NAME, clusteredConfiguration);

var actionTokenBuilder = getActionTokenCacheConfig();
if (clustered) {
actionTokenBuilder.simpleCache(false);
actionTokenBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
}
defineClusteredCache(cacheManager, ACTION_TOKEN_CACHE, actionTokenBuilder.build());

var workBuilder = createCacheConfigurationBuilder()
.expiration().enableReaper().wakeUpInterval(15, TimeUnit.SECONDS);
if (clustered) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.Provider;

import java.util.List;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;

/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
Expand Down Expand Up @@ -70,34 +72,30 @@ public interface InfinispanConnectionProvider extends Provider {
// Constant used as the prefix of the current node if "jboss.node.name" is not configured
String NODE_PREFIX = "node_";

String[] ALL_CACHES_NAME = {
// list of cache name for local caches (not replicated)
String[] LOCAL_CACHE_NAMES = {
REALM_CACHE_NAME,
REALM_REVISIONS_CACHE_NAME,
USER_CACHE_NAME,
USER_REVISIONS_CACHE_NAME,
USER_SESSION_CACHE_NAME,
CLIENT_SESSION_CACHE_NAME,
OFFLINE_USER_SESSION_CACHE_NAME,
OFFLINE_CLIENT_SESSION_CACHE_NAME,
LOGIN_FAILURE_CACHE_NAME,
AUTHENTICATION_SESSIONS_CACHE_NAME,
WORK_CACHE_NAME,
AUTHORIZATION_CACHE_NAME,
AUTHORIZATION_REVISIONS_CACHE_NAME,
ACTION_TOKEN_CACHE,
KEYS_CACHE_NAME
};

// list of cache name which could be defined as distributed or replicated
public static List<String> DISTRIBUTED_REPLICATED_CACHE_NAMES = List.of(
String[] CLUSTERED_CACHE_NAMES = {
USER_SESSION_CACHE_NAME,
CLIENT_SESSION_CACHE_NAME,
OFFLINE_USER_SESSION_CACHE_NAME,
OFFLINE_CLIENT_SESSION_CACHE_NAME,
LOGIN_FAILURE_CACHE_NAME,
AUTHENTICATION_SESSIONS_CACHE_NAME,
ACTION_TOKEN_CACHE,
WORK_CACHE_NAME);
WORK_CACHE_NAME
};

String[] ALL_CACHES_NAME = Stream.concat(Arrays.stream(LOCAL_CACHE_NAMES), Arrays.stream(CLUSTERED_CACHE_NAMES)).toArray(String[]::new);

/**
*
Expand Down Expand Up @@ -141,6 +139,11 @@ default <K, V> Cache<K, V> getCache(String name) {
*/
Executor getExecutor(String name);

/**
* @return The Infinispan {@link ScheduledExecutorService}. Long or blocking operations must not be executed directly.
*/
ScheduledExecutorService getScheduledExecutor();

/**
* Syntactic sugar to get a {@link RemoteCache}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.keycloak.connections.infinispan;

import org.infinispan.persistence.manager.PersistenceManager;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.common.Profile;
Expand All @@ -26,15 +27,13 @@
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.EnvironmentDependentProviderFactory;

import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ALL_CACHES_NAME;


public class InfinispanMultiSiteLoadBalancerCheckProviderFactory implements LoadBalancerCheckProviderFactory, EnvironmentDependentProviderFactory {

private LoadBalancerCheckProvider loadBalancerCheckProvider;
private static final LoadBalancerCheckProvider ALWAYS_HEALTHY = new LoadBalancerCheckProvider() {
@Override public boolean isDown() { return false; }
@Override public void close() {}
};
public static final LoadBalancerCheckProvider ALWAYS_HEALTHY = () -> false;
private static final Logger LOG = Logger.getLogger(InfinispanMultiSiteLoadBalancerCheckProviderFactory.class);

@Override
Expand All @@ -45,7 +44,7 @@ public LoadBalancerCheckProvider create(KeycloakSession session) {
LOG.warn("InfinispanConnectionProvider is not available. Load balancer check will be always healthy for Infinispan.");
loadBalancerCheckProvider = ALWAYS_HEALTHY;
} else {
loadBalancerCheckProvider = new InfinispanMultiSiteLoadBalancerCheckProvider(infinispanConnectionProvider);
loadBalancerCheckProvider = () -> isEmbeddedCachesDown(infinispanConnectionProvider);
}
}
return loadBalancerCheckProvider;
Expand Down Expand Up @@ -73,6 +72,32 @@ public String getId() {

@Override
public boolean isSupported(Config.Scope config) {
return Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE);
return Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
}

private boolean isEmbeddedCachesDown(InfinispanConnectionProvider provider) {
return isAnyEmbeddedCachesDown(provider, ALL_CACHES_NAME, LOG);
}

public static boolean isAnyEmbeddedCachesDown(InfinispanConnectionProvider connectionProvider, String[] cacheNames, Logger logger) {
for (var name : cacheNames) {
var cache = connectionProvider.getCache(name, false);

// check if cache is started
if (cache == null || !cache.getStatus().allowInvocations()) {
logger.debugf("Cache '%s' is not started yet.", name);
return true; // no need to check other caches
}

var persistenceManager = cache.getAdvancedCache()
.getComponentRegistry()
.getComponent(PersistenceManager.class);

if (persistenceManager != null && !persistenceManager.isAvailable()) {
logger.debugf("Persistence for embedded cache '%s' is down.", name);
return true; // no need to check other caches
}
}
return false;
}
}

0 comments on commit c42712c

Please sign in to comment.