From 257bd42d4c4a574cf57aedddbfeae406a11e16f3 Mon Sep 17 00:00:00 2001 From: Pedro Ruivo Date: Thu, 25 Apr 2024 09:16:37 +0200 Subject: [PATCH] Use cache.compute() method to improve the replace retry loop Closes #29073 Signed-off-by: Pedro Ruivo --- ...ltInfinispanConnectionProviderFactory.java | 24 +++--- .../org/keycloak/marshalling/Marshalling.java | 25 +++++++ .../InfinispanChangelogBasedTransaction.java | 56 ++++---------- .../infinispan/changes/ReplaceFunction.java | 73 +++++++++++++++++++ .../infinispan/CacheManagerFactory.java | 8 +- 5 files changed, 125 insertions(+), 61 deletions(-) create mode 100644 model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ReplaceFunction.java diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java index 25b7340aa9f..eb77f6e2115 100755 --- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java @@ -17,6 +17,14 @@ package org.keycloak.connections.infinispan; +import java.util.Iterator; +import java.util.ServiceLoader; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + import org.infinispan.client.hotrod.ProtocolVersion; import org.infinispan.commons.dataconversion.MediaType; import org.infinispan.configuration.cache.CacheMode; @@ -24,7 +32,6 @@ import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfigurationBuilder; import org.infinispan.eviction.EvictionStrategy; -import org.infinispan.jboss.marshalling.core.JBossUserMarshaller; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; @@ -37,6 +44,7 @@ import org.keycloak.cluster.ClusterProvider; import org.keycloak.cluster.ManagedCacheManagerProvider; import org.keycloak.cluster.infinispan.KeycloakHotRodMarshallerFactory; +import org.keycloak.marshalling.Marshalling; import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSessionFactory; import org.keycloak.models.cache.infinispan.ClearCacheEvent; @@ -47,14 +55,6 @@ import org.keycloak.provider.InvalidationHandler.ObjectType; import org.keycloak.provider.ProviderEvent; -import java.util.Iterator; -import java.util.ServiceLoader; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Supplier; - import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ACTION_TOKEN_CACHE; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHORIZATION_CACHE_NAME; @@ -247,11 +247,7 @@ protected EmbeddedCacheManager initEmbedded() { gcb.jmx().domain(InfinispanConnectionProvider.JMX_DOMAIN).enable(); } - // For Infinispan 10, we go with the JBoss marshalling. - // TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream. - // See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details - gcb.serialization().marshaller(new JBossUserMarshaller()); - + Marshalling.configure(gcb); EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build()); if (useKeycloakTimeService) { setTimeServiceToKeycloakTime(cacheManager); diff --git a/model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java b/model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java new file mode 100644 index 00000000000..e710f09def9 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java @@ -0,0 +1,25 @@ +package org.keycloak.marshalling; + +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.jboss.marshalling.core.JBossUserMarshaller; +import org.keycloak.models.sessions.infinispan.changes.ReplaceFunction; + +@SuppressWarnings("removal") +public final class Marshalling { + + private Marshalling() { + } + + // Note: Min ID is 2500 + public static final Integer REPLACE_FUNCTION_ID = 2500; + + // For Infinispan 10, we go with the JBoss marshalling. + // TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream. + // See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details + public static void configure(GlobalConfigurationBuilder builder) { + builder.serialization() + .marshaller(new JBossUserMarshaller()) + .addAdvancedExternalizer(ReplaceFunction.INSTANCE); + } + +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java index a062d4ad1fc..ca344d0a6e7 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Random; import java.util.concurrent.TimeUnit; import org.infinispan.Cache; @@ -239,58 +238,33 @@ private void runOperationInCluster(K key, MergedUpdate task, SessionEntityWr private void replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { serializer.runSerialized(key, () -> { SessionEntityWrapper oldVersion = oldVersionEntity; - boolean replaced = false; + SessionEntityWrapper returnValue = null; int iteration = 0; V session = oldVersion.getEntity(); - - while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { - iteration++; - + var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache); + while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); + returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); - // Atomic cluster-aware replace - replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); - - // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again - if (!replaced) { - if (logger.isDebugEnabled()) { - logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); - } - backoff(iteration); - - oldVersion = cache.get(key); - - if (oldVersion == null) { - logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); - return; - } - - session = oldVersion.getEntity(); + if (returnValue == null) { + logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); + return; + } - task.runUpdate(session); - } else { + if (returnValue.getVersion().equals(newVersionEntity.getVersion())){ if (logger.isTraceEnabled()) { logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); } + return; } - } - if (!replaced) { - logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); + oldVersion = returnValue; + session = oldVersion.getEntity(); + task.runUpdate(session); } - }); - } - /** - * Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt. - */ - private static void backoff(int iteration) { - try { - Thread.sleep(new Random().nextInt(iteration)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + logger.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue); + }); } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ReplaceFunction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ReplaceFunction.java new file mode 100644 index 00000000000..10aad034332 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ReplaceFunction.java @@ -0,0 +1,73 @@ +package org.keycloak.models.sessions.infinispan.changes; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiFunction; + +import org.infinispan.commons.marshall.AdvancedExternalizer; +import org.infinispan.commons.marshall.MarshallUtil; +import org.keycloak.marshalling.Marshalling; +import org.keycloak.models.sessions.infinispan.entities.SessionEntity; + +/** + * Performs an entity replacement in Infinispan, using its versions instead of equality. + * + * @param The Infinispan key type. + * @param The Infinispan value type (Keycloak entity) + */ +public class ReplaceFunction implements BiFunction, SessionEntityWrapper> { + + @SuppressWarnings({"removal", "rawtypes"}) + public static final AdvancedExternalizer INSTANCE = new Externalizer(); + private final UUID expectedVersion; + private final SessionEntityWrapper newValue; + + public ReplaceFunction(UUID expectedVersion, SessionEntityWrapper newValue) { + this.expectedVersion = Objects.requireNonNull(expectedVersion); + this.newValue = Objects.requireNonNull(newValue); + } + + @Override + public SessionEntityWrapper apply(K key, SessionEntityWrapper currentValue) { + assert currentValue != null; + return expectedVersion.equals(currentValue.getVersion()) ? newValue : currentValue; + } + + @SuppressWarnings({"removal", "rawtypes"}) + private static class Externalizer implements AdvancedExternalizer { + + private static final SessionEntityWrapper.ExternalizerImpl EXTERNALIZER = new SessionEntityWrapper.ExternalizerImpl(); + private static final byte VERSION_1 = 1; + + @Override + public Set> getTypeClasses() { + return Set.of(ReplaceFunction.class); + } + + @Override + public Integer getId() { + return Marshalling.REPLACE_FUNCTION_ID; + } + + @Override + public void writeObject(ObjectOutput output, ReplaceFunction object) throws IOException { + output.writeByte(VERSION_1); + MarshallUtil.marshallUUID(object.expectedVersion, output, false); + EXTERNALIZER.writeObject(output, object.newValue); + } + + @Override + public ReplaceFunction readObject(ObjectInput input) throws IOException, ClassNotFoundException { + var version = input.readByte(); + if (version != VERSION_1) { + throw new IOException("Invalid version: " + version); + } + //noinspection unchecked + return new ReplaceFunction(MarshallUtil.unmarshallUUID(input, false), EXTERNALIZER.readObject(input)); + } + } +} diff --git a/quarkus/runtime/src/main/java/org/keycloak/quarkus/runtime/storage/legacy/infinispan/CacheManagerFactory.java b/quarkus/runtime/src/main/java/org/keycloak/quarkus/runtime/storage/legacy/infinispan/CacheManagerFactory.java index b29f0a33d4f..f05a76a03d1 100644 --- a/quarkus/runtime/src/main/java/org/keycloak/quarkus/runtime/storage/legacy/infinispan/CacheManagerFactory.java +++ b/quarkus/runtime/src/main/java/org/keycloak/quarkus/runtime/storage/legacy/infinispan/CacheManagerFactory.java @@ -33,7 +33,6 @@ import org.infinispan.configuration.global.GlobalConfiguration; import org.infinispan.configuration.parsing.ConfigurationBuilderHolder; import org.infinispan.configuration.parsing.ParserRegistry; -import org.infinispan.jboss.marshalling.core.JBossUserMarshaller; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.metrics.config.MicrometerMeterRegisterConfigurationBuilder; import org.infinispan.persistence.remote.configuration.ExhaustedAction; @@ -47,6 +46,7 @@ import org.keycloak.common.Profile; import org.keycloak.config.CachingOptions; import org.keycloak.config.MetricsOptions; +import org.keycloak.marshalling.Marshalling; import org.keycloak.quarkus.runtime.configuration.Configuration; import javax.net.ssl.SSLContext; @@ -134,11 +134,7 @@ private DefaultCacheManager startEmbeddedCacheManager() { builder.getNamedConfigurationBuilders().forEach((s, configurationBuilder) -> configurationBuilder.statistics().enabled(true)); } - // For Infinispan 10, we go with the JBoss marshalling. - // TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream. - // See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details - builder.getGlobalConfigurationBuilder().serialization().marshaller(new JBossUserMarshaller()); - + Marshalling.configure(builder.getGlobalConfigurationBuilder()); return new DefaultCacheManager(builder, isStartEagerly()); }