Skip to content

Commit

Permalink
Merge branch 'main' into zell-clean-up-abstractions
Browse files Browse the repository at this point in the history
  • Loading branch information
Zelldon committed Aug 4, 2022
2 parents 146e2bc + 00fb0db commit 73850d2
Show file tree
Hide file tree
Showing 50 changed files with 900 additions and 501 deletions.
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@
<includePluginDependencies>true</includePluginDependencies>
<mainClass>uk.co.real_logic.sbe.SbeTool</mainClass>
<arguments>
<argument>${project.build.resources[0].directory}/management-schema.xml</argument>
<argument>${project.build.resources[0].directory}/broker-protocol.xml</argument>
</arguments>
<workingDirectory>${project.build.directory}/generated-sources</workingDirectory>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ void startupInternal(
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> startupFuture) {
final var schedulingService = brokerStartupContext.getActorSchedulingService();
final var transport = brokerStartupContext.getCommandApiServerTransport();
final var handler = new AdminApiRequestHandler(transport);
final var transport = brokerStartupContext.getGatewayBrokerTransport();
final var handler =
new AdminApiRequestHandler(transport, brokerStartupContext.getPartitionManager());

concurrencyControl.runOnCompletion(
schedulingService.submitActor(handler),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public interface BrokerStartupContext {

void setAdminApiService(AdminApiRequestHandler adminApiService);

AtomixServerTransport getCommandApiServerTransport();
AtomixServerTransport getGatewayBrokerTransport();

void setCommandApiServerTransport(AtomixServerTransport commandApiServerTransport);
void setGatewayBrokerTransport(AtomixServerTransport gatewayBrokerTransport);

ManagedMessagingService getApiMessagingService();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class BrokerStartupContextImpl implements BrokerStartupContext {
private ConcurrencyControl concurrencyControl;
private DiskSpaceUsageMonitor diskSpaceUsageMonitor;
private ClusterServicesImpl clusterServices;
private AtomixServerTransport commandApiServerTransport;
private AtomixServerTransport gatewayBrokerTransport;
private ManagedMessagingService commandApiMessagingService;
private CommandApiServiceImpl commandApiService;
private AdminApiRequestHandler adminApiService;
Expand Down Expand Up @@ -157,13 +157,13 @@ public void setAdminApiService(final AdminApiRequestHandler adminApiService) {
}

@Override
public AtomixServerTransport getCommandApiServerTransport() {
return commandApiServerTransport;
public AtomixServerTransport getGatewayBrokerTransport() {
return gatewayBrokerTransport;
}

@Override
public void setCommandApiServerTransport(final AtomixServerTransport commandApiServerTransport) {
this.commandApiServerTransport = commandApiServerTransport;
public void setGatewayBrokerTransport(final AtomixServerTransport gatewayBrokerTransport) {
this.gatewayBrokerTransport = gatewayBrokerTransport;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ private List<StartupStep<BrokerStartupContext>> buildStartupSteps(final BrokerCf
result.add(new ClusterServicesStep());

result.add(new ApiMessagingServiceStep());
result.add(new GatewayBrokerTransportStep());
result.add(new CommandApiServiceStep());
result.add(new AdminApiServiceStep());

if (config.getGateway().isEnable()) {
result.add(new EmbeddedGatewayServiceStep());
}

result.add(new PartitionManagerStep());

result.add(new AdminApiServiceStep());

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.ServerTransport;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;

final class CommandApiServiceStep extends AbstractBrokerStartupStep {

Expand All @@ -26,7 +25,12 @@ void startupInternal(
final BrokerStartupContext brokerStartupContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> startupFuture) {
concurrencyControl.run(() -> startServerTransport(brokerStartupContext, startupFuture));
concurrencyControl.run(
() ->
startCommandApiService(
brokerStartupContext,
brokerStartupContext.getGatewayBrokerTransport(),
startupFuture));
}

@Override
Expand All @@ -36,10 +40,7 @@ void shutdownInternal(
final ActorFuture<BrokerStartupContext> shutdownFuture) {

final var commandApiServiceActor = brokerShutdownContext.getCommandApiService();
if (commandApiServiceActor == null) {
closeServerTransport(brokerShutdownContext, concurrencyControl, shutdownFuture);
return;
}

brokerShutdownContext.removePartitionListener(commandApiServiceActor);
brokerShutdownContext
.getDiskSpaceUsageMonitor()
Expand All @@ -50,33 +51,11 @@ void shutdownInternal(
proceed(
() -> {
brokerShutdownContext.setCommandApiService(null);
closeServerTransport(brokerShutdownContext, concurrencyControl, shutdownFuture);
shutdownFuture.complete(brokerShutdownContext);
},
shutdownFuture));
}

private void startServerTransport(
final BrokerStartupContext brokerStartupContext,
final ActorFuture<BrokerStartupContext> startupFuture) {

final var concurrencyControl = brokerStartupContext.getConcurrencyControl();
final var brokerInfo = brokerStartupContext.getBrokerInfo();
final var schedulingService = brokerStartupContext.getActorSchedulingService();
final var messagingService = brokerStartupContext.getApiMessagingService();

final var atomixServerTransport =
new AtomixServerTransport(brokerInfo.getNodeId(), messagingService);

concurrencyControl.runOnCompletion(
schedulingService.submitActor(atomixServerTransport),
proceed(
() -> {
brokerStartupContext.setCommandApiServerTransport(atomixServerTransport);
startCommandApiService(brokerStartupContext, atomixServerTransport, startupFuture);
},
startupFuture));
}

private void startCommandApiService(
final BrokerStartupContext brokerStartupContext,
final ServerTransport serverTransport,
Expand Down Expand Up @@ -114,24 +93,4 @@ private void startCommandApiService(
},
startupFuture));
}

private void closeServerTransport(
final BrokerStartupContext brokerShutdownContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> shutdownFuture) {
final var serverTransport = brokerShutdownContext.getCommandApiServerTransport();

if (serverTransport == null) {
return;
}

concurrencyControl.runOnCompletion(
serverTransport.closeAsync(),
proceed(
() -> {
brokerShutdownContext.setCommandApiServerTransport(null);
shutdownFuture.complete(brokerShutdownContext);
},
shutdownFuture));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.bootstrap;

import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;

/** Starts the server transport which can receive commands from the gateway * */
final class GatewayBrokerTransportStep extends AbstractBrokerStartupStep {

@Override
public String getName() {
return "Broker Transport";
}

@Override
void startupInternal(
final BrokerStartupContext brokerStartupContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> startupFuture) {
concurrencyControl.run(() -> startServerTransport(brokerStartupContext, startupFuture));
}

@Override
void shutdownInternal(
final BrokerStartupContext brokerShutdownContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> shutdownFuture) {
closeServerTransport(brokerShutdownContext, concurrencyControl, shutdownFuture);
}

private void startServerTransport(
final BrokerStartupContext brokerStartupContext,
final ActorFuture<BrokerStartupContext> startupFuture) {

final var concurrencyControl = brokerStartupContext.getConcurrencyControl();
final var brokerInfo = brokerStartupContext.getBrokerInfo();
final var schedulingService = brokerStartupContext.getActorSchedulingService();
final var messagingService = brokerStartupContext.getApiMessagingService();

final var atomixServerTransport =
new AtomixServerTransport(brokerInfo.getNodeId(), messagingService);

concurrencyControl.runOnCompletion(
schedulingService.submitActor(atomixServerTransport),
proceed(
() -> {
brokerStartupContext.setGatewayBrokerTransport(atomixServerTransport);
startupFuture.complete(brokerStartupContext);
},
startupFuture));
}

private void closeServerTransport(
final BrokerStartupContext brokerShutdownContext,
final ConcurrencyControl concurrencyControl,
final ActorFuture<BrokerStartupContext> shutdownFuture) {
final var serverTransport = brokerShutdownContext.getGatewayBrokerTransport();

if (serverTransport == null) {
return;
}

concurrencyControl.runOnCompletion(
serverTransport.closeAsync(),
proceed(
() -> {
brokerShutdownContext.setGatewayBrokerTransport(null);
shutdownFuture.complete(brokerShutdownContext);
},
shutdownFuture));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void startupInternal(
brokerStartupContext.getPartitionListeners(),
brokerStartupContext.getCommandApiService(),
brokerStartupContext.getExporterRepository(),
brokerStartupContext.getCommandApiServerTransport());
brokerStartupContext.getGatewayBrokerTransport());

CompletableFuture.runAsync(
() ->
Expand All @@ -53,9 +53,6 @@ void startupInternal(
() ->
forwardExceptions(
() -> {
final var adminApi =
brokerStartupContext.getAdminApiService();
adminApi.injectPartitionManager(partitionManager);
final var adminService =
brokerStartupContext.getBrokerAdminService();
adminService.injectAdminAccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
*/
package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.clustering.management.ExporterPositionsDecoder;
import io.camunda.zeebe.clustering.management.ExporterPositionsEncoder;
import io.camunda.zeebe.clustering.management.ExporterPositionsEncoder.PositionsEncoder;
import io.camunda.zeebe.broker.protocol.ExporterPositionsDecoder;
import io.camunda.zeebe.broker.protocol.ExporterPositionsEncoder;
import io.camunda.zeebe.broker.protocol.ExporterPositionsEncoder.PositionsEncoder;
import io.camunda.zeebe.protocol.impl.encoding.SbeBufferWriterReader;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,34 @@
*/
package io.camunda.zeebe.broker.partitioning;

import static java.util.Collections.unmodifiableList;
import static java.util.Objects.requireNonNull;

import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.ActorFutureCollector;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class MultiPartitionAdminAccess implements PartitionAdminAccess {
private final ConcurrencyControl concurrencyControl;
private final List<? extends PartitionAdminAccess> partitions;
private final Map<Integer, ? extends PartitionAdminAccess> partitions;

MultiPartitionAdminAccess(
final ConcurrencyControl concurrencyControl,
final List<? extends PartitionAdminAccess> partitions) {
final Map<Integer, ? extends PartitionAdminAccess> partitions) {
this.concurrencyControl = requireNonNull(concurrencyControl);
this.partitions = unmodifiableList(requireNonNull(partitions));
this.partitions = Collections.unmodifiableMap(requireNonNull(partitions));
}

/**
* @return A scoped-down admin access that that only act's on the given partition, not all
* partitions
*/
@Override
public Optional<PartitionAdminAccess> forPartition(final int partitionId) {
return Optional.ofNullable(partitions.get(partitionId));
}

@Override
Expand Down Expand Up @@ -56,7 +66,7 @@ private ActorFuture<Void> callOnEachPartition(
final Function<PartitionAdminAccess, ActorFuture<Void>> functionToCall) {
final ActorFuture<Void> response = concurrencyControl.createFuture();
final var aggregatedResult =
partitions.stream()
partitions.values().stream()
.map(functionToCall)
.collect(new ActorFutureCollector<>(concurrencyControl));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.util.Optional;
import org.slf4j.Logger;

public final class NoOpPartitionAdminAccess implements PartitionAdminAccess {

private static final Logger LOG = Loggers.SYSTEM_LOGGER;

@Override
public Optional<PartitionAdminAccess> forPartition(final int partitionId) {
return Optional.empty();
}

@Override
public ActorFuture<Void> takeSnapshot() {
logCall();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
package io.camunda.zeebe.broker.partitioning;

import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.Optional;

public interface PartitionAdminAccess {
Optional<PartitionAdminAccess> forPartition(int partitionId);

ActorFuture<Void> takeSnapshot();

ActorFuture<Void> pauseExporting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ public ManagedPartitionGroup getPartitionGroup() {
}

public PartitionAdminAccess createAdminAccess(final ConcurrencyControl concurrencyControl) {
final var adminAccess =
new MultiPartitionAdminAccess(
concurrencyControl,
partitions.stream()
.map(ZeebePartition::createAdminAccess)
.collect(Collectors.toList()));
return adminAccess;
return new MultiPartitionAdminAccess(
concurrencyControl,
partitions.stream()
.collect(
Collectors.toMap(
ZeebePartition::getPartitionId, ZeebePartition::createAdminAccess)));
}

public CompletableFuture<Void> start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public ZeebePartition(
}

public PartitionAdminAccess createAdminAccess() {
return new ZeebePartitionAdminAccess(actor, adminControl);
return new ZeebePartitionAdminAccess(actor, getPartitionId(), adminControl);
}

@Override
Expand Down

0 comments on commit 73850d2

Please sign in to comment.