Skip to content

Commit

Permalink
merge: #10398
Browse files Browse the repository at this point in the history
10398: Remove multi-partition support from StreamPlatform test extension r=Zelldon a=Zelldon


## Description
Remove the multi partition support in the StreamPlatform, and related extension. This is not necessary for testing the StreamPlatform code

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #9727



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Sep 20, 2022
2 parents 90ea644 + 8d993cb commit 4f3222e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void shouldScheduleOnFixedRate() {
void shouldPreserveOrderingOfWritesEvenWithRetries() {
// given
final var dummyProcessorSpy = spy(dummyProcessor);
final var syncLogStream = spy(streamPlatform.getLogStream("stream-1"));
final var syncLogStream = spy(streamPlatform.getLogStream());
final var logStream = spy(syncLogStream.getAsyncLogStream());
final var batchWriter = spy(syncLogStream.newLogStreamBatchWriter());

Expand Down Expand Up @@ -296,9 +296,8 @@ void shouldPreserveOrderingOfWritesEvenWithRetries() {
// then
Awaitility.await("until both records are written to the stream")
.atMost(Duration.ofSeconds(10))
.untilAsserted(
() -> assertThat(streamPlatform.events(syncLogStream.getLogName())).hasSize(2));
assertThat(streamPlatform.events(syncLogStream.getLogName()))
.untilAsserted(() -> assertThat(streamPlatform.events()).hasSize(2));
assertThat(streamPlatform.events())
.as("records were written in order of submitted tasks")
.extracting(LoggedEvent::getKey)
.containsExactly(1L, 2L);
Expand Down
158 changes: 34 additions & 124 deletions engine/src/test/java/io/camunda/zeebe/engine/util/StreamPlatform.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.storage.LogStorage;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SyncLogStream;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
Expand All @@ -43,15 +42,12 @@
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
Expand All @@ -67,8 +63,8 @@ public final class StreamPlatform {
private final List<AutoCloseable> closeables;
private final ActorScheduler actorScheduler;
private final CommandResponseWriter mockCommandResponseWriter;
private final Map<String, LogContext> logContextMap = new HashMap<>();
private final Map<String, ProcessorContext> streamContextMap = new HashMap<>();
private LogContext logContext;
private ProcessorContext processorContext;
private boolean snapshotWasTaken = false;
private final StreamProcessorMode streamProcessorMode = StreamProcessorMode.PROCESSING;
private List<RecordProcessor> recordProcessors;
Expand Down Expand Up @@ -110,42 +106,28 @@ public StreamPlatform(
closeables.add(() -> recordProcessors.clear());
}

public SynchronousLogStream createLogStream(final String name, final int partitionId) {
final var listLogStorage = new ListLogStorage();
return createLogStream(
name,
partitionId,
listLogStorage,
logStream -> listLogStorage.setPositionListener(logStream::setLastWrittenPosition));
}

private SynchronousLogStream createLogStream(
final String name,
final int partitionId,
final LogStorage logStorage,
final Consumer<SyncLogStream> logStreamConsumer) {
public void createLogStream() {
final var logStorage = new ListLogStorage();
final var logStream =
SyncLogStream.builder()
.withLogName(name)
.withLogName(STREAM_NAME + DEFAULT_PARTITION)
.withLogStorage(logStorage)
.withPartitionId(partitionId)
.withPartitionId(DEFAULT_PARTITION)
.withActorSchedulingService(actorScheduler)
.build();

logStreamConsumer.accept(logStream);
logStorage.setPositionListener(logStream::setLastWrittenPosition);

final LogContext logContext = LogContext.createLogContext(logStream);
logContextMap.put(name, logContext);
closeables.add(() -> logContextMap.remove(name).close());
return logStream;
logContext = new LogContext(logStream);
closeables.add(logContext);
}

public SynchronousLogStream getLogStream(final String name) {
return logContextMap.get(name).getLogStream();
public SynchronousLogStream getLogStream() {
return logContext.logStream();
}

public Stream<LoggedEvent> events(final String logName) {
final SynchronousLogStream logStream = getLogStream(logName);
public Stream<LoggedEvent> events() {
final SynchronousLogStream logStream = getLogStream();

final LogStreamReader reader = logStream.newLogStreamReader();
closeables.add(reader);
Expand Down Expand Up @@ -192,14 +174,12 @@ public StreamPlatform withRecordProcessors(final List<RecordProcessor> recordPro
}

public StreamProcessor startStreamProcessor() {
final var logName = getLogName(DEFAULT_PARTITION);
final SynchronousLogStream stream = getLogStream(logName);
final SynchronousLogStream stream = getLogStream();
return buildStreamProcessor(stream, true);
}

public StreamProcessor startStreamProcessorNotAwaitOpening() {
final var logName = getLogName(DEFAULT_PARTITION);
final SynchronousLogStream stream = getLogStream(logName);
final SynchronousLogStream stream = getLogStream();
return buildStreamProcessor(stream, false);
}

Expand All @@ -223,7 +203,6 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
} else {
zeebeDb = zeebeDbFactory.createDb(storage.toFile());
}
final String logName = stream.getLogName();

final var builder =
StreamProcessor.builder()
Expand All @@ -250,68 +229,40 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
}
openFuture.join(15, TimeUnit.SECONDS);

final ProcessorContext processorContext =
ProcessorContext.createStreamContext(streamProcessor, zeebeDb, storage, snapshot);
streamContextMap.put(logName, processorContext);
closeables.add(() -> streamContextMap.remove(logName).close());
processorContext = new ProcessorContext(streamProcessor, zeebeDb, storage, snapshot);
closeables.add(processorContext);

return streamProcessor;
}

public void pauseProcessing() {
pauseProcessing(getLogName(DEFAULT_PARTITION));
}

// todo remove multi partition support - is not necessary for the StreamProcessor tests
@Deprecated
public void pauseProcessing(final String streamName) {
streamContextMap.get(streamName).streamProcessor.pauseProcessing().join();
LOG.info("Paused processing for stream {}", streamName);
processorContext.streamProcessor.pauseProcessing().join();
LOG.info("Paused processing for processor {}", processorContext.streamProcessor.getName());
}

public void resumeProcessing() {
resumeProcessing(getLogName(DEFAULT_PARTITION));
}
// todo remove multi partition support - is not necessary for the StreamProcessor tests
@Deprecated
public void resumeProcessing(final String streamName) {
streamContextMap.get(streamName).streamProcessor.resumeProcessing();
LOG.info("Resume processing for stream {}", streamName);
processorContext.streamProcessor.resumeProcessing();
LOG.info("Resume processing for processor {}", processorContext.streamProcessor.getName());
}

public void snapshot(final String streamName) {
streamContextMap.get(streamName).snapshot();
public void snapshot() {
processorContext.snapshot();
snapshotWasTaken = true;
LOG.info("Snapshot database for stream {}", streamName);
}

public void closeProcessor(final String streamName) throws Exception {
streamContextMap.remove(streamName).close();
LOG.info("Closed stream {}", streamName);
}

public List<RecordProcessor> getRecordProcessors() {
return recordProcessors;
LOG.info("Snapshot database for processor {}", processorContext.streamProcessor.getName());
}

public RecordProcessor getDefaultRecordProcessor() {
return defaultRecordProcessor;
}

public StreamProcessor getStreamProcessor() {
return getStreamProcessor(getLogName(DEFAULT_PARTITION));
}

public StreamProcessor getStreamProcessor(final String streamName) {
return Optional.ofNullable(streamContextMap.get(streamName))
return Optional.ofNullable(processorContext)
.map(c -> c.streamProcessor)
.orElseThrow(
() -> new NoSuchElementException("No stream processor found with name: " + streamName));
.orElseThrow(() -> new NoSuchElementException("No stream processor found."));
}

public LogStreamBatchWriter setupBatchWriter(
final String logName, final RecordToWrite[] recordToWrites) {
final SynchronousLogStream logStream = getLogStream(logName);
public LogStreamBatchWriter setupBatchWriter(final RecordToWrite[] recordToWrites) {
final SynchronousLogStream logStream = getLogStream();
final LogStreamBatchWriter logStreamBatchWriter = logStream.newLogStreamBatchWriter();
for (final RecordToWrite recordToWrite : recordToWrites) {
logStreamBatchWriter
Expand All @@ -325,12 +276,8 @@ public LogStreamBatchWriter setupBatchWriter(
return logStreamBatchWriter;
}

public static String getLogName(final int partitionId) {
return STREAM_NAME + partitionId;
}

public long writeBatch(final RecordToWrite... recordsToWrite) {
final var batchWriter = setupBatchWriter(getLogName(DEFAULT_PARTITION), recordsToWrite);
final var batchWriter = setupBatchWriter(recordsToWrite);
return writeActor.submit(batchWriter::tryWrite).join();
}

Expand All @@ -341,60 +288,24 @@ public ActorFuture<Long> submit(final Callable<Long> write) {
}
}

private static final class LogContext implements AutoCloseable {
private final SynchronousLogStream logStream;

private LogContext(final SynchronousLogStream logStream) {
this.logStream = logStream;
}

public static LogContext createLogContext(final SyncLogStream logStream) {
return new LogContext(logStream);
}

private record LogContext(SynchronousLogStream logStream) implements AutoCloseable {
@Override
public void close() {
logStream.close();
}

public SynchronousLogStream getLogStream() {
return logStream;
}
}

private static final class ProcessorContext implements AutoCloseable {
private final ZeebeDb zeebeDb;
private final StreamProcessor streamProcessor;
private final Path runtimePath;
private final Path snapshotPath;
private boolean closed = false;

private ProcessorContext(
final StreamProcessor streamProcessor,
final ZeebeDb zeebeDb,
final Path runtimePath,
final Path snapshotPath) {
this.streamProcessor = streamProcessor;
this.zeebeDb = zeebeDb;
this.runtimePath = runtimePath;
this.snapshotPath = snapshotPath;
}

public static ProcessorContext createStreamContext(
final StreamProcessor streamProcessor,
final ZeebeDb zeebeDb,
final Path runtimePath,
final Path snapshotPath) {
return new ProcessorContext(streamProcessor, zeebeDb, runtimePath, snapshotPath);
}
private record ProcessorContext(
StreamProcessor streamProcessor, ZeebeDb zeebeDb, Path runtimePath, Path snapshotPath)
implements AutoCloseable {

public void snapshot() {
zeebeDb.createSnapshot(snapshotPath.toFile());
}

@Override
public void close() throws Exception {
if (closed) {
if (streamProcessor.isClosed()) {
return;
}

Expand All @@ -404,7 +315,6 @@ public void close() throws Exception {
if (runtimePath.toFile().exists()) {
FileUtil.deleteFolder(runtimePath);
}
closed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package io.camunda.zeebe.engine.util;

import static io.camunda.zeebe.engine.util.StreamProcessingComposite.getLogName;
import static org.junit.platform.commons.util.ReflectionUtils.makeAccessible;

import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
Expand Down Expand Up @@ -111,16 +110,15 @@ public StreamProcessorTestContext() {

// streams
streamPlatform = new StreamPlatform(tempFolder, closables, actorScheduler, factory);
final var partitionId = 1;
streamPlatform.createLogStream(getLogName(partitionId), partitionId);
streamPlatform.createLogStream();

} catch (final Exception e) {
ExceptionUtils.throwAsUncheckedException(e);
}
}

@Override
public void close() throws Exception {
public void close() {
Collections.reverse(closables);
CloseHelper.quietCloseAll(closables);
closables.clear();
Expand Down

0 comments on commit 4f3222e

Please sign in to comment.