Skip to content

Commit

Permalink
merge: #10354
Browse files Browse the repository at this point in the history
10354: Allow AtomixCluster to be autowired everywhere r=npepinpe a=npepinpe

## Description

This PR extracts building the `AtomixCluster` from the broker into the `dist` module. Unfortunately, because many of our testing utilities break all kinds of abstraction, and our tests still heavily rely on modifying the cluster via the `BrokerCfg`, mapping the `BrokerCfg` to the `AtomixCluster` is still done in the `broker` module. However, everything is prepared to eventually move this out to the dist module itself, such that our configuration would simply instantiate the Atomix `ClusterConfig` type, and later inject a `AtomixCluster` in the broker. See the `StandaloneGateway` as an example; there is plenty of opportunity to DRY it up once both are simply configuring `ClusterConfig` directly, and it will remove intermediate layers of builders/config.

In order to achieve that at the moment, since the network configuration relies on being initialized with the right broker base path, I also had to pull up the working directory into its own configuration, such that it can be autowired as well. This is a little more abstraction than I'd like, but it's due to how the `BrokerCfg` is still the central point for all the components, even those not specific to the broker.

The broker configuration is thus not initialized in the system context or broker anymore, but is given to the broker already initialized. This is done via the Spring configuration `BrokerConfiguration`. It uses two beans to do this, which I dislike, but I'm not sure what's the best way to do this. I generally dislike that the configuration must be initialized anyway, it's very brittle and easy to forget, so I'd rather figure a way to avoid this in the future. I would propose a follow up which removes this, and instead enforces that we create the broker config either passing the base already, or using a factory method which does this. I can also do it here, but it will increase how much changes are here.

I think in the future the cluster services should be started before the broker itself, but we can figure this out later. For now, it means that if the actuator wants to make use of them before the broker finishes starting, it will fail - this is something we can handle semi-gracefully, so I would leave it as is.

## Related issues

