Use RecordBatch to write follow-up records #10163
Conversation
We have a use case where we want to verify whether we could potentially write a certain event count and event batch size to the LogStreamBatchWriter, without already adding the events to the batch. This commit adds a new method to check whether the given event count and batch size could potentially be written.
* Add a new method to return a RecordBatch from the ProcessingResult
* Existing ResultBuilder implementation now also writes to a record batch, not only to the streamWriter
* ResultBuilder creates a result with an ImmutableRecordBatch
* RecordBatch is created with the new writer check method, in order to verify whether certain entries can be added to the batch
Some of the disabled tests need to be restored after the refactoring, but in a different way.
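To illustrate the kind of check described above, here is a minimal sketch of a pre-write check on the batch writer. The method name canWriteEvents and its exact signature are assumptions for illustration, not necessarily the final API:

```java
// Sketch of the pre-write check on the batch writer. The method name and
// signature are assumptions for illustration, not the exact Zeebe API.
public interface LogStreamBatchWriter {

  /**
   * Verifies whether the given event count and batch size would still fit
   * into the underlying log, without appending anything to the batch yet.
   *
   * @return true if a batch with these dimensions could be written
   */
  boolean canWriteEvents(int eventCount, int batchSize);
}
```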
```java
final ValueType valueType = typeRegistry.get(value.getClass());
if (valueType == null) {
  // usually happens when the record is not registered at the TypedStreamEnvironment
  throw new IllegalStateException(
      "Missing value type mapping for record: " + value.getClass());
}
```
This is currently handled in a similar way in the legacy writer: https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/streamprocessor/LegacyTypedStreamWriterImpl.java#L40-L45
```java
if (value instanceof UnifiedRecordValue unifiedRecordValue) {
  mutableRecordBatch.appendRecord(
      key, -1, type, intent, rejectionType, rejectionReason, valueType, unifiedRecordValue);
} else {
  throw new IllegalStateException(
      String.format("The record value %s is not a UnifiedRecordValue", value));
}
```
We do a similar thing in the legacy writer, see https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/streamprocessor/LegacyTypedStreamWriterImpl.java#L85-L89
We could change the interface to accept only UnifiedRecordValue, but this would cause more changes on other interfaces, which I didn't want to do in this PR.
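For context, a rough sketch of the two interface shapes being weighed here; the package paths are taken from the Zeebe codebase, but the signatures are assumptions for illustration:

```java
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;

interface CurrentShape {
  // Accepts any RecordValue, so the implementation has to check for
  // UnifiedRecordValue at runtime and throw otherwise (see diff above).
  void appendRecord(long key, RecordType type, Intent intent,
      RejectionType rejectionType, String rejectionReason, RecordValue value);
}

interface NarrowedAlternative {
  // Compiler-enforced: only a UnifiedRecordValue can be passed, but every
  // caller and intermediate interface would have to change as well.
  void appendRecord(long key, RecordType type, Intent intent,
      RejectionType rejectionType, String rejectionReason, UnifiedRecordValue value);
}
```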
```java
@Test
public void shouldMarkUnhealthyWhenOnErrorHandlingWriteEventFails() {
```
I will create a follow-up issue to either restore or rewrite the tests. Let's see whether we find a good way. Right now the writing shouldn't fail anymore.
I have one question - I don't understand how this works, because it seems like we are writing the follow-up records twice: once via the RecordBatch in the ProcessingStateMachine, and once directly via the streamWriter in the DirectProcessingResultBuilder.

If I understood this correctly, you can already:

- Remove streamWriter from DirectProcessingResultBuilder
- Use the RecordBatch instead of the StreamWriter to verify in DirectProcessingResultBuilder::canWriteEventOfLength

Let me know what you think, and I will approve the PR after that.
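A rough sketch of the suggested change, assuming the batch exposes a size-check method (the name canAppendRecordOfLength is a hypothetical placeholder):

```java
// Sketch of the suggestion above: the builder asks the batch, not the
// stream writer, whether another event would still fit. The batch-side
// method name canAppendRecordOfLength is a hypothetical placeholder.
final class DirectProcessingResultBuilder {

  private final MutableRecordBatch mutableRecordBatch;

  DirectProcessingResultBuilder(final MutableRecordBatch mutableRecordBatch) {
    this.mutableRecordBatch = mutableRecordBatch;
  }

  public boolean canWriteEventOfLength(final int eventLength) {
    return mutableRecordBatch.canAppendRecordOfLength(eventLength);
  }
}
```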
```java
  throw new IllegalStateException(
      String.format("The record value %s is not a UnifiedRecordValue", value));
}

streamWriter.appendRecord(key, type, intent, rejectionType, rejectionReason, value);
```
❓ Doesn't the record get written twice?
No, the streamWriter is reset in the ProcessingStateMachine. The problem here is that I can't remove it yet, because I have to migrate the TaskResult to the RecordBatch first; then I can remove that part.
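In other words, a simplified sketch of the ordering inside the ProcessingStateMachine (the reset method name is an assumption):

```java
// Simplified sketch: before the processing result is written, the legacy
// stream writer is reset, discarding anything the result builder appended
// through it, so those records never reach the log twice.
streamWriter.reset();
```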
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@users.noreply.github.com>
Thanks.
Thanks for your review :)
bors r+
bors r-
bors r+
Build succeeded:
Description
In order to determine the maximum record batch size, we use a new method on the batch writer. This method allows us to check whether we would be able to write a specific event count and batch size to the writer.
During processing, a RecordBatch is built up in the ProcessingResultBuilder and passed as an ImmutableRecordBatch inside the ProcessingResult to the ProcessingStateMachine. We tried not to change the interfaces here, which forces us to cast in certain places; this is similar to how it is currently done in the LegacyWriter.
The RecordBatch is consumed by the ProcessingStateMachine in order to write the records. A follow-up PR will clean up other parts, like the unused writer in the ProcessingResult, etc.
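A rough sketch of that consumption step, assuming the batch is iterable over its entries (accessor names are illustrative, not the exact Zeebe API):

```java
// Sketch: the ProcessingStateMachine drains the immutable batch from the
// ProcessingResult and appends each entry to the log stream writer.
// Accessor names are illustrative, not the exact Zeebe API.
final ImmutableRecordBatch recordBatch = processingResult.getRecordBatch();
for (final RecordBatchEntry entry : recordBatch) {
  streamWriter.appendRecord(
      entry.key(),
      entry.recordType(),
      entry.intent(),
      entry.rejectionType(),
      entry.rejectionReason(),
      entry.recordValue());
}
streamWriter.tryWrite(); // hand the whole batch to the log in one write
```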
Some StreamProcessor tests relied on the writer usage; they have been disabled/removed for now. We will rewrite them soon.
Related issues
related #9724
related #10001
Definition of Done
Not all items need to be done depending on the issue and the pull request.
Code changes:
Add the appropriate backport label (e.g. backport stable/1.3) to the PR; in case that fails, you need to create backports manually.
Testing:
Documentation:
Please refer to our review guidelines.