diff --git a/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java b/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java index 10dc35dc4c8..70f651c0c20 100644 --- a/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java +++ b/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java @@ -2,11 +2,9 @@ import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.containers.wait.strategy.WaitAllStrategy; -import org.testcontainers.containers.wait.strategy.WaitStrategy; import org.testcontainers.utility.DockerImageName; -import java.util.ArrayList; -import java.util.List; +import java.time.Duration; /** * This container wraps Apache Pulsar running in standalone mode @@ -40,6 +38,8 @@ public class PulsarContainer extends GenericContainer { private boolean transactionsEnabled = false; + private Duration startupTimeout; + /** * @deprecated use {@link PulsarContainer(DockerImageName)} instead */ @@ -78,6 +78,12 @@ public PulsarContainer withTransactions() { return this; } + @Override + public PulsarContainer withStartupTimeout(Duration startupTimeout) { + this.startupTimeout = startupTimeout; + return super.withStartupTimeout(startupTimeout); + } + public String getPulsarBrokerUrl() { return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT)); } @@ -96,23 +102,29 @@ protected void setupCommandAndEnv() { withCommand("/bin/bash", "-c", standaloneBaseCommand); + WaitAllStrategy waitAllStrategy = new WaitAllStrategy(); + final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone"); final String response = String.format("[\"%s\"]", clusterName); - - List waitStrategies = new ArrayList<>(); - waitStrategies.add(Wait.defaultWaitStrategy()); - waitStrategies.add( + waitAllStrategy.withStrategy( Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals) ); + if (transactionsEnabled) { withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true"); - waitStrategies.add(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)); + waitAllStrategy.withStrategy( + Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT) + ); } if (functionsWorkerEnabled) { - waitStrategies.add(Wait.forLogMessage(".*Function worker service started.*", 1)); + waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1)); } - final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy(); - waitStrategies.forEach(compoundedWaitStrategy::withStrategy); - waitingFor(compoundedWaitStrategy); + + if (startupTimeout != null) { + waitAllStrategy.withStartupTimeout(startupTimeout); + } + + waitAllStrategy.withStrategy(waitStrategy); + waitStrategy = waitAllStrategy; } }