Skip to content

Commit

Permalink
External Infinispan as cache - Part 3
Browse files Browse the repository at this point in the history
Implementation of UserLoginFailureProvider using remote caches only.

Closes #28754

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
  • Loading branch information
pruivo committed May 7, 2024
1 parent c42712c commit b3ba665
Show file tree
Hide file tree
Showing 29 changed files with 1,395 additions and 97 deletions.
41 changes: 41 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -364,6 +364,46 @@ jobs:
name: ${{ steps.aurora-init.outputs.name }}
region: ${{ steps.aurora-init.outputs.region }}

external-infinispan-tests:
name: External Infinispan IT
needs: [ build, conditional ]
if: needs.conditional.outputs.ci-store == 'true'
runs-on: ubuntu-latest
timeout-minutes: 150
strategy:
matrix:
variant: [ "remote-cache,multi-site" ]
fail-fast: false
steps:
- uses: actions/checkout@v4

- id: integration-test-setup
name: Integration test setup
uses: ./.github/actions/integration-test-setup

- name: Run base tests without cache
run: |
TESTS=`testsuite/integration-arquillian/tests/base/testsuites/suite.sh persistent-sessions`
echo "Tests: $TESTS"
./mvnw test ${{ env.SUREFIRE_RETRY }} -Pauth-server-quarkus -Pinfinispan-server -Dauth.server.feature=${{ matrix.variant }} -Dauth.server.remote-cache.enabled=true -Dtest=$TESTS -pl testsuite/integration-arquillian/tests/base 2>&1 | misc/log/trimmer.sh
- name: Upload JVM Heapdumps
if: always()
uses: ./.github/actions/upload-heapdumps

- uses: ./.github/actions/upload-flaky-tests
name: Upload flaky tests
env:
GH_TOKEN: ${{ github.token }}
with:
job-name: Remote Infinispan IT

- name: Surefire reports
if: always()
uses: ./.github/actions/archive-surefire-reports
with:
job-id: remote-infinispan-integration-tests

