
Add new RecordBatch classes and interfaces #10118

Merged · 6 commits into main from zell-buffered-result · Aug 23, 2022

Conversation

@Zelldon (Member) commented on Aug 18, 2022

Description

With #9600 and especially #9724, we wanted a way to return results (records) from processing and from scheduled tasks without having to use the LogStreamWriters. This should allow us to reduce the dependency on the log stream and, in general, make the test setup easier.

Our first attempt was the [BufferedProcessingResultBuilder](https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/streamprocessor/BufferedProcessingResultBuilder.java), which only contains a buffer into which the needed record details are written. This was also described as a solution in issue #9724.

During the discussion of the proposal, the idea was already raised that it would be nicer to have a list of records returned by the Processing- and TaskResult. This was added as a bonus in #9724.

Today I thought longer about the `BufferedProcessingResultBuilder`, how we could test it, and how we could make returning the processing result nicer in general. Checking the code of the result builder and [LogStreamBatchWriterImpl.java](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java), I realized that we do not really need to copy the record details into a big buffer (as we do it [here](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java#L163-L195)). We could create a "struct" (some would call it a Java record, but to avoid confusion I call it a struct here) of the record details we want to write. Most parts of the struct are primitives (like the intent and valueType enums), and the RecordValue would be the only thing we would need to copy into a buffer. This would allow better debuggability (e.g. we can see during processing what is already part of the result), and there is no need to mess around with offsets in the implementation. We can keep references to these "structs" in a list and iterate over it in the StreamProcessor to write the record details, similar to what we [do in the LogStreamBatchWriter here](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java#L240).
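To illustrate the idea, a minimal, self-contained sketch (names and fields are illustrative; the actual classes in this PR may differ):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the real protocol enums (RecordType, ValueType, Intent).
enum RecordType { COMMAND, EVENT, COMMAND_REJECTION }
enum ValueType { JOB, PROCESS_INSTANCE, MESSAGE }

// The "struct": mostly primitives; the serialized record value is the
// only part that has to be copied into a buffer.
record BatchEntry(
    long key,
    int sourceIndex,
    RecordType recordType,
    ValueType valueType,
    String intent,
    byte[] serializedValue) {

  static BatchEntry of(
      final long key,
      final int sourceIndex,
      final RecordType recordType,
      final ValueType valueType,
      final String intent,
      final byte[] value) {
    // Defensive copy, so the caller may reuse its write buffer afterwards.
    return new BatchEntry(key, sourceIndex, recordType, valueType, intent, value.clone());
  }
}

// The batch just keeps references to the entries; the StreamProcessor can
// later iterate over them and write each record, without offset bookkeeping.
final class Batch implements Iterable<BatchEntry> {
  private final List<BatchEntry> entries = new ArrayList<>();

  void add(final BatchEntry entry) {
    entries.add(entry);
  }

  @Override
  public java.util.Iterator<BatchEntry> iterator() {
    return List.copyOf(entries).iterator();
  }
}
```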

One interesting part: after writing the classes and tests, I also realized that the batch entry interface could return the `UnifiedRecordValue`, so we don't have to mess around with buffers (BufferWriters or BufferReaders) again, and the engine tests become easier to implement. For example, if we ever split the modules, we could write tests without the stream processor: take the records from the processing result and feed them directly back into the engine 🤯

So this PR adds new interfaces and a first implementation of the so-called RecordBatch and RecordBatchEntry, which should later be used in the Processing- and TaskResult. Since the CommandWriter needs similar information to the LogStreamWriter, we can reuse the RecordBatch both for the records to write and for the response, which is part of the ProcessingResult. No other code was touched or modified.
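Sketched as interfaces, reusing the BatchEntry type from the sketch above (the method set is illustrative, and the Mutable/Immutable naming anticipates the rename agreed on in the review below):

```java
// Read-only view of a batch: the engine can return this as part of the
// ProcessingResult; the StreamProcessor only needs to iterate and inspect.
interface ImmutableRecordBatch extends Iterable<BatchEntry> {
  int getBatchSize();   // total size in bytes
  int getEntryCount();
}

// Write side, used while the result is being built. The boolean return is a
// simplification; the real API could use a richer result type for rejections.
interface MutableRecordBatch extends ImmutableRecordBatch {
  boolean appendEntry(BatchEntry entry);
}
```

The engine would hand out only the immutable view in the processing result, while the result builder works against the mutable one.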

Right now the interfaces, and their documentation, are kept to a minimum. I guess we will need to iterate on them later, but I think it is a good step forward.

Feel free to propose other names for the classes and interfaces; nothing is set in stone. #namingishard

Related issues

related to #10001

Definition of Done

Depending on the issue and the pull request, not all items need to be done.

Code changes:

  • The changes are backwards compatible with previous versions
  • If it fixes a bug, then PRs are created to backport the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. backport stable/1.3) to the PR; in case that fails, you need to create the backports manually.

Testing:

  • There are unit/integration tests that verify all acceptance criteria of the issue
  • New tests are written to ensure backwards compatibility with future versions
  • The behavior is tested manually
  • The change has been verified by a QA run
  • The impact of the changes is verified by a benchmark

Documentation:

  • The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
  • New content is added to the release announcement
  • If the PR changes how BPMN processes are validated (e.g. support new BPMN element) then the Camunda modeling team should be informed to adjust the BPMN linting.

Please refer to our review guidelines.

@github-actions bot (Contributor) commented on Aug 18, 2022

Test Results

851 files (+4) · 851 suites (+4) · 1h 38m 59s ⏱️ (−2m 13s)
6 651 tests (+402): 6 640 ✔️ (+402), 11 💤 (±0), 0 (±0)
6 835 runs (+402): 6 824 ✔️ (+402), 11 💤 (±0), 0 (±0)

Results for commit 3fc44ce. ± Comparison against base commit fab1f77.

♻️ This comment has been updated with latest results.

@Zelldon marked this pull request as ready for review on August 18, 2022 at 19:05
@Zelldon mentioned this pull request on Aug 19, 2022
@Zelldon requested a review from npepinpe on August 19, 2022 at 08:17
@npepinpe (Member) left a comment

🚀

Nice, I think switching to the engine returning an immutable, ordered batch is much better, and we can be in charge of the complete persistence layer (including serialization/deserialization of records).

  • Naming: I like Mutable and Immutable better than Modifiable/Unmodifiable. Plus, we already use them in other places in the project.
  • ❌ My only concern is regarding the max batch size. See "I can't activate jobs with a high max job count" (#5525) and its fix, "Correctly truncate a job activation batch if it will not fit in the dispatcher" (#8799). Previously we had to use a function to compute the remaining writable bytes in the dispatcher, since the configured maxFragmentSize isn't actually how many bytes you can write (from a producer's point of view). After framing, aligning, etc., you can usually write less, and it varies with how many entries you write in a single dispatcher batch. Here it seems we're back to using a fixed value? See the BufferedProcessingResultBuilder, which has a capacityCalculator function for exactly that. Is there a reason we don't use it here?

@Zelldon (Member, Author) commented on Aug 22, 2022

> 🚀
>
> Nice, I think switching to the engine returning an immutable, ordered batch is much better, and we can be in charge of the complete persistence layer (including serialization/deserialization of records).
>
> • Naming: I like Mutable and Immutable better than Modifiable/Unmodifiable. Plus, we already use them in other places in the project.

Fine by me. I can change that :)

> • ❌ My only concern is regarding the max batch size. See "I can't activate jobs with a high max job count" (#5525) and its fix, "Correctly truncate a job activation batch if it will not fit in the dispatcher" (#8799). Previously we had to use a function to compute the remaining writable bytes in the dispatcher, since the configured maxFragmentSize isn't actually how many bytes you can write (from a producer's point of view). After framing, aligning, etc., you can usually write less, and it varies with how many entries you write in a single dispatcher batch. Here it seems we're back to using a fixed value? See the BufferedProcessingResultBuilder, which has a capacityCalculator function for exactly that. Is there a reason we don't use it here?

Tbh I still don't get why we have it like that. If I, as a user, configure something like maxMessageSize, then I would expect this to be the MAXIMUM size I can send/use. But this is not the case, one reason being the dispatcher and its headers etc. Why is that? Why isn't the dispatcher buffer sized so that the configured message size fits, plus 10% headroom or something?

Reminds me of an analogy: I want to build a house with 128 sqm; after it is finished, I get a house with 100 sqm, with the argument that the walls are too thick. Why aren't the walls built on the outside? 😅


Anyhow, I don't see that as a blocker for this PR; we can adjust it in a follow-up. The BufferedResultBuilder, as you said, uses the calculation function, and I could use that here as well, but tbh I tried to avoid it because it doesn't make much sense to me. I would prefer that we either enlarge the dispatcher buffer or reduce the max message size for the buffer (by 10% or so, so that the headers also fit).

@npepinpe (Member) commented:

> Tbh I still don't get why we have it like that. If I, as a user, configure something like maxMessageSize, then I would expect this to be the MAXIMUM size I can send/use. But this is not the case, one reason being the dispatcher and its headers etc. Why is that? Why isn't the dispatcher buffer sized so that the configured message size fits, plus 10% headroom or something?
>
> Reminds me of an analogy: I want to build a house with 128 sqm; after it is finished, I get a house with 100 sqm, with the argument that the walls are too thick. Why aren't the walls built on the outside? 😅

I agree, and I wish the dispatcher would instead figure that out, e.g. if I say I want to write at most 4MB, I should be able to do so 😄 If it were just one entry at a time, this would work fine (just subtract some fixed space), but since the overhead depends on the number of items in a dispatcher batch, I'm not sure how you'd pre-calculate it. Maybe by fixing an upper bound on how many items you can add to a batch? Then you can calculate a deterministic upper bound: let D be the difference between the aligned max fragment size and the max fragment size, E the max number of events possible, F the outer frame length, and F_e the single-event frame length; then the total overhead is D + F + (E * F_e). I think that works, though again, single events are also aligned, so maybe I forgot something.
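To make the proposed bound concrete, a hypothetical worked example (the frame and alignment constants are invented for illustration and do not come from the actual dispatcher):

```java
public final class OverheadBound {

  // Round up to the next multiple of the alignment (a power of two).
  static int align(final int value, final int alignment) {
    return (value + alignment - 1) & -alignment;
  }

  public static void main(final String[] args) {
    final int maxFragmentSize = 4 * 1024 * 1024; // configured producer limit
    final int d = align(maxFragmentSize, 64) - maxFragmentSize; // D; 0 here, 4 MiB is already aligned
    final int f = 24;   // outer batch frame length F (assumed)
    final int fe = 12;  // per-event frame length F_e (assumed)
    final int e = 100;  // fixed upper bound on events per batch, E (assumed)

    final int overhead = d + f + e * fe;
    // A producer could then always safely write up to this many payload bytes:
    System.out.println(maxFragmentSize - overhead); // 4193080
  }
}
```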

I wish it were just simpler, tbh. Or better: that the max fragment size were the max length of a single event, not the max length of a batch, and that we could handle arbitrarily large batches (via distributed transactions). But that all seems more complex. If you can figure out a way to calculate a deterministic upper bound, then we can go with that.

In the meantime, I think merging this PR and using it would re-open bug #5525, no? Of course, if we merge it and don't use it, that's fine, but I'd be worried that we start using it without fixing this, essentially re-introducing #5525. Or am I missing something? 🤔

@Zelldon (Member, Author) commented on Aug 22, 2022

Ok, thanks for your input. In the current PR it is not yet used; that would only happen in upcoming PRs, but ok. I will time-box it and take a look at the dispatcher (and a potential replacement). Otherwise I will use a similar approach to what we do right now with the BufferedResultBuilder (giving it a calculation function). The problem I see there is that we then can't remove the ResultBuilder from the processing interface, which is bad :/ unless the ResultBatch itself knows how big a fragment is (which of course doesn't really make sense).

@npepinpe (Member) commented:

Yeah, I get that it's frustrating, sorry. I'm not sure what a good intermediate solution is =/ The important thing is that, in the job batch collector, we can accurately probe whether it's safe to add one more job; it might even be fine if we're a little too pessimistic, as long as the error reporting is good.

Or perhaps it's time to look into removing the constraint that all follow-up events are contained in a single dispatcher batch. A naive approach would be to just not have a max fragment size for a short while and re-introduce it later as an internal configuration setting, once we've figured out how to spread follow-up records across multiple Raft entries. I'm not sure what the consequences of the intermediate step would be, though.

@Zelldon (Member, Author) commented on Aug 22, 2022

> Or perhaps it's time to look into removing the constraint that all follow-up events are contained in a single dispatcher batch. A naive approach would be to just not have a max fragment size for a short while and re-introduce it later as an internal configuration setting, once we've figured out how to spread follow-up records across multiple Raft entries. I'm not sure what the consequences of the intermediate step would be, though.

Couldn't we already spread records across multiple dispatcher fragments, as long as they end up in the same Raft entry? 🤔 The LogStorageAppender could collect all corresponding events together, but I'm not sure how much this would help 🤔 The maxMessageSize is also the maximum for the Raft entry, right? 🤔 Or does that limit no longer exist because of the mmap'd files?

@npepinpe (Member) commented:

The max entry size is sort of there for transmission, maybe? We'd have to make sure we chunk append entries, and I can't remember whether we do or not.

But it's a good idea. The journal doesn't really (or shouldn't) care about the max entry size; it's just limited by the max segment size and framing.

@npepinpe (Member) commented on Aug 22, 2022

I checked: there is no concept of a max entry size in the journal. We don't support an entry spanning multiple segments, though, so there is an upper bound, namely the segment size (minus some fixed framing). The downside is that by the time you realize something doesn't fit in a single entry/segment, it's quite far downstream and you can't react on the producer side.

To avoid having to buffer multiple dispatcher fragments in memory before writing them to the Raft entry, we would also need to "stream" them directly to the file, which we also don't support. I don't know how easy that would be. Or we buffer them all in memory, I guess, and maybe it's not so bad performance-wise...? :S It could become a memory issue, though.

EDIT: my proposal here would be to keep what we have (use the capacity calculator) and create a new epic/issue that we can try to get prioritized sooner rather than later. In the meantime, we could try to adapt the interface so that exposing this isn't too bad. Maybe we adapt it to something like a BiFunction<Integer, UnifiedRecordValue, Boolean>, so it's still closer to the new design?
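A sketch of that suggestion (purely illustrative; RecordValueStub stands in for UnifiedRecordValue, and the method names are made up):

```java
import java.util.function.BiFunction;

// Stand-in for UnifiedRecordValue; only its serialized length matters here.
record RecordValueStub(int lengthInBytes) {}

// The batch probes an externally supplied capacity function before accepting
// an entry, so it never needs to know about dispatcher framing itself.
final class CapacityAwareBatch {
  private final BiFunction<Integer, RecordValueStub, Boolean> canWrite;
  private int currentSizeInBytes;

  CapacityAwareBatch(final BiFunction<Integer, RecordValueStub, Boolean> canWrite) {
    this.canWrite = canWrite;
  }

  boolean tryAppend(final RecordValueStub value) {
    if (!canWrite.apply(currentSizeInBytes, value)) {
      return false; // caller stops filling, e.g. truncates a job activation batch
    }
    currentSizeInBytes += value.lengthInBytes();
    return true;
  }
}
```

The capacity function would be injected from the outside (e.g. computed from the dispatcher's remaining writable bytes), which keeps the batch itself free of framing knowledge while still guarding against the #5525 scenario.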

If you need someone to brainstorm with or to focus more on this, this week is difficult for me, so I trust you and Deepthi to figure out something great :)

@deepthidevaki (Contributor) left a comment

Looks good. I'm missing the broader picture of how this will be extended, so my questions are based on that. I approved the PR assuming those points will be handled in later PRs.

I agree with Nicolas' suggestion regarding the Mutable and Immutable naming. The issue with the max batch size should also be fixed, but I'm ok with that being done in a different PR.

* Modifiable -> Mutable
* Unmodifiable -> Immutable
* Predicate allows verifying whether the potential batch count or batch size reaches a certain limit, in which case the entry needs to be rejected.
@Zelldon (Member, Author) commented on Aug 23, 2022

Thanks @npepinpe and @deepthidevaki for your review and input!

@Zelldon (Member, Author) commented on Aug 23, 2022

bors r+

zeebe-bors-camunda bot added a commit that referenced this pull request on Aug 23, 2022:
10118: Add new RecordBatch classes and interfaces r=Zelldon a=Zelldon

@zeebe-bors-camunda (Contributor) commented:

Build failed:

@Zelldon (Member, Author) commented on Aug 23, 2022

bors r+

@zeebe-bors-camunda (Contributor) commented:

Build succeeded:

zeebe-bors-camunda bot merged commit 791c8cf into main on Aug 23, 2022
zeebe-bors-camunda bot deleted the zell-buffered-result branch on August 23, 2022 at 14:18