Skip to content

Commit

Permalink
fix #3969 #4222: deferring add events until cache is complete
Browse files Browse the repository at this point in the history
also omitting resync events on relist
  • Loading branch information
shawkins committed Jun 23, 2022
1 parent d686ed0 commit ad60ae6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## CHANGELOG

### 5.12.3
* Fix #3969: relist will not trigger sync events
* Fix #4222: backport of #4082 - to not process events until the cache is complete

### 5.12.2 (2022-04-06)
* Fix #3582: SSL truststore can be loaded in FIPS enabled environments
* Fix #3797: Implement SchemaSwap; generate CRD from model not owned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.Notification;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
Expand All @@ -29,6 +32,8 @@ public class ProcessorStore<T extends HasMetadata> implements SyncableStore<T> {

private Cache<T> cache;
private SharedProcessor<T> processor;
private AtomicBoolean synced = new AtomicBoolean();
private List<String> deferredAdd = new ArrayList<>();

public ProcessorStore(Cache<T> cache, SharedProcessor<T> processor) {
this.cache = cache;
Expand All @@ -40,14 +45,26 @@ public void add(T obj) {
update(obj);
}

@Override
public void update(T obj) {
private Notification<T> updateInternal(T obj) {
T oldObj = this.cache.put(obj);
Notification<T> notification = null;
if (oldObj != null) {
this.processor.distribute(new ProcessorListener.UpdateNotification<>(oldObj, obj),
Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion()));
if (!Objects.equals(oldObj.getMetadata().getResourceVersion(), obj.getMetadata().getResourceVersion())) {
notification = new ProcessorListener.UpdateNotification<>(oldObj, obj);
}
} else if (synced.get()) {
notification = new ProcessorListener.AddNotification<>(obj);
} else {
this.processor.distribute(new ProcessorListener.AddNotification<>(obj), false);
deferredAdd.add(getKey(obj));
}
return notification;
}

@Override
public void update(T obj) {
Notification<T> notification = updateInternal(obj);
if (notification != null) {
this.processor.distribute(notification, false);
}
}

Expand Down Expand Up @@ -81,6 +98,11 @@ public T getByKey(String key) {

@Override
public void retainAll(Set<String> nextKeys) {
if (synced.compareAndSet(false, true)) {
deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull)
.forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false));
deferredAdd.clear();
}
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
Expand All @@ -90,11 +112,11 @@ public void retainAll(Set<String> nextKeys) {
String key = cache.getKey(v);
if (!nextKeys.contains(key)) {
cache.remove(v);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
}
});
}

@Override
public String getKey(T obj) {
return cache.getKey(obj);
Expand All @@ -106,4 +128,4 @@ public void resync() {
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -43,6 +45,9 @@ public void testEvents() {
ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);
Pod pod = new PodBuilder().withNewMetadata().withName("pod").endMetadata().build();

// initial sync complete
processorStore.retainAll(Collections.emptySet());

// add notification
processorStore.add(pod);

Expand All @@ -60,21 +65,19 @@ public void testEvents() {
Mockito.when(podCache.remove(pod)).thenReturn(pod);
processorStore.delete(pod);

Mockito.verify(processor, Mockito.times(4)).distribute(notificationCaptor.capture(), syncCaptor.capture());
Mockito.verify(processor, Mockito.times(3)).distribute(notificationCaptor.capture(), syncCaptor.capture());

List<Notification<Pod>> notifications = notificationCaptor.getAllValues();

assertThat(notifications.get(0)).isInstanceOf(AddNotification.class);
assertThat(notifications.get(1)).isInstanceOf(AddNotification.class);
assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class);
assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class);
assertThat(notifications.get(2)).isInstanceOf(DeleteNotification.class);

List<Boolean> syncValues = syncCaptor.getAllValues();

assertThat(syncValues.get(0)).isFalse();
assertThat(syncValues.get(1)).isFalse();
assertThat(syncValues.get(2)).isTrue(); // same object/revision, so it's sync
assertThat(syncValues.get(3)).isFalse();
assertThat(syncValues.get(2)).isFalse();
}

@Test
Expand All @@ -86,13 +89,20 @@ public void testSyncEvents() {

ProcessorStore<Pod> processorStore = new ProcessorStore<>(podCache, processor);

Pod pod = new PodBuilder().withNewMetadata().endMetadata().build();
Pod pod = new PodBuilder().withNewMetadata().withName("pod1").withResourceVersion("1").endMetadata().build();
Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build();

// replace empty store with two values
processorStore.add(pod);
processorStore.add(pod2);

// add events should not be called until retainAll
Mockito.verify(processor, Mockito.times(0)).distribute(notificationCaptor.capture(), syncCaptor.capture());

List<Pod> pods = Arrays.asList(pod, pod2);

processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()));

// resync two values
processorStore.resync();

Expand Down

0 comments on commit ad60ae6

Please sign in to comment.