Skip to content

Commit

Permalink
merge: #9989 #10012
Browse files Browse the repository at this point in the history
9989: Create engine outside r=Zelldon a=Zelldon

## Description

Create the engine outside of the StreamProcessor, which should allow to also inject other RecordProcessors.
Furthermore we can fill the Engine with the needed TypedRecordProcessorFactory, which means we no longer need that in our builders and contexts.

I hope this helps you in make further progress `@deepthidevaki` 


Sidenote:

This also simplifies the StreamProcessor/RecordProcessor tests, since we do not necessarily need to set up so much anymore.
<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #9725



10012: refactor: let `dist` build the actor scheduler for gateway r=oleschoenburg a=oleschoenburg

## Description

This moves the responsibility of creating an actor scheduler for the gateway to the `dist` module. As an added benefit, this allows us to remove some redundant constructors, simplify BrokerClient to never start the actor scheduler and a deprecated method from `BrokerStartupContext`.

## Related issues

<!-- Which issues are closed by this PR or are related -->

relates to #9996 



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
3 people committed Aug 5, 2022
3 parents bdc2405 + fae722d + bed684b commit c70ef5a
Show file tree
Hide file tree
Showing 25 changed files with 215 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.ValueType;
Expand All @@ -26,7 +27,7 @@
import org.slf4j.LoggerFactory;