related to #9996



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Sep 15, 2022
2 parents b3d7535 + 870286d commit 9f3dd7c
Show file tree
Hide file tree
Showing 37 changed files with 562 additions and 439 deletions.
11 changes: 6 additions & 5 deletions broker/pom.xml
Expand Up @@ -146,11 +146,6 @@
<artifactId>spring-boot-actuator</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
Expand All @@ -177,6 +172,12 @@
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-atomix-cluster</artifactId>
Expand Down
10 changes: 5 additions & 5 deletions broker/src/main/java/io/camunda/zeebe/broker/Broker.java
Expand Up @@ -10,6 +10,7 @@
import io.camunda.zeebe.broker.bootstrap.BrokerContext;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupContextImpl;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupProcess;
import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.broker.exporter.repo.ExporterLoadException;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.system.SystemContext;
Expand Down Expand Up @@ -37,12 +38,10 @@ public final class Broker implements AutoCloseable {
private boolean isClosed = false;

private CompletableFuture<Broker> startFuture;
private final ActorScheduler scheduler;
private BrokerHealthCheckService healthCheckService;

// TODO make Broker class itself the actor
private final BrokerStartupActor brokerStartupActor;
private final BrokerInfo localBroker;
private BrokerContext brokerContext;
// TODO make Broker class itself the actor

Expand All @@ -55,20 +54,21 @@ public Broker(
final SpringBrokerBridge springBrokerBridge,
final List<PartitionListener> additionalPartitionListeners) {
this.systemContext = systemContext;
scheduler = this.systemContext.getScheduler();

localBroker = createBrokerInfo(getConfig());
final ActorScheduler scheduler = this.systemContext.getScheduler();
final BrokerInfo localBroker = createBrokerInfo(getConfig());

healthCheckService = new BrokerHealthCheckService(localBroker);

final BrokerStartupContextImpl startupContext =
final var startupContext =
new BrokerStartupContextImpl(
localBroker,
systemContext.getBrokerConfiguration(),
springBrokerBridge,
scheduler,
healthCheckService,
buildExporterRepository(getConfig()),
new ClusterServicesImpl(systemContext.getCluster()),
additionalPartitionListeners);

brokerStartupActor = new BrokerStartupActor(startupContext);
Expand Down
Expand Up @@ -7,7 +7,6 @@
*/
package io.camunda.zeebe.broker.bootstrap;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.partitioning.PartitionManager;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
Expand All @@ -19,8 +18,6 @@ public interface BrokerContext {

ClusterServices getClusterServices();

AtomixCluster getAtomixCluster();

EmbeddedGatewayService getEmbeddedGatewayService();

/**
Expand Down
Expand Up @@ -9,7 +9,6 @@

import static java.util.Objects.requireNonNull;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.broker.partitioning.PartitionManager;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
Expand Down Expand Up @@ -42,11 +41,6 @@ public ClusterServicesImpl getClusterServices() {
return clusterServices;
}

@Override
public AtomixCluster getAtomixCluster() {
return clusterServices.getAtomixCluster();
}

@Override
public EmbeddedGatewayService getEmbeddedGatewayService() {
return embeddedGatewayService;
Expand Down
Expand Up @@ -53,8 +53,6 @@ public interface BrokerStartupContext {

ClusterServicesImpl getClusterServices();

void setClusterServices(ClusterServicesImpl o);

CommandApiServiceImpl getCommandApiService();

void setCommandApiService(CommandApiServiceImpl commandApiService);
Expand Down
Expand Up @@ -39,12 +39,11 @@ public final class BrokerStartupContextImpl implements BrokerStartupContext {
private final ActorSchedulingService actorScheduler;
private final BrokerHealthCheckService healthCheckService;
private final ExporterRepository exporterRepository;

private final ClusterServicesImpl clusterServices;
private final List<PartitionListener> partitionListeners = new ArrayList<>();

private ConcurrencyControl concurrencyControl;
private DiskSpaceUsageMonitor diskSpaceUsageMonitor;
private ClusterServicesImpl clusterServices;
private AtomixServerTransport gatewayBrokerTransport;
private ManagedMessagingService commandApiMessagingService;
private CommandApiServiceImpl commandApiService;
Expand All @@ -60,6 +59,7 @@ public BrokerStartupContextImpl(
final ActorSchedulingService actorScheduler,
final BrokerHealthCheckService healthCheckService,
final ExporterRepository exporterRepository,
final ClusterServicesImpl clusterServices,
final List<PartitionListener> additionalPartitionListeners) {

this.brokerInfo = requireNonNull(brokerInfo);
Expand All @@ -68,6 +68,7 @@ public BrokerStartupContextImpl(
this.actorScheduler = requireNonNull(actorScheduler);
this.healthCheckService = requireNonNull(healthCheckService);
this.exporterRepository = requireNonNull(exporterRepository);
this.clusterServices = requireNonNull(clusterServices);
partitionListeners.addAll(additionalPartitionListeners);
}

Expand Down Expand Up @@ -125,11 +126,6 @@ public ClusterServicesImpl getClusterServices() {
return clusterServices;
}

@Override
public void setClusterServices(final ClusterServicesImpl clusterServices) {
this.clusterServices = clusterServices;
}

@Override
public CommandApiServiceImpl getCommandApiService() {
return commandApiService;
Expand Down
Expand Up @@ -46,15 +46,15 @@ public BrokerStartupProcess(final BrokerStartupContext brokerStartupContext) {
private List<StartupStep<BrokerStartupContext>> buildStartupSteps(final BrokerCfg config) {
final var result = new ArrayList<StartupStep<BrokerStartupContext>>();

result.add(new ClusterServicesStep());

if (config.getData().isDiskUsageMonitoringEnabled()) {
// must be executed before any disk space usage listeners are registered
result.add(new DiskSpaceUsageMonitorStep());
}
result.add(new MonitoringServerStep());
result.add(new BrokerAdminServiceStep());

result.add(new ClusterServicesStep());

result.add(new ApiMessagingServiceStep());
result.add(new GatewayBrokerTransportStep());
result.add(new CommandApiServiceStep());
Expand Down
Expand Up @@ -7,8 +7,6 @@
*/
package io.camunda.zeebe.broker.bootstrap;

import io.camunda.zeebe.broker.clustering.AtomixClusterFactory;
import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;

Expand All @@ -25,13 +23,8 @@ void startupInternal(
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> startupFuture) {

final var atomix =
AtomixClusterFactory.fromConfiguration(brokerStartupContext.getBrokerConfiguration());
final var clusterServices = new ClusterServicesImpl(atomix);

brokerStartupContext.setClusterServices(clusterServices);

clusterServices
brokerStartupContext
.getClusterServices()
.start()
.whenComplete(
(ok, error) -> {
Expand Down Expand Up @@ -63,7 +56,6 @@ void shutdownInternal(
if (error != null) {
shutdownFuture.completeExceptionally(error);
} else {
brokerShutdownContext.setClusterServices(null);
shutdownFuture.complete(brokerShutdownContext);
}
});
Expand Down

This file was deleted.

@@ -0,0 +1,92 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.clustering;

import io.atomix.cluster.ClusterConfig;
import io.atomix.cluster.MemberConfig;
import io.atomix.cluster.NodeConfig;
import io.atomix.cluster.discovery.BootstrapDiscoveryConfig;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.protocol.SwimMembershipProtocolConfig;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ClusterCfg;
import io.camunda.zeebe.broker.system.configuration.MembershipCfg;
import io.camunda.zeebe.broker.system.configuration.NetworkCfg;
import io.camunda.zeebe.broker.system.configuration.SocketBindingCfg;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;

// TODO: move this to BrokerClusterConfiguration in the dist module
public final class ClusterConfigFactory {
public ClusterConfig mapConfiguration(final BrokerCfg config) {
final var cluster = config.getCluster();
final var name = cluster.getClusterName();
final var discovery = discoveryConfig(cluster.getInitialContactPoints());
final var membership = membershipConfig(cluster.getMembership());
final var network = config.getNetwork();

final var messaging = messagingConfig(cluster, network);
final var member = memberConfig(network.getInternalApi(), cluster.getNodeId());

return new ClusterConfig()
.setClusterId(name)
.setNodeConfig(member)
.setDiscoveryConfig(discovery)
.setMessagingConfig(messaging)
.setProtocolConfig(membership);
}

private MemberConfig memberConfig(final SocketBindingCfg network, final int nodeId) {
final var advertisedAddress =
Address.from(network.getAdvertisedHost(), network.getAdvertisedPort());

return new MemberConfig().setAddress(advertisedAddress).setId(String.valueOf(nodeId));
}

private SwimMembershipProtocolConfig membershipConfig(final MembershipCfg config) {
return new SwimMembershipProtocolConfig()
.setBroadcastDisputes(config.isBroadcastDisputes())
.setBroadcastUpdates(config.isBroadcastUpdates())
.setFailureTimeout(config.getFailureTimeout())
.setGossipFanout(config.getGossipFanout())
.setGossipInterval(config.getGossipInterval())
.setNotifySuspect(config.isNotifySuspect())
.setProbeInterval(config.getProbeInterval())
.setProbeTimeout(config.getProbeTimeout())
.setSuspectProbes(config.getSuspectProbes())
.setSyncInterval(config.getSyncInterval());
}

private BootstrapDiscoveryConfig discoveryConfig(final Collection<String> contactPoints) {
final var nodes =
contactPoints.stream()
.map(Address::from)
.map(address -> new NodeConfig().setAddress(address))
.collect(Collectors.toSet());

return new BootstrapDiscoveryConfig().setNodes(nodes);
}

private MessagingConfig messagingConfig(final ClusterCfg cluster, final NetworkCfg network) {
final var messaging =
new MessagingConfig()
.setCompressionAlgorithm(cluster.getMessageCompression())
.setInterfaces(Collections.singletonList(network.getInternalApi().getHost()))
.setPort(network.getInternalApi().getPort());

if (network.getSecurity().isEnabled()) {
messaging
.setTlsEnabled(true)
.setCertificateChain(network.getSecurity().getCertificateChainPath())
.setPrivateKey(network.getSecurity().getPrivateKeyPath());
}
return messaging;
}
}

0 comments on commit 9f3dd7c

Please sign in to comment.