diff --git a/core/src/main/java/org/testcontainers/utility/ResourceReaper.java b/core/src/main/java/org/testcontainers/utility/ResourceReaper.java index 8c2cd3a6108..69c849b6c82 100644 --- a/core/src/main/java/org/testcontainers/utility/ResourceReaper.java +++ b/core/src/main/java/org/testcontainers/utility/ResourceReaper.java @@ -1,30 +1,18 @@ package org.testcontainers.utility; import com.github.dockerjava.api.DockerClient; -import com.github.dockerjava.api.async.ResultCallback; import com.github.dockerjava.api.command.InspectContainerResponse; import com.github.dockerjava.api.exception.NotFoundException; -import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.Container; -import com.github.dockerjava.api.model.ExposedPort; -import com.github.dockerjava.api.model.Frame; -import com.github.dockerjava.api.model.HostConfig; import com.github.dockerjava.api.model.Network; -import com.github.dockerjava.api.model.PortBinding; -import com.github.dockerjava.api.model.Ports; import com.github.dockerjava.api.model.PruneType; -import com.github.dockerjava.api.model.Volume; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.Sets; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.rnorth.ducttape.ratelimits.RateLimiter; -import org.rnorth.ducttape.ratelimits.RateLimiterBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.ContainerState; import java.io.BufferedReader; import java.io.IOException; @@ -32,26 +20,16 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UnsupportedEncodingException; -import java.net.InetSocketAddress; -import java.net.Socket; import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.awaitility.Awaitility.await; /** * Component that responsible for container removal and automatic cleanup of dead containers at JVM shutdown. @@ -61,7 +39,7 @@ public final class ResourceReaper { private static final Logger LOGGER = LoggerFactory.getLogger(ResourceReaper.class); - private static final List>> DEATH_NOTE = new ArrayList<>( + static final List>> DEATH_NOTE = new ArrayList<>( Arrays.asList( DockerClientFactory.DEFAULT_LABELS.entrySet().stream() .>map(it -> new SimpleEntry<>("label", it.getKey() + "=" + it.getValue())) @@ -69,12 +47,6 @@ public final class ResourceReaper { ) ); - private static final RateLimiter RYUK_ACK_RATE_LIMITER = RateLimiterBuilder - .newBuilder() - .withRate(4, TimeUnit.SECONDS) - .withConstantThroughput() - .build(); - private static ResourceReaper instance; private static AtomicBoolean ryukStarted = new AtomicBoolean(false); private final DockerClient dockerClient = DockerClientFactory.lazyClient(); @@ -98,147 +70,11 @@ public static String start(String hostIpAddress, DockerClient client) { * @deprecated internal API */ @Deprecated - @SneakyThrows(InterruptedException.class) public static String start(DockerClient client) { - String ryukImage = ImageNameSubstitutor.instance() - .apply(DockerImageName.parse("testcontainers/ryuk:0.3.3")) - .asCanonicalNameString(); - DockerClientFactory.instance().checkAndPullImage(client, ryukImage); - - List binds = new ArrayList<>(); - binds.add(new Bind(DockerClientFactory.instance().getRemoteDockerUnixSocketPath(), new Volume("/var/run/docker.sock"))); - - ExposedPort ryukExposedPort = ExposedPort.tcp(8080); - String ryukContainerId = client.createContainerCmd(ryukImage) - .withHostConfig( - new HostConfig() - .withAutoRemove(true) - .withPortBindings(new PortBinding(Ports.Binding.empty(), ryukExposedPort)) - ) - .withExposedPorts(ryukExposedPort) - .withName("testcontainers-ryuk-" + DockerClientFactory.SESSION_ID) - .withLabels(Collections.singletonMap(DockerClientFactory.TESTCONTAINERS_LABEL, "true")) - .withBinds(binds) - .withPrivileged(TestcontainersConfiguration.getInstance().isRyukPrivileged()) - .exec() - .getId(); - - client.startContainerCmd(ryukContainerId).exec(); - - StringBuilder ryukLog = new StringBuilder(); - - ResultCallback.Adapter logCallback = client.logContainerCmd(ryukContainerId) - .withSince(0) - .withFollowStream(true) - .withStdOut(true) - .withStdErr(true) - .exec(new ResultCallback.Adapter() { - @Override - public void onNext(Frame frame) { - ryukLog.append(new String(frame.getPayload(), StandardCharsets.UTF_8)); - } - }); - - InspectContainerResponse inspectedContainer; - try { - // inspect container response might initially not contain the mapped port - inspectedContainer = await() - .atMost(5, TimeUnit.SECONDS) - .pollInterval(DynamicPollInterval.ofMillis(50)) - .pollInSameThread() - .until( - () -> client.inspectContainerCmd(ryukContainerId).exec(), - inspectContainerResponse -> { - return inspectContainerResponse - .getNetworkSettings() - .getPorts() - .getBindings() - .values() - .stream() - .anyMatch(Objects::nonNull); - } - ); - } catch (Exception e) { - log.warn("Ryuk container cannot be inspected and probably had a problem starting. Ryuk's logs:\n{}", ryukLog); - throw new IllegalStateException("Ryuk failed to start", e); - } - - ContainerState containerState = new ContainerState() { - - @Override - public List getExposedPorts() { - return Stream.of(getContainerInfo().getConfig().getExposedPorts()) - .map(ExposedPort::getPort) - .collect(Collectors.toList()); - } - - @Override - public InspectContainerResponse getContainerInfo() { - return inspectedContainer; - } - }; - - CountDownLatch ryukScheduledLatch = new CountDownLatch(1); - - String host = containerState.getHost(); - Integer ryukPort = containerState.getFirstMappedPort(); - Thread kiraThread = new Thread( - DockerClientFactory.TESTCONTAINERS_THREAD_GROUP, - () -> { - while (true) { - RYUK_ACK_RATE_LIMITER.doWhenReady(() -> { - int index = 0; - // not set the read timeout, as Ryuk would not send anything unless a new filter is submitted, meaning that we would get a timeout exception pretty quick - try (Socket clientSocket = new Socket()) { - clientSocket.connect(new InetSocketAddress(host, ryukPort), 5 * 1000); - FilterRegistry registry = new FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream()); - - synchronized (DEATH_NOTE) { - while (true) { - if (DEATH_NOTE.size() <= index) { - try { - DEATH_NOTE.wait(1_000); - continue; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - List> filters = DEATH_NOTE.get(index); - boolean isAcknowledged = registry.register(filters); - if (isAcknowledged) { - log.debug("Received 'ACK' from Ryuk"); - ryukScheduledLatch.countDown(); - index++; - } else { - log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters."); - } - } - } - } catch (IOException e) { - log.warn("Can not connect to Ryuk at {}:{}", host, ryukPort, e); - } - }); - } - }, - "testcontainers-ryuk" - ); - kiraThread.setDaemon(true); - kiraThread.start(); - try { - // We need to wait before we can start any containers to make sure that we delete them - if (!ryukScheduledLatch.await(TestcontainersConfiguration.getInstance().getRyukTimeout(), TimeUnit.SECONDS)) { - log.error("Timed out waiting for Ryuk container to start. Ryuk's logs:\n{}", ryukLog); - throw new IllegalStateException(String.format("Could not connect to Ryuk at %s:%s", host, ryukPort)); - } - } finally { - try { - logCallback.close(); - } catch (IOException ignored) { - } - } - + RyukResourceReaper ryuk = new RyukResourceReaper(client); + String containerId = ryuk.getContainerId(); ryukStarted.set(true); - return ryukContainerId; + return containerId; } public synchronized static ResourceReaper instance() { diff --git a/core/src/main/java/org/testcontainers/utility/RyukResourceReaper.java b/core/src/main/java/org/testcontainers/utility/RyukResourceReaper.java new file mode 100644 index 00000000000..e8aed92fa9d --- /dev/null +++ b/core/src/main/java/org/testcontainers/utility/RyukResourceReaper.java @@ -0,0 +1,188 @@ +package org.testcontainers.utility; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.async.ResultCallback; +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.github.dockerjava.api.model.Bind; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.Frame; +import com.github.dockerjava.api.model.HostConfig; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; +import com.github.dockerjava.api.model.Volume; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.rnorth.ducttape.ratelimits.RateLimiter; +import org.rnorth.ducttape.ratelimits.RateLimiterBuilder; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.ContainerState; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +@Slf4j +class RyukResourceReaper { + + private static final RateLimiter RYUK_ACK_RATE_LIMITER = RateLimiterBuilder + .newBuilder() + .withRate(4, TimeUnit.SECONDS) + .withConstantThroughput() + .build(); + + @Getter + private final String containerId; + + @SneakyThrows(InterruptedException.class) + RyukResourceReaper(DockerClient client) { + String ryukImage = ImageNameSubstitutor.instance() + .apply(DockerImageName.parse("testcontainers/ryuk:0.3.3")) + .asCanonicalNameString(); + DockerClientFactory.instance().checkAndPullImage(client, ryukImage); + + List binds = new ArrayList<>(); + binds.add(new Bind(DockerClientFactory.instance().getRemoteDockerUnixSocketPath(), new Volume("/var/run/docker.sock"))); + + ExposedPort ryukExposedPort = ExposedPort.tcp(8080); + containerId = client.createContainerCmd(ryukImage) + .withHostConfig( + new HostConfig() + .withAutoRemove(true) + .withPortBindings(new PortBinding(Ports.Binding.empty(), ryukExposedPort)) + ) + .withExposedPorts(ryukExposedPort) + .withName("testcontainers-ryuk-" + DockerClientFactory.SESSION_ID) + .withLabels(Collections.singletonMap(DockerClientFactory.TESTCONTAINERS_LABEL, "true")) + .withBinds(binds) + .withPrivileged(TestcontainersConfiguration.getInstance().isRyukPrivileged()) + .exec() + .getId(); + + client.startContainerCmd(containerId).exec(); + + StringBuilder ryukLog = new StringBuilder(); + + ResultCallback.Adapter logCallback = client.logContainerCmd(containerId) + .withSince(0) + .withFollowStream(true) + .withStdOut(true) + .withStdErr(true) + .exec(new ResultCallback.Adapter() { + @Override + public void onNext(Frame frame) { + ryukLog.append(new String(frame.getPayload(), StandardCharsets.UTF_8)); + } + }); + + InspectContainerResponse inspectedContainer; + try { + // inspect container response might initially not contain the mapped port + inspectedContainer = await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(DynamicPollInterval.ofMillis(50)) + .pollInSameThread() + .until( + () -> client.inspectContainerCmd(containerId).exec(), + inspectContainerResponse -> { + return inspectContainerResponse + .getNetworkSettings() + .getPorts() + .getBindings() + .values() + .stream() + .anyMatch(Objects::nonNull); + } + ); + } catch (Exception e) { + log.warn("Ryuk container cannot be inspected and probably had a problem starting. Ryuk's logs:\n{}", ryukLog); + throw new IllegalStateException("Ryuk failed to start", e); + } + + ContainerState containerState = new ContainerState() { + + @Override + public List getExposedPorts() { + return Stream.of(getContainerInfo().getConfig().getExposedPorts()) + .map(ExposedPort::getPort) + .collect(Collectors.toList()); + } + + @Override + public InspectContainerResponse getContainerInfo() { + return inspectedContainer; + } + }; + + CountDownLatch ryukScheduledLatch = new CountDownLatch(1); + + String host = containerState.getHost(); + Integer ryukPort = containerState.getFirstMappedPort(); + Thread kiraThread = new Thread( + DockerClientFactory.TESTCONTAINERS_THREAD_GROUP, + () -> { + while (true) { + RYUK_ACK_RATE_LIMITER.doWhenReady(() -> { + int index = 0; + // not set the read timeout, as Ryuk would not send anything unless a new filter is submitted, meaning that we would get a timeout exception pretty quick + try (Socket clientSocket = new Socket()) { + clientSocket.connect(new InetSocketAddress(host, ryukPort), 5 * 1000); + ResourceReaper.FilterRegistry registry = new ResourceReaper.FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream()); + + synchronized (ResourceReaper.DEATH_NOTE) { + while (true) { + if (ResourceReaper.DEATH_NOTE.size() <= index) { + try { + ResourceReaper.DEATH_NOTE.wait(1_000); + continue; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + List> filters = ResourceReaper.DEATH_NOTE.get(index); + boolean isAcknowledged = registry.register(filters); + if (isAcknowledged) { + log.debug("Received 'ACK' from Ryuk"); + ryukScheduledLatch.countDown(); + index++; + } else { + log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters."); + } + } + } + } catch (IOException e) { + log.warn("Can not connect to Ryuk at {}:{}", host, ryukPort, e); + } + }); + } + }, + "testcontainers-ryuk" + ); + kiraThread.setDaemon(true); + kiraThread.start(); + try { + // We need to wait before we can start any containers to make sure that we delete them + if (!ryukScheduledLatch.await(TestcontainersConfiguration.getInstance().getRyukTimeout(), TimeUnit.SECONDS)) { + log.error("Timed out waiting for Ryuk container to start. Ryuk's logs:\n{}", ryukLog); + throw new IllegalStateException(String.format("Could not connect to Ryuk at %s:%s", host, ryukPort)); + } + } finally { + try { + logCallback.close(); + } catch (IOException ignored) { + } + } + } +}