Skip to content

Commit

Permalink
refactor(engine): implement TypedResponseProcessor backed by Processi…
Browse files Browse the repository at this point in the history
…ngResultBuilder

This also required extending the interface. The philosophy here, is that in the result builder interface you have a
low level of abstraction (e.g. all fields you can set).

In the Typed...Writer interfaces you have a higher level of abstraction and more comfort.
So TypedResponseWriter has methods like writeRejectionOnCommand(...), writeEventOnCommand(...), but these are all mapped onto the generic withResponse(...) method in ProcessingResultBuilder
  • Loading branch information
pihme committed Jul 21, 2022
1 parent 463366f commit 1ee62bf
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ ProcessingResultBuilder appendRecord(
* @return returns itself for method chaining
*/
ProcessingResultBuilder withResponse(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final RecordType type,
final long key,
final Intent intent,
final UnpackedObject value,
final ValueType valueType,
final RejectionType rejectionType,
final String rejectionReason,
final long requestId,
final int requestStreamId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand All @@ -35,6 +36,17 @@ void writeResponse(
long requestId,
int requestStreamId);

void writeResponse(
final RecordType recordType,
final long key,
final Intent intent,
final UnpackedObject value,
final ValueType valueType,
final RejectionType rejectionType,
final String rejectionReason,
final long requestId,
final int requestStreamId);

/**
* Submits the response to transport.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,33 @@ public void writeResponse(
eventValue);
}

@Override
public void writeResponse(
final RecordType recordType,
final long key,
final Intent intent,
final UnpackedObject value,
final ValueType valueType,
final RejectionType rejectionType,
final String rejectionReason,
final long requestId,
final int requestStreamId) {

final byte[] bytes = rejectionReason.getBytes(StandardCharsets.UTF_8);
stringWrapper.wrap(bytes);

stage(
recordType,
intent,
key,
rejectionType,
stringWrapper,
valueType,
requestId,
requestStreamId,
value);
}

@Override
public boolean flush() {
if (isResponseStaged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand Down Expand Up @@ -38,6 +39,18 @@ public void writeResponse(
final long requestId,
final int requestStreamId) {}

@Override
public void writeResponse(
final RecordType recordType,
final long key,
final Intent intent,
final UnpackedObject value,
final ValueType valueType,
final RejectionType rejectionType,
final String rejectionReason,
final long requestId,
final int requestStreamId) {}

@Override
public boolean flush() {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor.writers;

import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import org.agrona.concurrent.UnsafeBuffer;

public class ResultBuilderBackedTypedResponseWriter extends AbstractResultBuilderBackedWriter
implements TypedResponseWriter {

private final UnsafeBuffer stringWrapper = new UnsafeBuffer(0, 0);

ResultBuilderBackedTypedResponseWriter(
final Supplier<ProcessingResultBuilder> resultBuilderSupplier) {
super(resultBuilderSupplier);
}

@Override
public void writeRejectionOnCommand(
final TypedRecord<?> command, final RejectionType type, final String reason) {
final byte[] bytes = reason.getBytes(StandardCharsets.UTF_8);
stringWrapper.wrap(bytes);

resultBuilder()
.withResponse(
RecordType.COMMAND_REJECTION,
command.getKey(),
command.getIntent(),
command.getValue(),
command.getValueType(),
type,
reason,
command.getRequestId(),
command.getRequestStreamId());
}

@Override
public void writeEvent(final TypedRecord<?> event) {
writeResponse(
event.getKey(),
event.getIntent(),
event.getValue(),
event.getValueType(),
event.getRequestId(),
event.getRequestStreamId());
}

@Override
public void writeEventOnCommand(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final TypedRecord<?> command) {
writeResponse(
eventKey,
eventState,
eventValue,
command.getValueType(),
command.getRequestId(),
command.getRequestStreamId());
}

@Override
public void writeResponse(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final ValueType valueType,
final long requestId,
final int requestStreamId) {

resultBuilder()
.withResponse(
RecordType.EVENT,
eventKey,
eventState,
eventValue,
valueType,
RejectionType.NULL_VAL,
"",
requestId,
requestStreamId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,13 @@ this class doesn't yet

@Override
public ProcessingResultBuilder withResponse(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final RecordType recordType,
final long key,
final Intent intent,
final UnpackedObject value,
final ValueType valueType,
final RejectionType rejectionType,
final String rejectionReason,
final long requestId,
final int requestStreamId) {
throw new RuntimeException("Not yet implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,26 @@ public ProcessingResultBuilder appendRecord(

@Override
public ProcessingResultBuilder withResponse(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final RecordType recordType,
final long key,
final Intent intent,
final UnpackedObject value,
final ValueType valueType,
final RejectionType rejectionType,
final String rejectionReason,
final long requestId,
final int requestStreamId) {
hasResponse = true;
responseWriter.writeResponse(
eventKey, eventState, eventValue, valueType, requestId, requestStreamId);
recordType,
key,
intent,
value,
valueType,
rejectionType,
rejectionReason,
requestId,
requestStreamId);
return this;
}

Expand Down

0 comments on commit 1ee62bf

Please sign in to comment.