/** Process and replays records related to Checkpoint. */
public final class CheckpointRecordsProcessor implements RecordProcessor<Context> {
public final class CheckpointRecordsProcessor implements RecordProcessor {

private static final Logger LOG = LoggerFactory.getLogger(CheckpointRecordsProcessor.class);

Expand All @@ -46,11 +47,11 @@ public CheckpointRecordsProcessor(final BackupManager backupManager) {
}

@Override
public void init(final Context recordProcessorContext) {
executor = recordProcessorContext.executor();
public void init(final RecordProcessorContext recordProcessorContext) {
executor = recordProcessorContext.getScheduleService();
checkpointState =
new DbCheckpointState(
recordProcessorContext.zeebeDb(), recordProcessorContext.transactionContext());
recordProcessorContext.getZeebeDb(), recordProcessorContext.getTransactionContext());

checkpointCreateProcessor =
new CheckpointCreateProcessor(checkpointState, backupManager, checkpointListeners);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
import io.camunda.zeebe.streamprocessor.RecordProcessorContextImpl;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -54,7 +55,7 @@ void setup() {
new ZeebeRocksDbFactory<>(
new RocksDbConfiguration(), new ConsistencyChecksSettings(true, true))
.createDb(database.toFile());
final var context = new Context(zeebedb, zeebedb.createContext(), executor);
final RecordProcessorContextImpl context = createContext(executor, zeebedb);

resultBuilder = new MockProcessingResultBuilder();
processor = new CheckpointRecordsProcessor(backupManager);
Expand All @@ -63,6 +64,12 @@ void setup() {
state = new DbCheckpointState(zeebedb, zeebedb.createContext());
}

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
return new RecordProcessorContextImpl(
1, executor, zeebeDb, zeebeDb.createContext(), null, null, null);
}

@AfterEach
void after() throws Exception {
zeebedb.close();
Expand Down Expand Up @@ -228,7 +235,7 @@ void shouldNotifyListenerWhenReplayed() {
@Test
void shouldNotifyListenerOnInit() {
// given
final var context = new Context(zeebedb, zeebedb.createContext(), null);
final RecordProcessorContextImpl context = createContext(null, zeebedb);
processor = new CheckpointRecordsProcessor(backupManager);
final long checkpointId = 3;
final long checkpointPosition = 30;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.camunda.zeebe.broker.transport.adminapi.AdminApiRequestHandler;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiServiceImpl;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
Expand All @@ -42,9 +41,6 @@ public interface BrokerStartupContext {

ActorSchedulingService getActorSchedulingService();

@Deprecated // use getActorSchedulingService instead
ActorScheduler getActorScheduler();

ConcurrencyControl getConcurrencyControl();

BrokerHealthCheckService getHealthCheckService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.camunda.zeebe.broker.transport.adminapi.AdminApiRequestHandler;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiServiceImpl;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.transport.impl.AtomixServerTransport;
Expand All @@ -37,7 +36,7 @@ public final class BrokerStartupContextImpl implements BrokerStartupContext {
private final BrokerInfo brokerInfo;
private final BrokerCfg configuration;
private final SpringBrokerBridge springBrokerBridge;
private final ActorScheduler actorScheduler;
private final ActorSchedulingService actorScheduler;
private final BrokerHealthCheckService healthCheckService;
private final ExporterRepository exporterRepository;

Expand All @@ -58,7 +57,7 @@ public BrokerStartupContextImpl(
final BrokerInfo brokerInfo,
final BrokerCfg configuration,
final SpringBrokerBridge springBrokerBridge,
final ActorScheduler actorScheduler,
final ActorSchedulingService actorScheduler,
final BrokerHealthCheckService healthCheckService,
final ExporterRepository exporterRepository,
final List<PartitionListener> additionalPartitionListeners) {
Expand Down Expand Up @@ -92,11 +91,6 @@ public ActorSchedulingService getActorSchedulingService() {
return actorScheduler;
}

@Override
public ActorScheduler getActorScheduler() {
return actorScheduler;
}

@Override
public ConcurrencyControl getConcurrencyControl() {
return concurrencyControl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ void startupInternal(

final var clusterServices = brokerStartupContext.getClusterServices();

@SuppressWarnings("resource")
final var embeddedGatewayService =
new EmbeddedGatewayService(
brokerStartupContext.getBrokerConfiguration(),
brokerStartupContext.getActorScheduler(),
brokerStartupContext.getActorSchedulingService(),
clusterServices.getMessagingService(),
clusterServices.getMembershipService(),
clusterServices.getEventService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.function.Function;

Expand All @@ -24,14 +24,14 @@ public final class EmbeddedGatewayService implements AutoCloseable {

public EmbeddedGatewayService(
final BrokerCfg configuration,
final ActorScheduler actorScheduler,
final ActorSchedulingService actorScheduler,
final MessagingService messagingService,
final ClusterMembershipService membershipService,
final ClusterEventService eventService) {
final Function<GatewayCfg, BrokerClient> brokerClientFactory =
cfg ->
new BrokerClientImpl(
cfg, messagingService, membershipService, eventService, actorScheduler, false);
cfg, messagingService, membershipService, eventService, actorScheduler);
gateway = new Gateway(configuration.getGateway(), brokerClientFactory, actorScheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public Consumer<TypedRecord<?>> getOnProcessedListener() {
}

@Override
public TypedRecordProcessorFactory getStreamProcessorFactory() {
public TypedRecordProcessorFactory getTypedRecordProcessorFactory() {
return typedRecordProcessorsFactory::createTypedStreamProcessor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface PartitionTransitionContext extends PartitionContext {

Consumer<TypedRecord<?>> getOnProcessedListener();

TypedRecordProcessorFactory getStreamProcessorFactory();
TypedRecordProcessorFactory getTypedRecordProcessorFactory();

ConcurrencyControl getConcurrencyControl();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.atomix.raft.RaftServer.Role;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
Expand Down Expand Up @@ -127,11 +128,11 @@ private static StreamProcessor createStreamProcessor(
.logStream(context.getLogStream())
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.recordProcessor(new Engine(context.getTypedRecordProcessorFactory()))
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
.streamProcessorFactory(context.getStreamProcessorFactory())
.streamProcessorMode(streamProcessorMode)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class TestPartitionTransitionContext implements PartitionTransitionContex
private Role currentRole;
private long currentTerm;
private HealthMonitor healthMonitor;
private TypedRecordProcessorFactory streamProcessorFactory;
private TypedRecordProcessorFactory typedRecordProcessorFactory;
private ExporterDirector exporterDirector;
private LogStream logStream;
private StreamProcessor streamProcessor;
Expand Down Expand Up @@ -264,12 +264,8 @@ public Consumer<TypedRecord<?>> getOnProcessedListener() {
}

@Override
public TypedRecordProcessorFactory getStreamProcessorFactory() {
return streamProcessorFactory;
}

public void setStreamProcessorFactory(final TypedRecordProcessorFactory streamProcessorFactory) {
this.streamProcessorFactory = streamProcessorFactory;
public TypedRecordProcessorFactory getTypedRecordProcessorFactory() {
return typedRecordProcessorFactory;
}

@Override
Expand All @@ -282,6 +278,11 @@ public void setConcurrencyControl(final ConcurrencyControl concurrencyControl) {
this.concurrencyControl = concurrencyControl;
}

public void setTypedRecordProcessorFactory(
final TypedRecordProcessorFactory typedRecordProcessorFactory) {
this.typedRecordProcessorFactory = typedRecordProcessorFactory;
}

public void setRaftPartition(final RaftPartition raftPartition) {
this.raftPartition = raftPartition;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway;

import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.shared.ActorClockConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
final class ActorSchedulerComponent {

private final GatewayCfg config;
private final ActorClockConfiguration clockConfiguration;

@Autowired
public ActorSchedulerComponent(
final GatewayCfg config, final ActorClockConfiguration clockConfiguration) {
this.config = config;
this.clockConfiguration = clockConfiguration;
}

@Bean("actorScheduler")
ActorScheduler createActorSchedulingService() {
return ActorScheduler.newActorScheduler()
.setCpuBoundActorThreadCount(config.getThreads().getManagementThreads())
.setIoBoundActorThreadCount(0)
.setSchedulerName("gateway-scheduler")
.setActorClock(clockConfiguration.getClock())
.build();
}
}
21 changes: 4 additions & 17 deletions dist/src/main/java/io/camunda/zeebe/gateway/StandaloneGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.shared.ActorClockConfiguration;
import io.camunda.zeebe.shared.Profile;
import io.camunda.zeebe.util.CloseableSilently;
import io.camunda.zeebe.util.VersionUtil;
Expand Down Expand Up @@ -51,21 +50,20 @@ public class StandaloneGateway

private final GatewayCfg configuration;
private final SpringGatewayBridge springGatewayBridge;
private final ActorClockConfiguration clockConfig;
private final ActorScheduler actorScheduler;
private final AtomixCluster atomixCluster;

private Gateway gateway;
private ActorScheduler actorScheduler;

@Autowired
public StandaloneGateway(
final GatewayCfg configuration,
final SpringGatewayBridge springGatewayBridge,
final ActorClockConfiguration clockConfig,
final ActorScheduler actorScheduler,
final AtomixCluster atomixCluster) {
this.configuration = configuration;
this.springGatewayBridge = springGatewayBridge;
this.clockConfig = clockConfig;
this.actorScheduler = actorScheduler;
this.atomixCluster = atomixCluster;
}

Expand Down Expand Up @@ -93,7 +91,6 @@ public void run(final String... args) throws Exception {
LOG.info("Starting standalone gateway with configuration {}", configuration.toJson());
}

actorScheduler = createActorScheduler(configuration);
gateway = new Gateway(configuration, this::createBrokerClient, actorScheduler);

springGatewayBridge.registerBrokerClientSupplier(gateway::getBrokerClient);
Expand Down Expand Up @@ -149,16 +146,6 @@ private BrokerClient createBrokerClient(final GatewayCfg config) {
atomixCluster.getMessagingService(),
atomixCluster.getMembershipService(),
atomixCluster.getEventService(),
actorScheduler,
false);
}

private ActorScheduler createActorScheduler(final GatewayCfg config) {
return ActorScheduler.newActorScheduler()
.setCpuBoundActorThreadCount(config.getThreads().getManagementThreads())
.setIoBoundActorThreadCount(0)
.setSchedulerName("gateway-scheduler")
.setActorClock(clockConfig.getClock())
.build();
actorScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ private GatewayCfg createGatewayCfg() {

private StandaloneGateway buildGateway(final GatewayCfg gatewayCfg) {
final AtomixComponent clusterComponent = new AtomixComponent(gatewayCfg);
final ActorSchedulerComponent actorSchedulerComponent =
new ActorSchedulerComponent(gatewayCfg, new ActorClockConfiguration(false));

return new StandaloneGateway(
gatewayCfg,
new SpringGatewayBridge(),
new ActorClockConfiguration(false),
actorSchedulerComponent.createActorSchedulingService(),
clusterComponent.createAtomixCluster());
}
}

0 comments on commit c70ef5a

Please sign in to comment.