Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
10412: Add a standalone app that can be used to restore a broker from a backup r=deepthidevaki a=deepthidevaki

## Description

* Added a new module for restore. 
* Adds a RestoreManager that coordinates restoring of all partitions. It first determines which partitions to restore from based on the given `BrokerCfg`. This configuration must be the same one used by the Broker when it is started after the restore. Because of this, restore module depens on broker module. But since it needs access to the configuration and `RaftPartitionGroupFactory`, there is no other way. There is scope of splitting broker to take these shared configs out of it.
* Added a `RestoreApp` in dist. My plan was to add it in restore module. But it needs access to BrokerConfig, which is built in dist. So the app is added to dist module. Alternative is to copy components `BrokerConfiguration` and `WorkerDirectory` to restore module. 
* The backup id to restore from is passed as a command line argument `--backupId=x`. This is parsed by spring boot. I decided to keep it simple for now, since we are planning to release it as experimental feature anyway. If we decide to support this app long term, then I suggest to switch to something like picocli that we use in zdb.

A basic test to verify a broker can be restored is added. More scenarios should be added later.

PS:- App is not added to the distribution yet.

## Related issues

related #10263 



10440: Extract KeyGenerator from ZeebeState r=oleschoenburg a=Zelldon

## Description

As preparation for the StreamProcessor - Engine module split we have to move out the Keygenerator out from the zeebeState, since this part of the stream processing (key is part of RecordMetadata). We have to reset the keys during replay and for that we need access to the key generator. 

The Key generator is now created in the StreamProcessor and passed into the ZeebeState (as dependency), this allowed to make minimal changes. At the start I removed completely the KEygenerator from the ZeebeState, but then we would need to adjust a LOT of setup code, which doesn't felt necessary tbh since it just about where this generator is created.

The change also allowed us to remove the ZeebeState from the StreamProcessorContext 💪 

`@npepinpe` `@saig0` who ever has time to review this. I think `@npepinpe` is currently a bit busy with other stuff related to backup and reviews, maybe you have time to review it `@saig0` 

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #10130 
related to #9727



10483: deps(maven): bump checkstyle from 10.3.3 to 10.3.4 r=github-actions[bot] a=dependabot[bot]

