Engine abstraction - rewrite Writers class to use ResultProcessorBuilder provided by platform #9853
This commit is a bit larger than I would like. It could be split up, but only into a small commit and a large commit, so I just left it as one large commit.

It started because I wanted to implement a TypedCommandWriter that uses ProcessingResultBuilder as backend instead of accessing the writers directly. It turns out that TypedCommandWriter contains reset() and flush() methods, which will no longer be allowed. (There will be a reset() method for the full result, but not for individual parts of the result.) So I created a smaller interface, RestrictedTypedCommandWriter, which does not contain these two methods (it will be renamed in the next commit).

Then I went through the current uses of TypedCommandWriter and replaced them all with RestrictedTypedCommandWriter. In one place this was not possible: DeploymentDistributionBehavior, which schedules a background task that uses reset and flush. For this use case, a new class, LegacyTask, was introduced. We need to migrate all background tasks anyway. The idea of LegacyTask is to be a bag that captures all unwanted dependencies. There was then a need to schedule such a legacy task and provide the dependencies when the task is run.

The effects of these changes are:
- Only the background task has a dependency on a writer with flush and reset, and this task is fully controlled by the platform
- All other places in the engine only write, but don't use reset or flush
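To make the split described above concrete, here is a minimal sketch. The interface names come from the commit message, but every signature is a simplified assumption (the real Zeebe interfaces take typed records and intents), and `InMemoryCommandWriter` and `EngineBehavior` are hypothetical stand-ins invented for illustration. `LegacyTask` is shown as an interface here, which may not match its actual shape.

```java
import java.util.ArrayList;
import java.util.List;

// Assumed, simplified signature: ordinary engine code may only append.
interface RestrictedTypedCommandWriter {
  void appendFollowUpCommand(long key, String intent, String value);
}

// The full legacy interface adds buffer control on top of the restricted one.
interface TypedCommandWriter extends RestrictedTypedCommandWriter {
  void reset();

  long flush(); // written position, or a negative value on failure
}

// Hypothetical in-memory stand-in for a real log stream writer.
final class InMemoryCommandWriter implements TypedCommandWriter {
  private final List<String> staged = new ArrayList<>();
  private final List<String> log = new ArrayList<>();

  @Override
  public void appendFollowUpCommand(final long key, final String intent, final String value) {
    staged.add(key + ":" + intent + ":" + value);
  }

  @Override
  public void reset() {
    staged.clear();
  }

  @Override
  public long flush() {
    log.addAll(staged);
    staged.clear();
    return log.size();
  }

  List<String> log() {
    return log;
  }
}

// Hypothetical engine code: it sees only the restricted interface.
final class EngineBehavior {
  private final RestrictedTypedCommandWriter writer;

  EngineBehavior(final RestrictedTypedCommandWriter writer) {
    this.writer = writer;
  }

  void complete(final long key) {
    writer.appendFollowUpCommand(key, "COMPLETE", "record");
  }
}

// The one background task that still needs reset and flush.
// The platform supplies the full writer when it runs the task.
interface LegacyTask {
  void run(TypedCommandWriter writer);
}
```

With this shape, a call to `reset()` or `flush()` from `EngineBehavior` would not compile, so the restriction is enforced by the type system rather than by convention.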
…ResultBuilder Refactor classes to reuse similar code
…lder Refactor classes to reuse similar code
Make sure that engine uses only the limited (no reset, flush) interface. Change logic in ProcessingStateMachine regarding how the writers are reset.
…tBuilder Also add a TODO to BufferedProcessingResultBuilder because there is certainly more work to do.
…ngResultBuilder This also required extending the interface. The philosophy here is that the result builder interface offers a low level of abstraction (e.g. all fields you can set), while the Typed...Writer interfaces offer a higher level of abstraction and more comfort. So TypedResponseWriter has methods like writeRejectionOnCommand(...) and writeEventOnCommand(...), but these are all mapped onto the generic withResponse(...) method in ProcessingResultBuilder.
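The layering described in this commit message could look roughly like the sketch below. The method names `writeEventOnCommand`, `writeRejectionOnCommand` and `withResponse` appear in the message, but their parameter lists here are assumptions chosen for brevity, and `RecordingResultBuilder` is a hypothetical helper that exists only to make the mapping observable.

```java
import java.util.ArrayList;
import java.util.List;

// Assumed, reduced builder interface: one generic method that takes every response field.
interface ProcessingResultBuilder {
  ProcessingResultBuilder withResponse(
      String recordType, long key, String intent, String value, String rejectionReason);
}

// Higher-level writer: convenience methods that all funnel into withResponse(...).
final class ResultBuilderBackedTypedResponseWriter {
  private final ProcessingResultBuilder builder;

  ResultBuilderBackedTypedResponseWriter(final ProcessingResultBuilder builder) {
    this.builder = builder;
  }

  void writeEventOnCommand(final long key, final String intent, final String value) {
    builder.withResponse("EVENT", key, intent, value, "");
  }

  void writeRejectionOnCommand(final long key, final String intent, final String reason) {
    builder.withResponse("COMMAND_REJECTION", key, intent, "", reason);
  }
}

// Hypothetical recording implementation so the mapping can be inspected.
final class RecordingResultBuilder implements ProcessingResultBuilder {
  final List<String> responses = new ArrayList<>();

  @Override
  public ProcessingResultBuilder withResponse(
      final String recordType,
      final long key,
      final String intent,
      final String value,
      final String rejectionReason) {
    responses.add(
        String.join("|", recordType, Long.toString(key), intent, value, rejectionReason));
    return this;
  }
}
```

The writer holds no state of its own; it only decides which fields to fill, which keeps the low-level interface small while callers keep the more comfortable methods.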
For some unknown reason we need to set the hasResponse flag back to true. This should not be necessary, but it is. Setting it to true by default eliminates ~168 failed tests. Since the direct access shall be replaced with a buffered one I think there is no point in investigating this further. Finally, the error induced in StreamProcessorHealthTest must be moved to a different method to reflect the way the code flow works now.
deploymentDistributionRecord.setPartition(partitionId);
commandWriter.reset();
commandWriter.appendFollowUpCommand(
    key, DeploymentDistributionIntent.COMPLETE, deploymentDistributionRecord);

final long pos = commandWriter.flush();
if (pos < 0) {
  scheduleService.runDelayed(Duration.ofMillis(100), this);
  schedulingService.runDelayed(Duration.ofMillis(100), this);
Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
Thanks for the small commits, that made it a lot better to review this 🙇
I'll be honest I didn't understand everything that's been changed, probably because I'm not that deep into the abstraction topic. I have added some questions and some minor suggestions.
Overall I didn't see any strange stuff and I like the look of the abstraction so far, so great job 🚀
engine/src/main/java/io/camunda/zeebe/streamprocessor/StreamProcessorContext.java
...main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/LegacyTypedStreamWriterProxy.java
...ava/io/camunda/zeebe/engine/processing/streamprocessor/writers/EventApplyingStateWriter.java
The class is only used in one test
👍 It's great to see everything coming together/into shape!
There was only one issue, which is very minor, about a potentially unused field. But nothing that requires a second review I think.
- 💭 As I wrote, not sure about the whole mutex and if it does anything useful, but happy to see how it helps us in practice.
- 💭 It's a bit out of scope, but now that we have an expandable buffer in the result builder, could a large synchronous multi-instance collection cause us to run out of memory by writing tons of events? I guess it's not really all that different from before as we were anyway staging them in memory, right? So just for my peace of mind, the situation is more or less the same as before, correct?
private final ErrorRecord errorRecord = new ErrorRecord();

private final ProcessingResultBuilderMutex resultBuilderMutex =
❓ What is the goal of the mutex? To prevent callers from caching and using a result builder, or to ensure that callers always get the latest request-scoped builder? If it's the latter, then all good. If it's the former, I'm not sure it's the best mechanism, as it doesn't really prevent any caching from happening, and I'm not sure what would happen if a processor were to cache it. I don't have much better ideas to be honest, but it feels like unnecessary complexity as it's not really preventing anything. I guess what's missing for me is that we aren't preventing the builder from being used out of scope, and I'm not sure it'll be obvious when we use it out of scope, or what the behavior will be (retry endlessly? blacklist?).
The mutex is mainly to ensure that callers get the latest request-scoped builder. You are right that it doesn't prevent the result from being cached.
If you want to make sure the builder is not used out of scope, the platform could pass a proxy to the engine and invalidate the proxy after it goes out of scope. Then, even if the engine were to cache the result, you could detect it.
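The proxy idea from this reply could be sketched as follows. This is not code from the PR: `ScopedResultBuilderProxy`, `invalidate()` and the single-method `ProcessingResultBuilder` shown here are all hypothetical, reduced to the minimum needed to show how out-of-scope use would become detectable.

```java
// Assumed, heavily reduced builder interface for illustration.
interface ProcessingResultBuilder {
  void appendRecord(String record);
}

// Hypothetical proxy the platform would hand to the engine for one processing scope.
final class ScopedResultBuilderProxy implements ProcessingResultBuilder {
  private final ProcessingResultBuilder delegate;
  private boolean valid = true;

  ScopedResultBuilderProxy(final ProcessingResultBuilder delegate) {
    this.delegate = delegate;
  }

  // Called by the platform once the record has been processed.
  void invalidate() {
    valid = false;
  }

  @Override
  public void appendRecord(final String record) {
    if (!valid) {
      // A cached reference is being used after its scope ended.
      throw new IllegalStateException("result builder used out of scope");
    }
    delegate.appendRecord(record);
  }
}
```

Failing fast with an exception is only one possible reaction; what the platform should do on detection (retry, blacklist, or something else) is the open question raised in the review comment.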
.../zeebe/engine/processing/streamprocessor/writers/ResultBuilderBackedTypedResponseWriter.java
engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/Writers.java
engine/src/main/java/io/camunda/zeebe/streamprocessor/BufferedProcessingResultBuilder.java
engine/src/main/java/io/camunda/zeebe/streamprocessor/ProcessingScheduleServiceImpl.java
The expandable buffer is not in use yet. We are still using the direct access. So it is exactly as before
bors merge
Build succeeded:
9852: Move stream processor classes into the stream processor package r=pihme a=pihme

## Description
- Move classes that are only used by stream processor into the corresponding package
- There were no other changes apart from moving the classes

## Related issues
related to #9725

**Merge only after** #9853 (this PR can easily be regenerated; the other one cannot)

Co-authored-by: pihme <pihme@users.noreply.github.com>
Description
This is the last PR in the "shape legacy code into new interfaces" series.

Previously, the new interfaces `ProcessingResultBuilder` and `ProcessingResult` were introduced. This is how the engine is supposed to return the processing result to the stream processor. Also previously, these classes were passed between stream processor and engine on interface level, and the division of labor between stream processor and engine was established.

What was not achieved yet is that the engine uses these new classes. Instead, the engine used existing legacy code to circumvent the new interfaces.

This PR transforms the engine to use the new classes provided by the stream processor:
- `Writers` class which at the outset depended on stream writers, state writers, response writers and so on
- `Writers` class only depends on `ProcessingResultBuilder`, which will be swapped out for each record processed, or each error to be handled
- `Typed(Response/Rejection/Command)Writer` classes and remove methods like `reset`, `flush`, which will no longer be offered by the new interfaces, and b) need to implement the refined interfaces with an implementation that forwards calls to `ProcessingResultBuilder`
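As a rough illustration of a `Writers` class that depends only on a result builder swapped out per record, consider the sketch below. It is an assumption-laden simplification: `ResultBuilderHolder` is a hypothetical stand-in for whatever mechanism the platform uses to publish the current builder, and `appendRecord` and `appendFollowUpEvent` are made-up, reduced signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Assumed, heavily reduced builder interface for illustration.
interface ProcessingResultBuilder {
  void appendRecord(String record);
}

// Hypothetical list-backed builder; the platform creates one per record.
final class ListResultBuilder implements ProcessingResultBuilder {
  final List<String> records = new ArrayList<>();

  @Override
  public void appendRecord(final String record) {
    records.add(record);
  }
}

// Hypothetical holder that the platform updates before each record or error.
final class ResultBuilderHolder implements Supplier<ProcessingResultBuilder> {
  private ProcessingResultBuilder current;

  void set(final ProcessingResultBuilder builder) {
    current = builder;
  }

  @Override
  public ProcessingResultBuilder get() {
    if (current == null) {
      throw new IllegalStateException("no result builder set");
    }
    return current;
  }
}

// Created once; resolves the current builder on every call instead of caching it.
final class Writers {
  private final Supplier<ProcessingResultBuilder> builderSupplier;

  Writers(final Supplier<ProcessingResultBuilder> builderSupplier) {
    this.builderSupplier = builderSupplier;
  }

  void appendFollowUpEvent(final String event) {
    builderSupplier.get().appendRecord(event);
  }
}
```

Because `Writers` looks up the builder on each call, the engine can keep one long-lived `Writers` instance while each record's output still lands in its own result.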
Review hints
Related issues
related to #9725
Definition of Done
Code changes:
… (`backport stable/1.3`) to the PR, in case that fails you need to create backports manually.
Testing:
Documentation:
Please refer to our review guidelines.