Skip to content

Commit

Permalink
merge: #10203 #10212
Browse files Browse the repository at this point in the history
10203: Remove legacy writers r=deepthidevaki a=Zelldon

## Description

Removes the usage of the old LegacyWriter interface and implementation. Furthermore, it removes the need of the TypedStreamWriterFactor from the Context and tests.

The StreamProcessorHealthTests have been rewritten to use the new extension and mocked record processor. It felt much easier to write such test I have to say.

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9724



10212: Cleanup unused parameters and methods r=oleschoenburg a=deepthidevaki

## Description

- Cleanup unused parameters and methods
- Removed `fail` from `InProgressBackup` interface. It exposes a `close` method, which is enough to cleanup after a failure. 

## Related issues

related #9979 


Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
3 people committed Aug 29, 2022
3 parents 38bb232 + b565ad3 + edffba3 commit bbd0e5d
Show file tree
Hide file tree
Showing 18 changed files with 83 additions and 464 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public BackupService(
this.snapshotStore = snapshotStore;
this.segmentsDirectory = segmentsDirectory;
this.isSegmentsFile = isSegmentsFile;
internalBackupManager =
new BackupServiceImpl(nodeId, partitionId, numberOfPartitions, backupStore);
internalBackupManager = new BackupServiceImpl(backupStore);
actorName = buildActorName(nodeId, "BackupService", partitionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,12 @@
import java.util.function.Consumer;

final class BackupServiceImpl {
private final int nodeId;
private final int partitionId;

private final int numberOfPartitions;

private final Set<InProgressBackup> backupsInProgress = new HashSet<>();
private final BackupStore backupStore;
private ConcurrencyControl concurrencyControl;

BackupServiceImpl(
final int nodeId,
final int partitionId,
final int numberOfPartitions,
final BackupStore backupStore) {
this.nodeId = nodeId;
this.partitionId = partitionId;
this.numberOfPartitions = numberOfPartitions;
BackupServiceImpl(final BackupStore backupStore) {

this.backupStore = backupStore;
}

Expand Down Expand Up @@ -109,7 +98,7 @@ private void failBackup(
final ActorFuture<Void> backupSaved,
final Throwable error) {
backupSaved.completeExceptionally(error);
inProgressBackup.fail(error);
backupStore.markFailed(inProgressBackup.id());
closeInProgressBackup(inProgressBackup);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.backup.management;

import io.camunda.zeebe.backup.api.Backup;
import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.scheduler.future.ActorFuture;

interface InProgressBackup {
Expand All @@ -16,6 +17,8 @@ interface InProgressBackup {

long checkpointPosition();

BackupIdentifier id();

ActorFuture<Void> findValidSnapshot();

ActorFuture<Void> reserveSnapshot();
Expand All @@ -26,7 +29,5 @@ interface InProgressBackup {

Backup createBackup();

void fail(Throwable error);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

final class InProgressBackupImpl implements InProgressBackup {

private static final Logger LOG = LoggerFactory.getLogger(InProgressBackup.class);
private static final Logger LOG = LoggerFactory.getLogger(InProgressBackupImpl.class);

private static final String ERROR_MSG_NO_VALID_SNAPSHOT =
"Cannot find a snapshot that can be included in the backup %d. All available snapshots (%s) have processedPosition or lastFollowupEventPosition > checkpointPosition %d";
Expand Down Expand Up @@ -85,6 +85,11 @@ public long checkpointPosition() {
return checkpointPosition;
}

@Override
public BackupIdentifier id() {
return backupId;
}

@Override
public ActorFuture<Void> findValidSnapshot() {
final ActorFuture<Void> result = concurrencyControl.createFuture();
Expand Down Expand Up @@ -208,11 +213,6 @@ public Backup createBackup() {
return new BackupImpl(backupId, backupDescriptor, snapshotFileSet, segmentsFileSet);
}

@Override
public void fail(final Throwable error) {
// To be implemented
}

@Override
public void close() {
if (snapshotReservation != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BackupServiceImplTest {

@BeforeEach
void setup() {
backupService = new BackupServiceImpl(0, 1, 1, backupStore);
backupService = new BackupServiceImpl(backupStore);
}

@Test
Expand All @@ -50,6 +50,7 @@ void shouldTakeBackup() {

// then
assertThat(result).succeedsWithin(Duration.ofMillis(100));
verify(backupStore).save(any());
}

@Test
Expand Down Expand Up @@ -97,8 +98,7 @@ void shouldFailBackupWhenNoValidSnapshotFound() {
.failsWithin(Duration.ofMillis(1000))
.withThrowableOfType(ExecutionException.class)
.withMessageContaining("Expected");
verify(inProgressBackup).fail(any());
verify(inProgressBackup).close();
verifyInProgressBackupIsCleanedUpAfterFailure();
}

@Test
Expand All @@ -113,8 +113,7 @@ void shouldFailBackupWhenSnapshotCannotBeReserved() {

// then
assertThat(result).failsWithin(Duration.ofMillis(100));
verify(inProgressBackup).fail(any());
verify(inProgressBackup).close();
verifyInProgressBackupIsCleanedUpAfterFailure();
}

@Test
Expand All @@ -130,8 +129,7 @@ void shouldFailBackupWhenSnapshotFilesCannotBeCollected() {

// then
assertThat(result).failsWithin(Duration.ofMillis(100));
verify(inProgressBackup).fail(any());
verify(inProgressBackup).close();
verifyInProgressBackupIsCleanedUpAfterFailure();
}

@Test
Expand All @@ -144,8 +142,7 @@ void shouldFailBackupWhenSegmentFilesCannotBeCollected() {

// then
assertThat(result).failsWithin(Duration.ofMillis(100));
verify(inProgressBackup).fail(any());
verify(inProgressBackup).close();
verifyInProgressBackupIsCleanedUpAfterFailure();
}

@Test
Expand All @@ -163,8 +160,7 @@ void shouldFailBackupIfStoringFailed() {

// then
assertThat(result).failsWithin(Duration.ofMillis(100));
verify(inProgressBackup).fail(any());
verify(inProgressBackup).close();
verifyInProgressBackupIsCleanedUpAfterFailure();
}

private ActorFuture<Void> failedFuture() {
Expand Down Expand Up @@ -203,4 +199,9 @@ private void mockFindSegmentFiles() {
when(inProgressBackup.findSegmentFiles())
.thenReturn(concurrencyControl.createCompletedFuture());
}

private void verifyInProgressBackupIsCleanedUpAfterFailure() {
verify(backupStore).markFailed(any());
verify(inProgressBackup).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,10 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {

private final List<PostCommitTask> postCommitTasks = new ArrayList<>();

private final LegacyTypedStreamWriter streamWriter;

private final RecordBatch mutableRecordBatch;
private ProcessingResponseImpl processingResponse;

DirectProcessingResultBuilder(
final StreamProcessorContext context, final RecordBatchSizePredicate predicate) {
streamWriter = context.getLogStreamWriter();
DirectProcessingResultBuilder(final RecordBatchSizePredicate predicate) {
mutableRecordBatch = new RecordBatch(predicate);
}

Expand Down Expand Up @@ -75,7 +71,6 @@ public Either<RuntimeException, ProcessingResultBuilder> appendRecordReturnEithe
String.format("The record value %s is not a UnifiedRecordValue", value));
}

streamWriter.appendRecord(key, type, intent, rejectionType, rejectionReason, value);
return Either.right(this);
}

Expand Down

This file was deleted.

This file was deleted.

0 comments on commit bbd0e5d

Please sign in to comment.