Skip to content

Commit

Permalink
Allow Pulsar default WaitStrategy to honour startup timeout
Browse files Browse the repository at this point in the history
## Motivation

Currently, the PulsarContainer WaitStrategy is configured after the user
has started the container. This means that any modifications that the
user makes to the WaitStrategy or startupTimeout are discarded.

## Modifications

This change honours the user's modifications to the WaitStrategy or
startupTimeout.
  • Loading branch information
Dave Maughan committed Sep 14, 2022
1 parent 6e46662 commit 5a13395
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
Expand Up @@ -2,11 +2,9 @@

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;
import java.time.Duration;

/**
* This container wraps Apache Pulsar running in standalone mode
Expand Down Expand Up @@ -40,6 +38,8 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {

private boolean transactionsEnabled = false;

private Duration startupTimeout;

/**
* @deprecated use {@link PulsarContainer(DockerImageName)} instead
*/
Expand Down Expand Up @@ -78,6 +78,12 @@ public PulsarContainer withTransactions() {
return this;
}

@Override
public PulsarContainer withStartupTimeout(Duration startupTimeout) {
this.startupTimeout = startupTimeout;
return super.withStartupTimeout(startupTimeout);
}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
}
Expand All @@ -96,23 +102,29 @@ protected void setupCommandAndEnv() {

withCommand("/bin/bash", "-c", standaloneBaseCommand);

WaitAllStrategy waitAllStrategy = new WaitAllStrategy();

final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
final String response = String.format("[\"%s\"]", clusterName);

List<WaitStrategy> waitStrategies = new ArrayList<>();
waitStrategies.add(Wait.defaultWaitStrategy());
waitStrategies.add(
waitAllStrategy.withStrategy(
Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
);

if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitStrategies.add(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
waitAllStrategy.withStrategy(
Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
);
}
if (functionsWorkerEnabled) {
waitStrategies.add(Wait.forLogMessage(".*Function worker service started.*", 1));
waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
}
final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy();
waitStrategies.forEach(compoundedWaitStrategy::withStrategy);
waitingFor(compoundedWaitStrategy);

if (startupTimeout != null) {
waitAllStrategy.withStartupTimeout(startupTimeout);
}

waitAllStrategy.withStrategy(waitStrategy);
waitStrategy = waitAllStrategy;
}
}
@@ -1,5 +1,6 @@
package org.testcontainers.containers;

import java.time.Duration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -144,6 +145,13 @@ public void testClusterFullyInitialized() throws Exception {
}
}

@Test
public void testStartupTimeoutIsHonored() {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ZERO)) {
assertThatThrownBy(pulsar::start).hasRootCauseMessage("Precondition failed: timeout must be greater than zero");
}
}

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).build();
Expand Down

0 comments on commit 5a13395

Please sign in to comment.