store-integration-tests:
name: Store IT
needs: [build, conditional]
Expand Down Expand Up @@ -827,6 +867,7 @@ jobs:
- webauthn-integration-tests
- sssd-unit-tests
- migration-tests
- external-infinispan-tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
Expand Up @@ -26,21 +26,16 @@
public class RemoteInfinispanClusterProvider implements ClusterProvider {

private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private final int clusterStartupTime;
private final RemoteCache<String, LockEntry> cache;
private final RemoteInfinispanNotificationManager notificationManager;
private final Executor executor;

public RemoteInfinispanClusterProvider(int clusterStartupTime, RemoteCache<String, LockEntry> cache, RemoteInfinispanNotificationManager notificationManager, Executor executor) {
this.clusterStartupTime = clusterStartupTime;
this.cache = Objects.requireNonNull(cache);
this.notificationManager = Objects.requireNonNull(notificationManager);
this.executor = Objects.requireNonNull(executor);
private final SharedData data;

public RemoteInfinispanClusterProvider(SharedData data) {
this.data = Objects.requireNonNull(data);
}


@Override
public int getClusterStartupTime() {
return clusterStartupTime;
return data.clusterStartupTime();
}

@Override
Expand Down Expand Up @@ -68,7 +63,7 @@ public <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeo
@Override
public Future<Boolean> executeIfNotExecutedAsync(String taskKey, int taskTimeoutInSeconds, Callable task) {
TaskCallback newCallback = new TaskCallback();
TaskCallback callback = notificationManager.registerTaskCallback(TASK_KEY_PREFIX + taskKey, newCallback);
TaskCallback callback = data.notificationManager().registerTaskCallback(TASK_KEY_PREFIX + taskKey, newCallback);

// We successfully submitted our task
if (newCallback == callback) {
Expand All @@ -87,7 +82,7 @@ public Future<Boolean> executeIfNotExecutedAsync(String taskKey, int taskTimeout
return callback.isSuccess();
};

callback.setFuture(CompletableFuture.supplyAsync(wrappedTask, executor));
callback.setFuture(CompletableFuture.supplyAsync(wrappedTask, data.executor()));
} else {
logger.infof("Task already in progress on this cluster node. Will wait until it's finished");
}
Expand All @@ -97,12 +92,12 @@ public Future<Boolean> executeIfNotExecutedAsync(String taskKey, int taskTimeout

@Override
public void registerListener(String taskKey, ClusterListener task) {
notificationManager.registerListener(taskKey, task);
data.notificationManager().registerListener(taskKey, task);
}

@Override
public void notify(String taskKey, ClusterEvent event, boolean ignoreSender, DCNotify dcNotify) {
notificationManager.notify(taskKey, event, ignoreSender, dcNotify);
data.notificationManager().notify(taskKey, event, ignoreSender, dcNotify);
}

@Override
Expand All @@ -113,7 +108,7 @@ public void close() {
private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
LockEntry myLock = createLockEntry();

LockEntry existingLock = putIfAbsentWithRetries(cache, cacheKey, myLock, taskTimeoutInSeconds);
LockEntry existingLock = putIfAbsentWithRetries(data.cache(), cacheKey, myLock, taskTimeoutInSeconds);
if (existingLock != null) {
if (logger.isTraceEnabled()) {
logger.tracef("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.getNode());
Expand All @@ -129,18 +124,25 @@ private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {

private LockEntry createLockEntry() {
LockEntry lock = new LockEntry();
lock.setNode(notificationManager.getMyNodeName());
lock.setNode(data.notificationManager().getMyNodeName());
lock.setTimestamp(Time.currentTime());
return lock;
}

private void removeFromCache(String cacheKey) {
// More attempts to send the message (it may fail if some node fails in the meantime)
Retry.executeWithBackoff((int iteration) -> {
cache.remove(cacheKey);
data.cache().remove(cacheKey);
if (logger.isTraceEnabled()) {
logger.tracef("Task %s removed from the cache", cacheKey);
}
}, 10, 10);
}

public interface SharedData {
int clusterStartupTime();
RemoteCache<String, LockEntry> cache();
RemoteInfinispanNotificationManager notificationManager();
Executor executor();
}
}
@@ -1,5 +1,10 @@
package org.keycloak.cluster.infinispan.remote;

import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.commons.util.ByRef;
Expand All @@ -12,19 +17,13 @@
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.TopologyInfo;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;

import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.WORK_CACHE_NAME;

public class RemoteInfinispanClusterProviderFactory implements ClusterProviderFactory {
public class RemoteInfinispanClusterProviderFactory implements ClusterProviderFactory, RemoteInfinispanClusterProvider.SharedData {

private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());

Expand All @@ -35,10 +34,14 @@ public class RemoteInfinispanClusterProviderFactory implements ClusterProviderFa

@Override
public ClusterProvider create(KeycloakSession session) {
if (workCache == null) {
// Keycloak does not ensure postInit() is invoked before create()
lazyInit(session);
}
assert workCache != null;
assert notificationManager != null;
assert executor != null;
return new RemoteInfinispanClusterProvider(clusterStartupTime, workCache, notificationManager, executor);
return new RemoteInfinispanClusterProvider(this);
}

@Override
Expand All @@ -47,16 +50,9 @@ public void init(Config.Scope config) {
}

@Override
public synchronized void postInit(KeycloakSessionFactory factory) {
public void postInit(KeycloakSessionFactory factory) {
try (var session = factory.create()) {
var ispnProvider = session.getProvider(InfinispanConnectionProvider.class);
executor = ispnProvider.getExecutor("cluster-provider");
workCache = ispnProvider.getRemoteCache(WORK_CACHE_NAME);
clusterStartupTime = initClusterStartupTime(ispnProvider.getRemoteCache(WORK_CACHE_NAME), (int) (factory.getServerStartupTimestamp() / 1000));
notificationManager = new RemoteInfinispanNotificationManager(executor, ispnProvider.getRemoteCache(WORK_CACHE_NAME), getTopologyInfo(factory));
notificationManager.addClientListener();

logger.debugf("Provider initialized. Cluster startup time: %s", Time.toDate(clusterStartupTime));
lazyInit(session);
}
}

Expand All @@ -82,18 +78,25 @@ public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isRemoteInfinispan();
}

private static TopologyInfo getTopologyInfo(KeycloakSessionFactory factory) {
try (var session = factory.create()) {
return session.getProvider(InfinispanConnectionProvider.class).getTopologyInfo();
private synchronized void lazyInit(KeycloakSession session) {
if (workCache != null) {
return;
}
var provider = session.getProvider(InfinispanConnectionProvider.class);
executor = provider.getExecutor("cluster-provider");
clusterStartupTime = initClusterStartupTime(provider.getRemoteCache(WORK_CACHE_NAME), (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000));
notificationManager = new RemoteInfinispanNotificationManager(executor, provider.getRemoteCache(WORK_CACHE_NAME), provider.getTopologyInfo());
notificationManager.addClientListener();
workCache = provider.getRemoteCache(WORK_CACHE_NAME);

logger.debugf("Provider initialized. Cluster startup time: %s", Time.toDate(clusterStartupTime));
}

private static int initClusterStartupTime(RemoteCache<String, Integer> cache, int serverStartupTime) {
Integer clusterStartupTime = putIfAbsentWithRetries(cache, InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartupTime, -1);
return clusterStartupTime == null ? serverStartupTime : clusterStartupTime;
}


static <V extends Serializable> V putIfAbsentWithRetries(RemoteCache<String, V> workCache, String key, V value, int taskTimeoutInSeconds) {
ByRef<V> ref = new ByRef<>(null);

Expand All @@ -115,4 +118,24 @@ static <V extends Serializable> V putIfAbsentWithRetries(RemoteCache<String, V>

return ref.get();
}

@Override
public int clusterStartupTime() {
return clusterStartupTime;
}

@Override
public RemoteCache<String, LockEntry> cache() {
return workCache;
}

@Override
public RemoteInfinispanNotificationManager notificationManager() {
return notificationManager;
}

@Override
public Executor executor() {
return executor;
}
}
Expand Up @@ -346,9 +346,9 @@ protected EmbeddedCacheManager initEmbedded() {
defineClusteredCache(cacheManager, OFFLINE_USER_SESSION_CACHE_NAME, clusteredConfiguration);
defineClusteredCache(cacheManager, CLIENT_SESSION_CACHE_NAME, clusteredConfiguration);
defineClusteredCache(cacheManager, OFFLINE_CLIENT_SESSION_CACHE_NAME, clusteredConfiguration);
defineClusteredCache(cacheManager, LOGIN_FAILURE_CACHE_NAME, clusteredConfiguration);

if (InfinispanUtils.isEmbeddedInfinispan()) {
defineClusteredCache(cacheManager, LOGIN_FAILURE_CACHE_NAME, clusteredConfiguration);
defineClusteredCache(cacheManager, AUTHENTICATION_SESSIONS_CACHE_NAME, clusteredConfiguration);

var actionTokenBuilder = getActionTokenCacheConfig();
Expand Down
Expand Up @@ -48,14 +48,15 @@
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.provider.EnvironmentDependentProviderFactory;

import java.io.Serializable;
import java.util.Set;

/**
* @author <a href="mailto:mkanis@redhat.com">Martin Kanis</a>
*/
public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory {
public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory<InfinispanUserLoginFailureProvider>, EnvironmentDependentProviderFactory {

private static final Logger log = Logger.getLogger(InfinispanUserLoginFailureProviderFactory.class);

Expand All @@ -69,7 +70,7 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu
SerializeExecutionsByKey<LoginFailureKey> serializer = new SerializeExecutionsByKey<>();

@Override
public UserLoginFailureProvider create(KeycloakSession session) {
public InfinispanUserLoginFailureProvider create(KeycloakSession session) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME);

Expand All @@ -90,14 +91,9 @@ public void postInit(final KeycloakSessionFactory factory) {
KeycloakModelUtils.runJobInTransaction(factory, (KeycloakSession session) -> {
checkRemoteCaches(session);
registerClusterListeners(session);
// TODO [pruivo] to remove: workaround to run the testsuite.
if (InfinispanUtils.isEmbeddedInfinispan()) {
loadLoginFailuresFromRemoteCaches(session);
}
loadLoginFailuresFromRemoteCaches(session);
});
} else if (event instanceof UserModel.UserRemovedEvent) {
UserModel.UserRemovedEvent userRemovedEvent = (UserModel.UserRemovedEvent) event;

} else if (event instanceof UserModel.UserRemovedEvent userRemovedEvent) {
UserLoginFailureProvider provider = userRemovedEvent.getKeycloakSession().getProvider(UserLoginFailureProvider.class, getId());
provider.removeUserLoginFailure(userRemovedEvent.getRealm(), userRemovedEvent.getUser().getId());
}
Expand Down Expand Up @@ -226,4 +222,9 @@ public String getId() {
public int order() {
return InfinispanUtils.PROVIDER_ORDER;
}

@Override
public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isEmbeddedInfinispan();
}
}

0 comments on commit b3ba665

Please sign in to comment.