Skip to content

Commit

Permalink
merge: #10076
Browse files Browse the repository at this point in the history
10076: Engine abstraction remove legacy writer r=Zelldon a=pihme

## Description

- Introduces a new task that can be scheduled by the engine and that can return records to be written
- Replaces all references to `LegacyTypedStreamWriter`outside of stream processor with scheduling of the new tasks
- The overall result is that the engine no longer depends on any writer that writes directly to the stream. Stream processor is now in full control of when and how records shall be written (Heureka!)

## Related issues

related to #9724, #9730, #9725



Co-authored-by: pihme <pihme@users.noreply.github.com>
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
3 people committed Aug 17, 2022
2 parents 43c17ef + 4e64c68 commit fe164e3
Show file tree
Hide file tree
Showing 46 changed files with 509 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
*/
package io.camunda.zeebe.backup.processing;

import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import io.camunda.zeebe.broker.transport.partitionapi.InterPartitionCommandReceiverActor;
import io.camunda.zeebe.broker.transport.partitionapi.InterPartitionCommandSenderService;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorControl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.camunda.zeebe.broker.transport.partitionapi.InterPartitionCommandReceiverActor;
import io.camunda.zeebe.broker.transport.partitionapi.InterPartitionCommandSenderService;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
*/
package io.camunda.zeebe.broker.transport.commandapi;

import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import java.util.function.Consumer;

public interface CommandApiService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import io.camunda.zeebe.broker.transport.backpressure.PartitionAwareRequestLimiter;
import io.camunda.zeebe.broker.transport.backpressure.RequestLimiter;
import io.camunda.zeebe.broker.transport.queryapi.QueryApiRequestHandler;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import static io.camunda.zeebe.protocol.record.ExecuteCommandResponseEncoder.partitionIdNullValue;
import static io.camunda.zeebe.protocol.record.ExecuteCommandResponseEncoder.valueHeaderLength;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.ExecuteCommandResponseEncoder;
import io.camunda.zeebe.protocol.record.MessageHeaderEncoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import io.camunda.zeebe.broker.transport.partitionapi.InterPartitionCommandReceiverActor;
import io.camunda.zeebe.broker.transport.partitionapi.InterPartitionCommandSenderService;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor.writers;
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

public final class EmptyProcessingResult implements ProcessingResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

public interface ProcessingScheduleService {

void runDelayed(Duration delay, Runnable followUpTask);
void runDelayed(Duration delay, Runnable task);

void runDelayed(Duration delay, Task task);

<T> void runOnCompletion(ActorFuture<T> precedingTask, BiConsumer<T, Throwable> followUpTask);

Expand All @@ -40,4 +42,6 @@ default void runAtFixedRate(final Duration delay, final Runnable task) {
}
});
}

