Skip to content

Commit

Permalink
Merge pull request #2428 from manusa/fix/watch-test
Browse files Browse the repository at this point in the history
  • Loading branch information
fusesource-ci committed Aug 24, 2020
2 parents adb6d42 + 2193158 commit 64bdd32
Showing 1 changed file with 98 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,176 +16,192 @@

package io.fabric8.kubernetes.client.mock;

import static io.fabric8.kubernetes.client.Watcher.Action.DELETED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.api.model.WatchEvent;
import io.fabric8.kubernetes.api.model.WatchEventBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.Watchable;
import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
import junit.framework.AssertionFailedError;

import java.net.HttpURLConnection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Rule;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EnableRuleMigrationSupport
public class WatchTest {
Logger logger = LoggerFactory.getLogger(WatchTest.class);
class WatchTest {

private static final Long EVENT_WAIT_PERIOD_MS = 10L;

@Rule
public KubernetesServer server = new KubernetesServer(false);

static final Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1")
.withResourceVersion("1").endMetadata().build();
private KubernetesClient client;
private Pod pod1;

static final Status outdatedStatus = new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE)
.withMessage(
"401: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")
.build();
static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build();
static final Long EVENT_WAIT_PERIOD = 10L;
@BeforeEach
void setUp() {
client = server.getClient().inNamespace("test");
pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1")
.withResourceVersion("1").endMetadata().build();
}

@Test
public void testDeletedAndOutdated() throws InterruptedException {
logger.info("testDeletedAndOutdated");
KubernetesClient client = server.getClient().inNamespace("test");

// DELETED event, then history outdated
@DisplayName("TryWithResources, connects and receives event then receives GONE, should receive first event and then close")
void testTryWithResourcesConnectsThenReceivesEvent() throws InterruptedException {
// Given
server.expect()
.withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
.andUpgradeToWebSocket().open().waitFor(EVENT_WAIT_PERIOD).andEmit(new WatchEvent(pod1, "DELETED")).waitFor(EVENT_WAIT_PERIOD)
.andEmit(outdatedEvent).done().once();

.andUpgradeToWebSocket().open()
.waitFor(EVENT_WAIT_PERIOD_MS).andEmit(new WatchEvent(pod1, "DELETED"))
.waitFor(EVENT_WAIT_PERIOD_MS).andEmit(outdatedEvent()).done().once();
final CountDownLatch deleteLatch = new CountDownLatch(1);
final CountDownLatch closeLatch = new CountDownLatch(1);
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
final Watcher<Pod> watcher = new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
switch (action) {
case DELETED:
deleteLatch.countDown();
break;
default:
throw new AssertionFailedError();
if (action != DELETED) {
fail();
}
deleteLatch.countDown();
}

@Override
public void onClose(KubernetesClientException cause) {
assertEquals(410, cause.getCode());
closeLatch.countDown();
}
})) /* autoclose */ {
};
// When
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(watcher)) {
// Then
assertNotNull(watch);
assertTrue(deleteLatch.await(10, TimeUnit.SECONDS));
assertTrue(closeLatch.await(10, TimeUnit.SECONDS));
}
}

@Test
public void testHttpErrorWithOutdated() {
Assertions.assertThrows(KubernetesClientException.class, () -> {
logger.info("testHttpErrorWithOutdated");
KubernetesClient client = server.getClient().inNamespace("test");
// http error: history outdated
server.expect()
.withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
.andReturn(410, outdatedEvent).once();
final boolean[] onCloseCalled = {false};
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
throw new AssertionFailedError();
}

@Override
public void onClose(KubernetesClientException cause) {
onCloseCalled[0] =true;
}
})) {
@DisplayName("TryWithResources, receives error when connecting, should NOT receive events and close before propagating the connect exception")
void testTryWithResourcesCantConnectShouldCloseAndThenThrowException() throws Exception {
// Given
final CountDownLatch closeLatch = new CountDownLatch(1);
server.expect()
.withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
.andReturn(410, outdatedEvent()).once();
final Watcher<Pod> watcher = new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
fail();
}

@Override
public void onClose(KubernetesClientException cause) {
assertNull("Close event should be invoked by try-with-resources successful completion, not by exception", cause);
closeLatch.countDown();
}
};
final Watchable<Watch, Watcher<Pod>> watchable = client.pods().withName("pod1").withResourceVersion("1");
// When
final KubernetesClientException result = Assertions.assertThrows(KubernetesClientException.class, () -> {
try (Watch watch = watchable.watch(watcher)) {
assertNull(watch);
fail("Close with resources should call Watcher#onClose when closing watch");
}
assertTrue(onCloseCalled[0]);
});
// Then
assertTrue(closeLatch.await(10, TimeUnit.SECONDS));
assertEquals(410, result.getCode());
}

