Skip to content

Commit

Permalink
merge: #9943
Browse files Browse the repository at this point in the history
9943: Notify checkpoint listeners after init and replay r=deepthidevaki a=deepthidevaki

## Description

The listeners must be upddated with the current checkpointId after init and replay. Otherwise if there are no new checkpoint created, after a restart or failover the listeners cannot know about the latest checkpoint. It is important for `InterPartitionCommandSencer` and `InterPartitionCommandReceiver` to keep track of the latest checkpointId to guarantee that the checkpoints are consistent.

## Related issues

closes #9916 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and deepthidevaki committed Aug 2, 2022
2 parents d678d46 + afa7f09 commit 4be7a04
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ public interface CheckpointListener {
/**
* Called when ever a new checkpoint is created.
*
* <p>Will be called immediately after CHECKPOINT:CREATE record is processed if it results in a
* new checkpoint.
* <p>This will be called
* <li>When the processor is initialized with the latest checkpoint
* <li>When CHECKPOINT:CREATE record is processed and if it results in a new checkpoint.
* <li>When CHECKPOINT:CREATED record is replayed
* <li>If there is a valid checkpoint, when the listener is registered
*/
void onNewCheckpointCreated(long checkpointId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,26 @@
*/
package io.camunda.zeebe.backup.processing;

import io.camunda.zeebe.backup.api.CheckpointListener;
import io.camunda.zeebe.backup.processing.state.CheckpointState;
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import java.util.Set;

public final class CheckpointCreatedEventApplier {

final CheckpointState checkpointState;
private final CheckpointState checkpointState;
private final Set<CheckpointListener> checkpointListeners;

public CheckpointCreatedEventApplier(final CheckpointState checkpointState) {
public CheckpointCreatedEventApplier(
final CheckpointState checkpointState, final Set<CheckpointListener> checkpointListeners) {
this.checkpointState = checkpointState;
this.checkpointListeners = checkpointListeners;
}

public void apply(final CheckpointRecord checkpointRecord) {
checkpointState.setCheckpointInfo(
checkpointRecord.getCheckpointId(), checkpointRecord.getCheckpointPosition());
checkpointListeners.forEach(
listener -> listener.onNewCheckpointCreated(checkpointState.getCheckpointId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@
package io.camunda.zeebe.backup.processing;

import io.camunda.zeebe.backup.api.BackupManager;
import io.camunda.zeebe.backup.api.CheckpointListener;
import io.camunda.zeebe.backup.processing.state.CheckpointState;
import io.camunda.zeebe.backup.processing.state.DbCheckpointState;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,19 +34,33 @@ public final class CheckpointRecordsProcessor implements RecordProcessor<Context
private CheckpointCreateProcessor checkpointCreateProcessor;
private CheckpointCreatedEventApplier checkpointCreatedEventApplier;

// Can be accessed concurrently by other threads to add new listeners. Hence we have to use a
// thread safe collection
private final Set<CheckpointListener> checkpointListeners = new CopyOnWriteArraySet<>();

private DbCheckpointState checkpointState;
private ProcessingScheduleService executor;

public CheckpointRecordsProcessor(final BackupManager backupManager) {
this.backupManager = backupManager;
}

@Override
public void init(final Context recordProcessorContext) {
final var checkpointState =
executor = recordProcessorContext.executor();
checkpointState =
new DbCheckpointState(
recordProcessorContext.zeebeDb(), recordProcessorContext.transactionContext());

checkpointCreateProcessor =
new CheckpointCreateProcessor(checkpointState, backupManager, Set.of());
checkpointCreatedEventApplier = new CheckpointCreatedEventApplier(checkpointState);
new CheckpointCreateProcessor(checkpointState, backupManager, checkpointListeners);
checkpointCreatedEventApplier =
new CheckpointCreatedEventApplier(checkpointState, checkpointListeners);

if (checkpointState.getCheckpointId() != CheckpointState.NO_CHECKPOINT) {
checkpointListeners.forEach(
listener -> listener.onNewCheckpointCreated(checkpointState.getCheckpointId()));
}
}

@Override
Expand Down Expand Up @@ -80,4 +99,25 @@ public ProcessingResult onProcessingError(
// from making progress.
throw new RuntimeException(processingException);
}

/**
* Registers a listener. If a checkpoint exists, then the listener will be immediately notified
* with the current checkpointId.
*
* @param checkpointListener
*/
public void addCheckpointListener(final CheckpointListener checkpointListener) {
checkpointListeners.add(checkpointListener);
// Can read the checkpoint only after init() is called.
if (executor != null) {
executor.runDelayed(
Duration.ZERO,
() -> {
final var checkpointId = checkpointState.getCheckpointId();
if (checkpointId != CheckpointState.NO_CHECKPOINT) {
checkpointListener.onNewCheckpointCreated(checkpointState.getCheckpointId());
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;

/**
* There is a good chance that we will get rid of this context, and use a "ProcessingContext"
* defined by the StreamProcessor. *
*/
public record Context(ZeebeDb zeebeDb, TransactionContext transactionContext) {}
public record Context(
ZeebeDb zeebeDb, TransactionContext transactionContext, ProcessingScheduleService executor) {}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package io.camunda.zeebe.backup.processing;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -23,10 +25,12 @@
import io.camunda.zeebe.db.impl.rocksdb.RocksDbConfiguration;
import io.camunda.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -35,15 +39,13 @@
final class CheckpointRecordsProcessorTest {

@TempDir Path database;
final BackupManager backupManager = mock(BackupManager.class);
private final ProcessingScheduleService executor = mock(ProcessingScheduleService.class);

CheckpointRecordsProcessor processor;

ProcessingResultBuilder resultBuilder;

private CheckpointRecordsProcessor processor;
private ProcessingResultBuilder resultBuilder;
// Used for verifying state in the tests
CheckpointState state;

final BackupManager backupManager = mock(BackupManager.class);
private CheckpointState state;
private ZeebeDb zeebedb;

@BeforeEach
Expand All @@ -52,7 +54,7 @@ void setup() {
new ZeebeRocksDbFactory<>(
new RocksDbConfiguration(), new ConsistencyChecksSettings(true, true))
.createDb(database.toFile());
final var context = new Context(zeebedb, zeebedb.createContext());
final var context = new Context(zeebedb, zeebedb.createContext(), executor);

resultBuilder = new MockProcessingResultBuilder();
processor = new CheckpointRecordsProcessor(backupManager);
Expand Down Expand Up @@ -175,4 +177,93 @@ void shouldReplayIgnoredRecord() {
assertThat(state.getCheckpointId()).isEqualTo(checkpointId);
assertThat(state.getCheckpointPosition()).isEqualTo(checkpointPosition);
}

@Test
void shouldNotifyListenerWhenNewCheckpointCreated() {
// given
final AtomicLong checkpoint = new AtomicLong();
processor.addCheckpointListener(checkpoint::set);

final long checkpointId = 2;
final long checkpointPosition = 20;
final CheckpointRecord value = new CheckpointRecord().setCheckpointId(checkpointId);
final MockTypedCheckpointRecord record =
new MockTypedCheckpointRecord(
checkpointPosition, 0, CheckpointIntent.CREATE, RecordType.COMMAND, value);

// when
processor.process(record, resultBuilder);

// then
assertThat(checkpoint).hasValue(checkpointId);
}

@Test
void shouldNotifyListenerWhenReplayed() {
// given
final AtomicLong checkpoint = new AtomicLong();
processor.addCheckpointListener(checkpoint::set);

final long checkpointId = 3;
final long checkpointPosition = 10;
final CheckpointRecord value =
new CheckpointRecord()
.setCheckpointId(checkpointId)
.setCheckpointPosition(checkpointPosition);
final MockTypedCheckpointRecord record =
new MockTypedCheckpointRecord(
checkpointPosition + 1,
checkpointPosition,
CheckpointIntent.CREATED,
RecordType.EVENT,
value);

// when
processor.replay(record);

// then
assertThat(checkpoint).hasValue(checkpointId);
}

@Test
void shouldNotifyListenerOnInit() {
// given
final var context = new Context(zeebedb, zeebedb.createContext(), null);
processor = new CheckpointRecordsProcessor(backupManager);
final long checkpointId = 3;
final long checkpointPosition = 30;
state.setCheckpointInfo(checkpointId, checkpointPosition);

// when
final AtomicLong checkpoint = new AtomicLong();
processor.addCheckpointListener(checkpoint::set);
processor.init(context);

// then
assertThat(checkpoint).hasValue(checkpointId);
}

@Test
void shouldNotifyWhenListenerIsRegistered() {
// given
final long checkpointId = 3;
final long checkpointPosition = 30;
state.setCheckpointInfo(checkpointId, checkpointPosition);

doAnswer(
invocation -> {
final Runnable callback = (Runnable) invocation.getArguments()[1];
callback.run();
return null;
})
.when(executor)
.runDelayed(any(), any(Runnable.class));

// when
final AtomicLong checkpoint = new AtomicLong();
processor.addCheckpointListener(checkpoint::set);

// then
assertThat(checkpoint).hasValue(checkpointId);
}
}

0 comments on commit 4be7a04

Please sign in to comment.