Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract RyukResourceReaper #4959

Merged
merged 3 commits into from Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
172 changes: 4 additions & 168 deletions core/src/main/java/org/testcontainers/utility/ResourceReaper.java
@@ -1,57 +1,35 @@
package org.testcontainers.utility;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.Network;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import com.github.dockerjava.api.model.PruneType;
import com.github.dockerjava.api.model.Volume;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.ContainerState;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;

/**
* Component that responsible for container removal and automatic cleanup of dead containers at JVM shutdown.
Expand All @@ -61,20 +39,14 @@ public final class ResourceReaper {

private static final Logger LOGGER = LoggerFactory.getLogger(ResourceReaper.class);

private static final List<List<Map.Entry<String, String>>> DEATH_NOTE = new ArrayList<>(
static final List<List<Map.Entry<String, String>>> DEATH_NOTE = new ArrayList<>(
Arrays.asList(
DockerClientFactory.DEFAULT_LABELS.entrySet().stream()
.<Map.Entry<String, String>>map(it -> new SimpleEntry<>("label", it.getKey() + "=" + it.getValue()))
.collect(Collectors.toList())
)
);

private static final RateLimiter RYUK_ACK_RATE_LIMITER = RateLimiterBuilder
.newBuilder()
.withRate(4, TimeUnit.SECONDS)
.withConstantThroughput()
.build();

private static ResourceReaper instance;
private static AtomicBoolean ryukStarted = new AtomicBoolean(false);
private final DockerClient dockerClient = DockerClientFactory.lazyClient();
Expand All @@ -98,147 +70,11 @@ public static String start(String hostIpAddress, DockerClient client) {
* @deprecated internal API
*/
@Deprecated
@SneakyThrows(InterruptedException.class)
public static String start(DockerClient client) {
String ryukImage = ImageNameSubstitutor.instance()
.apply(DockerImageName.parse("testcontainers/ryuk:0.3.3"))
.asCanonicalNameString();
DockerClientFactory.instance().checkAndPullImage(client, ryukImage);

List<Bind> binds = new ArrayList<>();
binds.add(new Bind(DockerClientFactory.instance().getRemoteDockerUnixSocketPath(), new Volume("/var/run/docker.sock")));

ExposedPort ryukExposedPort = ExposedPort.tcp(8080);
String ryukContainerId = client.createContainerCmd(ryukImage)
.withHostConfig(
new HostConfig()
.withAutoRemove(true)
.withPortBindings(new PortBinding(Ports.Binding.empty(), ryukExposedPort))
)
.withExposedPorts(ryukExposedPort)
.withName("testcontainers-ryuk-" + DockerClientFactory.SESSION_ID)
.withLabels(Collections.singletonMap(DockerClientFactory.TESTCONTAINERS_LABEL, "true"))
.withBinds(binds)
.withPrivileged(TestcontainersConfiguration.getInstance().isRyukPrivileged())
.exec()
.getId();

client.startContainerCmd(ryukContainerId).exec();

StringBuilder ryukLog = new StringBuilder();

ResultCallback.Adapter<Frame> logCallback = client.logContainerCmd(ryukContainerId)
.withSince(0)
.withFollowStream(true)
.withStdOut(true)
.withStdErr(true)
.exec(new ResultCallback.Adapter<Frame>() {
@Override
public void onNext(Frame frame) {
ryukLog.append(new String(frame.getPayload(), StandardCharsets.UTF_8));
}
});

InspectContainerResponse inspectedContainer;
try {
// inspect container response might initially not contain the mapped port
inspectedContainer = await()
.atMost(5, TimeUnit.SECONDS)
.pollInterval(DynamicPollInterval.ofMillis(50))
.pollInSameThread()
.until(
() -> client.inspectContainerCmd(ryukContainerId).exec(),
inspectContainerResponse -> {
return inspectContainerResponse
.getNetworkSettings()
.getPorts()
.getBindings()
.values()
.stream()
.anyMatch(Objects::nonNull);
}
);
} catch (Exception e) {
log.warn("Ryuk container cannot be inspected and probably had a problem starting. Ryuk's logs:\n{}", ryukLog);
throw new IllegalStateException("Ryuk failed to start", e);
}

ContainerState containerState = new ContainerState() {

@Override
public List<Integer> getExposedPorts() {
return Stream.of(getContainerInfo().getConfig().getExposedPorts())
.map(ExposedPort::getPort)
.collect(Collectors.toList());
}

@Override
public InspectContainerResponse getContainerInfo() {
return inspectedContainer;
}
};

CountDownLatch ryukScheduledLatch = new CountDownLatch(1);

String host = containerState.getHost();
Integer ryukPort = containerState.getFirstMappedPort();
Thread kiraThread = new Thread(
DockerClientFactory.TESTCONTAINERS_THREAD_GROUP,
() -> {
while (true) {
RYUK_ACK_RATE_LIMITER.doWhenReady(() -> {
int index = 0;
// not set the read timeout, as Ryuk would not send anything unless a new filter is submitted, meaning that we would get a timeout exception pretty quick
try (Socket clientSocket = new Socket()) {
clientSocket.connect(new InetSocketAddress(host, ryukPort), 5 * 1000);
FilterRegistry registry = new FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream());

synchronized (DEATH_NOTE) {
while (true) {
if (DEATH_NOTE.size() <= index) {
try {
DEATH_NOTE.wait(1_000);
continue;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
List<Map.Entry<String, String>> filters = DEATH_NOTE.get(index);
boolean isAcknowledged = registry.register(filters);
if (isAcknowledged) {
log.debug("Received 'ACK' from Ryuk");
ryukScheduledLatch.countDown();
index++;
} else {
log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters.");
}
}
}
} catch (IOException e) {
log.warn("Can not connect to Ryuk at {}:{}", host, ryukPort, e);
}
});
}
},
"testcontainers-ryuk"
);
kiraThread.setDaemon(true);
kiraThread.start();
try {
// We need to wait before we can start any containers to make sure that we delete them
if (!ryukScheduledLatch.await(TestcontainersConfiguration.getInstance().getRyukTimeout(), TimeUnit.SECONDS)) {
log.error("Timed out waiting for Ryuk container to start. Ryuk's logs:\n{}", ryukLog);
throw new IllegalStateException(String.format("Could not connect to Ryuk at %s:%s", host, ryukPort));
}
} finally {
try {
logCallback.close();
} catch (IOException ignored) {
}
}

RyukResourceReaper ryuk = new RyukResourceReaper(client);
String containerId = ryuk.getContainerId();
ryukStarted.set(true);
return ryukContainerId;
return containerId;
}

public synchronized static ResourceReaper instance() {
Expand Down