@Test
public void testWithTimeoutSeconds() throws InterruptedException {
void testWithTimeoutSecondsShouldAddQueryParam() throws InterruptedException {
// Given
server.expect()
.withPath("/api/v1/namespaces/test/pods?timeoutSeconds=30&watch=true")
.andUpgradeToWebSocket().open().waitFor(EVENT_WAIT_PERIOD).andEmit(new WatchEvent(pod1, "DELETED")).waitFor(EVENT_WAIT_PERIOD)
.andEmit(outdatedEvent).done().once();
.andUpgradeToWebSocket().open()
.waitFor(EVENT_WAIT_PERIOD_MS).andEmit(new WatchEvent(pod1, "DELETED"))
.waitFor(EVENT_WAIT_PERIOD_MS).andEmit(outdatedEvent()).done().once();

KubernetesClient client = server.getClient();

// When
final CountDownLatch eventRecievedLatch = new CountDownLatch(1);
final CountDownLatch eventReceivedLatch = new CountDownLatch(1);
Watch watch = client.pods().watch(new ListOptionsBuilder().withTimeoutSeconds(30L).build(), new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) { eventRecievedLatch.countDown(); }
public void eventReceived(Action action, Pod resource) { eventReceivedLatch.countDown(); }

@Override
public void onClose(KubernetesClientException cause) { }
});

// Then
assertTrue(eventRecievedLatch.await(3, TimeUnit.SECONDS));
assertTrue(eventReceivedLatch.await(3, TimeUnit.SECONDS));
watch.close();
}

/**
* Will attempt a reconnect after 10ms..20ms...40ms....80ms.....160ms......320ms
*/
@Test
public void testHttpErrorReconnect() throws InterruptedException {
logger.info("testHttpErrorReconnect");
String path = "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true";
KubernetesClient client = server.getClient().inNamespace("test");
void testHttpErrorReconnect() throws InterruptedException {
// Given
client.getConfiguration().setWatchReconnectInterval(10);
final String path = "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true";
// accept watch and disconnect
server.expect().withPath(path).andUpgradeToWebSocket().open().done().once();
// refuse reconnect attempts 6 times
server.expect().withPath(path).andReturn(503, new StatusBuilder().withCode(503).build()).times(6);
// accept next reconnect and send outdated event to stop the watch
server.expect().withPath(path).andUpgradeToWebSocket().open(outdatedEvent).done().once();

server.expect().withPath(path).andUpgradeToWebSocket().open(outdatedEvent()).done().once();
final CountDownLatch closeLatch = new CountDownLatch(1);
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
final Watcher<Pod> watcher = new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
throw new AssertionFailedError();
fail();
}

@Override
public void onClose(KubernetesClientException cause) {
logger.debug("onClose", cause);
assertEquals(410, cause.getCode());
closeLatch.countDown();
}
})) /* autoclose */ {
};
// When
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(watcher)) {
// Then
assertNotNull(watch);
assertTrue(closeLatch.await(3, TimeUnit.MINUTES));
}
}

@Test
public void testOnCloseEvent() throws InterruptedException {
logger.info("testOnCloseEvent");
void testOnCloseEvent() throws InterruptedException {
final CountDownLatch eventLatch = new CountDownLatch(2);
final CountDownLatch closeLatch = new CountDownLatch(1);
KubernetesClient client = server.getClient().inNamespace("test");

server.expect()
.withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
.andUpgradeToWebSocket().open().waitFor(EVENT_WAIT_PERIOD).andEmit(new WatchEvent(pod1, "MODIFIED")).waitFor(EVENT_WAIT_PERIOD)
.andUpgradeToWebSocket().open().waitFor(EVENT_WAIT_PERIOD_MS).andEmit(new WatchEvent(pod1, "MODIFIED")).waitFor(EVENT_WAIT_PERIOD_MS)
.andEmit(new WatchEvent(pod1, "MODIFIED")).done().once();

Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
Expand All @@ -206,11 +222,8 @@ public void onClose(KubernetesClientException cause) {
}

@Test
public void testReconnectsWithLastResourceVersion() throws InterruptedException {
logger.info("testOnCloseEvent");
void testReconnectsWithLastResourceVersion() throws InterruptedException {
final CountDownLatch eventLatch = new CountDownLatch(3);
KubernetesClient client = server.getClient().inNamespace("test");


final Pod pod1initial = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1")
.withResourceVersion("9").endMetadata().build();
Expand All @@ -224,16 +237,16 @@ public void testReconnectsWithLastResourceVersion() throws InterruptedException
server.expect()
.withPath(path)
.andUpgradeToWebSocket().open()
.waitFor(EVENT_WAIT_PERIOD).andEmit(new WatchEvent(pod1initial, "MODIFIED"))
.waitFor(EVENT_WAIT_PERIOD).andEmit(new WatchEvent(pod1update, "MODIFIED"))
.waitFor(EVENT_WAIT_PERIOD_MS).andEmit(new WatchEvent(pod1initial, "MODIFIED"))
.waitFor(EVENT_WAIT_PERIOD_MS).andEmit(new WatchEvent(pod1update, "MODIFIED"))
.done().once();

final String reconnectPath = "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=10&watch=true";

server.expect()
.withPath(reconnectPath)
.andUpgradeToWebSocket().open()
.waitFor(EVENT_WAIT_PERIOD).andEmit(new WatchEvent(pod1update, "MODIFIED"))
.waitFor(EVENT_WAIT_PERIOD_MS).andEmit(new WatchEvent(pod1update, "MODIFIED"))
.done().once();


Expand All @@ -252,4 +265,11 @@ public void onClose(KubernetesClientException cause) {
watch.close();
}

private static WatchEvent outdatedEvent() {
return new WatchEventBuilder().withStatusObject(
new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE)
.withMessage(
"410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")
.build()).build();
}
}

0 comments on commit 64bdd32

Please sign in to comment.