Bumps [checkstyle](https://github.com/checkstyle/checkstyle) from 10.3.3 to 10.3.4.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a href="https://github.com/checkstyle/checkstyle/releases">checkstyle's releases</a>.</em></p>
<blockquote>
<h2>checkstyle-10.3.4</h2>
<p><a href="https://checkstyle.org/releasenotes.html#Release_10.3.4">https://checkstyle.org/releasenotes.html#Release_10.3.4</a></p>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a href="https://github.com/checkstyle/checkstyle/commit/6de3b9f1feb5c80450af2ad646f82ba34dc250bb"><code>6de3b9f</code></a> [maven-release-plugin] prepare release checkstyle-10.3.4</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/a4497de578d5a72b386e7ca88676da190584eece"><code>a4497de</code></a> doc: release notes 10.3.4</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/32e8d374e67e5ef5c19a688064bb2a77b5947651"><code>32e8d37</code></a> Issue <a href="https://github-redirect.dependabot.com/checkstyle/checkstyle/issues/12145">#12145</a>: corrected tokens so all are required</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/96e3e05a526bca3f7a157ca929666530eb0b089f"><code>96e3e05</code></a> dependency: bump pitest-accelerator-junit5 from 1.0.1 to 1.0.2</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/37842d2f291b138b2e9d44b33350add3f9562f34"><code>37842d2</code></a> Issue <a href="https://github-redirect.dependabot.com/checkstyle/checkstyle/issues/3955">#3955</a>: corrected tokens so all are required</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/38ee347a49376e80858c969f742e6461dfe7d731"><code>38ee347</code></a> minor: remove unnecessary checkstyle versions to diff.groovy</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/318770dc1343e9be90b8304c275d9adaf4ad8d45"><code>318770d</code></a> dependency: bump slf4j-simple from 2.0.1 to 2.0.2</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/119453600ff9f6accc247f748df8c0dd6c65a2af"><code>1194536</code></a> dependency: bump junit.version from 5.9.0 to 5.9.1</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/5a56c16882b68e0aaedf3a2a29737f412bbd66be"><code>5a56c16</code></a> Issue <a href="https://github-redirect.dependabot.com/checkstyle/checkstyle/issues/12132">#12132</a>: Fix ArrayIndexOutOfBoundsException in pitest-survival-check-xml...</li>
<li><a href="https://github.com/checkstyle/checkstyle/commit/701bd65dec286ab75fed35b99a580fc77c8ea03f"><code>701bd65</code></a> Issue <a href="https://github-redirect.dependabot.com/checkstyle/checkstyle/issues/12210">#12210</a>: Add method to ignore unstable checker framework violations</li>
<li>Additional commits viewable in <a href="https://github.com/checkstyle/checkstyle/compare/checkstyle-10.3.3...checkstyle-10.3.4">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.puppycrawl.tools:checkstyle&package-manager=maven&previous-version=10.3.3&new-version=10.3.4)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting ``@dependabot` rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- ``@dependabot` rebase` will rebase this PR
- ``@dependabot` recreate` will recreate this PR, overwriting any edits that have been made to it
- ``@dependabot` merge` will merge this PR after your CI passes on it
- ``@dependabot` squash and merge` will squash and merge this PR after your CI passes on it
- ``@dependabot` cancel merge` will cancel a previously requested merge and block automerging
- ``@dependabot` reopen` will reopen this PR if it is closed
- ``@dependabot` close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- ``@dependabot` ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

10485: ci(go): use self hosted runner r=megglos a=megglos

## Description

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

relates #10370



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Meggle (Sebastian Bathke) <sebastian.bathke@camunda.com>
  • Loading branch information
5 people committed Sep 26, 2022
5 parents 32a08ee + f80cb8d + 34bc13b + 3998bf0 + ded9a08 commit db48127
Show file tree
Hide file tree
Showing 36 changed files with 423 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ jobs:
name: Property Tests
go-client:
name: Go client tests
runs-on: ubuntu-latest
runs-on: "n1-standard-8-netssd-preempt"
timeout-minutes: 20
steps:
- uses: actions/checkout@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ public CompletableFuture<Void> snapshot() {
}

/** Opens the partition. */
CompletableFuture<Partition> open(
final PartitionMetadata metadata, final PartitionManagementService managementService) {
partitionMetadata = metadata;
CompletableFuture<Partition> open(final PartitionManagementService managementService) {
if (partitionMetadata
.members()
.contains(managementService.getMembershipService().getLocalMember().id())) {
Expand Down Expand Up @@ -251,4 +249,12 @@ private boolean shouldStepDown() {
public CompletableFuture<Void> goInactive() {
return server.goInactive();
}

public PartitionMetadata getMetadata() {
return partitionMetadata;
}

public void setMetadata(final PartitionMetadata partitionMetadata) {
this.partitionMetadata = partitionMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class RaftPartitionGroup implements ManagedPartitionGroup {
private final Map<PartitionId, RaftPartition> partitions = Maps.newConcurrentMap();
private final List<PartitionId> sortedPartitionIds = Lists.newCopyOnWriteArrayList();
private final String snapshotSubject;
private Collection<PartitionMetadata> metadata;
private final Collection<PartitionMetadata> metadata;
private ClusterCommunicationService communicationService;

public RaftPartitionGroup(final RaftPartitionGroupConfig config) {
Expand All @@ -81,6 +81,24 @@ public RaftPartitionGroup(final RaftPartitionGroupConfig config) {
sortedPartitionIds.add(p.id());
});
Collections.sort(sortedPartitionIds);

metadata = determinePartitionDistribution(config);
}

private Collection<PartitionMetadata> determinePartitionDistribution(
final RaftPartitionGroupConfig config) {
final Collection<PartitionMetadata> metadataCollection;
final var members =
config.getMembers().stream().map(MemberId::from).collect(Collectors.toSet());
metadataCollection =
config
.getPartitionConfig()
.getPartitionDistributor()
.distributePartitions(members, sortedPartitionIds, replicationFactor);

metadataCollection.forEach(
partitionMetadata -> partitions.get(partitionMetadata.id()).setMetadata(partitionMetadata));
return metadataCollection;
}

private static Collection<RaftPartition> buildPartitions(final RaftPartitionGroupConfig config) {
Expand Down Expand Up @@ -173,26 +191,16 @@ private CompletableFuture<Void> handleSnapshot() {
public CompletableFuture<ManagedPartitionGroup> join(
final PartitionManagementService managementService) {

// We expect to bootstrap partitions where leadership is equally distributed.
// First member of a PartitionMetadata is the bootstrap leader
final var members =
config.getMembers().stream().map(MemberId::from).collect(Collectors.toSet());
metadata =
config
.getPartitionConfig()
.getPartitionDistributor()
.distributePartitions(members, sortedPartitionIds, replicationFactor);

communicationService = managementService.getMessagingService();
communicationService.<Void, Void>subscribe(snapshotSubject, m -> handleSnapshot());
final List<CompletableFuture<Partition>> futures =
metadata.stream()
.map(
metadata -> {
final RaftPartition partition = partitions.get(metadata.id());
return partition.open(metadata, managementService);
partitionMetadata -> {
final RaftPartition partition = partitions.get(partitionMetadata.id());
return partition.open(managementService);
})
.collect(Collectors.toList());
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(
v -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.camunda.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.state.processing.DbKeyGenerator;
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
Expand Down Expand Up @@ -66,8 +67,9 @@ void setup() {

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
final var context = zeebeDb.createContext();
return new RecordProcessorContextImpl(
1, executor, zeebeDb, zeebeDb.createContext(), null, null);
1, executor, zeebeDb, context, null, null, new DbKeyGenerator(1, zeebeDb, context));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import java.util.ArrayList;
import java.util.List;

final class RaftPartitionGroupFactory {
public final class RaftPartitionGroupFactory {

RaftPartitionGroup buildRaftPartitionGroup(
public RaftPartitionGroup buildRaftPartitionGroup(
final BrokerCfg configuration, final ReceivableSnapshotStoreFactory snapshotStoreFactory) {

final DataCfg dataConfiguration = configuration.getData();
Expand Down
16 changes: 16 additions & 0 deletions dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
<artifactId>zeebe-gateway</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-restore</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-elasticsearch-exporter</artifactId>
Expand All @@ -56,6 +61,16 @@
<artifactId>zeebe-atomix-utils</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-backup</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-backup-store-s3</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -203,6 +218,7 @@
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package io.camunda.zeebe.broker;

import io.atomix.cluster.AtomixCluster;
import io.camunda.zeebe.broker.WorkingDirectoryConfiguration.WorkingDirectory;
import io.camunda.zeebe.broker.shared.WorkingDirectoryConfiguration.WorkingDirectory;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker;
package io.camunda.zeebe.broker.shared;

import io.camunda.zeebe.broker.WorkingDirectoryConfiguration.WorkingDirectory;
import io.camunda.zeebe.broker.shared.WorkingDirectoryConfiguration.WorkingDirectory;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker;
package io.camunda.zeebe.broker.shared;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.shared.Profile;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.restore;

import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.backup.s3.S3BackupConfig;
import io.camunda.zeebe.backup.s3.S3BackupStore;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.backup.BackupStoreCfg;
import io.camunda.zeebe.broker.system.configuration.backup.BackupStoreCfg.BackupStoreType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
final class BackupStoreComponent {

private final BrokerCfg brokerCfg;

@Autowired
BackupStoreComponent(final BrokerCfg brokerCfg) {
this.brokerCfg = brokerCfg;
}

@Bean(destroyMethod = "close")
BackupStore backupStore() {
return buildBackupStore(brokerCfg.getData().getBackup());
}

private BackupStore buildBackupStore(final BackupStoreCfg backupCfg) {
final var store = backupCfg.getStore();
if (store == BackupStoreType.NONE) {
throw new IllegalArgumentException("No backup store configured, cannot restore from backup.");
}

if (store == BackupStoreType.S3) {
final var s3Config = backupCfg.getS3();
final S3BackupConfig storeConfig =
S3BackupConfig.from(
s3Config.getBucketName(),
s3Config.getEndpoint(),
s3Config.getRegion(),
s3Config.getAccessKey(),
s3Config.getSecretKey());
return new S3BackupStore(storeConfig);
} else {
throw new IllegalArgumentException(
"The configured backup store type (%s) is unsupported. Cannot restore from backup"
.formatted(store));
}
}
}
58 changes: 58 additions & 0 deletions dist/src/main/java/io/camunda/zeebe/restore/RestoreApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.restore;

import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.shared.Profile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

@SpringBootApplication(
scanBasePackages = {"io.camunda.zeebe.restore", "io.camunda.zeebe.broker.shared"})
@ConfigurationPropertiesScan(basePackages = {"io.camunda.zeebe.broker.shared"})
public class RestoreApp implements ApplicationRunner {

private static final Logger LOG = LoggerFactory.getLogger(RestoreApp.class);
private final BrokerCfg configuration;
private final BackupStore backupStore;

@Value("${backupId}")
// Parsed from commandline Eg:-`--backupId=100`
private long backupId;

@Autowired
public RestoreApp(final BrokerCfg configuration, final BackupStore backupStore) {
this.configuration = configuration;
this.backupStore = backupStore;
}

public static void main(final String[] args) {
final var application =
new SpringApplicationBuilder(RestoreApp.class)
.logStartupInfo(true)
.profiles(Profile.RESTORE.getId())
.build();

application.run(args);
}

@Override
public void run(final ApplicationArguments args) {
LOG.info("Starting to restore from backup {}", backupId);
new RestoreManager(configuration, backupStore).restore(backupId).join();
LOG.info("Successfully restored broker from backup {}", backupId);
}
}
1 change: 1 addition & 0 deletions dist/src/main/java/io/camunda/zeebe/shared/Profile.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public enum Profile {
// application specific profiles
BROKER("broker"),
GATEWAY("gateway"),
RESTORE("restore"),

// environment profiles
TEST("test"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.shared.WorkingDirectoryConfiguration;
import io.camunda.zeebe.shared.Profile;
import java.nio.file.Path;
import java.util.Properties;
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void init(final RecordProcessorContext recordProcessorContext) {
new ZeebeDbState(
recordProcessorContext.getPartitionId(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext());
recordProcessorContext.getTransactionContext(),
recordProcessorContext.getKeyGenerator());
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;

public interface ReadonlyStreamProcessorContext {
Expand All @@ -20,11 +19,6 @@ public interface ReadonlyStreamProcessorContext {
@Deprecated // only used in EngineRule; TODO remove this
LogStream getLogStream();

/**
* @return the state, where the data is stored during processing
*/
MutableZeebeState getZeebeState();

/**
* Returns the partition ID
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.List;
import java.util.function.Function;
Expand All @@ -31,4 +32,6 @@ public interface RecordProcessorContext {
void addLifecycleListeners(final List<StreamProcessorLifecycleAware> lifecycleListeners);

InterPartitionCommandSender getPartitionCommandSender();

KeyGenerator getKeyGenerator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

public final class EngineProcessors {

private EngineProcessors() {}

public static TypedRecordProcessors createEngineProcessors(
final TypedRecordProcessorContext typedRecordProcessorContext,
final int partitionsCount,
Expand All @@ -54,7 +56,7 @@ public static TypedRecordProcessors createEngineProcessors(
TypedRecordProcessors.processors(zeebeState.getKeyGenerator(), writers);

// register listener that handles migrations immediately, so it is the first to be called
typedRecordProcessors.withListener(new DbMigrationController());
typedRecordProcessors.withListener(new DbMigrationController(zeebeState));

typedRecordProcessors.withListener(zeebeState);

Expand Down

0 comments on commit db48127

Please sign in to comment.