Refactor StreamProcessor / Engine #9725
9735: 9725 engine abstraction part 1 r=pihme a=pihme

## Description

- adds a new package into which the platform classes will be moved temporarily (with the idea to later move them into a new Maven module)
- adds a test that warns about unwanted dependencies on that new package
- moves some relevant classes into the new package
- adds skeleton interfaces for the engine abstraction
- implements replay functionality in the new `Engine` class and calls the replay from the existing classes

## Related issues

related to #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
9736: refactor(engine): Remove StreamProcessorLifecycleAware from TypedRecordProcessor r=pihme a=pihme

## Description

- Removes the `StreamProcessorLifecycleAware` interface from `TypedRecordProcessor`. As it turns out, none of the record processors implemented these methods.
- This makes the engine abstraction easier, because the new engine then does not need to act as a listener and relay for those events.
- Tests were simplified accordingly. I think this deserves the most attention in the review. Maybe some tests are now obsolete altogether 🤔

## Related issues

related to #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
9741: Mark in processing via flag r=Zelldon a=Zelldon

## Description

Previously, processing was marked implicitly via the `currentProcessor` field. In order to move the processing call out to the engine, we need to mark processing explicitly.

## Related issues

as preparation for #9725

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
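A minimal sketch of the idea, assuming a single-threaded state machine (as when it runs on an actor). `ProcessingStateMachineSketch`, `isInProcessing` and `processCommand` are hypothetical names, not the real `ProcessingStateMachine` API. Callers can ask whether processing is in progress without knowing which processor is active, which is what allows the processing call itself to move into the engine.

```java
/**
 * Hypothetical sketch: processing is marked with an explicit flag instead of
 * being inferred from a non-null currentProcessor field.
 */
final class ProcessingStateMachineSketch {
  private boolean inProcessing;

  boolean isInProcessing() {
    return inProcessing;
  }

  /** Runs one processing call and keeps the flag set only for its duration. */
  void processCommand(final Runnable processingCall) {
    inProcessing = true;
    try {
      processingCall.run();
    } finally {
      // cleared even if the processing call throws
      inProcessing = false;
    }
  }
}
```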
There was a question raised by @npepinpe regarding error handling in onError:
My answer: similar to how we do it now: https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/streamprocessor/ProcessingStateMachine.java#L56-L87
To clarify, does this mean we loop in the engine?
No. The engine is called with the error and the record (`onError`). If an exception occurs again there, we catch it again in the StreamProcessor, roll back the transaction, start a new transaction, and call `onError` again.
Thanks! That's not obvious from the diagram; it looks like we loop in
Yeah, this state machine reflects the current state and is taken from the current StreamProcessor, sorry 😅
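The retry behavior described in this exchange can be sketched as a loop on the StreamProcessor side. This is a simplified, synchronous sketch with hypothetical names (`ErrorHandler`, `SketchTransaction`, `StreamProcessorErrorLoop`); the real `ProcessingStateMachine` linked above is structured differently and uses real database transactions. It also assumes that each retry hands the most recent exception to `onError`. As described, nothing bounds the number of retries.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical engine-side callback; the real signature differs. */
interface ErrorHandler {
  void onError(Throwable error, String record);
}

/** Hypothetical minimal transaction that only records what happened to it. */
final class SketchTransaction {
  private final List<String> events;

  SketchTransaction(final List<String> events) {
    this.events = events;
    events.add("begin");
  }

  void commit() {
    events.add("commit");
  }

  void rollback() {
    events.add("rollback");
  }
}

/** The loop lives in the stream processor, not in the engine. */
final class StreamProcessorErrorLoop {
  final List<String> events = new ArrayList<>();

  void handleError(final ErrorHandler engine, final Throwable error, final String record) {
    Throwable current = error;
    while (true) {
      final SketchTransaction tx = new SketchTransaction(events);
      try {
        engine.onError(current, record);
        tx.commit();
        return;
      } catch (final RuntimeException e) {
        // onError itself failed: roll back, then call onError again in a fresh transaction
        tx.rollback();
        current = e;
      }
    }
  }
}
```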
9743: Refactor and Move `ProcessorContext` r=pihme a=pihme

## Description

* Rename `(ReadOnly)ProcessorContext` to `StreamProcessorContext`
* Move interface to `api` package, move implementation to `streamprocessor` package
* Simplify code

## Related issues

related to #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
9779: 9725 rename and improve interfaces r=pihme a=pihme

## Description

- Renames interfaces to make it clearer that there could be multiple `RecordProcessor` implementations
- Defines interfaces for `ProcessingResultBuilder` and `ProcessingResult`

## Related issues

related to #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
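The shape of these interfaces might look roughly like the sketch below. Only the names `RecordProcessor`, `ProcessingResultBuilder` and `ProcessingResult` come from the PR; the methods (`accepts`, `process`, `appendRecord`, `build`, `followUpRecords`), the helper classes, and the use of plain strings for records are assumptions for illustration. Because the platform only talks to `RecordProcessor`, several implementations can coexist, and each returns its output through a builder supplied by the platform.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical result handed back from a record processor to the platform. */
interface ProcessingResult {
  List<String> followUpRecords();

  boolean isEmpty();
}

/** Hypothetical builder the platform passes to the processor for each record. */
interface ProcessingResultBuilder {
  ProcessingResultBuilder appendRecord(String record);

  ProcessingResult build();
}

/** Hypothetical processor contract; several implementations may exist side by side. */
interface RecordProcessor {
  boolean accepts(String valueType);

  ProcessingResult process(String record, ProcessingResultBuilder resultBuilder);
}

/** Simple platform-side builder collecting follow-up records in memory. */
final class ListProcessingResultBuilder implements ProcessingResultBuilder {
  private final List<String> records = new ArrayList<>();

  @Override
  public ProcessingResultBuilder appendRecord(final String record) {
    records.add(record);
    return this;
  }

  @Override
  public ProcessingResult build() {
    final List<String> snapshot = List.copyOf(records);
    return new ProcessingResult() {
      @Override
      public List<String> followUpRecords() {
        return snapshot;
      }

      @Override
      public boolean isEmpty() {
        return snapshot.isEmpty();
      }
    };
  }
}

/** Example implementation that answers every accepted record with one follow-up. */
final class EchoProcessor implements RecordProcessor {
  @Override
  public boolean accepts(final String valueType) {
    return "ECHO".equals(valueType);
  }

  @Override
  public ProcessingResult process(final String record, final ProcessingResultBuilder builder) {
    return builder.appendRecord("echoed:" + record).build();
  }
}
```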
9835: Shape legacy code into new interfaces good bits part 1 r=pihme a=pihme

## Description

The overall trajectory of these PRs is as follows:

- Use new interfaces and implement them with classes that use mostly legacy code
- Switch engine and platform to use more and more of the new interfaces and less and less of direct legacy code
- Refine interfaces according to what is needed
- Once the existing code mostly uses the new interfaces, we can implement the new interfaces with new code (e.g. code that uses buffers instead of writing directly to a stream)

This process has many baby steps, but should be easy to follow. In this PR, `ProcessingResult` and `ProcessingResultBuilder` are implemented based on legacy code, and `ProcessingStateMachine` is in the process of using these interfaces more and more. The cut for part 1 was everything I did today that didn't break the tests.

## Related issues

related to #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
9840: `TypedStreamWriter` that writes to buffer r=pihme a=pihme

## Description

- Implements a `TypedStreamWriter` that writes to a buffer
- Implements the processing result builder to use the buffered stream writer
- Does not add tests. I started writing them, but it would take too much time. I hope I can revisit this after my holidays. For now there is just a follow-up issue: #9838

## Related issues

closes #9780

Co-authored-by: pihme <pihme@users.noreply.github.com>
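A buffered writer can be sketched with a `java.nio.ByteBuffer`. This is a hypothetical simplification: the real `TypedStreamWriter` appends typed records with metadata, whereas `RecordWriter` and `BufferedRecordWriter` below only store length-prefixed UTF-8 strings. What it illustrates is that the engine's writes land in a buffer that the platform can inspect and write to the log later, instead of going straight to the stream.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified writer contract (not the real TypedStreamWriter). */
interface RecordWriter {
  boolean tryAppend(String record);
}

/** Appends length-prefixed records to a fixed-capacity buffer instead of the log stream. */
final class BufferedRecordWriter implements RecordWriter {
  private final ByteBuffer buffer;

  BufferedRecordWriter(final int capacity) {
    buffer = ByteBuffer.allocate(capacity);
  }

  @Override
  public boolean tryAppend(final String record) {
    final byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
    if (buffer.remaining() < Integer.BYTES + bytes.length) {
      return false; // full: the caller decides how to react
    }
    buffer.putInt(bytes.length).put(bytes);
    return true;
  }

  /** Decodes the buffered records, e.g. so the platform can write them to the log later. */
  List<String> bufferedRecords() {
    final ByteBuffer view = buffer.duplicate(); // independent position/limit, shared content
    view.flip();
    final List<String> records = new ArrayList<>();
    while (view.hasRemaining()) {
      final byte[] bytes = new byte[view.getInt()];
      view.get(bytes);
      records.add(new String(bytes, StandardCharsets.UTF_8));
    }
    return records;
  }

  /** Discards buffered content, e.g. before processing the next record. */
  void reset() {
    buffer.clear();
  }
}
```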
9844: Shape legacy code into new interfaces good bits part 2 r=pihme a=pihme

## Description

- Move logic to select processor into engine
- Move processing logic into engine
- Use `ProcessingResult` to return response and execute side effects
- Move error handling logic into engine
- Fix tests

## Review Hints

- Most of the commits have some failing tests. This was unavoidable, because substeps in the modification were invalid/incomplete.
- In the end, all tests are passing again. This was quite satisfying, actually: adding more changes and seeing the test failure count go down again.
- Overall I would say we are through the valley of pain. Things were quite volatile recently, but I am confident we can bring some calm back to development.
- This achieves that most of the engine abstraction classes are in play, and the division of labor is largely as intended.
- There is still lots of stuff to do on either side, though.

## Related issues

related to #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
9853: Engine abstraction - rewrite Writers class to use ResultProcessorBuilder provided by platform r=pihme a=pihme

## Description

This is the last PR in the "shape legacy code into new interfaces" series.

Previously, the new interfaces `ProcessingResultBuilder` and `ProcessingResult` were introduced. This is how the engine is supposed to return the processing result to the stream processor. Also previously, these classes were passed between stream processor and engine on interface level, and the division of labor between stream processor and engine was established.

What had not been achieved yet is that the engine actually uses these new classes. Instead, the engine used existing legacy code to circumvent the new interfaces. This PR transforms the engine to use the new classes provided by the stream processor:

- The central point of transformation is the `Writers` class, which at the outset depended on stream writers, state writers, response writers and so on
- In the end, the `Writers` class only depends on `ProcessingResultBuilder`, which will be swapped out for each record processed or each error handled
- To make this transition possible, we a) need to refine the `Typed(Response/Rejection/Command)Writer` classes and remove methods like `reset` and `flush`, which will no longer be offered by the new interfaces, and b) need to implement the refined interfaces with an implementation that forwards calls to `ProcessingResultBuilder`

## Review hints

- `@npepinpe` 95% of the changes are in the engine; `@remcowesterhoud` will look into those
- These changes are done under a "better done than perfect" regime. Some naming might not be exactly right, and some cleanup might be done after the fact. The focus is to establish much of the engine abstraction concept, so that afterwards both teams can work without interfering with each other.

## Related issues

related to #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
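The rewritten `Writers` class can be sketched as follows. All signatures here are hypothetical simplifications; only the names `Writers` and `ProcessingResultBuilder` come from the PR, and the forwarding writer classes and their methods are illustrative. Each forwarding writer holds a `Supplier` of the current builder, so the platform can swap the builder per record or per error without recreating the writers. The sketch also fails fast when no builder is set, reflecting the idea that writers are only meaningful while a record or error is being handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for the platform-provided builder. */
interface ProcessingResultBuilder {
  void appendRecord(String record);

  void withResponse(String response);
}

/** Forwards follow-up commands to whichever builder is current. */
final class ResultBuilderBackedCommandWriter {
  private final Supplier<ProcessingResultBuilder> builder;

  ResultBuilderBackedCommandWriter(final Supplier<ProcessingResultBuilder> builder) {
    this.builder = builder;
  }

  void appendFollowUpCommand(final String command) {
    builder.get().appendRecord("COMMAND " + command);
  }
}

/** Forwards the client response to whichever builder is current. */
final class ResultBuilderBackedResponseWriter {
  private final Supplier<ProcessingResultBuilder> builder;

  ResultBuilderBackedResponseWriter(final Supplier<ProcessingResultBuilder> builder) {
    this.builder = builder;
  }

  void writeResponse(final String response) {
    builder.get().withResponse(response);
  }
}

/** Depends only on the builder, which the platform swaps for each record or error. */
final class Writers {
  private ProcessingResultBuilder current;
  private final ResultBuilderBackedCommandWriter command =
      new ResultBuilderBackedCommandWriter(this::currentBuilder);
  private final ResultBuilderBackedResponseWriter response =
      new ResultBuilderBackedResponseWriter(this::currentBuilder);

  void setResultBuilder(final ProcessingResultBuilder builder) {
    current = builder;
  }

  ResultBuilderBackedCommandWriter command() {
    return command;
  }

  ResultBuilderBackedResponseWriter response() {
    return response;
  }

  private ProcessingResultBuilder currentBuilder() {
    if (current == null) {
      throw new IllegalStateException("no result builder set: writers used outside processing");
    }
    return current;
  }
}

/** Test helper that records everything forwarded to it. */
final class RecordingResultBuilder implements ProcessingResultBuilder {
  final List<String> entries = new ArrayList<>();

  @Override
  public void appendRecord(final String record) {
    entries.add(record);
  }

  @Override
  public void withResponse(final String response) {
    entries.add("RESPONSE " + response);
  }
}
```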
9852: Move stream processor classes into the stream processor package r=pihme a=pihme

## Description

- Move classes that are only used by the stream processor into the corresponding package
- There were no other changes apart from moving the classes

## Related issues

related to #9725

**Merge only after** #9853 (this PR can easily be regenerated; the other one cannot)

Co-authored-by: pihme <pihme@users.noreply.github.com>
9967: Remove writers from interface r=pihme a=pihme

## Description

This PR removes the writers from `ReadOnlyStreamProcessorContext`. This is important because the writers are an engine-internal construct, only valid during calls to `process(...)` or `onError(...)`, and must not be handed out to other classes.

## Related issues

relates to #9725; replaces #9872

Co-authored-by: pihme <pihme@users.noreply.github.com>
9989: Create engine outside r=Zelldon a=Zelldon

## Description

Create the engine outside of the StreamProcessor, which should allow injecting other RecordProcessors as well. Furthermore, we can fill the Engine with the needed TypedRecordProcessorFactory, which means we no longer need it in our builders and contexts.

I hope this helps you make further progress `@deepthidevaki`

Sidenote: This also simplifies the StreamProcessor/RecordProcessor tests, since we no longer need to set up as much.

## Related issues

related to #9725

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
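Creating the engine outside amounts to constructor injection. In this hypothetical sketch (simplified `RecordProcessor` and `StreamProcessor` types, records as plain strings, not the real Zeebe signatures), the `StreamProcessor` receives a ready-made processor instead of building the engine itself. That is also what makes the tests simpler: a recording stub can stand in for the engine, with no factory or state setup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified processor contract the StreamProcessor depends on. */
interface RecordProcessor {
  void process(String record);
}

/** Receives its processor from the outside instead of constructing the engine. */
final class StreamProcessor {
  private final RecordProcessor recordProcessor;

  StreamProcessor(final RecordProcessor recordProcessor) {
    this.recordProcessor = Objects.requireNonNull(recordProcessor, "recordProcessor");
  }

  void processAll(final List<String> records) {
    for (final String record : records) {
      recordProcessor.process(record);
    }
  }
}

/** Test double: stands in for the engine without any further setup. */
final class RecordingProcessor implements RecordProcessor {
  final List<String> seen = new ArrayList<>();

  @Override
  public void process(final String record) {
    seen.add(record);
  }
}
```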
10012: refactor: let `dist` build the actor scheduler for gateway r=oleschoenburg a=oleschoenburg

## Description

This moves the responsibility of creating an actor scheduler for the gateway to the `dist` module. As an added benefit, this allows us to remove some redundant constructors, remove a deprecated method from `BrokerStartupContext`, and simplify `BrokerClient` so that it never starts the actor scheduler.

## Related issues

relates to #9996

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
9995: Modify process instance by activating elements inside existing flow scopes r=remcowesterhoud a=remcowesterhoud

## Description

Allows the activation of nested elements, as long as they are activated inside an existing flow scope. Using the flow scope element id, we can search for an element instance matching this id. When found, we return the key of this element instance and use it as the flow scope when activating the element from the activate instruction.

## Related issues

closes #9642

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
10076: Engine abstraction remove legacy writer r=pihme a=pihme

## Description

- Introduces a new task that can be scheduled by the engine and that can return records to be written
- Replaces all references to `LegacyTypedStreamWriter` outside of the stream processor with scheduling of the new tasks
- The overall result is that the engine no longer depends on any writer that writes directly to the stream. The stream processor is now in full control of when and how records are written (Eureka!)

## Related issues

related to #9724, #9730, #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
10076: Engine abstraction remove legacy writer r=Zelldon a=pihme

## Description

- Introduces a new task that can be scheduled by the engine and that can return records to be written
- Replaces all references to `LegacyTypedStreamWriter` outside of the stream processor with scheduling of the new tasks
- The overall result is that the engine no longer depends on any writer that writes directly to the stream. The stream processor is now in full control of when and how records are written (Eureka!)

## Related issues

related to #9724, #9730, #9725

Co-authored-by: pihme <pihme@users.noreply.github.com>
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
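The new task mechanism can be sketched as follows, assuming for illustration only that a task simply returns a list of records. `Task` and `StreamProcessorTaskScheduler` are hypothetical names and not the real scheduling API. The engine only schedules; the stream processor decides when to run the tasks and is the only component that appends their records to the log.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical task: instead of writing itself, it returns the records it wants written. */
interface Task {
  List<String> execute();
}

/** Hypothetical platform-side scheduler; only the stream processor touches the log. */
final class StreamProcessorTaskScheduler {
  private final Queue<Task> pending = new ArrayDeque<>();
  private final List<String> written = new ArrayList<>();

  /** Called by the engine; nothing is written yet. */
  void schedule(final Task task) {
    pending.add(task);
  }

  /** Called by the stream processor when it decides it is safe to write. */
  void runPending() {
    Task task;
    while ((task = pending.poll()) != null) {
      written.addAll(task.execute());
    }
  }

  /** Snapshot of everything the stream processor has written so far. */
  List<String> log() {
    return List.copyOf(written);
  }
}
```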
The refactoring of the StreamProcessor/engine interface is done. Some cleanup may remain, which we will do in #9727.
Description
Part of #9600
Rename StreamProcessor to better distinguish between the Platform (which drives the Processor), the StreamProcessor interface, and the Engine as its implementation, etc. Furthermore, refactor the current implementation to make it possible to split up the modules and have clear boundaries between components.
See "Processing" in #9602 (comment)

Todo:

- `currentProcessor` field shouldn't be used anymore
- Implement LifecycleAware?
- StreamProcessorListener #10034