Skip to content

Commit

Permalink
fix: reduce the locking of methods reading indexes
Browse files Browse the repository at this point in the history
closes: #5973

Signed-off-by: Steve Hawkins <shawkins@redhat.com>
  • Loading branch information
shawkins authored and manusa committed May 9, 2024
1 parent 57a3f0b commit d067b58
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 187 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Fix #5867: (crd-generator) Imply schemaFrom via JsonFormat shape (SchemaFrom takes precedence)
* Fix #5867: (java-generator) Add JsonFormat shape to date-time
* Fix #5954: (crd-generator) Sort required properties to ensure deterministic output
* Fix #5973: CacheImpl locking for reading indexes (Cache.byIndex|indexKeys|index) was reduced

#### Dependency Upgrade
* Fix #5695: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.30.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -37,22 +38,44 @@

/**
* It basically saves and indexes all the entries.
* <br>
* Index reads {@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)}
* are not globally locked and thus may not be fully consistent with the current state
*
* @param <T> type for cache object
*/
public class CacheImpl<T extends HasMetadata> implements Cache<T> {

private static class Index {
private Map<Object, Set<String>> values = new ConcurrentHashMap<Object, Set<String>>();

public void update(String indexKey, String key, boolean remove) {
if (remove) {
values.computeIfPresent(indexKey == null ? this : indexKey, (k, v) -> {
v.remove(key);
return v.isEmpty() ? null : v;
});
} else {
values.computeIfAbsent(indexKey == null ? this : indexKey, k -> ConcurrentHashMap.newKeySet()).add(key);
}
}

public Set<String> get(String indexKey) {
return values.getOrDefault(indexKey == null ? this : indexKey, Collections.emptySet());
}
}

// NAMESPACE_INDEX is the default index function for caching objects
public static final String NAMESPACE_INDEX = "namespace";

// indexers stores index functions by their names
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());

// items stores object instances
// @GuardedBy("getLockObject")
private ItemStore<T> items;

// indices stores objects' key by their indices
private final ConcurrentMap<String, ConcurrentMap<String, Set<String>>> indices = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Index> indices = new ConcurrentHashMap<>();

