Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Pulsar default WaitStrategy to honour startup timeout #5674

Merged
merged 2 commits into from Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2,12 +2,8 @@

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;

/**
* This container wraps Apache Pulsar running in standalone mode
*/
Expand Down Expand Up @@ -36,6 +32,8 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {
@Deprecated
private static final String DEFAULT_TAG = "2.10.0";

private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy();

private boolean functionsWorkerEnabled = false;

private boolean transactionsEnabled = false;
Expand All @@ -60,6 +58,7 @@ public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DockerImageName.parse("apachepulsar/pulsar"));
withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
setWaitStrategy(waitAllStrategy);
}

@Override
Expand Down Expand Up @@ -98,21 +97,18 @@ protected void setupCommandAndEnv() {

final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
final String response = String.format("[\"%s\"]", clusterName);

List<WaitStrategy> waitStrategies = new ArrayList<>();
waitStrategies.add(Wait.defaultWaitStrategy());
waitStrategies.add(
waitAllStrategy.withStrategy(
Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
);

if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitStrategies.add(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
waitAllStrategy.withStrategy(
Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
);
}
if (functionsWorkerEnabled) {
waitStrategies.add(Wait.forLogMessage(".*Function worker service started.*", 1));
waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
}
final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy();
waitStrategies.forEach(compoundedWaitStrategy::withStrategy);
waitingFor(compoundedWaitStrategy);
}
}
Expand Up @@ -12,6 +12,7 @@
import org.junit.Test;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -144,6 +145,14 @@ public void testClusterFullyInitialized() throws Exception {
}
}

@Test
public void testStartupTimeoutIsHonored() {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ZERO)) {
kiview marked this conversation as resolved.
Show resolved Hide resolved
assertThatThrownBy(pulsar::start)
.hasRootCauseMessage("Precondition failed: timeout must be greater than zero");
}
}

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).build();
Expand Down