Skip to content

Commit

Permalink
merge: #10196
Browse files Browse the repository at this point in the history
10196: Introduce ProcessingResponse r=Zelldon a=Zelldon

## Description

blocked by #10191 ✅ 
blocked by #10188 ✅ 

Introduce the ProcessingResponse, which encapsulates the information of the Record which should be send as response on a user command and the request- and streamId which identifies the request.

The ProcessingResponse usage replaces the usage of the CommandResponseWriter, and deletes several now unused code.

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10001 



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 26, 2022
2 parents 8bc7e82 + 0697540 commit 82d6a8a
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 334 deletions.
Expand Up @@ -7,8 +7,8 @@
*/
package io.camunda.zeebe.backup.processing;

import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResponse;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
Expand All @@ -22,6 +22,7 @@
import io.camunda.zeebe.util.Either;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

record MockProcessingResult(List<Event> records) implements ProcessingResult {

Expand All @@ -31,8 +32,8 @@ public ImmutableRecordBatch getRecordBatch() {
}

@Override
public boolean writeResponse(final CommandResponseWriter commandResponseWriter) {
return false;
public Optional<ProcessingResponse> getProcessingResponse() {
return Optional.empty();
}

@Override
Expand Down
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import java.util.Optional;

public final class EmptyProcessingResult implements ProcessingResult {

Expand All @@ -25,8 +26,8 @@ public ImmutableRecordBatch getRecordBatch() {
}

@Override
public boolean writeResponse(final CommandResponseWriter commandResponseWriter) {
return true;
public Optional<ProcessingResponse> getProcessingResponse() {
return Optional.empty();
}

@Override
Expand Down
@@ -0,0 +1,33 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.api.records.RecordBatchEntry;

/**
* The response which has been created during processing, for a request which can be identified by
* {@link #requestId} and {@link #requestStreamId}.
*/
public interface ProcessingResponse {

/**
* @return the id which together with the stream id identifies the request
*/
long requestId();

/**
* @return the id of the stream on which the request was sent, together with the request id the
* request can be identified
*/
int requestStreamId();

/**
* @return the value of the response which should be sent as answer of the request
*/
RecordBatchEntry responseValue();
}
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatchEntry;
import java.util.Optional;

/**
* Here the interface is just a suggestion. Can be whatever PDT teams thinks is best to work with
Expand All @@ -25,7 +26,11 @@ public interface ProcessingResult {
*/
ImmutableRecordBatch getRecordBatch();

boolean writeResponse(CommandResponseWriter commandResponseWriter);
/**
* @return the processing response, which should be sent as answer of a user command. Can be empty
* if no user command was processed.
*/
Optional<ProcessingResponse> getProcessingResponse();

/**
* @return <code>false</code> to indicate that the side effect could not be applied successfully
Expand Down
Expand Up @@ -7,13 +7,15 @@
*/
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.api.CommandResponseWriter;
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResponse;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.streamprocessor.DirectProcessingResultBuilder.ProcessingResponseImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* Implementation of {@code ProcessingResult} that uses direct access to the stream and to response
Expand All @@ -24,19 +26,16 @@
final class DirectProcessingResult implements ProcessingResult, TaskResult {

private final List<PostCommitTask> postCommitTasks;
private final DirectTypedResponseWriter responseWriter;
private boolean hasResponse;
private final ImmutableRecordBatch immutableRecordBatch;
private final ProcessingResponseImpl processingResponse;

DirectProcessingResult(
final StreamProcessorContext context,
final ImmutableRecordBatch immutableRecordBatch,
final List<PostCommitTask> postCommitTasks,
final boolean hasResponse) {
final ProcessingResponseImpl processingResponse,
final List<PostCommitTask> postCommitTasks) {
this.postCommitTasks = new ArrayList<>(postCommitTasks);
responseWriter = context.getTypedResponseWriter();
this.processingResponse = processingResponse;
this.immutableRecordBatch = immutableRecordBatch;
this.hasResponse = hasResponse;
}

@Override
Expand All @@ -45,15 +44,8 @@ public ImmutableRecordBatch getRecordBatch() {
}

@Override
public boolean writeResponse(final CommandResponseWriter commandResponseWriter) {
// here we must assume that response writer is backed up by command response writer internally

if (hasResponse) {
hasResponse = false;
return responseWriter.flush();
} else {
return true;
}
public Optional<ProcessingResponse> getProcessingResponse() {
return Optional.ofNullable(processingResponse);
}

@Override
Expand Down
Expand Up @@ -10,9 +10,11 @@
import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.TYPE_REGISTRY;

import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResponse;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatchEntry;
import io.camunda.zeebe.engine.api.records.RecordBatchSizePredicate;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
Expand All @@ -35,24 +37,14 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {

private final List<PostCommitTask> postCommitTasks = new ArrayList<>();

private final StreamProcessorContext context;
private final LegacyTypedStreamWriter streamWriter;
private final DirectTypedResponseWriter responseWriter;

private boolean hasResponse =
true; // TODO figure out why this still needs to be true for tests to pass
private final long sourceRecordPosition;
private final RecordBatch mutableRecordBatch;
private ProcessingResponseImpl processingResponse;

DirectProcessingResultBuilder(
final StreamProcessorContext context,
final long sourceRecordPosition,
final RecordBatchSizePredicate predicate) {
this.context = context;
this.sourceRecordPosition = sourceRecordPosition;
final StreamProcessorContext context, final RecordBatchSizePredicate predicate) {
streamWriter = context.getLogStreamWriter();
streamWriter.configureSourceContext(sourceRecordPosition);
responseWriter = context.getTypedResponseWriter();
mutableRecordBatch = new RecordBatch(predicate);
}

Expand Down Expand Up @@ -98,17 +90,10 @@ public ProcessingResultBuilder withResponse(
final String rejectionReason,
final long requestId,
final int requestStreamId) {
hasResponse = true;
responseWriter.writeResponse(
recordType,
key,
intent,
value,
valueType,
rejectionType,
rejectionReason,
requestId,
requestStreamId);
final var entry =
RecordBatchEntry.createEntry(
key, -1, recordType, intent, rejectionType, rejectionReason, valueType, value);
processingResponse = new ProcessingResponseImpl(entry, requestId, requestStreamId);
return this;
}

Expand All @@ -120,9 +105,6 @@ public ProcessingResultBuilder appendPostCommitTask(final PostCommitTask task) {

@Override
public ProcessingResultBuilder reset() {
streamWriter.reset();
streamWriter.configureSourceContext(sourceRecordPosition);
responseWriter.reset();
postCommitTasks.clear();
return this;
}
Expand All @@ -135,11 +117,14 @@ public ProcessingResultBuilder resetPostCommitTasks() {

@Override
public ProcessingResult build() {
return new DirectProcessingResult(context, mutableRecordBatch, postCommitTasks, hasResponse);
return new DirectProcessingResult(mutableRecordBatch, processingResponse, postCommitTasks);
}

@Override
public boolean canWriteEventOfLength(final int eventLength) {
return mutableRecordBatch.canAppendRecordOfLength(eventLength);
}

record ProcessingResponseImpl(RecordBatchEntry responseValue, long requestId, int requestStreamId)
implements ProcessingResponse {}
}
Expand Up @@ -19,7 +19,6 @@
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import java.util.Collections;

/**
* Implementation of {@code TaskResultBuilder} that uses direct access to the stream. This
Expand All @@ -29,12 +28,9 @@
*/
final class DirectTaskResultBuilder implements TaskResultBuilder {

private final StreamProcessorContext context;
private final MutableRecordBatch mutableRecordBatch;

DirectTaskResultBuilder(
final StreamProcessorContext context, final RecordBatchSizePredicate predicate) {
this.context = context;
DirectTaskResultBuilder(final RecordBatchSizePredicate predicate) {
mutableRecordBatch = new RecordBatch(predicate);
}

Expand All @@ -57,6 +53,6 @@ public boolean appendCommandRecord(

@Override
public TaskResult build() {
return new DirectProcessingResult(context, mutableRecordBatch, Collections.emptyList(), false);
return () -> mutableRecordBatch;
}
}

This file was deleted.

0 comments on commit 82d6a8a

Please sign in to comment.