Skip to content

Commit

Permalink
merge: #9624
Browse files Browse the repository at this point in the history
9624: fix(broker): do not log error if follower fails to take snapshot when log is not uptodate r=deepthidevaki a=deepthidevaki

## Description

Previously, this case was logged as error. Now AsyncSnapshotDirector treat it as a special case, log it in debug level.

Alternative options where to prevent taking snapshot in such cases. But it is very difficult to identify this case. All options prevented taking snapshots in scenarios where we want to take snapshot.
1. One option was to return -1 as processed position in ReplayStateMachine when no events after snapshot position is replayed. But this also prevents taking snapshot when there are no new records, but the exporter position increases.
2. Another options was to skip snapshot in StateController, when the determined snapshot position <= position in latest snapshot. This would not fix the bug, because if the exporter position has changed, it will still try to take snapshot and enter the same situation where it cannot find the indexed entry. Other options were attempted, but all failed in one or the other edge case.

So, the easiest solution is to treat it as a special case in AsyncSnapshotDirector.

## Related issues

closes #7911 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and deepthidevaki committed Jun 28, 2022
2 parents b48ee50 + 372367d commit d7b830f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.system.partitions;

/**
* Used when there is no entry at the determined snapshot position while taking a transient
* snapshot.
*/
public class NoEntryAtSnapshotPosition extends RuntimeException {

public NoEntryAtSnapshotPosition(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.atomix.raft.RaftCommittedEntryListener;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
Expand Down Expand Up @@ -50,6 +51,7 @@ public final class AsyncSnapshotDirector extends Actor
private final String processorName;
private final StreamProcessor streamProcessor;
private final String actorName;
private final StreamProcessorMode streamProcessorMode;
private final Set<FailureListener> listeners = new HashSet<>();
private final BooleanSupplier isLastWrittenPositionCommitted;

Expand Down Expand Up @@ -78,6 +80,7 @@ private AsyncSnapshotDirector(
this.snapshotRate = snapshotRate;
this.partitionId = partitionId;
actorName = buildActorName(nodeId, "SnapshotDirector", this.partitionId);
this.streamProcessorMode = streamProcessorMode;
if (streamProcessorMode == StreamProcessorMode.REPLAY) {
isLastWrittenPositionCommitted = () -> true;
} else {
Expand Down Expand Up @@ -254,11 +257,7 @@ private void takeSnapshot() {
transientSnapshotFuture.onComplete(
(transientSnapshot, snapshotTakenError) -> {
if (snapshotTakenError != null) {
if (snapshotTakenError instanceof SnapshotException.SnapshotAlreadyExistsException) {
LOG.debug("Did not take a snapshot. {}", snapshotTakenError.getMessage());
} else {
LOG.error("Failed to take a snapshot for {}", processorName, snapshotTakenError);
}
logSnapshotTakenError(snapshotTakenError);
resetStateOnFailure(snapshotTakenError);
return;
}
Expand All @@ -267,6 +266,20 @@ private void takeSnapshot() {
});
}

private void logSnapshotTakenError(final Throwable snapshotTakenError) {
if (snapshotTakenError instanceof SnapshotException.SnapshotAlreadyExistsException) {
LOG.debug("Did not take a snapshot. {}", snapshotTakenError.getMessage());
} else if (snapshotTakenError instanceof NoEntryAtSnapshotPosition
&& streamProcessorMode == StreamProcessorMode.REPLAY) {
LOG.debug(
"Did not take a snapshot: {}. Most likely this partition has not received the entry yet. Will retry in {}",
snapshotTakenError.getMessage(),
snapshotRate);
} else {
LOG.error("Failed to take a snapshot for {}", processorName, snapshotTakenError);
}
}

private void onTransientSnapshotTaken(final TransientSnapshot transientSnapshot) {

pendingSnapshot = transientSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.broker.system.partitions.impl;

import io.camunda.zeebe.broker.system.partitions.AtomixRecordEntrySupplier;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbFactory;
Expand Down Expand Up @@ -155,7 +156,7 @@ private void takeTransientSnapshotInternal(

if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new IllegalStateException(
new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.camunda.zeebe.broker.system.partitions.AtomixRecordEntrySupplier;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.TestIndexedRaftLogEntry;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
Expand All @@ -28,6 +30,7 @@
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.agrona.collections.MutableLong;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
Expand All @@ -46,6 +49,14 @@ public final class StateControllerImplTest {
private FileBasedSnapshotStore store;
private Path runtimeDirectory;

private final AtomixRecordEntrySupplier indexedRaftLogEntry =
l ->
Optional.of(
new TestIndexedRaftLogEntry(l, 1, new ApplicationEntry(1, 10, new UnsafeBuffer())));
private final AtomixRecordEntrySupplier emptyEntrySupplier = l -> Optional.empty();
private final AtomicReference<AtomixRecordEntrySupplier> atomixRecordEntrySupplier =
new AtomicReference<>(indexedRaftLogEntry);

@Before
public void setup() throws IOException {

Expand All @@ -64,10 +75,7 @@ public void setup() throws IOException {
DefaultZeebeDbFactory.defaultFactory(),
store,
runtimeDirectory,
l ->
Optional.of(
new TestIndexedRaftLogEntry(
l, 1, new ApplicationEntry(1, 10, new UnsafeBuffer()))),
l -> atomixRecordEntrySupplier.get().getPreviousIndexedEntry(l),
db -> exporterPosition.get(),
store);

Expand All @@ -84,6 +92,17 @@ public void shouldNotTakeSnapshotIfDbIsClosed() {
.hasCauseInstanceOf(StateClosedException.class);
}

@Test
public void shouldNotTakeSnapshotIfNoIndexedEntry() {
// given
atomixRecordEntrySupplier.set(emptyEntrySupplier);
snapshotController.recover().join();

// then
assertThatThrownBy(() -> snapshotController.takeTransientSnapshot(1).join())
.hasCauseInstanceOf(NoEntryAtSnapshotPosition.class);
}

@Test
public void shouldTakeTempSnapshotWithExporterPosition() {
// given
Expand Down

0 comments on commit d7b830f

Please sign in to comment.