Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cache.compute() method to improve the replace retry loop #29081

Merged
merged 1 commit into from Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,14 +17,21 @@

package org.keycloak.connections.infinispan;

import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
Expand All @@ -37,6 +44,7 @@
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ManagedCacheManagerProvider;
import org.keycloak.cluster.infinispan.KeycloakHotRodMarshallerFactory;
import org.keycloak.marshalling.Marshalling;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.cache.infinispan.ClearCacheEvent;
Expand All @@ -47,14 +55,6 @@
import org.keycloak.provider.InvalidationHandler.ObjectType;
import org.keycloak.provider.ProviderEvent;

import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ACTION_TOKEN_CACHE;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHORIZATION_CACHE_NAME;
Expand Down Expand Up @@ -247,11 +247,7 @@ protected EmbeddedCacheManager initEmbedded() {
gcb.jmx().domain(InfinispanConnectionProvider.JMX_DOMAIN).enable();
}

// For Infinispan 10, we go with the JBoss marshalling.
// TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream.
// See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details
gcb.serialization().marshaller(new JBossUserMarshaller());

Marshalling.configure(gcb);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
if (useKeycloakTimeService) {
setTimeServiceToKeycloakTime(cacheManager);
Expand Down
@@ -0,0 +1,25 @@
package org.keycloak.marshalling;

import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
import org.keycloak.models.sessions.infinispan.changes.ReplaceFunction;

@SuppressWarnings("removal")
public final class Marshalling {

private Marshalling() {
}

// Note: Min ID is 2500
public static final Integer REPLACE_FUNCTION_ID = 2500;

// For Infinispan 10, we go with the JBoss marshalling.
// TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream.
// See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details
public static void configure(GlobalConfigurationBuilder builder) {
builder.serialization()
.marshaller(new JBossUserMarshaller())
.addAdvancedExternalizer(ReplaceFunction.INSTANCE);
}

}
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.infinispan.Cache;
Expand Down Expand Up @@ -239,58 +238,33 @@ private void runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWr
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
serializer.runSerialized(key, () -> {
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
boolean replaced = false;
SessionEntityWrapper<V> returnValue = null;
int iteration = 0;
V session = oldVersion.getEntity();

while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
iteration++;

var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache);
while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);

// Atomic cluster-aware replace
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);

// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
if (!replaced) {
if (logger.isDebugEnabled()) {
logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
}
backoff(iteration);

oldVersion = cache.get(key);

if (oldVersion == null) {
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return;
}

session = oldVersion.getEntity();
if (returnValue == null) {
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return;
}

task.runUpdate(session);
} else {
if (returnValue.getVersion().equals(newVersionEntity.getVersion())){
if (logger.isTraceEnabled()) {
logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
return;
}
}

if (!replaced) {
logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
oldVersion = returnValue;
session = oldVersion.getEntity();
task.runUpdate(session);
}
});
}

/**
* Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt.
*/
private static void backoff(int iteration) {
try {
Thread.sleep(new Random().nextInt(iteration));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
logger.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
});
}

@Override
Expand Down
@@ -0,0 +1,73 @@
package org.keycloak.models.sessions.infinispan.changes;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;

import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.marshall.MarshallUtil;
import org.keycloak.marshalling.Marshalling;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;

/**
* Performs an entity replacement in Infinispan, using its versions instead of equality.
*
* @param <K> The Infinispan key type.
* @param <T> The Infinispan value type (Keycloak entity)
*/
public class ReplaceFunction<K, T extends SessionEntity> implements BiFunction<K, SessionEntityWrapper<T>, SessionEntityWrapper<T>> {

@SuppressWarnings({"removal", "rawtypes"})
public static final AdvancedExternalizer<ReplaceFunction> INSTANCE = new Externalizer();
private final UUID expectedVersion;
private final SessionEntityWrapper<T> newValue;
ahus1 marked this conversation as resolved.
Show resolved Hide resolved

public ReplaceFunction(UUID expectedVersion, SessionEntityWrapper<T> newValue) {
this.expectedVersion = Objects.requireNonNull(expectedVersion);
this.newValue = Objects.requireNonNull(newValue);
}

@Override
public SessionEntityWrapper<T> apply(K key, SessionEntityWrapper<T> currentValue) {
assert currentValue != null;
return expectedVersion.equals(currentValue.getVersion()) ? newValue : currentValue;
}

@SuppressWarnings({"removal", "rawtypes"})
private static class Externalizer implements AdvancedExternalizer<ReplaceFunction> {

private static final SessionEntityWrapper.ExternalizerImpl EXTERNALIZER = new SessionEntityWrapper.ExternalizerImpl();
private static final byte VERSION_1 = 1;

@Override
public Set<Class<? extends ReplaceFunction>> getTypeClasses() {
return Set.of(ReplaceFunction.class);
}

@Override
public Integer getId() {
return Marshalling.REPLACE_FUNCTION_ID;
}

@Override
public void writeObject(ObjectOutput output, ReplaceFunction object) throws IOException {
output.writeByte(VERSION_1);
MarshallUtil.marshallUUID(object.expectedVersion, output, false);
EXTERNALIZER.writeObject(output, object.newValue);
}

@Override
public ReplaceFunction<?,?> readObject(ObjectInput input) throws IOException, ClassNotFoundException {
var version = input.readByte();
if (version != VERSION_1) {
throw new IOException("Invalid version: " + version);
}
//noinspection unchecked
return new ReplaceFunction<Object, SessionEntity>(MarshallUtil.unmarshallUUID(input, false), EXTERNALIZER.readObject(input));
}
}
}
Expand Up @@ -33,7 +33,6 @@
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.metrics.config.MicrometerMeterRegisterConfigurationBuilder;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
Expand All @@ -47,6 +46,7 @@
import org.keycloak.common.Profile;
import org.keycloak.config.CachingOptions;
import org.keycloak.config.MetricsOptions;
import org.keycloak.marshalling.Marshalling;
import org.keycloak.quarkus.runtime.configuration.Configuration;

import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -134,11 +134,7 @@ private DefaultCacheManager startEmbeddedCacheManager() {
builder.getNamedConfigurationBuilders().forEach((s, configurationBuilder) -> configurationBuilder.statistics().enabled(true));
}

// For Infinispan 10, we go with the JBoss marshalling.
// TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream.
// See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details
builder.getGlobalConfigurationBuilder().serialization().marshaller(new JBossUserMarshaller());

Marshalling.configure(builder.getGlobalConfigurationBuilder());
return new DefaultCacheManager(builder, isStartEagerly());
}

Expand Down