[Backport stable/8.0] Only check that the topology reports all replicas #10084

Merged
merged 9 commits on Aug 17, 2022
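
The changed tests converge on a single readiness check: poll with Awaitility until the gateway's topology response satisfies TopologyAssert, instead of hand-rolling predicates over broker lists. Below is a minimal sketch of that pattern as it appears in the diff, where client is assumed to be a ZeebeClient pointed at the gateway and clusterSize, partitionsCount and replicationFactor stand in for the broker's cluster configuration values:

// Wait until the gateway reports all brokers, all partitions, and the full
// replication factor; transient failures are retried until the timeout.
Awaitility.await("until we have a complete topology")
    .ignoreExceptions()
    .untilAsserted(
        () -> {
          final var topology = client.newTopologyRequest().send().join();
          TopologyAssert.assertThat(topology)
              .isComplete(clusterSize, partitionsCount, replicationFactor);
        });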
@@ -14,30 +14,28 @@
import static io.camunda.zeebe.broker.test.EmbeddedBrokerConfigurator.setGatewayApiPort;
import static io.camunda.zeebe.broker.test.EmbeddedBrokerConfigurator.setGatewayClusterPort;
import static io.camunda.zeebe.broker.test.EmbeddedBrokerConfigurator.setInternalApiPort;
import static io.camunda.zeebe.test.util.TestUtil.waitUntil;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.TestLoggers;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.test.util.TestConfigurationFactory;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.allocation.DirectBufferAllocator;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import io.netty.util.NetUtil;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -51,6 +49,7 @@
import java.util.function.Supplier;
import org.agrona.LangUtil;
import org.assertj.core.util.Files;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@@ -243,17 +242,24 @@ public void startBroker(final PartitionListener... listeners) {
Thread.currentThread().interrupt();
}

final EmbeddedGatewayService embeddedGatewayService =
broker.getBrokerContext().getEmbeddedGatewayService();
if (embeddedGatewayService != null) {
final BrokerClient brokerClient = embeddedGatewayService.get().getBrokerClient();

waitUntil(
() -> {
final BrokerTopologyManager topologyManager = brokerClient.getTopologyManager();
final BrokerClusterState topology = topologyManager.getTopology();
return topology != null && topology.getLeaderForPartition(1) >= 0;
});
if (brokerCfg.getGateway().isEnable()) {
try (final var client =
ZeebeClient.newClientBuilder()
.gatewayAddress(NetUtil.toSocketAddressString(getGatewayAddress()))
.usePlaintext()
.build()) {
Awaitility.await("until we have a complete topology")
.ignoreExceptions()
.untilAsserted(
() -> {
final var topology = client.newTopologyRequest().send().join();
TopologyAssert.assertThat(topology)
.isComplete(
brokerCfg.getCluster().getClusterSize(),
brokerCfg.getCluster().getPartitionsCount(),
brokerCfg.getCluster().getReplicationFactor());
});
}
}

dataDirectory = broker.getSystemContext().getBrokerConfiguration().getData().getDirectory();
@@ -54,6 +54,7 @@
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.exception.UncheckedExecutionException;
@@ -77,7 +78,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -86,7 +86,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.LangUtil;
@@ -237,9 +236,8 @@ protected void before() throws IOException {

try {
waitUntilBrokersStarted();
waitForPartitionReplicationFactor();
LOG.info("Full replication factor");
waitUntilBrokersInTopology();
waitForTopology(
assertion -> assertion.isComplete(clusterSize, partitionCount, replicationFactor));
LOG.info("All brokers in topology {}", getTopologyFromClient());

} catch (final Exception e) {
@@ -297,11 +295,7 @@ private Broker createBroker(final int nodeId) {
getSpringBrokerBridge(nodeId),
Collections.singletonList(new LeaderListener(partitionLatch, nodeId)));

new Thread(
() -> {
broker.start();
})
.start();
CompletableFuture.runAsync(broker::start);
return broker;
}

@@ -427,22 +421,6 @@ private ZeebeClient createClient() {
return client;
}

private void waitUntilBrokersInTopology() {

final Set<InetSocketAddress> addresses =
brokers.values().stream()
.map(Broker::getConfig)
.map(b -> b.getNetwork().getCommandApi().getAddress())
.collect(Collectors.toSet());

waitForTopology(
topology ->
topology.stream()
.map(b -> new InetSocketAddress(b.getHost(), b.getPort()))
.collect(Collectors.toSet())
.containsAll(addresses));
}

public Topology getTopologyFromClient() {
return Awaitility.await()
.pollInterval(Duration.ofMillis(100))
@@ -475,33 +453,6 @@ private Optional<BrokerInfo> extractPartitionLeader(
.findFirst();
}

/** Wait for a partition bootstrap in the cluster. */
public void waitForPartitionReplicationFactor() {
waitForTopology(
topology ->
hasPartitionsWithReplicationFactor(topology, partitionCount, replicationFactor));
}

private boolean hasPartitionsWithReplicationFactor(
final List<BrokerInfo> brokers, final int partitionCount, final int replicationFactor) {
final AtomicLong leaders = new AtomicLong();
final AtomicLong followers = new AtomicLong();

brokers.stream()
.flatMap(b -> b.getPartitions().stream())
.forEach(
p -> {
if (p.isLeader()) {
leaders.getAndIncrement();
} else {
followers.getAndIncrement();
}
});

return leaders.get() >= partitionCount
&& followers.get() >= partitionCount * (replicationFactor - 1);
}

/**
* Restarts broker, if the broker is still running it will be closed before.
*
@@ -513,53 +464,33 @@ public void restartBroker(final int nodeId) {
}

public void startBroker(final int nodeId) {
final Broker broker = getBroker(nodeId).start().join();
final InetSocketAddress commandApi =
broker.getConfig().getNetwork().getCommandApi().getAddress();
waitUntilBrokerIsAddedToTopology(commandApi);
waitForPartitionReplicationFactor();
final var broker = Objects.requireNonNull(getBroker(nodeId), "must get existing broker");
//noinspection resource
broker.start().join();

waitForTopology(
assertion ->
assertion
.containsBroker(nodeId)
.hasLeaderForEachPartition(partitionCount)
.hasExpectedReplicasCount(partitionCount, replicationFactor));
}

public void restartCluster() {
final var brokers =
getBrokers().stream()
.map(b -> b.getConfig().getCluster().getNodeId())
.collect(Collectors.toList());
getBrokers().stream().map(b -> b.getConfig().getCluster().getNodeId()).toList();
brokers.forEach(this::stopBroker);
brokers.forEach(this::getBroker);
try {
waitUntilBrokersStarted();
waitForPartitionReplicationFactor();
waitUntilBrokersInTopology();
waitForTopology(
assertion -> assertion.isComplete(clusterSize, partitionCount, replicationFactor));
} catch (final Exception e) {
LOG.error("Failed to restart cluster", e);
Assert.fail("Failed to restart cluster");
}
}

private void waitUntilBrokerIsAddedToTopology(final InetSocketAddress socketAddress) {
waitForTopology(
topology ->
topology.stream()
.anyMatch(
b ->
b.getHost().equals(socketAddress.getHostName())
&& b.getPort() == socketAddress.getPort()));
}

/** Returns for a given broker the leading partition id's. */
public List<Integer> getBrokersLeadingPartitions(final InetSocketAddress socketAddress) {
return client.newTopologyRequest().send().join().getBrokers().stream()
.filter(
b ->
b.getHost().equals(socketAddress.getHostName())
&& b.getPort() == socketAddress.getPort())
.flatMap(broker -> broker.getPartitions().stream())
.filter(PartitionInfo::isLeader)
.map(PartitionInfo::getPartitionId)
.collect(Collectors.toList());
}

/** Returns the list of available brokers in a cluster. */
public List<InetSocketAddress> getBrokersInCluster() {
return client.newTopologyRequest().send().join().getBrokers().stream()
@@ -591,10 +522,6 @@ public long getPartitionLeaderCount() {
.count();
}

public void stepDown(final int nodeId, final int partitionId) {
stepDown(getBroker(nodeId), partitionId);
}

public BrokerInfo awaitOtherLeader(final int partitionId, final int previousLeader) {
return Awaitility.await()
.pollInterval(Duration.ofMillis(100))
@@ -637,21 +564,18 @@ public void connect(final Broker broker) {
public void stopBrokerAndAwaitNewLeader(final int nodeId) {
final Broker broker = brokers.get(nodeId);
if (broker != null) {
final InetSocketAddress socketAddress =
broker.getConfig().getNetwork().getCommandApi().getAddress();
final List<Integer> brokersLeadingPartitions = getBrokersLeadingPartitions(socketAddress);
stopBroker(nodeId);
waitForNewLeaderOfPartitions(brokersLeadingPartitions, socketAddress);
waitForTopology(
assertion ->
assertion.doesNotContainBroker(nodeId).hasLeaderForEachPartition(partitionCount));
}
}

public void stopBroker(final int nodeId) {
final Broker broker = brokers.remove(nodeId);
if (broker != null) {
final InetSocketAddress socketAddress =
broker.getConfig().getNetwork().getCommandApi().getAddress();
broker.close();
waitUntilBrokerIsRemovedFromTopology(socketAddress);
waitForTopology(assertion -> assertion.doesNotContainBroker(nodeId));
try {
final var systemContext = systemContexts.remove(nodeId);
if (systemContext != null) {
Expand Down Expand Up @@ -686,38 +610,16 @@ public void forceClusterToHaveNewLeader(final int expectedLeader) {
awaitOtherLeader(START_PARTITION_ID, previousLeader.getNodeId());
}

private void waitUntilBrokerIsRemovedFromTopology(final InetSocketAddress socketAddress) {
waitForTopology(
topology ->
topology.stream()
.noneMatch(
b ->
b.getHost().equals(socketAddress.getHostName())
&& b.getPort() == socketAddress.getPort()));
}

private void waitForNewLeaderOfPartitions(
final List<Integer> partitions, final InetSocketAddress oldLeader) {
waitForTopology(
topology ->
topology.stream()
.filter(
b ->
!(b.getHost().equals(oldLeader.getHostName())
&& b.getPort() == oldLeader.getPort()))
.flatMap(broker -> broker.getPartitions().stream())
.filter(PartitionInfo::isLeader)
.map(PartitionInfo::getPartitionId)
.collect(Collectors.toSet())
.containsAll(partitions));
}

public void waitForTopology(final Predicate<List<BrokerInfo>> topologyPredicate) {
Awaitility.await()
public void waitForTopology(final Consumer<TopologyAssert> assertions) {
Awaitility.await("until topology satisfies a given condition")
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofSeconds(60))
.ignoreExceptions()
.until(() -> getTopologyFromClient().getBrokers(), topologyPredicate);
.untilAsserted(
() -> {
final var topology = getTopologyFromClient();
assertions.accept(TopologyAssert.assertThat(topology));
});
}

public long createProcessInstanceOnPartition(final int partitionId, final String bpmnProcessId) {
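As a usage note, the reworked waitForTopology helper now takes a Consumer<TopologyAssert>, so call sites compose exactly the assertions they need. A sketch mirroring the call sites in this diff, where rule stands in for the clustering rule instance and nodeId, clusterSize, partitionCount and replicationFactor are placeholders:

// Full cluster readiness, as asserted after starting or restarting the cluster:
rule.waitForTopology(
    assertion -> assertion.isComplete(clusterSize, partitionCount, replicationFactor));

// A narrower condition, as asserted after stopping a single broker:
rule.waitForTopology(assertion -> assertion.doesNotContainBroker(nodeId));
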
@@ -96,7 +96,10 @@ private void awaitTopologyIsComplete() {
.untilAsserted(
() ->
TopologyAssert.assertThat(clusterRule.getTopologyFromClient())
.isComplete(clusterRule.getClusterSize(), clusterRule.getPartitionCount()));
.isComplete(
clusterRule.getClusterSize(),
clusterRule.getPartitionCount(),
clusterRule.getReplicationFactor()));
}

private void awaitBrokerIsRemovedFromTopology(final int nodeId) {
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.agrona.CloseHelper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -107,9 +106,11 @@ void shouldCommunicateOverProxy() {
.map(ZeebeNode::getInternalHost)
.map(host -> toxiproxy.getProxy(host, ZeebePort.COMMAND.getPort()))
.map(ContainerProxy::getOriginalProxyPort)
.collect(Collectors.toList());
.toList();
TopologyAssert.assertThat(topology)
.isComplete(3, 1)
.hasClusterSize(3)
.hasExpectedReplicasCount(1, 3)
.hasLeaderForEachPartition(1)
.hasBrokerSatisfying(
b ->
assertThat(b.getAddress())
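
For reference, a minimal sketch of the finer-grained TopologyAssert checks used above, assuming a three-broker cluster with a single partition replicated three times (client is a placeholder ZeebeClient, not part of this diff):

final Topology topology = client.newTopologyRequest().send().join();
TopologyAssert.assertThat(topology)
    .hasClusterSize(3)              // the gateway knows about all three brokers
    .hasExpectedReplicasCount(1, 3) // the one partition reports three replicas in total
    .hasLeaderForEachPartition(1);  // and a leader is reported for that partition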