merge: #10084
10084: [Backport stable/8.0] Only check that the topology reports all replicas r=npepinpe a=npepinpe

## Description

This PR backports #10082 to stable/8.0. At the same time, I backported two other flaky test fixes, since one of them includes the `TopologyAssert` helpers used here (and possibly needed by further flaky test fixes).

There were no conflicts by the way.
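In short, instead of polling the embedded gateway's internal `BrokerClient` for a partition leader, the test rules now open a regular `ZeebeClient`, request the topology, and wait until `TopologyAssert` reports it complete for the configured cluster size, partition count, and replication factor. A condensed sketch of that pattern, based on the diff below (`gatewayAddress`, `clusterSize`, `partitionsCount`, and `replicationFactor` stand in for values read from the broker configuration):

```java
// Sketch only: names mirror the diff below; the concrete values come from BrokerCfg.
try (final ZeebeClient client =
    ZeebeClient.newClientBuilder()
        .gatewayAddress(gatewayAddress)
        .usePlaintext()
        .build()) {
  Awaitility.await("until we have a complete topology")
      .ignoreExceptions()
      .untilAsserted(
          () -> {
            // Ask the gateway for the cluster topology and assert it is fully formed.
            final var topology = client.newTopologyRequest().send().join();
            TopologyAssert.assertThat(topology)
                .isComplete(clusterSize, partitionsCount, replicationFactor);
          });
}
```

Because the assertion runs inside `untilAsserted`, a transient incomplete topology simply retries instead of failing the test.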

## Related issues

backports #10082, #9596, #9502



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
3 people committed Aug 17, 2022
2 parents 2167d8c + 0da5eff commit 9531912
Showing 11 changed files with 676 additions and 282 deletions.
@@ -14,30 +14,28 @@
import static io.camunda.zeebe.broker.test.EmbeddedBrokerConfigurator.setGatewayApiPort;
import static io.camunda.zeebe.broker.test.EmbeddedBrokerConfigurator.setGatewayClusterPort;
import static io.camunda.zeebe.broker.test.EmbeddedBrokerConfigurator.setInternalApiPort;
import static io.camunda.zeebe.test.util.TestUtil.waitUntil;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.SpringBrokerBridge;
import io.camunda.zeebe.broker.TestLoggers;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.test.util.TestConfigurationFactory;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.allocation.DirectBufferAllocator;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import io.netty.util.NetUtil;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -51,6 +49,7 @@
import java.util.function.Supplier;
import org.agrona.LangUtil;
import org.assertj.core.util.Files;
import org.awaitility.Awaitility;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@@ -243,17 +242,24 @@ public void startBroker(final PartitionListener... listeners) {
Thread.currentThread().interrupt();
}

final EmbeddedGatewayService embeddedGatewayService =
broker.getBrokerContext().getEmbeddedGatewayService();
if (embeddedGatewayService != null) {
final BrokerClient brokerClient = embeddedGatewayService.get().getBrokerClient();

waitUntil(
() -> {
final BrokerTopologyManager topologyManager = brokerClient.getTopologyManager();
final BrokerClusterState topology = topologyManager.getTopology();
return topology != null && topology.getLeaderForPartition(1) >= 0;
});
if (brokerCfg.getGateway().isEnable()) {
try (final var client =
ZeebeClient.newClientBuilder()
.gatewayAddress(NetUtil.toSocketAddressString(getGatewayAddress()))
.usePlaintext()
.build()) {
Awaitility.await("until we have a complete topology")
.ignoreExceptions()
.untilAsserted(
() -> {
final var topology = client.newTopologyRequest().send().join();
TopologyAssert.assertThat(topology)
.isComplete(
brokerCfg.getCluster().getClusterSize(),
brokerCfg.getCluster().getPartitionsCount(),
brokerCfg.getCluster().getReplicationFactor());
});
}
}

dataDirectory = broker.getSystemContext().getBrokerConfiguration().getData().getDirectory();
@@ -54,6 +54,7 @@
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.exception.UncheckedExecutionException;
@@ -77,7 +78,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -86,7 +86,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.LangUtil;
@@ -237,9 +236,8 @@ protected void before() throws IOException {

try {
waitUntilBrokersStarted();
waitForPartitionReplicationFactor();
LOG.info("Full replication factor");
waitUntilBrokersInTopology();
waitForTopology(
assertion -> assertion.isComplete(clusterSize, partitionCount, replicationFactor));
LOG.info("All brokers in topology {}", getTopologyFromClient());

} catch (final Exception e) {
@@ -297,11 +295,7 @@ private Broker createBroker(final int nodeId) {
getSpringBrokerBridge(nodeId),
Collections.singletonList(new LeaderListener(partitionLatch, nodeId)));

new Thread(
() -> {
broker.start();
})
.start();
CompletableFuture.runAsync(broker::start);
return broker;
}

@@ -427,22 +421,6 @@ private ZeebeClient createClient() {
return client;
}

private void waitUntilBrokersInTopology() {

final Set<InetSocketAddress> addresses =
brokers.values().stream()
.map(Broker::getConfig)
.map(b -> b.getNetwork().getCommandApi().getAddress())
.collect(Collectors.toSet());

waitForTopology(
topology ->
topology.stream()
.map(b -> new InetSocketAddress(b.getHost(), b.getPort()))
.collect(Collectors.toSet())
.containsAll(addresses));
}

public Topology getTopologyFromClient() {
return Awaitility.await()
.pollInterval(Duration.ofMillis(100))
@@ -475,33 +453,6 @@ private Optional<BrokerInfo> extractPartitionLeader(
.findFirst();
}

/** Wait for a partition bootstrap in the cluster. */
public void waitForPartitionReplicationFactor() {
waitForTopology(
topology ->
hasPartitionsWithReplicationFactor(topology, partitionCount, replicationFactor));
}

private boolean hasPartitionsWithReplicationFactor(
final List<BrokerInfo> brokers, final int partitionCount, final int replicationFactor) {
final AtomicLong leaders = new AtomicLong();
final AtomicLong followers = new AtomicLong();

brokers.stream()
.flatMap(b -> b.getPartitions().stream())
.forEach(
p -> {
if (p.isLeader()) {
leaders.getAndIncrement();
} else {
followers.getAndIncrement();
}
});

return leaders.get() >= partitionCount
&& followers.get() >= partitionCount * (replicationFactor - 1);
}

/**
* Restarts broker, if the broker is still running it will be closed before.
*
@@ -513,53 +464,33 @@ public void restartBroker(final int nodeId) {
}

public void startBroker(final int nodeId) {
final Broker broker = getBroker(nodeId).start().join();
final InetSocketAddress commandApi =
broker.getConfig().getNetwork().getCommandApi().getAddress();
waitUntilBrokerIsAddedToTopology(commandApi);
waitForPartitionReplicationFactor();
final var broker = Objects.requireNonNull(getBroker(nodeId), "must get existing broker");
//noinspection resource
broker.start().join();

waitForTopology(
assertion ->
assertion
.containsBroker(nodeId)
.hasLeaderForEachPartition(partitionCount)
.hasExpectedReplicasCount(partitionCount, replicationFactor));
}

public void restartCluster() {
final var brokers =
getBrokers().stream()
.map(b -> b.getConfig().getCluster().getNodeId())
.collect(Collectors.toList());
getBrokers().stream().map(b -> b.getConfig().getCluster().getNodeId()).toList();
brokers.forEach(this::stopBroker);
brokers.forEach(this::getBroker);
try {
waitUntilBrokersStarted();
waitForPartitionReplicationFactor();
waitUntilBrokersInTopology();
waitForTopology(
assertion -> assertion.isComplete(clusterSize, partitionCount, replicationFactor));
} catch (final Exception e) {
LOG.error("Failed to restart cluster", e);
Assert.fail("Failed to restart cluster");
}
}

private void waitUntilBrokerIsAddedToTopology(final InetSocketAddress socketAddress) {
waitForTopology(
topology ->
topology.stream()
.anyMatch(
b ->
b.getHost().equals(socketAddress.getHostName())
&& b.getPort() == socketAddress.getPort()));
}

/** Returns for a given broker the leading partition id's. */
public List<Integer> getBrokersLeadingPartitions(final InetSocketAddress socketAddress) {
return client.newTopologyRequest().send().join().getBrokers().stream()
.filter(
b ->
b.getHost().equals(socketAddress.getHostName())
&& b.getPort() == socketAddress.getPort())
.flatMap(broker -> broker.getPartitions().stream())
.filter(PartitionInfo::isLeader)
.map(PartitionInfo::getPartitionId)
.collect(Collectors.toList());
}

/** Returns the list of available brokers in a cluster. */
public List<InetSocketAddress> getBrokersInCluster() {
return client.newTopologyRequest().send().join().getBrokers().stream()
@@ -591,10 +522,6 @@ public long getPartitionLeaderCount() {
.count();
}

public void stepDown(final int nodeId, final int partitionId) {
stepDown(getBroker(nodeId), partitionId);
}

public BrokerInfo awaitOtherLeader(final int partitionId, final int previousLeader) {
return Awaitility.await()
.pollInterval(Duration.ofMillis(100))
@@ -637,21 +564,18 @@ public void connect(final Broker broker) {
public void stopBrokerAndAwaitNewLeader(final int nodeId) {
final Broker broker = brokers.get(nodeId);
if (broker != null) {
final InetSocketAddress socketAddress =
broker.getConfig().getNetwork().getCommandApi().getAddress();
final List<Integer> brokersLeadingPartitions = getBrokersLeadingPartitions(socketAddress);
stopBroker(nodeId);
waitForNewLeaderOfPartitions(brokersLeadingPartitions, socketAddress);
waitForTopology(
assertion ->
assertion.doesNotContainBroker(nodeId).hasLeaderForEachPartition(partitionCount));
}
}

public void stopBroker(final int nodeId) {
final Broker broker = brokers.remove(nodeId);
if (broker != null) {
final InetSocketAddress socketAddress =
broker.getConfig().getNetwork().getCommandApi().getAddress();
broker.close();
waitUntilBrokerIsRemovedFromTopology(socketAddress);
waitForTopology(assertion -> assertion.doesNotContainBroker(nodeId));
try {
final var systemContext = systemContexts.remove(nodeId);
if (systemContext != null) {
@@ -686,38 +610,16 @@ public void forceClusterToHaveNewLeader(final int expectedLeader) {
awaitOtherLeader(START_PARTITION_ID, previousLeader.getNodeId());
}

private void waitUntilBrokerIsRemovedFromTopology(final InetSocketAddress socketAddress) {
waitForTopology(
topology ->
topology.stream()
.noneMatch(
b ->
b.getHost().equals(socketAddress.getHostName())
&& b.getPort() == socketAddress.getPort()));
}

private void waitForNewLeaderOfPartitions(
final List<Integer> partitions, final InetSocketAddress oldLeader) {
waitForTopology(
topology ->
topology.stream()
.filter(
b ->
!(b.getHost().equals(oldLeader.getHostName())
&& b.getPort() == oldLeader.getPort()))
.flatMap(broker -> broker.getPartitions().stream())
.filter(PartitionInfo::isLeader)
.map(PartitionInfo::getPartitionId)
.collect(Collectors.toSet())
.containsAll(partitions));
}

public void waitForTopology(final Predicate<List<BrokerInfo>> topologyPredicate) {
Awaitility.await()
public void waitForTopology(final Consumer<TopologyAssert> assertions) {
Awaitility.await("until topology satisfies a given condition")
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofSeconds(60))
.ignoreExceptions()
.until(() -> getTopologyFromClient().getBrokers(), topologyPredicate);
.untilAsserted(
() -> {
final var topology = getTopologyFromClient();
assertions.accept(TopologyAssert.assertThat(topology));
});
}

public long createProcessInstanceOnPartition(final int partitionId, final String bpmnProcessId) {
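A side note on the hunks above: `waitForTopology` changed from taking a `Predicate<List<BrokerInfo>>` to taking a `Consumer<TopologyAssert>`, so call sites move from hand-rolled stream predicates to chained assertions. A condensed before/after sketch of a call site (`socketAddress`, `nodeId`, `partitionCount`, and `replicationFactor` are the rule's existing fields and locals):

```java
// Before: match brokers by advertised command-API address with a raw predicate.
waitForTopology(
    topology ->
        topology.stream()
            .anyMatch(
                b ->
                    b.getHost().equals(socketAddress.getHostName())
                        && b.getPort() == socketAddress.getPort()));

// After: chain TopologyAssert conditions on the client-visible topology.
waitForTopology(
    assertion ->
        assertion
            .containsBroker(nodeId)
            .hasLeaderForEachPartition(partitionCount)
            .hasExpectedReplicasCount(partitionCount, replicationFactor));
```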
@@ -96,7 +96,10 @@ private void awaitTopologyIsComplete() {
.untilAsserted(
() ->
TopologyAssert.assertThat(clusterRule.getTopologyFromClient())
.isComplete(clusterRule.getClusterSize(), clusterRule.getPartitionCount()));
.isComplete(
clusterRule.getClusterSize(),
clusterRule.getPartitionCount(),
clusterRule.getReplicationFactor()));
}

private void awaitBrokerIsRemovedFromTopology(final int nodeId) {
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.agrona.CloseHelper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -107,9 +106,11 @@ void shouldCommunicateOverProxy() {
.map(ZeebeNode::getInternalHost)
.map(host -> toxiproxy.getProxy(host, ZeebePort.COMMAND.getPort()))
.map(ContainerProxy::getOriginalProxyPort)
.collect(Collectors.toList());
.toList();
TopologyAssert.assertThat(topology)
.isComplete(3, 1)
.hasClusterSize(3)
.hasExpectedReplicasCount(1, 3)
.hasLeaderForEachPartition(1)
.hasBrokerSatisfying(
b ->
assertThat(b.getAddress())
