Run scheduled Task only in Processing Phase #10112
Conversation
We need to move the StreamProcessor.Phase into the context so that it is accessible by other classes, such as the ProcessingScheduleService.

We want to execute scheduled tasks only if the StreamProcessor is in the PROCESSING phase and no processing is currently happening. This makes sure that:

* we are not running during the replay/init phase (the state might not be up-to-date yet)
* we are not running while suspended
* we are not interfering with the ongoing processing, so that all transaction changes are available during our task execution
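The intended guard can be sketched as a small, self-contained toy model (class, field, and method names here are illustrative, not the actual Zeebe API): a task runs only when the processor is in the PROCESSING phase and no record is currently being processed; otherwise it is re-enqueued for a later attempt.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PhaseGuardSketch {

  enum Phase { INITIAL, REPLAY, PROCESSING, PAUSED }

  static Phase phase = Phase.PROCESSING;
  static boolean inProcessing = false;
  static final Queue<Runnable> taskQueue = new ArrayDeque<>();

  // execute only in PROCESSING phase, and only between record processings
  static boolean shouldExecute() {
    return phase == Phase.PROCESSING && !inProcessing;
  }

  static void submit(final Runnable task) {
    if (shouldExecute()) {
      task.run();
    } else {
      taskQueue.add(task); // deferred; retried once processing is idle again
    }
  }

  public static void main(final String[] args) {
    inProcessing = true;
    submit(() -> System.out.println("ran"));
    System.out.println("queued=" + taskQueue.size()); // task was deferred

    inProcessing = false;
    submit(taskQueue.poll()); // now it executes
  }
}
```

This mirrors the described behavior: during replay, suspension, or an ongoing processing step the task is put back, so it only ever observes committed state.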
```
@@ -242,11 +248,22 @@
   return streamProcessor;
 }

 public void pauseProcessing() {
   pauseProcessing(getLogName(DEFAULT_PARTITION));
```
Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
```
 public void pauseProcessing(final String streamName) {
   streamContextMap.get(streamName).streamProcessor.pauseProcessing().join();
   LOG.info("Paused processing for stream {}", streamName);
 }

 public void resumeProcessing() {
   resumeProcessing(getLogName(DEFAULT_PARTITION));
```
Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
Test Results: 856 files (+9), 856 suites (+9), 1h 54m 48s ⏱️ (+14m 17s). For more details on these failures, see this check. Results for commit 18e7f58. ± Comparison against base commit c108b34.
Looks good in general.
I have one question about the requirement:
> we are not interfering with the current ongoing processing, such that all transaction changes are available during our task execution
I thought that if everything works within its own transaction, then they shouldn't interfere with each other anyway, no? Maybe I misunderstood how transactions work in RocksDB. Can you explain that to me?
Anyway, so right now it means that if I want to schedule something, I will schedule it anyway, but it might not do anything, and we rely on the runAtFixedRate implementation to reschedule it, correct? So in a paused service, this means it will keep enqueuing forever until it's unpaused, correct? Could that cause starvation of the things that run at a fixed rate?

For example: I want to do something at a fixed rate, but every time I try to schedule it, we're currently in processing, so I never do anything. How likely is this? Unlikely? Because in a processing system, I expect we're very often "in processing", right?
```
@@ -61,6 +62,9 @@ public void onSkipped(final LoggedEvent skippedRecord) {}
   // safe structure here
   private boolean inProcessing;

   // this is accessed outside, which is why we need to make sure that it is thread-safe
   private volatile StreamProcessor.Phase phase = Phase.INITIAL;
```
❓ What accesses it from the outside?
Right now the health monitor accesses it. I don't want to fix this in this PR, so I kept it as it is.
The stream processor doesn't use runAtFixedRate anymore. This is just a wrapper around runDelayed and rescheduling. Regarding tasks never being executed: this should also be no issue, since we no longer use #run in the stream processor, which means everything is appended to the same queue and no skipping the line happens. Hope that helps.

P.S. Regarding the transaction: the requirement came up because the engine wants to schedule jobs (as a replacement for side effects), but they might not be committed yet.
```
   pauseProcessing(getLogName(DEFAULT_PARTITION));
 }

 // todo remove multi partition support - is not necessary for the StreamProcessor tests
```
I added a bullet point for this here: #9727
```
@@ -26,6 +27,7 @@ public class ProcessingScheduleServiceImpl implements ProcessingScheduleService
   private final StreamProcessorContext streamProcessorContext;
   private final AbortableRetryStrategy writeRetryStrategy;

   // todo remove context from CTOR; will be cleaned up after engine abstraction is done
```
I added a bullet point for it here #9727 (comment)
Maybe I misunderstood, but couldn't this happen in a loop? My question was: do we consider this so unlikely (that it would loop like this)? Because I would expect we're "in processing" most of the time, right? We're only not in processing between executeSideEffects and processCommand (so only when reading the next record). Since scheduled jobs aren't sequenced with normal jobs (they get sequenced only when the timer expires), there's no guarantee of ordering? Am I missing something?
I really thought RocksDB transactions were isolated from each other. Is that not true? Or is it because side effects (or their replacement) use the same transaction context as the processing?
I think we're talking about two things here. It is not about whether transactions are isolated or not; it is about data being written to the state before the state is accessed again in a scheduled task. For example, and to be specific: in https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/engine/processing/common/CatchEventBehavior.java#L302-L308 we want this to be executed AFTER the transaction is committed. Right now this is done via side effects, which we wanted to replace with the SchedulingService, as you can see here: #9723 (comment)

But since we're talking about it: @pihme created the post-commit tasks (https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/engine/api/PostCommitTask.java) while I was on vacation; they can be added to the ProcessingResult. We could remove that in favor of the ProcessingScheduleService after this PR, so we would only need to maintain one approach. OR we decide to keep them, use them for that use case, and remove from the ProcessingScheduleService the guarantee that tasks are not executed while inProcessing. Worth discussing, I guess.
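As a rough illustration of the post-commit idea (the interface shape and names below are simplified guesses for illustration, not the actual PostCommitTask/ProcessingResult API): follow-up work registered during processing is deferred and only runs once the state transaction has committed.

```java
import java.util.ArrayList;
import java.util.List;

public class PostCommitSketch {

  interface PostCommitTask { void run(); }

  static final List<PostCommitTask> postCommitTasks = new ArrayList<>();

  static void process() {
    // during processing: state changes happen inside a transaction, so
    // follow-up work is registered instead of executed directly
    postCommitTasks.add(() -> System.out.println("timer trigger after commit"));
  }

  static void commit() {
    // transaction committed -> now it is safe to run the deferred tasks,
    // because they will see the committed state
    postCommitTasks.forEach(PostCommitTask::run);
    postCommitTasks.clear();
  }

  public static void main(final String[] args) {
    process();
    System.out.println("tasks pending=" + postCommitTasks.size());
    commit();
  }
}
```

This is the same guarantee the ProcessingScheduleService tries to give via the inProcessing check, just expressed as an explicit commit hook.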
Not really. The runnable is scheduled with the delay, and after the delay is done, inProcessing is checked. If inProcessing is true, we add it to the queue, and eventually we will execute it, since all jobs are added to the same queue (there is no delay again at this point). To me this looks similar to our log stream and the processing. Right now I don't see an issue, but please tell me if I'm wrong.

Update: Sorry, I checked the code again and missed the part where runAtFixedRate is defined here https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/engine/api/ProcessingScheduleService.java#L35-L42 and reschedules outside the check. TBH most of the scheduled tasks right now don't need it, so maybe we go with the post-commit tasks in the ProcessingResult, OR we support other methods in the ProcessingScheduleService with different guarantees for different use cases 🤷
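The concern from the update can be shown with a toy model (purely illustrative, not the actual scheduler code): if each timer expiration re-schedules up front, regardless of whether the task actually ran, a busy processor accumulates queued copies while executing nothing.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RescheduleSketch {

  static boolean busy = true; // stands in for inProcessing / paused
  static final Queue<Runnable> queue = new ArrayDeque<>();
  static int executions = 0;

  // Naive variant: re-schedule immediately, then only run when idle.
  // While the processor stays busy the queue keeps growing.
  static void naiveTick(final Runnable task) {
    queue.add(task); // immediate re-schedule, independent of execution
    if (!busy) {
      queue.poll().run();
    }
  }

  public static void main(final String[] args) {
    final Runnable task = () -> executions++;
    for (int i = 0; i < 5; i++) {
      naiveTick(task); // 5 timer expirations while the processor is busy
    }
    System.out.println("queued=" + queue.size() + " executed=" + executions);
  }
}
```

Rescheduling only after the task has actually executed avoids exactly this buildup, which is what the change in this PR (and the fix discussed below) goes for.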
I guess this would also be interesting, since then we wouldn't have the issue described here (#10112 (comment)) of runDelayed and runAtFixedRate being postponed for too long because the StateMachine is in processing.
My bad, I must've been tired when I did the review, because it seems pretty obvious now. So we schedule, and if we're "busy", then we enqueue the task for later consumption. Yeah, it would be nice if we didn't re-schedule immediately but instead only when the task has actually run, because right now it might cause two tasks which should be scheduled with a delay in between to be run immediately one after the other 🤷

Regarding the transaction: if I understand correctly, it's just about making sure that the scheduled task isn't triggered before the current record processing is finished (since it needs access to whatever will be committed). Sorry, I'm a little slow right now 😅 I'm not familiar with the decisions to remove the side effects/post-commit tasks, to be honest, so I'll trust that you all took the best decision and that it's the right thing to do.

I think it's not hard to modify the implementation so the task is only rescheduled after it has actually executed:

```java
@Override
public void runAtFixedRate(final Duration delay, final Task task) {
  /* TODO preliminary implementation; with the direct access
   * this only works because this class is scheduled on the same actor as the
   * stream processor.
   */
  runDelayed(
      delay,
      toRunnable(
          builder -> {
            try {
              return task.execute(builder);
            } finally {
              runAtFixedRate(delay, task);
            }
          }));
}
```

This can probably be "optimized" to avoid rebuilding the runnable/rescheduling the task, I guess, but you get the idea.
👍
I'm a little tired and slower these days, sorry for the back and forth; it makes sense to me now. I think we can improve by only rescheduling the task after it has actually been executed (and not immediately after the converted Runnable runs). That's simple enough to do, I think, so no need for another review.
Previously, the ProcessingScheduleService rescheduled the task even when the task was not executed (e.g. because processing was ongoing). This could cause a lot of scheduled tasks to pile up on the queue.
Thanks @npepinpe, I did that.

bors r+
Build succeeded.
10214: Unflake TimerStartEventTest.shouldTriggerOnlyTwice r=korthout a=korthout

## Description

This test became flaky after a change in the task scheduler (#10112). Since then, tasks that have been scheduled are only executed when the engine is in the PROCESSING phase but isn't currently processing a command. If that is not the case, the task is rescheduled. That means that sometimes things take longer, especially when the processor is busy.

In this specific test case, new process instances are created using 2 timer start events on different processes. Each time they trigger, they produce a bunch of records, because they start new process instances. The processing of the relevant commands for these instances can interfere with the DueDateTimerChecker, which is a scheduled task. Because of the new task scheduler change, it can happen that the DueDateTimerChecker is postponed.

The problem is solved by periodically checking that the assertion holds, and increasing the clock's time every time we check. This makes sure that the DueDateTimerChecker's task can be executed, even when it's rescheduled.

## Related issues

closes #10169

Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
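The fix described above can be sketched as a retry loop around the assertion that advances a controlled clock on each attempt (illustrative only; names like `TestClock` and `awaitWithClock` are made up here, and the real test uses Zeebe's test clock and assertion utilities):

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class RetryWithClockSketch {

  interface TestClock { void addTime(Duration d); }

  // retry the assertion, advancing the clock between attempts so a
  // rescheduled task (like the DueDateTimerChecker) gets another trigger
  static boolean awaitWithClock(
      final TestClock clock, final BooleanSupplier assertion, final int maxAttempts) {
    for (int i = 0; i < maxAttempts; i++) {
      if (assertion.getAsBoolean()) {
        return true;
      }
      clock.addTime(Duration.ofSeconds(1));
    }
    return false;
  }

  public static void main(final String[] args) {
    final int[] triggered = {0};
    // fake clock: each advance "fires" the scheduled timer once
    final TestClock clock = d -> triggered[0]++;
    final boolean ok = awaitWithClock(clock, () -> triggered[0] >= 2, 10);
    System.out.println("ok=" + ok);
  }
}
```

The key point is that the clock only advances between assertion checks, so a task that was postponed by the scheduler change still gets executed before the test gives up.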
Description

This PR makes sure that we no longer run tasks in phases other than Processing. This means that when the StreamProcessor is suspended, scheduled tasks are not executed; they are put back into the queue. This was the simplest solution for now, since we have no Actor suspend feature yet (#10006). Furthermore, we no longer run scheduled tasks on replay, since this might also lead to issues (because the state is not yet fully rebuilt, etc.).
Added tests for certain aspects of this feature.
Related issues
closes #9962
closes #9999
closes #8642 (this will no longer happen since the abort condition is also verified in the SchedulingService)
Definition of Done
Not all items need to be done depending on the issue and the pull request.
Code changes:
* Backport label (e.g. backport stable/1.3) added to the PR; in case that fails you need to create backports manually.

Testing:

Documentation:
Please refer to our review guidelines.