Skip to content

Commit

Permalink
merge: #10290
Browse files Browse the repository at this point in the history
10290: Create and configure scheduler in distribution module r=npepinpe a=npepinpe

## Description

As we want to use certain parts of the system in the actuators, we need to pull out of the broker and up into the distribution the creation of such instances. For this PR, we're moving the `ActorScheduler`' configuration and creation into dist, and instead injecting it directly in the broker at construction time.

There is some duplication as of now with the gateway's `ActorSchedulerComponent`, but I'd rather not deal with this now until we have a clearer picture of where we want to end up.

## Related issues

related to #9996 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Sep 12, 2022
2 parents ba257ec + f711fc1 commit 744e79d
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
import io.camunda.zeebe.broker.system.configuration.DataCfg;
import io.camunda.zeebe.broker.system.configuration.ExperimentalCfg;
import io.camunda.zeebe.broker.system.configuration.SecurityCfg;
import io.camunda.zeebe.broker.system.configuration.ThreadsCfg;
import io.camunda.zeebe.broker.system.configuration.partitioning.FixedPartitionCfg;
import io.camunda.zeebe.broker.system.configuration.partitioning.Scheme;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -45,15 +43,16 @@ public final class SystemContext {

private final BrokerCfg brokerCfg;
private Map<String, String> diagnosticContext;
private ActorScheduler scheduler;
private final ActorScheduler scheduler;

public SystemContext(final BrokerCfg brokerCfg, final String basePath, final ActorClock clock) {
public SystemContext(
final BrokerCfg brokerCfg, final String basePath, final ActorScheduler scheduler) {
this.brokerCfg = brokerCfg;

initSystemContext(clock, basePath);
this.scheduler = scheduler;
initSystemContext(basePath);
}

private void initSystemContext(final ActorClock clock, final String basePath) {
private void initSystemContext(final String basePath) {
LOG.debug("Initializing system with base path {}", basePath);

brokerCfg.init(basePath);
Expand All @@ -63,7 +62,6 @@ private void initSystemContext(final ActorClock clock, final String basePath) {
final String brokerId = String.format("Broker-%d", cluster.getNodeId());

diagnosticContext = Collections.singletonMap(BROKER_ID_LOG_PROPERTY, brokerId);
scheduler = initScheduler(clock, brokerId);
}

private void validateConfiguration() {
Expand Down Expand Up @@ -258,22 +256,6 @@ private void validateNetworkSecurityConfig(final SecurityCfg security) {
}
}

private ActorScheduler initScheduler(final ActorClock clock, final String brokerId) {
final ThreadsCfg cfg = brokerCfg.getThreads();

final int cpuThreads = cfg.getCpuThreadCount();
final int ioThreads = cfg.getIoThreadCount();
final boolean metricsEnabled = brokerCfg.getExperimental().getFeatures().isEnableActorMetrics();

return ActorScheduler.newActorScheduler()
.setActorClock(clock)
.setCpuBoundActorThreadCount(cpuThreads)
.setIoBoundActorThreadCount(ioThreads)
.setMetricsEnabled(metricsEnabled)
.setSchedulerName(brokerId)
.build();
}

public ActorScheduler getScheduler() {
return scheduler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.io.File;
Expand Down Expand Up @@ -63,7 +64,10 @@ public void shouldCallPartitionListenerAfterStart() throws Exception {
final var brokerCfg = new BrokerCfg();
assignSocketAddresses(brokerCfg);
final var systemContext =
new SystemContext(brokerCfg, newTemporaryFolder.getAbsolutePath(), null);
new SystemContext(
brokerCfg,
newTemporaryFolder.getAbsolutePath(),
ActorScheduler.newActorScheduler().build());
systemContext.getScheduler().start();

final var leaderLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.camunda.zeebe.broker.system.configuration.partitioning.FixedPartitionCfg;
import io.camunda.zeebe.broker.system.configuration.partitioning.FixedPartitionCfg.NodeCfg;
import io.camunda.zeebe.broker.system.configuration.partitioning.Scheme;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.File;
Expand Down Expand Up @@ -395,6 +396,12 @@ void shouldThrowExceptionWithNetworkSecurityEnabledAndNoCert() throws Certificat
}

private SystemContext initSystemContext(final BrokerCfg brokerCfg) {
return new SystemContext(brokerCfg, "test", new ControlledActorClock());
final ActorScheduler scheduler =
ActorScheduler.newActorScheduler()
.setCpuBoundActorThreadCount(1)
.setIoBoundActorThreadCount(1)
.setActorClock(new ControlledActorClock())
.build();
return new SystemContext(brokerCfg, "test", scheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
Expand Down Expand Up @@ -222,9 +223,17 @@ public void startBroker(final PartitionListener... listeners) {
throw new RuntimeException("Unable to open configuration", e);
}
}
systemContext =
new SystemContext(brokerCfg, newTemporaryFolder.getAbsolutePath(), controlledActorClock);
systemContext.getScheduler().start();

final var scheduler =
ActorScheduler.newActorScheduler()
.setCpuBoundActorThreadCount(brokerCfg.getThreads().getCpuThreadCount())
.setIoBoundActorThreadCount(brokerCfg.getThreads().getIoThreadCount())
.setMetricsEnabled(brokerCfg.getExperimental().getFeatures().isEnableActorMetrics())
.setActorClock(controlledActorClock)
.build();

systemContext = new SystemContext(brokerCfg, newTemporaryFolder.getAbsolutePath(), scheduler);
scheduler.start();

final var additionalListeners = new ArrayList<>(Arrays.asList(listeners));
final CountDownLatch latch = new CountDownLatch(brokerCfg.getCluster().getPartitionsCount());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker;

import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ThreadsCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@SuppressWarnings("unused")
@Configuration(proxyBeanMethods = false)
public final class ActorSchedulerConfiguration {
private final BrokerCfg brokerCfg;
private final ActorClock clock;

@Autowired
public ActorSchedulerConfiguration(final BrokerCfg brokerCfg, final ActorClock clock) {
this.brokerCfg = brokerCfg;
this.clock = clock;
}

@Bean("actorScheduler")
public ActorScheduler getScheduler() {
final ThreadsCfg cfg = brokerCfg.getThreads();

final int cpuThreads = cfg.getCpuThreadCount();
final int ioThreads = cfg.getIoThreadCount();
final boolean metricsEnabled = brokerCfg.getExperimental().getFeatures().isEnableActorMetrics();

return ActorScheduler.newActorScheduler()
.setActorClock(clock)
.setCpuBoundActorThreadCount(cpuThreads)
.setIoBoundActorThreadCount(ioThreads)
.setMetricsEnabled(metricsEnabled)
.setSchedulerName(String.format("Broker-%d", brokerCfg.getCluster().getNodeId()))
.build();
}
}
61 changes: 26 additions & 35 deletions dist/src/main/java/io/camunda/zeebe/broker/StandaloneBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@

import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.shared.ActorClockConfiguration;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.shared.Profile;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.error.FatalErrorHandler;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager;
import org.slf4j.Logger;
Expand Down Expand Up @@ -48,22 +47,21 @@ public class StandaloneBroker
private final BrokerCfg configuration;
private final Environment springEnvironment;
private final SpringBrokerBridge springBrokerBridge;
private final ActorClockConfiguration clockConfig;
private final ActorScheduler actorScheduler;

private String tempFolder;
private SystemContext systemContext;
private Path tempFolder;
private Broker broker;

@Autowired
public StandaloneBroker(
final BrokerCfg configuration,
final Environment springEnvironment,
final SpringBrokerBridge springBrokerBridge,
final ActorClockConfiguration clockConfig) {
final ActorScheduler actorScheduler) {
this.configuration = configuration;
this.springEnvironment = springEnvironment;
this.springBrokerBridge = springBrokerBridge;
this.clockConfig = clockConfig;
this.actorScheduler = actorScheduler;
}

public static void main(final String[] args) {
Expand All @@ -82,24 +80,35 @@ public static void main(final String[] args) {
}

@Override
public void run(final String... args) {
public void run(final String... args) throws IOException {
final Path workingDirectory = resolveWorkingDirectory();
final SystemContext systemContext =
new SystemContext(configuration, workingDirectory.toString(), actorScheduler);

actorScheduler.start();
broker = new Broker(systemContext, springBrokerBridge);
broker.start();
}

private Path resolveWorkingDirectory() throws IOException {
final Path workingDirectory;
if (shouldUseTemporaryFolder()) {
LOG.info("Launching broker in temporary folder.");
systemContext = createSystemContextInTempDirectory();
LOG.debug(
"Starting broker with a temporary directory; it will be deleted on graceful shutdown");
tempFolder = Files.createTempDirectory("zeebe").toAbsolutePath().normalize();
workingDirectory = tempFolder;
} else {
systemContext = createSystemContextInBaseDirectory();
workingDirectory = Path.of(System.getProperty("basedir", ".")).toAbsolutePath().normalize();
}

systemContext.getScheduler().start();
broker = new Broker(systemContext, springBrokerBridge);
broker.start();
return workingDirectory;
}

@Override
public void onApplicationEvent(final ContextClosedEvent event) {
try {
broker.close();
systemContext.getScheduler().stop().get();
actorScheduler.stop().get();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Shutdown interrupted, most likely harmless", e);
Expand All @@ -116,28 +125,10 @@ private boolean shouldUseTemporaryFolder() {
Profiles.of(Profile.DEVELOPMENT.getId(), Profile.TEST.getId()));
}

private SystemContext createSystemContextInBaseDirectory() {
String basePath = System.getProperty("basedir");

if (basePath == null) {
basePath = Paths.get(".").toAbsolutePath().normalize().toString();
}
return new SystemContext(configuration, basePath, clockConfig.getClock());
}

private SystemContext createSystemContextInTempDirectory() {
try {
tempFolder = Files.createTempDirectory("zeebe").toAbsolutePath().normalize().toString();
return new SystemContext(configuration, tempFolder, clockConfig.getClock());
} catch (final IOException e) {
throw new UncheckedIOException("Could not create system context", e);
}
}

private void deleteTempDirectory() {
if (tempFolder != null) {
try {
FileUtil.deleteFolder(tempFolder);
FileUtil.deleteFolderIfExists(tempFolder);
} catch (final IOException e) {
LOG.error("Failed to delete temporary folder {}", tempFolder, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.springframework.boot.context.properties.ConstructorBinding;
import org.springframework.boot.context.properties.bind.DefaultValue;
import org.springframework.context.annotation.Bean;
import org.springframework.web.context.annotation.ApplicationScope;

@SuppressWarnings("unused")
@ConfigurationProperties("zeebe.clock")
Expand All @@ -38,13 +37,11 @@ public ActorClockConfiguration(@DefaultValue("false") final boolean controlled)
}

@Bean
@ApplicationScope
public ActorClock getClock() {
return clock;
}

@Bean
@ApplicationScope
public ActorClockService getClockService() {
return service;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.ActorSchedulerConfiguration;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.PartitionListener;
import io.camunda.zeebe.broker.SpringBrokerBridge;
Expand Down Expand Up @@ -286,7 +287,10 @@ private Broker createBroker(final int nodeId) {
final File brokerBase = getBrokerBase(nodeId);
final BrokerCfg brokerCfg = getBrokerCfg(nodeId);
final var systemContext =
new SystemContext(brokerCfg, brokerBase.getAbsolutePath(), controlledClock);
new SystemContext(
brokerCfg,
brokerBase.getAbsolutePath(),
new ActorSchedulerConfiguration(brokerCfg, controlledClock).getScheduler());
systemContexts.put(nodeId, systemContext);

systemContext.getScheduler().start();
Expand Down

0 comments on commit 744e79d

Please sign in to comment.