void runAtFixedRate(final Duration delay, final Task task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;

Expand All @@ -18,13 +17,9 @@ public interface ReadonlyStreamProcessorContext {
/**
* @return the logstream, on which the processor runs
*/
@Deprecated // only used in EngineRule; TODO remove this
LogStream getLogStream();

/**
* @return the actual log stream writer, used to write any record
*/
LegacyTypedStreamWriter getLogStreamWriter();

/**
* @return the state, where the data is stored during processing
*/
Expand Down
14 changes: 14 additions & 0 deletions engine/src/main/java/io/camunda/zeebe/engine/api/Task.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

/** Here the interface is just a suggestion. Can be whatever PDT team thinks is best to work with */
public interface Task {

TaskResult execute(TaskResultBuilder taskResultBuilder);
}
16 changes: 16 additions & 0 deletions engine/src/main/java/io/camunda/zeebe/engine/api/TaskResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

/** Here the interface is just a suggestion. Can be whatever PDT team thinks is best to work with */
public interface TaskResult {

long writeRecordsToStream(LogStreamBatchWriter logStreamBatchWriter);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.intent.Intent;

/** Here the interface is just a suggestion. Can be whatever PDT team thinks is best to work with */
public interface TaskResultBuilder {

/**
* Appends a record to the result
*
* @return returns itself for method chaining
*/
TaskResultBuilder appendCommandRecord(
final long key, final Intent intent, final RecordValue value);

TaskResult build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ public JobBackoffChecker(final JobState jobState) {
backOffDueDateChecker =
new DueDateChecker(
BACKOFF_RESOLUTION,
typedCommandWriter ->
taskResultBuilder ->
jobState.findBackedOffJobs(
ActorClock.currentTimeMillis(),
(key, record) -> {
typedCommandWriter.reset();
typedCommandWriter.appendFollowUpCommand(
taskResultBuilder.appendCommandRecord(
key, JobIntent.RECUR_AFTER_BACKOFF, record);

return typedCommandWriter.flush() >= 0;
return true;
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.api.Task;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import java.time.Duration;
Expand All @@ -22,19 +24,20 @@ public final class JobTimeoutTrigger implements StreamProcessorLifecycleAware {

private boolean shouldReschedule = false;

private LegacyTypedStreamWriter writer;
private ReadonlyStreamProcessorContext processingContext;
private final Task deactivateTimedOutJobs;

public JobTimeoutTrigger(final JobState state) {
this.state = state;
deactivateTimedOutJobs = new DeactivateTimeOutJobs();
}

@Override
public void onRecovered(final ReadonlyStreamProcessorContext processingContext) {
this.processingContext = processingContext;
shouldReschedule = true;

scheduleDeactivateTimedOutJobsTask();
writer = processingContext.getLogStreamWriter();
}

@Override
Expand Down Expand Up @@ -62,25 +65,28 @@ public void onResumed() {
private void scheduleDeactivateTimedOutJobsTask() {
processingContext
.getScheduleService()
.runDelayed(TIME_OUT_POLLING_INTERVAL, this::deactivateTimedOutJobs);
.runDelayed(TIME_OUT_POLLING_INTERVAL, deactivateTimedOutJobs);
}

private void cancelTimer() {
shouldReschedule = false;
}

void deactivateTimedOutJobs() {
final long now = currentTimeMillis();
state.forEachTimedOutEntry(
now,
(key, record) -> {
writer.reset();
writer.appendFollowUpCommand(key, JobIntent.TIME_OUT, record);
private final class DeactivateTimeOutJobs implements Task {

return writer.flush() >= 0;
});
if (shouldReschedule) {
scheduleDeactivateTimedOutJobsTask();
@Override
public TaskResult execute(final TaskResultBuilder taskResultBuilder) {
final long now = currentTimeMillis();
state.forEachTimedOutEntry(
now,
(key, record) -> {
taskResultBuilder.appendCommandRecord(key, JobIntent.TIME_OUT, record);
return true;
});
if (shouldReschedule) {
scheduleDeactivateTimedOutJobsTask();
}
return taskResultBuilder.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public MessageObserver(
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final var scheduleService = context.getScheduleService();
// it is safe to reuse the write because we running in the same actor/thread
final MessageTimeToLiveChecker timeToLiveChecker =
new MessageTimeToLiveChecker(context.getLogStreamWriter(), messageState);
final MessageTimeToLiveChecker timeToLiveChecker = new MessageTimeToLiveChecker(messageState);
scheduleService.runAtFixedRate(MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL, timeToLiveChecker);

final PendingMessageSubscriptionChecker pendingSubscriptionChecker =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,35 @@
*/
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.api.Task;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.message.StoredMessage;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.scheduler.clock.ActorClock;

public final class MessageTimeToLiveChecker implements Runnable {
public final class MessageTimeToLiveChecker implements Task {

private final LegacyTypedStreamWriter writer;
private final MessageState messageState;

private final MessageRecord deleteMessageCommand = new MessageRecord();

public MessageTimeToLiveChecker(
final LegacyTypedStreamWriter writer, final MessageState messageState) {
this.writer = writer;
public MessageTimeToLiveChecker(final MessageState messageState) {
this.messageState = messageState;
}

@Override
public void run() {
public TaskResult execute(final TaskResultBuilder taskResultBuilder) {
messageState.visitMessagesWithDeadlineBefore(
ActorClock.currentTimeMillis(), this::writeDeleteMessageCommand);
ActorClock.currentTimeMillis(),
message -> writeDeleteMessageCommand(message, taskResultBuilder));
return taskResultBuilder.build();
}

private boolean writeDeleteMessageCommand(final StoredMessage storedMessage) {
private boolean writeDeleteMessageCommand(
final StoredMessage storedMessage, final TaskResultBuilder taskResultBuilder) {
final var message = storedMessage.getMessage();

deleteMessageCommand.reset();
Expand All @@ -47,11 +49,8 @@ private boolean writeDeleteMessageCommand(final StoredMessage storedMessage) {
deleteMessageCommand.setMessageId(message.getMessageIdBuffer());
}

writer.reset();
writer.appendFollowUpCommand(
taskResultBuilder.appendCommandRecord(
storedMessage.getMessageKey(), MessageIntent.EXPIRE, deleteMessageCommand);

final long position = writer.flush();
return position > 0;
return true;
}
}

0 comments on commit fe164e3

Please sign in to comment.