Skip to content

Commit

Permalink
Allow Pulsar default WaitStrategy to honour startup timeout
Browse files Browse the repository at this point in the history
## Motivation

Currently, the PulsarContainer WaitStrategy is configured after the user
has started the container. This means that any modifications that the
user makes to the WaitStrategy or startupTimeout are discarded.

## Modifications

This change honours the user's modifications to the WaitStrategy or
startupTimeout.
  • Loading branch information
Dave Maughan committed Aug 3, 2022
1 parent 7c3c00e commit 645797d
Showing 1 changed file with 24 additions and 12 deletions.
Expand Up @@ -2,11 +2,9 @@

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;
import java.time.Duration;

/**
* This container wraps Apache Pulsar running in standalone mode
Expand Down Expand Up @@ -40,6 +38,8 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {

private boolean transactionsEnabled = false;

private Duration startupTimeout;

/**
* @deprecated use {@link PulsarContainer(DockerImageName)} instead
*/
Expand Down Expand Up @@ -78,6 +78,12 @@ public PulsarContainer withTransactions() {
return this;
}

@Override
public PulsarContainer withStartupTimeout(Duration startupTimeout) {
this.startupTimeout = startupTimeout;
return super.withStartupTimeout(startupTimeout);
}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
}
Expand All @@ -96,23 +102,29 @@ protected void setupCommandAndEnv() {

withCommand("/bin/bash", "-c", standaloneBaseCommand);

WaitAllStrategy waitAllStrategy = new WaitAllStrategy();

final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
final String response = String.format("[\"%s\"]", clusterName);

List<WaitStrategy> waitStrategies = new ArrayList<>();
waitStrategies.add(Wait.defaultWaitStrategy());
waitStrategies.add(
waitAllStrategy.withStrategy(
Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
);

if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitStrategies.add(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
waitAllStrategy.withStrategy(
Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
);
}
if (functionsWorkerEnabled) {
waitStrategies.add(Wait.forLogMessage(".*Function worker service started.*", 1));
waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
}
final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy();
waitStrategies.forEach(compoundedWaitStrategy::withStrategy);
waitingFor(compoundedWaitStrategy);

if (startupTimeout != null) {
waitAllStrategy.withStartupTimeout(startupTimeout);
}

waitAllStrategy.withStrategy(waitStrategy);
waitStrategy = waitAllStrategy;
}
}

0 comments on commit 645797d

Please sign in to comment.