10493: [Backport stable/8.0] fix(raft): handle exceptions on partition server init r=oleschoenburg a=megglos

# Description
Backport of #10450 to `stable/8.0`.

10566: [Backport stable/8.0] fix(helm): rename podSecurityContext to containerSecurityContext r=oleschoenburg a=backport-action

# Description
Backport of #10556 to `stable/8.0`.

relates to camunda/camunda-platform-helm#374

10624: [Backport stable/8.0] fix: take snapshot if nothing was exported since last snapshot r=oleschoenburg a=backport-action

# Description
Backport of #10611 to `stable/8.0`.

relates to #9761

10638: [Backport stable/8.0] test: fix unfinished stubbing of command response writer r=oleschoenburg a=backport-action

# Description
Backport of #10605 to `stable/8.0`.

relates to #10604

Co-authored-by: Meggle (Sebastian Bathke) <sebastian.bathke@camunda.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
3 people committed Oct 7, 2022
5 parents 2588e98 + 87112c5 + 9c3dab5 + d2a299f + 72a119e commit eb1d67f
Showing 9 changed files with 236 additions and 116 deletions.
@@ -34,7 +34,6 @@
 import io.atomix.raft.partition.RaftStorageConfig;
 import io.atomix.raft.roles.RaftRole;
 import io.atomix.raft.storage.RaftStorage;
-import io.atomix.raft.storage.StorageException;
 import io.atomix.raft.storage.log.RaftLogReader;
 import io.atomix.raft.zeebe.ZeebeLogAppender;
 import io.atomix.utils.Managed;
@@ -113,8 +112,7 @@ public CompletableFuture<RaftPartitionServer> start() {
     synchronized (this) {
       try {
         initServer();
-
-      } catch (final StorageException e) {
+      } catch (final RuntimeException e) {
         return Futures.exceptionalFuture(e);
       }
     }
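
Why the broader catch matters: if initServer() throws an unchecked exception other than StorageException, start() itself throws and the caller never receives a future to attach error handling to. A minimal, self-contained sketch (hypothetical StartSketch class, not the Atomix API) of the fixed pattern:

import java.util.concurrent.CompletableFuture;

final class StartSketch {

  // Any RuntimeException from init becomes an exceptional future instead of
  // escaping the method, so callers can rely on always getting a future back.
  static CompletableFuture<Void> start(final Runnable initServer) {
    try {
      initServer.run();
    } catch (final RuntimeException e) {
      return CompletableFuture.failedFuture(e);
    }
    return CompletableFuture.completedFuture(null);
  }

  public static void main(final String[] args) {
    start(() -> { throw new IllegalStateException("partition init failed"); })
        .whenComplete((ok, error) -> System.out.println("observed: " + error));
  }
}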
@@ -0,0 +1,67 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.atomix.raft.partition.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PartitionMetadata;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.partition.RaftPartitionGroupConfig;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;

public class RaftPartitionServerTest {

private final RaftPartition raftPartition = mock(RaftPartition.class);
private final RaftPartitionGroupConfig partitionGroupConfig =
mock(RaftPartitionGroupConfig.class);

@Test
public void testInitServerRuntimeExceptionReturnsExceptionalFuture() {
// given
final MemberId localMemberId = new MemberId("1");
when(raftPartition.members()).thenReturn(List.of(localMemberId));
when(raftPartition.id())
.thenReturn(PartitionId.from("group", Integer.parseInt(localMemberId.id())));

when(partitionGroupConfig.getPartitionConfig()).thenReturn(new RaftPartitionConfig());

final RaftPartitionServer raftPartitionServer =
new RaftPartitionServer(
raftPartition,
partitionGroupConfig,
localMemberId,
mock(ClusterMembershipService.class),
mock(ClusterCommunicationService.class),
mock(PartitionMetadata.class));

    // getStorageConfig() is called internally by #initServer; stubbing it to throw
    // verifies that a failure during init still yields a CompletableFuture instead
    // of an exception thrown out of #start
when(partitionGroupConfig.getStorageConfig()).thenThrow(RuntimeException.class);

// when
final CompletableFuture<RaftPartitionServer> raftServerStartFuture =
raftPartitionServer.start();

// then
assertThat(raftServerStartFuture)
.failsWithin(Duration.ZERO)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(RuntimeException.class);
}
}
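
The closing assertion chain is AssertJ's CompletableFuture API. A standalone sketch (hypothetical test class) of its semantics: Duration.ZERO suffices because the future is already complete when start() returns, and the failure is wrapped in an ExecutionException whose cause is the original exception.

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;

class FailsWithinSketchTest {

  @Test
  void alreadyFailedFutureFailsWithinZeroDuration() {
    // failedFuture is complete immediately, so failsWithin does not block.
    final CompletableFuture<String> future =
        CompletableFuture.failedFuture(new IllegalStateException("boom"));

    assertThat(future)
        .failsWithin(Duration.ZERO)
        .withThrowableOfType(ExecutionException.class)
        .withCauseInstanceOf(IllegalStateException.class);
  }
}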
benchmarks/setup/default/zeebe-values.yaml (4 changes: 2 additions & 2 deletions)
@@ -20,8 +20,8 @@ zeebe:
   # IoThreadCount defines how many threads can be used for the exporting on each broker pod
   ioThreadCount: 4
 
-  # PodSecurityContext defines the security options the broker and gateway container should be run with
-  podSecurityContext:
+  # ContainerSecurityContext defines the security options the Zeebe broker container should be run with
+  containerSecurityContext:
     capabilities:
       add: ["NET_ADMIN"]
@@ -11,6 +11,7 @@
 import io.camunda.zeebe.util.sched.ConcurrencyControl;
 import io.camunda.zeebe.util.sched.future.ActorFuture;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 final class PartitionManagerStep extends AbstractBrokerStartupStep {
   @Override
@@ -38,75 +39,69 @@ void startupInternal(
         brokerStartupContext.getCommandApiService(),
         brokerStartupContext.getExporterRepository());
 
-    CompletableFuture.runAsync(
-        () ->
-            partitionManager
-                .start()
-                .whenComplete(
-                    (ok, error) -> {
-                      if (error != null) {
-                        startupFuture.completeExceptionally(error);
-                        return;
-                      }
-
-                      forwardExceptions(
-                          () ->
-                              concurrencyControl.run(
-                                  () ->
-                                      forwardExceptions(
-                                          () -> {
-                                            final var adminApi =
-                                                brokerStartupContext.getAdminApiService();
-                                            adminApi.injectPartitionManager(partitionManager);
-                                            final var adminService =
-                                                brokerStartupContext.getBrokerAdminService();
-                                            adminService.injectAdminAccess(
-                                                partitionManager.createAdminAccess(adminService));
-                                            adminService.injectPartitionInfoSource(
-                                                partitionManager.getPartitions());
-
-                                            brokerStartupContext.setPartitionManager(
-                                                partitionManager);
-
-                                            startupFuture.complete(brokerStartupContext);
-                                          },
-                                          startupFuture)),
-                          startupFuture);
-                    }));
+    CompletableFuture.supplyAsync(partitionManager::start)
+        .thenCompose(Function.identity())
+        .whenComplete(
+            (ok, error) -> {
+              if (error != null) {
+                startupFuture.completeExceptionally(error);
+                return;
+              }
+
+              forwardExceptions(
+                  () ->
+                      concurrencyControl.run(
+                          () ->
+                              forwardExceptions(
+                                  () -> {
+                                    final var adminApi = brokerStartupContext.getAdminApiService();
+                                    adminApi.injectPartitionManager(partitionManager);
+                                    final var adminService =
+                                        brokerStartupContext.getBrokerAdminService();
+                                    adminService.injectAdminAccess(
+                                        partitionManager.createAdminAccess(adminService));
+                                    adminService.injectPartitionInfoSource(
+                                        partitionManager.getPartitions());
+
+                                    brokerStartupContext.setPartitionManager(partitionManager);
+
+                                    startupFuture.complete(brokerStartupContext);
+                                  },
+                                  startupFuture)),
+                  startupFuture);
+            });
   }
 
   @Override
   void shutdownInternal(
       final BrokerStartupContext brokerShutdownContext,
       final ConcurrencyControl concurrencyControl,
       final ActorFuture<BrokerStartupContext> shutdownFuture) {
-    final var partitionManger = brokerShutdownContext.getPartitionManager();
-    if (partitionManger == null) {
+    final var partitionManager = brokerShutdownContext.getPartitionManager();
+    if (partitionManager == null) {
       shutdownFuture.complete(null);
       return;
     }
 
-    CompletableFuture.runAsync(
-        () ->
-            partitionManger
-                .stop()
-                .whenComplete(
-                    (ok, error) -> {
-                      if (error != null) {
-                        shutdownFuture.completeExceptionally(error);
-                        return;
-                      }
-                      forwardExceptions(
-                          () ->
-                              concurrencyControl.run(
-                                  () ->
-                                      forwardExceptions(
-                                          () -> {
-                                            brokerShutdownContext.setPartitionManager(null);
-                                            shutdownFuture.complete(brokerShutdownContext);
-                                          },
-                                          shutdownFuture)),
-                          shutdownFuture);
-                    }));
+    CompletableFuture.supplyAsync(partitionManager::stop)
+        .thenCompose(Function.identity())
+        .whenComplete(
+            (ok, error) -> {
+              if (error != null) {
+                shutdownFuture.completeExceptionally(error);
+                return;
+              }
+              forwardExceptions(
+                  () ->
+                      concurrencyControl.run(
+                          () ->
+                              forwardExceptions(
+                                  () -> {
+                                    brokerShutdownContext.setPartitionManager(null);
+                                    shutdownFuture.complete(brokerShutdownContext);
+                                  },
+                                  shutdownFuture)),
+                  shutdownFuture);
+            });
   }
 }
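
The startup refactor above swaps CompletableFuture.runAsync(() -> manager.start().whenComplete(...)) for supplyAsync(manager::start).thenCompose(Function.identity()). The point: with runAsync, an exception thrown synchronously by start() fails an unobserved future and the whenComplete callback never runs, so startupFuture is never completed; with supplyAsync, a synchronous throw and an asynchronously failed future both surface in the same pipeline. A minimal, self-contained sketch (hypothetical Component type, not the broker API):

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class FlattenSketch {

  interface Component {
    CompletableFuture<String> start(); // may throw before ever returning a future
  }

  static void observe(final Component component) {
    // supplyAsync yields CompletableFuture<CompletableFuture<String>>;
    // thenCompose(Function.identity()) flattens it to CompletableFuture<String>.
    CompletableFuture.supplyAsync(component::start)
        .thenCompose(Function.identity())
        .whenComplete((ok, error) -> System.out.println("ok=" + ok + ", error=" + error));
  }

  public static void main(final String[] args) throws InterruptedException {
    observe(() -> CompletableFuture.completedFuture("started"));         // ok=started
    observe(() -> { throw new IllegalStateException("sync failure"); }); // error observed
    observe(() -> CompletableFuture.failedFuture(new RuntimeException("async failure")));
    Thread.sleep(500); // crude wait for the common pool; sketch only
  }
}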
@@ -11,7 +11,7 @@
  * Used when there is no entry at the determined snapshot position while taking a transient
  * snapshot.
  */
-public class NoEntryAtSnapshotPosition extends RuntimeException {
+public class NoEntryAtSnapshotPosition extends Exception {
 
   public NoEntryAtSnapshotPosition(final String message) {
     super(message);
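
Changing the supertype from RuntimeException to Exception makes this a checked exception: the compiler now forces every caller to handle or declare it, which is what produces the explicit completeExceptionally path in the state controller diff below. A small sketch (hypothetical SnapshotCaller, not the Zeebe API):

final class SnapshotCaller {

  long findSnapshotIndex(final long position) throws NoEntryAtSnapshotPosition {
    throw new NoEntryAtSnapshotPosition("no entry at position " + position);
  }

  void takeSnapshot(final long position) {
    try {
      findSnapshotIndex(position); // without try/catch or throws, this no longer compiles
    } catch (final NoEntryAtSnapshotPosition e) {
      // fail the snapshot future instead of letting the exception escape unnoticed
    }
  }
}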
@@ -14,15 +14,13 @@
 import io.camunda.zeebe.db.ZeebeDbFactory;
 import io.camunda.zeebe.logstreams.impl.Loggers;
 import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
-import io.camunda.zeebe.snapshots.PersistedSnapshot;
 import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
 import io.camunda.zeebe.snapshots.TransientSnapshot;
 import io.camunda.zeebe.util.FileUtil;
 import io.camunda.zeebe.util.sched.ConcurrencyControl;
 import io.camunda.zeebe.util.sched.future.ActorFuture;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Optional;
 import java.util.function.ToLongFunction;
 import org.slf4j.Logger;
@@ -144,46 +142,20 @@ private void takeTransientSnapshotInternal(
       return;
     }
 
-    long index = 0;
-    long term = 0;
-    long exportedPosition = exporterPositionSupplier.applyAsLong(db);
-
-    if (exportedPosition != -1) {
-
-      final long snapshotPosition =
-          determineSnapshotPosition(lowerBoundSnapshotPosition, exportedPosition);
-      final var optionalIndexed = entrySupplier.getPreviousIndexedEntry(snapshotPosition);
-
-      if (optionalIndexed.isEmpty()) {
-        future.completeExceptionally(
-            new NoEntryAtSnapshotPosition(
-                String.format(
-                    "Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
-                    snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
-        return;
-      }
-
-      final var snapshotIndexedEntry = optionalIndexed.get();
-      index = snapshotIndexedEntry.index();
-      term = snapshotIndexedEntry.term();
-    } else {
-      final Optional<PersistedSnapshot> latestSnapshot =
-          constructableSnapshotStore.getLatestSnapshot();
-      exportedPosition = 0;
-
-      if (latestSnapshot.isPresent()) {
-        // re-use index and term from the latest snapshot
-        // to ensure that the records from there are not
-        // compacted until they get exported
-        final PersistedSnapshot persistedSnapshot = latestSnapshot.get();
-        index = persistedSnapshot.getIndex();
-        term = persistedSnapshot.getTerm();
-      } // otherwise index/term remains 0
-    }
+    final NextSnapshotId nextSnapshotId;
+    try {
+      nextSnapshotId = tryFindNextSnapshotId(lowerBoundSnapshotPosition);
+    } catch (final NoEntryAtSnapshotPosition e) {
+      future.completeExceptionally(e);
+      return;
+    }
 
     final var transientSnapshot =
         constructableSnapshotStore.newTransientSnapshot(
-            index, term, lowerBoundSnapshotPosition, exportedPosition);
+            nextSnapshotId.index,
+            nextSnapshotId.term,
+            nextSnapshotId.processedPosition,
+            nextSnapshotId.exportedPosition);
 
     if (transientSnapshot.isLeft()) {
       future.completeExceptionally(transientSnapshot.getLeft());
@@ -192,6 +164,50 @@ private void takeTransientSnapshotInternal(
}
}

private NextSnapshotId tryFindNextSnapshotId(final long lastProcessedPosition)
throws NoEntryAtSnapshotPosition {
final var exportedPosition = exporterPositionSupplier.applyAsLong(db);
if (exportedPosition == -1) {
final var latestSnapshot = constructableSnapshotStore.getLatestSnapshot();
if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot to ensure that the records from there are
// not compacted until they get exported.
final var persistedSnapshot = latestSnapshot.get();
return new NextSnapshotId(
persistedSnapshot.getIndex(), persistedSnapshot.getTerm(), lastProcessedPosition, 0);
}

return new NextSnapshotId(0, 0, lastProcessedPosition, 0);
}

final var snapshotPosition = Math.min(exportedPosition, lastProcessedPosition);
final var logEntry = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (logEntry.isPresent()) {
return new NextSnapshotId(
logEntry.get().index(), logEntry.get().term(), lastProcessedPosition, exportedPosition);
}

    // No log entry for the snapshot position - try to use the index and term of the
    // latest snapshot to take a new one
final var latestSnapshot = constructableSnapshotStore.getLatestSnapshot();
if (latestSnapshot.isPresent()) {
LOG.warn(
"No log entry for next snapshot position {}, using index and term from previous snapshot",
snapshotPosition);
return new NextSnapshotId(
latestSnapshot.get().getIndex(),
latestSnapshot.get().getTerm(),
lastProcessedPosition,
exportedPosition);
}

throw new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d) or previous snapshot, but found neither.",
snapshotPosition, lastProcessedPosition, exportedPosition));
}

@SuppressWarnings("rawtypes")
private void openDb(final ActorFuture<ZeebeDb> future) {
try {
@@ -247,14 +263,6 @@ private void takeSnapshot(
         });
   }
 
-  private long determineSnapshotPosition(
-      final long lowerBoundSnapshotPosition, final long exportedPosition) {
-    final long snapshotPosition = Math.min(exportedPosition, lowerBoundSnapshotPosition);
-    LOG.trace(
-        "Based on lowest exporter position '{}' and last processed position '{}', determined '{}' as snapshot position.",
-        exportedPosition,
-        lowerBoundSnapshotPosition,
-        snapshotPosition);
-    return snapshotPosition;
-  }
+  private record NextSnapshotId(
+      long index, long term, long processedPosition, long exportedPosition) {}
 }
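
Two notes on the new code, with a sketch. First, nextSnapshotId.index compiles even though records expose accessors rather than public fields: NextSnapshotId is nested in the state controller, and an enclosing class may read a nested record's private components directly. Second, the decision rules of tryFindNextSnapshotId can be modeled as a pure function; the sketch below (hypothetical names, with the snapshot store and log reader reduced to Optional parameters; not the Zeebe API) shows all three branches, including the fallback that #10611 adds:

import java.util.Optional;

final class NextSnapshotIdSketch {

  private record NextSnapshotId(long index, long term, long processedPosition, long exportedPosition) {}

  private record Entry(long index, long term) {}

  // exportedPosition == -1 means "no exporter position available yet".
  static NextSnapshotId next(
      final long lastProcessedPosition,
      final long exportedPosition,
      final Optional<Entry> entryAtSnapshotPosition, // log entry at min(exported, processed)
      final Optional<Entry> latestSnapshot)
      throws Exception {
    if (exportedPosition == -1) {
      // Rule 1: reuse the latest snapshot's index/term (or 0/0) so the log is not
      // compacted past records the exporter still needs. Reading s.index directly
      // works because Entry is nested in this class.
      return latestSnapshot
          .map(s -> new NextSnapshotId(s.index, s.term, lastProcessedPosition, 0))
          .orElseGet(() -> new NextSnapshotId(0, 0, lastProcessedPosition, 0));
    }
    if (entryAtSnapshotPosition.isPresent()) {
      // Rule 2: normal case, an indexed entry exists at the snapshot position.
      final var e = entryAtSnapshotPosition.get();
      return new NextSnapshotId(e.index, e.term, lastProcessedPosition, exportedPosition);
    }
    // Rule 3 (the fix): no entry at the position, e.g. because nothing was exported
    // since the last snapshot and that part of the log is already compacted; fall
    // back to the previous snapshot's index/term instead of failing.
    final var s = latestSnapshot.orElseThrow(() -> new Exception("no entry and no snapshot"));
    return new NextSnapshotId(s.index, s.term, lastProcessedPosition, exportedPosition);
  }

  public static void main(final String[] args) throws Exception {
    System.out.println(next(100, 50, Optional.empty(), Optional.of(new Entry(7, 2))));  // rule 3
    System.out.println(next(100, 90, Optional.of(new Entry(42, 3)), Optional.empty())); // rule 2
  }
}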
