Skip to content

Commit

Permalink
merge: #10032
Browse files Browse the repository at this point in the history
10032: refactor: let `dist` build gateways broker client r=oleschoenburg a=oleschoenburg

## Description

Builds the broker client in dist, making it available for injection. Because `BrokerClient` needs to schedule actors, the actor scheduler must be started before. To achieve this, `BrokerClient` gets a `start` method that does the scheduling.

## Related issues

<!-- Which issues are closed by this PR or are related -->

relates to #9996 



Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg committed Aug 15, 2022
2 parents 661e6bb + a00eefd commit a3d03d8
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,28 @@
import io.atomix.cluster.messaging.MessagingService;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.gateway.Gateway;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.function.Function;

public final class EmbeddedGatewayService implements AutoCloseable {
private final Gateway gateway;
private final BrokerClientImpl brokerClient;

public EmbeddedGatewayService(
final BrokerCfg configuration,
final ActorSchedulingService actorScheduler,
final MessagingService messagingService,
final ClusterMembershipService membershipService,
final ClusterEventService eventService) {
final Function<GatewayCfg, BrokerClient> brokerClientFactory =
cfg ->
new BrokerClientImpl(
cfg, messagingService, membershipService, eventService, actorScheduler);
gateway = new Gateway(configuration.getGateway(), brokerClientFactory, actorScheduler);
brokerClient =
new BrokerClientImpl(
configuration.getGateway(),
messagingService,
membershipService,
eventService,
actorScheduler);
gateway = new Gateway(configuration.getGateway(), brokerClient, actorScheduler);
}

@Override
Expand All @@ -47,6 +48,7 @@ public Gateway get() {
}

public ActorFuture<Gateway> start() {
brokerClient.start();
return gateway.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
*
* <p>See {@link #main(String[])} for more.
*/
@SpringBootApplication(scanBasePackages = {"io.camunda.zeebe.broker", "io.camunda.zeebe.shared"})
@SpringBootApplication(
proxyBeanMethods = false,
scanBasePackages = {"io.camunda.zeebe.broker", "io.camunda.zeebe.shared"})
@ConfigurationPropertiesScan(basePackages = {"io.camunda.zeebe.broker", "io.camunda.zeebe.shared"})
public class StandaloneBroker
implements CommandLineRunner, ApplicationListener<ContextClosedEvent> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
final class BrokerClientComponent {
final GatewayCfg config;
final AtomixCluster atomixCluster;
final ActorScheduler actorScheduler;

@Autowired
BrokerClientComponent(
final GatewayCfg config,
final AtomixCluster atomixCluster,
final ActorScheduler actorScheduler) {
this.config = config;
this.atomixCluster = atomixCluster;
this.actorScheduler = actorScheduler;
}

@Bean("brokerClient")
BrokerClient createBrokerClient() {
return new BrokerClientImpl(
config,
atomixCluster.getMessagingService(),
atomixCluster.getMembershipService(),
atomixCluster.getEventService(),
actorScheduler);
}
}
19 changes: 7 additions & 12 deletions dist/src/main/java/io/camunda/zeebe/gateway/StandaloneGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.gateway.impl.SpringGatewayBridge;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
Expand Down Expand Up @@ -38,6 +37,7 @@
* <p>See {@link #main(String[])} for more.
*/
@SpringBootApplication(
proxyBeanMethods = false,
scanBasePackages = {
"io.camunda.zeebe.gateway",
"io.camunda.zeebe.shared",
Expand All @@ -52,6 +52,7 @@ public class StandaloneGateway
private final SpringGatewayBridge springGatewayBridge;
private final ActorScheduler actorScheduler;
private final AtomixCluster atomixCluster;
private final BrokerClient brokerClient;

private Gateway gateway;

Expand All @@ -60,11 +61,13 @@ public StandaloneGateway(
final GatewayCfg configuration,
final SpringGatewayBridge springGatewayBridge,
final ActorScheduler actorScheduler,
final AtomixCluster atomixCluster) {
final AtomixCluster atomixCluster,
final BrokerClient brokerClient) {
this.configuration = configuration;
this.springGatewayBridge = springGatewayBridge;
this.actorScheduler = actorScheduler;
this.atomixCluster = atomixCluster;
this.brokerClient = brokerClient;
}

public static void main(final String[] args) {
Expand All @@ -91,7 +94,7 @@ public void run(final String... args) throws Exception {
LOG.info("Starting standalone gateway with configuration {}", configuration.toJson());
}

gateway = new Gateway(configuration, this::createBrokerClient, actorScheduler);
gateway = new Gateway(configuration, brokerClient, actorScheduler);

springGatewayBridge.registerBrokerClientSupplier(gateway::getBrokerClient);
springGatewayBridge.registerGatewayStatusSupplier(gateway::getStatus);
Expand All @@ -103,6 +106,7 @@ public void run(final String... args) throws Exception {

actorScheduler.start();
atomixCluster.start();
brokerClient.start();
gateway.start().join(30, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -139,13 +143,4 @@ public void close() {

LogManager.shutdown();
}

private BrokerClient createBrokerClient(final GatewayCfg config) {
return new BrokerClientImpl(
config,
atomixCluster.getMessagingService(),
atomixCluster.getMembershipService(),
atomixCluster.getEventService(),
actorScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,13 @@ private StandaloneGateway buildGateway(final GatewayCfg gatewayCfg) {
final AtomixComponent clusterComponent = new AtomixComponent(gatewayCfg);
final ActorSchedulerComponent actorSchedulerComponent =
new ActorSchedulerComponent(gatewayCfg, new ActorClockConfiguration(false));
final var atomixCluster = clusterComponent.createAtomixCluster();
final var actorScheduler = actorSchedulerComponent.createActorSchedulingService();
final BrokerClientComponent brokerClientComponent =
new BrokerClientComponent(gatewayCfg, atomixCluster, actorScheduler);
final var brokerClient = brokerClientComponent.createBrokerClient();

return new StandaloneGateway(
gatewayCfg,
new SpringGatewayBridge(),
actorSchedulerComponent.createActorSchedulingService(),
clusterComponent.createAtomixCluster());
gatewayCfg, new SpringGatewayBridge(), actorScheduler, atomixCluster, brokerClient);
}
}
50 changes: 5 additions & 45 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,10 @@
*/
package io.camunda.zeebe.gateway;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingService;
import io.camunda.zeebe.gateway.health.GatewayHealthManager;
import io.camunda.zeebe.gateway.health.Status;
import io.camunda.zeebe.gateway.health.impl.GatewayHealthManagerImpl;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.gateway.impl.configuration.NetworkCfg;
import io.camunda.zeebe.gateway.impl.configuration.SecurityCfg;
Expand Down Expand Up @@ -51,50 +47,24 @@

public final class Gateway {
private static final Logger LOG = Loggers.GATEWAY_LOGGER;
private static final Function<GatewayCfg, ServerBuilder> DEFAULT_SERVER_BUILDER_FACTORY =
cfg -> setNetworkConfig(cfg.getNetwork());
private static final MonitoringServerInterceptor MONITORING_SERVER_INTERCEPTOR =
MonitoringServerInterceptor.create(Configuration.allMetrics());

private final Function<GatewayCfg, ServerBuilder> serverBuilderFactory;
private final Function<GatewayCfg, BrokerClient> brokerClientFactory;
private final Function<GatewayCfg, ServerBuilder> serverBuilderFactory =
cfg -> setNetworkConfig(cfg.getNetwork());
private final GatewayCfg gatewayCfg;
private final ActorSchedulingService actorSchedulingService;
private final GatewayHealthManager healthManager;

private Server server;
private BrokerClient brokerClient;

public Gateway(
final GatewayCfg gatewayCfg,
final MessagingService messagingService,
final ClusterMembershipService membershipService,
final ClusterEventService eventService,
final ActorSchedulingService actorSchedulingService) {
this(
gatewayCfg,
cfg ->
new BrokerClientImpl(
cfg, messagingService, membershipService, eventService, actorSchedulingService),
DEFAULT_SERVER_BUILDER_FACTORY,
actorSchedulingService);
}

public Gateway(
final GatewayCfg gatewayCfg,
final Function<GatewayCfg, BrokerClient> brokerClientFactory,
final ActorSchedulingService actorSchedulingService) {
this(gatewayCfg, brokerClientFactory, DEFAULT_SERVER_BUILDER_FACTORY, actorSchedulingService);
}
private final BrokerClient brokerClient;

public Gateway(
final GatewayCfg gatewayCfg,
final Function<GatewayCfg, BrokerClient> brokerClientFactory,
final Function<GatewayCfg, ServerBuilder> serverBuilderFactory,
final BrokerClient brokerClient,
final ActorSchedulingService actorSchedulingService) {
this.gatewayCfg = gatewayCfg;
this.brokerClientFactory = brokerClientFactory;
this.serverBuilderFactory = serverBuilderFactory;
this.brokerClient = brokerClient;
this.actorSchedulingService = actorSchedulingService;

healthManager = new GatewayHealthManagerImpl();
Expand All @@ -116,7 +86,6 @@ public ActorFuture<Gateway> start() {
final var resultFuture = new CompletableActorFuture<Gateway>();

healthManager.setStatus(Status.STARTING);
brokerClient = buildBrokerClient();

createAndStartActivateJobsHandler(brokerClient)
.whenComplete(
Expand Down Expand Up @@ -218,10 +187,6 @@ private void setSecurityConfig(final ServerBuilder<?> serverBuilder, final Secur
serverBuilder.useTransportSecurity(certificateChainPath, privateKeyPath);
}

private BrokerClient buildBrokerClient() {
return brokerClientFactory.apply(gatewayCfg);
}

private CompletableFuture<ActivateJobsHandler> createAndStartActivateJobsHandler(
final BrokerClient brokerClient) {
final var handler = buildActivateJobsHandler(brokerClient);
Expand Down Expand Up @@ -282,10 +247,5 @@ public void stop() {
server = null;
}
}

if (brokerClient != null) {
brokerClient.close();
brokerClient = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

public interface BrokerClient extends AutoCloseable {

void start();

@Override
void close();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public final class BrokerClientImpl implements BrokerClient {
private boolean isClosed;
private Subscription jobAvailableSubscription;
private final ClusterEventService eventService;
private final ActorSchedulingService schedulingService;
private final AtomixClientTransportAdapter atomixTransportAdapter;

public BrokerClientImpl(
final GatewayCfg configuration,
Expand All @@ -43,24 +45,29 @@ public BrokerClientImpl(
final ClusterEventService eventService,
final ActorSchedulingService schedulingService) {
this.eventService = eventService;
this.schedulingService = schedulingService;

final ClusterCfg clusterCfg = configuration.getCluster();
topologyManager = new BrokerTopologyManagerImpl(membershipService::getMembers);
schedulingService.submitActor(topologyManager);
membershipService.addListener(topologyManager);
membershipService
.getMembers()
.forEach(
member -> topologyManager.event(new ClusterMembershipEvent(Type.MEMBER_ADDED, member)));

final var atomixTransportAdapter = new AtomixClientTransportAdapter(messagingService);
schedulingService.submitActor(atomixTransportAdapter);
atomixTransportAdapter = new AtomixClientTransportAdapter(messagingService);
requestManager =
new BrokerRequestManager(
atomixTransportAdapter,
topologyManager,
new RoundRobinDispatchStrategy(topologyManager),
clusterCfg.getRequestTimeout());
}

@Override
public void start() {
schedulingService.submitActor(topologyManager);
schedulingService.submitActor(atomixTransportAdapter);
schedulingService.submitActor(requestManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.camunda.zeebe.client.api.command.ClientStatusException;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.gateway.Gateway;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.gateway.impl.configuration.NetworkCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
Expand Down Expand Up @@ -54,13 +55,16 @@ static void setUp() throws IOException {
actorScheduler = ActorScheduler.newActorScheduler().build();
actorScheduler.start();

gateway =
new Gateway(
new GatewayCfg().setNetwork(networkCfg),
final var brokerClient =
new BrokerClientImpl(
config,
cluster.getMessagingService(),
cluster.getMembershipService(),
cluster.getEventService(),
actorScheduler);
brokerClient.start();

gateway = new Gateway(config, brokerClient, actorScheduler);
gateway.start().join();

final String gatewayAddress = NetUtil.toSocketAddressString(networkCfg.toSocketAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public final class StubbedBrokerClient implements BrokerClient {

public StubbedBrokerClient() {}

@Override
public void start() {}

@Override
public void close() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void setUp() {
atomixCluster.getEventService(),
actorScheduler.get());

client.start();

final BrokerClusterStateImpl topology = new BrokerClusterStateImpl();
topology.addPartitionIfAbsent(START_PARTITION_ID);
topology.setPartitionLeader(START_PARTITION_ID, 0, 1);
Expand Down

0 comments on commit a3d03d8

Please sign in to comment.