Skip to content

Commit

Permalink
merge: #9635
Browse files Browse the repository at this point in the history
9635: [Backport stable/1.3] fix(broker): do not log error if follower fails to take snapshot when log is not up to date r=deepthidevaki a=backport-action

# Description
Backport of #9624 to `stable/1.3`.

relates to #7911

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and deepthidevaki committed Jun 29, 2022
2 parents 820310c + 57c7762 commit 0870062
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.system.partitions;

/**
 * Signals that the log holds no entry at the position that was determined for a snapshot. It is
 * raised while a transient snapshot is being taken.
 */
public class NoEntryAtSnapshotPosition extends RuntimeException {

  /**
   * Creates the exception with a description of the failed lookup.
   *
   * @param message the detail message, later available through {@link #getMessage()}
   */
  public NoEntryAtSnapshotPosition(final String message) {
    super(message);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.atomix.raft.RaftCommittedEntryListener;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
Expand Down Expand Up @@ -51,6 +52,7 @@ public final class AsyncSnapshotDirector extends Actor
// Name reported in the error log when taking a snapshot fails.
private final String processorName;
private final StreamProcessor streamProcessor;
// Built in the constructor via buildActorName(nodeId, "SnapshotDirector", partitionId).
private final String actorName;
// Mode handed to the constructor; consulted when deciding how loudly to log a failed snapshot
// (a missing entry at the snapshot position is only logged at debug level in REPLAY mode).
private final StreamProcessorMode streamProcessorMode;
private final Set<FailureListener> listeners = new HashSet<>();
// In REPLAY mode this supplier always answers true; the other mode's supplier is set in the
// constructor.
private final BooleanSupplier isLastWrittenPositionCommitted;

Expand Down Expand Up @@ -79,6 +81,7 @@ private AsyncSnapshotDirector(
this.snapshotRate = snapshotRate;
this.partitionId = partitionId;
actorName = buildActorName(nodeId, "SnapshotDirector", this.partitionId);
this.streamProcessorMode = streamProcessorMode;
if (streamProcessorMode == StreamProcessorMode.REPLAY) {
isLastWrittenPositionCommitted = () -> true;
} else {
Expand Down Expand Up @@ -256,11 +259,7 @@ private void takeSnapshot() {
transientSnapshotFuture.onComplete(
(transientSnapshot, snapshotTakenError) -> {
if (snapshotTakenError != null) {
if (snapshotTakenError instanceof SnapshotException.SnapshotAlreadyExistsException) {
LOG.debug("Did not take a snapshot. {}", snapshotTakenError.getMessage());
} else {
LOG.error("Failed to take a snapshot for {}", processorName, snapshotTakenError);
}
logSnapshotTakenError(snapshotTakenError);
resetStateOnFailure(snapshotTakenError);
return;
}
Expand All @@ -269,6 +268,20 @@ private void takeSnapshot() {
});
}

/**
 * Logs why a transient snapshot could not be taken, choosing the log level by failure type.
 *
 * <ul>
 *   <li>A snapshot that already exists is reported at debug level.
 *   <li>A missing entry at the snapshot position is reported at debug level when this director
 *       runs in {@link StreamProcessorMode#REPLAY} mode, together with the retry interval.
 *   <li>Everything else is reported as an error, including the throwable itself.
 * </ul>
 *
 * @param snapshotTakenError the failure the transient snapshot future completed with
 */
private void logSnapshotTakenError(final Throwable snapshotTakenError) {
  final boolean snapshotAlreadyExists =
      snapshotTakenError instanceof SnapshotException.SnapshotAlreadyExistsException;
  if (snapshotAlreadyExists) {
    LOG.debug("Did not take a snapshot. {}", snapshotTakenError.getMessage());
    return;
  }

  // Only downgraded in REPLAY mode; in any other mode a missing entry is still an error.
  final boolean missingEntryDuringReplay =
      snapshotTakenError instanceof NoEntryAtSnapshotPosition
          && streamProcessorMode == StreamProcessorMode.REPLAY;
  if (missingEntryDuringReplay) {
    LOG.debug(
        "Did not take a snapshot: {}. Most likely this partition has not received the entry yet. Will retry in {}",
        snapshotTakenError.getMessage(),
        snapshotRate);
    return;
  }

  LOG.error("Failed to take a snapshot for {}", processorName, snapshotTakenError);
}

private void onTransientSnapshotTaken(final TransientSnapshot transientSnapshot) {

pendingSnapshot = transientSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.broker.system.partitions.impl;

import io.camunda.zeebe.broker.system.partitions.AtomixRecordEntrySupplier;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.db.ZeebeDbFactory;
Expand Down Expand Up @@ -155,7 +156,7 @@ private void takeTransientSnapshotInternal(

if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new IllegalStateException(
new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.camunda.zeebe.broker.system.partitions.AtomixRecordEntrySupplier;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.TestIndexedRaftLogEntry;
import io.camunda.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
Expand All @@ -28,6 +30,7 @@
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.agrona.collections.MutableLong;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
Expand All @@ -46,6 +49,14 @@ public final class StateControllerImplTest {
private FileBasedSnapshotStore store;
private Path runtimeDirectory;

// Supplier that never finds an entry, whatever value is requested.
private final AtomixRecordEntrySupplier emptyEntrySupplier = requested -> Optional.empty();
// Supplier that always answers with a test entry created from the requested value.
private final AtomixRecordEntrySupplier indexedRaftLogEntry =
    requested -> {
      final ApplicationEntry applicationEntry = new ApplicationEntry(1, 10, new UnsafeBuffer());
      return Optional.of(new TestIndexedRaftLogEntry(requested, 1, applicationEntry));
    };
// Holds the supplier currently in effect so that a test can swap it; starts with the supplier
// that always finds an entry.
private final AtomicReference<AtomixRecordEntrySupplier> atomixRecordEntrySupplier =
    new AtomicReference<>(indexedRaftLogEntry);

@Before
public void setup() throws IOException {

Expand All @@ -64,10 +75,7 @@ public void setup() throws IOException {
ZeebeRocksDbFactory.newFactory(),
store,
runtimeDirectory,
l ->
Optional.of(
new TestIndexedRaftLogEntry(
l, 1, new ApplicationEntry(1, 10, new UnsafeBuffer()))),
l -> atomixRecordEntrySupplier.get().getPreviousIndexedEntry(l),
db -> exporterPosition.get(),
store);

Expand All @@ -84,6 +92,17 @@ public void shouldNotTakeSnapshotIfDbIsClosed() {
.hasCauseInstanceOf(StateClosedException.class);
}

/**
 * Verifies that taking a transient snapshot fails with a {@link NoEntryAtSnapshotPosition} cause
 * when the entry supplier finds no indexed entry.
 */
@Test
public void shouldNotTakeSnapshotIfNoIndexedEntry() {
// given: the entry supplier in effect always answers with an empty Optional
atomixRecordEntrySupplier.set(emptyEntrySupplier);
snapshotController.recover().join();

// when / then: joining the snapshot future throws, and the thrown exception's cause is the
// dedicated NoEntryAtSnapshotPosition type
assertThatThrownBy(() -> snapshotController.takeTransientSnapshot(1).join())
.hasCauseInstanceOf(NoEntryAtSnapshotPosition.class);
}

@Test
public void shouldTakeTempSnapshotWithExporterPosition() {
// given
Expand Down

0 comments on commit 0870062

Please sign in to comment.