public CacheImpl() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
Expand All @@ -74,11 +97,13 @@ public void setItemStore(ItemStore<T> items) {
*/
@Override
public Map<String, Function<T, List<String>>> getIndexers() {
return Collections.unmodifiableMap(indexers);
synchronized (indexers) {
return Collections.unmodifiableMap(indexers);
}
}

@Override
public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
public synchronized void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
Set<String> intersection = new HashSet<>(indexers.keySet());
intersection.retainAll(indexersNew.keySet());
if (!intersection.isEmpty()) {
Expand All @@ -96,15 +121,12 @@ public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
* @param obj the object
* @return the old object
*/
public T put(T obj) {
public synchronized T put(T obj) {
if (obj == null) {
return null;
}
String key = getKey(obj);
T oldObj;
synchronized (getLockObject()) {
oldObj = this.items.put(key, obj);
}
T oldObj = this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
return oldObj;
}
Expand All @@ -115,14 +137,11 @@ public T put(T obj) {
* @param obj object
* @return the old object
*/
public T remove(T obj) {
public synchronized T remove(T obj) {
String key = getKey(obj);
T old;
synchronized (getLockObject()) {
old = this.items.remove(key);
}
T old = this.items.remove(key);
if (old != null) {
this.deleteFromIndices(old, key);
this.updateIndices(old, null, key);
}
return old;
}
Expand All @@ -134,9 +153,7 @@ public T remove(T obj) {
*/
@Override
public List<String> listKeys() {
synchronized (getLockObject()) {
return this.items.keySet().collect(Collectors.toList());
}
return this.items.keySet().collect(Collectors.toList());
}

/**
Expand All @@ -156,10 +173,8 @@ public T get(T obj) {
*/
@Override
public String getKey(T obj) {
synchronized (getLockObject()) {
String result = this.items.getKey(obj);
return result == null ? "" : result;
}
String result = this.items.getKey(obj);
return result == null ? "" : result;
}

/**
Expand All @@ -169,9 +184,7 @@ public String getKey(T obj) {
*/
@Override
public List<T> list() {
synchronized (getLockObject()) {
return this.items.values().collect(Collectors.toList());
}
return this.items.values().collect(Collectors.toList());
}

/**
Expand All @@ -182,9 +195,7 @@ public List<T> list() {
*/
@Override
public T getByKey(String key) {
synchronized (getLockObject()) {
return this.items.get(key);
}
return this.items.get(key);
}

/**
Expand All @@ -200,38 +211,27 @@ public List<T> index(String indexName, T obj) {
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Map<String, Set<String>> index = this.indices.get(indexName);
if (index.isEmpty()) {
return new ArrayList<>();
}

Index index = getIndex(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
Set<String> set = index.get(indexKey);
if (set.isEmpty()) {
continue;
}
returnKeySet.addAll(set);
returnKeySet.addAll(index.get(indexKey));
}

return getItems(returnKeySet);
}

private List<T> getItems(Set<String> returnKeySet) {
List<T> items = new ArrayList<>(returnKeySet.size());
for (String absoluteKey : returnKeySet) {
T item;
synchronized (getLockObject()) {
item = this.items.get(absoluteKey);
}
if (item != null) {
items.add(item);
}
Optional.ofNullable(this.items.get(absoluteKey)).ifPresent(items::add);
}
return items;
}

private void checkContainsIndex(String indexName) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
private Index getIndex(String indexName) {
return Optional.ofNullable(this.indices.get(indexName))
.orElseThrow(() -> new IllegalArgumentException(String.format("index %s doesn't exist!", indexName)));
}

/**
Expand All @@ -243,13 +243,8 @@ private void checkContainsIndex(String indexName) {
*/
@Override
public List<String> indexKeys(String indexName, String indexKey) {
checkContainsIndex(indexName);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index == null) {
return new ArrayList<>();
}
Set<String> set = index.getOrDefault(indexKey, Collections.emptySet());
return new ArrayList<>(set);
Index index = getIndex(indexName);
return new ArrayList<>(index.get(indexKey));
}

/**
Expand All @@ -261,26 +256,8 @@ public List<String> indexKeys(String indexName, String indexKey) {
*/
@Override
public List<T> byIndex(String indexName, String indexKey) {
checkContainsIndex(indexName);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index == null) {
return new ArrayList<>();
}
Set<String> set = index.get(indexKey);
if (set == null) {
return new ArrayList<>();
}
List<T> items = new ArrayList<>(set.size());
for (String key : set) {
T item;
synchronized (getLockObject()) {
item = this.items.get(key);
}
if (item != null) {
items.add(item);
}
}
return items;
Index index = getIndex(indexName);
return getItems(index.get(indexKey));
}

/**
Expand All @@ -292,63 +269,28 @@ public List<T> byIndex(String indexName, String indexKey) {
* @param newObj new object
* @param key the key
*/
void updateIndices(T oldObj, T newObj, String key) {
if (oldObj != null) {
deleteFromIndices(oldObj, key);
}

synchronized (indexers) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Map<String, Set<String>> index = this.indices.get(indexName);
if (index != null) {
updateIndex(key, newObj, indexFunc, index);
private void updateIndices(T oldObj, T newObj, String key) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Index index = this.indices.get(indexName);
if (index != null) {
if (oldObj != null) {
updateIndex(key, oldObj, indexFunc, index, true);
}
if (newObj != null) {
updateIndex(key, newObj, indexFunc, index, false);
}
}
}
}

private void updateIndex(String key, T newObj, Function<T, List<String>> indexFunc, Map<String, Set<String>> index) {
List<String> indexValues = indexFunc.apply(newObj);
private void updateIndex(String key, T obj, Function<T, List<String>> indexFunc, Index index,
boolean remove) {
List<String> indexValues = indexFunc.apply(obj);
if (indexValues != null && !indexValues.isEmpty()) {
for (String indexValue : indexValues) {
if (indexValue != null) {
index.computeIfAbsent(indexValue, k -> ConcurrentHashMap.newKeySet()).add(key);
}
}
}
}

/**
* Removes the object from each of the managed indexes.
*
* It is intended to be called from a function that already has a lock on the cache.
*
* @param oldObj the old object
* @param key the key
*/
private void deleteFromIndices(T oldObj, String key) {
synchronized (indexers) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : this.indexers.entrySet()) {
Function<T, List<String>> indexFunc = indexEntry.getValue();
List<String> indexValues = indexFunc.apply(oldObj);
if (indexValues == null || indexValues.isEmpty()) {
continue;
}

Map<String, Set<String>> index = this.indices.get(indexEntry.getKey());
if (index == null) {
continue;
}
for (String indexValue : indexValues) {
if (indexValue != null) {
Set<String> indexSet = index.get(indexValue);
if (indexSet != null) {
indexSet.remove(key);
}
}
}
index.update(indexValue, key, remove);
}
}
}
Expand All @@ -359,16 +301,15 @@ private void deleteFromIndices(T oldObj, String key) {
* @param indexName the index name
* @param indexFunc the index func
*/
public CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
ConcurrentMap<String, Set<String>> index = new ConcurrentHashMap<>();
synchronized (indexers) {
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);

synchronized (getLockObject()) {
items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index));
}
public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
if (this.indices.containsKey(indexName)) {
throw new IllegalArgumentException("Indexer conflict: " + indexName);
}
Index index = new Index();
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);

items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index, false));
return this;
}

Expand Down Expand Up @@ -431,19 +372,17 @@ public static List<String> metaNamespaceIndexFunc(Object obj) {
}

@Override
public void removeIndexer(String name) {
public synchronized void removeIndexer(String name) {
this.indices.remove(name);
this.indexers.remove(name);
}

public boolean isFullState() {
synchronized (getLockObject()) {
return items.isFullState();
}
return items.isFullState();
}

public Object getLockObject() {
return this.items;
return this;
}

}

0 comments on commit d067b58

Please sign in to comment.